Last active
March 19, 2025 14:34
-
-
Save juliusmarminge/8631aedf060d02a52449d0021335f56e to your computer and use it in GitHub Desktop.
Ingest logs from Cloudflare Workers to Axiom
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { randomBytes } from "node:crypto"; | |
import { Array, Cause, HashMap, Inspectable, List, Logger } from "effect"; | |
import type { Env } from "./types"; | |
import { type EffectWorkerRuntime, makeEffectWorkerRuntime } from "./runtime"; | |
/** Random hex identifier for this worker instance; filled in by the first handled request. */
let workerId = "";
/** ISO-8601 timestamp taken when the first request is handled; filled in together with `workerId`. */
let workerStartTime = "";
/**
 * Log events waiting to be shipped. `sendLogs` swaps this for a fresh array
 * when it flushes. Written with `[]` syntax so the annotation does not depend
 * on the identifier `Array`, which this file also imports from "effect".
 */
let batch: Record<string, unknown>[] = [];
/**
 * Wraps `fn` so that flushes of the module-level `batch` are rate limited.
 *
 * From https://github.com/axiomhq/axiom-cloudflare-workers
 *
 * Each call of the returned function does one of three things:
 * - `batch.length >= maxLen`: `fn` is invoked immediately and awaited.
 * - otherwise, if no delayed flush is pending: one is scheduled `wait` ms from
 *   now, and the returned promise settles once that delayed call has finished.
 * - otherwise (a delayed flush is already pending): resolves without calling `fn`.
 *
 * This function is intentionally NOT `async`: callers invoke its result
 * directly, which is impossible if the result is wrapped in a Promise.
 *
 * @param fn - Async flush function to throttle.
 * @param wait - Delay in milliseconds before a scheduled flush runs.
 * @param maxLen - Batch size at or above which the flush happens right away.
 * @returns The throttled function.
 */
function throttle<T extends (...args: never[]) => Promise<unknown>>(
  fn: T,
  wait: number,
  maxLen: number,
): (this: unknown, ...args: Parameters<T>) => Promise<void> {
  let timeoutInProgress = false;
  return async function actual(
    this: unknown,
    ...args: Parameters<T>
  ): Promise<void> {
    if (batch.length >= maxLen) {
      await fn.apply(this, args);
      return;
    }
    if (timeoutInProgress) {
      // A delayed flush is already scheduled; it will pick up this event too.
      return;
    }
    timeoutInProgress = true;
    await new Promise<void>((resolve) => {
      setTimeout(() => {
        timeoutInProgress = false;
        // Best effort: a failed delayed flush resolves instead of rejecting.
        fn.apply(this, args).then(
          () => resolve(),
          () => resolve(),
        );
      }, wait);
    });
  };
}
/**
 * Sends every queued log event to Axiom in a single NDJSON ingest request.
 *
 * From https://github.com/axiomhq/axiom-cloudflare-workers
 *
 * The module-level `batch` is swapped for a fresh array before the request is
 * issued, so events enqueued while the request is in flight land in the next
 * flush instead of being sent twice.
 *
 * @param token - Axiom API token, sent as a Bearer credential.
 * @param dataset - Name of the dataset to ingest into.
 * @returns The ingest response, or `undefined` when there was nothing to send.
 */
async function sendLogs(
  token: string,
  dataset: string,
): Promise<Response | undefined> {
  if (batch.length === 0) {
    return undefined;
  }
  const logs = batch;
  batch = [];
  const url = `https://api.axiom.co/v1/datasets/${dataset}/ingest`;
  return fetch(url, {
    // Abort the request if it has not completed within 30 seconds.
    signal: AbortSignal.timeout(30_000),
    method: "POST",
    // NDJSON: one JSON document per line.
    body: logs.map((log) => JSON.stringify(log)).join("\n"),
    headers: {
      "Content-Type": "application/x-ndjson",
      Authorization: `Bearer ${token}`,
    },
  });
}
/**
 * Converts an arbitrary logged value into something that can go into a log event.
 *
 * Ripped from Effect structuredLogger.
 *
 * @param u - Any value passed to the logger (message part or annotation value).
 * @returns `String(u)` for bigints, functions and symbols; otherwise the result
 *   of `Inspectable.toJSON(u)`.
 */
function structuredMessage(u: unknown): unknown {
  switch (typeof u) {
    case "bigint":
    case "function":
    case "symbol": {
      return String(u);
    }
    default: {
      return Inspectable.toJSON(u);
    }
  }
}
/**
 * Builds an Effect logger that turns each log call into a plain event object
 * and hands it to `enqueue`.
 *
 * Most here is ripped from Effect structuredLogger.
 *
 * Shape of the emitted event:
 * - `_time`: `date.getTime()` of the log call (epoch milliseconds)
 * - `level`: lower-cased log level label
 * - `message`: the single message, or an array when several were logged
 * - `error`: pretty-printed cause, or `undefined` when the cause is empty
 * - `fields`: all log annotations, plus `spans` mapping span label to elapsed ms
 *
 * @param enqueue - Receives every event produced by the logger.
 * @returns The logger created by `Logger.make`.
 */
function makeAxiomEffectLogger(
  enqueue: (event: Record<string, unknown>) => void,
) {
  return Logger.make(({ annotations, cause, date, logLevel, message, spans }) => {
    const now = date.getTime();
    const fieldsObj: Record<string, unknown> = {};
    const spansObj: Record<string, number> = {};

    if (HashMap.size(annotations) > 0) {
      for (const [k, v] of annotations) {
        fieldsObj[k] = structuredMessage(v);
      }
    }

    if (List.isCons(spans)) {
      for (const span of spans) {
        // Elapsed time of the span at the moment of the log call.
        spansObj[span.label] = now - span.startTime;
      }
    }
    // Assigned after the annotations, so it replaces an annotation named "spans".
    fieldsObj.spans = spansObj;

    const messageArr = Array.ensure(message);
    enqueue({
      _time: now,
      level: logLevel.label.toLowerCase(),
      message:
        messageArr.length === 1
          ? structuredMessage(messageArr[0])
          : messageArr.map(structuredMessage),
      error: Cause.isEmpty(cause)
        ? undefined
        : Cause.pretty(cause, { renderErrorCause: true }),
      fields: fieldsObj,
    });
  });
}
/** `sendLogs` wrapped by `throttle` with a 1000 ms wait and a batch limit of 100 events. */
const throttledSendLogs = throttle(sendLogs, 1000, 100);

/**
 * Wraps a fetch handler with Axiom logging and a catch-all error boundary.
 *
 * For every request the wrapper:
 * 1. lazily assigns the module-level worker id and start time,
 * 2. collects request and worker metadata that is attached to every log event,
 * 3. builds an Effect runtime whose logger enqueues into the shared batch and
 *    passes it to `handler` as the fourth argument,
 * 4. converts an exception thrown by `handler` into an error log event and a
 *    generic 500 response,
 * 5. always schedules a throttled flush of the batch via `ctx.waitUntil`.
 *
 * @param handler - The real request handler; receives `(req, env, ctx, runtime)`.
 * @returns A fetch handler suitable for the worker's default export.
 */
export function withExceptionLogging(
  handler: (
    req: Request,
    env: Env,
    ctx: ExecutionContext,
    runtime: EffectWorkerRuntime,
  ) => Promise<Response>,
): ExportedHandlerFetchHandler<Env> {
  return async function wrappedHandler(req, env, ctx) {
    workerStartTime ||= new Date().toISOString();
    workerId ||= randomBytes(8).toString("hex");

    const workerInfo = {
      id: workerId,
      started: workerStartTime,
      datacenter: req.cf?.colo,
    };

    const start = performance.now();
    const requestId = req.headers.get("cf-request-id");
    const rayId = req.headers.get("cf-ray");
    const url = new URL(req.url);
    const requestInfo = {
      startTime: start,
      requestId: requestId,
      rayId: rayId,
      asn: req.cf?.asn,
      asOrganization: req.cf?.asOrganization,
      method: req.method,
      path: url.pathname,
      host: url.host,
      userAgent: req.headers.get("user-agent"),
      country: req.cf?.country,
      city: req.cf?.city,
      region: req.cf?.region,
      ip: req.headers.get("cf-connecting-ip"),
    };

    /** Queues one event, stamped with this request's and this worker's metadata. */
    function enqueueLog(log: Record<string, unknown>) {
      batch.push({
        ...log,
        request: requestInfo,
        worker: workerInfo,
      });
    }

    try {
      const runtime = makeEffectWorkerRuntime({
        env,
        logger: makeAxiomEffectLogger(enqueueLog),
      });
      // Maybe log response? Might be expensive to log everything though...
      // `await` (rather than returning the promise) so rejections reach the catch below.
      return await handler(req, env, ctx, runtime);
    } catch (error) {
      const duration = performance.now() - start;
      // An Error has no enumerable own properties and would serialise to `{}`,
      // so copy the interesting parts into a plain object.
      const details =
        error instanceof Error
          ? { name: error.name, message: error.message, stack: error.stack }
          : { message: String(error) };
      enqueueLog({
        // Epoch milliseconds, matching the `_time` produced by the Effect logger.
        _time: Date.now(),
        level: "error",
        message: `Uncaught exception: ${details.message}`,
        fields: {
          error: details,
          duration,
        },
      });
      return new Response("Internal server error", { status: 500 });
    } finally {
      // Keep the worker alive until the (possibly delayed) flush has completed.
      ctx.waitUntil(throttledSendLogs(env.AXIOM_TOKEN, env.AXIOM_DATASET));
    }
  };
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ConfigProvider, Layer, Logger, ManagedRuntime } from "effect"; | |
import type { Env } from "./types"; | |
/**
 * Creates the Effect runtime handed to request handlers.
 *
 * The runtime is built from a layer that replaces Effect's default logger with
 * `options.logger`, provided with a layer that installs a config provider
 * created from `options.env` via `ConfigProvider.fromJson`.
 *
 * @param options.env - Worker environment bindings, used as the config source.
 * @param options.logger - Logger that takes the place of `Logger.defaultLogger`.
 * @returns The result of `ManagedRuntime.make` for that layer.
 */
export function makeEffectWorkerRuntime(options: {
  env: Env;
  logger: Logger.Logger<unknown, unknown>;
}) {
  const cfp = ConfigProvider.fromJson(options.env);
  const layer = Layer.provide(
    Logger.replace(Logger.defaultLogger, options.logger),
    Layer.setConfigProvider(cfp),
  );
  return ManagedRuntime.make(layer);
}
/** The runtime type produced by `makeEffectWorkerRuntime`, derived from its return type. */
export type EffectWorkerRuntime = ReturnType<typeof makeEffectWorkerRuntime>;
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Effect } from "effect"; | |
import { withExceptionLogging } from "./observability"; | |
import type { Env } from "./types"; | |
/**
 * Worker entry point. The fetch handler is wrapped by `withExceptionLogging`,
 * which supplies the Effect runtime as the fourth argument
 * (`req, env, ctx, runtime`).
 */
export default {
  fetch: withExceptionLogging(async (_req, _env, _ctx, runtime) => {
    // Emit one annotated log line through the runtime supplied by the wrapper.
    Effect.log("Hello from Effect").pipe(
      Effect.annotateLogs({ foo: "bar" }),
      runtime.runSync,
    );
    return Response.json({ ok: true });
  }),
} satisfies ExportedHandler<Env>;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment