在生产环境中,每条日志事件发送一个 HTTP 请求是浪费资源的。drain pipeline 会将事件缓冲起来,批量发送,在暂时性故障时进行重试,并在缓冲区溢出时丢弃最旧的事件。
快速开始
server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
始终在服务器关闭时调用
drain.flush(),以确保在进程退出前发送的缓冲事件已被发送。工作原理
- 事件通过
evlog:drain钩子进入内存缓冲区。 - 当达到 批处理大小 或 间隔时间 到期时(先满足哪个就刷新哪个),刷新一个批次。
- 如果 drain 函数失败,将使用配置的 退避策略 重试该批次。
- 如果所有重试耗尽,将调用
onDropped并传入丢失的事件。 - 如果缓冲区超过
maxBufferSize,将丢弃最旧的事件以防止内存泄漏。
配置
server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: {
size: 50, // 每 50 个事件刷新一次
intervalMs: 5000, // 或每 5 秒刷新一次,取先到达者
},
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelayMs: 1000,
maxDelayMs: 30000,
},
maxBufferSize: 1000,
onDropped: (events, error) => {
console.error(`[evlog] 已丢弃 ${events.length} 个事件:`, error?.message)
},
})
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
选项参考
| 选项 | 默认值 | 描述 |
|---|---|---|
batch.size | 50 | 每批次的最大事件数 |
batch.intervalMs | 5000 | 刷新部分批次前的最大时间(毫秒) |
retry.maxAttempts | 3 | 包括首次尝试在内的总尝试次数 |
retry.backoff | 'exponential' | 'exponential' | 'linear' | 'fixed' |
retry.initialDelayMs | 1000 | 首次重试的基础延迟 |
retry.maxDelayMs | 30000 | 任何重试延迟的上限 |
maxBufferSize | 1000 | 缓冲事件的最大数量,超过后将丢弃最旧事件 |
onDropped | - | 事件被丢弃(溢出或重试耗尽)时的回调函数 |
退避策略
| 策略 | 延迟模式 | 使用场景 |
|---|---|---|
exponential | 1s, 2s, 4s, 8s... | 默认。适用于暂时性故障,可能需要时间恢复 |
linear | 1s, 2s, 3s, 4s... | 延迟增长可预测 |
fixed | 1s, 1s, 1s, 1s... | 每次延迟相同。适用于速率限制的 API |
返回的 Drain 函数
pipeline(drain) 返回的函数与钩子兼容,并暴露了以下属性:
| 属性 | 类型 | 描述 |
|---|---|---|
drain(ctx) | (ctx: T) => void | 将单个事件推入缓冲区 |
drain.flush() | () => Promise<void> | 强制刷新所有缓冲事件 |
drain.pending | number | 当前缓冲中的事件数量 |
多目的地
使用单个 pipeline 包装多个适配器:
server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createOTLPDrain } from 'evlog/otlp'
export default defineNitroPlugin((nitroApp) => {
const axiom = createAxiomDrain()
const otlp = createOTLPDrain()
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(async (batch) => {
await Promise.allSettled([axiom(batch), otlp(batch)])
})
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
自定义 Drain 函数
你无需使用适配器。可以传递任何接受批次的异步函数:
server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 100 },
})
const drain = pipeline(async (batch) => {
await fetch('https://your-service.com/logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(batch.map(ctx => ctx.event)),
})
})
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
独立使用(无需 Nitro)
该 pipeline 可在 Nitro 之外使用。可以在 initLogger 中使用 drain 选项来连接:
index.ts
import type { DrainContext } from 'evlog'
import { initLogger, log } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 25 } })
const drain = pipeline(createAxiomDrain())
initLogger({ drain })
log.info({ action: 'started' }) // 被批处理并发送
// 退出前刷新
await drain.flush()
请查看完整的 bun-script 示例 获取完整的可运行脚本。
请查看 Next.js 指南 获取可运行的实现。