使用.NET 6和Dapr进行分布式服务开发之第6部分 输入绑定调度器执行

通过输入绑定实现协同合作。

我过去一直按照以下步骤进行工作。这次我想要谈谈Dapr中的输入绑定(Input Binding)之一,即定期执行调度器。我认为这是最容易理解的方式。

因为我们在现有前提的基础上要添加输入绑定功能,所以请从这个页面阅读的人,请事先阅读以下内容。


 

关于输入绑定

输入绑定(Input Binding)是指在每5分钟(事件)触发服务调用(触发器)等场景中,可以使用诸如Azure Storage Queue/Amazon SQS/Apache Kafka等队列,当入队(事件)后,希望调用服务(触发器)的功能。

我认为大家对Azure Functions和AWS Lambda这样的服务已经很熟悉了,它们可以对事件进行触发处理。我们也可以构建类似的机制来对事件进行处理。

image.png

绑定列表中包括输入绑定和输出绑定两部分,详细信息请查看以下链接:
https://docs.dapr.io/reference/components-reference/supported-bindings/

尝试使用 Cron(计划任务调度程序)绑定

在这些输入绑定中,我认为最简单的是Cron(调度程序),所以我们先从Cron(调度程序)开始尝试。

image.png

创建并添加项目

请前往之前工作的解决方案的根文件夹,并添加一个新项目,然后在该项目的控制器上设置绑定。

dotnet new webapi -o WorkerService
dotnet sln add WorkerService

将项目添加到tye.yaml文件中

请确保不要忘记在 “tye.yaml” 中添加项目,并将 dapr 配置为侧边栏启动。以下是在现有 YAML 文件中增加 “worker-service” 的两行代码。

# tye application configuration file
# read all about it at https://github.com/dotnet/tye
#
# when you've given us a try, we'd love to know what you think:
#    https://aka.ms/AA7q20u
#
name: dapr
extensions:
- name: dapr

  # log-level configures the log level of the dapr sidecar
  log-level: debug

  # config allows you to pass additional configuration into the dapr sidecar
  # config will be interpreted as a named k8s resource when deployed, and will be interpreted as
  # a file on disk when running locally at `./components/myconfig.yaml`
  #
  # config: myconfig

  # components-path configures the components path of the dapr sidecar
  components-path: "./components/"

  # If not using the default Dapr placement service or otherwise using a placement service on a nonstandard port,
  # you can configure the Dapr sidecar to use an explicit port.
  # placement-port: 6050
services:
- name: service-a
  project: ServiceA/ServiceA.csproj
- name: service-b
  project: ServiceB/ServiceB.csproj
- name: app
  project: App/App.csproj
- name: stateservice
  project: StateService/StateService.csproj
- name: worker-service
  project: WorkerService/WorkerService.csproj

# This may conflict with the redis instance that dapr manages.
#
# Doing a `docker ps` can show if its already running. If that's the case
# then comment out out when running locally. 
# - name: redis
#   image: redis
#   bindings: 
#   - port: 6379

向component文件夹中添加cron-binding.yaml文件。

在处理statestore时,我认为您已添加了statestore.yaml,现在同样添加cron-binging.yaml。

image.png
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: Job # 呼び出されるコントローラーパス(JobController)
spec:
  type: bindings.cron
  version: v1
  metadata:
  - name: schedule
    value: "@every 3s" # いわゆるCRON式です。* * *といったフォーマットも対応します。ここでは、3秒おきに実行しています。
scopes: # scopesでは、対象となるサービスを列挙して指定しています。
- worker-service

另外,根据确认的规格,调用必须使用POST方法。换句话说,在这种情况下,只需在JobController中执行POST的实现,就可以了。

项目的变更

我們將修改這個項目。首先,在這個 WorkerService 中,請刪除 WeatherForecast.cs 和 WeatherForecastController.cs,因為我們不會使用它們。然後,我們還準備了一個新的 JobController.cs 和 JobResponse.cs。

image.png

此外,还要记得编辑Program.cs文件。
特别是//app.UseHttpsRedirection();这一行一定要注释掉。因为Dapr启动时会进行重定向,可能导致错误发生。

using System.Diagnostics;
Activity.DefaultIdFormat = ActivityIdFormat.W3C;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

//app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

我会为您准备一个简单的响应模型。

namespace WorkerService;

public class JobResponse
{
    public DateTime Date { get; set; }

}

在 JobController 中,当收到 POST 请求时,会简单地将当前时间(Datetime.Now)填充到响应中返回。

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

namespace WorkerService.Controllers;

[ApiController]
[Route("[controller]")]
public class JobController : ControllerBase
{
    private readonly ILogger<JobController> _logger;

    public JobController(ILogger<JobController> logger)
    {
        _logger = logger;
    }

    [HttpPost(Name = "PostJob")]
    public JobResponse Post()
    {
        JobResponse response = new JobResponse
        {
            Date = DateTime.Now
        };

        Console.WriteLine(response.ToString());
        return response;
    }
}

启动和确认

完成编辑后,请使用tye run启动。

$ tye run
Loading Application Details...
Launching Tye Host...

[14:44:48 INF] Executing application from tye.yaml
[14:44:48 INF] Dashboard running on http://127.0.0.1:8000
[14:44:48 INF] Build Watcher: Watching for builds...
...

请以与以往相同的方式,在浏览器中打开http://127.0.0.1:8000。

image.png

首先,让我们通过查看worker-service-dapr的日志来确认。
如果日志中存在名称为“Job”,调度触发的记录,那么说明Dapr成功地触发了调度任务。

image.png
[worker-service-dapr_73c5055d-f]: time="2022-03-07T15:43:34.0106148+09:00" level=debug msg="name: Job, schedule fired: 2022-03-07 15:43:34.0106148 +0900 JST m=+141.014253101" app_id=worker-service instance=HX-90 scope=dapr.contrib type=log ver=1.6.0

让我们也去工作者服务端进行确认。

image.png
[worker-service_75697214-b]: WorkerService.JobResponse
广告
将在 10 秒后关闭
bannerAds