Skip to content

计划组件

Plan 提供了灵活的任务编排系统,具有共享状态管理和信号控制功能。

特性

  • 任务排序和并行执行
  • 任务间共享状态管理
  • 任务生命周期控制(暂停/恢复/停止)
  • 基于信号的任务通信
  • 可配置的状态检查间隔
  • 线程安全操作
  • 资源清理管理

核心概念

Plan

Plan 是一个具有共享状态空间的任务集合。它管理任务的执行顺序和生命周期。

Task

Task 是一个工作单元,具有以下特性:

  • 可以访问共享状态
  • 可以响应控制信号
  • 可以与同一顺序的其他任务并行执行
  • 可以报告其状态并存储任务特定数据

共享空间

共享空间提供了一个线程安全的存储机制,用于任务之间共享数据和通信。

API 参考

Plan 创建

typescript
class Plan {
  constructor(id: string) {
    // 初始化计划
  }
}

类型定义

typescript
/**
 * 计划或任务的状态
 */
export type PlanStatus =
  | 'created' //初始状态
  | 'running' //任务执行中
  | 'paused' //任务暂停
  | 'completed' //任务成功完成
  | 'failed' //任务执行失败
  | 'destroyed' //任务已终止
  | 'unknown';

/**
 * Plan 类
 */
export declare class Plan {
  /**
   * @param plan_id - 计划ID
   */
  constructor(plan_id: string);

  /**
   * 订阅计划
   * @param key - 要订阅的键
   * @param subscribe_fn - 计划变更时执行的函数
   * @param subscribe_args - 传递给订阅函数的参数,The rest of the arguments are the arguments to the process
   */
  Subscribe(key: string, subscribe_fn: string, ...subscribe_args: any[]): void;

  /**
   * 添加任务到计划
   *
   * will Trigger the TaskStarted signal "TaskStarted"
   * @param task_id - 任务ID
   * @param order - 任务顺序
   * @param task_process - 要执行的任务处理函数
   * @param task_args - 传递给任务的参数
   */
  Add(
    task_id: string,
    order: number,
    task_process: string,
    ...task_args: any[]
  ): void;

  /**
   * 同步运行计划
   */
  Run(): void;

  /**
   * 释放计划,清空共享空间
   */
  Release(): void;

  /**
   * 获取计划状态和每个任务的状态
   */
  Status(): {
    plan: PlanStatus;
    tasks: Record<string, PlanStatus>;
  };

  /**
   * 获取任务状态
   */
  TaskStatus(task_id: string): PlanStatus;

  /**
   * 获取或设置任务数据,如果未提供数据则返回当前数据,否则设置数据并返回之前的数据
   */
  TaskData(task_id: string, data?: any): any;

  /**
   * 从共享空间获取值
   */
  Get(key: string): any;

  /**
   * 在共享空间中设置值
   */
  Set(key: string, value: any): void;

  /**
   * 从共享空间删除值
   */
  Del(key: string): void;

  /**
   * 清空共享空间
   */
  Clear(): void;
}

示例

typescript
import { Plan, time } from '@yao/runtime';

function Test() {
  const namespace = 'scripts.runtime.api.plan';

  //same id with same namespace,will be the same plan
  //同一个id,多次new Plan,会返回同一个plan
  const plan = new Plan('test-plan');
  // 订阅任务的变化
  plan.Subscribe('TaskStarted', `${namespace}.TaskStarted`, 'foo');
  plan.Subscribe('TaskCompleted', `${namespace}.TaskCompleted`, 'foo');
  // 订阅共享空间的变化
  plan.Subscribe('some-key', `${namespace}.SomeKey`, 'bar');

  plan.Add('task-1', 1, `${namespace}.Task1`, 'foo1');
  plan.Add('task-2', 1, `${namespace}.Task2`, 'foo2');
  plan.Add('task-3', 2, `${namespace}.Task3`, 'foo3');
  plan.Add('task-4', 2, `${namespace}.Task4`, 'foo4');
  // 运行计划
  plan.Run();
  // 释放计划,资源,也会触发subscribe事件。
  plan.Release();
  return 'Done';
}

function Task1(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);

  // 更新共享数据,will trigger the SomeKey signal
  plan.Set('some-key', `foo-${foo}`);
  time.Sleep(200);
  const ts = new Date().getTime();
  return ts;
}

function Task2(plan_id: string, task_id: string, foo: string) {
  time.Sleep(300);
  const ts = new Date().getTime();
  return ts;
}

function Task3(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);
  const some = plan.Get('some-key');

  // 更新共享数据
  plan.Set('some-key', `bar-${foo}`);
  time.Sleep(400);
  const ts = new Date().getTime();
  return { ts: ts, shared: some };
}

function Task4(plan_id: string, task_id: string, foo: string) {
  const plan = new Plan(plan_id);
  time.Sleep(500);
  const some = plan.Get('some-key');
  const ts = new Date().getTime();
  return { ts: ts, shared: some };
}

function SomeKey(plan_id: string, key: string, data: any, foo: string) {
  const plan = new Plan(plan_id);
  const ts = new Date().getTime();
  console.log(`SomeKey ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`);
  console.log(plan.Status());
}

function TaskStarted(plan_id: string, key: string, data: any, foo: string) {
  const ts = new Date().getTime();
  console.log(
    `TaskStarted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
  );
}

function TaskCompleted(plan_id: string, key: string, data: any, foo: string) {
  const ts = new Date().getTime();
  console.log(
    `TaskCompleted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
  );
}

最佳实践

  1. 错误处理: 始终检查计划操作返回的错误。
  2. 信号处理: 在长时间运行的任务中实现适当的信号处理。
  3. 资源清理: 使用 stop() 正确清理资源。
  4. 上下文使用: 在任务实现中尊重上下文取消。
  5. 共享状态: 使用共享空间进行任务通信,而不是外部变量。

线程安全

计划组件设计为线程安全:

  • 所有计划操作都是同步的
  • 共享空间操作受互斥锁保护
  • 信号通道是缓冲的,以防止阻塞

事件触发

内置事件:

  • TaskCompleted:当任务完成时触发
  • TaskError:当任务发生错误时触发
  • TaskStarted:当任务被添加到计划时触发
  • Released:当共享空间中的所有数据被删除时触发