The Flink version analyzed here is 1.10.0.

The jobmanager.sh and taskmanager.sh scripts start Flink in standalone mode. Under the hood, both of them call flink-daemon.sh:

    Usage: flink-daemon.sh (start|stop|stop-all) (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob) [args]

That script holds the entry class for each daemon, so we can use it to trace how standalone Flink starts up:

    case $DAEMON in
        (taskexecutor)
            CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
        ;;

        (zookeeper)
            CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
        ;;

        (historyserver)
            CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
        ;;

        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
        ;;

        (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
        ;;

From this table we can read off the entry classes that start the jobmanager and the taskmanager separately:

    org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
    org.apache.flink.runtime.taskexecutor.TaskManagerRunner

and the entry class that starts a local cluster:

    org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint

The other scripts all reuse this logic. For example, start-cluster.sh runs jobmanager.sh in both HA and non-HA mode, so it always ends up in flink-daemon.sh. With that settled, we can move on to the code.
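To make the dispatch concrete, here is a minimal Java sketch that mirrors the `case $DAEMON in` table above as a lookup. The class `DaemonDispatch` and its method `classToRun` are hypothetical names used only for illustration. The real flink-daemon.sh does this in shell and then launches a JVM with the selected class.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration: mirrors the daemon-to-entry-class table of flink-daemon.sh (Flink 1.10.0).
final class DaemonDispatch {
    private static final Map<String, String> CLASS_TO_RUN = Map.of(
        "taskexecutor", "org.apache.flink.runtime.taskexecutor.TaskManagerRunner",
        "zookeeper", "org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer",
        "historyserver", "org.apache.flink.runtime.webmonitor.history.HistoryServer",
        "standalonesession", "org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint",
        "standalonejob", "org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint");

    private DaemonDispatch() {}

    // Returns the entry class for a daemon name, or empty if the name is unknown (or null).
    static Optional<String> classToRun(String daemon) {
        if (daemon == null) {
            return Optional.empty(); // Map.of rejects null keys, so guard explicitly
        }
        return Optional.ofNullable(CLASS_TO_RUN.get(daemon));
    }

    public static void main(String[] args) {
        String daemon = args.length > 0 ? args[0] : "standalonesession";
        System.out.println(classToRun(daemon).orElse("Unknown daemon '" + daemon + "'"));
    }
}
```

Running it with `standalonesession` prints the session entrypoint class. That is the class the rest of this article follows.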
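StandaloneSessionClusterEntrypoint startup source code

StandaloneSessionClusterEntrypoint extends SessionClusterEntrypoint. Its main method first performs startup checks. It begins by logging the runtime environment, such as the Flink version, Scala version, Git commit id, JVM version, Hadoop version and JAVA_HOME:

    EnvironmentInformation.logEnvironmentInfo(LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);

It then registers handlers for OS signals. On Windows these are TERM (termination) and INT (keyboard interrupt). Other systems usually add HUP (terminal hang-up or termination of the controlling process):

    SignalHandler.register(LOG);

Next it installs a JVM shutdown hook. If shutdown has not finished within 5 seconds, the hook forcibly terminates the process:

    JvmShutdownSafeguard.installAsShutdownHook(LOG);

After that it parses the startup arguments using org.apache.commons.cli. All startup options are defined in the org.apache.flink.runtime.entrypoint.parser.CommandLineOptions class:

    -c, --configDir: Directory which contains the configuration file flink-conf.yml.
    -r, --webui-port: Port for the rest endpoint and the web UI.
    -Dproperty=value: use value for given property
    -h, --host: Hostname for the RPC service.
    -x, --executionMode: Deprecated option

The parsing code:

    EntrypointClusterConfiguration entrypointClusterConfiguration = null;
    final CommandLineParser<EntrypointClusterConfiguration> commandLineParser =
        new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory());

    try {
        entrypointClusterConfiguration = commandLineParser.parse(args);
    } catch (FlinkParseException e) {
        LOG.error("Could not parse command line arguments {}.", args, e);
        commandLineParser.printHelp(StandaloneSessionClusterEntrypoint.class.getSimpleName());
        System.exit(1);
    }

With the arguments parsed, it loads the configuration and creates the entrypoint. It then runs the entrypoint through the parent-class method:

    Configuration configuration = loadConfiguration(entrypointClusterConfiguration);

    StandaloneSessionClusterEntrypoint entrypoint = new StandaloneSessionClusterEntrypoint(configuration);

    ClusterEntrypoint.runClusterEntrypoint(entrypoint);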
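The option list above can be illustrated with a small hand-rolled parser. This is a hypothetical sketch. It uses neither commons-cli nor Flink's `CommandLineParser`. It only shows how `-c`, `-r`, `-h`, `-x` and `-Dkey=value` could end up as configuration values. The class `EntrypointArgs`, its field names, and the choice to treat `-c` as required are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of entrypoint argument parsing; Flink itself delegates to commons-cli.
final class EntrypointArgs {
    String configDir;                                               // -c / --configDir (required here)
    int restPort = -1;                                              // -r / --webui-port, -1 means "not set"
    String hostname;                                                // -h / --host
    final Map<String, String> dynamicProperties = new HashMap<>(); // -Dkey=value

    static EntrypointArgs parse(String[] args) {
        EntrypointArgs result = new EntrypointArgs();
        for (int i = 0; i < args.length; i++) {
            String arg = args[i];
            // Dynamic properties are attached to the flag: -Dkey=value
            if (arg.startsWith("-D") && arg.length() > 2) {
                String kv = arg.substring(2);
                int eq = kv.indexOf('=');
                if (eq <= 0) {
                    throw new IllegalArgumentException("Expected -Dkey=value but got " + arg);
                }
                result.dynamicProperties.put(kv.substring(0, eq), kv.substring(eq + 1));
                continue;
            }
            switch (arg) {
                case "-c": case "--configDir":
                    result.configDir = requireValue(args, ++i, arg);
                    break;
                case "-r": case "--webui-port":
                    result.restPort = Integer.parseInt(requireValue(args, ++i, arg));
                    break;
                case "-h": case "--host":
                    result.hostname = requireValue(args, ++i, arg);
                    break;
                case "-x": case "--executionMode":
                    requireValue(args, ++i, arg); // deprecated: value is consumed and ignored
                    break;
                default:
                    throw new IllegalArgumentException("Unknown argument: " + arg);
            }
        }
        if (result.configDir == null) {
            throw new IllegalArgumentException("Missing required option -c/--configDir");
        }
        return result;
    }

    // Returns the value following an option, failing if the option is the last argument.
    private static String requireValue(String[] args, int index, String option) {
        if (index >= args.length) {
            throw new IllegalArgumentException("Missing value for " + option);
        }
        return args[index];
    }
}
```

In this sketch a bad argument surfaces as an `IllegalArgumentException`. The entrypoint code above catches `FlinkParseException` at that point, prints the help text, and exits with code 1.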
SessionClusterEntrypoint extends ClusterEntrypoint. The static runClusterEntrypoint method starts the cluster. When the termination future completes, it exits the process:

    public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {

        final String clusterEntrypointName = clusterEntrypoint.getClass().getSimpleName();
        try {
            // Start the cluster
            clusterEntrypoint.startCluster();
        } catch (ClusterEntrypointException e) {
            LOG.error(String.format("Could not start cluster entrypoint %s.", clusterEntrypointName), e);
            System.exit(STARTUP_FAILURE_RETURN_CODE);
        }

        clusterEntrypoint.getTerminationFuture().whenComplete((applicationStatus, throwable) -> {
            final int returnCode;

            if (throwable != null) {
                returnCode = RUNTIME_FAILURE_RETURN_CODE;
            } else {
                returnCode = applicationStatus.processExitCode();
            }

            LOG.info("Terminating cluster entrypoint process {} with exit code {}.", clusterEntrypointName, returnCode, throwable);
            System.exit(returnCode);
        });
    }

Next, the startCluster method of ClusterEntrypoint:

    public void startCluster() throws ClusterEntrypointException {
        try {
            replaceGracefulExitWithHaltIfConfigured(configuration);
            // Initialize the shared file system settings, which map file systems by their URI
            configureFileSystems(configuration);
            // Install the security context: applies the process-wide security configuration
            // using the available security modules (i.e. Hadoop, JAAS)
            SecurityContext securityContext = installSecurityContext(configuration);

            securityContext.runSecured((Callable<Void>) () -> {
                runCluster(configuration);

                return null;
            });
        } catch (Throwable t) {
            ...
        }
    }

Then the runCluster method:

    private void runCluster(Configuration configuration) throws Exception {
        synchronized (lock) {
            // Initialize the cluster services
            initializeServices(configuration);

            // write host information into configuration
            configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
            configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

            final DispatcherResourceManagerComponentFactory dispatcherResourceManagerComponentFactory =
                createDispatcherResourceManagerComponentFactory(configuration);

            clusterComponent = dispatcherResourceManagerComponentFactory.create(
                configuration,
                ioExecutor,
                commonRpcService,
                haServices,
                blobServer,
                heartbeatServices,
                metricRegistry,
                archivedExecutionGraphStore,
                new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
                this);
            ...
        }
    }
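The exit-code logic of `runClusterEntrypoint` can be reproduced in isolation with a `CompletableFuture`. This is a hypothetical sketch. `AppStatus` stands in for Flink's `ApplicationStatus`. Its exit codes and the values 1 and 2 for the two failure constants are illustrative assumptions, not values taken from Flink. Instead of calling `System.exit`, the sketch returns the computed code so it can be inspected.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for Flink's ApplicationStatus; exit codes are illustrative only.
enum AppStatus {
    SUCCEEDED(0), FAILED(1), CANCELED(0);

    private final int exitCode;

    AppStatus(int exitCode) { this.exitCode = exitCode; }

    int processExitCode() { return exitCode; }
}

final class EntrypointRunner {
    static final int STARTUP_FAILURE_RETURN_CODE = 1; // assumed value
    static final int RUNTIME_FAILURE_RETURN_CODE = 2; // assumed value

    private EntrypointRunner() {}

    // Same branching as runClusterEntrypoint: a startup exception maps to the startup-failure
    // code; otherwise the termination future decides: exceptional completion maps to the
    // runtime-failure code, normal completion uses the application status.
    static CompletableFuture<Integer> run(Runnable startCluster,
                                          CompletableFuture<AppStatus> terminationFuture) {
        try {
            startCluster.run();
        } catch (RuntimeException e) {
            return CompletableFuture.completedFuture(STARTUP_FAILURE_RETURN_CODE);
        }
        return terminationFuture.handle((status, throwable) ->
            throwable != null ? RUNTIME_FAILURE_RETURN_CODE : status.processExitCode());
    }
}
```

The Flink code passes the code to `System.exit` inside `whenComplete`. Using `handle` to return it is only a choice that keeps the sketch testable.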
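From these services, runCluster creates the DispatcherResourceManagerComponent, which handles resource management, scheduling and monitoring. It also wires up its shutdown under the same lock. This step also creates the gateways and the metric query service retriever. At this point we have a basic picture of how Flink reads its startup arguments, loads the local configuration and installs a JVM shutdown hook. These are methods we seldom use in everyday development. This is only the first step into Flink's startup flow. Next, we will look at how Flink implements these services, manages tasks, and keeps the service stable.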