前面的处理跟索引阶段差不多,对照参考elasticsearch-1.3.0 之索引代码粗略梳理
先来个term的query
1
2
3
4
5
6
7
{
"query":{
"term":{
"content":"a"
}
}
}
下面multi_match的Query
1
2
3
4
5
6
7
8
{
"query": {
"multi_match": {
"query" : "我的宝马多少马力",
"fields" : ["title", "content"]
}
}
}
Query query = parseContext.parseInnerQuery()// 最后是一个DisjunctionMaxQuery
1
((title:我 title:的 title:宝 title:马 title:多 title:少 title:马 title:力) | (query.term.content:我 query.term.content:的 query.term.content:宝 query.term.content:马 query.term.content:多 query.term.content:少 query.term.content:马 query.term.content:力))
从RestController说起
1
RestController.dispatchRequest-->RestController.executeHandler-->BaseRestHandler.handleRequest()-->RestSearchAction.handleRequest()-->NodeClient(AbstractClient).search-->NodeClient.execute()-->TransportSearchAction.execute-->TransportAction.execute-->TransportSearchAction.doExecute-->TransportSearchQueryAndFetchAction(TransportAction).execute-->TransportSearchQueryThenFetchAction.doExecute
TransportSearchQueryThenFetchAction的doExecute中启动一个异步task AsyncAction
1
2
3
protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
new AsyncAction(searchRequest, listener).start();
}
在start中主要分两个过程queryPhase.execute(searchService.executeQueryPhase)与fetchPhase.execute(searchService.executeFetchPhase)
queryPhase.execute流程
1
AsyncAction.start-->AsyncAction.performFirstPhase-->TransportSearchQueryThenFetchAction.sendExecuteFirstPhase-->SearchService.sendExecuteQuery-->SearchService.executeQueryPhase-->queryPhase.execute
在SearchService.executeQueryPhase的中首先调用createAndPutContext创建SearchContext,createAndPutContext创建SearchContext,并且接入activeContexts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticsearchException {
SearchContext context = createContext(request, null);
boolean success = false;
try {
activeContexts.put(context.id(), context);
context.indexShard().searchService().onNewContext(context);
success = true;
return context;
} finally {
if (!success) {
freeContext(context);
}
}
}
createContext中创建SearchContext
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());
SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());
Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays);
SearchContext.setCurrent(context);
try {
context.scroll(request.scroll());
context.useSlowScroll(request.useSlowScroll());
parseTemplate(request);
parseSource(context, request.source());
parseSource(context, request.extraSource());
// if the from and size are still not set, default them
if (context.from() == -1) {
context.from(0);
}
if (context.size() == -1) {
context.size(10);
}
// pre process
dfsPhase.preProcess(context);
queryPhase.preProcess(context);
fetchPhase.preProcess(context);
// compute the context keep alive
long keepAlive = defaultKeepAlive;
if (request.scroll() != null && request.scroll().keepAlive() != null) {
keepAlive = request.scroll().keepAlive().millis();
}
context.keepAlive(keepAlive);
}
parseSource中包含Query的解析过程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// nothing to parse...
if (source == null || source.length() == 0) {
return;
}
XContentParser parser = null;
try {
String sourceStr = new String(source.toBytes());
parser = XContentFactory.xContent(source).createParser(source);
XContentParser.Token token;
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new ElasticsearchParseException("Expected START_OBJECT but got " + token.name() + " " + parser.currentName());
}
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
String fieldName = parser.currentName();
parser.nextToken();
SearchParseElement element = elementParsers.get(fieldName);//QueryParseElement
if (element == null) {
throw new SearchParseException(context, "No parser for element [" + fieldName + "]");
}
element.parse(parser, context);
} else {
if (token == null) {
throw new ElasticsearchParseException("End of query source reached but query is not complete.");
} else {
throw new ElasticsearchParseException("Expected field name but got " + token.name() + " \"" + parser.currentName() + "\"");
}
}
}
}
上边的两个语句都是query,source还是bytes,可以String sourceStr = new String(source.toBytes())直接转成String直观;
parser = XContentFactory.xContent(source).createParser(source);这句很关键
SearchParseElement是QueryParseElement,QueryParseElement.parse的方法
1
2
3
4
5
6
7
public class QueryParseElement implements SearchParseElement {
@Override
public void parse(XContentParser parser, SearchContext context) throws Exception {
context.parsedQuery(context.queryParserService().parse(parser));
}
}
终于看到queryParserService,这个就是IndexQueryParserService,IndexQueryParserService中的调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ParsedQuery parse(XContentParser parser) {
return parse(cache.get(), parser);
}
public ParsedQuery parse(QueryParseContext context, XContentParser parser) {
try {
return innerParse(context, parser);
} catch (IOException e) {
throw new QueryParsingException(index, "Failed to parse", e);
}
}
private ParsedQuery innerParse(QueryParseContext parseContext, XContentParser parser) throws IOException, QueryParsingException {
parseContext.reset(parser);
try {
if (strict) {
parseContext.parseFlags(EnumSet.of(ParseField.Flag.STRICT));
}
Query query = parseContext.parseInnerQuery();//
if (query == null) {
query = Queries.newMatchNoDocsQuery();
}
return new ParsedQuery(query, parseContext.copyNamedFilters());
} finally {
parseContext.reset(null);
}
}
cache是CloseableThreadLocal ,这个是包装的ThreadLocal
Query的解析过程主要在parseInnerQuery
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Query parseInnerQuery() throws IOException, QueryParsingException {
// move to START object
XContentParser.Token token;
if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new QueryParsingException(index, "[_na] query malformed, must start with start_object");
}
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new QueryParsingException(index, "[_na] query malformed, no field after start_object");
}
String queryName = parser.currentName();
// move to the next START_OBJECT
token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT && token != XContentParser.Token.START_ARRAY) {
throw new QueryParsingException(index, "[_na] query malformed, no field after start_object");
}
QueryParser queryParser = indexQueryParser.queryParser(queryName);
if (queryParser == null) {
throw new QueryParsingException(index, "No query registered for [" + queryName + "]");
}
Query result = queryParser.parse(this);
if (parser.currentToken() == XContentParser.Token.END_OBJECT || parser.currentToken() == XContentParser.Token.END_ARRAY) {
// if we are at END_OBJECT, move to the next one...
parser.nextToken();
}
return result;
}
依据name获取 QueryParser,第一个语句是TermQueryParser,第二个语句MultiMatchQueryParser
MatchQueryParser解析的语句比较多 NAME, “match_phrase”, “matchPhrase”, “match_phrase_prefix”, “matchPhrasePrefix”, “matchFuzzy”, “match_fuzzy”, “fuzzy_match”,其中name就是match
1
QueryParser queryParser = indexQueryParser.queryParser(queryName);
query的解析,第一个语句最后解析成TermQuery,第二条解析成DisjunctionMaxQuery
1
Query result = queryParser.parse(this)
queryPhase.execute(context)内部会调用Lucene的IndexSearcher的search()
第二阶段是在第一阶段sendExecuteFirstPhase的回调onFirstPhaseResult中调用innerMoveToSecondPhase,主要是调用子类的moveToSecondPhase,TransportSearchQueryThenFetchAction.moveToSecondPhase()
在5.1.1版本中,已经改为AsyncAction(SearchQueryThenFetchAsyncAction)
AsyncAction在TransportTasksAction了?TransportTasksAction->doExecute()
1
2
3
protected void doExecute(Task task, TasksRequest request, ActionListener<TasksResponse> listener) {
new AsyncAction(task, request, listener).start();
}
都会走到这一步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);
default AsyncSender interceptSender(AsyncSender sender) {
return sender;
}
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {
asyncSender.sendRequest(node, action, request, options, handler);
}
private <T extends TransportResponse> void sendRequestInternal(final DiscoveryNode node, final String action,
final TransportRequest request,
final TransportRequestOptions options,
TransportResponseHandler<T> handler) {...}
顺便理了下Primary与Replica,TransportService中的sendRequestInternal
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (node == null) {
throw new IllegalStateException("can't send request to a null node");
}
final long requestId = newRequestId();
final TimeoutHandler timeoutHandler;
try {
if (options.timeout() == null) {
timeoutHandler = null;
} else {
timeoutHandler = new TimeoutHandler(requestId);
}
TransportResponseHandler<T> responseHandler =
new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler));
if (lifecycle.stoppedOrClosed()) {
// if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
// the caller. It will only notify if the toStop code hasn't done the work yet.
throw new TransportException("TransportService is closed stopped can't send request");
}
if (timeoutHandler != null) {
assert options.timeout() != null;
timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
if (node.equals(localNode)) {
sendLocalRequest(requestId, action, request);
} else {
transport.sendRequest(node, requestId, action, request, options);
}
省取了部分代码,暂时不看TimeoutHandler,就是sendLocalRequest()执行本节点操作或transport.sendRequest()发送消息出去?看下local
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void sendLocalRequest(long requestId, final String action, final TransportRequest request) {
final RequestHandlerRegistry reg = adapter.getRequestHandler(action);
final String executor = reg.getExecutor();
if (ThreadPool.Names.SAME.equals(executor)) {
//noinspection unchecked
reg.processMessageReceived(request, channel);
} else {
threadPool.executor(executor).execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
//noinspection unchecked
reg.processMessageReceived(request, channel);
}
主要是 reg.processMessageReceived(request, channel);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel);
} else {
boolean success = false;
try { // ReplicaOperationTransportHandler/PrimaryOperationTransportHandler
handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}
}
这里的handler注册位于TransportReplicationAction里面
1
2
3
4
5
6
7
8
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction,
() -> new ConcreteShardRequest<>(replicaRequest),
executor, true, true,
new ReplicaOperationTransportHandler());
PrimaryOperationTransportHandler的messageReceived方法
1
2
3
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}
ReplicaOperationTransportHandler的messageReceived方法
1
2
3
4
public void messageReceived(ConcreteShardRequest<ReplicaRequest> requestWithAID, TransportChannel channel, Task task)
throws Exception {
new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run();
}
6.0.0-alpha1 中Primary与Replica
1
2
3
4
5
6
7
8
9
10
11
12
class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}
}
class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<ReplicaRequest>> {
public void messageReceived(ConcreteShardRequest<ReplicaRequest> requestWithAID, TransportChannel channel, Task task)
throws Exception {
new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run();
}
}
Primary
1
AsyncPrimaryAction.onResponse-->ReplicationOperation.execute-->primary.perform-->PrimaryShardReference.perform-->TransportShardBulkAction.shardOperationOnPrimary
Replica
1
AsyncReplicaAction.onResponse-->TransportShardBulkAction.shardOperationOnReplica