Skip to content

Instantly share code, notes, and snippets.

@juliusmarminge
Last active March 19, 2025 14:34
Show Gist options
  • Save juliusmarminge/8631aedf060d02a52449d0021335f56e to your computer and use it in GitHub Desktop.
Save juliusmarminge/8631aedf060d02a52449d0021335f56e to your computer and use it in GitHub Desktop.
Ingest logs from Cloudflare Workers to Axiom
import { randomBytes } from "node:crypto";
import { Array, Cause, HashMap, Inspectable, List, Logger } from "effect";
import type { Env } from "./types";
import { type EffectWorkerRuntime, makeEffectWorkerRuntime } from "./runtime";
let workerId = "";
let workerStartTime = "";
let batch: Array<Record<string, unknown>> = [];
/**
* From https://github.com/axiomhq/axiom-cloudflare-workers
*/
async function throttle<T extends (...args: never[]) => Promise<unknown>>(
fn: T,
wait: number,
maxLen: number,
) {
let timeoutInProgress = false;
return async function actual(this: unknown, ...args: Parameters<T>) {
if (batch.length >= maxLen) {
await fn.apply(this, args);
} else if (!timeoutInProgress) {
timeoutInProgress = true;
await new Promise((resolve, reject) => {
setTimeout(() => {
timeoutInProgress = false;
fn.apply(this, args).then(resolve).catch(resolve);
}, wait);
});
}
};
}
/**
* From https://github.com/axiomhq/axiom-cloudflare-workers
*/
async function sendLogs(token: string, dataset: string) {
if (batch.length === 0) {
return;
}
const logs = batch;
batch = [];
const url = `https://api.axiom.co/v1/datasets/${dataset}/ingest`;
return fetch(url, {
signal: AbortSignal.timeout(30_000),
method: "POST",
body: logs.map((log) => jsonStringify(log)).join("\n"),
headers: {
"Content-Type": "application/x-ndjson",
Authorization: `Bearer ${token}`,
},
});
}
/**
* Ripped from Effect structuredLogger
*/
function structuredMessage(u: unknown): unknown {
switch (typeof u) {
case "bigint":
case "function":
case "symbol": {
return String(u);
}
default: {
return Inspectable.toJSON(u);
}
}
}
/**
* Most here is ripped from Effect structuredLogger
*/
function makeAxiomEffectLogger(
enqueue: (event: Record<string, unknown>) => void,
) {
return Logger.make(({ annotations, cause, date, logLevel, message, spans }) => {
const now = date.getTime();
const fieldsObj: Record<string, unknown> = {};
const spansObj: Record<string, number> = {};
if (HashMap.size(annotations) > 0) {
for (const [k, v] of annotations) {
fieldsObj[k] = structuredMessage(v);
}
}
if (List.isCons(spans)) {
for (const span of spans) {
spansObj[span.label] = now - span.startTime;
}
}
fieldsObj.spans = spansObj;
const messageArr = Array.ensure(message);
enqueue({
_time: now,
level: logLevel.label.toLowerCase(),
message:
messageArr.length === 1
? structuredMessage(messageArr[0])
: messageArr.map(structuredMessage),
error: Cause.isEmpty(cause)
? undefined
: Cause.pretty(cause, { renderErrorCause: true }),
fields: fieldsObj,
});
});
}
const throttledSendLogs = throttle(sendLogs, 1000, 100);
export function withExceptionLogging(
handler: (
req: Request,
env: Env,
ctx: ExecutionContext,
runtime: EffectWorkerRuntime,
) => Promise<Response>,
): ExportedHandlerFetchHandler<Env> {
return async function wrappedHandler(req, env, ctx) {
workerStartTime ||= new Date().toISOString();
workerId ||= randomBytes(8).toString("hex");
const workerInfo = {
id: workerId,
started: workerStartTime,
datacenter: req.cf?.colo,
};
const start = performance.now();
const requestId = req.headers.get("cf-request-id");
const rayId = req.headers.get("cf-ray");
const url = new URL(req.url);
const requestInfo = {
startTime: start,
requestId: requestId,
rayId: rayId,
asn: req.cf?.asn,
asOrganization: req.cf?.asOrganization,
method: req.method,
path: url.pathname,
host: url.host,
userAgent: req.headers.get("user-agent"),
country: req.cf?.country,
city: req.cf?.city,
region: req.cf?.region,
ip: req.headers.get("cf-connecting-ip"),
};
function enqueueLog(log: Record<string, unknown>) {
batch.push({
...log,
request: requestInfo,
worker: workerInfo,
});
}
try {
const runtime = makeKVRuntime({
env,
logger: makeAxiomEffectLogger(enqueueLog),
});
const response = await handler(req, env, ctx, logger, runtime);
const duration = performance.now() - start;
// Maybe log response? Might be expensive to log everything though...
return response;
} catch (error) {
const endTime = performance.now();
const duration = endTime - start;
enqueueLog({
_time: endTime,
level: "error",
message: `Uncaught exception: ${err.message}`,
fields: {
error,
...fields,
},
});
return new Response("Internal server error", { status: 500 });
} finally {
ctx.waitUntil(throttledSendLogs(env.AXIOM_TOKEN, env.AXIOM_DATASET));
}
};
}
import { ConfigProvider, Layer, Logger, ManagedRuntime } from "effect";
import type { Env } from "./types";
export function makeEffectWorkerRuntime(options: {
env: Env,
logger: Logger.Logger<unknown, unknown>
}) {
const cfp = ConfigProvider.fromJson(options.env);
const layer = Layer.provide(
Logger.replace(Logger.defaultLogger, options.logger),
Layer.setConfigProvider(cfp),
);
return ManagedRuntime.make(layer);
}
export type EffectWorkerRuntime = ReturnType<typeof makeEffectWorkerRuntime>
import { Effect } from "effect";
import { withExceptionLogging } from "./observability";
import type { Env } from "./types";
export default {
fetch: withExceptionLogging(async (req, env, ctx, logger, runtime) => {
Effect.log("Hello from Effect").pipe(
Effect.annotateLogs({ foo: "bar" }),
runtime.runSync
);
return Response.json({ ok: true })
}),
} satisfies ExportedHandler<Env>;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment