我写了一份关于卡夫卡的笔记

卡夫卡是什么?它有什么特点?

    • 分散型イベント駆動型プラットフォーム

 

    • PUB/SUB(Producer/Consumer) でイベント駆動開発ができる

 

    • message順番を守ることができる1

message保持方法を必要に応じて設定できる2

時間が経過すると、自動削除
logデータが大きくなると、自動削除
データの上書きで最新のデータのみを保持
永遠に保持し、time travelもできる

Consumer側でどこまでデータをconsumeしたのかを管理できる3

自動
手動

データのlossをコントロールできる4

transactionサポート5

atomicity, consistencyを実現するため、データを大きなjsonに入れて、PUBする方法もある

使用kafka进行系统级事件驱动开发的概念

就像Kafka的标志所示,各种服务可以通过Kafka实现实时信息的协同传输。

kafka

系统设置时需要注意的事项

    • messageの順番を守るべきか

 

    • ある程度のデータlossは許すべきか

 

    • 重複データをconsumeするときに、処理を如何するか

 

    • schema lessなので、topicやデータ構造のドキュメントを書くべき

 

    • データのバージョン管理

データ自体にバージョンを入れるか、topicでバージョン管理するかを決める必要がある

使用Docker创建Kafka服务器

使用wurstmeister/kafka-docker

docker-compose -f docker-compose-single-broker.yml up

使用Rails + Karafka来尝试消息交换。

    rails appを作成
rails new karafka_example
    Gemfileに以下のgemを追加し、bundle installする
gem 'karafka'
    Karafkaの設定を作成
bundle exec karafka install

将下面的文件创建

app/consumers/application_consumer.rb
app/responders/application_responder.rb
karafka.rb
    karafka.rbを編集
ENV['RAILS_ENV'] ||= 'development'
ENV['KARAFKA_ENV'] = ENV['RAILS_ENV']
require ::File.expand_path('../config/environment', __FILE__)
Rails.application.eager_load!

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://127.0.0.1:9092]
    config.client_id = 'example_app'
    config.backend = :inline
    config.batch_fetching = true
    config.logger = Rails.logger
  end

  Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

  consumer_groups.draw do
    consumer_group :bigger_group do
      batch_fetching false

      topic :users do
        consumer UsersConsumer
      end
    end
  end
end

KarafkaApp.boot!
    consumerを作成する

app/consumers/users_consumers.rb 的中文翻译如下:
应用程序/消费者/用户消费者.rb

class UsersConsumer < ApplicationConsumer
  def consume
    Karafka.logger.info "New [User] event: #{params}"
  end
end
    responderを作成する

应用/回应者/用户回应者.rb

class UsersResponder < ApplicationResponder
  topic :users

  def respond(event_payload)
    respond_to :users, event_payload
  end
end
    karafka server起動
bundle exec karafka server
    rails consoleでデータ作成を試す
UsersResponder.call({ event_name: "user_created", payload: { id: 1 } }

相关资料

消息顺序
删除日志
提交偏移量
消息保证
事务
bannerAds