使用Redis实现简单的事件驱动架构DDD事件溯源和一致性哈希
用 Redis 模拟 Kafka实现事件驱动 架构:[DDD、事件溯源和一致性哈希],
Apache Kafka 已成为大多数技术栈中的主流组件。使用 Kafka 的好处包括确保事件中的因果顺序,同时保持并行性,通过在服务器之间快速复制分区来恢复故障,等等。
然而,运行 Kafka 也面临着一系列挑战。虽然许多工程团队都希望将 Kafka 添加到他们的堆栈中并与"真正的"工程师一起赢得一席之地,但运营开销构成了强大的进入障碍。
在这篇文章中,我们将重点介绍如何构建一个看起来像传统单体应用程序但又是松散耦合的事件驱动系统的系统。为此,我们依赖于从领域驱动设计、事件溯源和一致性哈希等概念中学习。
有序事件
大多数系统关心事件的顺序。大多数系统中的排序仅限于所考虑的域。例如,当我们查看帖子到一个线程时,我们关心的是相对于帖子的排序。当我们查看金融系统时,排序主要限于账户。大型系统中事件的全局排序很少有用,但可能是相关的。
场景:帖子被添加到一个线程Thread中
假设我们对每个添加的帖子都有相当多的后期处理,这反过来会更新线程的某些属性。
这创建了一个相当好的场景来说明分区的使用。
在这种情况下,默认方法是将所有帖子发送到队列中,并让一群工作人员(或消费者)完成工作。这为我们提供了系统所需的并行性,但在我们与多个消费者打交道的那一刻,顺序就会丢失。
我们保留顺序的唯一方法是确保我们一次处理一个任务,从而才能反映该线程上发生的事情的真实顺序。
下一个明显的想法是使用每个线程的专用队列来处理相同的问题,但如果我们知道我们将生成大量线程,那立即感觉像是矫枉过正。
分区
分区只是将我们的排队系统分解为专门的分区。因此,如果我们从一个天真的估计开始,即8个工人每分钟能够处理1600个事件,那么我们的设计就从16个分区开始。
你可能需要做更多的工作来确保你的估计是好的,但在这个例子中,我们将以假设它是好的来工作。我们还为一个分区分配一个worker,因为我们希望每个分区都能始终保持因果排序。
现在我们需要确保一个特定线程的帖子都被路由到同一个分区。每个分区都由一个消费者管理,所以我们的排序不会被打乱。
重要的是要记住,"队列分区 "或 "专用分区 "是一个抽象的结构。它实际上只是一个队列。我们使用分区这个术语,因为它使我们很容易与该领域广泛使用的术语保持一致。
一致性哈希
我们将使用一致哈希散列作为一种手段,将属于特定线程的所有帖子路由到同一队列分区(或队列)。
在我们的例子中,我们将使用Murmurhash和一个由名为uHashRing的库管理这个持续体。
将我们的队列视为一个连续体
现在,如果我们简单地将所有的8个队列放在一个圆圈中,我们会得到这样的结果。让我们把这个称为连续体,因为第7个队列后面是第一个队列,即第0个队列。
现在,一致性散列允许我们使用threadId将一个给定的任务/工作映射到一个特定的队列。因此,在这种情况下,我们使用threadId作为分区的关键。
这里需要注意的一个重要方面是,我们没有把我们的队列称为后处理队列。它们不是专用队列。你可以把一个Transaction事务事件扔到这里,并期望相应的消费者(和事件处理程序)来处理它。
事件
在前面的几段话中,我们已经说了很多关于事件的内容,但我们还没有真正定义事件的含义。
我们的系统会把事件看成是发生在我们系统中的事实。
事实通常是指以某种方式改变了系统状态的事情(或者是失败的事情)。
例如,PostCreatedEvent发生在一个新帖子被创建时。同样地,当帖子被更新时,PostUpdatedEvent也会发生。
你可以将一个事件映射到你系统中的大多数CRUD操作。
如果将你的系统设计成领域,你会惊讶地发现一个应用服务所触发的事件的数量。
一个事件也映射了系统的周围状态。
让我们设计一个创建帖子的应用服务:
from typing import List from .services.base import ServiceBase from sqlalchemy.session import Session class PostService(ServiceBase) def __init__(self, thread_id: UUID, params: PostCreateAPIParams, db_session: Session): self.thread_id = thread_id self.params = params self.user: Union[User, None] = None self.post: Union[Post, None] = None self.db_session = db_session self.errors: List[str] = [] self.error_code: Union[str, None] = None async def __call__(self) return await self.invoke() async def invoke(self): await self.find_thread() await self.verify_author() await self.create_post() await self.build_response_dao() await self.trigger_events() return self async def find_thread(self): # truncated for brevity pass async def trigger_events(self): user_dao: UserDAO = UserDAO.from_orm(self.user) if self.user else None post_dao: PostDAO = PostDAO.from_orm(self.post) if self.post else None thread_dao: ThreadDAO = ThreadDAO.from_orm(self.thread) if self.thread else None if await self.has_errors: event_dao = PostCreatedEventDAO( user=user_dao, thread=thread_dao, post=post_dao, params=self.params, errors=self.errors, error_code= self.error_code ) else: event_dao = PostCreationFailedEventDAO( user=user_dao, thread=thread_dao, params=self.params, post=post_dao errors=self.errors, error_code= self.error_code ) partition_key = ( str(self.thread.id) if self.thread else "PostCreationFailedEvent" ) await SystemEventService.trigger(partition_key=partition_key, event_dao=event_dao, db_session=self.db_session) return self
在这个例子中, trigger_events方法决定了要发布的事实。在这种情况下,它收集了周围的上下文。这也可能包括请求参数(如传递到事件中的params属性)。然而,什么是正确捕获的上下文也取决于上下文:)。
因此,我们的最终事件可能看起来像这样。请注意,该事件没有一个 updated_at 属性,因为我们认为事件是不可改变的事实。我们不能撤消已经发生的事情。
{ "event_name": "PostCreatedEvent", "event_id": "0fb6a4d4-ae65-4f18-be44-edb9ace6b5bb", "event_version": "v1.0", "time": "2022-09-03T04:16:59.294509+00:00", "payload": { "user": { "user_id":"1a1269ee-6b6f-4325-8562-cb169a68e7b3", "is_blocked": false, "first_name": "Siddharth", "last_name": "R", "email": "sid@........}, "post" :{ "post_id": "fa3e7b12-4908-4d53-be11-629e6f47ae90", "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", ...... }, "thread": { "thread_id": "666d0404-0756-4b93-892f-19d9b4b25a99", .....}, "params": { .... }, "errors": [ ... ], "error_code": "" }, "created_at": "2022-09-03T04:16:59.294" "logged_at": "2022-09-03T04:16:59.294" }
在我们的例子中,应用服务通过调用触发方法将事件转给一个叫做SystemEventsService的服务。 该方法在为我们实际发布该事件之前做了一系列的工作。它通过我们先前看到的连续体运行,根据我们传递给它的分区键识别队列(和相应的工作者worker)。 这几乎就是我们需要一致的散列的原因。这可以确保我们的事件总是由同一个分区(和工作者)处理。
因此,一旦我们为我们的任务确定了工作者,我们就要求工作者 保留该事件,以备我们以后需要再来处理它 将其发布给所有相关的工作者 让我们订阅该任务的事件驱动型工作负载触发其工作流程。
将事件分配到正确的分区
@staticmethod async def trigger( partition_key: str, event_dao: SystemEventDAO ): try: worker: SystemEventPartitionConfig = await SystemEventsService._get_worker( partition_key=partition_key ) worker_func = getattr(system_events_workers, worker.worker_name) log_info(msg=f"Trigger called with worker: {worker.worker_name}") worker_func.delay(event_dao=event_dao.json()) except (OperationalError, ConnectionError) as e: log_error(msg=f"[RedisError] {e}", e=e, method="trigger", loc=f"{__name__}") except Exception as e: log_error( msg=f"SystemEventError: {e}", e=e, method="trigger", loc=f"{__name__}" ) return @staticmethod async def _get_worker(partition_key: str) -> SystemEventPartitionConfig: """For a given string it returns the worker that should process the event by running it through a murmurr hashing function and uses that to fetch the nodes from the continuum""" node = ring.get(key=partition_key) nodename = node.get("nodename", None) if not nodename: raise ValueError("Could not find a node in the continuum for key {node}") node_config = continuum.get(nodename, None) if not node_config: raise ValueError( "Could not find a node in the continuum for key {nodename}" ) config_attrs = {"partition_key": partition_key, "partition_id": nodename} config_attrs = {**node_config, **config_attrs} return SystemEventPartitionConfig(**config_attrs)
事件驱动的系统
下面是最好的部分:
现在整个系统可以让你把你的应用程序作为一系列的 异步 事件处理程序来运行,这些处理程序可以在特定的事件上被调用。当一个事件到达正确的分区时,工作者会将该事件分配给一系列的事件处理程序。
async def create_system_event( task_type, event_dao: SystemEventDAO, db_session: Session = None ): if not db_session: db_session = get_session() system_event: Union[SystemEvent, None] = None try: if event_dao.event_name not in SYSTEM_GENERATED_REQUEST_EVENTS: system_event = await SystemEventsService.create( event_dao=event_dao, db_session=db_session ) if system_event: log_info( msg=f"system_event with id: {system_event.id} created for event_name: {system_event.event_name}" ) event_dao.id = system_event.id else: log_info( msg=f"system_generated_request_event with name: {event_dao.event_name} ready for processing." ) await EventHandler.process(event_dao=event_dao, db_session=db_session) except Exception as e: log_error( msg=f"Error handling events: {event_dao.event_name}: {e} n {traceback.print_exc()}" ) capture_exc(error=e) finally: db_session.close() return system_event
分区工作者持久化该事件,并将其分派给EventHandler。 事件处理程序是一系列可独立部署的函数,可以做任何你想做的事情。
如:
handlers = [ PostInsightsGeneratorEventHandler, ThreadActivityManagerEventHandler, SpamDetectionEventHandler, #..., #...., ImageResizerEventHandler, ]
而我们的处理器可以以任何你喜欢的方式处理它们。在这里,我们按顺序处理它们,但它也可以并行派发:
class EventHandler: @staticmethod async def process(event_dao: SystemEventDAO, db_session: Session = None): log_info(msg=f"Event: {event_dao.event_name} arrived") for event_handler in handlers: await event_handler.process(event_dao=event_dao, db_session=db_session) return
处理程序本身是一个相当简单的类,它检查相关的事件。
class PostInsightsGeneratorEventHandler(event_dao: SystemEventDAO, db_session: Session): if not event_dao.event_name == "PostCreatedEvent" : return log_info(msg=f"PostCreatedEventHandler called with {event_dao.event_id}") # Do whatever you need to here
使用Redis实现简单的事件驱动架构 [DDD、事件溯源和一致性哈希] - core27
歼20将飞进A股!千亿战斗机龙头欲借壳中航电测,股民没有50个板不卖本文来源时代财经作者张汀雯图片来源Pixabay1月11日晚,中航电测(300114。SZ)公告,将向中国航空工业集团(以下简称航空工业集团)购买其控股子公司成都飞机工业(集团)有
爱旭股份2022年预盈超30亿元市值限售股1月13日解禁每经记者朱成祥每经编辑张海妮1月12日晚间,光伏电池片巨头爱旭股份(SH600732,股价37。78元,市值491。94亿元)披露2022年度业绩预盈公告,预计2022年度净利润为
新能源汽车坐中欧班列到欧洲1月12日,载有50台国产新能源汽车的X8186次班列从广州国际港站驶出。此趟班列将通过霍尔果斯口岸出境前往欧洲,预计全程用时15天。这是广东开出的首列国产新能源汽车中欧班列,标志
大众汽车集团(中国)22年实现纯电车型强劲增长2022年,大众汽车集团(中国)保持领先市场地位,并不断推进集团向全面网联的电动出行转型。集团在华新能源汽车交付量同比增长37。1,达到206500辆,其中,纯电动车型交付量同比增
书店里搭起了帐篷?不是露营风,带你倾听远方的声音新民晚报讯(通讯员张文菁施昱辰记者袁玮)露营风围炉风不管是咖啡还是煮茶,热爱生活的人们一向善于用平常器物点亮美好生活。在徐汇区文定路218号乐开书店里,最近也支起了一顶帐篷,不过,
联盟飞船泄漏受损俄罗斯将另派飞船接回宇航员新华社北京1月12日电俄罗斯国家航天集团11日宣布,下月将派另一艘联盟系列飞船前往国际空间站,代替因微流星体撞击发生冷却剂泄漏的联盟MS22号飞船,接回三名宇航员。俄航天集团总裁尤
180枚!2022年火箭发射创纪录据自然网站报道,2022年全球共有180枚火箭成功发射入轨,比2021年多44枚,创历史新高。这些火箭主要来自美国太空探索技术公司(SpaceX)中国政府和企业。哈佛史密松森天体物
人类曾经淘汰的器官,却再一次出现?科学家又一次陷入僵局在进化论中,达尔文解释说,地球上所有生物都是在进化过程中,从单细胞进化为多细胞生物。而且人类无疑是地球上众多生物中进化得最好的一类。但许多人对此表示怀疑。难道人类现在的样子已经进化
天文学家在太阳上发现了网状的等离子体结构来自美国西南研究院美国宇航局和马克斯普朗克太阳系研究所的一组研究人员通过使用一种创新的观察方法对中日冕进行紫外线波长的成像,从而发现了太阳中日冕的网状等离子体结构。他们的发现最近发
央行研究出台保交楼贷款支持计划等结构性货币政策工具1月13日下午,在国新办介绍2022年金融数据发布会上,中国人民银行货币政策司司长邹澜透露,最近在研究推出几项结构性货币政策工具,主要是重点支持房地产市场的平稳运行,包括保交楼贷款
我国法律发展中国的法律体系始于先秦时期,当时的法律主要集中于组织社会关系,所有的民事案件刑事案件以及治安案件都以探讨以及实践公道原则为主。公元前221年,秦始皇采纳奴隶主张的示法政策,法律变得