はじめに

最近kafkaをspringbootで使おうと思って、いろいろと調べてました。
springKafkaは知名度に反して日本語サイトで参考になるサイトが少なかったので、自分用の忘備録として残しておこうと思います。
この記事は、Springbootを使ったHello World!からSpring kafkaによるメッセージ送受信を実装していきます。
詳しいkafkaについての解説などは行いませんので、そちらが気になる方は以下の記事を参照してください。
Apache Kafkaの概要とアーキテクチャ

環境

今回の記事で前提としている環境はこちら

    • VScode 1.38.1

 

    • java version “1.8.0_221”

 

    windows 10

もしまだVSCodeやJavaをインストールしていない場合は色んなサイトでインストール方法を紹介しているのでそちらを参照しながらやると良いと思います.今回はWindowsで話を進めますが,MacやLinuxでもおそらく変わらないはずです.

VSCode

    • Ubuntu版

 

    • UbuntuにVisual Studio Codeをインストールしてみた

 

    • Windows版

 

    • Visual Studio Code (Windows版) のインストール

 

    • Mac版

 

    Macにコードエディタ「VSCode (Visual Studio Code) インストール・日本語化する手順

JDK

    • だいたいこれでできる

 

    • OpenJDK(Java)を最新のUbuntuにインストール

 

    • windowsはこれ

 

    初心者でも簡単にできる!Javaをインストールする方法(Windows編)

注意しなければいけないこととしては、OracleからJavaをインストールする場合はOracleにアカウントを作らなければいけないこと。(アカウント作らなくてもインストールできるの?)

VSCodeでJavaの動作確認

    • 上の記事でJavaのバージョンが確認できたら,VSCodeでJavaが開発できるように環境を整える

 

    • VSCodeでJavaの開発環境を構築する

 

    SampleでJavaコードがなにか動かせたら問題なし.
image.png

まずはSpringBootで「Hello World!」

上の設定で準備ができたら、Spring bootアプリをVSCode内で行っていきます。

VSCode内でのspringbootアプリの作り方

image.png
image.png
image.png

spring bootで「Hello World!」

さっそく「Hello World!」を出力できるようにしていきましょう。
作り始めたばかりの時は、下のようなMainクラスが作成されているはずです。

package com.kafka.sample.kafka_sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}

image.png

そこで「Hello World!」を出力できるようにコードを書きます。
こんな感じ

package com.kafka.sample.kafka_sample;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka-sample")
public class Controller {

    @GetMapping
    public String viewHelloWorld() {
        return "Hello world!";
    }
}

では、動かしてみます。
VSCode内でF5を押します。
初めて起動するとき、アプリの構成を示したJsonファイルが表示されます。

{
    // IntelliSense を使用して利用可能な属性を学べます。
    // 既存の属性の説明をホバーして表示します。
    // 詳細情報は次を確認してください: https://go.microsoft.com/fwlink/?linkid=830387
    "version": "0.2.0",
    "configurations": [
        {
            "type": "java",
            "name": "Debug (Launch) - Current File",
            "request": "launch",
            "mainClass": "${file}"
        },
        {
            "type": "java",
            "name": "Debug (Launch)-DemoApplication<kafka_sample>",
            "request": "launch",
            "mainClass": "com.kafka.sample.kafka_sample.DemoApplication",
            "projectName": "kafka_sample"
        }
    ]
}
image.png

Spring kafkaを使ったデータ受け渡し

ようやくここから本題です。
まずはkafkaを扱うための準備をしましょう。
kafkaを扱うためにはkafkaをインストールしなければなりません。
昨今のトレンドとしてはDockerなどを使ってローカル環境を汚さずに使うのですが、今回はそこにまで言及すると趣旨がずれるのでまた別の機会に記事にします。

kafkaのインストール

公式サイトよりQuick Startが提供されています。
https://kafka.apache.org/quickstart
そちらを参考にやればできるかと思います。

ただし、WindowsはQuickStartで解説されているシェルスクリプトではなくバッチファイルを使用します。
一応、windowsの方向けにコマンドと流れを解説します。

Windows向けKafka立ち上げ

まず、kafkaのパッケージをインストールし、解凍します。
その後、解凍したkafkaフォルダに入ってください。
そしたら、zookeeperを立ち上げるバッチを起動します。

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

このようなメッセージが出れば無事立ち上げ成功です。

INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

次にkafkaを立ち上げます。
もう1つコマンドプロンプトを起動し、そこで先ほど同様kafkaフォルダに移動してください。
そこで別のバッチを起動します。

.\bin\windows\kafka-server-start.bat config\server.properties

これで問題なく立ち上がるはずです。
コマンドプロンプトからkafkaを使う方法は割愛します。
同じ要領でバッチを起動すればQuick Startと同じように動作確認ができます。

spring Kafkaを使ったConsumer,Producerの実装

kafkaが無事立ち上がったことが確認できれば、いよいよコードを書いていきましょう。
KafkaはProducerでメッセージを投げ、Consumerでメッセージを受け取ります。
今回はそれが分かるようにProducerとConsumerそれぞれ実装していきます。
まずは、Producerから作成していきます。

package com.kafka.sample.Producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class Producer{

    //log出力用
    private static final Logger LOGGER=LoggerFactory.getLogger(Producer.class);

    //送り先のTOPICを定義
    private static final String TOPIC="sample";

    //KafkaのkeyとValueの型を設定できます。
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;


    public void sendKafka(String message) {
        LOGGER.info("kafka send message is {}",message);
        //指定したTOPICにmessageを送信できる。
        kafkaTemplate.send(TOPIC,message);

    }
}

今回はKafkaにsendするものはStringのみにします。
jsonを送る場合はMessageBuilderを使用するとTopicやHeaderなども纏めることができ、楽にメッセージ作成をすることができます。

次にConsumerを作ります。

package com.kafka.sample.kafka_sample.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class Consumer{

     //log出力用
     private static final Logger LOGGER=LoggerFactory.getLogger(Consumer.class);

    //kafkaListenerによってどのTopicの情報を取得するのか、
    //どのgroupIdの情報を取得するのかを設定でき、エラーが発生した場合のハンドリングなども可能。
    @KafkaListener(topics = "test",groupId = "sample")
    public void consumer(String message) {
        LOGGER.info("received message is {}",message);
    }

}

今回はメッセージを受け取り、ログを出力するという簡単なものです。
注意点としましては、KafkaListenerアノテーションはTopicとIdを指定しないとコンパイル時にエラーになってしまうことです。VSCodeのエラーチェックでは検出されないので注意してください。
また、Listenerはkafkaにメッセージを送信することはできないので、Listenerで受け取ったメッセージを加工しkafkaに送信する際は、再度sendする必要があります。

最後に、Controller.javaでこれらを使えるように修正します。

package com.kafka.sample.kafka_sample;

import com.kafka.sample.kafka_sample.Producer.Producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableConfigurationProperties
@RequestMapping("/kafka-sample")
public class Controller {

    @Autowired
    private Producer producer;

    @GetMapping
    public void sendMessage(@RequestParam("message") String message){
        producer.sendKafka(message);
    }
}

といっても今回Controllerに書くのはProducerだけで十分ですけどね。
KafkaListenerは指定したTopicでKafkaが受信したときに起動するためです。

動作確認

動かしてちゃんと動くか見てみます。VSCode内でF5を押してください。
起動に失敗しても、何度かF5を押すとうまくいきます。
うまく起動出来たら、ターミナルにはこのようなメッセージが表示されます。

INFO 15576 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [test-0]
image.png
image.png

コマンドを使った確認

最初にtopicの確認から。
今回はtestというトピックを作成しました。ちゃんと作成できているのでしょうか。
コマンドプロンプトを開き、kafkaフォルダに移動後以下のコマンドを実行してください。

.\bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092

このコマンドはkafkaにあるTopicをすべて表示するコマンドです。きちんとtestと表示されていることが確認できるかと思います。

次にConsumerを見てみて、先ほど送ったメッセージがちゃんと登録されているか確認してみましょう。

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

–topicの後ろは設定したTopic名を入れてください。
すると

PS C:\Users\***\Downloads\kafka_2.12-2.3.0> .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
Hello Kafka

ありました!
これで無事kafkaにメッセージが送受信されていることが確認できました。

おわり

今回はspringbootを用いたSpringKafkaでの簡単なメッセージの受け渡しについて書きました。
私が初めてKafkaに触ったとき出てきた記事がほとんど英語で辛かった思い出から書いたので、初心者向けの内容となっています。
なるべく1からできるように書いたつもりですが、もしわかりにくいところや詳しく聞きたいところがあればコメントにてアドバイスいただけますと幸いです。
最後に今回作ったソースコードはGitHubにて公開していますので、書くのがめんどくさいという方はクローンして使ってみてください。
https://github.com/ft0220/KAFKA_SAMPLE
今度はDocker使った起動方法やTestの作り方やエラーのハンドル方法などもう少し詳しい実装面のあれこれも書きたいな。

bannerAds