redis定义container @Bean public ThreadPoolTaskExecutor redisSubcriberTaskExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(4); threadPoolTaskExecutor.setMaxPoolSize(16); threadPoolTaskExecutor.setQueueCapacity(100); threadPoolTaskExecutor.setKeepAliveSeconds(10); threadPoolTaskExecutor.setThreadNamePrefix("redis message listener"); return threadPoolTaskExecutor; } @Bean public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory, ThreadPoolTaskExecutor redisSubcriberTaskExecutor) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); container.setTaskExecutor(redisSubcriberTaskExecutor()); return container; }发布消息 public void convertAndSend(String channel, Object value) { stringRedisTemplate.convertAndSend(channel, value); } log.info("推送邮件解析状态到前端======>{}", state); final String jsonMessage = MessageHelper.emailParseStateJson(state); if (jsonMessage == null) { log.error("转换邮件解析状态json数据失败, 发布推送数据失败,{}", state); return; } redisService.convertAndSend(FofEmailParseHelper.REDIS_EMAIL_PARSE_STATE_CHANNEL, jsonMessage);接收消息 定义一个抽象类/** *Title: *
Description: *
Copyright: CopyRight (c) 2020-2035 *
Company: lehoon Co. LTD. *
Author: lehoon *
Date: 2021/12/8 16:08 */
/**
 * Base class for Redis message listeners that register themselves with a
 * {@link RedisMessageListenerContainer} once their properties have been set.
 *
 * <p>Subclasses supply the topic to subscribe to via {@link #getTopic()} and
 * implement the {@code MessageListener} callback.
 */
public abstract class AbstractRedisMessageListener implements MessageListener, InitializingBean {

    /** Container this listener is added to in {@link #afterPropertiesSet()}. */
    private final RedisMessageListenerContainer listenerContainer;

    /**
     * @param container listener container this listener will be registered with
     */
    public AbstractRedisMessageListener(RedisMessageListenerContainer container) {
        listenerContainer = container;
    }

    /**
     * Adds this listener to the container for the topic returned by {@link #getTopic()}.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        final Topic subscribedTopic = getTopic();
        listenerContainer.addMessageListener(this, subscribedTopic);
    }

    /**
     * @return the topic this listener is registered for
     */
    protected abstract Topic getTopic();
}
具体消息接收处理类
/** *
Title: *
Description: *
Copyright: CopyRight (c) 2020-2035 *
Company: lehoon Co. LTD. *
Author: lehoon *
Date: 2021/12/9 14:53 */
/**
 * Redis listener that receives email-parse state messages and forwards them to the
 * owning user through the {@link SimpMessagingTemplate}.
 *
 * <p>Listens on {@code FofEmailParseHelper.REDIS_EMAIL_PARSE_STATE_CHANNEL} and pushes to
 * {@code FofEmailParseHelper.WEBSOCKET_EMAIL_PARSE_EVENT_TOPIC}.
 */
@Slf4j
@Component
public class FofRedisParseStateListemer extends AbstractRedisMessageListener {

    @Resource
    private SimpMessagingTemplate simpMessagingTemplate;

    /**
     * @param container listener container this listener is registered with by the base class
     */
    public FofRedisParseStateListemer(RedisMessageListenerContainer container) {
        super(container);
    }

    /**
     * @return a pattern topic built from the email-parse state channel constant
     */
    @Override
    protected Topic getTopic() {
        return new PatternTopic(FofEmailParseHelper.REDIS_EMAIL_PARSE_STATE_CHANNEL);
    }

    /**
     * Converts the message body to an {@code EmailParseState} and sends it to the user it
     * belongs to. Invalid or unparseable messages are logged and dropped; failures while
     * sending are logged and not rethrown.
     *
     * @param message received Redis message
     * @param bytes   second callback argument of the listener interface; unused here
     */
    @Override
    public void onMessage(Message message, byte[] bytes) {
        log.info("接收到邮件解析返回的状态消息{}", message);

        // Decode explicitly as UTF-8 rather than with the platform default charset, so the
        // JSON payload is read the same way on every host.
        // NOTE(review): assumes the publisher's string serializer writes UTF-8 — confirm.
        final String payload = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        EmailParseState response = MessageHelper.emailParseStateObject(payload);
        if (response == null || !response.isValid()) {
            log.error("推送邮件解析状态到前端失败, 转换消息失败{}", message);
            return;
        }

        try {
            simpMessagingTemplate.convertAndSendToUser(response.getUserId(),
                    FofEmailParseHelper.WEBSOCKET_EMAIL_PARSE_EVENT_TOPIC, response);
            log.info("推送邮件解析状态到前端==>{}", response);
        } catch (Exception e) {
            // Fill the placeholder with the state being pushed and pass the exception as the
            // trailing throwable so the stack trace is logged.
            log.error("推送邮件解析状态到前端发生异常{}", response, e);
        }
    }
}