文章目錄
  1. 1. 接收请求
  2. 2. 请求处理
    1. 2.1. client类型
    2. 2.2. transportAction类型
    3. 2.3. 具体执行
    4. 2.4. query阶段
    5. 2.5. fetch阶段
  3. 3. 请求结果返回

Elasticsearch源码分析-启动里简单了解Elasticsearch的启动过程,这里来看看查询过程。

接收请求

从启动篇里知道HttpRequestHandler,进入这个类查看,看到messageReceived, 进入NettyHttpRequest, 看到String uri = request.getUri(); 看到这里没有日志输出,一直纳闷为什么Elasticsearch没有请求url输出,于是加上日志

1
2
3
4
5
try {
logger.info("query uri {}", URLDecoder.decode(uri, "UTF-8"));
} catch (java.io.UnsupportedEncodingException e) {
logger.info("query uri {}", uri);
}

之后日志里就有请求的uri了。看到RestUtils.decodeQueryString(uri, pathEndPos + 1, params), 知道请求参数是在这里完成解析。

查看serverTransport.dispatchRequest,进入httpServerAdapter.dispatchRequest(request, channel),这里要知道httpServerAdapter的具体对象,查看

1
2
3
public void httpServerAdapter(HttpServerAdapter httpServerAdapter) {
this.httpServerAdapter = httpServerAdapter;
}

被哪个函数调用,跳到HttpServer.java, 打开server.internalDispatchRequest(request, channel); 之后到了restController.dispatchRequest(request, channel);

最终请求的处理由restController.dispatchRequest(request, channel);完成

请求处理

进入RestController的dispatchRequest方法, 进入executeHandler方法, 在getHandler(request)里,根据不同的请求方法,返回不同的handler,然后调用handler里的handleRequest方法处理请求,这里以GET方法为例。

对于不同的动作,都可以使用GET方法,如curl -XGET /index/type/id, curl -XGET /index/type/_search, 这里以/index/type/_search这查询为例。

在RestSearchAction.java里,有语句controller.registerHandler(GET, "/{index}/{type}/_search", this);, 所以执行curl -XGET /index/type/_search时,得到的handler就是RestSearchAction, 并执行这个类里的handleRequest方法。

进入RestSearchAction.java里的handleRequest方法,先是执行RestSearchAction.parseSearchRequest(searchRequest, request, parseFieldMatcher, null),这个方法主要对查询参数进行设置,之后调用client.search(searchRequest, new RestStatusToXContentListener(channel))进行查询。

client类型

现在要弄清楚client的具体类型, 在Node初始化里,有modules.add(new NodeClientModule())这句,打开查看,有bind(Client.class).to(NodeClient.class).asEagerSingleton(),所以这里的client具体类型是NodeClient, 而NodeClent继承自AbstractClient,

然后看查询调用过程client.search ->client.execute->client.doExecute->transportAction.execute, 最终还是由transportAction来完成实际的查询

值得注意的一点是client. execute是execute(SearchAction.INSTANCE, request, listener);

transportAction类型

在Node初始化时,有modules.add(new ActionModule(false)),进入ActionModule.java查看,有registerAction(SearchAction.INSTANCE, TransportSearchAction.class);所以transportAction是TransportSearchAction类型。

具体执行

transportAction.execute最终会调用transportAction.doExecute, 这里是进入TransportSearchAction.java的doExecute,这里会对search_type进行判断

对于search_type, 是由RestSearchAction.java里的searchRequest.searchType(searchType)语句设定,默认是SearchType.DEFAULT, 也就是SearchType.QUERY_THEN_FETCH

query阶段

由此新建了一个SearchQueryThenFetchAsyncAction实例,之后searchAsyncAction.start();开始查询。在父类AbstractSearchAsyncAction的start()函数里,

1
2
3
4
5
6
7
8
9
10
for (final ShardIterator shardIt : shardsIts) {
shardIndex++;
final ShardRouting shard = shardIt.nextOrNull();
if (shard != null) {
performFirstPhase(shardIndex, shardIt, shard);
} else {
// really, no shards active in this group
onFirstPhaseResult(shardIndex, null, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()));
}
}

对每一个shard调用performFirstPhase,

查看performFirstPhase, 最终会调用sendExecuteFirstPhase,并添加了ActionListener, 如果成功则执行onResponse里的onFirstPhaseResult, 在onFirstPhaseResult里有个判断, if (xTotalOps == expectedTotalOps),当所有shard都执行完后,执行innerMoveToSecondPhase, 最终执行moveToSecondPhase

fetch阶段

在moveToSecondPhase里, sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults)对第一阶段的结果进行合并,之后对每个shard里入选到topN的doc进行fetch,即执行executeFetch(entry.index, queryResult.shardTarget(), counter, fetchSearchRequest, node),

在executeFetch里,

1
2
3
if (counter.decrementAndGet() == 0) {
finishHim();
}

当所有需要执行的shard都结束后,执行finishHim(),标志着查询结束。

在finishHim里,

1
final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,fetchResults, request);

对fetch阶段Shard返回的结果进行合并.

1
listener.onResponse(new SearchResponse(internalResponse, scrollId, expectedSuccessfulOps,successfulOps.get(), buildTookInMillis(), buildShardFailures()))

设置返回的SearchResponse对象.

请求结果返回

在TransportAction调用execute时,有添加Actionlistener,

1
2
3
4
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}

这里的Response就是上面返回的SearchResponse, 而listener可以在RestSearchAction中找到, 是RestStatusToXContentListener(channel).

RestStatusToXContentListener继承RestResponseListener, RestResponseListener继承RestActionListener, 最终onResponse方法会调用RestStatusToXContentListener中的buildResponse, 也就调用了SearchResponse中的toXContent方法。

到此,大致了解Elasticsearch的查询过程。目前,我修改JSON返回格式,就是修改SearchResponse的toXContent方法。

打赏作者

文章目錄
  1. 1. 接收请求
  2. 2. 请求处理
    1. 2.1. client类型
    2. 2.2. transportAction类型
    3. 2.3. 具体执行
    4. 2.4. query阶段
    5. 2.5. fetch阶段
  3. 3. 请求结果返回