xxljob客户端架构流程
一、xxl-job的调度流程和配置
任务调度器和执行器使用http协议通信,各自有轮询线程处理不同业务。
二、XXL-JOB客户端启动流程加载Bean: 从spring容器获取所有对象,并遍历查找方法上标记XxlJob注解的方法 将xxljob配置的jobname作为key,对象Handle作为value注册Map 中 ConcurrentMap jobHandlerRepository的Map中维护; 创建执行任务的线程池; 启动内嵌的Netty服务; 启动注册线程,每隔30s上报一次注册信息。
public class EmbedServer { public void start(final String address, final int port, final String appname, final String accessToken) { executorBiz = new ExecutorBizImpl(); thread = new Thread(new Runnable() { @Override public void run() { // param EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor( 0, 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(2000)); // start server ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle .addLast(new HttpServerCodec()) .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool)); } }).childOption(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.bind(port).sync(); // start registry startRegistry(appname, address); // wait util stop future.channel().closeFuture().sync(); } }); thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave thread.start(); } }三、任务的下发与执行
任务的下发与执行(服务端发送给客户端):
收到服务器不动执行进行任务分发:
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) { switch (uri) { case "/beat": return executorBiz.beat(); case "/idleBeat": IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class); return executorBiz.idleBeat(idleBeatParam); case "/run": TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class); return executorBiz.run(triggerParam); case "/kill": KillParam killParam = GsonTool.fromJson(requestData, KillParam.class); return executorBiz.kill(killParam); case "/log": LogParam logParam = GsonTool.fromJson(requestData, LogParam.class); return executorBiz.log(logParam); default: return new ReturnT(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found."); } }1 /beat:心跳保活检测
直接return success,用户服务器探活; 2 /idleBeat:任务执行策略配置为忙碌转移时使用;
等待队列如果存在待执行任务时,返回false;
等待队列为空时:返回true; 3 /run:接收到执行任务指令
将任务提交到执行队列中,并返回true;
队列满或handler不存在时返回false; 4 /kill:中断任务
对执行任务的线程执行 JobThread.interrupt();
每个任务Id会有一个线程,Kill仅杀死执行该任务Id的线程,下次再下发任务发现线程已中断会重新创建线程。 5 /log:获取执行log
返回客户端执行的本地log给服务端。 四、客户端注册和执行结果上报
客户端注册和执行结果上报(客户端发送给服务端)
@Override public ReturnT callback(List callbackParamList) { return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class); } @Override public ReturnT registry(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class); } @Override public ReturnT registryRemove(RegistryParam registryParam) { return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class); }1 /registry:注册客户端信息
启动线程定时注册自己的服务到调度器;
创建线程,30s轮询一次,上报注册信息。 2 /registryRemove:移出执行器请求
将自己从执行器列表移除;
程序退出时会调用一次,在Netty的finally代码块自动执行。 3 /callback:异步回调结果
执行器异步回调给调度器执行任务结果;
每次任务完成时上报。 五、附录1 网络通信格式:
(1)客户端注册
http://127.0.0.1:8080/xxl-job-admin/api/registry { "registryGroup": "EXECUTOR" "registryKey": "xxl-job-executor-sample" "registryValue": "http://172.30.0.67:9999/" } Response: { "code": 200 "msg": null "content": null }
(2)客户端移除注册
http://127.0.0.1:8080/xxl-job-admin/api/registryRemove { "registryGroup": "EXECUTOR" "registryKey": "xxl-job-executor-sample" "registryValue": "http://xxljob-axzo.cn" } Response: { "code": 200 "msg": null "content": null }
(3)客户端执行任务结果上报
http://127.0.0.1:8080/xxl-job-admin/api/callback { "logId": 1238 "logDateTim": 1667197980007 "handleCode": 200 } Response: { "code": 200 "msg": null "content": null }
(4)执行器下发任务:同步回调仅代表任务是否发送成功
http://172.30.0.67:9999/run { "jobId": 4 "executorHandler": "demoJobHandler" "executorParams": "" "executorBlockStrategy": "SERIAL_EXECUTION" "executorTimeout": 0 "logId": 1238 "logDateTime": 1667197980007 "glueType": "BEAN" "glueSource": "" "glueUpdatetime": 1666683613000 "broadcastIndex": 0 "broadcastTotal": 1 } Response: { "code": 200 "msg": null "content": null }2 Token配置详解
1.配置了token后,client发送的每隔http请求头会带上XXL-JOB-ACCESS-TOKEN :{xxl.job.accessToken} ;
2.该参数不会对请求参数加密;
3.如果配置不匹配,客户端请求报错:
{ "code": 500 "msg": "The access token is wrong." "content": null }
4.发送配置token的请求,Header中新增了Token参数
5.配置错token的返回
程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习, 关注 "IT巅峰技术" 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。
作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 "RocketMQ 上海社区"联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。