Drain Pipeline
在生产环境中,为每个发出的事件发送一个 HTTP 请求无法扩展。drain pipeline 会缓冲事件并批量发送,在临时失败时重试,缓冲区溢出时丢弃最旧的事件,并允许单个 drain 函数并行分发到多个目标。用于客户端日志的 HTTP browser drain 也由同一条流水线提供支持。
| 你想要… | 查看 |
|---|---|
| 将任意 drain 包装为批量 + 重试 + 缓冲 | 快速开始 |
| 将每个事件并行发送到多个目标 | Fanout |
| 将浏览器日志发送到你的服务器端点 | HTTP drain(浏览器到服务器) |
| 调整批量大小、重试策略、缓冲区大小 | 配置 |
添加 drain 流水线(批量 + 重试 + 分发)
快速开始
该流水线可包装任意 drain。具体接线取决于你的框架——选择与你一致的标签页;下面的所有其他示例都使用相同的结构。
// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
export default defineNitroPlugin((nitroApp) => {
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
nitroApp.hooks.hook('evlog:drain', drain)
nitroApp.hooks.hook('close', () => drain.flush())
})
// lib/evlog.ts
import type { DrainContext } from 'evlog'
import { createEvlog } from 'evlog/next'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
export const { withEvlog, useLogger, log, createError } = createEvlog({
service: 'my-app',
drain,
})
export const flushEvlog = () => drain.flush()
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
app.use(evlog({ drain })) // Hono / Express / Elysia
// await app.register(evlog, { drain }) // Fastify
// EvlogModule.forRoot({ drain }) // NestJS
process.on('SIGTERM', () => drain.flush())
// index.ts — 纯 TypeScript / Bun / Node script
import type { DrainContext } from 'evlog'
import { initLogger } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>()
const drain = pipeline(createAxiomDrain())
initLogger({ drain })
await drain.flush() // 在退出前
drain.flush())。在 Nitro 中使用 close hook;在 standalone 脚本中在 process.exit 之前调用;在 serverless 运行时中使用 waitUntil(drain.flush())。工作原理
事件在到达 evlog:drain 时会被缓冲。当达到 batch.size 或 batch.intervalMs 到期时(以先发生者为准)就会批量刷新。失败时,会使用配置的退避策略重试同一批次;一旦 retry.maxAttempts 耗尽,就会以丢失的事件调用 onDropped。缓冲区由 maxBufferSize 限制——一旦满了,就会丢弃最旧的事件以保持内存占用平稳。
配置
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
const pipeline = createDrainPipeline<DrainContext>({
batch: {
size: 50, // 每 50 个事件刷新一次
intervalMs: 5000, // 或每 5 秒刷新一次,以先到者为准
},
retry: {
maxAttempts: 3,
backoff: 'exponential',
initialDelayMs: 1000,
maxDelayMs: 30000,
},
maxBufferSize: 1000,
onDropped: (events, error) => {
console.error(`[evlog] 丢弃了 ${events.length} 个事件:`, error?.message)
},
})
export const drain = pipeline(createAxiomDrain())
选项参考
| 选项 | 默认值 | 描述 |
|---|---|---|
batch.size | 50 | 每批最多事件数 |
batch.intervalMs | 5000 | 刷新部分批次前的最大时间(ms) |
retry.maxAttempts | 3 | 包括初始尝试在内的总尝试次数 |
retry.backoff | 'exponential' | 'exponential' | 'linear' | 'fixed' |
retry.initialDelayMs | 1000 | 第一次重试的基础延迟 |
retry.maxDelayMs | 30000 | 任意重试延迟的上限 |
maxBufferSize | 1000 | 丢弃最旧事件前的最大缓冲事件数 |
onDropped | - | 事件被丢弃时的回调(溢出或重试耗尽) |
退避策略
| 策略 | 延迟模式 | 使用场景 |
|---|---|---|
exponential | 1s, 2s, 4s, 8s… | 默认值。最适合可能需要时间恢复的临时失败 |
linear | 1s, 2s, 3s, 4s… | 可预测的延迟增长 |
fixed | 1s, 1s, 1s, 1s… | 每次延迟都相同。适合限流 API |
返回的 drain 函数
pipeline(drain) 返回的函数与 hook 兼容,并提供:
| 属性 | 类型 | 描述 |
|---|---|---|
drain(ctx) | (ctx: T) => void | 将单个事件推入缓冲区 |
drain.flush() | () => Promise<void> | 强制刷新所有缓冲事件 |
drain.pending | number | 当前缓冲的事件数量 |
Fanout
{
level: "info",
method: "POST",
path: "/checkout",
duration: 234,
user: {...},
cart: {...}
}- axiompending
- otlppending
- sentrypending
- nuxthubpending
- fspending
在生产环境中,同一类宽泛事件通常需要到达多个目标:长期存储(Axiom)、指标工具(Datadog)、错误追踪器(Sentry),以及用于事件回放的本地 fs drain。流水线先批量处理一次,然后你的 drain 通过 Promise.allSettled 将该批次并行分发到每个目标,这样单个缓慢/失败的目标就不会阻塞其他目标。
将 evlog 事件分发到多个目标
配方
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 50, intervalMs: 5000 },
retry: { maxAttempts: 3 },
maxBufferSize: 1000,
})
const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })
export const drain = pipeline(async (batch) => {
await Promise.allSettled([
axiom(batch),
datadog(batch),
sentry(batch),
fs(batch),
])
})
你会得到什么
- 并行分发 — 每个目标都会通过
Promise.allSettled并发接收该批次 - 容错 fanout — 如果 Datadog 的 API 抛错,Axiom / Sentry / fs 仍然会完成;只有当包装函数拒绝时,流水线才会重试整个批次
- 共享背压 — 缓冲区只为整个流水线设置一次;如果包装的 drain 跟不上,最旧的事件会在所有目标中一致地被丢弃
每个 drain 的过滤
包装某个目标 drain,让它只看到你关心的事件:
import type { DrainContext } from 'evlog'
const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })
async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
const errors = batch.filter(c => c.event?.level === 'error')
if (errors.length > 0) await sentry(errors)
}
export const drain = pipeline(async (batch) => {
await Promise.allSettled([
axiom(batch),
sentryErrorsOnly(batch),
])
})
大多数内置 drains 都直接暴露 minLevel,因此你只在需要非 level 过滤(路径、自定义字段等)时才需要这种模式。
自定义 drain 函数
你不需要适配器。传入任意接受一个 batch 的异步函数:
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })
export const drain = pipeline(async (batch) => {
await fetch('https://your-service.com/logs', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(batch.map(ctx => ctx.event)),
})
})
对于更复杂的场景(配置解析、重试、身份标头),请改用 defineHttpDrain,让工具包处理样板代码。
HTTP drain(浏览器到服务器)
HTTP drain 是一种与框架无关的传输方式,用于将客户端日志发送到你的服务器。它建立在相同的 pipeline 原语之上,并带有浏览器特定的默认行为(fetch keepalive + 在 visibilitychange 时使用 sendBeacon)。
evlog/browser 导入路径已被 弃用,它会重新导出与 evlog/http 相同的 API。它将在下一个 主版本 中移除。新代码请优先使用 evlog/http。为客户端日志设置 HTTP 传输
快速开始
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'
const drain = createHttpLogDrain({
drain: { endpoint: 'https://logs.example.com/v1/ingest' },
})
initLogger({ drain })
log.info({ action: 'page_view', path: location.pathname })
工作原理(浏览器特定)
log.info()/log.warn()/log.error()将事件推入内存缓冲区- 事件会按大小(默认 25)或时间间隔(默认 2 秒)进行批处理
- 批次通过带有
keepalive: true的fetch发送,以确保请求在页面导航时仍能继续 - 当页面变为隐藏状态(切换标签、导航)时,缓冲的事件会通过
navigator.sendBeacon作为回退方式刷新发送 - 你的服务器端点接收一个
DrainContext[]JSON 数组,并以你喜欢的方式处理它
双层 API
createHttpLogDrain(options)
高级、预组合:创建一个带有批处理、重试以及在 visibilitychange 时自动刷新 的 pipeline。直接返回可与 initLogger({ drain }) 一起使用的 PipelineDrainFn<DrainContext>。
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'
const drain = createHttpLogDrain({
drain: { endpoint: 'https://logs.example.com/v1/ingest' },
pipeline: { batch: { size: 50, intervalMs: 5000 } },
})
initLogger({ drain })
log.info({ action: 'click', target: 'buy-button' })
createHttpDrain(config)
低层传输函数。当你希望完全控制 pipeline 配置时使用它:
import { createHttpDrain } from 'evlog/http'
import { createDrainPipeline } from 'evlog/pipeline'
import type { DrainContext } from 'evlog'
const transport = createHttpDrain({
endpoint: 'https://logs.example.com/v1/ingest',
})
const pipeline = createDrainPipeline<DrainContext>({
batch: { size: 100, intervalMs: 10000 },
retry: { maxAttempts: 5 },
})
const drain = pipeline(transport)
配置参考
HttpDrainConfig
| 选项 | 默认值 | 说明 |
|---|---|---|
endpoint | - | (必需) 服务器 ingest 端点的完整 URL |
headers | - | 随每个 fetch 请求发送的自定义标头(例如 Authorization、X-API-Key) |
timeout | 5000 | 请求超时时间,单位为毫秒 |
useBeacon | true | 当页面隐藏时使用 sendBeacon |
credentials | 'same-origin' | Fetch 凭据模式('omit'、'same-origin'、'include')。对于跨域端点请设置为 'include' |
HttpLogDrainOptions
| 选项 | 默认值 | 说明 |
|---|---|---|
drain | - | (必需) HttpDrainConfig 对象 |
pipeline | { batch: { size: 25, intervalMs: 2000 }, retry: { maxAttempts: 2 } } | pipeline 配置覆盖项 |
autoFlush | true | 自动注册 visibilitychange 刷新监听器 |
sendBeacon 回退
useBeacon(默认值)且页面变为隐藏时,drain 会自动从 fetch 切换到 navigator.sendBeacon。这可确保即使用户关闭标签页或离开页面,日志也能被发送。sendBeacon 受浏览器施加的负载限制(约 64 KB)。如果负载超过此限制,drain 会抛出错误。请保持批次大小合理(默认 25 远在限制范围内)。
身份验证
传递自定义标头以保护你的 ingest 端点:
const drain = createHttpLogDrain({
drain: {
endpoint: 'https://logs.example.com/v1/ingest',
headers: {
'Authorization': 'Bearer ' + token,
},
},
})
headers 仅应用于 fetch 请求。sendBeacon API 不支持自定义标头,因此当页面隐藏并使用 sendBeacon 时,不会发送标头。如果你的端点需要身份验证,请通过会话 cookie 进行验证(对跨域端点设置 credentials: 'include'),或者通过 useBeacon: false 禁用 sendBeacon。服务器端点
你的服务器需要一个接受 DrainContext[] JSON body 的 POST 端点。常见框架示例如下:
app.post('/v1/ingest', express.json(), (req, res) => {
for (const entry of req.body) {
console.log('[BROWSER]', JSON.stringify(entry))
}
res.sendStatus(204)
})
app.post('/v1/ingest', async (c) => {
const body = await c.req.json()
for (const entry of body) {
console.log('[BROWSER]', JSON.stringify(entry))
}
return c.body(null, 204)
})
查看完整的 浏览器示例,了解一个可运行的 Hono 服务器 + 浏览器页面。
常见陷阱
- 不要忘记在关闭时调用
drain.flush()—— 否则缓冲的事件会丢失 - 调整
batch.size以匹配你的提供商建议的负载大小 —— 太小会增加额外开销,太大则有被拒绝的风险 - 除非你需要按目标端点分别重试,否则不要为每个 drain 运行一个 pipeline —— 共享一个 pipeline 可以保持批处理和缓冲的一致性
- 在未确认兼容无服务器环境前,不要向不兼容的目标进行扇出 —— stream server 通过进程内流将每个已连接客户端连接起来;它不是 drain
下一步
- 自定义 Drains — 使用
defineHttpDrain/defineDrain为任意后端构建 drain - 适配器概览 — 与 pipeline 开箱即用的内置适配器
- 最佳实践 — 安全与生产建议
- 客户端日志 — 端到端的浏览器 → 服务器流程