博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
玩转Elasticsearch源码-ActionModule启动分析
阅读量:6833 次
发布时间:2019-06-26

本文共 23701 字,大约阅读时间需要 79 分钟。

ActionModule的用途

org.elasticsearch.action.ActionModule主要维护了请求和响应相关组件,它们可能来自ES本身或者来自plugin。

  • transportClient:标识是否是客户端
  • settings: 配置信息
  • indexNameExpressionResolver:索引名称表达式处理器
  • indexScopedSettings:index级别的配置信息
  • clusterSettings:cluster级别的配置信息
  • settingsFilter:允许通过简单正则表达式模式或完整设置键筛选设置对象的类。它用于rest层上的响应过滤,例如过滤出access keys等敏感信息
  • actionPlugins:实现ActionPlugin接口的插件
  • actions:请求和响应的处理逻辑
  • actionFilters:从ActionPlugin中获取,在TransportAction里面requestFilterChain会去调用
  • autoCreateIndex:封装是否创建新索引的逻辑
  • destructiveOperations:帮助处理破坏性操作和通配符使用
  • restController:rest接口处理器

源码分析

构造方法

先看看构造方法,每一步都简单加了解释

public ActionModule(boolean transportClient, Settings settings, IndexNameExpressionResolver indexNameExpressionResolver,                        IndexScopedSettings indexScopedSettings, ClusterSettings clusterSettings, SettingsFilter settingsFilter,                        ThreadPool threadPool, List
actionPlugins, NodeClient nodeClient, CircuitBreakerService circuitBreakerService, UsageService usageService) { this.transportClient = transportClient;//服务端为false,客户端为true this.settings = settings; this.indexNameExpressionResolver = indexNameExpressionResolver;//将提供的索引表达式转换为实际的具体索引 this.indexScopedSettings = indexScopedSettings;//index级别的配置 this.clusterSettings = clusterSettings;//集群级别的配置 this.settingsFilter = settingsFilter;//允许通过简单正则表达式模式或完整设置键筛选设置对象的类。它用于rest层上的响应过滤,例如过滤出access keys等敏感信息。 this.actionPlugins = actionPlugins;//actionPlugins主要实现了 ActionPlugin 的 getActions() actions = setupActions(actionPlugins);//安装actions actionFilters = setupActionFilters(actionPlugins);//从ActionPlugin中获取,后面在TransportAction里面requestFilterChain会去调用,以后再细讲 autoCreateIndex = transportClient ? null : new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver);//封装是否创建新索引的逻辑 destructiveOperations = new DestructiveOperations(settings, clusterSettings);//帮助处理破坏性操作和通配符使用。 Set
headers = actionPlugins.stream().flatMap(p -> p.getRestHeaders().stream()).collect(Collectors.toSet()); UnaryOperator
restWrapper = null;//RestHandler包装器,下面从actionPlugins里面遍历获取 for (ActionPlugin plugin : actionPlugins) { UnaryOperator
newRestWrapper = plugin.getRestHandlerWrapper(threadPool.getThreadContext()); if (newRestWrapper != null) { logger.debug("Using REST wrapper from plugin " + plugin.getClass().getName()); if (restWrapper != null) { throw new IllegalArgumentException("Cannot have more than one plugin implementing a REST wrapper"); } restWrapper = newRestWrapper; } } if (transportClient) {//客户端不需要暴露http服务 restController = null; } else {//服务端创建一个RestController,实现了Dispatcher接口,系统接受到请求后从netty的handler一路调用到 dispatchRequest 方法 restController = new RestController(settings, headers, restWrapper, nodeClient, circuitBreakerService, usageService); } }

首先设置了settings等参数,然后进入

setupActions代码比较好理解,直接贴源码:

static Map
> setupActions(List
actionPlugins) { // Subclass NamedRegistry for easy registration class ActionRegistry extends NamedRegistry
> { ActionRegistry() { super("action"); } public void register(ActionHandler
handler) { register(handler.getAction().name(), handler); } public
void register( GenericAction
action, Class
> transportAction, Class
... supportTransportActions) { register(new ActionHandler<>(action, transportAction, supportTransportActions)); } } ActionRegistry actions = new ActionRegistry(); actions.register(MainAction.INSTANCE, TransportMainAction.class); actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class); actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class); actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class); actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class); actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class); actions.register(ListTasksAction.INSTANCE, TransportListTasksAction.class); actions.register(GetTaskAction.INSTANCE, TransportGetTaskAction.class); actions.register(CancelTasksAction.INSTANCE, TransportCancelTasksAction.class); actions.register(ClusterAllocationExplainAction.INSTANCE, TransportClusterAllocationExplainAction.class); actions.register(ClusterStatsAction.INSTANCE, TransportClusterStatsAction.class); actions.register(ClusterStateAction.INSTANCE, TransportClusterStateAction.class); actions.register(ClusterHealthAction.INSTANCE, TransportClusterHealthAction.class); actions.register(ClusterUpdateSettingsAction.INSTANCE, TransportClusterUpdateSettingsAction.class); actions.register(ClusterRerouteAction.INSTANCE, TransportClusterRerouteAction.class); actions.register(ClusterSearchShardsAction.INSTANCE, TransportClusterSearchShardsAction.class); actions.register(PendingClusterTasksAction.INSTANCE, TransportPendingClusterTasksAction.class); actions.register(PutRepositoryAction.INSTANCE, TransportPutRepositoryAction.class); actions.register(GetRepositoriesAction.INSTANCE, TransportGetRepositoriesAction.class); actions.register(DeleteRepositoryAction.INSTANCE, TransportDeleteRepositoryAction.class); actions.register(VerifyRepositoryAction.INSTANCE, TransportVerifyRepositoryAction.class); actions.register(GetSnapshotsAction.INSTANCE, TransportGetSnapshotsAction.class); actions.register(DeleteSnapshotAction.INSTANCE, TransportDeleteSnapshotAction.class); actions.register(CreateSnapshotAction.INSTANCE, TransportCreateSnapshotAction.class); actions.register(RestoreSnapshotAction.INSTANCE, TransportRestoreSnapshotAction.class); actions.register(SnapshotsStatusAction.INSTANCE, TransportSnapshotsStatusAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); actions.register(CreateIndexAction.INSTANCE, TransportCreateIndexAction.class); actions.register(ShrinkAction.INSTANCE, TransportShrinkAction.class); actions.register(ResizeAction.INSTANCE, TransportResizeAction.class); actions.register(RolloverAction.INSTANCE, TransportRolloverAction.class); actions.register(DeleteIndexAction.INSTANCE, TransportDeleteIndexAction.class); actions.register(GetIndexAction.INSTANCE, TransportGetIndexAction.class); actions.register(OpenIndexAction.INSTANCE, TransportOpenIndexAction.class); actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class); actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class); actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class); actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class); actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class, TransportGetFieldMappingsIndexAction.class); actions.register(PutMappingAction.INSTANCE, TransportPutMappingAction.class); actions.register(IndicesAliasesAction.INSTANCE, TransportIndicesAliasesAction.class); actions.register(UpdateSettingsAction.INSTANCE, TransportUpdateSettingsAction.class); actions.register(AnalyzeAction.INSTANCE, TransportAnalyzeAction.class); actions.register(PutIndexTemplateAction.INSTANCE, TransportPutIndexTemplateAction.class); actions.register(GetIndexTemplatesAction.INSTANCE, TransportGetIndexTemplatesAction.class); actions.register(DeleteIndexTemplateAction.INSTANCE, TransportDeleteIndexTemplateAction.class); actions.register(ValidateQueryAction.INSTANCE, TransportValidateQueryAction.class); actions.register(RefreshAction.INSTANCE, TransportRefreshAction.class); actions.register(FlushAction.INSTANCE, TransportFlushAction.class); actions.register(SyncedFlushAction.INSTANCE, TransportSyncedFlushAction.class); actions.register(ForceMergeAction.INSTANCE, TransportForceMergeAction.class); actions.register(UpgradeAction.INSTANCE, TransportUpgradeAction.class); actions.register(UpgradeStatusAction.INSTANCE, TransportUpgradeStatusAction.class); actions.register(UpgradeSettingsAction.INSTANCE, TransportUpgradeSettingsAction.class); actions.register(ClearIndicesCacheAction.INSTANCE, TransportClearIndicesCacheAction.class); actions.register(GetAliasesAction.INSTANCE, TransportGetAliasesAction.class); actions.register(AliasesExistAction.INSTANCE, TransportAliasesExistAction.class); actions.register(GetSettingsAction.INSTANCE, TransportGetSettingsAction.class); actions.register(IndexAction.INSTANCE, TransportIndexAction.class); actions.register(GetAction.INSTANCE, TransportGetAction.class); actions.register(TermVectorsAction.INSTANCE, TransportTermVectorsAction.class); actions.register(MultiTermVectorsAction.INSTANCE, TransportMultiTermVectorsAction.class, TransportShardMultiTermsVectorAction.class); actions.register(DeleteAction.INSTANCE, TransportDeleteAction.class); actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class); actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class, TransportShardMultiGetAction.class); actions.register(BulkAction.INSTANCE, TransportBulkAction.class, TransportShardBulkAction.class); actions.register(SearchAction.INSTANCE, TransportSearchAction.class); actions.register(SearchScrollAction.INSTANCE, TransportSearchScrollAction.class); actions.register(MultiSearchAction.INSTANCE, TransportMultiSearchAction.class); actions.register(ExplainAction.INSTANCE, TransportExplainAction.class); actions.register(ClearScrollAction.INSTANCE, TransportClearScrollAction.class); actions.register(RecoveryAction.INSTANCE, TransportRecoveryAction.class); //Indexed scripts actions.register(PutStoredScriptAction.INSTANCE, TransportPutStoredScriptAction.class); actions.register(GetStoredScriptAction.INSTANCE, TransportGetStoredScriptAction.class); actions.register(DeleteStoredScriptAction.INSTANCE, TransportDeleteStoredScriptAction.class); actions.register(FieldCapabilitiesAction.INSTANCE, TransportFieldCapabilitiesAction.class, TransportFieldCapabilitiesIndexAction.class); actions.register(PutPipelineAction.INSTANCE, PutPipelineTransportAction.class); actions.register(GetPipelineAction.INSTANCE, GetPipelineTransportAction.class); actions.register(DeletePipelineAction.INSTANCE, DeletePipelineTransportAction.class); actions.register(SimulatePipelineAction.INSTANCE, SimulatePipelineTransportAction.class); actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register); return unmodifiableMap(actions.getRegistry()); }

注册了一大堆Action然后返回 Map<String, ActionHandler<?, ?>> 对象

image.png
可以注意到ActionHandler包装了action和transportAction的Class对象。action是对各个类型请求的request和response的包装,并非是真正的功能实现者,可以看到它只是提供了两个新建response和request的方法,及一个字NAME字段,这个NAME字段会用于后面action调用中。每个action对应的功能实现是在对应的transportAction中。TransportAction一般都会有 @Inject 注解的构造方法用于依赖注入,并且实现doExecute执行具体逻辑。

configure方法

configure方法由ES封装的注入器Injector调用,看看调用栈

image.png
可以知道由ES启动时候Node构造方法里面发起的

injector = modules.createInjector();

configure里面逻辑其实就是绑定一些对象到Injector中:

protected void configure() {        bind(ActionFilters.class).toInstance(actionFilters);        bind(DestructiveOperations.class).toInstance(destructiveOperations);        if (false == transportClient) {            // Supporting classes only used when not a transport client            bind(AutoCreateIndex.class).toInstance(autoCreateIndex);            bind(TransportLivenessAction.class).asEagerSingleton();            // register GenericAction -> transportAction Map used by NodeClient            @SuppressWarnings("rawtypes")            MapBinder
transportActionsBinder = MapBinder.newMapBinder(binder(), GenericAction.class, TransportAction.class); for (ActionHandler
action : actions.values()) { // bind the action as eager singleton, so the map binder one will reuse it bind(action.getTransportAction()).asEagerSingleton(); transportActionsBinder.addBinding(action.getAction()).to(action.getTransportAction()).asEagerSingleton(); for (Class
supportAction : action.getSupportTransportActions()) { bind(supportAction).asEagerSingleton(); } } } }

initRestHandlers

initRestHandlers是在Node构造方法里面直接调用的,主要是注册一大堆RestHandler到restController用于处理http 请求

public void initRestHandlers(Supplier
nodesInCluster) { List
catActions = new ArrayList<>(); Consumer
registerHandler = a -> { if (a instanceof AbstractCatAction) { catActions.add((AbstractCatAction) a); } }; registerHandler.accept(new RestMainAction(settings, restController)); registerHandler.accept(new RestNodesInfoAction(settings, restController, settingsFilter)); registerHandler.accept(new RestRemoteClusterInfoAction(settings, restController)); registerHandler.accept(new RestNodesStatsAction(settings, restController)); registerHandler.accept(new RestNodesUsageAction(settings, restController)); registerHandler.accept(new RestNodesHotThreadsAction(settings, restController)); registerHandler.accept(new RestClusterAllocationExplainAction(settings, restController)); registerHandler.accept(new RestClusterStatsAction(settings, restController)); registerHandler.accept(new RestClusterStateAction(settings, restController, settingsFilter)); registerHandler.accept(new RestClusterHealthAction(settings, restController)); registerHandler.accept(new RestClusterUpdateSettingsAction(settings, restController)); registerHandler.accept(new RestClusterGetSettingsAction(settings, restController, clusterSettings, settingsFilter)); registerHandler.accept(new RestClusterRerouteAction(settings, restController, settingsFilter)); registerHandler.accept(new RestClusterSearchShardsAction(settings, restController)); registerHandler.accept(new RestPendingClusterTasksAction(settings, restController)); registerHandler.accept(new RestPutRepositoryAction(settings, restController)); registerHandler.accept(new RestGetRepositoriesAction(settings, restController, settingsFilter)); registerHandler.accept(new RestDeleteRepositoryAction(settings, restController)); registerHandler.accept(new RestVerifyRepositoryAction(settings, restController)); registerHandler.accept(new RestGetSnapshotsAction(settings, restController)); registerHandler.accept(new RestCreateSnapshotAction(settings, restController)); registerHandler.accept(new RestRestoreSnapshotAction(settings, restController)); registerHandler.accept(new RestDeleteSnapshotAction(settings, restController)); registerHandler.accept(new RestSnapshotsStatusAction(settings, restController)); registerHandler.accept(new RestGetAllAliasesAction(settings, restController)); registerHandler.accept(new RestGetAllMappingsAction(settings, restController)); registerHandler.accept(new RestGetAllSettingsAction(settings, restController, indexScopedSettings, settingsFilter)); registerHandler.accept(new RestGetIndicesAction(settings, restController, indexScopedSettings, settingsFilter)); registerHandler.accept(new RestIndicesStatsAction(settings, restController)); registerHandler.accept(new RestIndicesSegmentsAction(settings, restController)); registerHandler.accept(new RestIndicesShardStoresAction(settings, restController)); registerHandler.accept(new RestGetAliasesAction(settings, restController)); registerHandler.accept(new RestIndexDeleteAliasesAction(settings, restController)); registerHandler.accept(new RestIndexPutAliasAction(settings, restController)); registerHandler.accept(new RestIndicesAliasesAction(settings, restController)); registerHandler.accept(new RestCreateIndexAction(settings, restController)); registerHandler.accept(new RestShrinkIndexAction(settings, restController)); registerHandler.accept(new RestSplitIndexAction(settings, restController)); registerHandler.accept(new RestRolloverIndexAction(settings, restController)); registerHandler.accept(new RestDeleteIndexAction(settings, restController)); registerHandler.accept(new RestCloseIndexAction(settings, restController)); registerHandler.accept(new RestOpenIndexAction(settings, restController)); registerHandler.accept(new RestUpdateSettingsAction(settings, restController)); registerHandler.accept(new RestGetSettingsAction(settings, restController, indexScopedSettings, settingsFilter)); registerHandler.accept(new RestAnalyzeAction(settings, restController)); registerHandler.accept(new RestGetIndexTemplateAction(settings, restController)); registerHandler.accept(new RestPutIndexTemplateAction(settings, restController)); registerHandler.accept(new RestDeleteIndexTemplateAction(settings, restController)); registerHandler.accept(new RestPutMappingAction(settings, restController)); registerHandler.accept(new RestGetMappingAction(settings, restController)); registerHandler.accept(new RestGetFieldMappingAction(settings, restController)); registerHandler.accept(new RestRefreshAction(settings, restController)); registerHandler.accept(new RestFlushAction(settings, restController)); registerHandler.accept(new RestSyncedFlushAction(settings, restController)); registerHandler.accept(new RestForceMergeAction(settings, restController)); registerHandler.accept(new RestUpgradeAction(settings, restController)); registerHandler.accept(new RestClearIndicesCacheAction(settings, restController)); registerHandler.accept(new RestIndexAction(settings, restController)); registerHandler.accept(new RestGetAction(settings, restController)); registerHandler.accept(new RestGetSourceAction(settings, restController)); registerHandler.accept(new RestMultiGetAction(settings, restController)); registerHandler.accept(new RestDeleteAction(settings, restController)); registerHandler.accept(new org.elasticsearch.rest.action.document.RestCountAction(settings, restController)); registerHandler.accept(new RestTermVectorsAction(settings, restController)); registerHandler.accept(new RestMultiTermVectorsAction(settings, restController)); registerHandler.accept(new RestBulkAction(settings, restController)); registerHandler.accept(new RestUpdateAction(settings, restController)); registerHandler.accept(new RestSearchAction(settings, restController)); registerHandler.accept(new RestSearchScrollAction(settings, restController)); registerHandler.accept(new RestClearScrollAction(settings, restController)); registerHandler.accept(new RestMultiSearchAction(settings, restController)); registerHandler.accept(new RestValidateQueryAction(settings, restController)); registerHandler.accept(new RestExplainAction(settings, restController)); registerHandler.accept(new RestRecoveryAction(settings, restController)); // Scripts API registerHandler.accept(new RestGetStoredScriptAction(settings, restController)); registerHandler.accept(new RestPutStoredScriptAction(settings, restController)); registerHandler.accept(new RestDeleteStoredScriptAction(settings, restController)); registerHandler.accept(new RestFieldCapabilitiesAction(settings, restController)); // Tasks API registerHandler.accept(new RestListTasksAction(settings, restController, nodesInCluster)); registerHandler.accept(new RestGetTaskAction(settings, restController)); registerHandler.accept(new RestCancelTasksAction(settings, restController, nodesInCluster)); // Ingest API registerHandler.accept(new RestPutPipelineAction(settings, restController)); registerHandler.accept(new RestGetPipelineAction(settings, restController)); registerHandler.accept(new RestDeletePipelineAction(settings, restController)); registerHandler.accept(new RestSimulatePipelineAction(settings, restController)); // CAT API registerHandler.accept(new RestAllocationAction(settings, restController)); registerHandler.accept(new RestShardsAction(settings, restController)); registerHandler.accept(new RestMasterAction(settings, restController)); registerHandler.accept(new RestNodesAction(settings, restController)); registerHandler.accept(new RestTasksAction(settings, restController, nodesInCluster)); registerHandler.accept(new RestIndicesAction(settings, restController, indexNameExpressionResolver)); registerHandler.accept(new RestSegmentsAction(settings, restController)); // Fully qualified to prevent interference with rest.action.count.RestCountAction registerHandler.accept(new org.elasticsearch.rest.action.cat.RestCountAction(settings, restController)); // Fully qualified to prevent interference with rest.action.indices.RestRecoveryAction registerHandler.accept(new org.elasticsearch.rest.action.cat.RestRecoveryAction(settings, restController)); registerHandler.accept(new RestHealthAction(settings, restController)); registerHandler.accept(new org.elasticsearch.rest.action.cat.RestPendingClusterTasksAction(settings, restController)); registerHandler.accept(new RestAliasAction(settings, restController)); registerHandler.accept(new RestThreadPoolAction(settings, restController)); registerHandler.accept(new RestPluginsAction(settings, restController)); registerHandler.accept(new RestFielddataAction(settings, restController)); registerHandler.accept(new RestNodeAttrsAction(settings, restController)); registerHandler.accept(new RestRepositoriesAction(settings, restController)); registerHandler.accept(new RestSnapshotAction(settings, restController)); registerHandler.accept(new RestTemplatesAction(settings, restController)); for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers(settings, restController, clusterSettings, indexScopedSettings, settingsFilter, indexNameExpressionResolver, nodesInCluster)) { registerHandler.accept(handler); } } registerHandler.accept(new RestCatAction(settings, restController, catActions)); }

再深入去看RestController里面的注册逻辑可以看到是用Trie树来做路径匹配的,具体可以看TrieNode类。

转载地址:http://lpnkl.baihongyu.com/

你可能感兴趣的文章
wordpress如何使用vim和markdown写blog
查看>>
Javascript设计模式之--单例模式
查看>>
Android应用打包时遇到的问题
查看>>
Maven构建项目忽视掉服务器已存在的jar文件
查看>>
Spring WebSocket实现消息推送
查看>>
Java日期格式化方法
查看>>
mysql性能监控指标
查看>>
A Complete Guide to FreeNAS Hardware Design, Part III: Pools, Performance, and Cache
查看>>
Luban(鲁班)——Android图片压缩工具,仿微信朋友圈压缩策略
查看>>
深入理解JAVA中的NIO 原
查看>>
android6.0源码分析-Looper,Handler,Message(二)
查看>>
cordova开发wp8应用的经验总结
查看>>
删除单链表中的重复节点(删除多余项)
查看>>
error: org.springframework.web.util.WebAppRootLi
查看>>
Mysql3306 端口被占用的解决方法
查看>>
MODIS批量处理软件MRT的安装说明
查看>>
右键文本文档消失——解决办法
查看>>
spring源码剖析之Spring Security安全框架
查看>>
开启关闭mysql函数功能
查看>>
运行Perl程序
查看>>