Home Elasticsearch 搜索过程代码整理
Post
Cancel

Elasticsearch 搜索过程代码整理

前面的处理跟索引阶段差不多,对照参考elasticsearch-1.3.0 之索引代码粗略梳理

先来个term的query

1
2
3
4
5
6
7
{
"query":{
"term":{
"content":"a"
}
}
}

下面multi_match的Query

1
2
3
4
5
6
7
8
{
  "query": {
    "multi_match": {
        "query" : "我的宝马多少马力",
        "fields" : ["title", "content"]
    }
  }
}

Query query = parseContext.parseInnerQuery()// 最后是一个DisjunctionMaxQuery

1
((title:我 title:的 title:宝 title:马 title:多 title:少 title:马 title:力) | (query.term.content:我 query.term.content:的 query.term.content:宝 query.term.content:马 query.term.content:多 query.term.content:少 query.term.content:马 query.term.content:力))

从RestController说起

1
RestController.dispatchRequest-->RestController.executeHandler-->BaseRestHandler.handleRequest()-->RestSearchAction.handleRequest()-->NodeClient(AbstractClient).search-->NodeClient.execute()-->TransportSearchAction.execute-->TransportAction.execute-->TransportSearchAction.doExecute-->TransportSearchQueryAndFetchAction(TransportAction).execute-->TransportSearchQueryThenFetchAction.doExecute

TransportSearchQueryThenFetchAction的doExecute中启动一个异步task AsyncAction

1
2
3
   protected void doExecute(SearchRequest searchRequest, ActionListener<SearchResponse> listener) {
        new AsyncAction(searchRequest, listener).start();
    }

在start中主要分两个过程queryPhase.execute(searchService.executeQueryPhase)与fetchPhase.execute(searchService.executeFetchPhase)

queryPhase.execute流程

1
AsyncAction.start-->AsyncAction.performFirstPhase-->TransportSearchQueryThenFetchAction.sendExecuteFirstPhase-->SearchService.sendExecuteQuery-->SearchService.executeQueryPhase-->queryPhase.execute

在SearchService.executeQueryPhase的中首先调用createAndPutContext创建SearchContext,createAndPutContext创建SearchContext,并且接入activeContexts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final SearchContext createAndPutContext(ShardSearchRequest request) throws ElasticsearchException {
    SearchContext context = createContext(request, null);
    boolean success = false;
    try {
        activeContexts.put(context.id(), context);
        context.indexShard().searchService().onNewContext(context);
        success = true;
        return context;
    } finally {
        if (!success) {
            freeContext(context);
        }
    }
}

createContext中创建SearchContext

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
IndexService indexService = indicesService.indexServiceSafe(request.index());
IndexShard indexShard = indexService.shardSafe(request.shardId());

SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.index(), request.shardId());

Engine.Searcher engineSearcher = searcher == null ? indexShard.acquireSearcher("search") : searcher;
SearchContext context = new DefaultSearchContext(idGenerator.incrementAndGet(), request, shardTarget, engineSearcher, indexService, indexShard, scriptService, cacheRecycler, pageCacheRecycler, bigArrays);
SearchContext.setCurrent(context);
try {
    context.scroll(request.scroll());
    context.useSlowScroll(request.useSlowScroll());

    parseTemplate(request);
    parseSource(context, request.source());
    parseSource(context, request.extraSource());

    // if the from and size are still not set, default them
    if (context.from() == -1) {
        context.from(0);
    }
    if (context.size() == -1) {
        context.size(10);
    }

    // pre process
    dfsPhase.preProcess(context);
    queryPhase.preProcess(context);
    fetchPhase.preProcess(context);

    // compute the context keep alive
    long keepAlive = defaultKeepAlive;
    if (request.scroll() != null && request.scroll().keepAlive() != null) {
        keepAlive = request.scroll().keepAlive().millis();
    }
    context.keepAlive(keepAlive);
} 

parseSource中包含Query的解析过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// nothing to parse...
if (source == null || source.length() == 0) {
    return;
}
XContentParser parser = null;
try {
    String sourceStr = new String(source.toBytes());
    parser = XContentFactory.xContent(source).createParser(source);
    XContentParser.Token token;
    token = parser.nextToken();
    if (token != XContentParser.Token.START_OBJECT) {
        throw new ElasticsearchParseException("Expected START_OBJECT but got " + token.name() + " " + parser.currentName());
    }
    while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
        if (token == XContentParser.Token.FIELD_NAME) {
            String fieldName = parser.currentName();
            parser.nextToken();
            SearchParseElement element = elementParsers.get(fieldName);//QueryParseElement
            if (element == null) {
                throw new SearchParseException(context, "No parser for element [" + fieldName + "]");
            }
            element.parse(parser, context);
        } else {
            if (token == null) {
                throw new ElasticsearchParseException("End of query source reached but query is not complete.");
            } else {
                throw new ElasticsearchParseException("Expected field name but got " + token.name() + " \"" + parser.currentName() + "\"");
            }
        }
    }
} 

上边的两个语句都是query,source还是bytes,可以String sourceStr = new String(source.toBytes())直接转成String直观;

parser = XContentFactory.xContent(source).createParser(source);这句很关键

SearchParseElement是QueryParseElement,QueryParseElement.parse的方法

1
2
3
4
5
6
7
public class QueryParseElement implements SearchParseElement {

    @Override
    public void parse(XContentParser parser, SearchContext context) throws Exception {
        context.parsedQuery(context.queryParserService().parse(parser));
    }
}

终于看到queryParserService,这个就是IndexQueryParserService,IndexQueryParserService中的调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ParsedQuery parse(XContentParser parser) {
    return parse(cache.get(), parser);
}
public ParsedQuery parse(QueryParseContext context, XContentParser parser) {
    try {
        return innerParse(context, parser);
    } catch (IOException e) {
        throw new QueryParsingException(index, "Failed to parse", e);
    }
}
private ParsedQuery innerParse(QueryParseContext parseContext, XContentParser parser) throws IOException, QueryParsingException {
    parseContext.reset(parser);
    try {
        if (strict) {
            parseContext.parseFlags(EnumSet.of(ParseField.Flag.STRICT));
        }
        Query query = parseContext.parseInnerQuery();//
        if (query == null) {
            query = Queries.newMatchNoDocsQuery();
        }
        return new ParsedQuery(query, parseContext.copyNamedFilters());
    } finally {
        parseContext.reset(null);
    }
}

cache是CloseableThreadLocal ,这个是包装的ThreadLocal

Query的解析过程主要在parseInnerQuery

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Query parseInnerQuery() throws IOException, QueryParsingException {
    // move to START object
    XContentParser.Token token;
    if (parser.currentToken() != XContentParser.Token.START_OBJECT) {
        token = parser.nextToken();
        if (token != XContentParser.Token.START_OBJECT) {
            throw new QueryParsingException(index, "[_na] query malformed, must start with start_object");
        }
    }
    token = parser.nextToken();
    if (token != XContentParser.Token.FIELD_NAME) {
        throw new QueryParsingException(index, "[_na] query malformed, no field after start_object");
    }
    String queryName = parser.currentName();
    // move to the next START_OBJECT
    token = parser.nextToken();
    if (token != XContentParser.Token.START_OBJECT && token != XContentParser.Token.START_ARRAY) {
        throw new QueryParsingException(index, "[_na] query malformed, no field after start_object");
    }

    QueryParser queryParser = indexQueryParser.queryParser(queryName);
    if (queryParser == null) {
        throw new QueryParsingException(index, "No query registered for [" + queryName + "]");
    }
    Query result = queryParser.parse(this);
    if (parser.currentToken() == XContentParser.Token.END_OBJECT || parser.currentToken() == XContentParser.Token.END_ARRAY) {
        // if we are at END_OBJECT, move to the next one...
        parser.nextToken();
    }
    return result;
}

依据name获取 QueryParser,第一个语句是TermQueryParser,第二个语句MultiMatchQueryParser

MatchQueryParser解析的语句比较多 NAME, “match_phrase”, “matchPhrase”, “match_phrase_prefix”, “matchPhrasePrefix”, “matchFuzzy”, “match_fuzzy”, “fuzzy_match”,其中name就是match

1
QueryParser queryParser = indexQueryParser.queryParser(queryName);

query的解析,第一个语句最后解析成TermQuery,第二条解析成DisjunctionMaxQuery

1
Query result = queryParser.parse(this)

queryPhase.execute(context)内部会调用Lucene的IndexSearcher的search()

第二阶段是在第一阶段sendExecuteFirstPhase的回调onFirstPhaseResult中调用innerMoveToSecondPhase,主要是调用子类的moveToSecondPhase,TransportSearchQueryThenFetchAction.moveToSecondPhase()

在5.1.1版本中,已经改为AsyncAction(SearchQueryThenFetchAsyncAction)

AsyncAction在TransportTasksAction了?TransportTasksAction->doExecute()

1
2
3
protected void doExecute(Task task, TasksRequest request, ActionListener<TasksResponse> listener) {
    new AsyncAction(task, request, listener).start();
}

都会走到这一步

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
this.asyncSender = interceptor.interceptSender(this::sendRequestInternal);

default AsyncSender interceptSender(AsyncSender sender) {
    return sender;
}
public final <T extends TransportResponse> void sendRequest(final DiscoveryNode node, final String action,
                                                              final TransportRequest request,
                                                              final TransportRequestOptions options,
                                                              TransportResponseHandler<T> handler) {
      asyncSender.sendRequest(node, action, request, options, handler);
  }
 private <T extends TransportResponse> void sendRequestInternal(final DiscoveryNode node, final String action,
                                                                   final TransportRequest request,
                                                                   final TransportRequestOptions options,
                                                                   TransportResponseHandler<T> handler) {...}

顺便理了下Primary与Replica,TransportService中的sendRequestInternal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
if (node == null) {
throw new IllegalStateException("can't send request to a null node");
}
final long requestId = newRequestId();
final TimeoutHandler timeoutHandler;
try {

if (options.timeout() == null) {
    timeoutHandler = null;
} else {
    timeoutHandler = new TimeoutHandler(requestId);
}
TransportResponseHandler<T> responseHandler =
    new ContextRestoreResponseHandler<>(threadPool.getThreadContext().newStoredContext(), handler);
clientHandlers.put(requestId, new RequestHolder<>(responseHandler, node, action, timeoutHandler));
if (lifecycle.stoppedOrClosed()) {
    // if we are not started the exception handling will remove the RequestHolder again and calls the handler to notify
    // the caller. It will only notify if the toStop code hasn't done the work yet.
    throw new TransportException("TransportService is closed stopped can't send request");
}
if (timeoutHandler != null) {
    assert options.timeout() != null;
    timeoutHandler.future = threadPool.schedule(options.timeout(), ThreadPool.Names.GENERIC, timeoutHandler);
}
if (node.equals(localNode)) {
    sendLocalRequest(requestId, action, request);
} else {
    transport.sendRequest(node, requestId, action, request, options);
}

省取了部分代码,暂时不看TimeoutHandler,就是sendLocalRequest()执行本节点操作或transport.sendRequest()发送消息出去?看下local

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private void sendLocalRequest(long requestId, final String action, final TransportRequest request) {

      final RequestHandlerRegistry reg = adapter.getRequestHandler(action);

      final String executor = reg.getExecutor();
    
      if (ThreadPool.Names.SAME.equals(executor)) {
          //noinspection unchecked
          reg.processMessageReceived(request, channel);
      } else {
          threadPool.executor(executor).execute(new AbstractRunnable() {
              @Override
              protected void doRun() throws Exception {
                  //noinspection unchecked
                  reg.processMessageReceived(request, channel);
              }

主要是 reg.processMessageReceived(request, channel);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
    final Task task = taskManager.register(channel.getChannelType(), action, request);
    if (task == null) {
        handler.messageReceived(request, channel);
    } else {
        boolean success = false;
        try { // ReplicaOperationTransportHandler/PrimaryOperationTransportHandler
            handler.messageReceived(request, new TransportChannelWrapper(taskManager, task, channel), task);
            success = true;
        } finally {
            if (success == false) {
                taskManager.unregister(task);
            }
        }
    }
}

这里的handler注册位于TransportReplicationAction里面

1
2
3
4
5
6
7
8
transportService.registerRequestHandler(actionName, request, ThreadPool.Names.SAME, new OperationTransportHandler());
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
    new PrimaryOperationTransportHandler());
// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(transportReplicaAction,
    () -> new ConcreteShardRequest<>(replicaRequest),
    executor, true, true,
    new ReplicaOperationTransportHandler());

PrimaryOperationTransportHandler的messageReceived方法

1
2
3
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
    new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
}

ReplicaOperationTransportHandler的messageReceived方法

1
2
3
4
public void messageReceived(ConcreteShardRequest<ReplicaRequest> requestWithAID, TransportChannel channel, Task task)
    throws Exception {
    new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run();
}

6.0.0-alpha1 中Primary与Replica

1
2
3
4
5
6
7
8
9
10
11
12
class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
    @Override
    public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
        new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
    }
}
class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<ReplicaRequest>> {
    public void messageReceived(ConcreteShardRequest<ReplicaRequest> requestWithAID, TransportChannel channel, Task task)
        throws Exception {
        new AsyncReplicaAction(requestWithAID.request, requestWithAID.targetAllocationID, channel, (ReplicationTask) task).run();
    }
}

Primary

1
AsyncPrimaryAction.onResponse-->ReplicationOperation.execute-->primary.perform-->PrimaryShardReference.perform-->TransportShardBulkAction.shardOperationOnPrimary

Replica

1
AsyncReplicaAction.onResponse-->TransportShardBulkAction.shardOperationOnReplica

ElasticSearch:剖析query_and_fetch和query_then_fetch的区别

This post is licensed under CC BY 4.0 by the author.