ClickHouse®与Apache Kafka®和Aiven的连接

将Apache Kafka®和Aiven for ClickHouse®连接在一起的方式。

我将介绍一种在处理大量数据时能够快速响应且无需对数据进行下采样的分析方法。

Apache Kafka®与Aiven的连接,适用于ClickHouse®。

搜索开发者中心

Kafka和ClickHouse在根本上是不同的,却也有许多共同之处。它们都是开源的,具有高度可扩展性,适用于不变数据,并能处理大量数据。因此,这些技术并非竞争关系,实际上是相互补充的。

Apache Kafka 在处理实时数据流方面表现优秀。然而,在某些情况下,需要回溯到旧记录并进行后续数据分析和处理。作为流式处理平台的 Apache Kafka,并未经过优化以便访问大规模数据块或作为 OLAP(在线分析处理)引擎,所以这是困难的。

ClickHouse是被设计成可处理Petabyte级数据的可扩展且可靠的存储解决方案,同时也是用于快速在线分析处理的强大工具,在许多企业中被广泛用于数据分析。

通过将这两种技术结合起来,可以实现ClickHouse高性能的数据仓库,并通过从Apache Kafka获取实时数据,始终保持最新状态。

Apache Kafka的主题可以被视为实时数据流动的河流,而ClickHouse则是所有数据最终都会到达的海洋。

好,让我们卷起袖子来实际整合这两个数据解决方案。接下来,我们将逐步创建服务、进行整合,并执行一些查询实验。

创建服务

为了简化安装过程,我们将使用Aiven运营的托管版Apache Kafka和ClickHouse。即使您还没有Aiven账户也不用担心。注册只需一步,您可以使用免费试用版进行此实验。

你可以直接在Aiven的控制台上创建Aiven for ClickHouse和Aiven for Apache Kafka服务。在下面的示例中,我们使用apache-kafka-service和clickhouse-service作为服务名称,但你也可以更有创意地命名它们 😉

另外,为了能够添加Aiven for ClickHouse的集成功能,至少需要启动计划。

创建服务后,将等待其完全部署并运行。这样,准备工作就完成了!

Apache Kafka 的准备工作

为了将数据从Apache Kafka传输到ClickHouse,首先需要确保Apache Kafka中存在数据。因此,首先需要在Apache Kafka中创建一个主题。可以直接通过Aiven控制台进行创建。将主题命名为”measurements”。在这里,我们将发送虚构设备的连续测量值。

为了模拟新数据的持续流动,可以使用一个简短的bash脚本。这个脚本将创建一个包含事件时间戳、设备ID和值这三个属性的JSON对象。然后,使用kcat将该对象发送到名为”measurements”的主题。有关如何设置kcat的详细信息,请参考此文章。

`#!/bin/bash

while :
do
 stamp=$(日付 +%s)
 id=$((RANDOM%100))
 val=$((RANDOM%1000))
 echo "{\"timestamp\":$stamp,\"device\_id\":$id,\"value\":$val}"\
 | kcat -F kcat.config -P -t measurements
done`クリップボードにコピー

运行脚本并让其保持运行状态,不断创建消息并发送到话题上。

Apache Kafka的操作已经完成。现在,让我们切换到ClickHouse。

将Aiven for ClickHouse连接到Apache Kafka。

可以将Aiven for ClickHouse服务集成到Apache Kafka服务中,但在同一个Aiven项目中拥有两个服务可以更方便地进行集成。

要将Aiven for ClickHouse与Apache Kafka集成,需要进行两个步骤:

1. 建立连接。
2. 指定整合数据的结构和来源。

使用Aiven CLI来执行这些步骤。

首先,创建一个clickhouse_kafka类型的集成,指定服务名称,将Apache Kafka作为数据源,将ClickHouse作为目标,并建立连接。

avn service integration-create
 --integration-type clickhouse_kafka ¦-ソースサービス
 --source-service apache-kafka-service ╱ --service clickhouse-service
 --dest-service clickhouse-service`Copy to clipboard

如果执行此命令,(如果没有问题)将不会返回任何内容。然而,如果您查看Aiven for ClickHouse服务中可用的数据库列表(例如使用Aiven的控制台),您将注意到有一个名为service_apache-kafka-service的新数据库。创建的数据库名称是由service_和Apache Kafka服务名称组合而成的。

由于尚未指定从Apache Kafka服务获取何种数据,因此数据库为空。您可以使用JSON负载定义数据源,但首先需要找到集成的ID。可以通过执行以下命令来获取:

avn service integration-list clickhouse-service | grep apache-kafka-service`クリップボードにコピーする
avn service integration-list terminal output

在我的情况下,集成ID是88546a37-5a8a-4c0c-8bd7-80960e3adab0。而对于你的情况,UUID会是不同的。

通过获取统合ID,您可以为相应的连接进行适当的设置。

    • トピック measurements からデータを取得したい。

 

    • データは JSON 形式(特に JSONEachRow )である。

 

    データは3つのカラムを持つテーブルに変換される:データは timestamp、device_id、value の 3 つのカラムを持つテーブルに変換される:
 avnサービス統合-update 88546a37-5a8a-4c0c-8bd7-80960e3adab0 \
 --user-config-json '{ {
 "tables":[
 {
 "name":"measurements_from_kafka"、
 "columns":[
 {"name":"timestamp", "type":"DateTime"}、
 {name":「device_id", "type":「int8"}、
 {name":「value"、"type":「Int16"} である。
[ ],
 "トピック":[name": "測定値"}]、
 "data_format":"JSONEachRow"、
 "group_name":"measurements_from_kafka_consumer"
 }
[ ]
 クリップボードにコピーする

ClickHouse 使用 `group_name` 字段指定的消费者组来跟踪从主题消费的消息。默认情况下,每个条目将被读取一次。如果您想要获取数据两次,可以在表中使用不同的组名创建副本。

即时从Clickhouse读取Kafka消息

只需使用此设置即可从Apache Kafka主题中读取数据,无需使用ClickHouse进行任何额外操作。

在中文中,原文的含义可以被表达为:使用clickhouse-client是执行ClickHouse的SQL命令最方便的方法。如果不知道如何执行,可以确保使用CLI连接到ClickHouse®集群。

例如,我使用 Docker,并使用以下命令来运行客户端。请用您的值替换USERNAME、PASSWORD、HOST、PORT。

ドッカーラン
 --rm clickhouse/clickhouse-client ˶ˆ꒳ˆ˵
 --ユーザー名
 --パスワード PASSWORD
 --host HOST
 --ポート
 --secure`Copy to clipboard

当您进入客户端时,可以查看数据库的列表。

SHOW DATABASES`クリップボードにコピーする
データベースの端末出力を表示

显示已创建并建立的Integration Service (可能使用了其他不同的名称!)。

从这个数据库中获取表格列表时,会显示出在集成设置中指定的表格名称。

SHOW TABLES FROM `service_apache-kafka-service``Copy to clipboard
show tables terminal output

可以通过这种方式来重新确认其结构。

DESCRIBE `service_apache-kafka-service`.measurements_from_kafka`Copy to clipboard
describe tables terminal output

嗯,也许你想直接从这张表中读取。但请记住,消息只能被消费一次!也就是说,读取过的项目将会消失。尽管如此,你仍然无法停止执行以下命令:

SELECT ˶* FROM `service_apache-kafka-service`.measurements_from_kafka LIMIT 100`Copy to clipboard
SELECT count( \*) FROM `service_apache-kafka-service`.measurements_from_kafka`Copy to clipboard
接続テーブルの端末出力から直接項目を選択する

然而,这并不是从Apache Kafka消费数据的最方便的方法。在大多数情况下,您可能希望复制并保存ClickHouse的数据项。这将在下一节中介绍。

将Kafka消息保留在Clickhouse表中。

要将Apache Kafka中的数据发送到ClickHouse进行保存,需要两个部分:

1. 所有的数据都会永久地保存在目标表中。
2. 材料化视图用作连接器表(来自Kafka的测量数据)与目标表之间的桥梁。

可以使用以下两个查询创建:

CREATE TABLE device_measurements (timestamp DateTime, device_id Int8, value Int16)
ENGINE = ReplicatedMergeTree()
ORDER BY timestamp;`クリップボードにコピーする
CREATE MATERIALIZED VIEW materialised_view TO device_measurements AS
SELECT \* FROM `service_apache-kafka-service`.measurements_from_kafka;`Copy to clipboard

创建物化视图时,实际上会在后台添加触发器。该触发器会针对添加到 measurements_from_kafka 表中的新数据作出响应。一旦触发器激活,数据将通过物化视图(必要时还可以进行转换)并录入 device_measurements 表中。

当执行时,可以确认数据正在流动。

SELECT COUNT(*) from device_measurements`Copy to clipboard

执行一个查询来计算从设备得到的所有测量值,并确定哪个设备的平均值最高。这里使用了使用bar函数的简单且直观的机制。

セレクト
 device_id、
 count() as readings_number、
 bar(avg(value), 0, 1000, 100) as average_measurement_value
FROM device_measurements
GROUP BY device_id
ORDER BY device_id ASC`クリップボードにコピー
バーターミナル出力でデータを視覚化する

結論 – 概括而言

通过Aiven for ClickHouse将数据导入,并使用Materialized View来保存数据,我已经掌握了相应的技能。如果您有兴趣,还有其他一些资料可以参考,请随意查看:

    • ClickHouseとは何か、どのようにして高いパフォーマンスを実現するのか

Aiven for ClickHouseを始める

Aiven for ClickHouseをApache Kafkaと統合する方法に関するドキュメント(ここでは省略されている詳細もあります)。

Aiven for ClickHouseで利用可能なその他の統合に関する情報

广告
将在 10 秒后关闭
bannerAds