Skip to content

Main role in Architecture

  • Watcher: A real-time monitoring role, with various types including TimerWatcher and EventWatcher.
  • Worker: The actual worker role, responsible for performing all the actual work.
  • Job: A detailed description of a job. For example, what is the trigger type? Is it event-triggered or time-triggered? Which Worker will execute it?
  • JobContext: the context state corresponding to the Job, which is used for running the Worker and restoring its state.
  • Extension point: Green. Properties or methods must be overridden when implementing customization.
  • Sealed class: pink or red

1、Job : 任务的配置

Job

  • Job是对任务的描述,通过该描述可以设置它的工作类型,执行的业务逻辑单元。
  • 由两种方式来配置它,一般是在应用程序配置appsettings.json或插件模块的配置moduleAssemblyName.json中,直接用代码来继承Job类可以等价实现。

支持的任务类型

使用前请先根据支持的任务类型,确定是否需要使用该框架。
由Job.Trigger配置的值触发 。

Value类型示例值简介
Setup安装与升级""启动后运行
Startup启动任务"2000"Setup后所有任务前运行,延时2秒。可能会持续运行。
Event事件触发"DirChangeEventer"当指定目录变化时触发
Interval间隔运行"00:00:20"运行的间隔20秒
SchedulableCron表达式"* 0/10 * * * ?"Cron表达式:每10分钟执行

不同类型的Job的生命周期

使用不同类型的Job实现不同类型的任务。

Job Work Flow

配置方式

json
{
 "Jobs": {
   //SampleJobs是模块名称,以区分多个插件模块中的同名Job。
   "SampleJobs": [
     {
       "name": "Interval10Sec",
       "trigger": "00:00:10",
       "workerName": "SimapleWorker"
     },
   ]
 }
}

代码模式

cs
public class Interval10Sec : Job
{
    public Interval10Sec()
    {
        Trigger = "00:00:10";
    }
    public override string? WorkerName => nameof(SimapleWorker);
}

2、JobContext : 任务执行时的状态和上下文

JobContext

  • JobContext是每个Job对应的上下文与状态,它会在首次运行后持久化到默认的/state/目录下。
  • 任务再次运行时会读取上次的持久化状态并恢复成当前的JobContext。
  • 每个JobContext都必须实现对应的JobFactory,以创建对应的上下文。
  • 也可以不创建自定的JobContext,而是直接使用它,可以通过Data属性来设置和取得不同的工作状态。

自定义的CustomJobContext示例:

cs
public sealed class CustomJobContext : JobContext
{
    public bool Done
    {
        get; set;
    }
}

public sealed class CustomJobContextFactory(IServiceProvider serviceProvider)
    : JobContextServiceBase(serviceProvider), IJobContextFactory<CustomJobContext>, ITransient
{
    public override CustomJobContext LoadOrCreate(IJob job)
    {
        var ctx = LoadOrCreateNew(job, Create);
        return ctx;
    }

    protected override CustomJobContext Create() => new();
}

3、Worker : 具体执行业务逻辑的工作单元

Worker

  • 通过KeyedName来以类的名称注册到DI,也可以用字符串指定名称,不指定名称将以类的全名注册到DI。
  • 实现自已的Worker业务类时,要将Job对应的WorkerName指向它。
  • 每个Worker需要一个对应的JobContext以确定上下文类型。

简单示例:

cs
[KeyedName]
public sealed class SimapleWorker(IServiceProvider serviceProvider) : Worker<JobContext>(serviceProvider)
{
    protected override async Task ExecuteAsync(CancellationToken cancellationToken)
    {
        await Task.Delay(500, cancellationToken); //mock execute 0.5 second.
        await ExecuteCallbackAsync(true);// work finish callback. It's not necessary.
    }
   
    protected override async Task ExecuteCallbackAsync<TResult>(TResult? result) where TResult : default
    {
        if (result is true)
            Logger.LogDebug($"Worker are success.");

        await Task.CompletedTask;
    }
}

4、Event: 事件触发类型的Job的启动事件

Eventor

  • 它是Job的触发类型的事件触发。
  • 每种事件是不同的,因此要根据具体的业务来实现。
  • 事件的类名称是启动该事件侦测的启动启动类的名称。

简单示例:

cs
[KeyedName]
public sealed class RouterEventer(IServiceProvider serviceProvider) : Eventer<TimeJobContext>(serviceProvider)
{
    public override async Task StartAsync(TimeJobContext? jobContext, CancellationToken cancellationToken)
    {
        await base.StartAsync(jobContext, cancellationToken);
        var receiver = new UdpReceiver(ServiceProvider, cancellationToken);
        receiver.OnMessage += async (s, e) =>
        {
            var workerName = jobContext.GetWorkerName();
            var worker = ServiceProvider.GetKeyedService<IWorker<TimeJobContext>>(workerName);
            if (worker == null)
            {
                Logger.LogError($"{this} No Worker[{workerName}] was found.");
                return;
            }

            var dnsrst = jobContext!.GetData<DDNSState>() ?? new DDNSState();
            jobContext.SetData(dnsrst);
            await worker.StartAsync(jobContext, cancellationToken); //trigger the worker.
        };
        await Task.CompletedTask;
    }
}

5、Watcher: 每个Watcher会监视某些类型的Job

Watcher

  • 每个Watcher监视一个或多个Job。
  • 每个Watcher必然对应一个具体的JobContext。
  • 它在启动时会将对应的Job创建成对应的JobContext。
  • 由它来实现对Job的Worker的触发与管理。
  • 同一种Trigger类型的Job,可能会有多个Watcher,Watcher与Trigger和具体的JobContext对应。
  • 可以实现自定义的Watcher。

一个事件Watcher的简单实现

cs
public sealed class MyEventWatcher(IServiceProvider serviceProvider) : EventWatcherBase<MyJobContext>(serviceProvider)
{
}

一个自定义的Watcher的实现

cs
public sealed class MyWatcher(IServiceProvider serviceProvider) : Watcher<MyJobContext>(serviceProvider), ITransient
{
    protected override TriggerStyle TriggerStyle => TriggerStyle.Setup;

    protected override async Task ExecuteAsync()
    {
        var ctxs = JobContexts;
        using var logDis = Logger.BeginScopeLog(out string scopedId);
        try
        {
            var runjxs = ctxs.Where(x => !x.Done).OrderBy(x => x.OrderVersion);
            foreach (var run in runjxs)
            {
                if (AppCancellationToken.IsCancellationRequested) break;
                var name = run.GetWorkerName();
                var worker = ServiceProvider.GetKeyedService<IWorker<SetupJobContext>>(name);
                if (worker == null)
                    Logger.LogError($"{this} No Worker[{name}] was found.");
                else
                    await worker.StartAsync(run, AppCancellationToken);
            }
        }
        catch (Exception ex)
        {
            Logger.LogError(ex, $"{this} has exception:{ex.Message}");
        }
    }
}

6、HostService: 框架的启动服务类

Job