drain 是 evlog 管道的终端步骤:一个接收宽事件并将其发送到某处的函数——HTTP API、消息队列、数据库、webhook、本地文件。evlog 为常见提供商提供了内置 drain(适配器概览)。当你需要一个未被覆盖的目标时,就自己编写一个。
两个工厂覆盖所有情况:
| 你有…… | 使用 |
|---|---|
一个 HTTP 后端(REST、JSON ingest、供应商 /v1/logs 端点) | defineHttpDrain |
| 一个非 HTTP 传输(gRPC、WebSocket、供应商 SDK、队列、原始套接字) | defineDrain |
两者都来自 evlog/toolkit,并且正是每个内置适配器所使用的工厂。
构建一个自定义 evlog drain
defineHttpDrain(HTTP 配方)
每个内置适配器都遵循的配方。两个纯函数:resolve() 返回配置(或 null 以跳过),encode() 返回 HTTP 请求负载。
import {
defineHttpDrain,
resolveAdapterConfig,
type ConfigField,
} from 'evlog/toolkit'
interface MyServiceConfig {
apiKey: string
endpoint?: string
timeout?: number
}
const FIELDS: ConfigField<MyServiceConfig>[] = [
{ key: 'apiKey', env: ['MYSERVICE_API_KEY'] },
{ key: 'endpoint', env: ['MYSERVICE_ENDPOINT'] },
{ key: 'timeout' },
]
export function createMyServiceDrain(overrides?: Partial<MyServiceConfig>) {
return defineHttpDrain<MyServiceConfig>({
name: 'myservice',
resolve: async () => {
const cfg = await resolveAdapterConfig<MyServiceConfig>('myservice', FIELDS, overrides)
if (!cfg.apiKey) {
console.error('[evlog/myservice] 缺少 apiKey')
return null
}
return cfg as MyServiceConfig
},
encode: (events, cfg) => ({
url: `${cfg.endpoint ?? 'https://api.myservice.com'}/v1/ingest`,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${cfg.apiKey}`,
},
body: JSON.stringify(events),
}),
})
}
就是这样。defineHttpDrain 会处理批处理、重试(默认 2 次)、超时(默认 5000ms)、错误隔离,以及身份标头(User-Agent: evlog/<version> + X-Evlog-Source: <name>)。即使你的目标端宕机,你的应用流水线也会继续运行。
一个 5 分钟示例 — 内部 Loki drain
一个完整可工作的 drain,只有 25 行,并且不需要外部配置助手:
import { defineHttpDrain } from 'evlog/toolkit'
export function createLokiDrain(overrides?: { url?: string, token?: string }) {
return defineHttpDrain<{ url: string, token: string }>({
name: 'loki',
resolve: () => ({
url: overrides?.url ?? process.env.LOKI_URL!,
token: overrides?.token ?? process.env.LOKI_TOKEN!,
}),
encode: (events, config) => ({
url: `${config.url}/loki/api/v1/push`,
headers: {
'Content-Type': 'application/json',
Authorization: `Bearer ${config.token}`,
},
body: JSON.stringify({
streams: events.map(e => ({
stream: { service: e.service, level: e.level },
values: [[String(Date.parse(e.timestamp) * 1e6), JSON.stringify(e)]],
})),
}),
}),
})
}
标准化配置优先级
resolveAdapterConfig(namespace, fields, overrides) 会沿着标准链路查找,因此用户获得的配置体验与内置适配器相同:
- 显式传给你工厂的
overrides runtimeConfig.evlog.<namespace>(Nuxt/Nitro)runtimeConfig.<namespace>(旧版 Nuxt/Nitro)NUXT_<NS>_<FIELD>环境变量<NS>_<FIELD>环境变量
字段名应遵循项目约定:apiKey、endpoint、serviceName、timeout。如果你正在重命名一个已有字段(例如 token → apiKey),请在一个大版本内同时保留这两个 ConfigField 条目——参见 axiom.ts 和 better-stack.ts 的弃用模式。
将 drain 接入你的框架
一旦 createMyServiceDrain() 返回该 drain,就像其他任何东西一样接入它:
// server/plugins/evlog-drain.ts
import { createMyServiceDrain } from '~/server/utils/my-drain'
const drain = createMyServiceDrain()
export default defineNitroPlugin((nitroApp) => {
nitroApp.hooks.hook('evlog:drain', drain)
})
// lib/evlog.ts
import { createEvlog } from 'evlog/next'
import { createMyServiceDrain } from './my-drain'
export const { withEvlog, useLogger, log, createError } = createEvlog({
service: 'my-app',
drain: createMyServiceDrain(),
})
import { createMyServiceDrain } from './my-drain'
app.use(evlog({ drain: createMyServiceDrain() }))
await app.register(evlog, { drain: createMyServiceDrain() })
EvlogModule.forRoot({ drain: createMyServiceDrain() })
import { initLogger } from 'evlog'
import { createMyServiceDrain } from './my-drain'
initLogger({ drain: createMyServiceDrain() })
对于生产环境,将其包装一次在 createDrainPipeline 中,这样事件就会被批处理并重试。
过滤和转换事件
encode() 接收完整的 WideEvent[] 批次以及解析后的配置。可在内联中进行过滤或转换——返回 null 是对该批次干净地选择退出:
encode: (events, cfg) => {
const filtered = events.filter(e => e.level === 'error' && e.path !== '/health')
if (filtered.length === 0) return null
const payload = filtered.map(e => ({
ts: new Date(e.timestamp).getTime(),
severity: e.level.toUpperCase(),
attributes: { method: e.method, path: e.path, status: e.status, duration: e.duration },
}))
return {
url: `${cfg.endpoint}/v1/push`,
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(payload),
}
}
defineDrain(非 HTTP 传输)
如果你的目标需要 gRPC、供应商 SDK、队列客户端、WebSocket 或原始套接字,可以用 defineDrain 再往下一层。传输由你负责;工具包仍然会为你提供配置解析、错误隔离和一致的形状。
import { defineDrain } from 'evlog/toolkit'
export const createCustomTransportDrain = () =>
defineDrain<{ apiKey: string }>({
name: 'custom',
resolve: async () => ({ apiKey: process.env.MY_KEY! }),
send: async (events, cfg) => {
await myVendorSdk.publish(events, { token: cfg.apiKey })
},
})
当你退回使用 defineDrain 时,请手动遵循 defineHttpDrain 强制执行的相同规则:将传输包装在 try/catch 中,用 console.error('[evlog/<name>] …') 记录日志,并且绝不要重新抛出。
DrainContext 参考
当 evlog 通过 evlog:drain 调用你的 drain 时,它会为每个事件传递一个 DrainContext:
interface DrainContext {
/** 包含所有已累积上下文的完整宽事件 */
event: WideEvent
/** 请求元数据 */
request?: {
method: string
path: string
requestId: string
}
/** 安全的 HTTP 标头(已过滤敏感标头) */
headers?: Record<string, string>
}
interface WideEvent {
timestamp: string
level: 'debug' | 'info' | 'warn' | 'error'
service: string
environment?: string
version?: string
region?: string
commitHash?: string
requestId?: string
// ... 以及通过 log.set() 添加的所有字段
[key: string]: unknown
}
在 encode() / send() 接收的批量形式中,你会直接得到 WideEvent[](工具包会将每个上下文中的 event 解包)。
工具包辅助函数
evlog/toolkit 暴露了每个内置适配器都会用到的相同辅助函数。与 drains 相关的有:
| 导出 | 用途 |
|---|---|
defineHttpDrain(spec) | HTTP 配方——自动重试、超时、身份标头、错误隔离 |
defineDrain(spec) | 非 HTTP 传输的相同契约 |
resolveAdapterConfig(ns, fields, overrides) | 标准配置优先级链(overrides → runtimeConfig.evlog.<ns> → env) |
httpPost(opts) | 每个内置 HTTP 适配器使用的重试 POST 辅助函数——处理超时、重试、脱敏错误消息 |
composeDrains(drains) | 将多个 drain 合并为一个(错误隔离,并通过 Promise.allSettled 并发运行) |
toTypedAttributeValue(value) | 将任意值转换为 Axiom / Sentry 使用的 typed attribute 形状 |
toOtlpAttributeValue(value) | 将任意值转换为 OTLP AnyValue 形状(用于 OTLP / HyperDX / PostHog 日志) |
OTEL_SEVERITY_NUMBER, OTEL_SEVERITY_TEXT | OTEL 日志严重级别表 |
身份头
defineHttpDrain 会自动为每个请求添加两个头部,方便接收方识别流量:
| Header | Value |
|---|---|
User-Agent | evlog/<version>(仅限 Node / 服务器运行时——浏览器会移除此头部) |
X-Evlog-Source | 你提供的 drain name |
如果你直接在 httpPost 之上构建 drain,也可以覆盖或抑制它们——参见 身份头。
错误处理——已为你完成
defineHttpDrain 会自动强制执行所有最佳实践:
- 绝不抛出——失败会被捕获并记录,前缀为
[evlog/<name>]。 - 重试——默认在瞬时错误时尝试 2 次(可通过
retries配置)。 - 超时——默认 5000ms(可通过
timeout配置)。 - 优雅降级——
resolve()返回null会使 drain 变为无操作。
如果你回退使用 defineDrain,请手动遵循相同规则。
作为社区包发布
社区 drain 的推荐结构:
my-evlog-drain/
├─ src/
│ ├─ drain.ts # 通过 defineHttpDrain 创建 createMyDrain
│ └─ index.ts # 重新导出
├─ test/ # vitest,mock fetch
├─ package.json # peerDependency: "evlog"
└─ README.md
将 evlog 添加为 peerDependency(而不是 dependency)——你的包在安装时不应该拉入一份 evlog 副本。
下一步
- Drain Pipeline — 将你的 drain 包装进 batch + retry + fanout,用于生产环境
- Adapters Overview — 查看内置适配器如何使用
defineHttpDrain - Custom Enrichers — 适用于派生事件字段的相同工具包形状
- Custom Framework Integration — 适用于 HTTP 框架的相同工具包形状
- Best Practices — 安全与生产环境提示