适配器

自定义适配器

构建自己的适配器,将日志发送到任意目标。使用工厂模式、批量处理、过滤和错误处理的最佳实践。

你可以创建自定义适配器,将日志发送到任意服务或目标。适配器就是一个接收 DrainContext 并将数据发送到某处的函数。

基本结构

“drain” 是一个接收 DrainContext 并将数据发送到某处的函数:

lib/my-drain.ts
import type { DrainContext } from 'evlog'

const drain = async (ctx: DrainContext) => {
  await fetch('https://your-service.com/logs', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(ctx.event),
  })
}

然后将其与你的框架连接:

// server/plugins/evlog-drain.ts
export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('evlog:drain', drain)
})

DrainContext 参考

types.ts
interface DrainContext {
  /** 包含所有累积上下文信息的完整事件 */
  event: WideEvent

  /** 请求元数据 */
  request?: {
    method: string
    path: string
    requestId: string
  }

  /** 已过滤的敏感 HTTP 头部 */
  headers?: Record<string, string>
}

interface WideEvent {
  timestamp: string
  level: 'debug' | 'info' | 'warn' | 'error'
  service: string
  environment?: string
  version?: string
  region?: string
  commitHash?: string
  requestId?: string
  // ... 通过 log.set() 添加的所有字段
  [key: string]: unknown
}

工厂模式

对于可重用的适配器,使用工厂模式:

lib/my-adapter.ts
import type { DrainContext } from 'evlog'

export interface MyAdapterConfig {
  apiKey: string
  endpoint?: string
  timeout?: number
}

export function createMyAdapter(config: MyAdapterConfig) {
  const endpoint = config.endpoint ?? 'https://api.myservice.com/ingest'
  const timeout = config.timeout ?? 5000

  return async (ctx: DrainContext) => {
    const controller = new AbortController()
    const timeoutId = setTimeout(() => controller.abort(), timeout)

    try {
      const response = await fetch(endpoint, {
        method: 'POST',
        headers: {
          'Content-Type': 'application/json',
          'X-API-Key': config.apiKey,
        },
        body: JSON.stringify(ctx.event),
        signal: controller.signal,
      })

      if (!response.ok) {
        console.error(`[my-adapter] 请求失败: ${response.status}`)
      }
    } catch (error) {
      if (error instanceof Error && error.name === 'AbortError') {
        console.error('[my-adapter] 请求超时')
      } else {
        console.error('[my-adapter] 错误:', error)
      }
    } finally {
      clearTimeout(timeoutId)
    }
  }
}

然后像其他 drain 一样将适配器传递到框架中:

lib/my-adapter.ts
const drain = createMyAdapter({
  apiKey: process.env.MY_SERVICE_API_KEY!,
})

读取配置

推荐的模式是:覆盖 > 环境变量。如果还需要 Nuxt/Nitro 的 runtimeConfig 支持,可以将其作为回退添加:

lib/my-adapter.ts
export function createMyAdapter(overrides?: Partial<MyAdapterConfig>) {
  return async (ctx: DrainContext) => {
    const config = {
      apiKey: overrides?.apiKey ?? process.env.MY_SERVICE_API_KEY,
      endpoint: overrides?.endpoint ?? process.env.MY_SERVICE_ENDPOINT,
    }

    if (!config.apiKey) {
      console.error('[my-adapter] 缺少 API 密钥')
      return
    }

    // 发送事件...
  }
}

过滤事件

在 drain 函数中过滤要发送的事件:

lib/my-drain.ts
const drain = async (ctx: DrainContext) => {
  if (ctx.event.level !== 'error') return
  if (ctx.request?.path === '/health') return
  if (ctx.event._sampled === false) return

  await sendToMyService(ctx.event)
}

转换事件

在发送前转换事件:

lib/my-drain.ts
const drain = async (ctx: DrainContext) => {
  const payload = {
    ts: new Date(ctx.event.timestamp).getTime(),
    severity: ctx.event.level.toUpperCase(),
    message: JSON.stringify(ctx.event),
    labels: {
      service: ctx.event.service,
      env: ctx.event.environment,
    },
    attributes: {
      method: ctx.event.method,
      path: ctx.event.path,
      status: ctx.event.status,
      duration: ctx.event.duration,
    },
  }

  await fetch('https://logs.example.com/v1/push', {
    method: 'POST',
    body: JSON.stringify(payload),
  })
}

批量处理

对于高吞吐量的场景,使用 Drain Pipeline 来批量处理事件,在失败时重试,并自动处理缓冲区溢出:

lib/my-drain.ts
import type { DrainContext } from 'evlog'
import { createDrainPipeline } from 'evlog/pipeline'

const pipeline = createDrainPipeline<DrainContext>({
  batch: { size: 100, intervalMs: 5000 },
})

const drain = pipeline(async (batch) => {
  await fetch('https://api.example.com/logs/batch', {
    method: 'POST',
    body: JSON.stringify(batch.map(ctx => ctx.event)),
  })
})
查看 Pipeline 文档 获取完整的选项参考、错误重试策略和缓冲区溢出处理。

错误处理最佳实践

  1. 不要抛出错误 - drain 不应导致你的应用崩溃
  2. 静默记录失败 - 使用 console.error 进行调试
  3. 使用超时 - 防止请求挂起
  4. 优雅降级 - 如果配置缺失则跳过发送
lib/robust-adapter.ts
export function createRobustAdapter(config: Config) {
  return async (ctx: DrainContext) => {
    // 验证配置
    if (!config.apiKey) {
      console.error('[adapter] 缺少 API 密钥,已跳过')
      return
    }

    const controller = new AbortController()
    const timeoutId = setTimeout(() => controller.abort(), 5000)

    try {
      await fetch(config.endpoint, {
        method: 'POST',
        body: JSON.stringify(ctx.event),
        signal: controller.signal,
      })
    } catch (error) {
      // 记录但不抛出
      console.error('[adapter] 发送失败:', error)
    } finally {
      clearTimeout(timeoutId)
    }
  }
}

下一步