【通向Cassandra日】Cassandra反应式编程风格~介绍和基本使用方法
首先
Cassandra在东京的日子
今年,2023年6月1日,Cassandra Day将在日本举办。Cassandra Day去年在柏林、伦敦、阿姆斯特丹、河内、雅加达、休斯顿、圣克拉拉、西雅图、新加坡也举办了。
為了本次在東京舉辦的活動,我們將發表有關Apache Cassandra的文章。

关于Apache Cassandra
如果用一句话来概括,Apache Cassandra就是一个开源的分布式数据库管理系统。
与其他分布式数据库管理系统一样,可以使用多个通用服务器来构建一个数据库(也可以只使用一个服务器进行开发等目的)。
在这里,将详细的说明省略掉,我们将把重点放在向感兴趣的人介绍的角色上,这个角色已经交由官方网站和维基百科来承担。
Cassandra的反应式编程方式~介绍
Cassandra的Java驱动程序提供了内置的响应式查询支持。
这篇文章使用了截至2023年5月最新的Java驱动程序4.15作为基础。
ReactiveSession通过扩展CqlSession接口,并新增特殊方法来执行以响应式流表示的请求。
响应式流API
Java 驱动程序 4.15 的反应式功能是基于 Reactive Streams API 开发的。(在 3.x 版本中,它是基于 Google 的 Java 库 Guava 开发的)
如果应用程序完全不使用反应式查询,则可以通过排除它来最小化驱动程序在运行时的依赖关系数量。
由于历史原因,与反应式相关的驱动程序类型存在于以DSE(DataStax Enterprise的缩写)为首的包中。
然而,反应式查询也可以在开源的Apache Cassandra中工作。
响应式执行模型采用了非阻塞方式实现。有关详细信息,请参阅非阻塞编程手册。
简而言之
ReactiveSession公开了两个公共方法。
ReactiveResultSet executeReactive(String query);
ReactiveResultSet executeReactive(Statement<?> statement);
无论哪种方法,都会返回一个通常的ResultSet的响应式流版本,即ReactiveResultSet。
在响应式编程的上下文中,ReactiveResultSet起到处理查询结果的”发布者”的角色。
当订阅 ReactiveResultSet 时,需要注意以下两点。
ReactiveResultSetドライバーによって返されるすべての実装は、デフォルトでは、コールド、ユニキャスト、単一サブスクリプションのみのパブリッシャーです。つまり、複数のサブスクライバーの利用はサポートされていません。複数のダウンストリーム サブスクライバーでそれらを使用する必要がある場合は、そのようなパブリッシャーによって生成された結果をキャッシュすることを検討します。(ドキュメントでは、キャッシングの例が紹介されています)。
ReactiveResultSetは、内部的に、IO スレッドを用います。サブスクライバーの実装者は、 Reactive Streams 仕様ルール 2.2を順守し、負荷の高い計算を実行したり、onNext呼び出し内で呼び出しをブロックしたりしないようにすることが推奨されます。これらを実行すると、パフォーマンスに影響を与える可能性があります。サブスクライバーでは、データを処理ロジックに非同期的にディスパッチする必要があります。
基础用法
响应式阅读风格
以下是一个示例,它从表中读取并将返回的所有行输出到控制台。
try (CqlSession session = ...) {
Flux.from(session.executeReactive("SELECT ..."))
.doOnNext(System.out::println)
.blockLast();
} catch (DriverException e) {
e.printStackTrace();
}
写作的响应式风格
在把查询结果输出到控制台后,将行插入到表中。
try (CqlSession session = ...) {
Flux.just("INSERT ...", "INSERT ...", "INSERT ...", ...)
.doOnNext(System.out::println)
.flatMap(session::executeReactive)
.blockLast();
} catch (DriverException e) {
e.printStackTrace();
}
请注意,只有在语句以响应方式执行时,实际请求才会在 ReactiveResultSet 被订阅时触发。换句话说,当 executeReactive 方法返回时,还没有执行任何操作。这就是为什么在上述写入示例中使用 flatMap 的原因。该方法将处理由 executeReactive 方法调用返回的每个订阅。例如,在以下代码中,由于 ReactiveResultSetsession.executeReactiveReactiveResultSet 尚未被订阅,因此查询不会被执行。
// DON'T DO THIS
Flux.just("INSERT INTO ...")
// The returned ReactiveResultSet is not subscribed to
.doOnNext(session::executeReactive)
.blockLast();
访问查询元数据
ReactiveResultSet公开了与请求执行和查询元数据相关的有用信息。
-
- Publisher<? extends ColumnDefinitions> getColumnDefinitions();
-
- Publisher<? extends ExecutionInfo> getExecutionInfos();
- Publisher wasApplied();
只需订阅即可检索上述发布者的内容。请注意,在查询完成之前,这些发布者是无法执行的。如果查询失败,这些发布者将以同样的错误失败。
在下面的示例中,执行查询并将所有可用的元数据输出到控制台。
ReactiveResultSet rs = session.executeReactive("SELECT ...");
// execute the query first
Flux.from(rs).blockLast();
// then retrieve query metadata
System.out.println("Column definitions: ");
Mono.from(rs.getColumnDefinitions()).doOnNext(System.out::println).block();
System.out.println("Execution infos: ");
Flux.from(rs.getExecutionInfos()).doOnNext(System.out::println).blockLast();
System.out.println("Was applied: ");
Mono.from(rs.wasApplied()).doOnNext(System.out::println).block();
您也可以在行级别检查查询元数据。由响应式查询执行返回的每一行都是ReactiveRow类的响应式实现。
在ReactiveRow中,将公开与ReactiveResultSet相同类型的查询元数据和执行信息,但是对于每一行来说会有以下不同。
-
- ColumnDefinitions getColumnDefinitions();
-
- ExecutionInfo getExecutionInfo();
- boolean wasApplied();
在下面的例子中,我们将执行查询并针对返回的每一行输出处理该行的协调器。然后,我们将获取连接到满足查询条件的所有协调器,并将它们输出到控制台上。
Iterable<Node> coordinators = Flux.from(session.executeReactive("SELECT ..."))
.doOnNext(
row ->
System.out.printf(
"Row %s was obtained from coordinator %s%n",
row,
row.getExecutionInfo().getCoordinator()))
.map(ReactiveRow::getExecutionInfo)
// dedup by coordinator (note: this is dangerous on a large result set)
.groupBy(ExecutionInfo::getCoordinator)
.map(GroupedFlux::key)
.toIterable();
System.out.println("Contacted coordinators: " + coordinators);
最后
在本稿中,我们介绍了Cassandra数据库的Java驱动程序支持响应式编程,并解释了其基本用法。
我希望在另一个机会上介绍更深入的内容。