我使用了Apache Kafka
今天是MDC圣诞日历2019年的第15天。
卡夫卡是什么?
LinkedIn创建的分布式消息传递系统(消息队列),具有高吞吐量和可扩展性。它使用Java(Scala)编写。
由生产者、代理人和消费者三个组件组成。
将来自生产者的数据流转发给消费者。为了应对故障,还进行数据持久化。同时实现了投递保证。
– 生产者:分发消息
– 代理人:将消息从生产者交给消费者
– 消费者:接收消息
※以下的参考资料可以对细致的机制和架构等提供帮助:
・Apache Kafka的概述和架构
・Apache Kafka入门指南
用于什么目的? yú ?)
以下是可能的用例:
-
- 为了防止系统过于分散,将数据集线器作为架构的一部分进行整合(通过微服务等方式)。
-
- 与Fluentd等进行协作,用于日志收集。
-
- 收集网站用户的页面浏览等数据,用于Web活动分析。
-
- 汇总IoT设备传感器的数值,并用于可视化、分析以及其他设备的控制等。
- 适用于大数据、机器学习等领域。
以下是具体的方面:
– LINE的大规模数据管道
– 雅虎实时搜索
– 大型保健IT公司Cerner公司使用Kafka的案例
试着动一下
我会制作一个简单的样本并进行测试。
这次我们使用Kafka-docker进行Kafka本体的环境搭建,使用kafka-python作为Producer和Consumer的客户端。
Kafka-docker的安装与启动
按照公式的步骤,下载kafka-docker并在docker-compose.yml中的KAFKA_ADVERTISED_HOST_NAME选项中填写Docker主机的IP地址后,
docker-compose up -d
只要你这样做就可以了。
参考资料:Kafka在Docker上的教程
实现Producer
实际上,我本来想使用类似Twitter流API的数据,或者获取IoT传感器的值。但是由于这次没有准备,我只能随便找一个能获取数值的东西,因此我编写了一个脚本,用于获取鼠标的x坐标,并将其每秒发送到Kafka。

from kafka import KafkaProducer
import pyautogui
import time
def main():
producer = KafkaProducer(bootstrap_servers='{Docker HostのIPアドレス}:{Port}')
while True:
result = producer.send('test', str(pyautogui.position().x).encode()).get(timeout=60)
print(result)
time.sleep(1)
if __name__ == '__main__':
main()
实现Consumer
接下来是消费者端的实现。同样,需要指定Kafka的IP地址和端口。
使用”for message in consumer:”这个语句可以实现逐个从Kafka中拉取数据的功能。
from kafka import KafkaConsumer
def main():
consumer = KafkaConsumer(
'test',
bootstrap_servers=['{Docker HostのIPアドレス}:{Port}'])
for message in consumer:
print("x = " + message.value.decode())
if __name__ == '__main__':
main()
执行
在左边下方执行Producer,右边执行Consumer。可以确认Producer将值(鼠标的x坐标)发送到Kafka后,Consumer可以从Kafka中获取并显示该值。

将来想做的事情
使用树莓派、Arduino等设备将传感器的值集成进来。
将获取到的数据制作成图表或进行分析。