范文健康探索娱乐情感热点
投稿投诉
热点动态
科技财经
情感日志
励志美文
娱乐时尚
游戏搞笑
探索旅游
历史星座
健康养生
美丽育儿
范文作文
教案论文
国学影视

使用Redis实现简单的事件驱动架构DDD事件溯源和一致性哈希

  用 Redis 模拟 Kafka实现事件驱动 架构:[DDD、事件溯源和一致性哈希],
  Apache Kafka 已成为大多数技术栈中的主流组件。使用 Kafka 的好处包括确保事件中的因果顺序,同时保持并行性,通过在服务器之间快速复制分区来恢复故障,等等。
  然而,运行 Kafka 也面临着一系列挑战。虽然许多工程团队都希望将 Kafka 添加到他们的堆栈中并与"真正的"工程师一起赢得一席之地,但运营开销构成了强大的进入障碍。
  在这篇文章中,我们将重点介绍如何构建一个看起来像传统单体应用程序但又是松散耦合的事件驱动系统的系统。为此,我们依赖于从领域驱动设计、事件溯源和一致性哈希等概念中学习。
  有序事件
  大多数系统关心事件的顺序。大多数系统中的排序仅限于所考虑的域。例如,当我们查看帖子到一个线程时,我们关心的是相对于帖子的排序。当我们查看金融系统时,排序主要限于账户。大型系统中事件的全局排序很少有用,但可能是相关的。
  场景:帖子被添加到一个线程Thread中
  假设我们对每个添加的帖子都有相当多的后期处理,这反过来会更新线程的某些属性。
  这创建了一个相当好的场景来说明分区的使用。
  在这种情况下,默认方法是将所有帖子发送到队列中,并让一群工作人员(或消费者)完成工作。这为我们提供了系统所需的并行性,但在我们与多个消费者打交道的那一刻,顺序就会丢失。
  我们保留顺序的唯一方法是确保我们一次处理一个任务,从而才能反映该线程上发生的事情的真实顺序。
  下一个明显的想法是使用每个线程的专用队列来处理相同的问题,但如果我们知道我们将生成大量线程,那立即感觉像是矫枉过正。
  分区
  分区只是将我们的排队系统分解为专门的分区。因此,如果我们从一个天真的估计开始,即8个工人每分钟能够处理1600个事件,那么我们的设计就从16个分区开始。
  你可能需要做更多的工作来确保你的估计是好的,但在这个例子中,我们将以假设它是好的来工作。我们还为一个分区分配一个worker,因为我们希望每个分区都能始终保持因果排序。
  现在我们需要确保一个特定线程的帖子都被路由到同一个分区。每个分区都由一个消费者管理,所以我们的排序不会被打乱。
  重要的是要记住,"队列分区 "或 "专用分区 "是一个抽象的结构。它实际上只是一个队列。我们使用分区这个术语,因为它使我们很容易与该领域广泛使用的术语保持一致。
  一致性哈希
  我们将使用一致哈希散列作为一种手段,将属于特定线程的所有帖子路由到同一队列分区(或队列)。
  在我们的例子中,我们将使用Murmurhash和一个由名为uHashRing的库管理这个持续体。
  将我们的队列视为一个连续体
  现在,如果我们简单地将所有的8个队列放在一个圆圈中,我们会得到这样的结果。让我们把这个称为连续体,因为第7个队列后面是第一个队列,即第0个队列。
  现在,一致性散列允许我们使用threadId将一个给定的任务/工作映射到一个特定的队列。因此,在这种情况下,我们使用threadId作为分区的关键。
  这里需要注意的一个重要方面是,我们没有把我们的队列称为后处理队列。它们不是专用队列。你可以把一个Transaction事务事件扔到这里,并期望相应的消费者(和事件处理程序)来处理它。
  事件
  在前面的几段话中,我们已经说了很多关于事件的内容,但我们还没有真正定义事件的含义。
  我们的系统会把事件看成是发生在我们系统中的事实。
  事实通常是指以某种方式改变了系统状态的事情(或者是失败的事情)。
  例如,PostCreatedEvent发生在一个新帖子被创建时。同样地,当帖子被更新时,PostUpdatedEvent也会发生。
  你可以将一个事件映射到你系统中的大多数CRUD操作。
  如果将你的系统设计成领域,你会惊讶地发现一个应用服务所触发的事件的数量。
  一个事件也映射了系统的周围状态。
  让我们设计一个创建帖子的应用服务:
  from typing import List from .services.base import ServiceBase from sqlalchemy.session import Session  class PostService(ServiceBase)     def __init__(self, thread_id: UUID, params:   PostCreateAPIParams, db_session: Session):         self.thread_id = thread_id         self.params = params         self.user: Union[User, None] = None         self.post: Union[Post, None] =  None         self.db_session = db_session         self.errors: List[str] = []         self.error_code: Union[str, None] = None        async def __call__(self)         return await self.invoke()       async def invoke(self):          await self.find_thread()          await self.verify_author()          await self.create_post()          await self.build_response_dao()          await self.trigger_events()          return self          async def find_thread(self):     # truncated for brevity     pass                async def trigger_events(self):     user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else None         post_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else None         thread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else None          if await self.has_errors:             event_dao = PostCreatedEventDAO(                 user=user_dao,                 thread=thread_dao,                 post=post_dao,                 params=self.params,                 errors=self.errors,                 error_code= self.error_code             )         else:             event_dao = PostCreationFailedEventDAO(         user=user_dao,                 thread=thread_dao,                 params=self.params,                 post=post_dao                 errors=self.errors,                 error_code= self.error_code             )                               partition_key = (        str(self.thread.id) if self.thread else "PostCreationFailedEvent"         )        await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session)        return self
  在这个例子中, trigger_events方法决定了要发布的事实。在这种情况下,它收集了周围的上下文。这也可能包括请求参数(如传递到事件中的params属性)。然而,什么是正确捕获的上下文也取决于上下文:)。
  因此,我们的最终事件可能看起来像这样。请注意,该事件没有一个 updated_at 属性,因为我们认为事件是不可改变的事实。我们不能撤消已经发生的事情。
  {    "event_name": "PostCreatedEvent",     "event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb",    "event_version": "v1.0",    "time": "2022-09-03T04:16:59.294509+00:00",    "payload": {            "user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........},            "post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... },            "thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....},            "params": { .... },            "errors": [ ... ],            "error_code": ""     },     "created_at": "2022-09-03T04:16:59.294"     "logged_at": "2022-09-03T04:16:59.294" }
  在我们的例子中,应用服务通过调用触发方法将事件转给一个叫做SystemEventsService的服务。 该方法在为我们实际发布该事件之前做了一系列的工作。它通过我们先前看到的连续体运行,根据我们传递给它的分区键识别队列(和相应的工作者worker)。 这几乎就是我们需要一致的散列的原因。这可以确保我们的事件总是由同一个分区(和工作者)处理。
  因此,一旦我们为我们的任务确定了工作者,我们就要求工作者 保留该事件,以备我们以后需要再来处理它 将其发布给所有相关的工作者 让我们订阅该任务的事件驱动型工作负载触发其工作流程。
  将事件分配到正确的分区
  @staticmethod async def trigger(     partition_key: str, event_dao: SystemEventDAO ):          try:         worker: SystemEventPartitionConfig = await SystemEventsService._get_worker(             partition_key=partition_key         )         worker_func = getattr(system_events_workers, worker.worker_name)         log_info(msg=f"Trigger called with worker: {worker.worker_name}")         worker_func.delay(event_dao=event_dao.json())     except (OperationalError, ConnectionError) as e:         log_error(msg=f"[RedisError] {e}", e=e, method="trigger", loc=f"{__name__}")     except Exception as e:         log_error(             msg=f"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}"         )     return  @staticmethod async def _get_worker(partition_key: str) -> SystemEventPartitionConfig:     """For a given string it returns the worker that should process the event by running it through a murmurr hashing     function and uses that to fetch the nodes from the continuum"""      node = ring.get(key=partition_key)     nodename = node.get("nodename", None)      if not nodename:         raise ValueError("Could not find a node in the continuum for key {node}")      node_config = continuum.get(nodename, None)     if not node_config:         raise ValueError(             "Could not find a node in the continuum for key {nodename}"         )      config_attrs = {"partition_key": partition_key, "partition_id": nodename}     config_attrs = {**node_config, **config_attrs}     return SystemEventPartitionConfig(**config_attrs)
  事件驱动的系统
  下面是最好的部分:
  现在整个系统可以让你把你的应用程序作为一系列的 异步 事件处理程序来运行,这些处理程序可以在特定的事件上被调用。当一个事件到达正确的分区时,工作者会将该事件分配给一系列的事件处理程序。
  async def create_system_event(     task_type, event_dao: SystemEventDAO, db_session: Session = None ):     if not db_session:         db_session = get_session()      system_event: Union[SystemEvent, None] = None      try:         if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS:             system_event = await SystemEventsService.create(                 event_dao=event_dao, db_session=db_session             )              if system_event:                 log_info(                     msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}"                 )                 event_dao.id = system_event.id         else:             log_info(                 msg=f"system_generated_request_event with name: {event_dao.event_name} ready for processing."             )         await EventHandler.process(event_dao=event_dao, db_session=db_session)     except Exception as e:         log_error(             msg=f"Error handling events: {event_dao.event_name}: {e} n {traceback.print_exc()}"         )         capture_exc(error=e)     finally:         db_session.close()     return system_event
  分区工作者持久化该事件,并将其分派给EventHandler。 事件处理程序是一系列可独立部署的函数,可以做任何你想做的事情。
  如:
  handlers = [     PostInsightsGeneratorEventHandler,     ThreadActivityManagerEventHandler,     SpamDetectionEventHandler,     #...,     #....,     ImageResizerEventHandler, ]
  而我们的处理器可以以任何你喜欢的方式处理它们。在这里,我们按顺序处理它们,但它也可以并行派发:
  class EventHandler:     @staticmethod     async def process(event_dao: SystemEventDAO, db_session: Session = None):         log_info(msg=f"Event: {event_dao.event_name} arrived")                  for event_handler in handlers:             await event_handler.process(event_dao=event_dao, db_session=db_session)         return
  处理程序本身是一个相当简单的类,它检查相关的事件。
  class PostInsightsGeneratorEventHandler(event_dao: SystemEventDAO, db_session: Session):     if not event_dao.event_name == "PostCreatedEvent" :          return       log_info(msg=f"PostCreatedEventHandler called with {event_dao.event_id}")     # Do whatever you need to here
  使用Redis实现简单的事件驱动架构 [DDD、事件溯源和一致性哈希] - core27

歼20将飞进A股!千亿战斗机龙头欲借壳中航电测,股民没有50个板不卖本文来源时代财经作者张汀雯图片来源Pixabay1月11日晚,中航电测(300114。SZ)公告,将向中国航空工业集团(以下简称航空工业集团)购买其控股子公司成都飞机工业(集团)有爱旭股份2022年预盈超30亿元市值限售股1月13日解禁每经记者朱成祥每经编辑张海妮1月12日晚间,光伏电池片巨头爱旭股份(SH600732,股价37。78元,市值491。94亿元)披露2022年度业绩预盈公告,预计2022年度净利润为新能源汽车坐中欧班列到欧洲1月12日,载有50台国产新能源汽车的X8186次班列从广州国际港站驶出。此趟班列将通过霍尔果斯口岸出境前往欧洲,预计全程用时15天。这是广东开出的首列国产新能源汽车中欧班列,标志大众汽车集团(中国)22年实现纯电车型强劲增长2022年,大众汽车集团(中国)保持领先市场地位,并不断推进集团向全面网联的电动出行转型。集团在华新能源汽车交付量同比增长37。1,达到206500辆,其中,纯电动车型交付量同比增书店里搭起了帐篷?不是露营风,带你倾听远方的声音新民晚报讯(通讯员张文菁施昱辰记者袁玮)露营风围炉风不管是咖啡还是煮茶,热爱生活的人们一向善于用平常器物点亮美好生活。在徐汇区文定路218号乐开书店里,最近也支起了一顶帐篷,不过,联盟飞船泄漏受损俄罗斯将另派飞船接回宇航员新华社北京1月12日电俄罗斯国家航天集团11日宣布,下月将派另一艘联盟系列飞船前往国际空间站,代替因微流星体撞击发生冷却剂泄漏的联盟MS22号飞船,接回三名宇航员。俄航天集团总裁尤180枚!2022年火箭发射创纪录据自然网站报道,2022年全球共有180枚火箭成功发射入轨,比2021年多44枚,创历史新高。这些火箭主要来自美国太空探索技术公司(SpaceX)中国政府和企业。哈佛史密松森天体物人类曾经淘汰的器官,却再一次出现?科学家又一次陷入僵局在进化论中,达尔文解释说,地球上所有生物都是在进化过程中,从单细胞进化为多细胞生物。而且人类无疑是地球上众多生物中进化得最好的一类。但许多人对此表示怀疑。难道人类现在的样子已经进化天文学家在太阳上发现了网状的等离子体结构来自美国西南研究院美国宇航局和马克斯普朗克太阳系研究所的一组研究人员通过使用一种创新的观察方法对中日冕进行紫外线波长的成像,从而发现了太阳中日冕的网状等离子体结构。他们的发现最近发央行研究出台保交楼贷款支持计划等结构性货币政策工具1月13日下午,在国新办介绍2022年金融数据发布会上,中国人民银行货币政策司司长邹澜透露,最近在研究推出几项结构性货币政策工具,主要是重点支持房地产市场的平稳运行,包括保交楼贷款我国法律发展中国的法律体系始于先秦时期,当时的法律主要集中于组织社会关系,所有的民事案件刑事案件以及治安案件都以探讨以及实践公道原则为主。公元前221年,秦始皇采纳奴隶主张的示法政策,法律变得
埃尔多安预告,将在两国发起军事行动消灭美盟友不需要美国批准前段时间芬兰瑞典申请加入北约一事,一下子闹得北欧局势也紧张了起来,不过由于土耳其狮子大张口,向美国提出了解除因为购买俄制S400被施加的制裁等条件,这两个北欧国家加入北约的计划被暂法网女单八强出炉,美国3人,俄罗斯2人,斯瓦泰克捍卫种子尊严北京时间5月31日凌晨,法网女单8强全部出炉。美国队的高芙佩古拉斯蒂芬斯3人入围,成为了最大赢家俄罗斯的库德梅托娃卡萨特金娜2名选手入列,非常意外另外3人分别是来自加拿大的费尔南德美元真的危险了,19国抛售美债后,美国政府最大债主也要抛3万亿很多中国网友都一直坚定认为,中国应该清空美债。那么现在好消息来了,中国目前一直在持续减持美债,3月份在抛售了100多亿美元之后,目前中国持有的美债只有1。05万亿美元,这比高峰的时美媒解放军新隐身歼轰机将改变游戏规则,克制美航母力压陆战队美媒1945认为,中国正在研制自己的隐身歼轰机,并且在最近连续发文报道中国新隐身歼轰机的动向,尽管到现在官方没有任何消息和渠道,透露这款新型隐身歼击轰炸机的存在。美媒认为中国的新型乌克兰接受鱼叉反舰导弹,俄罗斯黑海舰队有危险?昨日,乌克兰接收了来自丹麦的鱼叉反舰导弹和来自美国的榴弹炮。北约国家一直对乌克兰进行各种军事援助,美国甚至通过了对乌克兰的租借法案,最新通过的援助金额高达400亿美元。早些时候,俄俄军使用核武前提是英美出兵军事专家宋忠平在接受凤凰卫视采访时表示俄军使用核武前提是美英出兵。在乌战场,俄不会使用核武,同一个民族且不会对俄军造成重大伤亡,即使是有西方军援,也达不到这一条件。但在英美出兵参战抽烟20年一定会得肺癌?不要存在侥幸心理!这4类人群需定期检查48岁的刘先生,有20年吸烟史,出现刺激性干咳半年之久,却迟迟不肯去医院检查。总以为癌症不会找上他。咳嗽严重时,刘先生就自行去药店买各种抗生素和止咳药,一开始还能缓解,越吃越没有效周二重磅!拜登将与美联储主席鲍威尔罕见会晤,关键词通胀白宫发布声明称,美国总统拜登将于当地时间5月31日周二会见美联储主席鲍威尔,两人将讨论美国和全球经济的状况。据媒体总结,截至3月的美联储主席公开日程记录显示,这是自去年11月拜登宣中俄反对美国制裁,有人坐不住了!立陶宛提议由G7替代安理会上周,为了应对美国及其盟国日益严重的威胁,朝鲜试射了弹道导弹。美国对此暴跳如雷,在安理会上提出要对朝鲜展开制裁。不过在中国和俄罗斯的阻止之下,这项制裁案没能通过,我国驻联合国代表张前列腺癌发现就是晚期?医生提醒男人出现3种症状,别大意李先生今年70岁了,之前在单位上班的时候,公司每年都会组织员工体检。可自从退休以后,他觉得体检也没查出来什么事,又贵又浪费时间,就没再做过体检。一天,李先生的儿子下班回来,听说楼下我,80后,花220万移民,到了发现被坑80万,房子买3年只住了半年这是我们讲述的第332位真人的故事我叫蒋富康一家四口在希腊,生于1987年,河北沧州献县人。小时候,家里建新房,父母就到北京打烧饼挣钱还债,一年半载才回家一次。每次离家,父亲骑摩托