Apache Kafka®能够确保消息的顺序性吗?
Apache Kafka®真的能保留消息排序吗?
2023年3月21日 -> 2023年3月21号
Apache Kafka® 能够真正保持消息的顺序吗?
Apache Kafka®被称为可以按照主题/分区保持消息顺序,但这是真的吗?
Apache Kafka®最著名的口号之一是“保持每个主题分区的消息顺序”,但这是否总是正确的呢?在这篇博文中,我们将分析接受这个口号而不怀疑的话,可能会出现意料之外的、错误的消息顺序的一些实际场景。
基本情节:一个单一的制作人
让我们从基本的场景开始。一个单独的生产者依次向拥有单个分区的Apache Kafka主题发送消息。
在这种基本情况下,根据众所周知的口头禅,我们应该总是期待正确的顺序。但这是真的吗?嗯…这取决于情况!
网络不平等
在理想的世界中,单一生产者的情况应该始终是正确的顺序。然而,我们的世界并不完美!不同的网络路径、错误和延迟意味着消息可能会延迟或丢失。
让我们想象以下这种情况:一个唯一的生产者向一个话题发送了三条消息。
-
- メッセージ 1 は、何らかの理由で Apache Kafka への長いネットワークルートを見つける。
-
- メッセージ 2 は Apache Kafka への最短のネットワークルートを見つける。
- メッセージ 3 はネットワークで迷子になる。
即使是这种基本的情节,只要有一个制片人,就可以获得关于话题的一系列意外信息。
Kafka主题的最终结果只保存了两个事件,并以意外的顺序2和1进行显示。
考えてみると、从Apache Kafka的角度来看,这是正确的顺序。主题只不过是信息日志而已,Apache Kafka会在察觉到新事件到达时,根据消息的时间戳将其写入日志。这是基于Kafka的摄入时间,而不是消息创建的时间(事件时间)。
重试。
然而,并不是一切都已经失去!通过研究生产库(aiokafka就是一个例子),可以找到确保消息正确传递的方法。
首先,可以定义适当的批准机制来避免上述情景中的第3个问题。使用`acks`生产者参数,可以定义从Apache Kafka中希望接收的确认类型。
当将此参数设置为1时,可以确保从负责主副本的代理服务器接收到确认响应。当设置为all时,只有在主副本和副本都正确保存消息时才能接收到确认响应,从而避免了在主服务器接收消息并传播到副本之前发生失败的问题。
在设置了适当的ack之后,如果没有收到适当的确认响应,则设置重新尝试发送消息。与其他库(例如kafka-python)不同的是,aiokafka会自动重新尝试发送消息直到超出超时时间(通过request_timeout_ms参数进行设置)。
如果使用了 acknowledgment 和自动重试,应该能够解决消息 3 的问题。因此,在 retry_backoff_ms 的间隔后,生产者会再次发送消息 3。
所以,在 retry_backoff_ms 的时间间隔之后,将重新发送消息3。
飞行过程中的最大请求数量
然而,当仔细查看Apache Kafka主题的最终结果时,可以发现结果的顺序并不正确。
古老的方法(可在kafka-python中使用)是设置每个连接的最大飞行中请求数。如果可以同时发送的消息数量越多,乱序的风险就越高。
在使用kafka-python时,如果在主题中需要绝对有序性,就不得不将max_in_flight_requests_per_connection限制为1。基本上,即使将ack参数设置为最低为1,也必须在发送下一条消息之前等待所有消息(如果消息的大小小于批处理大小,则为消息的批处理)的肯定响应。
排列、确认和重试的绝对正确性将以吞吐量为代价。同时,“存在于”空中的消息量越少,就需要接收更多的acks,所以在规定的时间范围内,可以传递给Kafka的消息总量就会减少。
应该制片人
为了克服一次只能发送一个消息并等待确认响应的严格序列化问题,可以定义幂等生产者。在幂等生产者中,每个消息都会被标记为生产者ID和序列号(在每个分区中保持的顺序)。这个组合的ID将与消息一起发送到代理。
经纪人会按照生产者和主题/分区对通行编号进行管理。每当有新消息到达时,经纪人会检查已配置的ID,并且如果该值在同一生产者中等于前一个编号加1,则会批准新消息,否则将拒绝。这样可以确保消息的全局顺序,并且可以增加每个连接的在途请求数量(在Java客户端中最多为5)。
由多位制作人导致的复杂性增加。
到目前为止,我们只想象了一个人的基本情节,但Apache Kafka的现实情况是,通常会有多个制片人。要确保最终订单结果,我们应该注意哪些细节呢?
不同的位置,不同的延遲
请注意,网络并不是完全公平的。而且,由于多个生产者可能位于非常遥远的地方,不同的延迟意味着Kafka的排序可能与事件时间不同。
很遗憾地,无法在地球上的不同地方之间修正不同的延迟,所以需要接受这种情况。
批处理、附加变量
为了实现更高的吞吐量, 有时候我们希望将消息进行批处理。在批处理中, 我们将消息分组发送, 最小化总体调用次数, 增加对整体消息大小的有效载荷比例。然而, 这样一来可能会重新改变事件的顺序。Apache Kafka的消息会根据批量接收时间被保存在各个批次中。因此, 消息顺序会在每个批次中恢复正常, 但不同批次中的消息可能存在顺序不同的情况。
当存在不同的延迟和批处理时,全局排序的前提似乎完全丧失了…。那么,为什么我们要声称可以按顺序管理事件呢?
救世主:活动时间
我已经理解了Kafka最初的假设是保持消息顺序并不是百分之百正确的。消息的顺序取决于Kafka的接收时间,而不是事件发生的时间。但是,如果基于事件时间进行排序是重要的,那该怎么办呢?
我們的生產方無法解決這個問題,但消費者方可以解決。Kafka Streams、搭載專用Timestamp extractor和Single Message Transform (SMT)功能的Kafka Connect、Apache Flink®等,這些是基於Apache Kafka運行的最常見工具,它們都擁有定義要作為事件時間使用的字段的功能。
如果”消费者”被正确定义,那么可以重新排列从特定的Apache Kafka话题中发送的消息的顺序。接下来,让我们分析一下Apache Flink的实例:
CREATE TABLE CPU_IN (
hostname STRING,
cpu STRING,
usage DOUBLE,
occurred_at BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3),
WATERMARK FOR time_ltz AS time_ltz - INTERVAL '10' SECOND
)
WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '',
'topic' = 'cpu_load_stats_real',
'value.format' = 'json',
'scan.startup.mode' = 'earliest-offset'
)
在上述的Apache Flink表定义中,注意到以下内容:
-
- フィールドは Apache Kafka のソーストピックに unix 時間で定義されている(データ型は BIGINT)。
-
- time_ltz AS TO_TIMESTAMP_LTZ(occurred_at, 3)`: unix time を Flink のタイムスタンプに変換する。
- WATERMARK FOR time_ltz AS time_ltz – INTERVAL ’10’ SECOND: (occurred_atから計算された) 新しいフィールドtime_ltz` をイベント時刻として定義し、イベントの到着が最大 10 秒遅れる閾値を定義する。
当定义了上述的表后,可以使用time_ltz字段来正确地按顺序排列事件,并确保所有在允许的延迟时间内的事件都被计算到聚合窗口中。
`INTERVAL ’10’ SECOND` 是用来定义数据流水线的延迟,并确保正确捕获延迟到达的事件所必需的。但请注意,它不会影响吞吐量。流水线可以传递尽可能多的消息,但在计算最终的KPI之前,我们会”等待10秒”以确保包含特定时间范围内的所有事件。
作为另一种方法,我们可以对于特定的键(例如,hostname和cpu),只有当事件包含完整状态时,保持迄今为止达到的事件时间的最大值,并且只有当新的事件时间大于最大值时才接受变更。
总结
在 Kafka 中,有关排序的概念即使仅包含一个具有一个分区的主题也可能会很棘手。在这篇文章中,我们分享了一些可能引发意外事件序列的常见情况。幸运的是,通过限制在飞行中的消息数或使用幂等生产者,可以实现预期的排序。
当涉及到多个生产者和网络延迟的不可预测性时,可行的选项是通过在消费者端适当处理指定的事件时间,以修正整体顺序。这需要使用有效载荷进行操作。
参考资料
-
- Kafka Streamsのイベント時間
- Kafka ConnectのタイムスタンプルーターSMTをチェック