Confluent 平台备忘录 – (2) 消息发送和接收简易测试

首先

这是使用Confluent Platform时的备忘录。这次我们将尝试使用命令行简单地进行消息交互。

相关文章

Confluent Platform 注释 – (1)环境搭建
Confluent Platform 注释 – (2)消息发送和接收简易测试
Confluent Platform 注释 – (3)Schema Registry简易测试

请提供相关资料

Kafka命令指南
Kafka命令和术语介绍
Kafka概述和架构解析

卡夫卡命令

以下的/opt/confluent-6.2.0/bin目录中提供了这些选项。(这些选项实际上是由Shell脚本提供的,并最终运行Java程序)

Kafka主题

这是用于在Broker上创建Topic或检查Topic属性的命令。

终端命令 kafka-topics –help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics –help
该工具用于创建、删除、描述或更改主题。
选项 描述
—— ———–
–alter 更改主题的分区数、副本分配和/或配置。
–at-min-isr-partitions 在描述主题时设置,仅显示ISR计数等于配置的最小值的分区。与–zookeeper选项不兼容。
–bootstrap-server <String:与之连接的服务器> 必需:要连接的Kafka服务器。如果提供此选项,则不需要直接与Zookeeper进行连接。
–command-config <String:命令配置属性文件> 包含要传递给Admin Client的配置的属性文件。此选项仅在使用–bootstrap-server选项描述和更改经纪人配置时使用。
–config 要创建或更改的主题的主题配置覆盖。以下是有效配置的列表:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
有关主题配置的详细信息,请参阅Kafka文档。
仅在使用–bootstrap-server选项(kafka-configs CLI支持使用–bootstrap-server选项更改主题配置)的情况下支持。
–create 创建新主题。
–delete 删除主题。
–delete-config 要为现有主题删除的主题配置覆盖(请参阅–config选项下的配置列表)。与–bootstrap-server选项不兼容。
–describe 列出给定主题的详细信息。
–disable-rack-aware 禁用机架感知的副本分配。
–exclude-internal 在运行list或describe命令时排除内部主题。默认情况下,将列出内部主题。
–force 禁止控制台提示。
–help 打印使用信息。
–if-exists 如果在更改或删除或描述主题时设置了该选项,则仅当主题存在时才执行操作。
–if-not-exists 如果在创建主题时设置了该选项,则仅当主题尚不存在时才执行操作。
–list 列出所有可用主题。
–partitions 要创建或更改的主题的分区数(警告:如果增加了带有键的主题的分区,则消息的分区逻辑或顺序将受到影响)。
如果未提供create参数,则默认为集群默认值。
–replica-assignment
–replication-factor 如果未提供,则默认为集群默认值。
–topic 要创建、更改、描述或删除的主题。除了–create选项外,还可以接受正则表达式。将主题名称放在双引号中,并使用’\’前缀来转义正则表达式符号;例如: “test\.topic”。
–topics-with-overrides 在描述主题时设置,仅显示已覆盖配置的主题。
–unavailable-partitions 在描述主题时设置,仅显示没有可用leader的分区。
–under-min-isr-partitions 在描述主题时设置,仅显示ISR计数小于配置的最小值的分区。与–zookeeper选项不兼容。
–under-replicated-partitions 在描述主题时设置,仅显示低于复制数的分区。
–version 显示Kafka版本。
–zookeeper 已弃用,以host:port形式提供的Zookeeper连接字符串。可以提供多个主机以实现故障转移。

卡夫卡控制台生产者

作为制片人的角色,它是一个能够从Shell交互式地发送消息的命令。

kafka-console-producer –help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-producer –help
该工具用于从标准输入读取数据并将其发布到Kafka。
选项 描述
—— ———–
–batch-size <整数: 大小> 如果消息不是同步发送,发送单个批次中的消息数。 (默认值: 200)
–bootstrap-server <字符串: 服务器地址> 除非指定 –broker-list (已弃用),否则必填。
要连接的服务器地址。代理列表字符串的格式为 主机1:端口1,主机2:端口2。
–broker-list <字符串: 代理列表> 已弃用,请使用 –bootstrap-server 代替;如果已指定 –bootstrap-server,则忽略此选项。
代理列表字符串的格式为 主机1:端口1, 主机2:端口2。
–compression-codec [字符串: 压缩编解码器] 支持的压缩编解码器:’none’、’gzip’、’snappy’、’lz4’或’zstd’。
如果未指定任何值,则默认为’gzip’。
–help 打印使用信息。
–line-reader <字符串: 读取器类> 用于从标准输入读取行的类名。默认情况下,每行都作为单独的消息读取。 (默认值: kafka.tools.ConsoleProducer$LineMessageReader)
–max-block-ms <长整数: 最大阻塞时间> 发送请求阻塞的最长时间 (默认值: 60000)
–max-memory-bytes <长整数: 总内存大小> 生产者用于缓冲等待发送到服务器的记录的总内存。 (默认值: 33554432)
–max-partition-memory-bytes <长整数: 分区内存大小> 每个分区分配的缓冲区大小。
当接收到小于此大小的记录时,生产者将尝试将它们进行优化分组,
直到达到此大小为止。 (默认值: 16384)
–message-send-max-retries <整数> 代理有多种原因无法接收消息,短暂性不可用仅是其中之一。
此属性指定在生产者放弃并丢弃此消息之前的重试次数。 (默认值: 3)
–metadata-expiry-ms <长整数: 元数据过期间隔> 多长时间内强制刷新元数据,即使没有看到任何领导者变更。 (默认值: 300000)
–producer-property <字符串: 生产者属性> 以键值对的形式传递用户定义的属性给生产者。
–producer.config <字符串: 配置文件> 生产者配置属性文件。需要注意的是,[producer-property] 优先于此配置。
–property <字符串: 属性> 以键值对的形式传递用户定义的属性给消息读取器。
这允许用户定义消息读取器的自定义配置。默认属性包括:
parse.key=true|false
key.separator=
ignore.error=true|false
–request-required-acks <字符串: 所需的ack数> 生产者的请求所需的ack数 (默认值: 1)
–request-timeout-ms <整数: 请求超时时间> 生产者请求的等待ack的超时时间。值必须为非负且非零。 (默认值: 1500)
–retry-backoff-ms <整数> 每次重试之前,生产者都会刷新相关主题的元数据。
由于领导者选举需要一定时间,该属性指定生产者在刷新元数据之前等待的时间。
(默认值: 100)
–socket-buffer-size <整数: 大小> tcp接收缓冲区的大小。
(默认值: 102400)
–sync 如果设置,将消息发送请求同步发送到代理,一个接一个地发送。
–timeout <整数: 超时时间> 如果设置,并且生产者运行在异步模式下,这将给出消息在排队等待足够批量大小时的最大等待时间。
值以毫秒为单位。 (默认值: 1000)
–topic <字符串: topic> 必填: 要发送消息到的主题ID。
–version 显示Kafka版本。

卡夫卡控制台消费者

这是一个在角色为消费者的情况下承担的命令。您可以从Shell中接收消息。

kafka-console-consumer –help
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer –help
此工具用于从Kafka主题中读取数据并将其输出到标准输出。
选项说明
——
–bootstrap-server
–consumer-property
–consumer.config 消费者配置属性文件。注意,[consumer-property]优先于该配置。
–enable-systest-events 除了记录已消费的消息外,还记录消费者的生命周期事件。(这仅适用于系统测试。)
–formatter 用于格式化Kafka消息以进行显示的类名。(默认值:kafka.tools.DefaultMessageFormatter)
–from-beginning 如果消费者尚未有已建立的偏移量进行消费,则从日志中最早的消息开始,而不是最新的消息。
–group 消费者的消费组ID。
–help 打印使用信息。
–isolation-level 设置为read_committed以过滤尚未提交的事务消息。设置为read_uncommitted以读取所有消息。(默认值:read_uncommitted)
–key-deserializer
–max-messages 在退出之前要消费的最大消息数。如果未设置,消费将持续进行。
–offset 要消费的偏移量ID(非负数),或’earliest’表示从开头开始,或’latest’表示从末尾开始。(默认值:latest)
–partition 要消费的分区。消费从分区的末尾开始,除非指定了’–offset’。
–property 初始化消息格式化器的属性。默认属性包括:
print.timestamp=true|false
print.key=true|false
print.offset=true|false
print.partition=true|false
print.headers=true|false
print.value=true|false
key.separator=
line.separator= headers.separator= null.literal=
key.deserializer=
value.deserializer=
header.deserializer= 用户还可以传入自定义的格式化器属性;具体来说,用户可以通过使用’key.
deserializer.’、’value.
deserializer.’和’headers.
deserializer.’前缀来配置其反序列化器的属性。
–skip-message-on-error 处理消息时如果出现错误,则跳过该消息而不是停止。
–timeout-ms 如果指定,当在指定的时间间隔内没有可供消费的消息时,退出。
–topic 要消费的主题ID。
–value-deserializer
–version 显示Kafka版本。
–whitelist 指定要包括在消费中的主题的白名单的正则表达式。

删除卡夫卡记录

卡夫卡的消息与MQ不同,在消费者接收消息时,似乎消息仍然保留在主题中。这是一个相当令人震惊的事实。

当消费者接收消息时,只会标记为“消费”,在那个时刻并不会被删除。因此,似乎可以从多个消费者中多次读取相同的消息。

那么什么时候会被删除呢!当我去调查时,发现了以下描述。

每个Apache Kafka开发人员都应该知道的前5件事 – 提示#4:掌握命令行工具 – 删除记录

Kafka在磁盘上存储主题的记录,并在消费者读取后保留这些数据。然而,记录并不是存储在一个大文件中,而是根据分区被分成多个段,其中偏移量顺序在相同的主题分区中的段之间是连续的。由于服务器并没有无限的存储空间,Kafka提供了设置来控制保留多少数据,基于时间和大小。

    • The time configuration controlling data retention is log.retention.hours, which defaults to 168 hours (one week)

The size configuration log.retention.bytes controls how large segments can grow before they are eligible for deletion

似乎通过时间和大小来控制删除的时机,但默认情况下似乎会保留7天。
在文章的链接中,介绍了手动删除消息的方法。据说可以创建一个JSON文件来指定要删除的消息,然后将其作为参数传递给kafka-delete-records命令来执行。真是麻烦啊!如果要删除所有消息,或许直接删除/重新创建主题会更快。

kafka-delete-records –help 【卡夫卡删除记录–帮助】
[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-delete-records –help
该工具帮助删除给定分区中指定偏移量之后的记录。
选项 描述
—— ———–
–bootstrap-server
–command-config
–help 打印使用信息。
–offset-json-file {“partitions”:
[{“topic”: “foo”, “partition”: 1,
“offset”: 1}],
“version”:1
}
–version 显示卡夫卡版本。

偏移JSON文件示例

{
   "partitions": [
                  {"topic": "cool-topic", "partition": 0, "offset": -1}
                 ],
                 "version":1
}

如果将offset指定为-1,似乎会删除所有的消息。

示例操作

根据以下描述,尝试运行一些Kafka命令。

image.png

主题选取

我会创建一个名为”酷话题”的Topic。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --create --topic cool-topic --bootstrap-server localhost:9092
Created topic cool-topic.

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
cool-topic

我会查看关于酷炫话题的详细信息。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --describe --topic cool-topic --bootstrap-server localhost:9092
Topic: cool-topic       TopicId: Z8vfn8oqQW6LwUl9N_myAA PartitionCount: 1       ReplicationFactor: 1    Configs: segment.bytes=1073741824
        Topic: cool-topic       Partition: 0    Leader: 0       Replicas: 0     Isr: 0

制片人/信息发送

当执行kafka-console-producer命令时,会显示提示符(>),然后您可以在该处输入您想要发送的消息。每当按下回车键时,该行字符串将作为一条消息进行发送。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-producer --topic cool-topic --bootstrap-server localhost:9092
>hello
>first
>second
>third
>end
>^C

最后按下Ctrl+C退出。
这样就发送了5条消息。

消费者/消息接收

我收到了酷话题的消息。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic cool-topic
hello
first
second
third
end
^CProcessed a total of 5 messages

最后,可以用Ctrl+C退出。

删除主题

删除酷的话题。

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --delete --topic cool-topic --bootstrap-server localhost:9092

[root@test12 /opt/confluent-6.2.0/bin]# ./kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
_schemas
bannerAds