PythonでKafkaを非同期でサブスクライブする方法

PythonでのKafkaの非同期コンシュームに関する実装方法はいくつかありますが、ここでは2つの一般的な方法を紹介します。1. aiokafkaライブラリを使用した実装: aiokafkaはasyncioベースのKafkaクライアントライブラリで、Kafkaメッセージの非同期コンシュームに使用できます。以下は簡単なサンプルコードです。

import asyncio
from aiokafka import AIOKafkaConsumer
async def consume():

consumer = AIOKafkaConsumer(

 

‘topic_name’,

 

bootstrap_servers=’kafka_broker’,

 

group_id=’consumer_group_id’,

 

loop=asyncio.get_event_loop()

 

)

 

await consumer.start()

 

 

 

try:

 

async for message in consumer:

 

# 处理消息逻辑

 

print(message.value)

 

 

 

finally:

 

await consumer.stop() loop = asyncio.get_event_loop() loop.run_until_complete(consume())


confluent-kafka-pythonとasyncioライブラリを組み合わせることで、asyncioライブラリと組み合わせて、Kafkaメッセージの非同期消費を実現できます。以下に簡単なサンプルコードを示します。

import asyncio
from confluent_kafka import Consumer, KafkaException
async def consume():

 

consumer_config = {

 

‘bootstrap.servers’: ‘kafka_broker’,

 

‘group.id’: ‘consumer_group_id’,

 

‘enable.auto.commit’: True,

 

‘auto.offset.reset’: ‘earliest’

 

}

 

 

 

consumer = Consumer(consumer_config)

 

consumer.subscribe([‘topic_name’])

 

 

 

try:

 

while True:

 

msg = consumer.poll(1.0)

 

if msg is None:

 

continue

 

if msg.error():

 

if msg.error().code() == KafkaException._PARTITION_EOF:

 

continue

 

else:

 

print(‘Consumer error: {}’.format(msg.error()))

 

break

 

else:

 

# 处理消息逻辑

 

print(msg.value())

 

 

 

finally:

 

consumer.close() loop = asyncio.get_event_loop() loop.run_until_complete(consume())


どちらの方法でもKafka メッセージの非同期コンシューマが実現できますので、ご自身のアプリケーションの状況に合わせて選択してください。

bannerAds