data1:
(A,L-7,2019-10-06 20:41:00.001)
(B,L-7,2019-10-06 20:41:05.000)
(C,L-7,2019-10-29 16:12:57.999)
(D,L-7,2019-10-29 16:18:31.001)
data2:
(A,R-2,2019-10-06 20:41:00.000)
(A,R-2,2019-10-06 20:41:03.000)
(A,R-2,2019-10-06 20:41:04.000)
(B,R-2,2019-10-06 20:41:05.000)
(C,R-2,2019-10-29 16:12:58.000)
(D,R-2,2019-10-29 16:18:31.000)
参照JoinITCase.scala中的testRowTimeInnerJoinWithWindowAggregateOnSecondTime写测试代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.types.Row
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.api.scala._
import scala.collection.mutable
object TumbleTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
// env.setStateBackend(getStateBackend)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// StreamITCase.clear
val data1 = new mutable.MutableList[(String, String, Long)]
data1.+=(("A", "L-7", 1570365660001L))
data1.+=(("B", "L-7", 1570365665000L))
data1.+=(("C", "L-7", 1572336777999L)) // no joining record
data1.+=(("D", "L-7", 1572337111001L))
// data1.+=(("D", "L-7", 1510365661000L))
// data1.+=(("D", "L-7", 1510365665000L))
val data2 = new mutable.MutableList[(String, String, Long)]
// data2.+=(("A", "R-1", 7000L)) // 3 joining records
// data2.+=(("B", "R-4", 7000L)) // 1 joining records
// data2.+=(("A", "R-3", 8000L)) // 3 joining records
// data2.+=(("D", "R-2", 8000L)) // no joining record
data2.+=(("A", "R-2", 1570365660000L)) // no joining record
data2.+=(("A", "R-2", 1570365663000L)) // no joining record
data2.+=(("A", "R-2", 1570365664000L)) // no joining record
data2.+=(("B", "R-2", 1570365665000L)) // no joining record1572336778
data2.+=(("C", "R-2", 1572336778000L)) // no joining record
data2.+=(("D", "R-2", 1572337111000L))
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
println("data1:")
data1.map(x =>{
val date:Date = new Date(x._3)
println((x._1,x._2,format.format(date)))
})
println("data2:")
data2.map(x =>{
val date:Date = new Date(x._3)
println((x._1,x._2,format.format(date)))
})
val t1 = env.fromCollection(data1)
.assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
.toTable(tEnv, 'key, 'id, 'rt.rowtime)
val t2 = env.fromCollection(data2)
.assignTimestampsAndWatermarks(new Row3WatermarkExtractor2)
.toTable(tEnv, 'key, 'id, 'rt.rowtime)
tEnv.registerTable("T1", t1)
tEnv.registerTable("T2", t2)
val sqlQuery =
"""
|SELECT t2.key,TUMBLE_START(t2.rt, INTERVAL '4' SECOND),TUMBLE_END(t2.rt, INTERVAL '4' SECOND), COUNT(t1.key)
|FROM T1 AS t1 join T2 AS t2 ON
| t1.key = t2.key AND
| t1.rt BETWEEN t2.rt - INTERVAL '3' SECOND AND
| t2.rt + INTERVAL '3' SECOND
|GROUP BY TUMBLE(t2.rt, INTERVAL '4' SECOND),t2.key
|""".stripMargin
val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
result.print()
env.execute("")
}
}
private class Row3WatermarkExtractor2
extends AssignerWithPunctuatedWatermarks[(String, String, Long)] {
override def checkAndGetNextWatermark(
lastElement: (String, String, Long),
extractedTimestamp: Long): Watermark = {
new Watermark(extractedTimestamp - 1)
}
override def extractTimestamp(
element: (String, String, Long),
previousElementTimestamp: Long): Long = {
element._3
}
}
运行结果:
1
2
3
4
6> A,2019-10-06 12:41:00.0,2019-10-06 12:41:04.0,2
2> C,2019-10-29 08:12:56.0,2019-10-29 08:13:00.0,1
3> D,2019-10-29 08:18:28.0,2019-10-29 08:18:32.0,1
4> B,2019-10-06 12:41:04.0,2019-10-06 12:41:08.0,1