使用Akka实现CQRS模式

目录

    1. 关于文章

 

    1. 简单的自我介绍

 

    1. 复习响应式声明

 

    1. 实现和解释

 

    总结

关于这篇文章

    • 当記事はリアクティブ宣言・CQRSの概要を説明後、Scala/Akka(typed)での実装を例に解説していきます。

 

    • 対象の読者としてScalaプログラマ・Akkaで全体像が掴めない人を想定しております。(対象外の方にも理解出来るよう精一杯努力してはいます。)

 

    • 注意点としてこの記事ではDDDに関する説明は行いません。DDDとCQRSパターンの関係については申し訳ありませんが、別の方の記事をご参照ください。

 

    • 調査した内容を扱いますが解説に不備がある場合は当記事にコメント、もしくは@AerosmithBzまでご連絡ください。

リアクティブ宣言のおさらいについてはリアクティブ宣言の要約となりますので、一読された事のある方は飛ばしていただいて構いません。

关于Akka的许可证更改
从Version 2.7.x开始,Akka的许可证将更改为BSL1.1。
在商业使用之前,请自行调查确认。

我叫简单自我介绍

初次见面的朋友,您好。
我是阿斯达(HN),一年级的内部/SES工程师,负责WEB系统开发。
目前在项目中主要使用Python和PHP,但没有接触到Scala,每天都把我的悲伤唱成歌。
我在Scala方面有大约一年半的经验,同时也持续了大约一年时间在开发(学习)Akka。
我的GitHub链接在这里。

响应式声明的复习

我认为在学习CQRS时,最好先掌握根本概念和思想——反应式宣言,所以我将简单地解释一下反应式宣言。

用户对系统的期望是什么?

在 反应式宣言 本文的引言部分中写道如下。

只是几年前,巨大的应用程序由几十个服务器组成,能够容忍数秒的响应时间和数小时的离线维护,数据量约为几千兆字节。如今的应用程序被部署在各种设备上,从移动设备到数千个多核处理器的云基础集群。用户期望毫秒级的响应时间和100%的运行时间。数据量以拍字节计算。昨天的软件架构完全无法满足今天的需求。

简而言之,目前的系统与过去相比变得庞大,用户的要求变得更高,数据量以拍字节为单位进行处理。现代化系统难以跟上变化,并未满足用户的期望行为。接下来,我们要讨论如何解决这个问题。

 サーバー数期待される応答時間データ量少遅数ギガ多早ペタバイト単位

构成反应性系统的四个要素

我们所追求的是一个具备响应能力、容错性、弹性和消息驱动的系统。我们称之为反应式系统(Reactive Systems)。反应式声明请参见https://www.reactivemanifesto.org/ja

我们在本文中提出了一种被称为反应式系统的架构,旨在解决前述问题,并且将其构成要素定义为即时响应性、容错性、弹性和消息驱动。

    1. 即时响应性

 

    1. 系统应尽可能快速地做出响应。

 

    1. 容错性

 

    1. 系统即使面临故障也能保持即时响应性。

 

    1. 弹性

 

    1. 系统能在工作负载变化的情况下保持即时响应性。

 

    1. 消息驱动

 

    反应式系统通过异步消息传递来建立组件之间的界限。

并提供解释,简而言之,反应式系统的概览是利用异步消息传递构建一个稳定、易于扩展且响应迅速的系统,以解决问题。

スクリーンショット 2022-09-20 22.04.07.png

为什么选择消息驱动作为手段?

這篇文章中描述了以下內容。

消息驱动(Message Driven):响应式系统通过异步消息传递来建立组件之间的边界。这样一来,它保证了松耦合性、隔离性和位置透明性,并提供了将错误委托为消息的手段。显式的消息传递可以实现负载管理和弹性。此外,系统内创建和监控消息队列,并在需要时应用背压,以实现流量控制。通过使用位置透明的消息传递作为通信手段,可以在通信跨越集群或在单个主机内管理相同的故障配置和语义。非阻塞通信使得接收方只在激活时才消耗资源,从而抑制了系统的开销。Reactive声明 [具体链接](https://www.reactivemanifesto.org/ja)

*位置透明性是指运行时实例和其引用的分离概念。

换句话说,我们的目标是通过使用基于消息驱动的异步消息传递来满足以下四个要求。

    1. 确保松散耦合。

 

    1. 通过使用明确的消息传递,可以轻松管理每个组件。

 

    1. 通过位置透明性,可以实现弹性的资源管理。

 

    使用位置透明的消息传递可以在系统中产生共同项,从而降低成本。

强烈推荐您进行实施,以便更好地理解整体情况,因为仅仅通过描述,可能很难想象出位置的透明度。

实施和解释。

由于上面的文本量超过了预期的5倍,所以请快速浏览。

    1. 有关CQRS

 

    1. 有关Akka

 

    1. 有关示例项目的概述

 

    源码和解释

关于CQRS

CQRS(Command Query Responsibility Segregation: コマンドクエリ責務分離)是指将命令(写入)和查询(读取)进行分离,以确保可扩展性的一种思路。在执行写入操作时,通常会与事件溯源模式(Event Sourcing)一起实现,以事件的形式将数据存储到数据库中。

具體的流程如下,在本文中記載:

    1. 命令侧以事件的形式将用户的输入写入用于写入的NoSQL数据库(Cassandra)中。

 

    1. 投影将根据事件将数据映射到用于读取的关系型数据库(Aurora)中。

 

    1. 读取API用于获取数据。

 

    源码和解释
akka-cqrs-counter.png

关于Akka

在这里,我们将解释如何实施示例项目,进而理解应该了解的ClusterScharding,特别是在使用Akka时。

cluster_sharding.png

样本项目的概述

为了理解CQRS的整体情况,这次的重点是应用程序的实现,我们将采用一个简单的计数器应用程序。
此外,在这里我们将解释WriteAPI和Projection, 并且关于ReadAPI,只要能将查询投递到数据库中,本次解释就不再指定特定内容。

写API图

qiita_akka.png

请编写API流程图。

qiita_flow.png

投影图

qiita_projectoin.png

提供原始资料和解释性说明

请参阅GitHub上的完整源代码。

域名

case class Counter(number: Int = 0) {

  // カウントアップした値を持つカウンターを返却する。
  def countUp(number: Int): Counter =
    this.copy(number = this.number + number)
}
    ドメインは後々Actorにステートとして状態を保存し更新出来るようにしたいのでコピーを返すようにします。

综合协议

// Serialize用に定義
trait CborSerializable

object CounterAggregateProtocol {

  sealed trait CounterCommand extends CborSerializable

  // ActorRef[_]::ask用のリプライ先のActorRefをコマンドに格納する。
  final case class CountUp(id: String, n: Int)(val replyTo: ActorRef[CountUpReply]) extends CounterCommand

  sealed trait CountUpReply extends CborSerializable

  final case class CountUpSucceededReply(counter: Counter) extends CountUpReply

  final case class CountUpFailedReply(e: Exception) extends CountUpReply
}
    • Aggregateで送受信するメッセージを定義する。

CborSerializableについては公式Documentを参照ください。

汇集

object CounterAggregate {

  // ①
  private def commandHandler(ctx: ActorContext[CounterAggregateProtocol.CounterCommand]):
  (Counter, CounterAggregateProtocol.CounterCommand) => Effect[CounterEvent, Counter] =
    (_, command) =>
      command match {
        case cmd@CounterAggregateProtocol.CountUp(id, n) =>
          Effect.persist(CounterEvent.CountUpped(id, n)).thenReply(cmd.replyTo) { state =>
            CounterAggregateProtocol.CountUpSucceededReply(state)
          }
      }

  // ②
  private def eventHandler: (Counter, CounterEvent) => Counter =
    (state, event) =>
      event match {
        case CounterEvent.CountUpped(_, n) => state.countUp(n)
      }

  def apply(persistenceId: PersistenceId): Behavior[CounterAggregateProtocol.CounterCommand] =
    Behaviors.setup { ctx =>
      EventSourcedBehavior(
        // ③
        persistenceId = persistenceId,
        // ④
        emptyState = Counter(),
        // コマンドハンドラとイベントハンドラを定義する。
        commandHandler = this.commandHandler(ctx),
        eventHandler = this.eventHandler)
        // ⑤
        .withTagger(_ => Set(CounterTags.Single))
        // ⑥
        .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 10, keepNSnapshots = 3))
        // ⑦
        .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1))
    }
}
    • ①コマンド受信時の処理を定義する。Effect::persistでイベントを永続化する。

 

    • ②イベント受信時の処理を定義する。

 

    • ③persitenceId…永続化する際のプライマリキー。参照する際はこの値を元にアクターを取得する。

 

    • ④アクター開始のステートの値。空の状態を定義する。

 

    • ⑤タグの定義。本番では単一のタグのみでの実装はおやめください。タグについてはこちら。

⑥スナップショットを作成する頻度を定義。
⑦SupervisorStrategy…障害時の振る舞いを定義。

 

投影处理程序

关于投影,可以在WriteApi内进行处理,但是这次我们将其划分为另一个项目。

final class CounterProjectionHandler(system: ActorSystem[_])
  extends Handler[EventEnvelope[CounterEvent]] {

  private implicit val ec: ExecutionContext = system.executionContext

  override def process(envelope: EventEnvelope[CounterEvent]): Future[Done] = {
    envelope.event match {
      case CounterEvent.CountUpped(id, number) =>
        val row = CounterRow(id, number)
        // 読込用DBにイベントを反映させている。
        CounterRepository.insert(row).map(_ => Done)
    }
  }
}

    • イベント受信通知時の処理を定義する。

 

    主に読込用DBにデータを保存したりKafkaなどにデータを渡す。

投影

※关于Projection,可以在WriteApi中进行处理,但是这次我们已经将其分离到另一个项目中了。

object CounterProjection extends App {

  def apply(): Behavior[String] = Behaviors.setup { context =>

    implicit val system: ActorSystem[_] = context.system
    implicit val ec: ExecutionContextExecutor = system.executionContext

    val sourceProvider =
      // ①
      EventSourcedProvider
        .eventsByTag[CounterEvent](
          system,
          readJournalPluginId = CassandraReadJournal.Identifier,
          tag = CounterTags.Single)

    val projection =
      // ②
      CassandraProjection
        .atLeastOnce(
          projectionId = ProjectionId("counters", CounterTags.Single),
          sourceProvider,
          handler = () => new CounterProjectionHandler(system))
        .withSaveOffset(afterEnvelopes = 1, afterDuration = 500.millis)

    // ③
    ClusterSingleton(system).init(
      SingletonActor(
        ProjectionBehavior(projection),
        projection.projectionId.id)
        .withStopMessage(ProjectionBehavior.Stop))

    Behaviors.empty
  }

  ActorSystem(CounterProjection(), "akkaCqrsCounter_projection")
  StdIn.readLine()
}
    • ①タグとReadJounalPluginを元にSourceを取得する。

 

    • ②Projectionを設定する。

 

    ③今回はClusterSingletonを採用。

总结

最後可能有点匆忙,但打算随时更新文章。(因为到目前为止花了大约6个小时,请原谅。)
作为到目前为止所学的感受,这也成为了我编写这篇文章的背景。然而,就像我这样工程师经验浅薄的人来说,有关Akka的这一部分文章相对较少,学习相当困难。
真心感谢给予我建议的各位朋友。
我希望这篇文章可以引导我过去的自己。
以上,非常感谢。

GitHub
– 基于Git版本控制系统的代码托管平台

Twitter
– 一种社交媒体平台,允许用户发送和阅读短文本消息

bannerAds