Confluent Platform中每个组件的特性
你好。
由于之前已经了解了整个Confluent平台的概述,接下来我们将看一下各个组件的特点。
然而,由于在概述中已经写明了其性质,因此将仅仅总结每个主题和每个组件的特点内容。另外,简化Camus和Proactive Support这些易于理解的部分。
数据序列化革命
-
- JSONフォーマットはよくつかわれているが、下記のように2つ欠点がある。
1. JSON自体にフォーマットを仕込むと各フィールドに形式を付記する必要があるため冗長
2. 構造定義なしでデータを投入するとデータが欠けたり変なのが加わったりするので危険
データスキーマの問題を解消するため、最近はThrift、Protocol Buffer、Avroといったスキーマ定義言語がある
その中では、スキーマが今後変わることを考慮した機構を有するAvroを我々は勧める。
阿夫罗
- Avroのスキーマ定義はJSONで行われ、サンプルは下記。
{"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"}
]
}
-
- 面白い点としては、Avroはデータをシリアライズするときだけでなく、デシリアライズ時もこのスキーマを使用する。
結果シリアライズ化したデータからはフィールド名が除去され、データサイズ小さい
重要ポイントとして挙げたスキーマの進化として、データのスキーマが変わった場合、下流でシームレスに扱えるかが重要
その際に見落としが発生しがちであるし、確認に時間もかかる。
Avroでは以下の3パターンの互換性確保のパターンがある。
1. 後方互換(Backward Compatibility)
2. 前方互換(Forward Compatibility)
3. 完全互換(Full Compatibility)
反向互换
-
- HDFSに投入されたデータをHive等のクエリで横断的に検索するケースを考える。
その場合、古いスキーマでエンコードされたデータを新規スキーマでも読める必要がある。
このようなケースに対応するためにAvroは「後方互換」のルールを持つ。
全データが「後方互換」のルールを守られれば、全データに対して統一的にクエリをかけることが可能となる。
例えば、先ほどのuserスキーマにfavorite_colorというフィールドを下記のように追加することを考える。
{"namespace": "example.avro",
"type": "record",
"name": "user",
"fields": [
{"name": "name", "type": "string"},
{"name": "favorite_number", "type": "int"},
{"name": "favorite_color", "type": "string", "default": "green"}
]
}
favorite_colorがデフォルト値greenを持つことに注目
デフォルト値が古いスキーマでシリアライズされたデータを新しいスキーマで読み込む際に使用される。
デフォルト値定義がない場合、新しいスキーマで古いデータを読み込む際に新規フィールドの値が不定となり、扱えなくなる。
前方交换位置
-
- アプリケーションロジックが特定スキーマバージョンに紐づけられているケースを考える。
スキーマ更新時、直ちにアプリケーションを更新できない場合は多々ある。
そのため、新しいスキーマのデータを過去のアプリケーションが理解できる形に落とし込む必要がある。
そのため、古いスキーマで新しいスキーマのデータを読む手段を提供する。
例えば、上記のfavorite_colorが追加されたスキーマも前方互換性がある。
古いスキーマに新しいスキーマのデータを投影すると追加フィールドは単純に捨てられる。
上記のfavorite_colorを追加したスキーマからfavorite_colorを削除すると、そのスキーマは前方互換ではなくなる。
完全互換 –
前述の後方互換、前方互換を1スキーマが両方満たす場合、そのスキーマは完全互換といえる。
古いデータを新しいスキーマで読め、新しいデータを古いスキーマで読める。
ここまででわかるように、Avroのスキーマ管理においては下記が重要
スキーマの管理
スキーマをバージョンアップ時の互換性の確保
Confluent Platformではスキーマの管理手段と、スキーマバージョン間の整合性チェックの機能を提供する。
应用程序开发
-
- Confluent Platformでは管理しているスキーマを用いてKafkaにデータを送受信する手段を下記の通り用意
JVMアプリケーション用:シリアライズ
その他アプリケーション用:Rest Proxy
JVM应用程序:序列化
-
- JVMアプリケーションにおいては依存性の追加とシリアライザ設定によって使用することが可能。
追加する依存性は下記の通り。
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.9.0.0-cp1</version>
<scope>provided</scope>
</dependency>
-
- シリアライザ設定は以下2項目を参照。
1. Key-Valueの方をGenericsでObject型にする。Primitive型、Map、Recordが投入可能。
2. 下記コードのようにシリアライザとSchema RegistryのURLを追加する。
Properties props = new Properties();
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://localhost:8081");
// Set any other properties
KafkaProducer producer = new KafkaProducer(props);
-
- ただ、これらの設定はPropertyファイルに書いて読み込ませるのをお勧めする。
-
- 上記の設定により、Kafkaに対してシリアライズしたデータが送信されるようになると同時にバリデーションも実行される。
それはデシリアライザも同じく
この状態でデータを投入すると、下記のコードとやっていることは同じとなる。
User user1 = new User();
user1.setName("Alyssa");
user1.setFavoriteNumber(256);
Future<RecordAndMetadata> resultFuture = producer.send(user1);
其他应用:Rest 代理
-
- Rest ProxyによってKafkaのAPIやAvroサポートがない言語からもConfluent Platformの機能を使用することが可能
-
- 下記のAPIが存在
Topic一覧取得
Topicメタデータ取得
TopicのPartition一覧&メタデータ取得
Topicへのメッセージ投入
Topicからのメッセージ取得
Consumerグループの作成
ConsumerグループのOffsetの更新
Consumerグループを介したメッセージの取得
Broker一覧取得
例としては下記のようにリクエストを送ってメッセージを投入し、投入結果を取得可能。
POST /topics/test HTTP/1.1
Host: kafkaproxy.example.com
Content-Type: application/vnd.kafka.avro.v1+json
Accept: application/vnd.kafka.v1+json, application/vnd.kafka+json, application/json
{
"value_schema": "{\"name\":\"int\",\"type\": \"int\"}"
"records": [
{
"value": 12
},
{
"value": 24,
"partition": 1
}
]
}
HTTP/1.1 200 OK
Content-Type: application/vnd.kafka.v1+json
{
"key_schema_id": null,
"value_schema_id": 32,
"offsets": [
{
"partition": 2,
"offset": 103
},
{
"partition": 1,
"offset": 104
}
]
}
模式注册表
设计概要
-
- Schema RegistryはAvroSchema用分散ストレージ層でKafkaをバックエンドとしている。
登録されたスキーマにはユニークなIDが割り振られる。単調増加するが1ずつ増えるわけではない。(採番にはZookeeperを使用)
Kafkaは耐障害性のあるバックエンドと変化を記録するWALを保持
Schema Registryは分散、Singleマスタ構成。(選出にはZookeeperを使用)
Slave側はアクセスされた際にMasterにリクエストを転送する方式

API列表
-
- 下記のようなAPIを保持
Schema Subject一覧取得(-key、-value等)
Schema Subjectのバージョン一覧取得
Subject、バージョン指定してSchemaを取得
Subject指定してのSchema取得
Subjectを指定してのSchema更新
Schemaの互換性確認
互換性レベル設定(デフォルト、Subject指定)
互換性レベル取得(デフォルト、Subject指定)
代理休息
-
- Rest Proxyは元々のJava Consumer/Producer/CLIのうち下記の機能を有する。
実行可能なAPI一覧は前述。
メタデータ
クラスタからbroker、topic、partition、config情報が取得可能
Producer
Producerを公開するのではなく、Producerを介してKafkaにメッセージを投入するAPIを公開
データの投入はRest Proxy内でプーリングされたProducerを使用
圧縮方式などの一部の設定はConfig API側で設定可能
Consumer
Rest Proxy内ではConsumer Groupを実現するためにHigh Level Consumerを用いている。
ConsumerはOffsetがあるためステートフル。
OffsetのCommitは自動か手動か選択可能
Data Formats
Rest Proxy自体はJSONでデータをやり取りするが、実際にKafkaに保持されるのはBase64エンコードか、Avro.
Avroを使用する場合、Schema Registryに登録されたスキーマによってバリデーション
Rest Proxyのクラスタ化とロードバランシング
Rest Proxy自体は認識のためのidを別定義するのみで、各インスタンス間の連携は無い。
そのため、クラスタ化は不要、ロードバランシングは別機構で実施する必要がある。
Simple Consumer
基本的にはHigh LevelのConsumerを使うことが望ましいが、Low LevelのConsumerも使用可能
Offset指定でメッセージの取得を行う際などに用いられる。
下記の機能は現状まだ提供されていない。
管理者用オペレーション
複数のTopic、Partitionに対する同時投入
複数スレッドのConsumer(現状シングルスレッドのため、スループットは出ない)
マルチテナントにおけるユーザ間のバランシングなど特殊な機能
总结
我已经确认了Confluent平台的重要功能,包括模式管理、通过Rest API的代理、Avro模式的兼容性管理和序列化等方面的概述。
然而,目前来看,只有生产者方面的性能才能通过Rest代理实现,所以完全用Rest API替代现有功能还有很长的路要走。
看来,在获取数据的一方,目前还必须使用Java / C++的消费者。