范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

项目系统中使用Springboot集成kafka业务实现系统处理消费实例

  框架学习整理记录下日常spring boot开发Kafka下面涉及的文件类; 1、QiDongManagerApp.java 启动类处理 , 2、application.yml 配置文件, 3、TestCommandBusProduce.java 生产者接口, 4、TestCommandBusProduceImpl.java 生产者接口实现类, 5、TestCommandBusConsumer.java 总线消费者, 6、TestCommandBusListener.java业务场景监听处理逻辑 7、maven下pom.xml集成kafka包
  pom.xml文件
  org.apache.kafka
  kafka-clients
  2.8.1
  
  org.apache.kafka
  kafka-clients
  0.8.2.0
  
  application.yml 配置文件
  app:
  kafka:
  event-bus:
  cluster-name: teste_kcz8px7x01
  system-url: http://tk.kafkasit.loud.local:1080/mestService.pub
  topic: Test_COMMAND_BUS_PRODUCE
  product-topic-tokens: COMMAND_BUS_PRODUCE:test
  product-poolsize: 5
  consume-thread-count: 1
  consume-message-group-size: 20
  consume-topic-tokens: COMMAND_BUS_MANAGER:test01
  启动类处理
  QiDongManagerApp.java
  @Bean
  @ConfigurationProperties(prefix = "app.kafka.event-bus")
  public EventBusKafkaConf eventBusKafkaConf() {
  return new EventBusKafkaConf();
  }
  @Bean
  @Profile(value = {"sit","prod"})
  public TestCommandBusProduce TestCommandBusProduce(EventBusKafkaConf eventBusKafkaConf){
  return new TestCommandBusProduceImpl(eventBusKafkaConf);
  }
  @Bean
  @Profile(value = {"sit","prod"})
  public TestCommandBusConsumer TestCommandBusConsumer(EventBusKafkaConf eventBusKafkaConf, IStringMessageConsumeListener ConsumeListener){
  return new TestCommandBusConsumer(eventBusKafkaConf,ConsumeListener);
  }
  监听时间配置
  EventBusKafkaConf.java
  @Data
  public class EventBusKafkaConf {
  /**集群名称*/
  private String clusterName;
  /**地址*/
  private String systemUrl;
  /**命令总线:主题*/
  private String topic;
  private String productTopicTokens;
  /**线程池大小*/
  private int productPoolsize;
  private int consumeMessageGroupSize;
  private int consumeThreadCount;
  private String consumeTopicTokens;
  }
  TestCommandBusProduce.java
  public interface TestCommandBusProduce extends InitializingBean, DisposableBean {
  /**
  * 发送消息
  *
  * @param message
  */
  public void sendSingleString(String id,String type,String message) ;
  /**
  * 发送消息到某个分区
  * @param id 消息唯一编号
  * @param type 消息类型
  * @param key 分区key
  * @param message 消息数据
  */
  public void sendSingleStringWithKey(String id,String type,String key,String message) ;
  /**
  * 发送消息
  *
  * @param message
  */
  public void batchSendString(String id,String type,List list) ;
  public void batchSendList(List list);
  public void sendSingleString(String message);
  }
  TestCommandBusProduceImpl.java
  @Slf4j
  public class TestCommandBusProduceImpl implements InitializingBean, DisposableBean,TestCommandBusProduce{
  private IKafkaProducer kafkaProducer;
  private EventBusKafkaConf kafkaConf;
  public TestCommandBusProduceImpl(EventBusKafkaConf kafkaConf){
  this.kafkaConf = kafkaConf;
  }
  @Override
  public void afterPropertiesSet() throws Exception {
  int poolSize = kafkaConf.getProductPoolsize();
  String systemUrl = kafkaConf.getSystemUrl();
  String clusterName = kafkaConf.getClusterName();
  String topicTokens = kafkaConf.getProductTopicTokens();
  ProduceConfig produceConfig = new ProduceConfig(poolSize, systemUrl, clusterName, topicTokens);
  kafkaProducer = new ProducerPool(produceConfig);
  }
  /**
  *
  * @param topicTokens
  * @throws Exception
  */
  public void setProperties(String topicTokens) throws Exception {
  int poolSize = kafkaConf.getProductPoolsize();
  String systemUrl = kafkaConf.getSystemUrl();
  String clusterName = kafkaConf.getClusterName();
  ProduceConfig produceConfig = new ProduceConfig(poolSize, systemUrl, clusterName, topicTokens);
  kafkaProducer = new ProducerPool(produceConfig);
  }
  @Override
  public void destroy() throws Exception {
  if (kafkaProducer != null) {
  kafkaProducer.close();
  }
  }
  /**
  * 发送消息
  *
  * @param message
  */
  public void sendSingleString(String id,String type,String message) {
  JSONObject jsonObject = new JSONObject();
  jsonObject.put("id", id);
  jsonObject.put("type", type);
  jsonObject.put("eventTime", new Date());
  jsonObject.put("data", message);
  log.info("sendSingleString jsonObject={}",jsonObject);
  String topic = kafkaConf.getTopic();
  sendSingleString(topic, jsonObject.toString());
  }
  public void sendSingleString(String message) {
  log.info("sendSingleString jsonObject={}",message);
  sendSingleString(kafkaConf.getTopic(), message);
  }
  private void sendSingleString(String topic, String message) {
  //kafkaProducer.sendString(topic, message);
  KeyedString keyedMessage=null;
  String batchId = getBatchId(message);
  if(StringUtils.isNotEmpty(batchId)){
  keyedMessage=new KeyedString(batchId,message);
  }else{
  keyedMessage=new KeyedString("Test-event-stream",message);
  }
  //保证消息次序
  kafkaProducer.sendKeyedString(topic, keyedMessage);
  }
  /**
  * 业务场景开启不同分区
  * @param message
  * @return
  */
  private String getBatchId(String message){
  try {
  JSONObject msg = JSONObject.parseObject(message);
  String type = msg.getString("type");
  JSONObject data = JSONObject.parseObject(msg.getString("data"));
  //业务场景数据检查
  if(TaskInfoConstants.SEND_CONTACT_CHECK.equals(type)
  ||TaskInfoConstants.SEND_CONTACT_CHECK_RESULT.equals(type)){
  return data.getString("batchId");
  }
  //业务场景计算
  if(TaskInfoConstants.SEND_CONTACT_BILL.equals(type)
  ||TaskInfoConstants.SEND_CONTACT_BILL_RESULT.equals(type)
  ||TaskInfoConstants.SEND_BILL_CALCSTATUS.equals(type)){
  return data.getString("batchId");
  }
  //业务场景计算
  if(TaskInfoConstants.WITHHOLD_CONTACT.equals(type)
  ||TaskInfoConstants.WITHHOLD_CONTACT_RESULT.equals(type)
  ||TaskInfoConstants.WITHHOLD_REPORT_RESULT.equals(type)){
  return data.getString("batchId");
  }
  //业务场景
  if(TaskInfoConstants.ICS_CONTRACT.equals(type)){
  return data.getString("contractNo");
  }
  //业务场景回调
  if(TaskInfoConstants.ZH_SALARY.equals(type)){
  return data.getString("taskId");
  }
  } catch (Exception e) {
  log.error(e.getMessage(), e);
  }
  return null;
  }
  /**
  * 发送消息
  *
  * @param message
  */
  public void batchSendString(String id, String type, List list) {
  log.info("sendSingleString id={},type={},list={}",id,type,list);
  //保证消息次序
  String topic = kafkaConf.getTopic();
  batchSendString(topic, list);
  }
  private void batchSendString(String topic, List list) {
  //kafkaProducer.batchSendString(topic, list);
  //保证消息次序
  List keyedStringList= Lists.newArrayList();
  for(String message:list) {
  keyedStringList.add(new KeyedString("Test-event-stream",message));
  }
  kafkaProducer.batchSendKeyedString(topic, keyedStringList);
  }
  /**
  * 发送消息
  *
  * @param message
  */
  @Override
  public void batchSendList(List list) {
  log.info("batchSendList id={}",list);
  //保证消息次序
  String topic = kafkaConf.getTopic();
  kafkaProducer.batchSendString(topic, list);
  }
  @Override
  public void sendSingleStringWithKey(String id, String type, String key, String message) {
  JSONObject jsonObject = new JSONObject();
  jsonObject.put("id", id);
  jsonObject.put("type", type);
  jsonObject.put("eventTime", new Date());
  jsonObject.put("data", message);
  log.info("sendSingleStringWithKey key={}, jsonObject={}",key,jsonObject);
  String topic = kafkaConf.getTopic();
  KeyedString keyedMessage=new KeyedString(key,jsonObject.toString());
  //保证消息次序
  kafkaProducer.sendKeyedString(topic, keyedMessage);
  }
  }
  TestCommandBusProduce.java
  public interface TestCommandBusProduce extends InitializingBean, DisposableBean {
  /**
  * 发送消息
  *
  * @param message
  */
  public void sendSingleString(String id,String type,String message) ;
  /**
  * 发送消息到某个分区
  * @param id 消息唯一编号
  * @param type 消息类型
  * @param key 分区key
  * @param message 消息数据
  */
  public void sendSingleStringWithKey(String id,String type,String key,String message) ;
  /**
  * 发送消息
  *
  * @param message
  */
  public void batchSendString(String id,String type,List list) ;
  public void batchSendList(List list);
  public void sendSingleString(String message);
  }
  TestCommandBusConsumer.java
  public class TestCommandBusConsumer implements InitializingBean {
  private EventBusKafkaConf kafkaConf;
  private IStringMessageConsumeListener ConsumeListener;
  public TestCommandBusConsumer(EventBusKafkaConf kafkaConf, IStringMessageConsumeListener ConsumeListener) {
  super();
  this.kafkaConf = kafkaConf;
  this.ConsumeListener = ConsumeListener;
  }
  @Override
  public void afterPropertiesSet() throws Exception {
  initKafkaConfig();
  }
  public void initKafkaConfig() throws KafkaException {
  ConsumeOptionalConfig optionalConfig = new ConsumeOptionalConfig();
  optionalConfig.setMessageGroupSize(kafkaConf.getConsumeMessageGroupSize());
  optionalConfig.setAutoOffsetReset(AutoOffsetReset.NOW);
  ConsumeConfig pickupConsumeConfig = getConsumeConfig(kafkaConf.getTopic());
  KafkaConsumerRegister.registerStringConsumer(pickupConsumeConfig, ConsumeListener, optionalConfig);
  }
  private ConsumeConfig getConsumeConfig(String topicName) {
  return new ConsumeConfig(kafkaConf.getConsumeTopicTokens(), kafkaConf.getSystemUrl(), kafkaConf.getClusterName(), topicName, kafkaConf.getConsumeThreadCount());
  }
  }
  TestCommandBusListener.java
  @Slf4j
  @Component
  public class TestCommandBusListener implements IStringMessageConsumeListener {
  @Resource
  private ContactAreaConfigService contactAreaConfigService;
  @Resource
  private IAgContactInfoSnapshotService iAgContactInfoSnapshotService;
  @Resource
  private BillDispatcherService billDispatcherService;
  @Resource
  private YtBillDispatcherService ytBillDispatcherService;
  @Resource
  private IFileSysDateService iFileSysDateService;
  @Autowired
  private ApplicationEventPublisher publisher;
  @Resource
  private IAgContactInfoBatchService iAgContactInfoBatchService;
  @Resource
  private ContactAreaJtInfoService contactAreaJtInfoService;
  @Resource
  ExceptionNotifier exceptionNotifier;
  @Autowired
  private MessageReceiver messageReceiver;
  @Override
  public void onMessage(List list) throws KafkaConsumeRetryException {
  try {
  DataPermissionHodler.disablePermissionFilter();
  // 剥离消息日志记录过程与消息处理过程
  for (int i = 0; i < list.size(); i++) {
  JSONObject jsonObject = JSON
  .parseObject(list.get(i).toString());
  publisher.publishEvent(new KafkaEvent(jsonObject
  .getString("id"), jsonObject.getString("refId"),
  jsonObject.getString("type"), jsonObject
  .getDate("eventTime"), jsonObject
  .getString("data")));
  }
  //消息异步处理分发
  for (int i = 0; i < list.size(); i++) {
  JSONObject jsonObject = JSON.parseObject(list.get(i).toString());
  Message msg=new KafkaEvent(jsonObject.getString("id"),jsonObject.getString("refId"),
  jsonObject.getString("type"),jsonObject.getDate("eventTime"),jsonObject.getString("data"));
  messageReceiver.pushMessage(msg);
  }
  for (int i = 0; i < list.size(); i++) {
  JSONObject jsonObject = JSON
  .parseObject(list.get(i).toString());
  String type = jsonObject.getString("type");
  if(TaskInfoConstants.SEND_CONTACT_CHECK_RESULT.equals(type)){
  billDispatcherService.csAndJsCheck(jsonObject
  .getJSONObject("data"));
  }
  if(TaskInfoConstants.SEND_CONTACT_BILL_RESULT.equals(type)){
  billDispatcherService.csAndJsJs(jsonObject
  .getJSONObject("data"));
  }
  if(TaskInfoConstants.SEND_CONTRACT_CHANGED.equals(type)){
  contactAreaConfigService.sysnContractConf(jsonObject
  .getJSONObject("data"));
  contactAreaJtInfoService.sysnContractConfJtInfo(jsonObject
  .getJSONObject("data"));
  }
  // if(TaskInfoConstants.RETURN_BILL_DATA.equals(type)){
  // iFileSysDateService.updatesysFileAttbill(jsonObject);
  // }
  if(TaskInfoConstants.WITHHOLD_CONTACT_RESULT.equals(type)){
  ytBillDispatcherService.csAndJsJs(jsonObject
  .getJSONObject("data"));
  }
  if(TaskInfoConstants.WITHHOLD_REPORT_RESULT.equals(type)){
  ytBillDispatcherService.csAndYtJsEndJs(jsonObject
  .getJSONObject("data"));
  }
  if(TaskInfoConstants.SEND_BILL_CALCULATE_INFO.equals(type)){
  BillCalculateParam billCalculateParam = JSON
  .parseObject(JSON.toJSONString(jsonObject
  .getJSONObject("data")),
  BillCalculateParam.class);
  iAgContactInfoBatchService
  .deleteBillCalculateData(billCalculateParam);
  }
  }
  } catch (Exception e) {
  exceptionNotifier.notify(e);
  log.info("ContactAccountListener list={}", JsonUtil.toJson(list));
  log.error(e.getMessage(), e);
  }
  }
  }

专访杭州浩岳机械设备的出海新谋略杭州浩岳机械设备有限公司,成立至今,已有20年的历史,是一家专业生产出口捆绑带拖车带等相关工业纺织品的企业。公司拥有先进的自动化机械设备,且远销全球25个国家和地区,获得了莱茵TUOPPO等国产手机厂商全面发力,全球5G终端设备数量创新高不久前,全球知名数据调研机构GlobalCertificationForum(GCF)发表了一份关于5G发展的研究报告。在这份报告中,GCF指出,虽然目前5G行业中所有的焦点都集中百元的蓝牙鼠标,可无线充电,可多设备同时连接!移动办公越来越成为一种主流的工作模式,该模式往往需要便携且可靠的移动设备来支撑。移动设备除了笔记本电脑外,另一个不可或缺的硬件就是鼠标。在我们工作中,一款性能稳定且可靠的鼠标,有时去除牙齿污垢,不给细菌留温床,罗曼丁香医生联名款冲牙器体验毋庸置疑,口腔健康一直都是大家容易忽略的问题。清洁口腔,光靠刷牙是远远不够的。因为牙刷只能清除牙齿上大的食物残渣,而对于牙缝间的污垢就显得苍白无力,时间久了,牙齿内难免会产生牙结石我的第一间工作室录音设备就只有一个麦克风?笔者的工作室系列正式开始啦,在就地过年初二就开始上班的摧残之后,我终于在元宵节回到了家。现在按照原来的设计思路简化,先把家这边搭起来,毕竟不涉及到硬装的问题。现在也是初步有了一定的票房8天破10亿,微博如何讲述父辈故事微博的电影营销战略前面哭惨了,后面笑爆了。类似的评价在微博上屡见不鲜,我和我的父辈让人又哭又笑我和我的父辈口碑我和我的父辈有多上头等频登微博热搜。相关讨论发酵下,我和我的父辈目前保微博热搜去娱乐化大众需要一个怎样的热搜榜?在一段时间里,微博热搜被诸多人诟病热搜榜上经常充斥着娱乐明星们各种鸡毛蒜皮的小事,而讨论度挺高的一些时事热点话题却上不了热搜。这个现象在改变。如果仔细观察,就可以发现微博热搜上的议食验室从年轻人中来,到年轻人中去针不戳品牌V计划零食垃圾食品?不健康?胖?零食好像是健康的敌人,想吃零食的欲望与健康之间的矛盾的痛点,被这个95后团队发现。本期看见新锐的主人公是致力于解决这一矛盾的食验室创始人孙思达,他是95后谐音梗social小能手冲浪少年哈啤的社交达人养成记要说今夏最火的刑侦局,非扫黑风暴莫属!超前点映都不足以压制网友被孙兴气死的怒火。只要集数更新,相关剧情常会成为网友热议焦点。无植入,不电视剧。如此火爆的影片,自然少不了广告合作商的如何有质感的触动更多年轻人?BMW是这样回答的您知道闷得儿蜜是什么意思吗?在8月底,微博突然之间被闷得儿蜜刷屏,随着易烊千玺闷得儿蜜了点东西冲上热搜,无数人脑袋里都出现了这个疑问闷得儿蜜是个啥?别说很多南方小伙伴满心疑惑,小编复盘永劫无间,武侠风端游如何实现社交国民化?内容营销,正成为现象级游戏传播的标配手段头部游戏越来越国民化了。从是兄弟就来砍我的传奇手游名字超长的坎公骑冠剑再到近期成功出圈的永劫无间。越来越多的头部游戏在社交媒体上国民化,通过
哈啰打车接入享道T3等服务聚合打车模式再升级?新京报贝壳财经讯(记者陈维城)12月28日,哈啰出行旗下哈啰打车宣布,升级运力合作平台为品牌联盟,并接入享道出行T3出行如祺出行等多家出行企业。加上此前已达成合作的曹操出行首汽约车美佳美侬在新零售模式上大力布局渠道拓展成为品牌重要课题目前,服饰市场还远远没有达到行业的天花板,未来的市场空间还是比较大的,这和当前国民经济水平提升较快,消费能力不断提高有着密切的关系,消费者对穿着的品味品质要求也水涨船高。在这样的市外星人M15R6,用了2个月的感受和小建议购买原因第一台电脑就是戴尔的,对品牌挺有好感的,我的专业是产品设计,平常需要建模和渲染,偶尔玩玩游戏,软磨硬泡,老爸给了1万的梦想基金后,自己省吃简用两个月,贴了亿点点入手了用了2教你用C来实现基于Mempool的内存池设计前言设计内存池的目标是为了保证服务器长时间高效地运行,通过对申请空间小而申请频繁的对象进行有效管理,减少内存碎片的产生,合理分配管理用户内存,从而减少系统中出现有效空间足够,而无法比蚂蚁藏得还深?联想上市被叫停的谜底,被证监会揭开了7年前,联想的老板杨元庆和特斯拉的老板马斯克曾经同台接受采访,当时的联想业务高居世界前列,连杨元庆说话的底气都很足,连连质问马斯克你们有多少用户,你们能卖多少台设备?那时的马斯克算新春购机指南来了!预算一万,内行人推荐入手这四款2021年即将完结,新年的脚步也离我们越来越近,大街小巷也充满着迎接新年的气息。新年新气象,相信很多朋友在收到了丰厚的年终奖之后,都按耐不住想换新机。在辞旧迎新之际,手机厂商也会推六大取暖设备?体验成本安全性全面对比大家好,我是尤呢呢。说起冬季取暖,最常见的一句话就是北方人靠暖气,南方人靠一身正气。但是作为一个老婆家在南方的北方人来说,真的是扛不住南方的法术攻击。而目前来说常见的加热设备分为三关注变异毒株影响美股涨跌互现腾讯概念股重挫拼多多跌逾11金融界1月5日消息,市场市场继续关注奥密克戎变异毒株对经济的潜在影响,同时美国单日新增确诊新增病例数超100万及12月ISM制造业指数创11个月来最低增幅令市场承压,美股收盘涨跌互欧拉新能源在前方无车时根据设定速度实际路况与交通指示标志行驶,直行与过弯控制自然平滑。前方有车时,可跟随前车起步跟车减速跟停,保持安全距离行驶。AI自动泊车深度植入神经网络学习算法,有效实现研究团队开发钯纳米巧克力颗粒储氢或成为氢能关键技术突破据美国化学学会(ACS)期刊ACSNano上发表的一项研究,德国电子同步加速器(DESY)团队开发出一种创新方法,可将纳米粒子变成简单的储氢库。DESY纳米实验室负责人安德雷斯斯蒂Python爬虫笔记4目录lxml模块和xpth语法学习正则表达式用法re模块match方法importre使用match方法进行匹配操作resre。match(hello,helloworld)如果上