Kafka很强大,可是一步出错就可能致使系统数据损坏!

前言

Apache Kafka 已成为跨微服务异步通讯的主流平台。它有不少强大的特性,让咱们可以构建健壮、有弹性的异步架构。面试

同时,咱们在使用它的过程当中也须要当心不少潜在的陷阱。若是未能提早发现可能发生(换句话说就是早晚会发生)的问题,咱们就要面对一个容易出错和损坏数据的系统了,Java中间件面试真题 +学习笔记算法

今天小编会将重点介绍其中的一个陷阱:尝试处理消息时遭遇失败。首先,咱们须要意识到消息消费可能会,并且早晚会遭遇失败。其次,咱们须要确保在处理此类故障时不会引入更多问题。数据库

Kafka 简介

网上也有一些介绍 Kafka 及其使用方法的深度文章。话虽如此,咱们这里仍是先简要回顾一下对咱们的讨论很重要的一些概念。设计模式

事件日志、发布者和消费者

Kafka 是用来处理数据流的系统。从概念上讲,咱们能够认为 Kafka 包含三个基本组件:架构

  • 一个事件日志(Event Log),消息会发布到它这里
  • 发布者(Publisher),将消息发布到事件日志
  • 消费者(Consumer),消费(也就是使用)事件日志中的消息

与 RabbitMQ 之类的传统消息队列不一样,Kafka 由消费者来决定什么时候读取消息(也就是说,Kafka 采用了拉取而非推送模式)。每条消息都有一个偏移量(offset),每一个消费者都跟踪(或提交)其最近消费消息的偏移量。这样,消费者就能够经过这条消息的偏移量请求下一条消息。异步

主题

事件日志分为几个主题(topic),每一个主题都定义了要发布给它的消息类型。定义主题是咱们这些工程师的责任,因此咱们应该记住一些经验法则:分布式

  • 每一个主题都应描述一个其余服务可能须要了解的事件。
  • 每一个主题都应定义每条消息都将遵循的一个惟一模式(schema)。

分区和分区键

主题被进一步细分为多个分区(partition)。分区使消息能够被并行消费。Kafka 容许经过一个**分区键(partition key)**来肯定性地将消息分配给各个分区。分区键是一段数据(一般是消息自己的某些属性,例如 ID),其上会应用一个算法以肯定分区。微服务

这里,咱们将消息的 UUID 字段分配为分区键。生产者应用一种算法(例如按照分区数修改每一个 UUID 值)来将每条消息分配给一个分区。学习

以这种方式使用分区键,使咱们可以确保与给定 ID 关联的每条消息都会发布到单个分区上。测试

还须要注意的是,能够将一个消费者的多个实例部署为一个消费者组。Kafka 将确保给定分区中的任何消息将始终由组中的同一消费者实例读取。

在微服务中使用 Kafka

Kafka 很是强大。因此它可用于多种环境中,涵盖众多用例。在这里,咱们将重点介绍微服务架构中最多见的用法。

跨有界上下文传递消息

当咱们刚开始构建微服务时,咱们许多人一开始采用的是某种中心化模式。每条数据都有一个驻留的单一微服务(即单一真实来源)。若是其余任何微服务须要访问这份数据,它将发起一个同步调用以检索它。

这种方法致使了许多问题,包括同步调用链较长、单点故障、团队自主权降低等。

最后咱们找到了更好的办法。在今天的成熟架构中,咱们将通讯分为命令处理和事件处理。

命令处理一般在单个有界上下文中执行,而且每每仍是会包含同步通讯。

另外一方面,事件一般由一个有界上下文中的服务发出,并异步发布到 Kafka,以供其余有界上下文中的服务消费。

左侧是咱们之前设计微服务通讯的方式:一个有界上下文(由虚线框表示)中的服务从其余有界上下文中的服务接收同步调用。右边是咱们现在的作法:一个有界上下文中的服务发布事件,其余有界上下文中的服务在本身空闲时消费它们。

例如,以一个 User 有界上下文为例。咱们的 User 团队会构建负责启用新用户、更新现有用户账户等任务的应用程序和服务。

建立或修改用户账户后,UserAccount 服务会将一个相应的事件发布到 Kafka。其余感兴趣的有界上下文能够消费该事件,将其存储在本地,使用其余数据加强它,等等。例如,咱们的 Login 有界上下文可能想知道用户的当前名称,以便在登陆时向他们致意。

咱们将这种用例称为跨边界事件发布。

在执行跨边界事件发布时,咱们应该发布聚合(Aggregate)。聚合是自包含的实体组,每一个实体都被视为一个单独的原子实体。每一个聚合都有一个“根”实体,以及一些提供附加数据的从属实体。

当管理聚合的服务发布一条消息时,该消息的负载将是一个聚合的某种表示形式(例如 JSON 或 Avro)。重要的是,该服务将指定聚合的惟一标识符做为分区键。这将确保对任何给定聚合实体的更改都将发布到同一分区。

出问题的时候怎么办?

尽管 Kafka 的跨边界事件发布机制显得至关优雅,但毕竟这是一个分布式系统,所以系统可能会有不少错误。咱们将关注也许是最多见的恼人问题:消费者可能没法成功处理其消费的消息。

图片

咱们如今该怎么办?

肯定这是一个问题

团队作错的第一件事就是根本没有意识到这是一个潜在的问题。消息失败时有发生,咱们须要制定一种策略来处理它……要未雨绸缪,而非亡羊补牢。

所以,了解这是一种早晚会发生的问题并设计针对性的解决方案是咱们要作的第一步。若是咱们作到了这一点,就应该向本身表示一点祝贺。如今最大的问题仍然存在:咱们该如何处理这种状况?

咱们不能一直重试那条消息吗?

默认状况下,若是消费者没有成功消费一条消息(也就是说消费者没法提交当前偏移量),它将重试同一条消息。那么,难道咱们不能简单地让这种默认行为接管一切,而后重试消息直到成功吗?

问题是这条消息可能永远不会成功。至少,没有某种形式的手动干预它是不会成功的。因而乎,消费者就永远不会继续处理后续的任何消息,而且咱们的消息处理将陷入困境。

好吧,咱们不能简单地跳过那条消息吗?

咱们一般容许同步请求失败。例如,对咱们的 UserAccount 服务所作的一个“create-user”POST 可能包含错误或丢失的数据。在这种状况下,咱们能够简单地返回一个错误代码(例如 HTTP 400),而后要求调用方重试。

虽然这种办法并不不理想,但这不会对咱们的数据完整性形成任何长期问题。那个 POST 表明一条命令,是尚未发生的事情。即便咱们让它失败,咱们的数据也将保持一致状态。

当咱们丢弃消息时状况并不是如此。消息表示已经发生的事件。任何忽略这些事件的消费者都将与生成事件的上游服务再也不同步。

全部这些都代表,咱们不想丢弃消息。

那么咱们如何解决这个问题呢?

对咱们来讲这不是什么容易解决的问题。所以,一旦咱们认识到它须要解决,就能够向互联网咨询解决方案。但这引出了咱们的第二个问题:网上有一些咱们可能不该该遵循的建议。

重试主题:流行的解决方案

你会发现最受欢迎的一种解决方案就是重试主题(retry topics)的概念。具体细节因实现而异,但整体概念是这样的:

  • 消费者尝试消费主要主题中的一条消息。
  • 若是未能正确消费该消息,则消费者将消息发布到第一个重试主题,而后提交消息的偏移量,以便继续处理下一条消息。
  • 订阅重试主题的是重试消费者,它包含与主消费者相同的逻辑。该消费者在消息消费尝试之间引入了短暂的延迟。若是这个消费者也没法消费该消息,则会将该消息发布到另外一个重试主题,并提交该消息的偏移量。
  • 这一过程继续,并增长了一些重试主题和重试消费者,每一个重试的延迟愈来愈多(用做退避策略)。最后,在最终重试消费者没法处理某条消息后,该消息将发布到一个死信队列(Dead Letter Queue,DLQ)中,工程团队将在该队列中对其进行手动分类。

概念上讲,重试主题模式定义了失败的消息将被分流到的多个主题。若是主要主题的消费者消费了它没法处理的消息,它会将该消息发布到重试主题 1 并提交当前偏移量,从而将自身释放给下一条消息。重试主题的消费者将是主消费者的副本,但若是它没法处理该消息,它将发布到一个新的重试主题。最终,若是最后一个重试消费者也没法处理该消息,它将把该消息发布到一个死信队列(DLQ)。

问题出在哪里?

看起来这种方法彷佛很合理。实际上,它在许多用例中都能正常工做。问题在于它不能充当一种通用解决方案。现实中存在一些特殊用例(例如咱们的跨边界事件发布),对于这些用例来讲,这种方法其实是危险的。

它忽略了不一样类型的错误

第一个问题是,它没有考虑到致使事件消费失败的两大缘由:可恢复错误和不可恢复错误。

可恢复错误指的是,若是咱们屡次重试,这些错误最终将得以解决。一个简单的示例是将数据保存到数据库的消费者。若是数据库暂时不可用,那么当下一条消息经过时,消费者将失败。一旦数据库再次变得可用,消费者就可以再次处理该消息。

从另外一个角度来看:可恢复错误指的是那些根源在消息和消费者外部的错误。解决这种错误后,咱们的消费者将继续前进,好像无事发生同样。(不少人在这里被弄糊涂了。“可恢复”一词并不意味着应用程序自己——在咱们的示例中为消费者——能够恢复。相反,它指的是某些外部资源——在此示例中为数据库——会失败并最终恢复。)

关于可恢复错误须要注意的是,它们将困扰主题中的几乎每一条消息。回想一下,主题中的全部消息都应遵循相同的架构,并表明相同类型的数据。一样,咱们的消费者将针对该主题的每一个事件执行相同的操做。所以,若是消息 A 因为数据库中断而失败,那么消息 B、消息 C 等也将失败。

不可恢复错误指的是不管咱们重试多少次都将失败的错误。例如,消息中缺乏字段可能会致使一个 NullPointerException,或者包含特殊字符的字段可能会使消息没法解析。

与可恢复错误不一样,不可恢复错误一般会影响单个孤立消息。例如,若是只有消息 A 包含不可解析的特殊字符,则消息 B 将成功,消息 C 等也将成功。

与可恢复错误不一样,解决不可恢复错误意味着咱们必须修复消费者自己(永远不要“修复”消息自己——它们是不可变的记录!)例如,咱们可能会修复消费者以便正确处理空值,而后从新部署它。

那么,这与重试主题解决方案有什么关系?

对于初学者来讲,它对可恢复错误不是特别有用。请记住,在解决外部问题以前,可恢复错误将影响每一条消息,而不只仅是当前的一条消息。所以能够确定的是,将失败的消息分流到重试主题将为下一条消息清理出通道。但接下来的消息也将失败,下一条以及再下一条也将失败。咱们最好仍是让消费者本身重试,直到问题解决为止。

不可恢复的错误呢?重试队列能够在这些状况下提供帮助。若是一条麻烦的消息阻止了全部后续消息的消费,那么毫无疑问,分流该消息确定会为咱们的用户消费清除障碍(固然,多个重试主题是不必的)。

可是,虽然重试队列能够帮助受不可恢复错误困扰的消息消费者继续前进,但它也可能带来更多隐患。下面咱们就进一步分析背后的缘由。

它会忽略排序

咱们简要回顾一下跨边界事件发布的一些重要环节。在有界上下文中处理一条命令后,咱们会将一个对应的事件发布到一个 Kafka 主题。重要的是,咱们会将聚合的 ID 指定为分区键。

为何这很重要?它确保的是对任何给定聚合的更改都会发布到同一分区。

好吧,那这一点为何会那么重要呢?当事件发布到同一分区时,能够保证各个事件按照它们发生的顺序进行处理。若是对同一聚合进行连续更改,而且所产生的事件发布到不一样的分区,就可能发生争用情况,也就是消费者在消费第一个更改以前就消费了第二个更改。这会致使数据不一致。

咱们举个简单的例子。咱们的 User 有界上下文提供了一个容许用户更改其名称的应用程序。一位用户将他的名字从 Zoey 更改成 Zoë,而后当即又更改成 Zoiee。若是咱们无论排序,则某个下游消费者(例如 Login 有界上下文)可能会先处理对 Zoiee 的更改,而后不久用 Zoë覆盖它。

如今,登陆数据与咱们的用户数据已经不一样步了。更麻烦的是,每当 Zoiee 登陆咱们的网站时都会看到“欢迎光临,Zoë!”的登陆提示。

这才是重试主题真正出问题的地方。它们让咱们的消费者容易打乱处理事件的顺序。若是一个消费者在处理 Zoë更改时受到某个临时的数据库中断的影响,它会把这个消息分流到一个重试主题,稍后再尝试。若是在 Zoiee 更改到达时数据库中断已获得纠正,则这条消息将先被成功处理,而后再由 Zoë更改覆盖。

为了说明问题,这里用了 Zoiee/Zoë这样一个简单的示例。实际上,乱序处理事件可能致使会各类各样的数据损坏问题。更糟糕的是,这些问题不多会在一开始就被注意到。相反,它们所致使的数据损坏每每在一段时间内都不会引发注意,但损坏程度会随着时间的推移而增加。通常来讲,当咱们意识到发生了什么事情时,已经有大量数据受到影响了。

重试主题何时可行?

须要明确的是,重试主题并不是一直都是错误的模式。固然,它也存在一些合适的用例。具体来讲,当消费者的工做是收集不可修改的记录时,这种模式就很不错。这样的例子可能包括:

  • 处理网站活动流以生成报告的消费者
  • 将交易添加到分类帐的消费者(只要这些交易用不着按特定顺序跟踪)
  • 正在从另外一个数据源 ETL 数据的消费者

这类消费者可能会从重试主题模式中受益,同时没有数据损坏的风险。

不过,请注意

即便存在这种用例,咱们仍应谨慎行事。构建这样的解决方案既复杂又耗时。所以,做为一个组织,咱们不想为每一个新的消费者编写一个新的解决方案。相反,咱们要建立一个统一的解决方案,好比一个库或一个容器等,能够在各类服务之间重复使用。

还存在另外一个问题。咱们可能会为相关消费者构建一个重试主题的解决方案。不幸的是,不久以后,这个解决方案就会进入跨边界事件发布消费者的领域了。拥有这些消费者的团队可能没有意识到风险的存在。正如咱们前面所讨论的那样,在发生重大数据损坏以前,他们可能不会意识到任何问题。

所以,在实现重试主题解决方案以前,咱们应 100%肯定:

  • 咱们的业务中永远不会有消费者来更新现有数据,或者
  • 咱们拥有严格的控制措施,以确保咱们的重试主题解决方案不会在此类消费者中实现

咱们如何改善这种模式?

鉴于重试主题模式可能不是跨边界事件发布消费者的可接受解决方案,咱们是否能够对其作一些调整来改善它呢?

一开始,本文想要提供一种完整的解决方案。但以后我意识到,并不存在什么万能的路径。所以,咱们将只讨论一些在制定合适解决方案时须要考虑的事项。

消除错误类型

若是咱们可以在可恢复错误和不可恢复错误之间消除歧义,生活就会变得轻松许多。例如,若是咱们的消费者开始遇到可恢复错误,那么重试主题就变得多余了。

所以,咱们能够尝试肯定所遇到的错误类型:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      // ...
    } else {
      // ...
    }
  }
}

在上面的 Java 伪代码示例中,isRecoverable()将采用一种白名单方法来肯定 t 是否表示可恢复错误。换句话说,它检查 t 以肯定它是否与任何已知的可恢复错误(例如 SQL 链接错误或 ReST 客户端超时)相匹配,若是匹配则返回 true,不然返回 false。这样就能防止咱们的消费者被不可恢复错误一直阻塞下去。

诚然,要在可恢复错误和不可恢复错误之间消除歧义可能很困难。例如,一个 SQLException 可能指的是一次数据库故障(可恢复)或一次约束违反情况(不可恢复)。若有疑问,咱们可能应该假设错误是不可恢复的——为此要冒的风险是将其余好的消息发送给隐藏主题,从而延迟它们的处理……但这也能避免咱们无心间陷入泥潭,无休止地尝试处理不可恢复错误。

在消费者内重试可恢复错误

正如咱们所讨论的那样,存在可恢复错误时,将消息发布到重试主题毫无心义。咱们只会为下一条消息的失败扫清道路。相反,消费者能够简单地重试,直到条件恢复。

固然,出现可恢复错误意味着外部资源存在问题。咱们不断对这块资源发送请求是无济于事的。所以,咱们但愿对重试应用一个退避策略。咱们的伪 Java 代码如今可能看起来像这样:

void processMessage(KafkaMessage km) {
  try {
    Message m = km.getMessage();
    transformAndSave(m);
  } catch (Throwable t) {
    if (isRecoverable(t)) {
      doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
    } else {
      // ...
    }
  }
}

(注意:咱们使用的任何退避机制都应配置为在达到某个阈值时向咱们发出警报,并通知咱们潜在的严重错误)

遇到不可恢复错误时,将消息直接发送到最后一个主题

另外一方面,当咱们的消费者遇到不可恢复错误时,咱们可能但愿当即隐藏(stash)该消息,以释放后续消息。但在这里使用多个重试主题会有用吗?答案是否认的。在转到 DLQ 以前,咱们的消息只会经历 n 次消费失败而已。那么,为何不从一开始就将消息粘贴在那里呢?

与重试主题同样,这个主题(在这里,咱们将其称为隐藏主题)将拥有本身的消费者,其与主消费者保持一致。但就像 DLQ 同样,这个消费者并不老是在消费消息;它只有在咱们明确须要时才会这么作。

考虑排序

来看看排序的状况。咱们在这里重用以前的“用户/登陆”示例。尝试处理 Zoë名称中的ë字符时,Login 消费者可能会遇到错误。消费者将其识别为一个不可恢复错误,将消息放在一边,而后继续处理后续消息。不久以后,消费者将得到 Zoiee 消息并成功处理它。

Zoë消息已隐藏,而且 Zoiee 消息如今已成功处理完毕。目前,两个有界上下文之间的数据是一致的。

晚些时候,咱们的团队会修复消费者,以便其能够正确处理特殊字符并从新部署它。而后,咱们将 Zoë消息从新发布给消费者,消费者如今能够正确处理该消息了。

当更新的消费者随后处理隐藏的 Zoë消息后,两个有界上下文之间的数据将变得不一致。所以,当 User 有界上下文将用户视为 Zoiee 时,Login 有界上下文会将她称为 Zoë。

显然,咱们没有保持排序;Zoë是在 Zoiee 以前由 Login 消费者处理的,但正确的顺序是倒过来的。隐藏一条消息后,咱们能够开始隐藏全部消息,但在那种状况下咱们实际上会陷入困境。幸运的是,咱们不须要保持全部消息的顺序,只需考虑与单个聚合相关联的消息便可。所以,若是咱们的消费者能够跟踪已隐藏的特定聚合,它就能够确保属于同一聚合的后续消息也被隐藏。

收到隐藏主题中消息的警报后,咱们能够取消部署消费者并修复其代码(请注意:切勿修改消息自己;消息表明不可变的事件!)在修复并测试了咱们的消费者以后,咱们能够从新部署它。固然,在继续使用主要主题以前,咱们将须要特别注意先处理隐藏主题中的全部记录。这样,咱们将继续保持正确的排序状态。出于这个缘由,咱们将首先部署隐藏消费者,而且只有在其完成时(这意味着消费者组中的全部实例都完成,若是咱们使用了多个消费者),咱们才会取消部署它并部署主消费者。

咱们还应该考虑如下事实:固定的消费者处理了隐藏消息后,它仍可能会遇到其余错误。在这种状况下,其错误处理行为应像咱们以前描述的那样:

  • 若是错误是可恢复的,则使用退避策略重试;
  • 若是错误是不可恢复的,它将隐藏消息并继续下一条消息。

为此,咱们能够考虑使用第二个隐藏主题。

能够接受一些数据不一致?

这样的系统构建起来可能会变得至关复杂。它们可能很难构建、测试和维护。所以,某些组织可能会想要肯定出数据不一致的可能性,并判断他们是否能够承受这种风险。

在许多状况下,这些组织可能会采用数据协调机制,以使他们的数据最终(是相对较长的“最终”)变得一致。为此也存在许多策略(超出了本文的范围)。

总结

处理重试彷佛很复杂,那是由于它就是这么麻烦——和一切正常时 Kafka 相对优雅的风格相比之下尤为明显。咱们构建的任何合适的解决方案(不管是重试主题、隐藏主题仍是其余解决方案)都将比咱们想要的更复杂。

不幸的是,若是咱们但愿在微服务之间创建弹性的异步通讯流,那么咱们就不能忽略它。

本文介绍了一种流行的解决方案、它的缺点以及在设计替代解决方案时应考虑的一些事项。到最后,想要构建正确的解决方案,咱们就应该牢记一些事情,例如:

  • 了解 Kafka 经过主题、分区和分区键提供的功能。
  • 考虑到可恢复错误与不可恢复错误之间的差别。
  • Java中间件面试真题 +学习笔记
  • 设计模式的用法,例若有界上下文和聚合。
  • 不管如今仍是未来,都要搞清楚咱们组织的用例特性。咱们只是在移动独立的记录吗?……在这种状况下,咱们可能不关心排序;仍是说咱们正在传播表示数据更改的事件?……在这种状况下,排序相当重要。
  • 仔细考虑咱们是否愿意承受任何水平的数据不一致。