使用任务
使用异步任务来执行耗时的处理操作。
异步任务的工作机制如下:在程序内部维护了一个可重复调用的任务池,当向任务池中加入新的任务时,并不会马上执行,而是会等待有空闲的处理器才会执行。避免了大量并发处理导致服务器资源短缺。
https://yaoapps.com/doc/手册/Widgets/Task
操作
定义任务,在应用tasks
目录创建任务定义配置文件。
- name 任务名称
- worker_nums 指定任务池中工作处理器的数量
- attempts 失败重试次数
- attempt_after 重试间隔
- timeout 超时时间
- process 该 task 绑定的处理器
- next 生成任务唯一 id,可选,需要返回一个整型的数字,用来生成任务的标识,如果没有传值,内部会根据作业队列自动的生成 id,注意官方版本有一个 bug,内部 ID 无法自动增长
- add 添加任务时触发的方法,可选
- success 任务处理成功后触发方法,可选
- error 任务失败后触发方法,可选
- progress 任务处理中调用,可选
tasks/test.yao
json
{
"name": "测试task",
"worker_nums": 10,
"attempts": 3,
"attempt_after": 200,
"timeout": 2,
"size": 1000,
"process": "scripts.task.Send", //必须的
"event": {
"next": "scripts.task.NextID", //需要返回一个整型的数字,用来生成任务的标识,返回值一定要保持唯一,要不就留空
"add": "scripts.task.OnAdd",
"success": "scripts.task.OnSuccess",
"error": "scripts.task.OnError",
"progress": "scripts.task.OnProgress"
}
}
js
// 如果是在响应api请求时,不要在脚本里使用全局变量,因为不同的会话中,脚本会初始化,id并不会增长
// 可以考虑使用session保存全局唯一的id
var id = 1024;
/**
* 任务标识生成器
* Generate job id,需要返回一个整型的数字,用来生成任务的标识,一定要保证ID的唯一性
* @returns
*/
function NextID() {
id = id + 1;
console.log(`NextID: ${id}`);
return id;
}
/**
* 任务绑定的处理器,
* @param {integer} task_id 作业的id,
* @param {any} args 任务的参数,可以有多个,由tasks.xxx.add处理器传入。
*
*/
function Send(task_id, ...args) {
console.log(args);
// do the job
const current = 1;
const totla = 100;
// 在任务处理器内部过程中调用,汇报任务处理进展,在一个长时间运行的作业中,这个是必要的。
Progress(
'tasks.task.process',
task_id,
current,
total,
fmt.Sprintf('Progress %v/%v', current, total),
);
return {
message: 'ok',
};
}
/**
* OnAdd add event
* @param {*} id 任务id
*/
function OnAdd(id) {
log.Error('进入add');
console.log(`OnAdd: #${id}`);
}
/**
* OnProgress
* @param {integer} id task id,任务ID
* @param {integer} current
* @param {integer} total
* @param {string} message
*/
function OnProgress(id, current, total, message) {
console.log(`OnProgress: #${id} ${message} ${current}/${total} `);
}
/**
* OnSuccess
* @param {integer} id task id,任务ID
* @param {any} res
*/
function OnSuccess(id, res) {
console.log(`OnSuccess: #${id} ${JSON.stringify(res)}`);
}
/**
* OnError
* @param {integer} id task id,任务ID
* @param {err} error
*/
function OnError(id, err) {
console.log(`OnError: #${id} ${err}`);
}
工作原理
当定义 task 后,yao 框架会维护一个作业池。
处理器:
tasks.[task_id].add
增加一个任务,增加任务时可以传入任务处理器需要的参数,如果增加成功返回 task_id,失败返回 0。tasks.[task_id].progress
任务进度反馈处理器,在任务处理器内部调用,在处理器内部主动调用,除了会更新 job 内部的状态,还会回调 event.process 事件
js
// 在任务处理器内部过程中调用
Progress(
'tasks.task.process',
task_id,
current,
total,
fmt.Sprintf('Progress %v/%v', current, total),
);
tasks.[task_id].get
根据任务 ID,获取当前任务的状态,在任务外部主动调用
获取 job 的状态
js
const job = Progress('tasks.task.get', task_id);
return {
id: job_id,
status: '', //"WAITING"/"RUNNING"/"SUCCESS"/"FAILURE"
current: job.curr, //由处理器`tasks.[task_id].progress` 更新
total: job.total, //由处理器`tasks.[task_id].progress` 更新
message: job.message, //由处理器`tasks.[task_id].progress` 更新
response: job.response,
};
调用任务时,是调用tasks.[task_id].add
处理器方法,并且传入业务处理器需要参数。这里的 task_id 是 task 的定义标识,而标识即是定义的 task 配置文件的文件名。
任务会保存到缓存中,并等待处理。
js
function task() {
for (i = 1; i < 100; i++) {
//
const task_id = Process('tasks.test.Add', '进入任务' + i);
if (task_id == 0) {
console.log('add task failed');
}
}
}