以下是小型应用程序中CQRS+事件溯源的配置示例:

经过

当试图在本地环境中实现CQRS+事件溯源时,我考虑到是否可以在不使用Apache Kafka等消息队列的情况下实现事件检测。我想到使用PostgreSQL的Pub/Sub功能可能可以在最小程度上实现,所以我做了一个考虑应用程序。应用程序可以在这里找到。

如果克隆了

cd src
docker-compose up -d --build

我会启动。

环境

    • 自作検討アプリ

 

    • docker-compose

 

    • mcr.microsoft.com/dotnet/sdk:7.0

 

    • Microsoft.EntityFrameworkCore Version=7.0.0-preview.7.22376.2

 

    postgres:14.5-bullseye

环境图像

    • write_model(eventstore用のpostgres)

 

    • read_model(参照用のpostgres)

 

    • Projector(read_modelへイベントを射影するdotnetcore)

 

    SampleCmd(write_modelへイベントを追加するdotnetcore)
名称未設定ファイル.drawio.png

文件树摘录 shù lù)

├── docker-compose.yml
├── dotnet
│   ├── Dockerfile
│   ├── docker-entrypoint.sh
│   └── src
│       ├── Domain
│       │   ├── Domain.csproj
│       │   ├── EventIdEnum.cs
│       │   ├── IEventInsert.cs
│       │   ├── User.cs
│       │   ├── UserId.cs
│       │   ├── UserName.cs
│       ├── Infrastructure
│       │   ├── DataBase
│       │   │   ├── Event.cs
│       │   │   └── EventstoreContext.cs
│       │   ├── EventInsert.cs
│       │   ├── Infrastructure.csproj
│       ├── Projector
│       │   ├── Commands.cs
│       │   ├── DataBase
│       │   │   ├── SampleDBContext.cs
│       │   │   └── User.cs
│       │   ├── Program.cs
│       │   ├── Projector.csproj
│       └── SampleCmd
│           ├── Commands.cs
│           ├── DataBase
│           │   ├── SampleContext.cs
│           │   └── User.cs
│           ├── Program.cs
│           ├── SampleCmd.csproj
├── read_model
│   └── Dockerfile
└── write_model
    ├── Dockerfile
    └── initdb
        ├── eventstore.backup
        └── init.sh

领域或事件

暂时将其设为一个普通用户表进行CRUD的领域。

域名 (yu4 ming2)

undefined

活动

EventId名称説明スキーマっぽいもの1UserAddedユーザーを追加する{ “type”: “object”, “properties”: { “name”: { “type”: “string” } } }2UserEditedユーザーを編集する{ “type”: “object”, “properties”: { “id”: { “type”: “string” }, “name”: { “type”: “string” } } }3UserDeletedユーザーを削除する{ “type”: “object”, “properties”: { “id”: { “type”: “string” } } }

ER图

undefined

请提供Projector的指令。

我已经为数据库恢复准备好了init,并为事件订阅准备好了subscription。

$ docker exec -it dotnet dotnet run --project /root/src/Projector help
Usage: Projector <Command>

Commands:
  help            Display help.
  init            
  subscription    
  version         Display version.

恢复数据库时用到的init内容摘要

    // read_modelのSampleデータベースのDbContextをコンストラクタインジェクションしている.
    private SampleDBContext Smp { get; }

    // write_modelのEventstoreデータベースのDbContextをコンストラクタインジェクションしている.
    private EventstoreContext Ev { get; }

    public async Task Init()
    {
        Smp.Database.EnsureDeleted(); // Sampleデータベースの削除
        Smp.Database.EnsureCreated(); // Sampleデータベースの生成

        // イベント分再生
        foreach (var x in Ev.Events)
            Handle(x);
    }

    // イベントid毎に処理.usersテーブルにinsertしたりupdateしたりdeleteしたり.
    private void Handle(Event ev)
    {
        if (Enum.IsDefined(typeof(EventIdEnum), ev.EventId) == false)
            return;

        switch ((EventIdEnum)ev.EventId)
        {
            case EventIdEnum.UserAdded: UserUpsertHandler(ev.Data.RootElement); break;
            case EventIdEnum.UserEdited: UserUpsertHandler(ev.Data.RootElement); break;
            case EventIdEnum.UserDeleted: UserDeleteHandler(ev.Data.RootElement); break;
            default: throw new ArgumentException($"not implemented. {ev.EventId}");
        };
    }

订阅活动内容摘要

    public async Task Subscription(EventstoreContext ev)
    {
        var conn = (NpgsqlConnection)ev.Database.GetDbConnection();
        conn.Open();

        try
        {
            conn.Notification += OnNotification; 

            // postgresqlのLISTENで購読する.
            using var cmd = new NpgsqlCommand("LISTEN event_channel", conn);
            cmd.ExecuteNonQuery();

            while (Context.CancellationToken.IsCancellationRequested == false)
                await conn.WaitAsync(Context.CancellationToken);
        }
        catch (Exception ex) when (!(ex is OperationCanceledException))
        {
            Console.WriteLine(ex.ToString());
        }
        finally
        {
            conn.Close();
        }
    }

    // イベントが発行される度に処理.
    private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
    {
        Console.WriteLine("event handled: " + e.Payload);
        Event? ev = JsonSerializer.Deserialize<Event>(e.Payload, new JsonSerializerOptions 
        {
            PropertyNameCaseInsensitive = true,
        });

        if (ev is not null)
            Handle(ev);
    }

Eventstore数据库的触发器部分。

将数据加载到write_model的数据库中。

触发条件:在events表中进行插入
执行内容:通知插入的内容

CREATE OR REPLACE FUNCTION public.event_insert_trigger()
    RETURNS trigger
    LANGUAGE 'plpgsql'
    COST 100
    VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
PERFORM pg_notify(
	'event_channel',
	row_to_json(NEW)::text
);

return NULL;
END
$BODY$;

ALTER FUNCTION public.event_insert_trigger()
    OWNER TO postgres;

CREATE TRIGGER event_trigger
    AFTER INSERT
    ON public.events
    FOR EACH ROW
    EXECUTE FUNCTION public.event_insert_trigger();

样本命令(SampleCmd)的指令

暂时使用控制台应用程序。使用命令如add或edit,指定事件名称并附带必要的参数将事件插入write_model中。

$ docker exec -it dotnet dotnet run --project /root/src/SampleCmd help
Usage: SampleCmd [options...]

Options:
  -e, --eventname <String>    event name. add or edit or delete (Required)
  -n, --username <String>     user name. Required for add or edit (Default: null)
  -id, --id <String>          guid. Required for edit or delete (Default: null)

Commands:
  help       Display help.
  version    Display version.

总结。

index.png

如果我们将一个PostgreSQL数据库拆分成多个模式来管理,可能也是一个不错的选择。

index.png

在事件发生时执行Projector作为外部应用程序似乎也是一个可行的选择。在这种模式中,恢复read_model时需要使用write_model的信息。

请问在使用CQRS+事件溯源在私有云中进行实现时,是否有任何方便的机制可以教授?

bannerAds