本文共 7639 字,大约阅读时间需要 25 分钟。
事件驱动是一种灵活的系统设计方法,在事件驱动的系统中,当数据发生变化时系统会产生、发布一个对应的事件,其它对这个事件感兴趣的部分会接收到通知,并进行相应的处理。事件驱动设计最大的好处在我看来有两点:一是它为系统提供了很好的扩展能力,比如我们可以对某类事件增加一个订阅者来对系统进行扩展,最主要的是我们并不需要修改任何已有的代码,它完全符合开闭原则;二是它实现了模块间的低偶合,系统间各个部分不是强依赖关系,而是通过事件把整个系统串联起来。
当然,任何事务都有两面性,事件驱动也有其不好的方面。首先,实现一套这样的系统复杂度就很高,对开发人员的要求也很高;再次,对系统的整体把控会很困难,想象一下面对几百个类别的事件,并且没有一个统一的地方可以让我们看到整个业务处理流程,会是什么心情?所以当我们决定采用事件驱动实现系统中,一定要维护好相关的文档,并保持它们的有效性。我们再来看看事件驱动架构的一些其它的优点:更好的响应性事件驱动中,事件的响应是异步处理的,所以它具有更好的响应性。更好的容错性业务主流程在发布事件之后便结束了,扩展流程的延后处理可以异步不断的失败重试,直到成功为止,系统整体容错性更强。设计篇首先,我们需要定义什么是事件?从业务角度看,事件包括以下属性:事件的定义属性字段类型说明标识IDstring系统内部每个事件都需要一个唯一的标识。类型eventTypestring数据发生变化产生事件,不同类型的数据变化产生不同类型的事件。比如会员下单、会员注册、用户修改手机号等等。时间eventTimedatetime即数据发生变化的时间。上下文contextstring事件发生时的上下文信息。比如会员修改手机号事件,需要原号码和新号码,会员 ID 等信息。接下来,我们看看如何设计一套基于事件驱动的系统,你知道设计模式中的观察者模式吗?观察者模式 :定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。观察者模式 :定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态发生变化时,会通知所有观察者对象,使它们能够自动更新自己。观察者模式天生就是事件驱动的一个实现,但是直接使用它有很多的弊端。首先,它是基于主题的,有多少类事件就需要多少个主题类,这可能会导致类爆炸;其次,观察者模式是同步实现的,这样我们可能会牺牲掉响应性和容错性等优势。所以我们需要对观察者模式稍作改进,我们分别从事件发发布和消费两个方面来分析。事件的发布本文的标题是《基于 Kafka 实现事件驱动架构》,很明显,我们使用 kafka 作为消息中间件来传递事件消息。所以,像修改会员手机号码的代码可能实现如下:@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)// 发布 用户手机号码变更 事件
Event event =newEvent(...);// 创建一个事件对象,表示用户修改手机号码ProducerRecord record =newProducerRecord(...event);// 根据 event 生成 kakfa recordFuture<RecordMetadata> f = kafkaProducer.send(record);
try{ f.get();}catch(InterruptedException | ExecutionException e) { thrownewRuntimeException(e);}}这段代码正确吗?从逻辑上看,它完全正确。但从可靠性角度看它是有问题的。Kafka 和数据库是两个异构系统,我们不能仅仅通过一个本地事务保证他们之间的数据一致性。例如,推送 Kafka 成功了,但是在提交 DB 事务的时候失败了呢(比如说事务超时滚)?这样 kafka 中就会存在一个脏数据,因为本地数据库事务已经回滚了。
分布式系统数据一致性一直就是复杂的问题,常用的方案有两阶段提交、三阶段提交、zookeeper 的 zab 协议、proxs、raft 等算法,这不是本文的重点。我们采用一个简单易懂的方式来解决上面的问题。我们引入一张 DB 事件表,在发布事件时将事件信息存入这个事件表,将事件的发布和业务处理包装在同一个本地事务中。createtableifnotexistsevent_queue
(id
bigintnotnullauto_incrementcomment'主键',event_id
char(32)notnullcomment'事件 ID',event_type
char(12)notnullcomment'事件类型',event_time
datetimenotnullcomment'事件发生时间',context
mediumtextnotnullcomment'事件内容',primarykey(id
),uniquekey(event_id
))engine=innodbdefaultcharset=utf8comment='事件队列表'; 发布事件,就是向这个事件表中增加一条记录,修改会员手机号码的代码现在变成了:
@Transactional(readOnly =false, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)// 发布 用户手机号码变更 事件
Event event =newEvent(...);// 创建一个事件对象,表示用户修改手机号码eventDao.insert(event);// 向事件表创建一条新记录。}由于事件消息现在被暂存进了 DB,我们还需要将它取出来推到 Kafka,为此我们需要起一个线程,不断的读取事件表中的记录发送给 Kafka,并在成功发送之后将记录从 DB 中删除。如果删除 DB 的时候失败了,那么消息会被重新推送到 kafka,意为着我们实现的是 At least once 的递交语义,对于业务上不接受重复的场景,在消费端需要做好幂等处理。
讲到这里,关于事件的分布已经接近尾声,但还有一个问题: 性能 。如果一个系统的负载很高,一秒内产生成千上万个事件,那我们的事件表就会成为瓶颈,因为只用了一个线程来处理事件表向 Kafka 的推送,集群中只有一个实例能发辉作用,无法实现弹性。为了解决这个问题,我们可以对事件表进行分表,并使用多线程并发处理,而且这些线程可以分布在不同的集群实例中。但这样使设计变得更复杂了,现在我们需要解决一个新的问题: 如何保证一个事件表,最多只被一个线程处理? 我们需要保证一个事件表同一时刻只能被一个线程处理,同时在实例宕机后,其它实例可以起线程接替它的工作。这句话我们换一种方式来描述更容易理解:集群有 M 个实例,需要进行 N 个任务(任务是把事件分表中的事件信息推送到 kafka)一个任务最多可以分配给 1 个实例,1 个实例可以同时执行多个任务。如果一个实例宕机了,分配给它的任务需要重新在其它实例上分配。N 个任务固定不变,实例可以动态增加或减少,需要实现实例之间的均衡负载。如果你熟悉像 HBase、ES 这类分布式系统的话,不难理解我们需要在集群中选出一个实例作为 Master,由它来负责任务在集群中的分配工作。我们借助 Kookeeper,所有实例在启动时创建一个 EPHEMERAL 类型的 master 节点,创建成功的实例成为 Master,其它实例则监听 master 节点,当 Master 实例宕机后重新竞选。每个实例启动后,会在 workers 节点下创建一个临时节点,表示自己作为一个 Worker 加入集群;Worker 同时会监听自己创建的子节点,接收由 Master 分配给自己的任务。Master 会监听 workers 下子节点的变化,当实例下线或有新的实例加入集群中时,Master 会收到通知并重新进行任务的分配。分配的具体信息保存在 Worker 实例创建的子节点中,Master 通过直接修改这些子节点的内容实现分配。
从事件的发布来看,系统的架构是这样的:这里有个细节需要说明: 因为 Kafka 只保证 partition 级别的有序性,我们的事件分表数必须大于或等于 partition 的数量,否则事件的顺序得不到保证 。
事件的消费因为我们使用了 Kafka 作为事件消息中间件,事件的消费简单很多。每个实例在启动时启一个 Kafka Consumer 即可,像实例间的负载、可用性、故障转移等等问题,Kafka 已经帮我们解决了,我们只需要从 Kafka 中获取事件消息,并通知相应的订阅者即可。订阅者需要实现 BaseSubscriber 接口,另外在启动时,需要把事件与订阅者的关系维护在 SubscriberConfig 类中:
BaseSubscriber sub = ...// your implementationSubscriberConfig.instance().addSubscriber("event_type", sub);系统整体的设计是面向扩展的,我们可以通过调整集群应用实例数、事件表分表数量和 kafka partitions 数量来提高系统整体的吞吐量。事件表分表越多,事件消息从 DB 到 kafka 的延迟就更低;应用实例越多,系统单位时间内能承受的事件上限也越多,另外也能更好的负载 kafka 消息的消费。
每一个应用,作为事件发布者,其产生的事件最终都被推送到一个 Kafka Topic;但作为消费者,可以订阅不同的 Topic,这些 Topic 可以是自己的推送的,也可以是其它应用推送的事件。实现篇这里我们只对部分核心代码作一个简单的介绍:SimpleLock 是一个基于 Zookeeper 的简单分布式锁实现,我们使用 SimpleLock 来实现 Master 的竞选。EventSubmitter 是一个线程,负责把事件表中的事件信息推送到 Kafka broker。初始化时需要传入一个 int 参数,表示处理哪一个事件分表。它被实现成一个响应中断的线程,因为当 Master 重新分配任务后,Worker 需要先停掉当前进行中的任务。Master 类是 Master 实例的主要实现。实例在启动时会调 Master 类的 start 方法,Master 实例监听 workers 节点,当有新实例加入或实例下线时,Master 实例会调用 onWorkerChange 方法进行重新分配, onWorkerChange 方法实现了一个简单的分配算法,只有任务变更的 Worker 实例会收到分配通知。Worker 类是 Worker 实例的主要实现,实例在启动时会调 Worker 类的 start 方法。集群中的每一个实例都是 Worker,会在 workers 节点下创建一个临时的节点表示自己,同时监听该节点,接受 Master 分配给自己的任务。当 Worker 接收到分配通知时,会先停止当前在运行的所有任务,再根据 worker 节点的内容开始执行新分配的任务。示例来看一个具体的事例,假设我们要以天为维度,统计每天的下单量和下单金额。现在,我们已经有了订单表:createtableifnotexistsorder
(order_id
bigintnotnullauto_incrementcomment'主键',user_id
bigintnotnullcomment'客户 id',order_time
datetimenotnullcomment'订单时间',order_amount
intnotnullcomment'订单金额,单位:分',primarykey(order_id
))engine=innodbdefaultcharset=utf8comment='订单表'; 这个需求我们可以简单的使用 sql 来做,比如:
selectdate(order_time)as day,count(*)as total_num,sum(order_amount)as total_amount fromorder
group bydate(order_time){1} 但是在生产环境中这么做往往不现实,比如性能问题、或者我们对订单表做了分表、或者几个月前的数据库了备份,而你正好需要查询这些数据,等等。实现这个需求更好的方式是采用事件驱动,在下单的时候发布一个事件,然后异步的维护一个查询表,这样之间的种种问题都将不复存在。先创建一个查询表,如下:
createtableifnotexistsdaily_order_report
(id
bigintnotnullauto_incrementcomment'主键',day
datenotnullcomment'统计日',order_num
bigintnotnullcomment'订单数量',order_total
bigintnotnullcomment'订单总金额,单位:分',primarykey(id
),uniquekey(day
))engine=innodbdefaultcharset=utf8comment='订单日报表'; 在下单的时候,我们需要发布一个 下单事件 :
@Transactional(readOnly =false, propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED, rollbackFor = Exception.class)// 发布下单事件
publisher.publish("order_created",newDate(), order.json(), order.getUserId().intValue() % Configuration.instance().getNumOfEventTables());}之后,我们需要实现一个订阅者,在接收到 下单事件 后,根据订单的日期做相应的统计:
@Autowired
privateOrderRepos repos;@Override
publicvoidonEvent(Event e){ Order order = Order.fromJson(e.getContext());DailyOrderReport report = repos.selectDailyOrderReportByKey(newjava.sql.Date(order.getOrderTime().getTime()));if(null== report) {
report =newDailyOrderReport();report.setDay(newjava.sql.Date(order.getOrderTime().getTime()));report.setOrderNum(1l);report.setOrderTotal(newLong(order.getOrderAmount()));repos.createDailyOrderReport(report);
}else{ report.setOrderNum(report.getOrderNum() +1);report.setOrderTotal(report.getOrderTotal() + order.getOrderAmount());repos.updateDailyOrderReport(report);
}}}随机创建 10 个订单后,我们的报表情况如下:
mysql>select*fromorder
;+----------+---------+---------------------+--------------+| order_id | user_id | order_time | order_amount |+----------+---------+---------------------+--------------+| 21 | 3 | 2018-09-24 01:06:43 | 251 || 22 | 2 | 2018-09-24 01:06:43 | 371 || 23 | 5 | 2018-09-24 01:06:43 | 171 || 24 | 0 | 2018-09-24 01:06:43 | 904 || 25 | 3 | 2018-09-24 01:06:43 | 55 || 26 | 5 | 2018-09-24 01:06:44 | 315 || 27 | 8 | 2018-09-24 01:06:44 | 543 || 28 | 8 | 2018-09-24 01:06:44 | 537 || 29 | 2 | 2018-09-24 01:06:44 | 123 || 30 | 3 | 2018-09-24 01:06:45 | 938 |+----------+---------+---------------------+--------------+10 rows inset(0.00sec) mysql>select*fromdaily_order_report;
+----+------------+-----------+-------------+| id | day | order_num | order_total |+----+------------+-----------+-------------+| 2 | 2018-09-24 | 10 | 4208 |+----+------------+-----------+-------------+1 row inset(0.00sec)mysql>
转载于:https://blog.51cto.com/14158311/2352487