Based on flink-1.8 (flink-1.9 additionally contains the Blink code).

A Flink SQL example, computing the average total fare per drop-off area and hour of day:
public static void main(String[] args) throws Exception {
    // check parameter
    if (args.length != 1) {
        System.err.println("Please provide the path to the taxi rides file as a parameter");
        return;
    }
    String inputPath = args[0];

    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // configure event-time and watermarks
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.getConfig().setAutoWatermarkInterval(1000L);

    // create table environment
    StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

    // register user-defined function
    tEnv.registerFunction("toCellId", new GeoUtils.ToCellId());

    // get taxi ride event stream
    DataStream<TaxiRide> rides = TaxiRides.getRides(env, inputPath);

    // register taxi ride event stream as table "Rides"
    tEnv.registerDataStream(
        "Rides",
        rides,
        "medallion, licenseId, pickUpTime, dropOffTime.rowtime, " +
            "pickUpLon, pickUpLat, dropOffLon, dropOffLat, total");

    // define SQL query to compute average total per area and hour of day
    Table result = tEnv.sqlQuery(
        "SELECT " +
        " toCellId(dropOffLon, dropOffLat) AS area, " +
        " EXTRACT(HOUR FROM dropOffTime) AS hourOfDay, " +
        " AVG(total) AS avgTotal " +
        "FROM Rides " +
        "GROUP BY " +
        " toCellId(dropOffLon, dropOffLat), " +
        " EXTRACT(HOUR FROM dropOffTime)"
    );

    // convert result table into a retraction stream and print it
    tEnv.toRetractStream(result, Row.class)
        .print();

    // execute the query
    env.execute();
}
Inside sqlQuery, three things happen:
1. Parse the SQL statement into an AST, represented by SqlNode.
2. Validate the SQL against the catalog: check the syntax and resolve table/column references.
3. Generate the logical plan (Logical Plan): convert the SqlNode AST into a relational tree, which Flink represents with RelNode; the call actually returns a RelRoot, which wraps the root RelNode.
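Under the hood these three steps are plain Calcite planner calls. A minimal, hypothetical sketch using Calcite's Planner API directly (the FrameworkConfig carrying the actual schema/catalog is elided; Flink wraps all of this in FlinkPlannerImpl):

import org.apache.calcite.rel.RelRoot
import org.apache.calcite.sql.SqlNode
import org.apache.calcite.tools.{FrameworkConfig, Frameworks, Planner}

// assumption: a FrameworkConfig with the schema/catalog would be configured here
val config: FrameworkConfig = Frameworks.newConfigBuilder().build()
val planner: Planner = Frameworks.getPlanner(config)

val parsed: SqlNode = planner.parse("SELECT ...")  // 1. SQL text -> SqlNode AST
val validated: SqlNode = planner.validate(parsed)  // 2. check against the catalog
val root: RelRoot = planner.rel(validated)         // 3. SqlNode -> RelRoot wrapping the root RelNode

In Flink, the corresponding calls live in sqlQuery: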
def sqlQuery(query: String): Table = {
  val planner = new FlinkPlannerImpl(getFrameworkConfig, getPlanner, getTypeFactory)
  // parse the sql query
  val parsed = planner.parse(query)
  if (null != parsed && parsed.getKind.belongsTo(SqlKind.QUERY)) {
    // validate the sql query
    val validated = planner.validate(parsed)
    // transform to a relational tree
    val relational = planner.rel(validated)
    new Table(this, LogicalRelNode(relational.rel))
  } else {
    throw new TableException(
      "Unsupported SQL query! sqlQuery() only accepts SQL queries of type " +
        "SELECT, UNION, INTERSECT, EXCEPT, VALUES, and ORDER_BY.")
  }
}
So sqlQuery only produces the initial logical plan. Following the normal SQL lifecycle, there should still be an optimize phase and the generation of a physical plan (physicalPlan). Let's continue with tEnv.toAppendStream(result, Row.class):
def toAppendStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
  toAppendStream(table, clazz, queryConfig)
}

def toAppendStream[T](
    table: Table,
    clazz: Class[T],
    queryConfig: StreamQueryConfig): DataStream[T] = {
  val typeInfo = TypeExtractor.createTypeInfo(clazz)
  TableEnvironment.validateType(typeInfo)
  translate[T](table, queryConfig, updatesAsRetraction = false, withChangeFlag = false)(typeInfo)
}

protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {
  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)
  val rowType = getResultType(relNode, dataStreamPlan)
  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
The optimize call is the interesting part. Let's look at the optimization rules it applies; they are all defined in FlinkRuleSets:
private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
  // 0. convert sub-queries before query decorrelation
  val convSubQueryPlan = runHepPlanner(
    HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)

  // 0. convert table references
  val fullRelNode = runHepPlanner(
    HepMatchOrder.BOTTOM_UP,
    FlinkRuleSets.TABLE_REF_RULES,
    convSubQueryPlan,
    relNode.getTraitSet)

  // 1. decorrelate
  val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)

  // 2. convert time indicators
  val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)

  // 3. normalize the logical plan
  val normRuleSet = getNormRuleSet
  val normalizedPlan = if (normRuleSet.iterator().hasNext) {
    runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
  } else {
    convPlan
  }

  // 4. optimize the logical Flink plan
  val logicalOptRuleSet = getLogicalOptRuleSet
  val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
  val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
    runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
  } else {
    normalizedPlan
  }

  // 5. optimize the physical Flink plan
  val physicalOptRuleSet = getPhysicalOptRuleSet
  val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
  val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
    runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
  } else {
    logicalPlan
  }

  // 6. decorate the optimized plan
  val decoRuleSet = getDecoRuleSet
  val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
    val planToDecorate = if (updatesAsRetraction) {
      physicalPlan.copy(
        physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
        physicalPlan.getInputs)
    } else {
      physicalPlan
    }
    runHepPlanner(
      HepMatchOrder.BOTTOM_UP,
      decoRuleSet,
      planToDecorate,
      planToDecorate.getTraitSet)
  } else {
    physicalPlan
  }
  decoratedPlan
}
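Both runHepPlanner and runVolcanoPlanner are thin wrappers around Calcite's planners: the Hep planner applies rules heuristically in a fixed order, while the Volcano planner explores alternatives based on cost. A rough sketch of what runHepPlanner does, assuming Calcite's HepPlanner API (simplified approximation, not Flink's exact code):

import org.apache.calcite.plan.RelTraitSet
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.tools.RuleSet

def runHepPlanner(
    matchOrder: HepMatchOrder,
    ruleSet: RuleSet,
    input: RelNode,
    targetTraits: RelTraitSet): RelNode = {
  // build a HepProgram containing every rule of the set
  val builder = new HepProgramBuilder
  builder.addMatchOrder(matchOrder)
  val it = ruleSet.iterator()
  while (it.hasNext) {
    builder.addRuleInstance(it.next())
  }
  // let the heuristic planner rewrite the plan rule by rule
  val planner = new HepPlanner(builder.build)
  planner.setRoot(input)
  if (input.getTraitSet != targetTraits) {
    planner.changeTraits(input, targetTraits.simplify())
  }
  planner.findBestExp
}

runVolcanoPlanner follows the same pattern but hands the rule set to the cost-based VolcanoPlanner and asks it for the cheapest plan that satisfies the requested traits.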
First, the rule sets defined in FlinkRuleSets:
FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.TABLE_REF_RULES
FlinkRuleSets.LOGICAL_OPT_RULES
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
FlinkRuleSets.DATASTREAM_DECO_RULES
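Each of these is a Calcite RuleSet created with RuleSets.ofList. As an illustration, here is a heavily abbreviated sketch of the kind of definition found in FlinkRuleSets (the real LOGICAL_OPT_RULES list contains dozens of rules):

import org.apache.calcite.rel.rules.{FilterCalcMergeRule, FilterToCalcRule, ProjectToCalcRule}
import org.apache.calcite.tools.{RuleSet, RuleSets}

// abbreviated sketch of how FlinkRuleSets defines a rule set
val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
  FilterToCalcRule.INSTANCE,   // rewrite LogicalFilter into a LogicalCalc
  ProjectToCalcRule.INSTANCE,  // rewrite LogicalProject into a LogicalCalc
  FilterCalcMergeRule.INSTANCE // merge a Filter into an adjacent Calc
)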
In 1.9 this optimize code was restructured. The optimize methods of the stream and batch optimizers now look like this:
// StreamOptimizer.scala
def optimize(
    relNode: RelNode,
    updatesAsRetraction: Boolean,
    relBuilder: RelBuilder): RelNode = {
  val convSubQueryPlan = optimizeConvertSubQueries(relNode) // apply the sub-query rules
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan, relBuilder)
  val planWithMaterializedTimeAttributes =
    RelTimeIndicatorConverter.convert(decorPlan, relBuilder.getRexBuilder)
  val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
  optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
}

// BatchOptimizer.scala
def optimize(relNode: RelNode): RelNode = {
  val convSubQueryPlan = optimizeConvertSubQueries(relNode)
  val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
  val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
  val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
  val logicalPlan = optimizeLogicalPlan(normalizedPlan)
  optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
}
The first two steps apply the same rule sets in both cases:
FlinkRuleSets.TABLE_SUBQUERY_RULES
FlinkRuleSets.EXPAND_PLAN_RULES
FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES
optimizeLogicalPlan also uses the same rules in both:
protected def getBuiltInLogicalOptRuleSet: RuleSet = {
  FlinkRuleSets.LOGICAL_OPT_RULES
}
However, optimizeNormalizeLogicalPlan and optimizePhysicalPlan apply different rule sets. Batch uses:
FlinkRuleSets.DATASET_NORM_RULES
FlinkRuleSets.DATASET_OPT_RULES
while stream uses:
FlinkRuleSets.DATASTREAM_NORM_RULES
FlinkRuleSets.DATASTREAM_OPT_RULES
The stream path has one extra step at the end, optimizeDecoratePlan, which uses:
FlinkRuleSets.DATASTREAM_DECO_RULES
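This decoration step is where the updatesAsRetraction flag from translate comes into play: toAppendStream passes updatesAsRetraction = false (as seen above), whereas toRetractStream passes true, so the plan is decorated to emit updates as retract/insert pairs. A small sketch of the difference from the user's side, using the Scala StreamTableEnvironment API (assumes the usual org.apache.flink.api.scala._ and org.apache.flink.table.api.scala._ implicits are in scope):

// append stream: only inserts are allowed; a continuously updating
// aggregate like the taxi query above would be rejected here
val appendStream: DataStream[Row] = tEnv.toAppendStream[Row](result)

// retract stream: every element carries a change flag:
// (true, row) adds/updates a result, (false, row) retracts a previous one
val retractStream: DataStream[(Boolean, Row)] = tEnv.toRetractStream[Row](result)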
Finally, how does the optimized plan become a DataStream (or DataSet for batch)?
// StreamPlanner.scala
private def translateOptimized[A](
    optimizedPlan: RelNode,
    logicalSchema: TableSchema,
    tpe: TypeInformation[A],
    queryConfig: StreamQueryConfig,
    withChangeFlag: Boolean): DataStream[A] = {
  val dataStream = translateToCRow(optimizedPlan, queryConfig)
  DataStreamConversions.convert(dataStream, logicalSchema, withChangeFlag, tpe, config)
}

private def translateToCRow(
    logicalPlan: RelNode,
    queryConfig: StreamQueryConfig): DataStream[CRow] = {
  logicalPlan match {
    case node: DataStreamRel =>
      node.translateToPlan(this, queryConfig)
    case _ =>
      throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
        "This is a bug and should not happen. Please file an issue.")
  }
}
DataStreamRel here is a trait that extends FlinkRelNode:
trait DataStreamRel extends FlinkRelNode {

  /**
    * Translates the FlinkRelNode into a Flink operator.
    *
    * @param tableEnv    The [[StreamPlanner]] of the translated Table.
    * @param queryConfig The configuration for the query to generate.
    * @return DataStream of type [[CRow]]
    */
  def translateToPlan(
      tableEnv: StreamPlanner,
      queryConfig: StreamQueryConfig): DataStream[CRow]

  ...
}
Many nodes implement translateToPlan, for example DataStreamWindowJoin and DataStreamJoin; a simplified sketch of the pattern follows.
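To make the contract concrete, here is a hypothetical, heavily simplified DataStreamRel implementation (class name is invented and import paths are assumed from the 1.9 flink-table-planner module; real nodes such as DataStreamCalc additionally generate and wrap code for their expressions):

import java.util.{List => JList}

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelNode, SingleRel}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.table.api.StreamQueryConfig
import org.apache.flink.table.plan.nodes.datastream.DataStreamRel
import org.apache.flink.table.planner.StreamPlanner
import org.apache.flink.table.runtime.types.CRow

// hypothetical pass-through node: translates its input and forwards the rows unchanged
class DataStreamPassThrough(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    input: RelNode)
  extends SingleRel(cluster, traitSet, input)
  with DataStreamRel {

  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode]): RelNode =
    new DataStreamPassThrough(cluster, traitSet, inputs.get(0))

  override def translateToPlan(
      planner: StreamPlanner,
      queryConfig: StreamQueryConfig): DataStream[CRow] = {
    // 1. recursively translate the child node into a DataStream[CRow]
    val inputStream = getInput.asInstanceOf[DataStreamRel].translateToPlan(planner, queryConfig)
    // 2. a real node would apply a Flink operator with generated code here;
    //    this sketch simply forwards the input stream
    inputStream
  }
}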