范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

记录一次发送千万级别数量消息的定时任务优化

  业务场景
  我们每天都要对最近三个月内的活跃用户进行批量营销、账单逾期计算等操作,用户数据大概是  800w   。我们的方案是发送一个 CUSTOMER_DAILY   消息,然后订阅这个消息再去分别发送批量营销、账单逾期等业务消息。目前发送完 CUSTOMER_DAILY   消息大约需要五个小时。勿纠结当下
  大家不必纠结当下为什么效率这么低......因为系统都是慢慢优化出来的嘛,以前的代码肯定多少有些问题。或许再过一段时间我们自己的代码也有很多问题,这都很正常。下面简单地贴一些我适当改造过的当前实现逻辑的代码来分析当前方案存在的问题。 目前的方案
  目前是采用定时任务触发,线程池提交任务的方式,代码如下(行尾注释是我写的 example 值): // 最大活跃idlong maxId = customerService.findMaxActiveId(beginTime); //20001// 最小活跃idlong minId = customerService.findMinActiveId(beginTime); //1// 查询的id最大可能总条数long listSize = maxId - minId; //20000// 开启的线程数int runSize = 4;// 平均每次查询数目long count = listSize / runSize; // 5000// 创建一个线程池,核心线程数量和开启线程的数量一样ExecutorService executor = CreateThreadUtil.createThread(runSize);for (int i = 0; i <= runSize; i++) {  // 计算sql语句中每个分页查询的起始和结束数据下标  long min = minId + i * count; //1   ,  5001 ,  10001 , 15001  long max = min + count;       //5001,  10001,  15001 , 20001  executor.execute(() -> {    List customers = customerService.findByXxx(beginTime, min, max);    customers.forEach(c -> {       Message message = Message.build();//省略构造消息体       messageService.save(message);      applicationEventPublisher.publishEvent(new MessageSendEvent(message));    });  });}
  大家不用在意什么事务细节,因为这是我为了减少代码简化的,看大致实现逻辑即可。后面就是监听这个事件然后修改数据库消息发送的状态,发送消息 @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)@Asyncpublic void listen(MessageSendEvent event) {  //修改数据库消息发送状态为:发送中  //执行发送消息 SDK 的 API  //修改数据库消息发送状态为:成功}存在的问题线程池资源可能未充分利用
  仔细观察第一段代码的逻辑,通过查询最大最小的活跃用户id来计算活跃用户总数,这并不是准确数值。它可能存在这样一种情况 maxId = 20001,minId = 12~5000 都是非活跃用户,5001~20000 是活跃用户
  这样一来 for 循环中负责  1-5001   的那个线程其实只有一个用户任务需要处理,也就是说总共 4   个线程,1   个线程执行任务是 0.5   秒 ,其余三个线程可能要十几分钟。这样第一个线程的资源就被浪费着了。看过我前面文章 学习 CompletableFuture 进阶之前先掌握两种线程池 的都知道  ForkJoinPool   有任务窃取机制,可以解决这个问题。 循环访问数据库
  由于我们发送消息需要入库记录,发送过程中又要修改两次状态(第一次是发送中、第二次是发送成功),也就是说一条消息会有三次数据库的 IO 操作,这样在大量循环下是一个最大的性能瓶颈。我们可以估算一个阈值,批量地对该阈值的一组消息用一个  batchInsert   只访问一次数据库。循环调用发送消息 SDK 的 API
  其实这个和循环访问数据库是一个道理,正常的消息队列都有批量发送消息的功能,而不是只能一条消息调用一个 SDK 的 API 。 只有一台机器执行该任务
  目前使用的定时任务是  xxl-job   ,路由策略是第一个,也就是说在 N 个服务实例上,只有一台实例会执行。这就相当于有 N-1 台实例在这个工作上是处于闲置状态的。我们可以让 N 台机器一起来做这个事情,xxl-job   的分片广播可以满足。
  总结下来可以优化的点 xxl-job   分片广播ForkJoinPool  批量访问数据库 批量发送消息 动手优化xxl-job 分片广播
  分片广播的含义是触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;简单来说就是每一台机器都会触发任务,同时每台机器会接到不同的参数,我们可以根据这不同的参数去分配不同的应用实例处理不同的数据。
  具体操作很简单,在  xxl-job   管理页面编辑任务路由策略为分片广播即可。 int shardIndex = XxlJobHelper.getShardIndex();// 当前分片 0/1/2/3int shardTotal = XxlJobHelper.getShardTotal();// 总分片 4List customerIds = customerService.findIdsByShard(request);//根据分片查询该机器要处理的数据
  findIdsByShard   的实现其实是一个非常简单的 SQL SELECT ID FROM t_customer WHERE MOD ( ID, ${request.shardTotal} ) = ${request.shardIndex}
  我们将  ID   对总分片数 4   进行求余,一个数对 4   求余只有四个结果,0,1,2,3   。这样一条 SQL 在所有机器上执行的数据结果就能瓜分要执行的总数据。ForkJoinPool
  假设上面每个应用实例拿到的  customerIds   数量是 200w  ,那么我们现在使用 ForkJoinPool   对这 200w   数据进行分治。首先定义任务类public class CustomerDailyTask extends RecursiveTask {  private final List customerIds; //客户 id 集合  private final CustomerService customerService;  public static final int THRESHOLD = 1000; //拆分阈值(这里需要自己多次实验效率最高的阈值,目前我试了十几个值,1000 是最合适的)  //省略构造方法  @Override  protected Integer compute() {    if (customerIds.size() <= THRESHOLD) {      return customerService.sendDailyMessage(customerIds);     }    int groupSize = (int) Math.ceil(customerIds.size() * 1.0 / 2); //对半拆分    List> partition = Lists.partition(customerIds, groupSize);    CustomerDailyTask task1 = new CustomerDailyTask(partition.get(0), customerService);    CustomerDailyTask task2 = new CustomerDailyTask(partition.get(1), customerService);    invokeAll(task1, task2);    return task1.join() + task2.join();  }}
  初始化  ForkJoinPool   执行 CustomerDailyTask task = new CustomerDailyTask(customerIds, customerService);int core = Runtime.getRuntime().availableProcessors();ForkJoinPool pool = new ForkJoinPool(core - 1); //留一个线程pool.invoke(task);批量访问数据库
  这里其实就简单了,下面这行代码中 return customerService.sendDailyMessage(customerIds);
  根据传入的  customerIds   构造一个  List   使用  batchInsert   方法插入数据库,然后发送一个  Spring   本地事件 Spring 事件发布 ,之后在事件监听器中  batchUpdate   去更新状态,这里省略。 批量发送消息
  这个很简单直接调用批量发送的 SDK 即可,由于我们用的  AWS SNS-SQS  ,SDK 版本比较低,我还升级了个SDK版本......这都是小问题,蛋疼的是 SDK 最多支持一次发送 10 条消息,我直接无语......
  我表示很疑惑,没办法那就拆吧 // SNS 限制一次最多发 10 条消息List> split = Lists.partition(list, 10);List> future = split.stream().map(item -> senderFactory.batchSend(item)).collect(Collectors.toList());List response = CompletableFutureUtil.allOfCompleted(future);//...更新数据库消息状态为 SUCCESS
  这里我们使用  CompletableFuture   批量异步发送消息,其实它内部用的线程池默认也是  ForkJoinPool  , allOfCompleted()   实现如下 public static  List allOfCompleted(List> list) {  CompletableFuture future = CompletableFuture.allOf(list.toArray(new CompletableFuture[list.size()]));  List result = list.stream().map(CompletableFuture::join).collect(Collectors.toList());  future.thenApply(v -> list); //阻塞主线程,执行完所有异步任务  return result;}
  外面是  ForkJoinPool   拆分数据,每一个拆分的子单元里面又是一个 ForkJoinPool   来发消息......我觉得机器配置如果很高的话这个设计方案就只有两个字 NB!作者请教
  大家看到了,目前我公司使用消息队列的时候是要持久化消息发送到数据库的,先在当前事务中插入一条消息发送记录,状态记为  CREATED  ,使用 Spring   事件机制实现当前业务方法事务提交后执行消息发送的 API ,状态记为 PENDING  ,发送成功后状态记为 SUCCESS   ,反之 FAILED  。
  这样做的好处是能  100%   保证消息丢失有迹可寻,每一条消息的发送都有记录。并且方便后续消息重试,或者重发,因为我这里记录了消息体。
  但是我觉得这样 emmm 访问数据库的 IO 操作感觉有点浪费,在面对高并发业务时比如秒杀系统,感觉这样的实现是不可用的,因为数据库面临巨大性能瓶颈。问了很多朋友他们公司是怎么做的,一半是入库记录,一半是不入库记录......在此请教广大网友,贵司是咋做的,请留言指教,谢谢! 结语
  本篇文章分析了一个大批量任务优化的方案,从  集群实例分担工作、合理使用线程资源、任务的分治、减少数据库IO、减少API的调用   一步步优化出一个目前较为合适的处理方案。

iPhone14最新渲染图亮相,iPhone13泪流满面,开启加速降价之前无数的报道都指出今年的iPhone14系列砍掉了5。4寸mini小屏机。然而,Evleaks拍摄的截图显示,在来自亚太地区苹果一顶级合作经销商的备货清单中,正在准备的新品有7款iphone的好处你知道几个?1。世界最薄的智能手机没什么稀奇,跟以往曝光手机完全,样,方形圆角设计外观,虽然看起来没有新意,但绝对有惊喜。全金属不锈钢金属机身,更加解释耐用的蓝宝石玻璃屏幕且支持防水设计内置天谁是现役交易价值最高小前?美媒公布前20名单詹姆斯11,小卡6无论是中锋统治的时代,还是当下的小球时代,在攻防两端可以扮演多种角色的小前锋都是一支球队核心球员的首选。从詹姆斯到杜兰特,再到伦纳德与塔图姆,联盟中一直不缺少顶级小前锋。那么谁才是士兰微长期投资价值分析(报告节选)全面剖析基本面(本报告由大连估股科技有限公司版权所有。完整报告参考公司官方公众号估股)报告完整目录主营业务分析士兰微成立于1997年9月,总部位于浙江杭州,2003年3月在上交所主板上市,是一家李景亮vs夜魔大战确定!嘎子哥就是那个善于创造奇迹的人昨天UFC278的比赛刚刚结束时,一条极为炸裂的消息在拳迷的朋友圈里传播开来李景亮要和夜魔打了!!!此消息一出,乌斯曼被爱德华兹KO的热度迅速被国内拳迷翻过,争相讨论该消息来源的可输出拉满阵容多变云顶之弈风暴龙王敖兴玩法思路解析云顶之弈S7自版本更新后,各种龙神羁绊五花八门,而以非龙神为主的阵容玩法却如凤毛麟角,所以接下来小编就给大家介绍一个拼多多非龙神主C的阵容玩法吧!版本更新风暴龙的核心双C都获得了一刚刚,阿瓦雷兹三战击败戈洛夫金,拳坛超级大战尘埃落定在刚刚结束的一场超中量级拳王争霸战中,现四个级别的世界拳王苏尔阿瓦雷兹(5822,39KO)12回合点数击败老对手根纳迪戈洛夫金(4221,37KO),赢得了两人的三番战,同时也卫续写传奇!张君龙冲击世界重量级金腰带,欲接棒霍利菲尔德刘易斯北京时间9月16日,多伦多泛太平洋酒店举办了别开生面的新闻发布会,发布会宣布10月15日,名将云集的世界重量级拳坛再起波澜,亚洲重量级霸主龙王张君龙将在加拿大多伦多打响个人职业生涯凯特贝金赛尔穿着一件宽松的紧身连衣裙出席多伦多电影节凯特贝金赛尔周四,她在2022年多伦多国际电影节期间出席了囚犯的女儿新闻日,赢得了一片喝彩。这位49岁的女演员穿着紧身连衣裙,搭配宽松的裙子和手绘的紫罗兰花,在晚宴会的活动上看起来怒砍2分北控王牌中锋让人失望9中1神勇不再马布里1800万打水漂89110,北控队在和山西队的热身赛当中以21分的分差输给对手。本场比赛北控队的大外援哈斯登场,而小外援高登仍然继续高挂免战牌。反倒是山西队全主力出战,威姆斯费尔德全都展现出来绝佳罚球得2分却狂砍95分!女篮太可怕了没罚球靠1招打服对手北京时间9月26日,女篮世界杯小组赛第4战,中国队最终以9560击败波多黎各。在这一场比赛中,中国队虽然只在罚球线上拿到2分,不过,凭借着强大的整体优势,中国队最终还是砍下95分的
沙特工业和矿产资源大臣期待与中国合作央视网消息为期三天的第六届未来投资倡议大会于27日结束,这是中东地区规模最大的国际投资和创新论坛。大会期间,沙特工业和矿产资源大臣胡莱夫对中国企业和科技发展给予了高度评价,并表示希大S带老公聚会!穿裙暴露大肚子孕像明显,光头老公穿背带裤装嫩在裙装这一块,女性真是一年365天换着穿都不带重复的,不过随着裙装款式不同,整体展现出来的既视感也是不一样的,如果从舒适度这一块来讲,女性比较倾向于选择衬衫裙,衬衫裙虽不像别的裙装中行前三季净利超1730亿增5。85,不良率1。3110月28日晚间,中国银行(601988。SH3988。HK)发布的2022年三季报显示,今年79月,中国银行实现营业收入1545。64亿元,同比增长0。97,实现归属于银行股东的小米突然发布新机,2亿像素210W快充,仅售1199元起小米突然发布了一款新机,而且性价比很高,仅售1199元起,它就是红米Note12,这次的RedmiNote12最大的亮点就是影像,一个是Note12pro的2亿像素,另一个是Not停不下来买买买!全红婵比完赛在德国疯狂购物,比赛奖金花不完就在最近,中国跳水队已经以八枚金牌的优秀成绩,结束了德国世界杯的征程,不过后勤部门将他们的行程安排的很紧,所以他们没有能够在当地游玩的闲暇时间,只能在机场或者酒店附近购买一些纪念品美国经济止跌回升!是转折点还是昙花一现?三季度美国经济爆了个冷门。根据美国商务部最新公布的数据,三季度美国GDP按年率计算增长2。6,这是按季度算美国GDP今年首次增长,高于市场预期。今年一季度和二季度,美国GDP分别下痛风不痛就不用治?专家大错特错随着生活水平的提高,海鲜肉类等高嘌呤食物在餐桌上越来越常见,高尿酸血症与痛风的发病率也逐年上升,根据统计,我国高尿酸血症患者已达1。86亿,占总人口13。3痛风患者已有两千万左右,512GB存储IMX800三主摄,跌至2499元,还要啥自行车呢?先问大家一个问题,大家平时买手机,需要多大的存储呢?当前比较常见的是128GB和256GB,其实128GB基本不够用,使用1年左右一般都会满了,需要经常清理存储空间,256GB存储经济发展看亮点乌鲁木齐上市公司数量全疆第一现有上市公司36家,其中A股31家,能源装备制造业优势明显新疆网讯(全媒体记者王丽丽)近日,每经品牌价值研究院根据2022中国上市公司品牌价值总榜的3000家企业,排出新疆的城市榜单,新疆共有7个城市上榜。上榜的新疆城市包括乌鲁木齐市昌吉939亿!伊利连续29年业绩稳健增长雷达财经出品文苏红编深海10月27日,伊利股份披露2022年前三季度业绩,公司实现营业总收入938。61亿元,同比增长10。42实现净利润80。61亿元,同比增长1。47。公司全年天价收购推特,马斯克又完善了一块商业版图,你看懂了吗文嬉笑怒骂A博士10月27日马斯克抱着一个水槽高高兴兴地走进推特总部,完成了对推特的收购。马斯克又一次搏了一波眼球,完成了一场直播秀从自家推特挣到第一笔钱,同时也给推特做了一波免费价值投资已死,绝大多数人应该放弃买股票今年的股市让很多人记忆犹新,确切地说是无可奈何。一个令人困惑的现实就是,在中国买股票这件事情变得越来越难了。因为似乎没有一个恒定的规则,可以确保你的投资策略是正确的。没有哪些股票,19次出手,14个三分球!回不去了,对不起克莱,你难做勇士二当家NBA常规赛第一阶段竞争就非常激烈,大多数球队已经打了大概45场比赛。虽说比赛场次并不多,或许很多球队都没有从休赛期当中调整过来,但从战绩上看,仍有很多出乎球迷意料的地方。比如说作勇士估值高达70亿,登顶NBA球队估值榜北京时间10月28日,福布斯公布了NBA球队的最新估值排名,勇士以70亿美元估值排名第一,尼克斯61亿美元屈居第二,湖人59亿美元排名第三。70亿美元勇士创造了NBA联盟历史纪录,打一场歇一场!伦纳德变成联盟头号玻璃人!4个赛季缺席119场比赛说实话,季前赛刚开打那会儿,看了卡子哥的首秀,我对他还是有很大期待的。尽管由于体型变小,对抗大不如前,但速度明显轻盈了许多,篮子也没明显下滑。当时技巧君对他的评价是不如巅峰,但是能明明说好摆烂却拿第一!爵士或成新赛季最大黑马6金刚急需证真身一波三连胜,让犹他爵士在新赛季开始后坐上了西部第一的宝座,尽管在后面的比赛中他们的排名会下滑,但是无论最终的战绩如何,爵士从新赛季开始后便已经证明了自己。作为一支重建中的球队,爵士绯闻?德甲高塔霍夫曼将驰援海港据德国体育图片报报道,德甲波鸿俱乐部近日收到了中超某俱乐部引援报价,欲以400万欧元求购29岁的德国中锋菲利普霍夫曼。霍夫曼身高1。95米,是典型的高中锋对抗头球和传射能力极强。本中国队成功拿到满额奥运席位原标题射击世锦赛8个奥运会资格项目比赛结束(引题)中国队成功拿到满额奥运席位(主题)北京日报讯(记者卓然)日前,在埃及开罗举行的2022射击(步手枪)世锦赛结束了全部8个奥运会资格第十七届象棋世锦赛落幕,王天一左文静折桂,中国男团首次失金第十七届象棋世锦赛落幕,王天一左文静折桂,中国男团首次失金10月28日,第十七届世界象棋锦标赛在马来西亚砂拉越州首府古晋市完美收官,男子组决赛中王天一慢棋战胜中国香港的冯家俊,王天韩国芯片制造商SK第三季度利润暴跌,同比剧减60。3来源环球时报环球时报驻韩国特约记者张静韩联社26日报道称,韩国芯片制造商SK海力士当天发布业绩报告,受全球内存芯片市场下行的影响,第三季度营业利润为1。6556万亿韩元(约合人民币快入冬了多吃炖菜,5道家常炖菜,热气腾腾,好吃省事越吃越舒服霜降过后,秋天正式谢幕,冬天即将登场,寒意越发的浓了。驱散寒冷,最有效的方式莫过于吃上一碗热气腾腾的炖菜。不管是炖肉,还是炖菜,只要搭配好了炖到火候,荤的素的都能吃得唇齿留香,浑身这才是50岁普女穿搭典范!不靠帽子名牌包,200块大衣穿出贵妇感尽管在某些社交平台上,我们看到的一些50女人,似乎个个都有体面工作,有钱有颜,随便买件衣服就4位数,包包多到数不清,但关掉WiFi出街走走,就会发现现实中的妈妈们,真的没有那么精致