Apache Kafka的推荐配置和性能估算方法
日期:2018年10月12日
作者:伊藤雅博,日立制作株式会社
首先
这篇帖子介绍了在2017年オープンソースカンファレンス.Enterprise中所做的演讲“目标!成为Kafka专家-如何在Apache Kafka中达到最佳性能”的研究内容(总共8次发表)。本帖子的内容基于Kafka 0.11.0版本发布于2017年6月的情况。
本次是第三次的会议,我们将介绍Kafka的推荐系统配置以及网络和磁盘性能的估算方法。
投稿一覧:
1. Apache Kafka概览与架构
2. Apache Kafka Producer/Broker/Consumer机制与配置列表
3. Apache Kafka推荐配置与性能估算方法(本投稿)
4. Apache Kafka性能验证(1):验证环境与参数调优内容
5. Apache Kafka性能验证(2):Producer调优结果
6. Apache Kafka性能验证(3):Broker调优结果
7. Apache Kafka性能验证(4):Producer再调优及Consumer调优结果
8. Apache Kafka性能验证(5):系统整体延迟相关
硬件配置
以下是关于Broker节点和ZooKeeper节点推荐的硬件规格说明。值得注意的是,Producer和Consumer是用于发送和接收的库,所需规格取决于使用库的用户应用程序,因此在这里不做说明。
经纪人
以下是Broker使用节点的推荐硬件规格。
台数:4台以上
データの耐久性を考慮するとReplicaを最低3個は確保したい。Brokerノードが4台以上あれば、1台が故障しても、そのBroker上のReplicaは別のBroker上に再構築されるため、3個のReplicaを維持できる。
CPUコア数:20コア以上
Brokerでは多くのスレッドが実行される。特にリクエスト送受信(デフォルト設定で3スレッド)、リクエスト処理(デフォルト設定で8スレッド)、複製処理(デフォルト設定で1スレッド)は比較的使用率が高い。その他にもバックグラウンド処理用のスレッドが複数存在する。そのため20コア以上を推奨する。
メモリ容量:24GB+キャッシュしたいデータ量
Kafkaでは書き込んだメッセージをすぐに読み出すことが多いため、ページキャッシュを活用することで高速なデータ読み出しが可能となる。例えば1,000MB/秒で書き込まれる直近60秒間のデータをキャッシュしたい場合、必要なメモリ量は1,000MB * 60秒 ≒ 60GBと見積もることができる。これにBrokerに割り当てたJavaヒープメモリ(6-16GB程度)と、OS動作用に残しておくメモリ(4-8GB程度)を合計したものが、必要なメモリ容量となる。
ディスク台数:10台以上
Kafkaではメッセージをディスクに保存するため、ディスク台数が多いほどBrokerの書き込み/読み出しスループットが向上する。OS用に2台でRAID1構成、残りをJBOD(または1台のRAID0)でKafka用に割り当てる。Kafka用のディスクはSASなどのシーケンシャルアクセス性能が高いものがよい。
ディスク容量:保存したいデータ量に依存
Kafkaはデータを指定した期間ディスクに保存するため、それに合わせた容量が必要となる。例えば1日1TBのデータを受信し、3個に複製する場合、1日当たり3TBのディスク容量を消費する。このとき過去7日間のデータを保存しておく場合は21TBの容量が必要となる。これに加えてメタデータなどのファイル用に最低10%以上の容量が必要となる。
ネットワーク回線:10Gbps回線
必要な回線速度はデータの流量に依存するが、最低でも10 Gbps回線の使用を推奨する。さらに帯域が必要な場合は、複数NICによるボンディングや、40Gbps回線の使用を検討するとよい。
动物园管理员
以下是ZooKeeper使用的节点推荐硬件规格。
台数:3台以上(奇数台)
3台構成だと1台の故障に耐えられ、5台構成だと2台までの故障に耐えられる。
CPUコア数:2コア以上
OSとZooKeeperの処理用にそれぞれ1コアずつあるとよい。
メモリ容量:8GB以上
ZooKeeperに3-5GB程度のJavaヒープを割り当て、OS用に4GB程度のメモリを残しておく。
ディスク台数:4台以上
ディスクの用途は以下の3種類に分けられる。
OS用:2台でRAID1構成のディスクを割り当てる
トランザクションログ用:トランザクションログは同期的に書き込まれるため、他の用途とは独立したディスク(2台でRAID1構成)を割り当てる
スナップショット用:スナップショットはディスクに非同期で書き込まれるためOS用のディスクと共有でもよい
ネットワーク回線:1Gbps以上
ZooKeeperとの通信データ量は小さいため、高速な回線で接続する必要はない。
软件参数设置
操作系统的设置
以下是适用于使用RHEL 6的Kafka参数设置。
スワップ頻度(vm.swappiness):1
ディスクをメモリとして使うスワップ領域の設定です。0-100の範囲で設定し、数値が高いほど積極的にスワップを利用します。メモリ上のページを頻繁にディスクへスワップすると、SegmentファイルのディスクI/Oに影響し、ページキャッシュに割り当てられるメモリも減少します。そのためスワップ頻度を低く抑えることを推奨します。
ファイルディスクリプタ数:100,000
Brokerは大量のファイルとネットワークコネクションをオープンします。そのため、ファイルディスクリプタ数は1Brokerプロセスあたり100,000個から始めることを推奨します。なお、最小必要数は以下のように計算できます。
最小必要数= 1Brokerの接続数 + (Partition数)*(Partitionサイズ / Segmentサイズ)
ソケットバッファの最大値
Kafka側でソケットバッファのサイズを指定しても、OS側の制限によりこの最大値を超えることはできません。特にデータセンタ間で通信を行う場合はRTTが長くなるため、ソケットバッファを大きくする必要があります。
文件系统设置
为了增加Broker的磁盘I/O性能,在Linux上通常使用EXT4或XFS作为文件系统。建议使用一些挂载选项。有关选项的详细信息,请参阅官方文档。
以下是XFS/EXT4通用的挂载选项。由于XFS已经进行了自动调整,因此不需要更改默认设置以外的设置。
-
- noatime
ファイルのatime(最終アクセス時刻)属性の更新を無効にする。これにより、ファイル読み込み時に発生する書き込み処理を抑制できる。Kafkaはatime属性に依存しないため、無効にしても問題ない。
如果使用EXT4文件系统,请根据需要添加在官方文档中记录的选项,以最大限度地提高性能。
JVM的配置
由于Broker是用Scala语言编写的并在JVM上运行,因此在执行之前需要安装JDK。对于生产者和消费者,如果使用Java或Scala客户端,则同样需要JDK。出于安全考虑,建议使用最新发布版的JDK 1.8。
另外,LinkedIn作为Kafka的发布方在使用以下的Java选项。
-Xmx6g
-Xms6g
-XX:MetaspaceSize = 96m
-XX:+UseG1GC
-XX:MaxGCPauseMillis = 20
-XX:InitiatingHeapOccupancyPercent = 35
-XX:G1HeapRegionSize = 16M
-XX:MinMetaspaceFreeRatio = 50
-XX:MaxMetaspaceFreeRatio = 80
理论上的最大吞吐量估计
在流式数据处理系统中,性能最低的部分成为瓶颈,决定了整个系统的处理性能。 Kafka中容易成为瓶颈的是节点之间的网络I/O和Broker的日志读写所涉及的磁盘I/O。因此,对于预期的数据流量,估算网络I/O性能和磁盘I/O性能是否足够重要。
在这里,我们将介绍一种估计理论最大吞吐量的方法,该方法基于磁盘性能和网络性能。需要注意的是,这仅仅是理论上的最大性能,实际上会有各种开销。关于实际性能的详细说明将在后续的性能测试中进行,但如果复制是异步的,则可以实现接近理论值的吞吐量。然而,如果进行同步复制,则吞吐量只能达到理论值的60-70%左右。
整个系统的数据流动
为了估计所需的网络I/O性能和磁盘I/O性能,首先需要解释整个系统的数据流程。本次估计的是以下系统配置示例的理论最大吞吐量。请注意,此处不考虑与ZooKeeper的通信量。

以下是在上述系统配置中数据流的示例。这里假设了一个流数据处理系统,其中生产者发送的数据被消费者组立即读取。在该系统中,最大吞吐量由最低吞吐量的瓶颈部分决定。因此,在以下数据流中,“所有生产者的总数据发送量”和“每个消费者组的数据接收量”达到平衡的最大吞吐量将成为整个系统的最大吞吐量。

我们将详细查看上述数据的流程。
1. 生产者将数据发送给经纪人
每个生产者节点将要发送的数据分成三份,分别发送到三个Broker节点。每个Broker节点将接收到的数据写入本地磁盘。
2. 经纪商之间的复制
假设复制因子为3,每个Broker节点都会复制数据,将接收到的数据发送到其他两台节点,并从其他两台节点接收数据。因此,发送和接收的数据量都是生产者接收到的数据量的两倍。此外,每个Broker都会将接收到的数据写入磁盘,只有发送的数据不在缓存中才需要从磁盘中读取。
3. 由经纪人向消费者发送数据
Broker节点根据Consumer的Fetch请求将数据发送到Consumer节点。由于有两个Consumer Group分别获取相同的数据,所以发送的数据量是从Producer接收到的数据量的两倍。此外,如果每个Broker的发送数据不在缓存中,则需要从磁盘读取。
从以上的数据流量可以看出,Kafka的复制机制会导致Broker的数据输入输出量增加。因此容易成为瓶颈的是连接到Broker的网络带宽和Broker上的磁盘的连续访问性能。
Broker节点的磁盘和网络I/O性能估算
以下是从生产者发送的数据量中计算出所需的网络I/O吞吐量和磁盘I/O吞吐量所需的经纪节点的方法。
网络输入/输出吞吐量
一般来说,网络连接通常都是全双工的,因此需要独立考虑发送吞吐量和接收吞吐量。以下是每个经纪人节点的接收吞吐量和发送吞吐量。
1Brokerの受信スループット
= Producerからの受信スループット + 他のBrokerからの受信スループット
= Producerからの受信スループット + Producerからの受信スループット * (Replication Factor - 1)
= Producerからの受信スループット * Replication Factor
= Producerの合計送信スループット / Brokerノード数 * Replication Factor
1Brokerの送信スループット
= 他のBrokerへの送信スループット + Consumer Groupへの送信スループット
= Producerからの受信スループット * (Replication Factor - 1) + Producerからの受信スループット * Consumer Group数
= Producerからの受信スループット * (Replication Factor + Consumer Group数 - 1)
= Producerの合計送信スループット / Brokerノード数 * (Replication Factor + Consumer Group数 - 1)
磁盘输入/输出吞吐量
一般来说,由于磁盘无法同时执行写入和读取操作,因此需要将磁盘I/O吞吐量考虑为写入和读取的总和。下面是每个代理节点的写入吞吐量和读取吞吐量。
磁盘写入吞吐量
以下是每个 Broker 节点的总磁盘写入吞吐量。
1Brokerの書き込みスループット
= 1Brokerの受信スループット
= Producerの合計送信スループット / Brokerノード数 * Replication Factor
磁盘读取吞吐量
以下是每个Broker节点的总磁盘读取吞吐量。这可以分为以下4种模式,取决于复制数据(以下称为复制数据)和Consumer Group的读取数据(以下称为消费数据)是否存在于Broker节点的页面缓存中。
如果复制数据和Consume数据都存在于缓存中时。
只要Broker之间的复制不延迟,并且Consumer Group可以立即读取到Broker写入的数据,由于数据都存储在缓存中,不会发生磁盘读取。
1Brokerの読み出しスループット = 0
2. 如果复制的数据不在缓存中。
如果Broker之间的复制延迟导致复制的数据被从缓存中驱逐出去,那么就需要从磁盘上读取该数据。
1Brokerの読み出しスループット
= 他のBrokerへの送信スループット
= Producerからの受信スループット * (Replication Factor - 1)
= Producerの合計送信スループット / Brokerノード数 * (Replication Factor - 1)
3. 如果缺乏缓存的消耗数据
如果由于Consumer Group的读取速度延迟而导致待读取的数据被缓存清除,那么就需要从磁盘读取该数据。
1Brokerの読み出しスループット
= Consumer Groupへの送信スループット
= Producerからの受信スループット * Consumer Group数
= Producerの合計送信スループット / Brokerノード数 * Consumer Group数
如果复制数据和消耗数据都不在缓存中的情况下。
如果复制机和消费者组之间的读取延迟,导致目标数据从缓存中被驱逐,那么需要从磁盘中读取这些数据。
1Brokerの読み出しスループット
= 1Brokerの送信スループット
= Producerの合計送信スループット / Brokerノード数 * (Replication Factor + Consumer Group数 - 1)
根据以上估计,我们可以得出结论,对于经纪人来说,内存容量也是存储数据的缓存的重要因素。一般来说,在Kafka中,经纪人会频繁读取刚刚写入的消息,因此通过利用页面缓存,可以避免磁盘读取,实现快速的数据读取。
然而,当Broker之间的数据复制或消费者组的数据读取出现延迟时,就需要从磁盘中读取页面缓存中不存在的数据。如果磁盘性能较低,那么磁盘就成为了瓶颈,整个系统的吞吐量可能会突然下降。
最后。
在本文中,我们介绍了评估Kafka网络和磁盘性能的方法以及推荐的系统配置。从下一次开始,我们将介绍实际进行Kafka性能验证时的测量结果。
第4部分:Apache Kafka性能测试(1):测试环境和参数调整的内容。