Skip to content

异步任务

概述

异步任务是一种用于处理耗时操作、消耗大量资源或容易出错的逻辑的机制。它可以将这些操作从请求主流程中剥离出来,异步执行,从而提高系统的响应速度和稳定性。

使用场景

  • 耗时的数据处理操作
  • 资源密集型任务(如图片处理)
  • 第三方服务调用(如发送邮件)
  • 批量数据导入导出
  • 需要重试机制的操作

例如:新用户注册成功后发送欢迎邮件,可以将发送邮件的操作作为异步任务执行,这样可以让用户更快地得到注册响应,同时避免邮件发送失败影响主流程。

工作机制

异步任务的工作机制如下:

  1. 在程序内部维护一个可重复调用的任务池
  2. 当向任务池中加入新的任务时,任务会进入等待状态
  3. 系统会根据配置的工作进程数(worker_nums)来并发处理任务
  4. 任务执行过程中可以实时反馈进度
  5. 支持失败重试机制

任务配置

在应用的 tasks 目录下创建任务定义配置文件(如:tasks/example.yao):

json
{
  "name": "示例任务",
  "worker_nums": 10,
  "attempts": 3,
  "attempt_after": 200,
  "timeout": 2,
  "size": 1000,
  "process": "scripts.task.Send",
  "event": {
    "next": "scripts.task.NextID",
    "add": "scripts.task.OnAdd",
    "success": "scripts.task.OnSuccess",
    "error": "scripts.task.OnError",
    "progress": "scripts.task.OnProgress"
  }
}

配置参数说明

参数说明是否必填
name任务名称
worker_nums任务池中工作进程数量
attempts失败重试次数
attempt_after重试间隔(毫秒)
timeout任务超时时间(秒)
size任务队列大小
process任务处理器
event.next生成任务唯一ID的处理器
event.add添加任务时的回调处理器
event.success任务成功时的回调处理器
event.error任务失败时的回调处理器
event.progress任务进度更新的回调处理器

事件处理器说明

event.next

  • 触发时机:每次添加新任务时,用于生成任务的唯一标识
  • 参数:无
  • 返回值:number 类型,表示任务ID
  • 使用场景
    • 需要自定义任务ID生成规则
    • 需要保证任务ID的全局唯一性
    • 需要使用特定格式的任务ID

event.add

  • 触发时机:任务被成功添加到任务池后
  • 参数
    • id: number - 任务ID
  • 使用场景
    • 记录任务创建日志
    • 更新任务相关的业务状态
    • 发送任务创建通知

event.success

  • 触发时机:任务执行成功后
  • 参数
    • id: number - 任务ID
    • res: any - 任务执行结果
  • 使用场景
    • 处理任务执行结果
    • 更新相关业务状态
    • 发送成功通知
    • 触发后续业务流程

event.error

  • 触发时机:任务执行失败时(包括重试后仍然失败)
  • 参数
    • id: number - 任务ID
    • err: Error - 错误信息
  • 使用场景
    • 记录错误日志
    • 发送失败告警
    • 执行补偿逻辑
    • 更新任务状态

event.progress

  • 触发时机:任务执行过程中调用 Progress 处理器时
  • 参数
    • id: number - 任务ID
    • current: number - 当前进度
    • total: number - 总进度
    • message: string - 进度信息
  • 使用场景
    • 更新任务进度显示
    • 记录执行过程日志
    • 实时反馈任务状态

事件处理器示例

javascript
/**
 * 生成任务唯一ID
 * @returns {number} 任务ID
 */
function NextID() {
  // 使用时间戳和随机数生成唯一ID
  const timestamp = Date.now();
  const random = Math.floor(Math.random() * 1000);
  return parseInt(`${timestamp}${random}`);
}

/**
 * 任务添加时的回调
 * @param {number} id 任务ID
 */
function OnAdd(id) {
  // 记录任务创建日志
  console.log(`Task #${id} created at ${new Date().toISOString()}`);

  // 更新任务状态
  Process('models.task.save', {
    id: id,
    status: 'created',
    created_at: new Date().toISOString()
  });
}

/**
 * 任务成功回调
 * @param {number} id 任务ID
 * @param {any} res 任务结果
 */
function OnSuccess(id, res) {
  // 更新业务状态
  Process('models.task.save', {
    id: id,
    status: 'completed',
    result: res,
    completed_at: new Date().toISOString()
  });

  // 发送成功通知
  Process('scripts.notification.send', {
    type: 'task_complete',
    task_id: id,
    result: res
  });
}

/**
 * 任务失败回调
 * @param {number} id 任务ID
 * @param {Error} err 错误信息
 */
function OnError(id, err) {
  // 记录错误日志
  console.error(`Task #${id} failed: ${err.message}`);

  // 更新任务状态
  Process('models.task.save', {
    id: id,
    status: 'failed',
    error: err.message,
    failed_at: new Date().toISOString()
  });

  // 发送失败告警
  Process('scripts.notification.send', {
    type: 'task_error',
    task_id: id,
    error: err.message,
    level: 'error'
  });
}

/**
 * 任务进度更新回调
 * @param {number} id 任务ID
 * @param {number} current 当前进度
 * @param {number} total 总进度
 * @param {string} message 进度信息
 */
function OnProgress(id, current, total, message) {
  // 计算进度百分比
  const percentage = Math.floor((current / total) * 100);

  // 更新任务进度
  Process('models.task.save', {
    id: id,
    progress: percentage,
    progress_message: message,
    updated_at: new Date().toISOString()
  });

  // 记录进度日志
  if (percentage % 10 === 0) {
    // 每完成10%记录一次
    console.log(`Task #${id}: ${percentage}% completed - ${message}`);
  }
}

任务处理器

scripts 目录下创建任务处理脚本(如:scripts/task.js):

javascript
// 如果是在响应api请求时,不要在脚本里使用全局变量
// 可以考虑使用session保存全局唯一的id
var id = 1024;

/**
 * 生成任务唯一ID
 * @returns {number} 任务ID
 */
function NextID() {
  id = id + 1;
  console.log(`NextID: ${id}`);
  return id;
}

/**
 * 任务处理器
 * @param {number} task_id 任务ID
 * @param {...any} args 任务参数
 */
function Send(task_id, ...args) {
  console.log(args);

  // 任务处理逻辑
  const current = 1;
  const total = 100;

  // 更新任务进度
  Progress(
    'tasks.task.process',
    task_id,
    current,
    total,
    `Progress ${current}/${total}`
  );

  return { message: 'ok' };
}

/**
 * 任务添加时的回调
 * @param {number} id 任务ID
 */
function OnAdd(id) {
  console.log(`OnAdd: #${id}`);
}

/**
 * 任务进度更新回调
 * @param {number} id 任务ID
 * @param {number} current 当前进度
 * @param {number} total 总进度
 * @param {string} message 进度信息
 */
function OnProgress(id, current, total, message) {
  console.log(`OnProgress: #${id} ${message} ${current}/${total}`);
}

/**
 * 任务成功回调
 * @param {number} id 任务ID
 * @param {any} res 任务结果
 */
function OnSuccess(id, res) {
  console.log(`OnSuccess: #${id} ${JSON.stringify(res)}`);
}

/**
 * 任务失败回调
 * @param {number} id 任务ID
 * @param {Error} err 错误信息
 */
function OnError(id, err) {
  console.log(`OnError: #${id} ${err}`);
}

任务处理流程

  1. 添加任务

    • 使用 tasks.[task_id].add 处理器添加任务
    • 可传入任务处理器所需的参数
    • 返回任务ID(成功)或0(失败)
  2. 任务状态更新

    • 使用 tasks.[task_id].progress 处理器更新任务进度
    • 在任务处理器内部调用
    • 会触发 event.progress 回调
  3. 查询任务状态

    • 使用 tasks.[task_id].get 处理器获取任务状态
    • 返回任务的详细信息:
      javascript
      {
        id: task_id,
        status: '', // "WAITING"/"RUNNING"/"SUCCESS"/"FAILURE"
        current: number, // 当前进度
        total: number, // 总进度
        message: string, // 进度信息
        response: any // 任务结果
      }

使用示例

javascript
function task() {
  for (let i = 1; i < 100; i++) {
    const task_id = Process('tasks.test.add', '任务-' + i);
    if (task_id === 0) {
      console.log('添加任务失败');
      continue;
    }
    console.log(`添加任务成功:${task_id}`);
  }
}

最佳实践

  1. 任务设计

    • 将耗时操作从主流程中剥离
    • 任务应该是独立且原子的
    • 考虑任务的幂等性
  2. 资源管理

    • 根据服务器资源合理设置 worker_nums
    • 设置合适的任务超时时间
    • 控制任务队列大小
  3. 错误处理

    • 实现完善的重试机制
    • 记录详细的错误日志
    • 设置合理的重试间隔
  4. 监控告警

    • 监控任务队列大小
    • 监控任务执行时间
    • 监控失败率和重试情况