以下是小型应用程序中CQRS+事件溯源的配置示例:
经过
当试图在本地环境中实现CQRS+事件溯源时,我考虑到是否可以在不使用Apache Kafka等消息队列的情况下实现事件检测。我想到使用PostgreSQL的Pub/Sub功能可能可以在最小程度上实现,所以我做了一个考虑应用程序。应用程序可以在这里找到。
如果克隆了
cd src
docker-compose up -d --build
我会启动。
环境
-
- 自作検討アプリ
-
- docker-compose
-
- mcr.microsoft.com/dotnet/sdk:7.0
-
- Microsoft.EntityFrameworkCore Version=7.0.0-preview.7.22376.2
- postgres:14.5-bullseye
环境图像
-
- write_model(eventstore用のpostgres)
-
- read_model(参照用のpostgres)
-
- Projector(read_modelへイベントを射影するdotnetcore)
- SampleCmd(write_modelへイベントを追加するdotnetcore)

文件树摘录 shù lù)
├── docker-compose.yml
├── dotnet
│ ├── Dockerfile
│ ├── docker-entrypoint.sh
│ └── src
│ ├── Domain
│ │ ├── Domain.csproj
│ │ ├── EventIdEnum.cs
│ │ ├── IEventInsert.cs
│ │ ├── User.cs
│ │ ├── UserId.cs
│ │ ├── UserName.cs
│ ├── Infrastructure
│ │ ├── DataBase
│ │ │ ├── Event.cs
│ │ │ └── EventstoreContext.cs
│ │ ├── EventInsert.cs
│ │ ├── Infrastructure.csproj
│ ├── Projector
│ │ ├── Commands.cs
│ │ ├── DataBase
│ │ │ ├── SampleDBContext.cs
│ │ │ └── User.cs
│ │ ├── Program.cs
│ │ ├── Projector.csproj
│ └── SampleCmd
│ ├── Commands.cs
│ ├── DataBase
│ │ ├── SampleContext.cs
│ │ └── User.cs
│ ├── Program.cs
│ ├── SampleCmd.csproj
├── read_model
│ └── Dockerfile
└── write_model
├── Dockerfile
└── initdb
├── eventstore.backup
└── init.sh
领域或事件
暂时将其设为一个普通用户表进行CRUD的领域。
域名 (yu4 ming2)
活动
EventId名称説明スキーマっぽいもの1UserAddedユーザーを追加する{ “type”: “object”, “properties”: { “name”: { “type”: “string” } } }2UserEditedユーザーを編集する{ “type”: “object”, “properties”: { “id”: { “type”: “string” }, “name”: { “type”: “string” } } }3UserDeletedユーザーを削除する{ “type”: “object”, “properties”: { “id”: { “type”: “string” } } }
ER图
请提供Projector的指令。
我已经为数据库恢复准备好了init,并为事件订阅准备好了subscription。
$ docker exec -it dotnet dotnet run --project /root/src/Projector help
Usage: Projector <Command>
Commands:
help Display help.
init
subscription
version Display version.
恢复数据库时用到的init内容摘要
// read_modelのSampleデータベースのDbContextをコンストラクタインジェクションしている.
private SampleDBContext Smp { get; }
// write_modelのEventstoreデータベースのDbContextをコンストラクタインジェクションしている.
private EventstoreContext Ev { get; }
public async Task Init()
{
Smp.Database.EnsureDeleted(); // Sampleデータベースの削除
Smp.Database.EnsureCreated(); // Sampleデータベースの生成
// イベント分再生
foreach (var x in Ev.Events)
Handle(x);
}
// イベントid毎に処理.usersテーブルにinsertしたりupdateしたりdeleteしたり.
private void Handle(Event ev)
{
if (Enum.IsDefined(typeof(EventIdEnum), ev.EventId) == false)
return;
switch ((EventIdEnum)ev.EventId)
{
case EventIdEnum.UserAdded: UserUpsertHandler(ev.Data.RootElement); break;
case EventIdEnum.UserEdited: UserUpsertHandler(ev.Data.RootElement); break;
case EventIdEnum.UserDeleted: UserDeleteHandler(ev.Data.RootElement); break;
default: throw new ArgumentException($"not implemented. {ev.EventId}");
};
}
订阅活动内容摘要
public async Task Subscription(EventstoreContext ev)
{
var conn = (NpgsqlConnection)ev.Database.GetDbConnection();
conn.Open();
try
{
conn.Notification += OnNotification;
// postgresqlのLISTENで購読する.
using var cmd = new NpgsqlCommand("LISTEN event_channel", conn);
cmd.ExecuteNonQuery();
while (Context.CancellationToken.IsCancellationRequested == false)
await conn.WaitAsync(Context.CancellationToken);
}
catch (Exception ex) when (!(ex is OperationCanceledException))
{
Console.WriteLine(ex.ToString());
}
finally
{
conn.Close();
}
}
// イベントが発行される度に処理.
private void OnNotification(object sender, NpgsqlNotificationEventArgs e)
{
Console.WriteLine("event handled: " + e.Payload);
Event? ev = JsonSerializer.Deserialize<Event>(e.Payload, new JsonSerializerOptions
{
PropertyNameCaseInsensitive = true,
});
if (ev is not null)
Handle(ev);
}
Eventstore数据库的触发器部分。
将数据加载到write_model的数据库中。
触发条件:在events表中进行插入
执行内容:通知插入的内容
CREATE OR REPLACE FUNCTION public.event_insert_trigger()
RETURNS trigger
LANGUAGE 'plpgsql'
COST 100
VOLATILE NOT LEAKPROOF
AS $BODY$
BEGIN
PERFORM pg_notify(
'event_channel',
row_to_json(NEW)::text
);
return NULL;
END
$BODY$;
ALTER FUNCTION public.event_insert_trigger()
OWNER TO postgres;
CREATE TRIGGER event_trigger
AFTER INSERT
ON public.events
FOR EACH ROW
EXECUTE FUNCTION public.event_insert_trigger();
样本命令(SampleCmd)的指令
暂时使用控制台应用程序。使用命令如add或edit,指定事件名称并附带必要的参数将事件插入write_model中。
$ docker exec -it dotnet dotnet run --project /root/src/SampleCmd help
Usage: SampleCmd [options...]
Options:
-e, --eventname <String> event name. add or edit or delete (Required)
-n, --username <String> user name. Required for add or edit (Default: null)
-id, --id <String> guid. Required for edit or delete (Default: null)
Commands:
help Display help.
version Display version.
总结。

如果我们将一个PostgreSQL数据库拆分成多个模式来管理,可能也是一个不错的选择。

在事件发生时执行Projector作为外部应用程序似乎也是一个可行的选择。在这种模式中,恢复read_model时需要使用write_model的信息。
请问在使用CQRS+事件溯源在私有云中进行实现时,是否有任何方便的机制可以教授?