分散メッセージキューkafkaそのものについてはここの解説が詳しい。
あとはこれとか。
AWSのAmazonMSKのようなフルマネージドが楽そうだが、まずはWindows上でどんな感じか試してみる。
環境は少し古いWindows Server 2012
node.jsは16.x

kafkaインストール

こことかここを見ながら。

    • とりあえずhttps://kafka.apache.org/downloadsからバイナリダウンロード

 

    • 7zipで解凍して適当なフォルダに配置

 

    • zookeeperは組み込みを利用するので、bin\windows\zookeeper-server-start.bat config\zookeeper.propertiesで起動

 

    別のプロンプトを立ち上げてbin\windows\kafka-server-start.bat config/server.properties

起動してみるとjava.nio.file.AccessDeniedExceptionのエラー
config下にあるlog.dirs関係を全てlog.dirs=G:\tmp\kraft-combined-logsのようにWindows形式のパスに変更すると
java.nio.file.InvalidPathException: Illegal char < > at index 2: G: mpkafka-logs\meta.properties.tmp
となったので、\を\でエスケープしてみるがやはりAccessDeniedExceptionエラー。
フォルダの権限の問題でもない。
調べてみると単純に2.12-3.0.0の問題のようなので古いバージョンのkafka_2.12-2.8.1に変更すると設定を変更しなくても動いた。

さらに別のコンソールを開いてトピックを作成して確認
topc名(test)が出てくるはず。
partitionsはとりあえず1で試せばよいが、consumerが複数の場合はそれ以上のpartitionsが存在している必要あり。

bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
bin\windows\kafka-topics.bat --list --zookeeper localhost:2181

なお、削除は
bin\windows\kafka-topics.bat –delete –zookeeper localhost:2181 –topic test

メッセージ送受信(kafka-node)

ダウンロード数を見るとkafkajsやnode-rdkafkaよりkafuka-nodの方が優勢で、
サンプルも多そうなのでkafka-nodeを利用する。

こことかでnode.jsからのアクセス方法が書いてある。

npm install kafka-node

サンプルとは少し変えて、expressでGETで投げたものを適当に直接kafkaに投げてみる

'use strict';
const kafka = require("kafka-node");
const express = require('express');
const app = express();
const port = 3000;
const Producer = kafka.HighLevelProducer;
const Consumer = kafka.Consumer;
const client = new kafka.KafkaClient({
    kafkaHost: "localhost:9092"
});
const producer = new Producer(client, {
    // Partitioner type (default = 0, random = 1, cyclic = 2, keyed = 3, custom = 4), default 0
    partitionerType: 1
});
const consumer = new Consumer(
    client,
    [{topic: "test", partision:0}],
    {
        groupId: "my-consumer",
        autoCommit: true,
        fromOffset: true
    }
);

app.get("/", function(req, res, next){
    const name = req.query.name;
    const age = req.query.age;
    const message = [
        {
            topic: "test",
            messages: JSON.stringify({name: name, age: age})
        }
    ];
    producer.send(message, (err, data) => {
        if (err) console.log(err);
        else console.log('send messages');
        //process.exit();
    });
    res.status(200).send("SEND"); // テストなので常に成功扱い
});

// 受信する場合
consumer.on("message", (message, err) => {
    console.log(message);
});

app.listen(port, () => console.log(`Example app listening on port ${port}!`));

送信出来ているかどうかをコマンドでも確認してみると、飛んできている事がわかる。
–from-beginningを指定すると最初から読み込む。

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

受信にはいくつかパターンがあるようで、ここ見ると
new kafka.ConsumerGroupStreamを使うのが一番高機能っぽい。
詳細は公式

SQLを使ってkafkaをコントロールするKSQLのようなものもあるがMSKは対応していなさそう。

マルチConsumer化

起動時に
Error while executing topic command : Replication factor: 1 larger than available brokers: 0.
というようなエラーが出て立ち上がらない事があったが、tmpを全て削除して、アプリも終了させてから起動させると立ち上がった。

tmp等を削除してから再起動して
bin\windows\kafka-topics.bat –create –zookeeper localhost:2181 –replication-factor 1 –partitions 2 –topic test
としてpartitionを複数で立ち上げる。

Consumerは先のnode.jsのものからポートを3001に変更したものを作成。
partitionを指定しているため、[{topic: “test”, partition:1}],のようにして別のものを指定して起動。
ブラウザから登録してみると別々の受信をしている事が分かる。

とはいえ、これでは自動でpartitionが割り振られないので使いにくい。
ConsumerGroupStreamを使ってグループで割り振られるようにする。
2つ用意するが、ポートの違いのみ。

'use strict';
const kafka = require("kafka-node");
const express = require('express');
const app = express();
const port = 3000;
const Producer = kafka.HighLevelProducer;

const producer = new Producer(client, {
    partitionerType: 1
});

const consumer = new kafka.ConsumerGroupStream({
    kafkaHost: "localhost:9092",
    groupId: 'my-consumer',
    autoCommit: true,
    fromOffset: 'earliest'
  }, 'test')

app.get("/send", function(req, res, next){
    const name = req.query.name;
    const age = req.query.age;
    const message = [
        {
            topic: "test",
            messages: JSON.stringify({name: name, age: age})
        }
    ];
    producer.send(message, (err, data) => {
        if (err) {console.log(err); res.status(500);}
        else console.log('send messages');
        //process.exit();
    });
    res.status(200).send("SEND");
});

// messageではなくdataになる
consumer.on("data", (message, err) => {
    console.log(message);
});

app.listen(port, () => console.log(`Example app listening on port ${port}!`));

これで実験すると各々が異なるデータを受信出来たので概ねOKぽい。

bannerAds