扩展

Stream

订阅流经 evlog 的广泛事件——可通过 createStreamDrain 在进程内订阅,或通过本地 SSE 流服务器经网络订阅。

evlog 提供了一个 stream 原语,因此任何本地消费者都可以订阅广泛事件,而无需重新实现一个 drain。它分为两层,并且相互构建:

  • 进程内总线createStreamDrain(),标准的发布/订阅。同步监听器、异步迭代器、带环形缓冲区的重放。
  • 网络桥接startStreamServer(),一个可选启用的 HTTP 迷你服务器,通过 Server-Sent Events 将进程内总线暴露给浏览器、CLI 或外部开发工具。
按设计仅限本地。 这两层都运行在单个 Node / Bun / Deno 进程内。它们可用于 pnpm dev、长期运行的自托管服务器,以及 VM 和容器(Fly、Railway、Coolify…)。它们适用于无服务器平台(Vercel Functions、Cloudflare Workers、AWS Lambda)——每次调用都是一个隔离进程。在那里做跨实例广播请使用真正的 broker(Redis Streams、NATS、Pub/Sub)。

进程内总线

createStreamDrain()·0/3 delivered
emit
bus
0/3
subscribe()
idle
events()
idle
replay()
0/8
ring
0/8
one emit · all subscribers replay for late joiners emitted: 0

createStreamDrain() 只是一个 drain。将它注册到 evlog 的 drain hook 上,就可以在事件发出时订阅它们——无需 HTTP、无需序列化、无需额外跳转。

在进程内订阅广泛事件

import { createStreamDrain } from 'evlog/stream'

const stream = createStreamDrain({ buffer: 200 })

nitroApp.hooks.hook('evlog:drain', stream.drain)

const off = stream.subscribe((event) => {
  if (event.level === 'error') notify(event)
})
off()

for (const past of stream.recent()) {
  bootstrap(past)
}
for await (const event of stream.events()) {
  bootstrap(event)
}

选项

选项类型描述
buffernumberrecent() 快照的环形缓冲区大小。设为 0 可禁用。默认:500
perSubscriberQueuenumber每个 events() 异步迭代器在最旧事件被丢弃前可排队的最大事件数。drain 本身永远不会被阻塞。默认:1000
filter(event) => boolean可选谓词,在每个被 drain 的事件上运行——返回 false 可完全跳过该事件(既不缓冲也不投递)。

stream.drain(events) 可接受单个事件或批量事件。stream.recent() 返回已缓冲事件的快照(最旧在前,最新在后),可用于临时检查或为 UI 面板填充初始数据。

网络桥接 — stream server

GET /sse·connecting
hello
data:{ type:"hello", data:{ pid: 84321, version: "2.16.0" } }
replay
data:{ type:"replay", data:{ POST /login → 200 } }
replay
data:{ type:"replay", data:{ GET / → 200 } }
event
data:{ type:"event", data:{ POST /checkout → 200 } }
event
data:{ type:"event", data:{ GET /cart → 200 } }
ping
data:{ type:"ping", data:{ ts: 1746796800123 } }
event
data:{ type:"event", data:{ POST /api/email → 500 } }
connect
replay0
events0
heartbeat30s

startStreamServer() 会在与你的应用相同的进程中启动一个小型 node:http 服务器,使用其自身的临时端口,并通过 Server-Sent Events 暴露进程内总线。任何消费者(浏览器标签页、CLI、Tauri/Electron 开发工具)都可以订阅——你的应用 API 不受影响。

严格按需启用。 除非你显式设置该选项,否则不会启动任何东西。开发环境中也不会自动启用——只有在你要求时服务器才会启动。

通过 SSE 暴露 evlog stream

启动内容

当你启用后,evlog 会调用 startStreamServer(),并:

  1. 选择一个临时可用端口(或你指定的端口)
  2. 127.0.0.1(或你的主机)上启动一个小型 node:http 服务器
  3. 将发现的 URL 写入 .evlog/stream.url
  4. 在服务器控制台打印一行横幅
  5. 将进程内 stream drain 挂接到 evlog 管道中
  6. 为 SSE 流暴露 GET /,为握手元数据暴露 GET /info

在关闭时(SIGINT / SIGTERM / process exit),服务器会清理监听器并删除 URL 文件。

按框架启用

export default defineNuxtConfig({
  modules: ['evlog/nuxt'],
  evlog: {
    stream: true,
    // 或:stream: { port: 4444, token: process.env.EVLOG_STREAM_TOKEN }
  },
})

Hono / Express / Fastify 集成不需要“功能 PR”——startStreamServer() 与你的框架是正交的。你只需启动一次,然后像其他 drain 一样将它的 drain 连接到 evlog 管道。

发现

这个迷你服务器运行在随机端口上,因此任何消费者都必须先发现它。

.evlog/stream.url
http://127.0.0.1:53942

直接从磁盘读取:

import { readFile } from 'node:fs/promises'

const url = (await readFile('.evlog/stream.url', 'utf-8')).trim()

或者——对于 Nuxt 应用中同源的浏览器标签页——请求发现路由:

const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())

线协议格式

每条 SSE 消息的形状都是 { evlog: '1', type, data }

type何时data
hello连接后的第一帧{ evlogVersion, bufferSize, heartbeatMs }
replay紧随 hello 之后,当消费者传入 ?since=<iso> 时重放缓冲事件WideEvent
event之后每个发出的事件WideEvent
ping心跳(默认每 15 秒一次,可通过 heartbeatMs 配置){ ts: number }
Browser consumer
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
const es = new EventSource(url)

es.onmessage = (msg) => {
  const { type, data } = JSON.parse(msg.data)
  if (type === 'event') events.push(data)
}

Auth + CORS

选项行为
token设置后,服务器要求每个请求都带有 Authorization: Bearer <token>,否则返回 401。
(无 token)仅当不存在 Origin 头,或来源主机是本地(127.0.0.1 / localhost / [::1])时才接受连接。其他来源返回 403。
host默认 127.0.0.1——永远无法从局域网访问。只有在设置了 token 时才应覆盖为 0.0.0.0
heartbeatMs心跳间隔(默认 15000)。
buffer保存在底层默认 stream 上的环形缓冲区——通过 ?since=<iso> 为后加入的客户端重放。默认 500

进一步了解

  • FS reader — 重放或尾随历史 NDJSON 文件(跨进程,重启后仍可继续)
  • Consumer recipes — 构建一个最小开发工具,管道输出到 curl + jq,先重放历史再进入实时状态