异步任务
概述
异步任务是一种用于处理耗时操作、消耗大量资源或容易出错的逻辑的机制。它可以将这些操作从请求主流程中剥离出来,异步执行,从而提高系统的响应速度和稳定性。
使用场景
- 耗时的数据处理操作
- 资源密集型任务(如图片处理)
- 第三方服务调用(如发送邮件)
- 批量数据导入导出
- 需要重试机制的操作
例如:新用户注册成功后发送欢迎邮件,可以将发送邮件的操作作为异步任务执行,这样可以让用户更快地得到注册响应,同时避免邮件发送失败影响主流程。
工作机制
异步任务的工作机制如下:
- 在程序内部维护一个可重复调用的任务池
- 当向任务池中加入新的任务时,任务会进入等待状态
- 系统会根据配置的工作进程数(worker_nums)来并发处理任务
- 任务执行过程中可以实时反馈进度
- 支持失败重试机制
任务配置
在应用的 tasks
目录下创建任务定义配置文件(如:tasks/example.yao
):
json
{
"name": "示例任务",
"worker_nums": 10,
"attempts": 3,
"attempt_after": 200,
"timeout": 2,
"size": 1000,
"process": "scripts.task.Send",
"event": {
"next": "scripts.task.NextID",
"add": "scripts.task.OnAdd",
"success": "scripts.task.OnSuccess",
"error": "scripts.task.OnError",
"progress": "scripts.task.OnProgress"
}
}
配置参数说明
参数 | 说明 | 是否必填 |
---|---|---|
name | 任务名称 | 是 |
worker_nums | 任务池中工作进程数量 | 是 |
attempts | 失败重试次数 | 是 |
attempt_after | 重试间隔(毫秒) | 是 |
timeout | 任务超时时间(秒) | 是 |
size | 任务队列大小 | 是 |
process | 任务处理器 | 是 |
event.next | 生成任务唯一ID的处理器 | 否 |
event.add | 添加任务时的回调处理器 | 否 |
event.success | 任务成功时的回调处理器 | 否 |
event.error | 任务失败时的回调处理器 | 否 |
event.progress | 任务进度更新的回调处理器 | 否 |
事件处理器说明
event.next
- 触发时机:每次添加新任务时,用于生成任务的唯一标识
- 参数:无
- 返回值:number 类型,表示任务ID
- 使用场景:
- 需要自定义任务ID生成规则
- 需要保证任务ID的全局唯一性
- 需要使用特定格式的任务ID
event.add
- 触发时机:任务被成功添加到任务池后
- 参数:
- id: number - 任务ID
- 使用场景:
- 记录任务创建日志
- 更新任务相关的业务状态
- 发送任务创建通知
event.success
- 触发时机:任务执行成功后
- 参数:
- id: number - 任务ID
- res: any - 任务执行结果
- 使用场景:
- 处理任务执行结果
- 更新相关业务状态
- 发送成功通知
- 触发后续业务流程
event.error
- 触发时机:任务执行失败时(包括重试后仍然失败)
- 参数:
- id: number - 任务ID
- err: Error - 错误信息
- 使用场景:
- 记录错误日志
- 发送失败告警
- 执行补偿逻辑
- 更新任务状态
event.progress
- 触发时机:任务执行过程中调用 Progress 处理器时
- 参数:
- id: number - 任务ID
- current: number - 当前进度
- total: number - 总进度
- message: string - 进度信息
- 使用场景:
- 更新任务进度显示
- 记录执行过程日志
- 实时反馈任务状态
事件处理器示例
javascript
/**
* 生成任务唯一ID
* @returns {number} 任务ID
*/
function NextID() {
// 使用时间戳和随机数生成唯一ID
const timestamp = Date.now();
const random = Math.floor(Math.random() * 1000);
return parseInt(`${timestamp}${random}`);
}
/**
* 任务添加时的回调
* @param {number} id 任务ID
*/
function OnAdd(id) {
// 记录任务创建日志
console.log(`Task #${id} created at ${new Date().toISOString()}`);
// 更新任务状态
Process('models.task.save', {
id: id,
status: 'created',
created_at: new Date().toISOString()
});
}
/**
* 任务成功回调
* @param {number} id 任务ID
* @param {any} res 任务结果
*/
function OnSuccess(id, res) {
// 更新业务状态
Process('models.task.save', {
id: id,
status: 'completed',
result: res,
completed_at: new Date().toISOString()
});
// 发送成功通知
Process('scripts.notification.send', {
type: 'task_complete',
task_id: id,
result: res
});
}
/**
* 任务失败回调
* @param {number} id 任务ID
* @param {Error} err 错误信息
*/
function OnError(id, err) {
// 记录错误日志
console.error(`Task #${id} failed: ${err.message}`);
// 更新任务状态
Process('models.task.save', {
id: id,
status: 'failed',
error: err.message,
failed_at: new Date().toISOString()
});
// 发送失败告警
Process('scripts.notification.send', {
type: 'task_error',
task_id: id,
error: err.message,
level: 'error'
});
}
/**
* 任务进度更新回调
* @param {number} id 任务ID
* @param {number} current 当前进度
* @param {number} total 总进度
* @param {string} message 进度信息
*/
function OnProgress(id, current, total, message) {
// 计算进度百分比
const percentage = Math.floor((current / total) * 100);
// 更新任务进度
Process('models.task.save', {
id: id,
progress: percentage,
progress_message: message,
updated_at: new Date().toISOString()
});
// 记录进度日志
if (percentage % 10 === 0) {
// 每完成10%记录一次
console.log(`Task #${id}: ${percentage}% completed - ${message}`);
}
}
任务处理器
在 scripts
目录下创建任务处理脚本(如:scripts/task.js
):
javascript
// 如果是在响应api请求时,不要在脚本里使用全局变量
// 可以考虑使用session保存全局唯一的id
var id = 1024;
/**
* 生成任务唯一ID
* @returns {number} 任务ID
*/
function NextID() {
id = id + 1;
console.log(`NextID: ${id}`);
return id;
}
/**
* 任务处理器
* @param {number} task_id 任务ID
* @param {...any} args 任务参数
*/
function Send(task_id, ...args) {
console.log(args);
// 任务处理逻辑
const current = 1;
const total = 100;
// 更新任务进度
Progress(
'tasks.task.process',
task_id,
current,
total,
`Progress ${current}/${total}`
);
return { message: 'ok' };
}
/**
* 任务添加时的回调
* @param {number} id 任务ID
*/
function OnAdd(id) {
console.log(`OnAdd: #${id}`);
}
/**
* 任务进度更新回调
* @param {number} id 任务ID
* @param {number} current 当前进度
* @param {number} total 总进度
* @param {string} message 进度信息
*/
function OnProgress(id, current, total, message) {
console.log(`OnProgress: #${id} ${message} ${current}/${total}`);
}
/**
* 任务成功回调
* @param {number} id 任务ID
* @param {any} res 任务结果
*/
function OnSuccess(id, res) {
console.log(`OnSuccess: #${id} ${JSON.stringify(res)}`);
}
/**
* 任务失败回调
* @param {number} id 任务ID
* @param {Error} err 错误信息
*/
function OnError(id, err) {
console.log(`OnError: #${id} ${err}`);
}
任务处理流程
添加任务
- 使用
tasks.[task_id].add
处理器添加任务 - 可传入任务处理器所需的参数
- 返回任务ID(成功)或0(失败)
- 使用
任务状态更新
- 使用
tasks.[task_id].progress
处理器更新任务进度 - 在任务处理器内部调用
- 会触发 event.progress 回调
- 使用
查询任务状态
- 使用
tasks.[task_id].get
处理器获取任务状态 - 返回任务的详细信息:javascript
{ id: task_id, status: '', // "WAITING"/"RUNNING"/"SUCCESS"/"FAILURE" current: number, // 当前进度 total: number, // 总进度 message: string, // 进度信息 response: any // 任务结果 }
- 使用
使用示例
javascript
function task() {
for (let i = 1; i < 100; i++) {
const task_id = Process('tasks.test.add', '任务-' + i);
if (task_id === 0) {
console.log('添加任务失败');
continue;
}
console.log(`添加任务成功:${task_id}`);
}
}
最佳实践
任务设计
- 将耗时操作从主流程中剥离
- 任务应该是独立且原子的
- 考虑任务的幂等性
资源管理
- 根据服务器资源合理设置 worker_nums
- 设置合适的任务超时时间
- 控制任务队列大小
错误处理
- 实现完善的重试机制
- 记录详细的错误日志
- 设置合理的重试间隔
监控告警
- 监控任务队列大小
- 监控任务执行时间
- 监控失败率和重试情况