Flink Join Notes

Unbounded JOIN

Regular/Equi-join

A regular join: is it simply a join with no time-related condition?

SELECT *
FROM Orders INNER JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders LEFT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders RIGHT JOIN Product ON Orders.productId = Product.id

SELECT *
FROM Orders FULL OUTER JOIN Product ON Orders.productId = Product.id
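
To make the "unbounded" label concrete: a regular join has no time condition that would allow either input to be discarded, so both sides have to be retained. The following is a minimal plain-Scala sketch of that idea for the INNER JOIN case, not Flink code. The names `Order`, `Product`, `RegularJoinSketch`, and `bufferedRows` are hypothetical and exist only for this illustration.

```scala
import scala.collection.mutable

// Hypothetical record types mirroring the Orders and Product tables above.
final case class Order(id: Int, productId: Int)
final case class Product(id: Int, name: String)

// Plain-Scala model of a streaming INNER JOIN on Orders.productId = Product.id.
// Every row is kept, because a matching row may still arrive on the other side later.
final class RegularJoinSketch {
  private val orders = mutable.Map.empty[Int, List[Order]]
  private val products = mutable.Map.empty[Int, List[Product]]

  // Buffer the order, then emit matches against all products seen so far.
  def onOrder(o: Order): List[(Order, Product)] = {
    orders(o.productId) = o :: orders.getOrElse(o.productId, Nil)
    products.getOrElse(o.productId, Nil).map(p => (o, p))
  }

  // Buffer the product, then emit matches against all orders seen so far.
  def onProduct(p: Product): List[(Order, Product)] = {
    products(p.id) = p :: products.getOrElse(p.id, Nil)
    orders.getOrElse(p.id, Nil).map(o => (o, p))
  }

  // Number of rows currently held; nothing in this sketch ever removes a row.
  def bufferedRows: Int =
    orders.values.map(_.size).sum + products.values.map(_.size).sum
}
```

Feeding an order before its product emits nothing at first and one joined pair once the product arrives, while `bufferedRows` only ever grows.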

Bounded JOIN

Time-windowed Join

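Time-windowed joins can also be regarded as a subset of regular joins. A time-windowed join requires at least one equi-join predicate and a join condition that bounds the time on both sides. For example, the following predicates are valid join conditions: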

- ltime = rtime
- ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE
- ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime
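
The time bound in the query above can be read as a simple predicate on two timestamps. Below is a small plain-Scala sketch of that check, assuming the SQL `BETWEEN` is inclusive on both ends. The function name `withinShipWindow` and the example timestamps are made up for illustration.

```scala
import java.time.{Duration, Instant}

// True when orderTime lies in [shipTime - 4 hours, shipTime],
// with both ends inclusive, as in SQL BETWEEN.
def withinShipWindow(orderTime: Instant, shipTime: Instant): Boolean = {
  val lower = shipTime.minus(Duration.ofHours(4))
  !orderTime.isBefore(lower) && !orderTime.isAfter(shipTime)
}

// Arbitrary example timestamps.
val ship = Instant.parse("2024-01-01T12:00:00Z")
val atLowerBound = withinShipWindow(Instant.parse("2024-01-01T08:00:00Z"), ship) // true
val tooEarly = withinShipWindow(Instant.parse("2024-01-01T07:59:59Z"), ship)     // false
val afterShipping = withinShipWindow(Instant.parse("2024-01-01T12:00:01Z"), ship) // false
```

Because an order can only match shipments within four hours after it, rows older than that bound can no longer produce results, which is what distinguishes this join from the unbounded case.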

Flink SQL Join

Window Join & Interval Join

Do both show up as DataStreamWindowJoin in the Flink execution plan?

Window Join

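import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream.join(greenStream)
    .where(elem => /* select key */)
    .equalTo(elem => /* select key */)
    // Pair elements that share a key and fall into the same 2 ms tumbling window
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply { (e1, e2) => e1 + "," + e2 }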
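
As a rough model of what the window join above computes, the following plain-Scala sketch pairs elements that share both a key and a tumbling window. It is not Flink code: `Event`, `windowStart`, and `windowJoin` are hypothetical names, and the window assignment assumes non-negative timestamps and a zero window offset.

```scala
// Hypothetical event: a key, a value, and an event-time timestamp in milliseconds.
final case class Event(key: String, value: Int, ts: Long)

// Start of the tumbling window that contains ts
// (assumes ts >= 0 and a window offset of zero).
def windowStart(ts: Long, sizeMs: Long): Long = ts - (ts % sizeMs)

// Emit one "left,right" string for every pair with the same key and the same window.
def windowJoin(left: Seq[Event], right: Seq[Event], sizeMs: Long): Seq[String] =
  for {
    l <- left
    r <- right
    if l.key == r.key && windowStart(l.ts, sizeMs) == windowStart(r.ts, sizeMs)
  } yield s"${l.value},${r.value}"

val orange = Seq(Event("k", 0, 0L), Event("k", 1, 1L), Event("k", 2, 2L))
val green = Seq(Event("k", 0, 0L), Event("k", 3, 3L))

// With 2 ms windows: [0, 2) holds orange 0, 1 and green 0; [2, 4) holds orange 2 and green 3.
val joined = windowJoin(orange, green, 2L) // Seq("0,0", "1,0", "2,3")
```

Elements in a window with no counterpart on the other side produce no output, which matches inner-join behavior.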

Window join in SQL?

SELECT t2.key,
       TUMBLE_START(t2.rt, INTERVAL '4' SECOND),
       TUMBLE_END(t2.rt, INTERVAL '4' SECOND),
       COUNT(t1.key)
FROM T1 AS t1 JOIN T2 AS t2 ON
  t1.key = t2.key AND
  t1.rt BETWEEN t2.rt - INTERVAL '3' SECOND AND
    t2.rt + INTERVAL '3' SECOND
GROUP BY TUMBLE(t2.rt, INTERVAL '4' SECOND), t2.key

Interval Join

- b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound]
- or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

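import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

val orangeStream: DataStream[Integer] = ...
val greenStream: DataStream[Integer] = ...

orangeStream
    .keyBy(elem => /* select key */)
    .intervalJoin(greenStream.keyBy(elem => /* select key */))
    // Match right elements whose timestamp lies in [left - 2 ms, left + 1 ms]
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction[Integer, Integer, String] {
        override def processElement(left: Integer, right: Integer, ctx: ProcessJoinFunction[Integer, Integer, String]#Context, out: Collector[String]): Unit = {
            out.collect(left + "," + right)
        }
    })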
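
The interval condition stated at the start of this section, a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound, can be tried out without a Flink cluster. Here is a minimal plain-Scala sketch under that definition. `Stamped` and `intervalJoinSketch` are hypothetical names, and the sample timestamps are arbitrary.

```scala
// Hypothetical element: a key, a value, and a timestamp in milliseconds.
final case class Stamped(key: String, value: Int, ts: Long)

// Emit "a,b" for every same-key pair where
// a.ts + lowerBound <= b.ts <= a.ts + upperBound (both bounds inclusive).
def intervalJoinSketch(
    left: Seq[Stamped],
    right: Seq[Stamped],
    lowerBound: Long,
    upperBound: Long
): Seq[String] =
  for {
    a <- left
    b <- right
    if a.key == b.key && a.ts + lowerBound <= b.ts && b.ts <= a.ts + upperBound
  } yield s"${a.value},${b.value}"

val as = Seq(Stamped("k", 0, 0L), Stamped("k", 2, 2L), Stamped("k", 5, 5L))
val bs = Seq(Stamped("k", 0, 0L), Stamped("k", 1, 1L), Stamped("k", 6, 6L))

// Same bounds as the snippet above: lowerBound = -2 ms, upperBound = +1 ms.
val pairs = intervalJoinSketch(as, bs, -2L, 1L)
// Seq("0,0", "0,1", "2,0", "2,1", "5,6")
```

Unlike the tumbling window join, the match range here is relative to each left element, so one right element can pair with several left elements whose intervals overlap.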

Window Join & Interval Join

This post is licensed under CC BY 4.0 by the author.