适配器

Drain Pipeline

批处理事件,在失败时重试,并通过共享的 drain pipeline 防止缓冲区溢出。支持多适配器扇出。

在生产环境中,每条日志事件发送一个 HTTP 请求是浪费资源的。drain pipeline 会将事件缓冲起来,批量发送,在暂时性故障时进行重试,并在缓冲区溢出时丢弃最旧的事件。

快速开始

server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
始终在服务器关闭时调用 drain.flush(),以确保在进程退出前发送的缓冲事件已被发送。

工作原理

  1. 事件通过 evlog:drain 钩子进入内存缓冲区。
  2. 当达到 批处理大小间隔时间 到期时(先满足哪个就刷新哪个),刷新一个批次。
  3. 如果 drain 函数失败,将使用配置的 退避策略 重试该批次。
  4. 如果所有重试耗尽,将调用 onDropped 并传入丢失的事件。
  5. 如果缓冲区超过 maxBufferSize,将丢弃最旧的事件以防止内存泄漏。

配置

server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>({
    batch: {
      size: 50,          // 每 50 个事件刷新一次
      intervalMs: 5000,  // 或每 5 秒刷新一次,取先到达者
    },
    retry: {
      maxAttempts: 3,
      backoff: 'exponential',
      initialDelayMs: 1000,
      maxDelayMs: 30000,
    },
    maxBufferSize: 1000,
    onDropped: (events, error) => {
      console.error(`[evlog] 已丢弃 ${events.length} 个事件:`, error?.message)
    },
  })

  const drain = pipeline(createAxiomDrain())

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})

选项参考

选项默认值描述
batch.size50每批次的最大事件数
batch.intervalMs5000刷新部分批次前的最大时间(毫秒)
retry.maxAttempts3包括首次尝试在内的总尝试次数
retry.backoff'exponential''exponential' | 'linear' | 'fixed'
retry.initialDelayMs1000首次重试的基础延迟
retry.maxDelayMs30000任何重试延迟的上限
maxBufferSize1000缓冲事件的最大数量,超过后将丢弃最旧事件
onDropped-事件被丢弃(溢出或重试耗尽)时的回调函数

退避策略

策略延迟模式使用场景
exponential1s, 2s, 4s, 8s...默认。适用于暂时性故障,可能需要时间恢复
linear1s, 2s, 3s, 4s...延迟增长可预测
fixed1s, 1s, 1s, 1s...每次延迟相同。适用于速率限制的 API

返回的 Drain 函数

pipeline(drain) 返回的函数与钩子兼容,并暴露了以下属性:

属性类型描述
drain(ctx)(ctx: T) => void将单个事件推入缓冲区
drain.flush()() => Promise<void>强制刷新所有缓冲事件
drain.pendingnumber当前缓冲中的事件数量

多目的地

使用单个 pipeline 包装多个适配器:

server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'

export default defineNitroPlugin((nitroApp) => {
  const axiom = createAxiomDrain()
  const otlp = createOTLPDrain()

  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(async (batch) => {
    await Promise.allSettled([axiom(batch), otlp(batch)])
  })

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})

自定义 Drain 函数

你无需使用适配器。可以传递任何接受批次的异步函数:

server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>({
    batch: { size: 100 },
  })

  const drain = pipeline(async (batch) => {
    await fetch('https://your-service.com/logs', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(batch.map(ctx => ctx.event)),
    })
  })

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})

独立使用(无需 Nitro)

该 pipeline 可在 Nitro 之外使用。可以在 initLogger 中使用 drain 选项来连接:

index.ts
import type { DrainContext } from 'evlog'
import { initLogger, log } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 25 } })
const drain = pipeline(createAxiomDrain())

initLogger({ drain })

log.info({ action: 'started' }) // 被批处理并发送

// 退出前刷新
await drain.flush()
请查看完整的 bun-script 示例 获取完整的可运行脚本。
请查看 Next.js 指南 获取可运行的实现。

下一步