在Ubuntu 20.04上安装Apache Kafka的步骤

引言

Apache Kafka 是一款流行的分布式消息代理器,专为处理大量实时数据而设计。Kafka 集群具备高可伸缩性和容错性。与 ActiveMQ 和 RabbitMQ 等其他消息代理器相比,Kafka 的吞吐量更高。尽管通常用作发布/订阅消息系统,但许多组织也将其用于日志聚合,因为它为已发布的消息提供了持久存储。

发布/订阅消息系统允许一个或多个生产者发布消息,而不考虑消费者的数量或消息的处理方式。订阅的客户端会自动收到更新和新消息的通知。相比于客户端定期轮询以确定是否有新消息可用的系统,这种系统更加高效和可扩展。

在本教程中,您将在Ubuntu 20.04上安装并配置Apache Kafka 2.8.2。

先决条件

为了跟随,您需要:

  • An Ubuntu 20.04 server with at least 4 GB of RAM and a non-root user with sudo privileges. You can set this up by following our Initial Server Setup guide if you do not have a non-root user set up. Installations with less than 4GB of RAM may cause the Kafka service to fail.
  • OpenJDK 11 installed on your server. To install this version, follow our tutorial on How To Install Java with APT on Ubuntu 20.04. Kafka is written in Java, so it requires a JVM.

第一步 – 创建 Kafka 用户

由于Kafka可以处理网络请求,因此您的第一步是为服务创建一个专用用户。这样,一旦有人入侵Kafka服务器,将最大限度地减少对您的Ubuntu机器造成的损害。在这一步中,您将创建一个专用的kafka用户。

以您的非root sudo用户登录到您的服务器,然后创建一个名为kafka的用户。

  1. sudo adduser kafka

根据引导设置密码和创建kafka用户。

接下来,使用adduser命令将kafka用户添加到sudo组中。您需要这些特权来安装Kafka的依赖项。

  1. sudo adduser kafka sudo

您的kafka用户已准备好。使用su命令登录到kafka账户。

  1. su -l kafka

现在您已经创建了一个专用于Kafka的用户,您可以开始下载并提取Kafka二进制文件。

第二步 – 下载和解压 Kafka 二进制文件

在这一步中,您将把Kafka二进制文件下载并解压到专门的文件夹中,放置在您kafka用户的主目录下。

首先,在 /home/kafka 目录下创建一个名为“Downloads”的文件夹来存储你的下载文件。

  1. mkdir ~/Downloads

使用curl命令下载Kafka二进制文件。

  1. curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz

创建一个名为kafka的文件夹,并切换到该文件夹。您将使用该文件夹作为Kafka安装的基础目录。

  1. mkdir ~/kafka && cd ~/kafka

使用tar命令解压您下载的存档文件。

  1. tar -xvzf ~/Downloads/kafka.tgz --strip 1

为确保将存档的内容解压缩到~/kafka/而不是其中的另一个目录(例如~/kafka/kafka_2.13-2.8.2/),您需要指定–strip 1标志。

现在你已经成功下载并解压了二进制文件,可以开始配置你的Kafka服务器了。

步骤3 — 配置Kafka服务器

Kafka主题是消息可以发布到的类别、组或数据源名称。然而,Kafka的默认行为不允许你删除一个主题。为了修改这一点,在这个步骤中,你必须编辑配置文件。

打开 server.properties 文件,可以使用 nano 或你最喜欢的编辑器来指定 Kafka 的配置选项。

  1. nano ~/kafka/config/server.properties

首先,添加一个设置,使您能够删除Kafka主题。在文件底部添加以下行:

~/kafka/config/server.properties
delete.topic.enable = true

其次,您可以通过修改log.dirs属性来更改Kafka日志存储的目录。找到log.dirs属性,并将现有的路径替换为突出显示的路径。

/kafka/config/server.properties 的路径
log.dirs=/home/kafka/logs

保存并关闭文件。

既然您已经配置好了Kafka,您可以创建systemd单元文件来运行并在启动时启用Kafka服务器。

第四步 – 创建systemd单位文件并启动Kafka服务器。

在这一部分中,您将为Kafka服务创建systemd单元文件。这些文件将帮助您执行常见的服务操作,例如以与其他Linux服务一致的方式启动、停止和重新启动Kafka。

卡夫卡使用Zookeeper来管理其集群状态和配置。它在许多分布式系统中使用,在官方的Zookeeper文档中可以详细了解这个工具。你将使用这些单元文件将Zookeeper作为一个服务。

为zookeeper创建单位文件。

  1. sudo nano /etc/systemd/system/zookeeper.service

将以下单位定义输入文件中。

/etc/systemd/system/zookeeper.service
[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

在这个[Unit]部分中说明了在Zookeeper启动之前,需要网络和文件系统准备就绪。

【服务】部分规定了systemd应该使用zookeeper-server-start.sh和zookeeper-server-stop.sh shell文件来启动和停止服务。它还规定了如果Zookeeper异常退出,应重新启动。

添加完此内容后,请保存并关闭文件。

接下来,为kafka创建systemd服务文件。

  1. sudo nano /etc/systemd/system/kafka.service

将以下的单位定义输入到文件中:

/etc/systemd/system/kafka.service
[Unit]
Requires=zookeeper.service
After=zookeeper.service

[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal

[Install]
WantedBy=multi-user.target

[Unit] 部分指定了该单元文件依赖于 zookeeper.service,这将确保在 kafka 服务启动时自动启动 zookeeper。

[服务] 部分指定 systemd 应使用 kafka-server-start.sh 和 kafka-server-stop.sh 脚本文件来启动和停止服务。它还指定如果 Kafka 异常退出,应重新启动。

保存并关闭文件。

现在你已经定义了单位,使用以下命令启动Kafka:

  1. sudo systemctl start kafka

为确保服务器成功启动,检查kafka单元的日志记录。

  1. sudo systemctl status kafka

你将收到类似这样的输出。

Output
● kafka.service Loaded: loaded (/etc/systemd/system/kafka.service; disabled; vendor preset> Active: active (running) since Wed 2023-02-01 23:44:12 UTC; 4s ago Main PID: 17770 (sh) Tasks: 69 (limit: 4677) Memory: 321.9M CGroup: /system.slice/kafka.service ├─17770 /bin/sh -c /home/kafka/kafka/bin/kafka-server-start.sh /ho> └─17793 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMill>

现在你有一个 Kafka 服务器正在监听9092端口,这是Kafka服务器使用的默认端口。

你已经启动了Kafka服务。但是如果重新启动服务器,Kafka将不会自动重新启动。为了在服务器启动时启用Kafka服务,请运行以下命令:

  1. sudo systemctl enable zookeeper

您将收到一条响应,说明已创建了一个符号链接。

Output
Created symlink /etc/systemd/system/multi-user.target.wants/zookeeper.service → /etc/systemd/system/zookeeper.service.

然后运行这个命令:

  1. sudo systemctl enable kafka

您将收到一条响应,表示已创建了一个符号链接。

Output
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /etc/systemd/system/kafka.service.

在这一步中,您开始并启用了Kafka和Zookeeper服务。在下一步中,您将检查Kafka的安装。

第五步 — 测试 Kafka 安装

在这个步骤中,你将测试你的Kafka安装。你将发布和消费一个Hello World消息,以确保Kafka服务器的运行符合预期。

在Kafka中发布消息需要:

  • A producer, who enables the publication of records and data to topics.
  • A consumer, who reads messages and data from topics.

首先,创建一个名为 “教程主题”(TutorialTopic)的主题。

  1. ~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

您可以使用kafka-console-producer.sh脚本从命令行创建一个生产者。它需要Kafka服务器的主机名、端口和主题作为参数。

你将收到一条回复,告诉你这个主题已经被创建了。

Output
Created topic TutorialTopic.

现在将字符串“Hello, World”发布到TutorialTopic主题上。

  1. echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

接下来,使用kafka-console-consumer.sh脚本创建一个Kafka消费者。它需要ZooKeeper服务器的主机名和端口,以及一个主题名作为参数。以下命令会从TutorialTopic中消费消息。请注意使用了–from-beginning标志,它允许消费在消费者启动之前已发布的消息。

  1. ~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

如果没有配置问题,你将在终端中收到一个“Hello, World”的回应。

Output
Hello, World

脚本将继续运行,等待发布更多的消息。为了测试这一点,请打开一个新的终端窗口并登录到您的服务器。记得以您的kafka用户身份登录。

  1. su -l kafka

在这个新的终端中,启动生产者来发布第二条消息。

  1. echo "Hello World from Sammy at Silicon Cloud!" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

此消息将在您的原始终端中加载到消费者的输出。

Output
Hello, World Hello World from Sammy at Silicon Cloud!

当你测试完成后,在原始终端窗口中按下CTRL+C来停止消费者脚本。

您已经在Ubuntu 20.04上安装并配置了一个Kafka服务器。在接下来的步骤中,您将执行一些快速任务来加强您的Kafka服务器的安全性。

第六步 – 加固Kafka服务器

在安装完成后,您可以取消kafka用户的管理员权限并加固Kafka服务器。

在执行操作之前,请注销并以任何非根sudo用户重新登录。如果您仍在运行开始此教程的相同shell会话,请输入”exit”。

从sudo组中删除kafka用户。

  1. sudo deluser kafka sudo

为了进一步提高Kafka服务器的安全性,使用passwd命令锁定kafka用户的密码。这个操作可以确保没有人可以直接使用该账户登录服务器。

  1. sudo passwd kafka -l

“使用-l选项可以锁定更改密码(passwd)的命令。”

在这个时刻,只有root用户或者具有sudo权限的用户可以使用以下命令登录kafka:

  1. sudo su - kafka

将来,如果你想解锁更改密码的能力,请使用具有-u选项的passwd命令。

  1. sudo passwd kafka -u

现在,您已成功限制了Kafka用户的管理权限。您可以开始使用Kafka了。如果需要,您可以选择继续下一步,将KafkaT添加到您的系统中。

第七步 — 安装 KafkaT(可选)

KafkaT是为了提升您在命令行中查看Kafka集群详情和执行某些管理任务的能力而开发的。由于它是一个Ruby gem,您需要安装Ruby来使用它。您还需要build-essential软件包来构建KafkaT所依赖的其他gems。

使用apt安装Ruby和build-essential包。

  1. sudo apt install ruby ruby-dev build-essential

现在您可以使用gem命令安装KafkaT。

  1. sudo CFLAGS=-Wno-error=format-overflow gem install kafkat

在kafkat的安装过程中,需要使用Wno-error=format-overflow编译标志来抑制Zookeeper的警告和错误。

安装完成后,您将接收到一个提示,表示完成了。

Output
... Done installing documentation for json, colored, retryable, highline, trollop, zookeeper, zk, kafkat after 3 seconds 8 gems installed

KafkaT使用.kafkatcfg作为配置文件来确定您的Kafka服务器的安装和日志目录。它还应该有一个指向您的ZooKeeper实例的入口。

创建一个名为.kafkatcfg的新文件。

  1. nano ~/.kafkatcfg

请添加以下行来指定您的Kafka服务器和Zookeeper实例所需的信息:

~/.kafkatcfg
{
  "kafka_path": "~/kafka",
  "log_path": "/home/kafka/logs",
  "zk_path": "localhost:2181"
}

保存并关闭文件。现在,您可以开始使用KafkaT了。

要查看有关所有Kafka分区的详细信息,请尝试运行以下命令:

  1. kafkat partitions

您将收到以下的输出结果:

Output
[DEPRECATION] The trollop gem has been renamed to optimist and will no longer be supported. Please switch to optimist as soon as possible. /var/lib/gems/2.7.0/gems/json-1.8.6/lib/json/common.rb:155: warning: Using the last argument as keyword parameters is deprecated ... Topic Partition Leader Replicas ISRs TutorialTopic 0 0 [0] [0] __consumer_offsets 0 0 [0] [0] ... ...

输出将包括TutorialTopic和__consumer_offsets,__consumer_offsets是Kafka用于存储与客户端相关信息的内部主题。您可以安全地忽略以__consumer_offsets开头的行。

要了解更多关于KafkaT的信息,请参考它的GitHub资料库。

结论

现在你的Ubuntu服务器上安全地运行着Apache Kafka。你可以使用Kafka客户端将Kafka集成到你喜欢的编程语言中。

想要了解更多关于Kafka的知识,你也可以参考它的文档。

发表回复 0

Your email address will not be published. Required fields are marked *