Startup entry point: the bootstrap module.
The main class lives in the container module,
and it extends Main.java from the container-common module.
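To make that layering concrete, here is a minimal sketch of the "module-specific main extends a shared Main" pattern described above. The class and method names are simplified placeholders, not the actual StreamSets classes.

// Minimal sketch of the layering above (placeholder names, not the real SDC classes):
// a shared base Main in "container-common" owns the generic startup sequence, and the
// "container" module contributes a subclass that plugs in its own wiring.
abstract class CommonMain {                       // stands in for Main.java in container-common
  public final void doMain() {
    init();                                       // module-specific setup supplied by the subclass
    run();                                        // shared startup logic
  }
  protected abstract void init();
  protected void run() {
    System.out.println("starting data collector services...");
  }
}

class ContainerMain extends CommonMain {          // stands in for the main class in the container module
  @Override
  protected void init() {
    System.out.println("wiring container-specific components");
  }
  public static void main(String[] args) {        // the bootstrap module ultimately hands control here
    new ContainerMain().doMain();
  }
}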
API endpoints
Create a pipeline: PipelineStoreResource.java
http://127.0.0.1:18630/rest/v1/pipeline/test2?autoGeneratePipelineId=true&description=ccccc
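A quick way to exercise this endpoint from Java is a plain HttpURLConnection call. The sketch below assumes the default admin/admin account with HTTP Basic auth enabled, and that this SDC version accepts PUT plus the X-Requested-By header for state-changing REST calls; verify the method, auth, and headers against your installation.

import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hedged sketch of calling the create-pipeline endpoint shown above.
public class CreatePipelineSketch {
  public static void main(String[] args) throws Exception {
    URL url = new URL("http://127.0.0.1:18630/rest/v1/pipeline/test2"
        + "?autoGeneratePipelineId=true&description=ccccc");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("PUT");                                    // assumption: PUT creates the pipeline
    String token = Base64.getEncoder().encodeToString("admin:admin".getBytes(StandardCharsets.UTF_8));
    conn.setRequestProperty("Authorization", "Basic " + token);      // assumption: Basic auth is enabled
    conn.setRequestProperty("X-Requested-By", "sdc");                // SDC commonly rejects state-changing calls without this header
    int status = conn.getResponseCode();
    System.out.println("HTTP " + status);
    try (InputStream in = status < 400 ? conn.getInputStream() : conn.getErrorStream()) {
      if (in != null) {
        System.out.println(new String(in.readAllBytes(), StandardCharsets.UTF_8));
      }
    }
  }
}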
Start a pipeline: ManagerResource.java
http://127.0.0.1:18630/rest/v1/pipelines/start
ManagerResource.start() --> runner.start() (the runner is looked up first with Runner runner = manager.getRunner(pipelineId, rev)) --> StandaloneRunner.start()
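The interfaces below are a stripped-down model of that delegation, just to make the call chain concrete; the names and signatures are simplified placeholders, not the real ManagerResource/Manager/Runner APIs.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stripped-down model of the delegation above (placeholder signatures, not the real SDC APIs):
// the REST resource does not run anything itself; it looks up a Runner by pipeline id/revision
// and forwards the start request to it.
public class StartDelegationSketch {

  interface Runner {
    void start();                                    // StandaloneRunner.start() plays this role in SDC
  }

  interface Manager {
    Runner getRunner(String pipelineId, String rev); // manager.getRunner(pipelineId, rev) in the call chain
  }

  static class InMemoryManager implements Manager {
    private final Map<String, Runner> runners = new ConcurrentHashMap<>();

    InMemoryManager() {
      runners.put("test2::0", () -> System.out.println("starting pipeline test2"));
    }

    @Override
    public Runner getRunner(String pipelineId, String rev) {
      return runners.get(pipelineId + "::" + rev);
    }
  }

  // Roughly what ManagerResource.start() does: resolve the runner, then delegate.
  static void start(Manager manager, String pipelineId, String rev) {
    Runner runner = manager.getRunner(pipelineId, rev);
    runner.start();
  }

  public static void main(String[] args) {
    start(new InMemoryManager(), "test2", "0");
  }
}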
Stack trace. The example below (from a MySQL binlog origin that failed to connect) shows the full start path, from the async runner thread down into the stage's init():
2018-01-03 01:02:21,988 [user:*admin] [pipeline:test/test19628fc5-95de-42cd-ba29-f55760e1d256] [runner:] [thread:ProductionPipelineRunnable-test19628fc5-95de-42cd-ba29-f55760e1d256-test] ERROR MysqlSource - Error connecting to MySql binlog: BinaryLogClient was unable to connect in 5000ms
java.util.concurrent.TimeoutException: BinaryLogClient was unable to connect in 5000ms
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:644)
    at com.streamsets.pipeline.stage.origin.mysql.MysqlSource.init(MysqlSource.java:103)
    at com.streamsets.pipeline.api.base.BaseStage.init(BaseStage.java:52)
    at com.streamsets.datacollector.runner.StageRuntime.init(StageRuntime.java:156)
    at com.streamsets.datacollector.runner.StagePipe.init(StagePipe.java:105)
    at com.streamsets.datacollector.runner.StagePipe.init(StagePipe.java:53)
    at com.streamsets.datacollector.runner.Pipeline.initPipe(Pipeline.java:299)
    at com.streamsets.datacollector.runner.Pipeline.init(Pipeline.java:214)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipeline.run(ProductionPipeline.java:96)
    at com.streamsets.datacollector.execution.runner.common.ProductionPipelineRunnable.run(ProductionPipelineRunnable.java:79)
    at com.streamsets.datacollector.execution.runner.standalone.StandaloneRunner.start(StandaloneRunner.java:668)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner.lambda$start$3(AsyncRunner.java:149)
    at com.streamsets.datacollector.execution.runner.common.AsyncRunner$$Lambda$30/1593004381.call(Unknown Source)
    at com.streamsets.pipeline.lib.executor.SafeScheduledExecutorService$SafeCallable.call(SafeScheduledExecutorService.java:233)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
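Reading the trace bottom-up, the REST layer does not run the pipeline on its own thread: AsyncRunner hands a Callable to a scheduled executor (SafeScheduledExecutorService), and the actual StandaloneRunner.start() / ProductionPipeline.run() work happens on that pool thread. The sketch below reproduces just that hand-off pattern with plain java.util.concurrent classes; it is an illustration, not the real AsyncRunner code.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustration of the hand-off visible in the stack trace (plain JDK classes, not the real
// AsyncRunner/SafeScheduledExecutorService): the caller only submits the work and returns,
// while a dedicated runner thread executes the long-running pipeline.
public class AsyncStartSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService runnerExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "ProductionPipelineRunnable-demo"));

    Future<?> startFuture = runnerExecutor.submit(() -> {
      // In SDC this is roughly where StandaloneRunner.start() -> ProductionPipelineRunnable.run()
      // -> ProductionPipeline.run() executes.
      System.out.println("pipeline work running on " + Thread.currentThread().getName());
    });

    System.out.println("start request returned on " + Thread.currentThread().getName());
    startFuture.get();          // only to keep the demo deterministic; the real start call does not block here
    runnerExecutor.shutdown();
  }
}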
ProductionPipelineRunner.run()
try {
  if (originPipe.getStage().getStage() instanceof PushSource) {
    runPushSource();
  } else {
    runPollSource();
  }
} catch (Throwable throwable) {
  // error handling omitted here
}

// runPushSource() drives the origin through:
originPipe.process(offsetTracker.getOffsets(), batchSize, this);
// which in SourcePipe.process delegates to:
getStage().execute(offsets, batchSize);
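The branch matters because SDC has two origin styles: a PushSource pushes batches itself via produce(offsets, maxBatchSize), while a classic poll Source is asked for one batch at a time via produce(lastOffset, maxBatchSize, batchMaker), which is what runPollSource() drives. As a reminder of the poll contract, here is a minimal counter origin against the public stage API; the class is hypothetical, and configuration and error handling are omitted.

import com.streamsets.pipeline.api.BatchMaker;
import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.BaseSource;

// Hypothetical minimal poll-style origin, to show the contract that runPollSource() drives:
// each call produces (up to) one batch and returns the offset to resume from next time.
public class CounterSource extends BaseSource {

  @Override
  public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker)
      throws StageException {
    long next = lastSourceOffset == null ? 0L : Long.parseLong(lastSourceOffset);

    // Emit a small batch of records, one long value each.
    for (int i = 0; i < Math.min(maxBatchSize, 10); i++, next++) {
      Record record = getContext().createRecord("counter::" + next);
      record.set(Field.create(next));
      batchMaker.addRecord(record);
    }

    // The returned offset is persisted by the offset tracker and passed back on the next call.
    return String.valueOf(next);
  }
}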
ProductionPipelineRunner.run() --> ProductionPipelineRunner.processPipe() --> StagePipe.process() --> StageRuntime.execute()
public void execute(final Map<String, String> offsets, final int batchSize) throws StageException {
  Callable<String> callable = () -> {
    switch (getDefinition().getType()) {
      case SOURCE:
        if (getStage() instanceof PushSource) {
          ((PushSource) getStage()).produce(offsets, batchSize);
          return null;
        }
        // fall through
      default:
        throw new IllegalStateException(Utils.format("Unknown stage type: '{}'", getDefinition().getType()));
    }
  };
  execute(callable, null, null);
}

public String execute(
    final String previousOffset,
    final int batchSize,
    final Batch batch,
    final BatchMaker batchMaker,
    ErrorSink errorSink,
    EventSink eventSink
) throws StageException {
  Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
      String newOffset = null;
      switch (getDefinition().getType()) {
        case SOURCE: {
          newOffset = ((Source) getStage()).produce(previousOffset, batchSize, batchMaker);
          break;
        }
        case PROCESSOR: {
          ((Processor) getStage()).process(batch, batchMaker);
          break;
        }
        case EXECUTOR:
        case TARGET: {
          ((Target) getStage()).write(batch);
          break;
        }
        default: {
          throw new IllegalStateException(Utils.format("Unknown stage type: '{}'", getDefinition().getType()));
        }
      }
      return newOffset;
    }
  };
  return execute(callable, errorSink, eventSink);
}
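The second overload shows the dispatch for the remaining stage types: PROCESSOR stages get process(batch, batchMaker), and TARGET/EXECUTOR stages get write(batch). For completeness, a processor's side of that contract looks roughly like the hypothetical sketch below, which tags each record and passes it through; it uses the public SingleLaneRecordProcessor helper and is not taken from the SDC code base.

import com.streamsets.pipeline.api.Field;
import com.streamsets.pipeline.api.Record;
import com.streamsets.pipeline.api.StageException;
import com.streamsets.pipeline.api.base.SingleLaneRecordProcessor;

// Hypothetical processor, to show what the PROCESSOR branch above ends up invoking:
// StageRuntime hands the whole batch to process(batch, batchMaker), and the
// SingleLaneRecordProcessor base class fans that out to one call per record.
public class StampProcessor extends SingleLaneRecordProcessor {

  @Override
  protected void process(Record record, SingleLaneBatchMaker batchMaker) throws StageException {
    // Tag map-rooted records, then pass every record along to the single output lane.
    Field root = record.get();
    if (root != null && (root.getType() == Field.Type.MAP || root.getType() == Field.Type.LIST_MAP)) {
      record.set("/processedBy", Field.create("StampProcessor"));
    }
    batchMaker.addRecord(record);
  }
}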
Take a target as an example: it can be Kudu, JDBC, and so on.
KuduTarget
public void write(final Batch batch) throws StageException {
  try {
    if (!batch.getRecords().hasNext()) {
      // No records - take the opportunity to clean up the cache so that we don't hold on to memory indefinitely
      cacheCleaner.periodicCleanUp();
    }
    writeBatch(batch);
  } catch (Exception e) {
    throw throwStageException(e);
  }
}
JdbcTarget
public void write(Batch batch) throws StageException {
  if (!batch.getRecords().hasNext()) {
    // No records - take the opportunity to clean up the cache so that we don't hold on to memory indefinitely
    cacheCleaner.periodicCleanUp();
  }
  // jdbc target always commit batch execution
  final boolean perRecord = false;
  JdbcUtil.write(batch, schema, tableNameEval, tableNameVars, tableNameTemplate, caseSensitive, recordWriters, errorRecordHandler, perRecord);
}