我写了一份关于卡夫卡的笔记
卡夫卡是什么?它有什么特点?
-
- 分散型イベント駆動型プラットフォーム
-
- PUB/SUB(Producer/Consumer) でイベント駆動開発ができる
-
- message順番を守ることができる1
message保持方法を必要に応じて設定できる2
時間が経過すると、自動削除
logデータが大きくなると、自動削除
データの上書きで最新のデータのみを保持
永遠に保持し、time travelもできる
Consumer側でどこまでデータをconsumeしたのかを管理できる3
自動
手動
データのlossをコントロールできる4
transactionサポート5
atomicity, consistencyを実現するため、データを大きなjsonに入れて、PUBする方法もある
使用kafka进行系统级事件驱动开发的概念
就像Kafka的标志所示,各种服务可以通过Kafka实现实时信息的协同传输。

系统设置时需要注意的事项
-
- messageの順番を守るべきか
-
- ある程度のデータlossは許すべきか
-
- 重複データをconsumeするときに、処理を如何するか
-
- schema lessなので、topicやデータ構造のドキュメントを書くべき
-
- データのバージョン管理
データ自体にバージョンを入れるか、topicでバージョン管理するかを決める必要がある
使用Docker创建Kafka服务器
使用wurstmeister/kafka-docker
docker-compose -f docker-compose-single-broker.yml up
使用Rails + Karafka来尝试消息交换。
- rails appを作成
rails new karafka_example
- Gemfileに以下のgemを追加し、bundle installする
gem 'karafka'
- Karafkaの設定を作成
bundle exec karafka install
将下面的文件创建
app/consumers/application_consumer.rb
app/responders/application_responder.rb
karafka.rb
- karafka.rbを編集
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!
class KarafkaApp < Karafka::App
setup do |config|
config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
config.client_id = 'example_app'
config.backend = :inline
config.batch_fetching = true
config.logger = Rails.logger
end
Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)
consumer_groups.draw do
consumer_group :bigger_group do
batch_fetching false
topic :users do
consumer UsersConsumer
end
end
end
end
KarafkaApp.boot!
- consumerを作成する
app/consumers/users_consumers.rb 的中文翻译如下:
应用程序/消费者/用户消费者.rb
class UsersConsumer < ApplicationConsumer
def consume
Karafka.logger.info "New [User] event: #{params}"
end
end
- responderを作成する
应用/回应者/用户回应者.rb
class UsersResponder < ApplicationResponder
topic :users
def respond(event_payload)
respond_to :users, event_payload
end
end
- karafka server起動
bundle exec karafka server
- rails consoleでデータ作成を試す
UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }
相关资料
消息顺序
删除日志
提交偏移量
消息保证
事务
删除日志
提交偏移量
消息保证
事务