Flink NoWindowJoin Internals

Example:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.table.api.scala._

object StreamTableExample {

  // *************************************************************************
  //     PROGRAM
  // *************************************************************************

  def main(args: Array[String]): Unit = {

    // set up execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = StreamTableEnvironment.create(env)

    val orderA = env.fromCollection(Seq(
      OrderA(1L, "beer", 3),
      OrderA(1L, "diaper", 4),
      OrderA(3L, "rubber", 2)))
      //.toTable(tEnv, 'aId, 'product1, 'amount1)

    val orderB = env.fromCollection(Seq(
      OrderB(1L, "pen", 3),
      OrderB(2L, "rubber", 3),
      OrderB(4L, "beer", 1)))
      //.toTable(tEnv, 'bId, 'product2, 'amount2)

    tEnv.registerDataStream("orderA", orderA)
    tEnv.registerDataStream("orderB", orderB)

    val result = tEnv.sqlQuery(
      "select orderA.aId as aId, orderB.bId as bId, product1, amount1 " +
      "from orderA inner join orderB on orderA.aId = orderB.bId")

//    // the same join expressed with the Table API instead of SQL
//    val result: DataStream[OrderRet] = orderA.join(orderB).where('aId === 'bId)
//      .select('aId, 'bId, 'product1, 'amount1)
//      .toAppendStream[OrderRet]

    println(tEnv.explain(result))
    result.toRetractStream[OrderRet].print()

    env.execute()
  }

  // *************************************************************************
  //     USER DATA TYPES
  // *************************************************************************

  case class OrderA(aId: Long, product1: String, amount1: Int)

  case class OrderB(bId: Long, product2: String, amount2: Int)

  case class OrderRet(aId: Long, bId: Long, product1: String, amount1: Int)

}
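For reference, with this sample data only aId = 1 matches bId = 1, so the inner join produces two result rows. Both inputs are append-only, so every record in the retract stream carries the accumulate flag true; the printed output should look roughly like this (subtask prefixes and ordering will vary):

(true,OrderRet(1,1,beer,3))
(true,OrderRet(1,1,diaper,4))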

SQL statement:

select orderA.aId as aId, orderB.bId as bId, product1, amount1 from orderA inner join orderB on orderA.aId = orderB.bId

Execution plan:

== Abstract Syntax Tree ==
LogicalProject(aId=[$0], bId=[$3], product1=[$1], amount1=[$2])
  LogicalJoin(condition=[=($0, $3)], joinType=[inner])
    LogicalTableScan(table=[[default_catalog, default_database, orderA]])
    LogicalTableScan(table=[[default_catalog, default_database, orderB]])

== Optimized Logical Plan ==
DataStreamCalc(select=[aId, bId, product1, amount1])
  DataStreamJoin(where=[=(aId, bId)], join=[aId, product1, amount1, bId], joinType=[InnerJoin])
    DataStreamScan(id=[1], fields=[aId, product1, amount1])
    DataStreamCalc(select=[bId])
      DataStreamScan(id=[2], fields=[bId, product2, amount2])

== Physical Execution Plan ==
Stage 1 : Data Source
	content : collect elements with CollectionInputFormat

Stage 2 : Data Source
	content : collect elements with CollectionInputFormat

	Stage 3 : Operator
		content : from: (aId, product1, amount1)
		ship_strategy : REBALANCE

		Stage 4 : Operator
			content : from: (bId, product2, amount2)
			ship_strategy : REBALANCE

			Stage 5 : Operator
				content : select: (bId)
				ship_strategy : FORWARD

				Stage 8 : Operator
					content : InnerJoin(where: (=(aId, bId)), join: (aId, product1, amount1, bId))
					ship_strategy : HASH

					Stage 9 : Operator
						content : select: (aId, bId, product1, amount1)
						ship_strategy : FORWARD
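The HASH ship strategy on the InnerJoin stage means that both inputs are hash-partitioned on the join key before they reach the join operator. Conceptually (this is only an illustrative sketch with a hypothetical function name, not the planner's actual translation), it corresponds to keying both sides of a connected stream on the join key and running a keyed two-input operator on it:

import org.apache.flink.streaming.api.scala._

// Illustration of what the HASH ship strategy amounts to at the DataStream level.
val env = StreamExecutionEnvironment.getExecutionEnvironment
val left  = env.fromElements((1L, "beer", 3), (1L, "diaper", 4), (3L, "rubber", 2))
val right = env.fromElements((1L, "pen", 3), (2L, "rubber", 3), (4L, "beer", 1))

val keyedOnJoinKey = left
  .connect(right)
  .keyBy(_._1, _._1) // hash-partition both inputs on the join key (aId / bId)
// .process(new SomeJoinCoProcessFunction) // hypothetical; the planner plugs its generated NonWindow*Join logic in here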

Debugging here, we can take a look at the Optimized Logical Plan:
[Figure: No_Window_Join]

Next, look at DataStreamJoin's translateToPlan. The most important part of it is the joinOperator, which in this case is a LegacyKeyedCoProcessOperator, so let's look at LegacyKeyedCoProcessOperator as well:
[Figure: DataStreamJoin#translateToPlan]

LegacyKeyedCoProcessOperator implements the TwoInputStreamOperator interface and provides processElement1(StreamRecord) and processElement2(StreamRecord):

public void processElement1(StreamRecord<IN1> element) throws Exception {
	this.collector.setTimestamp(element);
	this.context.element = element;
	((CoProcessFunction)this.userFunction).processElement1(element.getValue(), this.context, this.collector);
	this.context.element = null;
}

public void processElement2(StreamRecord<IN2> element) throws Exception {
	this.collector.setTimestamp(element);
	this.context.element = element;
	((CoProcessFunction)this.userFunction).processElement2(element.getValue(), this.context, this.collector);
	this.context.element = null;
}

The userFunction here can be NonWindowInnerJoin, NonWindowLeftRightJoin, NonWindowLeftRightJoinWithNonEquiPredicates, NonWindowFullJoin, and so on. See DataStreamJoinToCoProcessTranslator's createJoinOperator method, which creates a different userFunction depending on the join type.
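As a rough illustration of that dispatch (a sketch based only on the class names above, not copied from DataStreamJoinToCoProcessTranslator), the mapping looks roughly like this:

// Illustration only (not the actual createJoinOperator source): the join type
// determines which NonWindow* user function runs inside the CoProcess operator.
def selectUserFunction(joinType: String, hasNonEquiPredicates: Boolean): String =
  joinType match {
    case "INNER" => "NonWindowInnerJoin"
    case "LEFT" | "RIGHT" =>
      if (hasNonEquiPredicates) "NonWindowLeftRightJoinWithNonEquiPredicates"
      else "NonWindowLeftRightJoin"
    case "FULL" => "NonWindowFullJoin"
  }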
Let's look directly at NonWindowLeftRightJoin's processElement:

override def processElement(
    value: CRow,
    ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
    out: Collector[CRow],
    currentSideState: MapState[Row, JTuple2[Long, Long]],
    otherSideState: MapState[Row, JTuple2[Long, Long]],
    recordFromLeft: Boolean): Unit = {

  val inputRow = value.row
  updateCurrentSide(value, ctx, currentSideState)

  cRowWrapper.reset()
  cRowWrapper.setCollector(out)
  cRowWrapper.setChange(value.change)

  // join other side data
  if (recordFromLeft == isLeftJoin) {
    preservedJoin(inputRow, recordFromLeft, otherSideState)
  } else {
    retractJoin(value, recordFromLeft, currentSideState, otherSideState)
  }
}
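Before joining, updateCurrentSide records the incoming row in the current side's MapState. For each distinct row the state keeps a JTuple2[Long, Long], which (as far as I can tell) holds the number of times that row has been seen and its expiration timestamp. A minimal self-contained sketch of that bookkeeping, using a plain map in place of MapState:

import scala.collection.mutable

// Sketch of the per-side join state (illustration, not the NonWindowJoin source):
// each distinct row maps to (count, expirationTime). Accumulate messages bump the
// count, retract messages decrease it, and the entry is removed once it hits zero.
def updateSideState(
    state: mutable.Map[String, (Long, Long)],
    row: String,
    isAccumulateMsg: Boolean,
    newExpirationTime: Long): Unit = {
  val (count, _) = state.getOrElse(row, (0L, 0L))
  val newCount = if (isAccumulateMsg) count + 1 else count - 1
  if (newCount <= 0) state.remove(row)
  else state(row) = (newCount, newExpirationTime)
}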

After that, preservedJoin ends up calling callJoinFunction:

protected def callJoinFunction(
    inputRow: Row,
    inputRowFromLeft: Boolean,
    otherSideRow: Row,
    cRowWrapper: Collector[Row]): Unit = {

  if (inputRowFromLeft) {
    joinFunction.join(inputRow, otherSideRow, cRowWrapper)
  } else {
    joinFunction.join(otherSideRow, inputRow, cRowWrapper)
  }
}

The joinFunction here is produced by code generation; copying the generated join function code (genJoinFuncCode) out of the debugger, it looks roughly like this:

public class DataStreamJoinRule$25 extends org.apache.flink.api.common.functions.RichFlatJoinFunction {

  
  final org.apache.flink.types.Row out =
      new org.apache.flink.types.Row(4);
  
  

  public DataStreamJoinRule$25() throws Exception {
    
    
  }

  
  

  @Override
  public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
    
    
  }

  @Override
  public void join(Object _in1, Object _in2, org.apache.flink.util.Collector c) throws Exception {
    org.apache.flink.types.Row in1 = (org.apache.flink.types.Row) _in1;
    org.apache.flink.types.Row in2 = (org.apache.flink.types.Row) _in2;
    
    boolean isNull$22 = (java.lang.Integer) in1.getField(2) == null;
    int result$21;
    if (isNull$22) {
      result$21 = -1;
    }
    else {
      result$21 = (java.lang.Integer) in1.getField(2);
    }
    
    
    boolean isNull$18 = (java.lang.Long) in1.getField(0) == null;
    long result$17;
    if (isNull$18) {
      result$17 = -1L;
    }
    else {
      result$17 = (java.lang.Long) in1.getField(0);
    }
    
    
    boolean isNull$24 = (java.lang.Long) in2.getField(0) == null;
    long result$23;
    if (isNull$24) {
      result$23 = -1L;
    }
    else {
      result$23 = (java.lang.Long) in2.getField(0);
    }
    
    
    boolean isNull$20 = (java.lang.String) in1.getField(1) == null;
    java.lang.String result$19;
    if (isNull$20) {
      result$19 = "";
    }
    else {
      result$19 = (java.lang.String) (java.lang.String) in1.getField(1);
    }
    
    
    
    
    
    
    
    if (isNull$18) {
      out.setField(0, null);
    }
    else {
      out.setField(0, result$17);
    }
    
    
    
    if (isNull$20) {
      out.setField(1, null);
    }
    else {
      out.setField(1, result$19);
    }
    
    
    
    if (isNull$22) {
      out.setField(2, null);
    }
    else {
      out.setField(2, result$21);
    }
    
    
    
    if (isNull$24) {
      out.setField(3, null);
    }
    else {
      out.setField(3, result$23);
    }
    
    c.collect(out);
    
  }

  @Override
  public void close() throws Exception {
    
    
  }
}

This is a RichFlatJoinFunction whose job is essentially to pick the required fields out of the rows from the two streams and assemble them into a single output Row.
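To make that concrete, here is a hand-written equivalent of the generated class (a simplified sketch without the null-handling scaffolding, not the codegen output itself): it projects aId, product1 and amount1 from the left row and bId from the right row into one 4-field output Row.

import org.apache.flink.api.common.functions.RichFlatJoinFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

// Simplified hand-written equivalent of the generated join function above.
class HandWrittenJoinFunction extends RichFlatJoinFunction[Row, Row, Row] {
  private val out = new Row(4)

  override def join(in1: Row, in2: Row, c: Collector[Row]): Unit = {
    out.setField(0, in1.getField(0)) // aId
    out.setField(1, in1.getField(1)) // product1
    out.setField(2, in1.getField(2)) // amount1
    out.setField(3, in2.getField(0)) // bId
    c.collect(out)
  }
}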

After the SQL has been parsed into an AST and gone through the various logical/physical plan optimizations, toDataStream/toRetractStream is finally called.

This wraps everything up as a chain of transformations, which is what StreamGraph generation works from:

  private def toDataStream[T](
      table: Table,
      modifyOperation: OutputConversionModifyOperation)
    : DataStream[T] = {
    val transformations = planner
      .translate(Collections.singletonList(modifyOperation))
    val streamTransformation: Transformation[T] = getTransformation(
      table,
      transformations)
    scalaExecutionEnvironment.getWrappedStreamExecutionEnvironment.addOperator(streamTransformation)
    new DataStream[T](new JDataStream[T](
      scalaExecutionEnvironment
        .getWrappedStreamExecutionEnvironment, streamTransformation))
  }

addOperator simply appends the transformation to the transformations list:

public void addOperator(Transformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}

These transformations are later used to generate the StreamGraph:

public StreamGraph getStreamGraph(String jobName) {
    return this.getStreamGraphGenerator().setJobName(jobName).generate();
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (this.transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    } else {
        return new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg)
            .setStateBackend(this.defaultStateBackend)
            .setChaining(this.isChainingEnabled)
            .setUserArtifacts(this.cacheFile)
            .setTimeCharacteristic(this.timeCharacteristic)
            .setDefaultBufferTimeout(this.bufferTimeout);
    }
}

The transformations built for the example SQL are shown below:
[Figure: transformation1]
[Figure: transformation2]
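If you want to dump the resulting topology from user code, StreamExecutionEnvironment.getExecutionPlan builds the StreamGraph from the accumulated transformations and returns it as a JSON string:

// Print the execution plan generated from the accumulated transformations.
// Call this after the query has been translated (after toRetractStream) and
// before env.execute().
println(env.getExecutionPlan)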

This post is licensed under CC BY 4.0 by the author.