FlinkSQL流程图 StreamSQLExample 示例代码 def main(args: Array[String]): Unit = { // set up the Scala DataStream API val env = StreamExecutionEnvironment.getExecutionEnvironment // set up ...
FlinkSQL(Flink1.15)规则优化以及Calcite原理
SQL语句示例 select p.id,o.id from products p join orders o on p.id=o.id where p.id > 5 优化前,从SqlNode到RelNode阶段,从SqlToRelConverter.convertQuery的trace日志 [DEBUG] 2022-08-22 14:50:41,662(627) --> [ma...
Flink1.15 JobMaster源码分析
启动流程 dispatcher.start();调用start后回调onStart public final void start() { rpcServer.start(); } StandaloneDispatcher继承自Dispatcher public class StandaloneDispatcher extends Dispatcher ...
Flink1.15 Dispatcher源码分析
AM启动类YarnApplicationClusterEntryPointYarnApplicationClusterEntryPoint.mainClusterEntrypoint.runClusterEntrypointClusterEntrypoint.startClusterClusterEntrypoint.runCluster private void runCluster(Co...
Flink1.15 Cli提交源码分析
org.apache.flink.client.cli.CliFrontend#run public static void main(final String[] args) { EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args); // 1. find the confi...
Flink1.15 SLOT源码分析
Flink Slot管理 JobMaster继承了RpcEndpoint,start的回调Onstart protected void onStart() throws JobMasterException { try { startJobExecution(); } catch (Exception e) { final JobMaster...
Flink1.15 Task源码分析
DefaultScheduler.allocateSlotsAndDeploy public void allocateSlotsAndDeploy( final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) { validateDeploymentOptions(e...
Flink1.15 CKP InputData&OutPutData源码分析
AlternatingCollectingBarriers.alignmentTimeout public BarrierHandlerState alignmentTimeout( Controller controller, CheckpointBarrier checkpointBarrier) throws IOException, CheckpointException {...
Flink1.15 CheckPoint restore源码分析
当所有task成功后,执行completePendingCheckpoint()操作 将PendingCheckpoint 转成CompletedCheckpoint,并将CheckpointMetadata持久化 更新completedCheckpointStore 通知其他算子ack,执行commit ``` //CheckpointCoordinator pr...