はじめに

Oracle SQL Access to Kafka(OSaK)は、SQLでKafkaトピックに動的にクエリを実行できるOracle Database 23c Freeの新機能です。

概要やユースケースはこちらを参照ください。

 

Oracle SQL Access to Kafkaには主に3つの機能があります。
本記事では、その中でもユーザが指定したタイムスタンプ間で、Kafkaトピックにあるレコードを再読み込みするシーカブル機能の使用手順を紹介します。

前提条件

    • Oracle Database 23c環境

 

    • Kafkaクラスタを作成していること

 

    • 本記事ではOracle Cloud Infrastructure(OCI) 上で提供されるフルマネージドな分散メッセージングサービスである、OCI Streaming Service(OSS)を使ってKafkaクラスタを作成しています。

 

    OSSについては、OCI Streaming を動かしてみようというチュートリアルが参考になります。

Kafkaクラスタの登録

まずはKafkaクラスタの登録をする必要があります。
この手順については、こちらの記事を参照ください。

2つのタイムスタンプ間のKafkaレコードにアクセスするアプリケーションの作成

開始と終了のタイムスタンプ間のKafkaレコードにアクセスするシーカブル・アプリケーションを作成します。Kafkaトピックにデータが残っていることが条件になりますが、過去に発生した問題の調査をする場合にこの機能が有効です。

ロードするKafkaレコードの形式を決定する表を作成します。今回はID列とKafkaレコードをPublishした時点のタイムスタンプ列を持つ表を作成します。
create table seek_sample(id number, time timestamp);

シーカブル・アプリケーションSampleSeekAppを作成します。
※OSaKseekStreamというトピック名を指定しておきます。(OCI Streaming Serviceのストリーム名)
DECLARE
v_options VARCHAR2(50);
BEGIN
v_options := ‘{“fmt” : “DSV”, “reftable” : “seek_sample”}’;
DBMS_KAFKA.CREATE_SEEKABLE_APP (
‘KAFKACLUS1’,
‘SampleSeekApp’,
‘OSaKseekStream’,
v_options);
END;
/

内部的に作成された一時表を確認してみます。
select object_name, object_type from user_objects where object_name like ‘%ORA$DK%’;

OBJECT_NAME OBJECT_TYPE
———————————– ———–
ORA$DKVGTT_KAFKACLUS1_SAMPLESEEKAPP_0 TABLE

ORA$DKV_KAFKACLUS1_SAMPLESEEKAPP_0 VIEW

ORA$DKX_KAFKACLUS1_SAMPLESEEKAPP TABLE

ORA$DKX_KAFKACLUS1_SAMPLESEEKAPP TABLE PARTITION

実際に流すKafkaレコードは以下にします。30秒ごとにcounterとそのタイムスタンプをPublishするスクリプトrun_by30s.shを実行します。

run_by30s.sh
#!/bin/sh

counter=0
while true; do
current_time=$(date +’%Y/%m/%d %H:%M:%S’)
echo “$counter,$current_time” | $KAFKA_HOME/bin/kafka-console-producer.sh –bootstrap-server cell-1.streaming.ap-tokyo-1.oci.oraclecloud.com:9092 –topic OSaKseekStream –producer.config $KAFKA_HOME/config/producer.properties
counter=$((counter+1))
sleep 30s
done

上記のスクリプトを実行している状態(30秒ごとにストリーミングされている状態)で、3:10:00~3:15:00のKafkaレコードを取得します。DBMS_KAFKA.SEEK_OFFSET_TSプロシージャでタイムスタンプを指定することで、その時間内でKafkaレコードの読み取りをするようにOracle SQL Access to Kafkaビューを指定します。その後LOAD_TEMP_TABLEプロシージャでグローバル一時表にレコードをロードすることで、ビューでKafkaレコードの参照ができるようになります。
begin
DBMS_KAFKA.SEEK_OFFSET_TS (‘ORA$DKV_KAFKACLUS1_SAMPLESEEKAPP_0’,
TO_TIMESTAMP(‘2023/06/07 03:10:00’, ‘YYYY/MM/DD HH24:MI:SS’),
TO_TIMESTAMP(‘2023/06/07 03:15:00’, ‘YYYY/MM/DD HH24:MI:SS’));
end;
/

EXEC DBMS_KAFKA.LOAD_TEMP_TABLE(‘ORA$DKVGTT_KAFKACLUS1_SAMPLESEEKAPP_0’);

3:10:00~3:15:00のレコードが参照できるか確認してみます。
set linesize 150
col KAFKA_EPOCH_TIMESTAMP for 9999999999999

SELECT * FROM ORA$DKV_KAFKACLUS1_SAMPLESEEKAPP_0;

KAFKA_PARTITION KAFKA_OFFSET KAFKA_EPOCH_TIMESTAMP ID TIME
————— ———— ——————— ———- —————————-
0 151 1686107418380 0 07-JUN-23 03.10.16.000000 AM
0 152 1686107450533 1 07-JUN-23 03.10.48.000000 AM
0 153 1686107482608 2 07-JUN-23 03.11.20.000000 AM
0 154 1686107514681 3 07-JUN-23 03.11.53.000000 AM
0 155 1686107546725 4 07-JUN-23 03.12.25.000000 AM
0 156 1686107578806 5 07-JUN-23 03.12.57.000000 AM
0 157 1686107610881 6 07-JUN-23 03.13.29.000000 AM
0 158 1686107642936 7 07-JUN-23 03.14.01.000000 AM
0 159 1686107675033 8 07-JUN-23 03.14.33.000000 AM

9 rows selected.

正しく参照できています。このように、Oracle SQL Access to Kafkaのシーカブル機能を使えば、過去のKafkaレコードをOracle Databaseからクエリすることができます。

参考情報

Oracle SQL Access to Kafka

广告
将在 10 秒后关闭
bannerAds