Skip to content

计划组件

js Plan 对象 提供了灵活的任务编排系统,具有共享状态管理和信号控制功能。

Plan对象提供了Js接口与GO lang接口。

特性

  • 任务排序和并行执行,同一个order的任务会并行执行。
  • 任务间共享状态管理
  • 任务生命周期控制(暂停/恢复/停止)
  • 基于信号的任务通信
  • 可配置的状态检查间隔
  • 线程安全操作
  • 资源清理管理

核心概念

Plan

Plan 是一个具有共享状态空间的任务集合。它管理任务的执行顺序和生命周期。

Task

Task 是一个工作单元,具有以下特性:

  • 可以访问共享状态
  • 可以响应控制信号
  • 可以与同一顺序的其他任务并行执行
  • 可以报告其状态并存储任务特定数据

共享空间

共享空间提供了一个线程安全的存储机制,用于任务之间共享数据和通信。

API 参考

Plan 创建

typescript
class Plan {
  constructor(id: string) {
    // 初始化计划
  }
}

创建一个新的 Plan 实例,需要传入plan_id,如果此id已经存在,会返回已存在的Plan。

typescript
const plan = new Plan(plan_id: string);

类型定义

typescript
/**
 * 计划或任务的状态
 */
export type PlanStatus =
  | 'created' //初始状态
  | 'running' //任务执行中
  | 'paused' //任务暂停
  | 'completed' //任务成功完成
  | 'failed' //任务执行失败
  | 'destroyed' //任务已终止
  | 'unknown';

/**
 * Plan 类
 */
export declare class Plan {
  /**
   * @param plan_id - 计划ID
   */
  constructor(plan_id: string);

  /**
   * 订阅计划
   * @param key - 要订阅的键
   * @param subscribe_fn - 计划变更时执行的函数
   * @param subscribe_args - 传递给订阅函数的参数,The rest of the arguments are the arguments to the process
   */
  Subscribe(key: string, subscribe_fn: string, ...subscribe_args: any[]): void;

  /**
   * 添加任务到计划
   *
   * will Trigger the TaskStarted signal "TaskStarted"
   * @param task_id - 任务ID
   * @param order - 任务顺序,同一个order的任务会并行执行
   * @param task_process - 要执行的任务处理函数或是处理器名称,默认情况下只支持处理器名称,如果需要直接传入函数,需要定制开发。
   * @param task_args - 传递给任务的参数
   */
  Add(
    task_id: string,
    order: number,
    task_process: string,
    ...task_args: any[]
  ): void;

  /**
   * 同步运行计划
   */
  Run(): void;

  /**
   * 释放计划,清空共享空间
   */
  Release(): void;

  /**
   * 获取计划状态和每个任务的状态
   */
  Status(): {
    plan: PlanStatus;
    tasks: Record<string, PlanStatus>;
  };

  /**
   * 获取任务状态
   */
  TaskStatus(task_id: string): PlanStatus;

  /**
   * 获取或设置任务数据,如果未提供数据则返回当前数据,否则设置数据并返回之前的数据
   */
  TaskData(task_id: string, data?: any): any;

  /**
   * 从共享空间获取值
   */
  Get(key: string): any;

  /**
   * 在共享空间中设置值,如果有订阅相关的Key,会触发相关的subscribe事件。
   */
  Set(key: string, value: any): void;

  /**
   * 从共享空间删除值
   */
  Del(key: string): void;

  /**
   * 清空共享空间
   */
  Clear(): void;
}

示例

typescript
import { Plan, time } from '@yao/runtime';

function Test() {
  const namespace = 'scripts.runtime.api.plan';

  //same id with same namespace,will be the same plan
  //同一个id,多次new Plan,会返回同一个plan
  const plan = new Plan('test-plan');
  // 订阅任务的变化
  plan.Subscribe('TaskStarted', `${namespace}.TaskStarted`, 'foo');
  plan.Subscribe('TaskCompleted', `${namespace}.TaskCompleted`, 'foo');
  // 订阅共享空间的变化
  plan.Subscribe('some-key', `${namespace}.SomeKey`, 'bar');

  plan.Add('task-1', 1, `${namespace}.Task1`, 'foo1');
  plan.Add('task-2', 1, `${namespace}.Task2`, 'foo2');
  plan.Add('task-3', 2, `${namespace}.Task3`, 'foo3');
  plan.Add('task-4', 2, `${namespace}.Task4`, 'foo4');
  // 运行计划
  plan.Run();
  // 释放计划,资源,也会触发subscribe事件。
  plan.Release();
  return 'Done';
}

function Task1(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);

  // 更新共享数据,will trigger the SomeKey signal
  plan.Set('some-key', `foo-${foo}`);
  time.Sleep(200);
  const ts = new Date().getTime();
  return ts;
}

function Task2(plan_id: string, task_id: string, foo: string) {
  time.Sleep(300);
  const ts = new Date().getTime();
  return ts;
}

function Task3(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);
  const some = plan.Get('some-key');

  // 更新共享数据
  plan.Set('some-key', `bar-${foo}`);
  time.Sleep(400);
  const ts = new Date().getTime();
  return { ts: ts, shared: some };
}

function Task4(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);
  time.Sleep(500);
  const some = plan.Get('some-key');
  const ts = new Date().getTime();
  return { ts: ts, shared: some };
}

function SomeKey(plan_id: string, key: string, data: any, foo: string) {
  const plan = new Plan(plan_id);
  const ts = new Date().getTime();
  console.log(`SomeKey ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`);
  console.log(plan.Status());
}

function TaskStarted(plan_id: string, key: string, data: any, foo: string) {
  const ts = new Date().getTime();
  console.log(
    `TaskStarted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
  );
}

function TaskCompleted(plan_id: string, key: string, data: any, foo: string) {
  const ts = new Date().getTime();
  console.log(
    `TaskCompleted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
  );
}

事件触发

内置事件,会在任务状态发生变化时自动触发。在使用Plan时,不要在Set函数中使用以下Key参数。

  • TaskCompleted:当任务完成时触发
  • TaskError:当任务发生错误时触发
  • TaskStarted:当任务被添加到计划时触发
  • Released:当共享空间中的所有数据被删除时触发

注意

计划组件在js环境中与golang环境暴露的接口不同:

  • js环境,没有stop,pause,resume等方法,只能等待任务完成。
    • plan.Run() Run the plan
    • plan.Add() Add a task to the plan
    • plan.Status() Get the status of the plan and each task
    • plan.Release() Release the task and releases its resources
    • plan.TaskStatus() Get the status of a task
    • plan.TaskData() Get or set the data of a task
    • plan.Subscribe() Subscribe to a key in the shared space
    • plan.Get() Get a value from the shared space
    • plan.Set() Set a value in the shared space
    • plan.Del() Delete a value from the shared space
    • plan.Clear() Clear the shared space
  • golang环境,更加丰富的功能,可以暂停,恢复,停止等。
    • plan.NewPlan() NewPlan creates a new Plan instance
    • plan.AddTask() AddTask adds a new task to the plan
    • plan.RemoveTask() RemoveTask removes a task from the plan
    • plan.Start() Start begins execution of the plan
    • plan.Pause() Pause temporarily halts execution of the plan
    • plan.Resume() Resume continues execution of a paused plan
    • plan.Stop() Stop terminates execution of the plan
    • plan.Rlease() Release the task and releases its resources
    • plan.GetStatus() GetStatus returns the current status of the plan and its tasks
    • plan.Trigger() Trigger triggers an event on the plan
    • plan.GetTaskStatus() GetTaskStatus returns the status of a specific task
    • plan.GetTaskData() GetTaskData returns the data associated with a specific task
    • share.Set stores a value in the shared space
    • share.Get retrieves a value from the shared space
    • share.Delete deletes a value from the shared space
    • share.ClearNotify ClearNotify removes all values from the shared space and notifies subscribers
    • share.Clear removes all values from the shared space
    • share.Subscribe subscribes to changes in the shared space
    • share.Unsubscribe unsubscribes from changes in the shared space