计划组件
js Plan 对象 提供了灵活的任务编排系统,具有共享状态管理和信号控制功能。
Plan对象提供了Js接口与GO lang接口。
特性
- 任务排序和并行执行,同一个order的任务会并行执行。
- 任务间共享状态管理
- 任务生命周期控制(暂停/恢复/停止)
- 基于信号的任务通信
- 可配置的状态检查间隔
- 线程安全操作
- 资源清理管理
核心概念
Plan
Plan 是一个具有共享状态空间的任务集合。它管理任务的执行顺序和生命周期。
Task
Task 是一个工作单元,具有以下特性:
- 可以访问共享状态
- 可以响应控制信号
- 可以与同一顺序的其他任务并行执行
- 可以报告其状态并存储任务特定数据
共享空间
共享空间提供了一个线程安全的存储机制,用于任务之间共享数据和通信。
API 参考
Plan 创建
typescript
class Plan {
constructor(id: string) {
// 初始化计划
}
}
创建一个新的 Plan 实例,需要传入plan_id,如果此id已经存在,会返回已存在的Plan。
typescript
const plan = new Plan(plan_id: string);
类型定义
typescript
/**
* 计划或任务的状态
*/
export type PlanStatus =
| 'created' //初始状态
| 'running' //任务执行中
| 'paused' //任务暂停
| 'completed' //任务成功完成
| 'failed' //任务执行失败
| 'destroyed' //任务已终止
| 'unknown';
/**
* Plan 类
*/
export declare class Plan {
/**
* @param plan_id - 计划ID
*/
constructor(plan_id: string);
/**
* 订阅计划
* @param key - 要订阅的键
* @param subscribe_fn - 计划变更时执行的函数
* @param subscribe_args - 传递给订阅函数的参数,The rest of the arguments are the arguments to the process
*/
Subscribe(key: string, subscribe_fn: string, ...subscribe_args: any[]): void;
/**
* 添加任务到计划
*
* will Trigger the TaskStarted signal "TaskStarted"
* @param task_id - 任务ID
* @param order - 任务顺序,同一个order的任务会并行执行
* @param task_process - 要执行的任务处理函数或是处理器名称,默认情况下只支持处理器名称,如果需要直接传入函数,需要定制开发。
* @param task_args - 传递给任务的参数
*/
Add(
task_id: string,
order: number,
task_process: string,
...task_args: any[]
): void;
/**
* 同步运行计划
*/
Run(): void;
/**
* 释放计划,清空共享空间
*/
Release(): void;
/**
* 获取计划状态和每个任务的状态
*/
Status(): {
plan: PlanStatus;
tasks: Record<string, PlanStatus>;
};
/**
* 获取任务状态
*/
TaskStatus(task_id: string): PlanStatus;
/**
* 获取或设置任务数据,如果未提供数据则返回当前数据,否则设置数据并返回之前的数据
*/
TaskData(task_id: string, data?: any): any;
/**
* 从共享空间获取值
*/
Get(key: string): any;
/**
* 在共享空间中设置值,如果有订阅相关的Key,会触发相关的subscribe事件。
*/
Set(key: string, value: any): void;
/**
* 从共享空间删除值
*/
Del(key: string): void;
/**
* 清空共享空间
*/
Clear(): void;
}
示例
typescript
import { Plan, time } from '@yao/runtime';
function Test() {
const namespace = 'scripts.runtime.api.plan';
//same id with same namespace,will be the same plan
//同一个id,多次new Plan,会返回同一个plan
const plan = new Plan('test-plan');
// 订阅任务的变化
plan.Subscribe('TaskStarted', `${namespace}.TaskStarted`, 'foo');
plan.Subscribe('TaskCompleted', `${namespace}.TaskCompleted`, 'foo');
// 订阅共享空间的变化
plan.Subscribe('some-key', `${namespace}.SomeKey`, 'bar');
plan.Add('task-1', 1, `${namespace}.Task1`, 'foo1');
plan.Add('task-2', 1, `${namespace}.Task2`, 'foo2');
plan.Add('task-3', 2, `${namespace}.Task3`, 'foo3');
plan.Add('task-4', 2, `${namespace}.Task4`, 'foo4');
// 运行计划
plan.Run();
// 释放计划,资源,也会触发subscribe事件。
plan.Release();
return 'Done';
}
function Task1(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
// 更新共享数据,will trigger the SomeKey signal
plan.Set('some-key', `foo-${foo}`);
time.Sleep(200);
const ts = new Date().getTime();
return ts;
}
function Task2(plan_id: string, task_id: string, foo: string) {
time.Sleep(300);
const ts = new Date().getTime();
return ts;
}
function Task3(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
const some = plan.Get('some-key');
// 更新共享数据
plan.Set('some-key', `bar-${foo}`);
time.Sleep(400);
const ts = new Date().getTime();
return { ts: ts, shared: some };
}
function Task4(plan_id: string, task_id: string, foo: string) {
const plan = new Plan(plan_id);
time.Sleep(500);
const some = plan.Get('some-key');
const ts = new Date().getTime();
return { ts: ts, shared: some };
}
function SomeKey(plan_id: string, key: string, data: any, foo: string) {
const plan = new Plan(plan_id);
const ts = new Date().getTime();
console.log(`SomeKey ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`);
console.log(plan.Status());
}
function TaskStarted(plan_id: string, key: string, data: any, foo: string) {
const ts = new Date().getTime();
console.log(
`TaskStarted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
);
}
function TaskCompleted(plan_id: string, key: string, data: any, foo: string) {
const ts = new Date().getTime();
console.log(
`TaskCompleted ${plan_id} ${key} ${JSON.stringify(data)} ${foo} ${ts}`
);
}
事件触发
内置事件,会在任务状态发生变化时自动触发。在使用Plan时,不要在Set函数中使用以下Key参数。
- TaskCompleted:当任务完成时触发
- TaskError:当任务发生错误时触发
- TaskStarted:当任务被添加到计划时触发
- Released:当共享空间中的所有数据被删除时触发
注意
计划组件在js环境中与golang环境暴露的接口不同:
- js环境,没有stop,pause,resume等方法,只能等待任务完成。
- plan.Run() Run the plan
- plan.Add() Add a task to the plan
- plan.Status() Get the status of the plan and each task
- plan.Release() Release the task and releases its resources
- plan.TaskStatus() Get the status of a task
- plan.TaskData() Get or set the data of a task
- plan.Subscribe() Subscribe to a key in the shared space
- plan.Get() Get a value from the shared space
- plan.Set() Set a value in the shared space
- plan.Del() Delete a value from the shared space
- plan.Clear() Clear the shared space
- golang环境,更加丰富的功能,可以暂停,恢复,停止等。
- plan.NewPlan() NewPlan creates a new Plan instance
- plan.AddTask() AddTask adds a new task to the plan
- plan.RemoveTask() RemoveTask removes a task from the plan
- plan.Start() Start begins execution of the plan
- plan.Pause() Pause temporarily halts execution of the plan
- plan.Resume() Resume continues execution of a paused plan
- plan.Stop() Stop terminates execution of the plan
- plan.Rlease() Release the task and releases its resources
- plan.GetStatus() GetStatus returns the current status of the plan and its tasks
- plan.Trigger() Trigger triggers an event on the plan
- plan.GetTaskStatus() GetTaskStatus returns the status of a specific task
- plan.GetTaskData() GetTaskData returns the data associated with a specific task
- share.Set stores a value in the shared space
- share.Get retrieves a value from the shared space
- share.Delete deletes a value from the shared space
- share.ClearNotify ClearNotify removes all values from the shared space and notifies subscribers
- share.Clear removes all values from the shared space
- share.Subscribe subscribes to changes in the shared space
- share.Unsubscribe unsubscribes from changes in the shared space