Home FlinkSQL源码分析
Post
Cancel

FlinkSQL源码分析

本文基于 Flink 1.8 的源码;Flink 1.9 开始包含了 Blink 的代码。

flink SQL 示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    /**
     * Runs a streaming SQL query that computes the average {@code total} per drop-off area
     * and hour of day over a stream of taxi rides, and prints the result as a retraction stream.
     *
     * @param args exactly one element: the path to the taxi rides file
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {

        // check parameter
        if (args.length != 1) {
            System.err.println("Please provide the path to the taxi rides file as a parameter");
            // Stop here: falling through would read args[0], which throws
            // ArrayIndexOutOfBoundsException when no argument was supplied.
            return;
        }
        String inputPath = args[0];

        // create execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // configure event-time and watermarks
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000L);

        // create table environment
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
        // register user-defined function
        tEnv.registerFunction("toCellId", new GeoUtils.ToCellId());

        // get taxi ride event stream
        DataStream<TaxiRide> rides = TaxiRides.getRides(env, inputPath);
        // register taxi ride event stream as table "Rides";
        // dropOffTime is declared as the rowtime attribute
        tEnv.registerDataStream(
            "Rides",
            rides,
            "medallion, licenseId, pickUpTime, dropOffTime.rowtime, " +
                "pickUpLon, pickUpLat, dropOffLon, dropOffLat, total");

        // define SQL query to compute average total per area and hour of day.
        Table result = tEnv.sqlQuery(
            "SELECT " +
            "  toCellId(dropOffLon, dropOffLat) AS area, " +
            "  EXTRACT(HOUR FROM dropOffTime) AS hourOfDay, " +
            "  AVG(total) AS avgTotal " +
            "FROM Rides " +
            "GROUP BY " +
            "  toCellId(dropOffLon, dropOffLat), " +
            "  EXTRACT(HOUR FROM dropOffTime)"
        );

        // convert result table into a retraction stream and print it
        tEnv.toRetractStream(result, Row.class)
                .print();

        // execute the query
        env.execute();
    }

1,解析sql语句,转换成AST语法树,这里用SqlNode表示
2,验证SQL,结合Catalog,验证/检查sql语法等
3,生成逻辑执行计划(Logical Plan):将SqlNode表示的AST转换成Logical Plan,flink里边用RelNode表示;不过planner.rel返回的是一个RelRoot,里边封装了RelNode(下面代码中通过relational.rel取出)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
  /**
    * Evaluates a SQL query and returns the result as a [[Table]].
    *
    * The query text is parsed; if the parser yields a non-null node whose kind belongs to
    * `SqlKind.QUERY`, the node is validated and converted into a relational tree, which is
    * wrapped in a `LogicalRelNode` backing the returned table.
    *
    * @param query the SQL text to evaluate
    * @return a [[Table]] built from the relational tree of the query
    * @throws TableException if the parsed statement is null or is not a query
    */
  def sqlQuery(query: String): Table = {
    val flinkPlanner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
    val sqlNode = flinkPlanner.parse(query)

    // Guard clause: reject anything that is not a query before validating it.
    if (null == sqlNode || !sqlNode.getKind.belongsTo(SqlKind.QUERY)) {
      throw new TableException(
        "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
          "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
    }

    val validatedNode = flinkPlanner.validate(sqlNode)
    // rel() hands back a root wrapper; its `rel` field is the actual relational tree
    val relRoot = flinkPlanner.rel(validatedNode)
    new Table(this, LogicalRelNode(relRoot.rel))
  }

所以sqlQuery之后生成的是最初的Logical Plan。按照正常的sql处理流程,后面应该还有optimize,以及生成物理执行计划(Physical Plan)。继续看tEnv.toAppendStream(result, Row.class)(上面的示例中调用的是toRetractStream,这里以toAppendStream为例来看后续的translate流程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
  * Converts `table` into a [[DataStream]] of `clazz` by delegating to the three-argument
  * overload with the `queryConfig` that is in scope.
  *
  * @param table the table to convert
  * @param clazz class of the stream's element type
  * @tparam T element type of the resulting stream
  * @return the stream returned by the three-argument overload
  */
def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] =
  toAppendStream(table, clazz, queryConfig)
/**
  * Converts `table` into a [[DataStream]] of `clazz` using the supplied query configuration.
  *
  * Type information for `clazz` is extracted and validated first; the table is then handed to
  * `translate` with both `updatesAsRetraction` and `withChangeFlag` set to `false`.
  *
  * @param table       the table to convert
  * @param clazz       class of the stream's element type
  * @param queryConfig query configuration passed on to the translation
  * @tparam T element type of the resulting stream
  * @return the stream produced by `translate`
  */
def toAppendStream[T](
  table: Table,
  clazz: Class[T],
  queryConfig: StreamQueryConfig): DataStream[T] = {
  val resultTypeInfo = TypeExtractor.createTypeInfo(clazz)
  TableEnvironment.validateType(resultTypeInfo)
  translate[T](
    table,
    queryConfig,
    updatesAsRetraction = false,
    withChangeFlag = false)(resultTypeInfo)
}
/**
  * Optimizes the relational tree of `table` and translates the optimized plan into a
  * [[DataStream]].
  *
  * @param table               the table whose relational tree is translated
  * @param queryConfig         query configuration passed on to the plan translation
  * @param updatesAsRetraction forwarded to `optimize`
  * @param withChangeFlag      forwarded to the plan-level `translate` overload
  * @param tpe                 type information of the requested stream element type
  * @tparam A element type of the resulting stream
  * @return the stream produced by the plan-level `translate` overload
  */
protected def translate[A](
  table: Table,
  queryConfig: StreamQueryConfig,
  updatesAsRetraction: Boolean,
  withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
  val logicalRel = table.getRelNode
  val optimizedRel = optimize(logicalRel, updatesAsRetraction)

  // the result type is derived from both the original and the optimized tree
  val resultRowType = getResultType(logicalRel, optimizedRel)

  translate(optimizedRel, resultRowType, queryConfig, withChangeFlag)
}
  

translate里调用了optimize,优化就是在这里做的。下面看下里边用到的优化规则,这些规则都在FlinkRuleSets里边定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
 /**
    * Runs the optimization pipeline over `relNode` and returns the resulting plan.
    *
    * The stages run in a fixed order: sub-query conversion, table reference conversion,
    * decorrelation, time indicator conversion, normalization, logical optimization,
    * physical optimization and decoration. Normalization, logical/physical optimization and
    * decoration are each skipped when their rule set is empty.
    *
    * @param relNode             the plan to optimize
    * @param updatesAsRetraction when `true` (and decoration rules exist), the physical plan is
    *                            copied with an `UpdateAsRetractionTrait(true)` added to its
    *                            trait set before the decoration rules run
    * @return the plan after the last applicable stage
    */
 private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {

    // Stage 0a: convert sub-queries; this must precede query decorrelation
    val subQueryFreePlan = runHepPlanner(
      HepMatchOrder.BOTTOM_UP,
      FlinkRuleSets.TABLE_SUBQUERY_RULES,
      relNode,
      relNode.getTraitSet)

    // Stage 0b: convert table references
    val expandedPlan = runHepPlanner(
      HepMatchOrder.BOTTOM_UP,
      FlinkRuleSets.TABLE_REF_RULES,
      subQueryFreePlan,
      relNode.getTraitSet)

    // Stage 1: decorrelate
    val decorrelatedPlan = RelDecorrelator.decorrelateQuery(expandedPlan)

    // Stage 2: convert time indicators
    val timeConvertedPlan =
      RelTimeIndicatorConverter.convert(decorrelatedPlan, getRelBuilder.getRexBuilder)

    // Stage 3: normalize the logical plan (pass-through when there are no rules)
    val normRules = getNormRuleSet
    val normalizedPlan =
      if (!normRules.iterator().hasNext) {
        timeConvertedPlan
      } else {
        runHepPlanner(
          HepMatchOrder.BOTTOM_UP,
          normRules,
          timeConvertedPlan,
          timeConvertedPlan.getTraitSet)
      }

    // Stage 4: optimize the logical Flink plan; the requested traits are derived from the
    // traits of the original input node
    val logicalRules = getLogicalOptRuleSet
    val logicalTraits = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
    val logicalPlan =
      if (!logicalRules.iterator().hasNext) {
        normalizedPlan
      } else {
        runVolcanoPlanner(logicalRules, normalizedPlan, logicalTraits)
      }

    // Stage 5: optimize the physical Flink plan
    val physicalRules = getPhysicalOptRuleSet
    val physicalTraits = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
    val physicalPlan =
      if (!physicalRules.iterator().hasNext) {
        logicalPlan
      } else {
        runVolcanoPlanner(physicalRules, logicalPlan, physicalTraits)
      }

    // Stage 6: decorate the optimized plan; its result is the result of the whole method
    val decoRules = getDecoRuleSet
    if (!decoRules.iterator().hasNext) {
      physicalPlan
    } else {
      val planToDecorate =
        if (!updatesAsRetraction) {
          physicalPlan
        } else {
          physicalPlan.copy(
            physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
            physicalPlan.getInputs)
        }
      runHepPlanner(
        HepMatchOrder.BOTTOM_UP,
        decoRules,
        planToDecorate,
        planToDecorate.getTraitSet)
    }
  }

先看下FlinkRuleSets定义的规则

1
2
3
4
5
6
7
8
FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.TABLE_REF_RULES
FlinkRuleSets.LOGICAL_OPT_RULES
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
FlinkRuleSets.DATASTREAM_DECO_RULES

optimize这块在1.9中重新调整了代码结构,流、批各自的Optimizer的optimize方法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//StreamOptimizer.scala
/**
  * Optimizes a streaming plan by chaining the individual optimization steps in order:
  * sub-query conversion, plan expansion, decorrelation, time indicator conversion,
  * normalization, logical optimization, physical optimization and decoration.
  *
  * @param relNode             the plan to optimize
  * @param updatesAsRetraction forwarded to `optimizeDecoratePlan`
  * @param relBuilder          used for decorrelation; its `RexBuilder` is used for the
  *                            time indicator conversion
  * @return the result of `optimizeDecoratePlan`
  */
def optimize(
    relNode: RelNode,
    updatesAsRetraction: Boolean,
    relBuilder: RelBuilder): RelNode = {
    // apply the sub-query rules first
    val subQueryFreePlan = optimizeConvertSubQueries(relNode)
    val expanded = optimizeExpandPlan(subQueryFreePlan)
    val decorrelated = RelDecorrelator.decorrelateQuery(expanded, relBuilder)
    val timeConverted =
      RelTimeIndicatorConverter.convert(decorrelated, relBuilder.getRexBuilder)
    val normalized = optimizeNormalizeLogicalPlan(timeConverted)
    val logical = optimizeLogicalPlan(normalized)

    val physical = optimizePhysicalPlan(logical, FlinkConventions.DATASTREAM)
    optimizeDecoratePlan(physical, updatesAsRetraction)
}
//BatchOptimizer.scala
/**
  * Optimizes a batch plan by chaining the individual optimization steps in order:
  * sub-query conversion, plan expansion, decorrelation, normalization, logical
  * optimization and physical optimization with the `DATASET` convention.
  *
  * @param relNode the plan to optimize
  * @return the result of `optimizePhysicalPlan`
  */
def optimize(relNode: RelNode): RelNode = {
    val subQueryFreePlan = optimizeConvertSubQueries(relNode)
    val expanded = optimizeExpandPlan(subQueryFreePlan)
    val decorrelated = RelDecorrelator.decorrelateQuery(expanded)
    val normalized = optimizeNormalizeLogicalPlan(decorrelated)
    val logical = optimizeLogicalPlan(normalized)
    optimizePhysicalPlan(logical, FlinkConventions.DATASET)
}

前两步优化规则都一样

1
2
3
 FlinkRuleSets.TABLE_SUBQUERY_RULES
 FlinkRuleSets.EXPAND_PLAN_RULES,
 FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES

另外optimizeLogicalPlan也是一样的规则

1
2
3
  /**
    * Returns the built-in rule set for logical optimization.
    *
    * @return `FlinkRuleSets.LOGICAL_OPT_RULES`
    */
  protected def getBuiltInLogicalOptRuleSet: RuleSet =
    FlinkRuleSets.LOGICAL_OPT_RULES

但是optimizeNormalizeLogicalPlan,optimizePhysicalPlan使用的规则是不一样的, batch使用的规则

1
2
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES

stream使用的规则

1
2
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES

stream在后边还有一步optimizeDecoratePlan,使用的规则是

1
FlinkRuleSets.DATASTREAM_DECO_RULES

最后,优化后的计划会被转换成DataStream(批处理则对应DataSet),流处理部分的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//StreamOptimizer.scala
// NOTE(review): the methods below pass `this` as the StreamPlanner argument of
// translateToPlan, so they presumably live in StreamPlanner rather than StreamOptimizer —
// confirm the file label.
// NOTE(review): the statement below sits outside any definition and duplicates the first
// statement of translateOptimized — presumably a pasted excerpt.
val dataStream = translateToCRow(optimizedPlan, queryConfig)
/**
  * Translates an already optimized plan into a [[DataStream]] of the requested type.
  *
  * The plan is first translated into a stream of [[CRow]], which is then converted by
  * `DataStreamConversions.convert`.
  *
  * @param optimizedPlan  the optimized plan to translate
  * @param logicalSchema  schema passed to the conversion
  * @param tpe            type information of the requested stream element type
  * @param queryConfig    query configuration used when translating the plan
  * @param withChangeFlag passed to the conversion
  * @tparam A element type of the resulting stream
  * @return the converted stream
  */
private def translateOptimized[A](
  optimizedPlan: RelNode,
  logicalSchema: TableSchema,
  tpe: TypeInformation[A],
  queryConfig: StreamQueryConfig,
  withChangeFlag: Boolean)
: DataStream[A] = {
  val cRowStream = translateToCRow(optimizedPlan, queryConfig)
  DataStreamConversions.convert(cRowStream, logicalSchema, withChangeFlag, tpe, config)
}
  /**
    * Translates a plan into a [[DataStream]] of [[CRow]].
    *
    * Only plans whose root is a [[DataStreamRel]] can be translated; the work is delegated to
    * that node's `translateToPlan`.
    *
    * @param logicalPlan the plan to translate
    * @param queryConfig query configuration handed to the node
    * @return the stream produced by the root node
    * @throws TableException if the root of the plan is not a [[DataStreamRel]]
    */
  private def translateToCRow(
    logicalPlan: RelNode,
    queryConfig: StreamQueryConfig): DataStream[CRow] = {

    logicalPlan match {
      case streamRel: DataStreamRel => streamRel.translateToPlan(this, queryConfig)
      case _ =>
        throw new TableException(
          "Cannot generate DataStream due to an invalid logical plan. " +
            "This is a bug and should not happen. Please file an issue.")
    }
  }

这里的DataStreamRel是个trait,extends了FlinkRelNode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
  * A [[FlinkRelNode]] that can be translated into a [[DataStream]] of [[CRow]].
  */
trait DataStreamRel extends FlinkRelNode {

  /**
    * Translates the FlinkRelNode into a Flink operator.
    *
    * @param tableEnv    The [[StreamPlanner]] of the translated Table.
    * @param queryConfig The configuration for the query to generate.
    * @return DataStream of type [[CRow]]
    */
  def translateToPlan(
    tableEnv: StreamPlanner,
    queryConfig: StreamQueryConfig): DataStream[CRow]

  // ... (remaining members elided in this excerpt)
}

实现translateToPlan的很多,比如DataStreamWindowJoin,DataStreamJoin
translateToPlan

This post is licensed under CC BY 4.0 by the author.