扩展

Drain Pipeline

批量处理事件、失败重试、分发到多个目标,并将浏览器日志发送到你的服务器。用于包装生产环境中每个 drain 的共享流水线。

在生产环境中,为每个发出的事件发送一个 HTTP 请求无法扩展。drain pipeline 会缓冲事件并批量发送,在临时失败时重试,缓冲区溢出时丢弃最旧的事件,并允许单个 drain 函数并行分发到多个目标。用于客户端日志的 HTTP browser drain 也由同一条流水线提供支持。

drain pipeline · batch + retry·BUFFERING
app emits#0
never blocks the response
in-memory buffer0 / 10
·
·
·
·
·
·
·
·
·
·
fills until size or 5s interval
flush · http POST0 ok
onDropped · maxBufferSize · drain.flush()
emitted0
batches sent0
dropped0
你想要…查看
将任意 drain 包装为批量 + 重试 + 缓冲快速开始
将每个事件并行发送到多个目标Fanout
将浏览器日志发送到你的服务器端点HTTP drain(浏览器到服务器)
调整批量大小、重试策略、缓冲区大小配置

添加 drain 流水线(批量 + 重试 + 分发)

快速开始

该流水线可包装任意 drain。具体接线取决于你的框架——选择与你一致的标签页;下面的所有其他示例都使用相同的结构。

// server/plugins/evlog-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

export default defineNitroPlugin((nitroApp) => {
  const pipeline = createDrainPipeline<DrainContext>()
  const drain = pipeline(createAxiomDrain())

  nitroApp.hooks.hook('evlog:drain', drain)
  nitroApp.hooks.hook('close', () => drain.flush())
})
在进程退出前始终刷新流水线(drain.flush())。在 Nitro 中使用 close hook;在 standalone 脚本中在 process.exit 之前调用;在 serverless 运行时中使用 waitUntil(drain.flush())

工作原理

事件在到达 evlog:drain 时会被缓冲。当达到 batch.sizebatch.intervalMs 到期时(以先发生者为准)就会批量刷新。失败时,会使用配置的退避策略重试同一批次;一旦 retry.maxAttempts 耗尽,就会以丢失的事件调用 onDropped。缓冲区由 maxBufferSize 限制——一旦满了,就会丢弃最旧的事件以保持内存占用平稳。

配置

pipeline-config.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'

const pipeline = createDrainPipeline<DrainContext>({
  batch: {
    size: 50,          // 每 50 个事件刷新一次
    intervalMs: 5000,  // 或每 5 秒刷新一次,以先到者为准
  },
  retry: {
    maxAttempts: 3,
    backoff: 'exponential',
    initialDelayMs: 1000,
    maxDelayMs: 30000,
  },
  maxBufferSize: 1000,
  onDropped: (events, error) => {
    console.error(`[evlog] 丢弃了 ${events.length} 个事件:`, error?.message)
  },
})

export const drain = pipeline(createAxiomDrain())

选项参考

选项默认值描述
batch.size50每批最多事件数
batch.intervalMs5000刷新部分批次前的最大时间(ms)
retry.maxAttempts3包括初始尝试在内的总尝试次数
retry.backoff'exponential''exponential' | 'linear' | 'fixed'
retry.initialDelayMs1000第一次重试的基础延迟
retry.maxDelayMs30000任意重试延迟的上限
maxBufferSize1000丢弃最旧事件前的最大缓冲事件数
onDropped-事件被丢弃时的回调(溢出或重试耗尽)

退避策略

策略延迟模式使用场景
exponential1s, 2s, 4s, 8s…默认值。最适合可能需要时间恢复的临时失败
linear1s, 2s, 3s, 4s…可预测的延迟增长
fixed1s, 1s, 1s, 1s…每次延迟都相同。适合限流 API

返回的 drain 函数

pipeline(drain) 返回的函数与 hook 兼容,并提供:

属性类型描述
drain(ctx)(ctx: T) => void将单个事件推入缓冲区
drain.flush()() => Promise<void>强制刷新所有缓冲事件
drain.pendingnumber当前缓冲的事件数量

Fanout

drain pipeline
0/5 delivered
wide event
{
  level:    "info",
  method:   "POST",
  path:     "/checkout",
  duration: 234,
  user:     {...},
  cart:     {...}
}
BUILDING
  1. axiom
    pending
  2. otlp
    pending
  3. sentry
    pending
  4. nuxthub
    pending
  5. fs
    pending
fire-and-forget retry with backoff one failure ≠ blocked response ✓ all destinations resolved

在生产环境中,同一类宽泛事件通常需要到达多个目标:长期存储(Axiom)、指标工具(Datadog)、错误追踪器(Sentry),以及用于事件回放的本地 fs drain。流水线先批量处理一次,然后你的 drain 通过 Promise.allSettled 将该批次并行分发到每个目标,这样单个缓慢/失败的目标就不会阻塞其他目标。

将 evlog 事件分发到多个目标

配方

import { createDrainPipeline } from 'evlog/pipeline'
import { createAxiomDrain } from 'evlog/axiom'
import { createDatadogDrain } from 'evlog/datadog'
import { createSentryDrain } from 'evlog/sentry'
import { createFsDrain } from 'evlog/fs'
import type { DrainContext } from 'evlog'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 50, intervalMs: 5000 },
  retry: { maxAttempts: 3 },
  maxBufferSize: 1000,
})

const axiom = createAxiomDrain()
const datadog = createDatadogDrain()
const sentry = createSentryDrain({ minLevel: 'error' })
const fs = createFsDrain({ dir: '.evlog/logs', maxFiles: 14 })

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    datadog(batch),
    sentry(batch),
    fs(batch),
  ])
})

你会得到什么

  • 并行分发 — 每个目标都会通过 Promise.allSettled 并发接收该批次
  • 容错 fanout — 如果 Datadog 的 API 抛错,Axiom / Sentry / fs 仍然会完成;只有当包装函数拒绝时,流水线才会重试整个批次
  • 共享背压 — 缓冲区只为整个流水线设置一次;如果包装的 drain 跟不上,最旧的事件会在所有目标中一致地被丢弃

每个 drain 的过滤

包装某个目标 drain,让它只看到你关心的事件:

import type { DrainContext } from 'evlog'

const sentry = createSentryDrain({ dsn: process.env.SENTRY_DSN! })

async function sentryErrorsOnly(batch: DrainContext[]): Promise<void> {
  const errors = batch.filter(c => c.event?.level === 'error')
  if (errors.length > 0) await sentry(errors)
}

export const drain = pipeline(async (batch) => {
  await Promise.allSettled([
    axiom(batch),
    sentryErrorsOnly(batch),
  ])
})

大多数内置 drains 都直接暴露 minLevel,因此你只在需要非 level 过滤(路径、自定义字段等)时才需要这种模式。

自定义 drain 函数

你不需要适配器。传入任意接受一个 batch 的异步函数:

pipeline-custom.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

const pipeline = createDrainPipeline<DrainContext>({ batch: { size: 100 } })

export const drain = pipeline(async (batch) => {
  await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
})

对于更复杂的场景(配置解析、重试、身份标头),请改用 defineHttpDrain,让工具包处理样板代码。

HTTP drain(浏览器到服务器)

HTTP drain 是一种与框架无关的传输方式,用于将客户端日志发送到你的服务器。它建立在相同的 pipeline 原语之上,并带有浏览器特定的默认行为(fetch keepalive + 在 visibilitychange 时使用 sendBeacon)。

evlog/browser 导入路径已被 弃用,它会重新导出与 evlog/http 相同的 API。它将在下一个 主版本 中移除。新代码请优先使用 evlog/http

为客户端日志设置 HTTP 传输

快速开始

app.ts
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'

const drain = createHttpLogDrain({
  drain: { endpoint: 'https://logs.example.com/v1/ingest' },
})
initLogger({ drain })

log.info({ action: 'page_view', path: location.pathname })

工作原理(浏览器特定)

  1. log.info() / log.warn() / log.error() 将事件推入内存缓冲区
  2. 事件会按大小(默认 25)或时间间隔(默认 2 秒)进行批处理
  3. 批次通过带有 keepalive: truefetch 发送,以确保请求在页面导航时仍能继续
  4. 当页面变为隐藏状态(切换标签、导航)时,缓冲的事件会通过 navigator.sendBeacon 作为回退方式刷新发送
  5. 你的服务器端点接收一个 DrainContext[] JSON 数组,并以你喜欢的方式处理它

双层 API

createHttpLogDrain(options)

高级、预组合:创建一个带有批处理、重试以及在 visibilitychange 时自动刷新 的 pipeline。直接返回可与 initLogger({ drain }) 一起使用的 PipelineDrainFn<DrainContext>

app.ts
import { initLogger, log } from 'evlog'
import { createHttpLogDrain } from 'evlog/http'

const drain = createHttpLogDrain({
  drain: { endpoint: 'https://logs.example.com/v1/ingest' },
  pipeline: { batch: { size: 50, intervalMs: 5000 } },
})

initLogger({ drain })
log.info({ action: 'click', target: 'buy-button' })

createHttpDrain(config)

低层传输函数。当你希望完全控制 pipeline 配置时使用它:

app.ts
import { createHttpDrain } from 'evlog/http'
import { createDrainPipeline } from 'evlog/pipeline'
import type { DrainContext } from 'evlog'

const transport = createHttpDrain({
  endpoint: 'https://logs.example.com/v1/ingest',
})
const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 100, intervalMs: 10000 },
  retry: { maxAttempts: 5 },
})

const drain = pipeline(transport)

配置参考

HttpDrainConfig

选项默认值说明
endpoint-(必需) 服务器 ingest 端点的完整 URL
headers-随每个 fetch 请求发送的自定义标头(例如 AuthorizationX-API-Key
timeout5000请求超时时间,单位为毫秒
useBeacontrue当页面隐藏时使用 sendBeacon
credentials'same-origin'Fetch 凭据模式('omit''same-origin''include')。对于跨域端点请设置为 'include'

HttpLogDrainOptions

选项默认值说明
drain-(必需) HttpDrainConfig 对象
pipeline{ batch: { size: 25, intervalMs: 2000 }, retry: { maxAttempts: 2 } }pipeline 配置覆盖项
autoFlushtrue自动注册 visibilitychange 刷新监听器

sendBeacon 回退

当启用 useBeacon(默认值)且页面变为隐藏时,drain 会自动从 fetch 切换到 navigator.sendBeacon。这可确保即使用户关闭标签页或离开页面,日志也能被发送。

sendBeacon 受浏览器施加的负载限制(约 64 KB)。如果负载超过此限制,drain 会抛出错误。请保持批次大小合理(默认 25 远在限制范围内)。

身份验证

传递自定义标头以保护你的 ingest 端点:

app.ts
const drain = createHttpLogDrain({
  drain: {
    endpoint: 'https://logs.example.com/v1/ingest',
    headers: {
      'Authorization': 'Bearer ' + token,
    },
  },
})
headers 仅应用于 fetch 请求。sendBeacon API 不支持自定义标头,因此当页面隐藏并使用 sendBeacon 时,不会发送标头。如果你的端点需要身份验证,请通过会话 cookie 进行验证(对跨域端点设置 credentials: 'include'),或者通过 useBeacon: false 禁用 sendBeacon

服务器端点

你的服务器需要一个接受 DrainContext[] JSON body 的 POST 端点。常见框架示例如下:

app.post('/v1/ingest', express.json(), (req, res) => {
  for (const entry of req.body) {
    console.log('[BROWSER]', JSON.stringify(entry))
  }
  res.sendStatus(204)
})

查看完整的 浏览器示例,了解一个可运行的 Hono 服务器 + 浏览器页面。

常见陷阱

  • 不要忘记在关闭时调用 drain.flush() —— 否则缓冲的事件会丢失
  • 调整 batch.size 以匹配你的提供商建议的负载大小 —— 太小会增加额外开销,太大则有被拒绝的风险
  • 除非你需要按目标端点分别重试,否则不要为每个 drain 运行一个 pipeline —— 共享一个 pipeline 可以保持批处理和缓冲的一致性
  • 在未确认兼容无服务器环境前,不要向不兼容的目标进行扇出 —— stream server 通过进程内流将每个已连接客户端连接起来;它不是 drain

下一步