使用Oracle Cloud Streaming,尝试在单个节点上使用Java Kafka客户端来消费

首先

在Oracle云基础设施(OCI)中,提供了名为Streaming的服务,可以实时收集和处理流数据。Streaming具有与Apache Kafka兼容的API,可以通过Kafka客户端进行连接,实现生产和消费功能。

上次我們實現了一個 Java 的生產者。

我们将确认如何从Java的Kafka客户端中以独立节点方式进行数据消费并输出。

虚拟机 jī)

我会创建一个适当的CentOS 7虚拟机。

生成授权令牌

要在流媒体服务中使用Kafka API进行连接,需要使用IAM用户的认证令牌。您可以在自己的IAM用户详细信息页面上生成该令牌。

1588474181611.png

我会随意地加入一些解释。

1588474211704.png

由于无法使用图片中的内容,我会将令牌记录下来。

如果在Token中包含了符号,比如;,我感觉它可能无法正常运行。如果遇到困难,无法正常工作时,请考虑替换Token。

1588474230416.png

创建流

在OCI控制台上,选择Analytics>Streaming。然后选择创建流。

1588519801919.png

我会随意输入参数来创建。

1588519908911.png

当创建Stream时,如果Stream Pool为空,则会自动创建默认的Stream Pool。teststream01会自动分配到DefaultPool中。Stream Pool是一个用于管理多个Stream的概念,可以统一管理诸如将Endpoint设置为公开或私有、使用何种密钥进行数据加密等方面的事项。

由于这次选择了自动建立,因此端点是公开的,并且使用由Oracle管理的加密密钥进行设置。

1588520017422.png

点击上述屏幕上的「查看Kafka连接设置」按钮。

1588521024252.png

点击“复制全部”,将所有的设置值记录下来。

1588521056591.png

安装Open JDK

为了在 Java 中运行 Kafka 客户端,需要安装 OpenJDK。

sudo yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel

安装 Maven 3.6.3

为了管理依赖关系,安装Maven。

请在以下的URL中确认下载链接:https://maven.apache.org/download.cgi

1587385604751.png

使用从下载页面复制的URL,在CentOS上下载tar.gz文件。

mkdir ~/maven
cd ~/maven
wget https://ftp.jaist.ac.jp/pub/apache/maven/maven-3/3.6.3/binaries/apache-maven-3.6.3-bin.tar.gz

请按照以下步骤进行安装:
https://maven.apache.org/install.html

解压缩 tar.gz 文件。

tar xfvz apache-maven-3.6.3-bin.tar.gz

在中国,将以下内容进行中文本地化的释义如下:
添加环境变量设置到bashrc文件中。

echo 'export PATH=$PATH:$HOME/maven/apache-maven-3.6.3/bin' >> ~/.bashrc

重新加载bashrc。

source ~/.bashrc

确认 mvn 命令是否可执行。

[opc@kafkaconsumer1 maven]$ mvn -v
Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f)
Maven home: /home/opc/maven/apache-maven-3.6.3
Java version: 1.8.0_252, vendor: Oracle Corporation, runtime: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.252.b09-2.el7_8.x86_64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "3.10.0-1062.18.1.el7.x86_64", arch: "amd64", family: "unix"
[opc@kafkaconsumer1 maven]$ 

Java 构建

创建一个目录。

mkdir ~/KafkaSingleConsumerApp
mkdir -p ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi
cd ~/kafkaconsumersingle

我会编写Java源代码。

cat <<'EOF' > ~/KafkaSingleConsumerApp/src/main/java/jp/test/sugi/KafkaSingleConsumerApp.java
package jp.test.sugi;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSingleConsumerApp {
    public static void main(final String[] args) {
        System.out.println("Start.");

        // 接続時の設定値を Properties インスタンスとして構築する
        final Properties properties = new Properties();

        // OCI Streaming に接続するための指定
        properties.put("bootstrap.servers", "cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");
        properties.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"poc02/oracleidentitycloudservice/suguru.sugiyama@oracle.com/ocid1.streampool.oc1.ap-tokyo-1.amaaaaaaycetm7yawtz56lnnerap4r45y4vheekgvhdaevxf3clfpuew6mla\" password=\"8t[shwUN}I-d+{}8Nx_a\";");
        properties.put("max.partition.fetch.bytes", 1024 * 1024);

        // Consumer の動作指定
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "java-consumer-group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");

        // Consumer を構築する
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(),
                new StringDeserializer());

        // Consumer をトピックに割り当てる
        consumer.subscribe(Arrays.asList("teststream01"));

        try {
            while (true) {
                // メッセージをとりだす
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60l));

                // とりだしたメッセージを表示する
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("%s:%s", record.offset(), record.value()));
                }

                // メッセージの読み取り位置である offset を、最後に poll() した位置で(同期処理で)更新する
                consumer.commitSync();
            }
        } finally {
            consumer.close();
            System.out.println("End.");
        }
    }
}

为了指定Maven的依赖关系,创建pom.xml文件。

cat <<'EOF' > ~/KafkaSingleConsumerApp/pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>jp.test.sugi</groupId>
  <artifactId>KafkaSingleConsumerApp</artifactId>
  <version>1.0</version>
  <name>KafkaSingleConsumerApp</name>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.26</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-api</artifactId>
      <version>2.13.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-core</artifactId>
      <version>2.13.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-assembly-plugin</artifactId>
          <version>3.3.0</version>
          <configuration>
            <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
          <executions>
            <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
            </execution>
          </executions>
        </plugin>
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
          <configuration>
            <archive>
              <manifest>
                <mainClass>jp.test.sugi.KafkaSingleConsumerApp</mainClass>
              </manifest>
            </archive>
          </configuration>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
EOF

创建一个包含所有依赖关系的fat jar。第一次运行时,需要等待几分钟以下载所需的文件。

mvn package assembly:single

当运行生成的 Jar 文件时,将消耗 Stream 上的数据。

java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar

为了确认操作,您可以通过OCI控制台进行操作。您只需点击”测试信息”按钮即可。

1588602314673.png

在输入适当的信息后,点击“生成”按钮。

1588602336798.png

请提供一个需要转述的具体例子。

[opc@kafkaconsumer01 KafkaSingleConsumerApp]$ java -jar target/KafkaSingleConsumerApp-1.0-jar-with-dependencies.jar
Start.
log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.consumer.ConsumerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
1076:test1
1077:hello? i am streaming

请参看以下网址

Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api

Azure
https://docs.microsoft.com/ja-jp/azure/hdinsight/kafka/apache-kafka-producer-consumer-api

bannerAds