扩展
为任何没有内置适配器的后端构建一个 drain — 对于 HTTP 目标使用 defineHttpDrain,对于其他任何传输使用 defineDrain。标准化的配置解析、重试、超时和身份标头都已为你处理好。

drain 是 evlog 管道的终端步骤:一个接收宽事件并将其发送到某处的函数——HTTP API、消息队列、数据库、webhook、本地文件。evlog 为常见提供商提供了内置 drain(适配器概览)。当你需要一个未被覆盖的目标时,就自己编写一个。

两个工厂覆盖所有情况:

你有……使用
一个 HTTP 后端(REST、JSON ingest、供应商 /v1/logs 端点)defineHttpDrain
一个非 HTTP 传输(gRPC、WebSocket、供应商 SDK、队列、原始套接字)defineDrain

两者都来自 evlog/toolkit,并且正是每个内置适配器所使用的工厂。

构建一个自定义 evlog drain

defineHttpDrain(HTTP 配方)

每个内置适配器都遵循的配方。两个纯函数:resolve() 返回配置(或 null 以跳过),encode() 返回 HTTP 请求负载。

lib/my-drain.ts
import {
  defineHttpDrain,
  resolveAdapterConfig,
  type ConfigField,
} from 'evlog/toolkit'

interface MyServiceConfig {
  apiKey: string
  endpoint?: string
  timeout?: number
}

const FIELDS: ConfigField<MyServiceConfig>[] = [
  { key: 'apiKey', env: ['MYSERVICE_API_KEY'] },
  { key: 'endpoint', env: ['MYSERVICE_ENDPOINT'] },
  { key: 'timeout' },
]

export function createMyServiceDrain(overrides?: Partial<MyServiceConfig>) {
  return defineHttpDrain<MyServiceConfig>({
    name: 'myservice',
    resolve: async () => {
      const cfg = await resolveAdapterConfig<MyServiceConfig>('myservice', FIELDS, overrides)
      if (!cfg.apiKey) {
        console.error('[evlog/myservice] 缺少 apiKey')
        return null
      }
      return cfg as MyServiceConfig
    },
    encode: (events, cfg) => ({
      url: `${cfg.endpoint ?? 'https://api.myservice.com'}/v1/ingest`,
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${cfg.apiKey}`,
      },
      body: JSON.stringify(events),
    }),
  })
}

就是这样。defineHttpDrain 会处理批处理、重试(默认 2 次)、超时(默认 5000ms)、错误隔离,以及身份标头(User-Agent: evlog/<version> + X-Evlog-Source: <name>)。即使你的目标端宕机,你的应用流水线也会继续运行。

一个 5 分钟示例 — 内部 Loki drain

一个完整可工作的 drain,只有 25 行,并且不需要外部配置助手:

lib/loki-drain.ts
import { defineHttpDrain } from 'evlog/toolkit'

export function createLokiDrain(overrides?: { url?: string, token?: string }) {
  return defineHttpDrain<{ url: string, token: string }>({
    name: 'loki',
    resolve: () => ({
      url: overrides?.url ?? process.env.LOKI_URL!,
      token: overrides?.token ?? process.env.LOKI_TOKEN!,
    }),
    encode: (events, config) => ({
      url: `${config.url}/loki/api/v1/push`,
      headers: {
        'Content-Type': 'application/json',
        Authorization: `Bearer ${config.token}`,
      },
      body: JSON.stringify({
        streams: events.map(e => ({
          stream: { service: e.service, level: e.level },
          values: [[String(Date.parse(e.timestamp) * 1e6), JSON.stringify(e)]],
        })),
      }),
    }),
  })
}

标准化配置优先级

resolveAdapterConfig(namespace, fields, overrides) 会沿着标准链路查找,因此用户获得的配置体验与内置适配器相同:

  1. 显式传给你工厂的 overrides
  2. runtimeConfig.evlog.<namespace>(Nuxt/Nitro)
  3. runtimeConfig.<namespace>(旧版 Nuxt/Nitro)
  4. NUXT_<NS>_<FIELD> 环境变量
  5. <NS>_<FIELD> 环境变量

字段名应遵循项目约定:apiKeyendpointserviceNametimeout。如果你正在重命名一个已有字段(例如 tokenapiKey),请在一个大版本内同时保留这两个 ConfigField 条目——参见 axiom.tsbetter-stack.ts 的弃用模式。

将 drain 接入你的框架

一旦 createMyServiceDrain() 返回该 drain,就像其他任何东西一样接入它:

// server/plugins/evlog-drain.ts
import { createMyServiceDrain } from '~/server/utils/my-drain'

const drain = createMyServiceDrain()

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('evlog:drain', drain)
})

对于生产环境,将其包装一次在 createDrainPipeline 中,这样事件就会被批处理并重试。

过滤和转换事件

encode() 接收完整的 WideEvent[] 批次以及解析后的配置。可在内联中进行过滤或转换——返回 null 是对该批次干净地选择退出:

encode: (events, cfg) => {
  const filtered = events.filter(e => e.level === 'error' && e.path !== '/health')
  if (filtered.length === 0) return null

  const payload = filtered.map(e => ({
    ts: new Date(e.timestamp).getTime(),
    severity: e.level.toUpperCase(),
    attributes: { method: e.method, path: e.path, status: e.status, duration: e.duration },
  }))

  return {
    url: `${cfg.endpoint}/v1/push`,
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  }
}

defineDrain(非 HTTP 传输)

如果你的目标需要 gRPC、供应商 SDK、队列客户端、WebSocket 或原始套接字,可以用 defineDrain 再往下一层。传输由你负责;工具包仍然会为你提供配置解析、错误隔离和一致的形状。

import { defineDrain } from 'evlog/toolkit'

export const createCustomTransportDrain = () =>
  defineDrain<{ apiKey: string }>({
    name: 'custom',
    resolve: async () => ({ apiKey: process.env.MY_KEY! }),
    send: async (events, cfg) => {
      await myVendorSdk.publish(events, { token: cfg.apiKey })
    },
  })

当你退回使用 defineDrain 时,请手动遵循 defineHttpDrain 强制执行的相同规则:将传输包装在 try/catch 中,用 console.error('[evlog/<name>] …') 记录日志,并且绝不要重新抛出。

DrainContext 参考

当 evlog 通过 evlog:drain 调用你的 drain 时,它会为每个事件传递一个 DrainContext

types.ts
interface DrainContext {
  /** 包含所有已累积上下文的完整宽事件 */
  event: WideEvent

  /** 请求元数据 */
  request?: {
    method: string
    path: string
    requestId: string
  }

  /** 安全的 HTTP 标头(已过滤敏感标头) */
  headers?: Record<string, string>
}

interface WideEvent {
  timestamp: string
  level: 'debug' | 'info' | 'warn' | 'error'
  service: string
  environment?: string
  version?: string
  region?: string
  commitHash?: string
  requestId?: string
  // ... 以及通过 log.set() 添加的所有字段
  [key: string]: unknown
}

encode() / send() 接收的批量形式中,你会直接得到 WideEvent[](工具包会将每个上下文中的 event 解包)。

工具包辅助函数

evlog/toolkit 暴露了每个内置适配器都会用到的相同辅助函数。与 drains 相关的有:

导出用途
defineHttpDrain(spec)HTTP 配方——自动重试、超时、身份标头、错误隔离
defineDrain(spec)非 HTTP 传输的相同契约
resolveAdapterConfig(ns, fields, overrides)标准配置优先级链(overrides → runtimeConfig.evlog.<ns> → env)
httpPost(opts)每个内置 HTTP 适配器使用的重试 POST 辅助函数——处理超时、重试、脱敏错误消息
composeDrains(drains)将多个 drain 合并为一个(错误隔离,并通过 Promise.allSettled 并发运行)
toTypedAttributeValue(value)将任意值转换为 Axiom / Sentry 使用的 typed attribute 形状
toOtlpAttributeValue(value)将任意值转换为 OTLP AnyValue 形状(用于 OTLP / HyperDX / PostHog 日志)
OTEL_SEVERITY_NUMBER, OTEL_SEVERITY_TEXTOTEL 日志严重级别表

身份头

defineHttpDrain 会自动为每个请求添加两个头部,方便接收方识别流量:

HeaderValue
User-Agentevlog/<version>(仅限 Node / 服务器运行时——浏览器会移除此头部)
X-Evlog-Source你提供的 drain name

如果你直接在 httpPost 之上构建 drain,也可以覆盖或抑制它们——参见 身份头

错误处理——已为你完成

defineHttpDrain 会自动强制执行所有最佳实践:

  1. 绝不抛出——失败会被捕获并记录,前缀为 [evlog/<name>]
  2. 重试——默认在瞬时错误时尝试 2 次(可通过 retries 配置)。
  3. 超时——默认 5000ms(可通过 timeout 配置)。
  4. 优雅降级——resolve() 返回 null 会使 drain 变为无操作。

如果你回退使用 defineDrain,请手动遵循相同规则。

作为社区包发布

社区 drain 的推荐结构:

my-evlog-drain/
├─ src/
│  ├─ drain.ts        # 通过 defineHttpDrain 创建 createMyDrain
│  └─ index.ts        # 重新导出
├─ test/              # vitest,mock fetch
├─ package.json       # peerDependency: "evlog"
└─ README.md

evlog 添加为 peerDependency(而不是 dependency)——你的包在安装时不应该拉入一份 evlog 副本。

做出了很棒的东西?提交 PR 为 Adapters 表添加一行——社区会感谢你。

下一步