Home Streamsets 源码流程梳理
Post
Cancel

Streamsets 源码流程梳理

启动入口,模块bootstrap

BootstrapMain.java

main,模块container

DataCollectorMain.java

继承自Main.java,模块container-common

Api接口

创建 pipeline:PipelineStoreResource.java

1
http://127.0.0.1:18630/rest/v1/pipeline/test2?autoGeneratePipelineId=true&description=ccccc

启动 pipeline:ManagerResource.java

1
http://127.0.0.1:18630/rest/v1/pipelines/start

StandaloneRunner.java

1
ManagerResource.start()--> runner.start()(Runner runner = manager.getRunner(pipelineId, rev))-->StandaloneRunner.start()

stack

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2018-01-03 01:02:21,988 [user:*admin] [pipeline:test/test19628fc5-95de-42cd-ba29-f55760e1d256] [runner:] [thread:ProductionPipelineRunnable-test19628fc5-95de-42cd-ba29-f55760e1d256-test] ERROR MysqlSource - Error connecting to MySql binlog: BinaryLogClient was unable to connect in 5000ms
java.util.concurrent.TimeoutException: BinaryLogClient was unable to connect in 5000ms
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:644)
        at com.streamsets.pipeline.stage.origin.mysql.MysqlSource.init(MysqlSource.java:103)
        at com.streamsets.pipeline.api.base.BaseStage.init(BaseStage.java:52)
        at com.streamsets.datacollector.runner.StageRuntime.init(StageRuntime.java:156)
        at com.streamsets.datacollector.runner.StagePipe.init(StagePipe.java:105)
        at com.streamsets.datacollector.runner.StagePipe.init(StagePipe.java:53)
        at com.streamsets.datacollector.runner.Pipeline.initPipe(Pipeline.java:299)
        at com.streamsets.datacollector.runner.Pipeline.init(Pipeline.java:214)
        at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:96)
        at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:79)
        at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:668)
        at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:149)
        at com.streamsets.datacollector.execution.runner.common.AsyncRunner$$Lambda$30/1593004381.call(Unknown Source)
        at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:233)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)

ProductionPipelineRunner.run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
     // Excerpt from ProductionPipelineRunner.run(): pick the run loop based on the kind of origin stage.
     try {
      if (originPipe.getStage().getStage() instanceof PushSource) {
        // The origin stage is a PushSource.
        runPushSource();
      } else {
        // Any other origin stage.
        runPollSource();
      }

    } catch (Throwable throwable) {} // NOTE(review): handler is empty as shown; presumably elided in this excerpt - confirm against the real source
// Inside runPushSource():

 originPipe.process(offsetTracker.getOffsets(), batchSize, this);

// Inside SourcePipe.process():
 getStage().execute(offsets, batchSize);

ProductionPipelineRunner.run() --> ProductionPipelineRunner.processPipe() --> StagePipe.process() --> StageRuntime.execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
   /**
    * Runs this stage as a push origin.
    *
    * <p>Builds a task that calls {@code PushSource.produce(offsets, batchSize)} when the stage
    * definition type is {@code SOURCE} and the stage is a {@code PushSource}. In every other
    * case the task throws {@link IllegalStateException}. The task is then handed to the
    * three-argument {@code execute} overload with {@code null} for both trailing arguments.
    *
    * @param offsets offsets passed through to {@code PushSource.produce}
    * @param batchSize batch size passed through to {@code PushSource.produce}
    * @throws StageException declared by this method; propagated from the delegated call
    */
   public void execute(final Map<String, String> offsets, final int batchSize) throws StageException {
     Callable<String> pushTask = new Callable<String>() {
       @Override
       public String call() throws Exception {
         switch (getDefinition().getType()) {
           case SOURCE:
             if (getStage() instanceof PushSource) {
               ((PushSource) getStage()).produce(offsets, batchSize);
               return null;
             }
             break;
           default:
             break;
         }
         // Reached for every non-SOURCE type, and for a SOURCE stage that is not a PushSource.
         throw new IllegalStateException(Utils.format("Unknown stage type: '{}'", getDefinition().getType()));
       }
     };

     execute(pushTask, null, null);
   }

  /**
   * Runs this stage once, dispatching on the stage definition type.
   *
   * <ul>
   *   <li>{@code SOURCE}: calls {@code Source.produce(previousOffset, batchSize, batchMaker)}
   *       and yields its result as the new offset.</li>
   *   <li>{@code PROCESSOR}: calls {@code Processor.process(batch, batchMaker)}; yields {@code null}.</li>
   *   <li>{@code EXECUTOR} and {@code TARGET}: calls {@code Target.write(batch)}; yields {@code null}.</li>
   *   <li>Any other type: throws {@link IllegalStateException}.</li>
   * </ul>
   *
   * <p>The work is wrapped in a {@link Callable} and delegated to the three-argument
   * {@code execute} overload together with the given sinks.
   *
   * @param previousOffset offset handed to a source stage
   * @param batchSize batch size handed to a source stage
   * @param batch batch handed to processor, executor and target stages
   * @param batchMaker batch maker handed to source and processor stages
   * @param errorSink forwarded to the delegated {@code execute} call
   * @param eventSink forwarded to the delegated {@code execute} call
   * @return whatever the delegated {@code execute} call returns
   * @throws StageException declared by this method; propagated from the delegated call
   */
  public String execute(
    final String previousOffset,
    final int batchSize,
    final Batch batch,
    final BatchMaker batchMaker,
    ErrorSink errorSink,
    EventSink eventSink
  ) throws StageException {
    Callable<String> stageTask = () -> {
      switch (getDefinition().getType()) {
        case SOURCE:
          // Only a source produces a new offset.
          return ((Source) getStage()).produce(previousOffset, batchSize, batchMaker);
        case PROCESSOR:
          ((Processor) getStage()).process(batch, batchMaker);
          return null;
        case EXECUTOR:
        case TARGET:
          // Executors go through the same write path as targets.
          ((Target) getStage()).write(batch);
          return null;
        default:
          throw new IllegalStateException(Utils.format("Unknown stage type: '{}'", getDefinition().getType()));
      }
    };

    return execute(stageTask, errorSink, eventSink);
  }

以 target 为例,这里可以是 Kudu、JDBC 等

KuduTarget

1
2
3
4
5
6
7
8
9
10
11
12
   /**
    * Writes the given batch through {@code writeBatch}.
    *
    * <p>When the batch has no records, {@code cacheCleaner.periodicCleanUp()} is called first;
    * {@code writeBatch} is still called afterwards. Any {@link Exception} raised along the way
    * is passed to {@code throwStageException} and the value it returns is thrown.
    *
    * @param batch the batch to write
    * @throws StageException declared by this method
    */
   public void write(final Batch batch) throws StageException {
     try {
       final boolean nothingToWrite = !batch.getRecords().hasNext();
       if (nothingToWrite) {
         // Idle batch: use the opportunity to trim the cache so memory is not held indefinitely.
         cacheCleaner.periodicCleanUp();
       }

       writeBatch(batch);
     } catch (Exception failure) {
       throw throwStageException(failure);
     }
   }

JdbcTarget

1
2
3
4
5
6
7
8
9
   /**
    * Writes the given batch via {@code JdbcUtil.write}.
    *
    * <p>When the batch has no records, {@code cacheCleaner.periodicCleanUp()} is called first;
    * {@code JdbcUtil.write} is still called afterwards, always with {@code perRecord} set to
    * {@code false}.
    *
    * @param batch the batch to write
    * @throws StageException declared by this method
    */
   public void write(Batch batch) throws StageException {
     final boolean batchHasRecords = batch.getRecords().hasNext();
     if (!batchHasRecords) {
       // Idle batch: use the opportunity to trim the cache so memory is not held indefinitely.
       cacheCleaner.periodicCleanUp();
     }

     // The JDBC target always commits as a batch, so per-record mode stays off.
     final boolean perRecord = false;
     JdbcUtil.write(
         batch,
         schema,
         tableNameEval,
         tableNameVars,
         tableNameTemplate,
         caseSensitive,
         recordWriters,
         errorRecordHandler,
         perRecord
     );
   }
This post is licensed under CC BY 4.0 by the author.