尝试使用Apache Kafka

阿帕奇卡夫卡

由于可能需要使用Apache Kafka,因此我进行了操作确认。
Kafka是一个分布式消息传递平台,用于交换消息的平台。

与现有的消息传递服务的区别

以下是与现有的JMS等消息传递系统不同的几个方面:
* 基于分布式处理,具有高可靠性和可扩展性。
* 可以保留消息历史记录,并具有存储的功能。
* 可以进行流处理。对于输入消息,可以进行各种实时处理。

安装

进行测试安装非常容易。
这次我们在AWS上的Amazon Linux上进行操作。

安装Java

因为环境是全新的,所以我们从安装Java开始。

su -
yum install java-1.8.0-openjdk-devel

下载和解压 Kafka

在写这篇文章的时候,1.1.0是最新版本。

wget http://ftp.tsukuba.wide.ad.jp/software/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
tar -zxf  kafka_2.11-1.1.0.tgz
cd kafka_2.11-1.1.0

准备工作已经完成。

开始 Kafka

Zookeeper是一款用于构建分布式处理系统的中间件。
尽管Kafka似乎可在无Zookeeper情况下运行,但由于稍后需要进行多个代理节点的测试,我们会使用它。

启动动物园管理员。

bin/zookeeper-server-start.sh config/zookeeper.properties
[2018-06-20 02:06:32,944] INFO Reading configuration from: config/zookeeper.properties 
(org.apache.zookeeper.server.quorum.QuorumPeerConfig)

启动经纪人

bin/kafka-server-start.sh config/server.properties
[2018-06-20 02:08:00,771] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-06-20 02:08:01,131] INFO starting (kafka.server.KafkaServer)

这样一来,服务就可以被使用了。

通信测试

创建主题

我将创建一个名为“test”的主题作为试验。
主题就像消息的地址,我们可以通过主题来发送和接收消息。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

确认创建的主题

bin/kafka-topics.sh --list --zookeeper localhost:2181
test

发送消息到测试主题

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hi this is from aws
>I'll send message

确认消费者的留言

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
hi this is from aws
I'll send message

能够获取存储在中的现有消息真是令人耳目一新!!

我已经创建了一个主题,并确认了消息订阅和消费者检查消息的一系列流程。

多经纪人测试

我想测试一下卡夫卡的一个特征——多个代理。

多经纪商设定

请将配置文件复制给新经纪人。
由于此次需要构建3个设备,因此需要再复制两个文件。

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

编辑文件

每个人都可以更改经纪人ID、监听端口和日志文件的设置。

編輯config/server-1.properties文件

vi config/server-1.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9093
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-1

编辑 config/server-2.properties

vi config/server-2.properties
# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9094
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs-2

启动经纪人

启动了最初设置以及新增的三个代理商。

bin/kafka-server-start.sh config/server.properties &
bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

创建一个复制因子为3的新主题

创建一个涵盖三个经纪人的话题。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

确认群集状态

获取新创建的主题信息。

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 0       Replicas: 2,1,0 Isr: 1,0,2

最初的行是关于所有分区的摘要,而后的每一行都展示了关于分区的信息。
* “Leader” 是负责该分区的所有读写操作的节点。在这次例子中,节点0被指定为Leader。
* “Replicas” 是复制该分区日志的节点列表。
* “Isr” 是指“同步副本”的集合。

与现有的话题进行比较

最初创建的单一代理主题中,所有的节点都由节点0负责。

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

向聚类过的主题发送消息。

发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
>this is clusterd Topic!
>next message

以下为中文翻译版本:

请从消费者处确认。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
this is clusterd Topic!
next message

容错性验证

我們試著關掉主機,節點0。

ps -ef | grep server 
ec2-user  9072  2984  2 03:15 pts/0 kafka.Kafka config/server.properties
kill -9 9072

动物园管理员的日志

[2018-06-20 03:20:51,000] INFO Processed session termination for sessionid: 0x1641b2fd2520000 (org.apache.zookeeper.server.PrepRequestProcessor)

[1]   Killed                  bin/kafka-server-start.sh config/server.properties

再次确认集群

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic       PartitionCount:1        ReplicationFactor:3     Configs:
        Topic: my-replicated-topic      Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 1,2

可以确认由于主节点0的故障,节点2自动成为了领导者。