Stream
evlog 提供了一个 stream 原语,因此任何本地消费者都可以订阅广泛事件,而无需重新实现一个 drain。它分为两层,并且相互构建:
- 进程内总线 —
createStreamDrain(),标准的发布/订阅。同步监听器、异步迭代器、带环形缓冲区的重放。 - 网络桥接 —
startStreamServer(),一个可选启用的 HTTP 迷你服务器,通过 Server-Sent Events 将进程内总线暴露给浏览器、CLI 或外部开发工具。
pnpm dev、长期运行的自托管服务器,以及 VM 和容器(Fly、Railway、Coolify…)。它们不适用于无服务器平台(Vercel Functions、Cloudflare Workers、AWS Lambda)——每次调用都是一个隔离进程。在那里做跨实例广播请使用真正的 broker(Redis Streams、NATS、Pub/Sub)。进程内总线
createStreamDrain() 只是一个 drain。将它注册到 evlog 的 drain hook 上,就可以在事件发出时订阅它们——无需 HTTP、无需序列化、无需额外跳转。
在进程内订阅广泛事件
import { createStreamDrain } from 'evlog/stream'
const stream = createStreamDrain({ buffer: 200 })
nitroApp.hooks.hook('evlog:drain', stream.drain)
const off = stream.subscribe((event) => {
if (event.level === 'error') notify(event)
})
off()
for (const past of stream.recent()) {
bootstrap(past)
}
for await (const event of stream.events()) {
bootstrap(event)
}
选项
| 选项 | 类型 | 描述 |
|---|---|---|
buffer | number | recent() 快照的环形缓冲区大小。设为 0 可禁用。默认:500。 |
perSubscriberQueue | number | 每个 events() 异步迭代器在最旧事件被丢弃前可排队的最大事件数。drain 本身永远不会被阻塞。默认:1000。 |
filter | (event) => boolean | 可选谓词,在每个被 drain 的事件上运行——返回 false 可完全跳过该事件(既不缓冲也不投递)。 |
stream.drain(events) 可接受单个事件或批量事件。stream.recent() 返回已缓冲事件的快照(最旧在前,最新在后),可用于临时检查或为 UI 面板填充初始数据。
网络桥接 — stream server
startStreamServer() 会在与你的应用相同的进程中启动一个小型 node:http 服务器,使用其自身的临时端口,并通过 Server-Sent Events 暴露进程内总线。任何消费者(浏览器标签页、CLI、Tauri/Electron 开发工具)都可以订阅——你的应用 API 不受影响。
通过 SSE 暴露 evlog stream
启动内容
当你启用后,evlog 会调用 startStreamServer(),并:
- 选择一个临时可用端口(或你指定的端口)
- 在
127.0.0.1(或你的主机)上启动一个小型node:http服务器 - 将发现的 URL 写入
.evlog/stream.url - 在服务器控制台打印一行横幅
- 将进程内 stream drain 挂接到 evlog 管道中
- 为 SSE 流暴露
GET /,为握手元数据暴露GET /info
在关闭时(SIGINT / SIGTERM / process exit),服务器会清理监听器并删除 URL 文件。
按框架启用
export default defineNuxtConfig({
modules: ['evlog/nuxt'],
evlog: {
stream: true,
// 或:stream: { port: 4444, token: process.env.EVLOG_STREAM_TOKEN }
},
})
import { defineStreamedInstrumentation } from 'evlog/next/stream'
export const { register } = defineStreamedInstrumentation({
stream: true,
})
import { startStreamServer } from 'evlog/stream'
if (process.env.NODE_ENV !== 'production' && process.env.EVLOG_STREAM === '1') {
const { drain } = await startStreamServer()
// 将 `drain` 接入你框架的 evlog drain hook
}
Hono / Express / Fastify 集成不需要“功能 PR”——startStreamServer() 与你的框架是正交的。你只需启动一次,然后像其他 drain 一样将它的 drain 连接到 evlog 管道。
发现
这个迷你服务器运行在随机端口上,因此任何消费者都必须先发现它。
http://127.0.0.1:53942
直接从磁盘读取:
import { readFile } from 'node:fs/promises'
const url = (await readFile('.evlog/stream.url', 'utf-8')).trim()
或者——对于 Nuxt 应用中同源的浏览器标签页——请求发现路由:
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
线协议格式
每条 SSE 消息的形状都是 { evlog: '1', type, data }:
type | 何时 | data |
|---|---|---|
hello | 连接后的第一帧 | { evlogVersion, bufferSize, heartbeatMs } |
replay | 紧随 hello 之后,当消费者传入 ?since=<iso> 时重放缓冲事件 | WideEvent |
event | 之后每个发出的事件 | WideEvent |
ping | 心跳(默认每 15 秒一次,可通过 heartbeatMs 配置) | { ts: number } |
const { url } = await fetch('/api/_evlog/stream-info').then(r => r.json())
const es = new EventSource(url)
es.onmessage = (msg) => {
const { type, data } = JSON.parse(msg.data)
if (type === 'event') events.push(data)
}
Auth + CORS
| 选项 | 行为 |
|---|---|
token | 设置后,服务器要求每个请求都带有 Authorization: Bearer <token>,否则返回 401。 |
| (无 token) | 仅当不存在 Origin 头,或来源主机是本地(127.0.0.1 / localhost / [::1])时才接受连接。其他来源返回 403。 |
host | 默认 127.0.0.1——永远无法从局域网访问。只有在设置了 token 时才应覆盖为 0.0.0.0。 |
heartbeatMs | 心跳间隔(默认 15000)。 |
buffer | 保存在底层默认 stream 上的环形缓冲区——通过 ?since=<iso> 为后加入的客户端重放。默认 500。 |
进一步了解
- FS reader — 重放或尾随历史 NDJSON 文件(跨进程,重启后仍可继续)
- Consumer recipes — 构建一个最小开发工具,管道输出到 curl + jq,先重放历史再进入实时状态
概览
观察流经管道的内容(stream、fs reader、consumer recipes),接入管道(plugins、enrichers、tail sampling、identity headers),或构建你自己的模块(custom drains、drain pipeline、自定义框架集成)。
自定义框架
为没有内置集成的 HTTP 框架(或非 HTTP 运行时)构建 evlog 支持。对于 (ctx, next) 中间件形状使用 defineFrameworkIntegration,或为其他情况使用 createMiddlewareLogger / createRequestLogger。