@mattheworiordan
Last active April 17, 2026 14:39
Repro: @ably/ai-transport encoder continues queuing appends after NACK (error 93002)

@ably/ai-transport: mutableMessages error handling

Issue 1: No early detection of mutableMessages misconfiguration

createServerTransport() and turn.start() both succeed on a channel without mutableMessages enabled. The error only surfaces deep in the streaming pipeline when the first appendMessage operation is NACKed with error 93002.

Expected: The SDK should detect at attach or turn-start time that mutableMessages is required, and fail fast with a clear error before streaming begins.

Observed: The transport attaches and starts streaming without complaint. The error appears as NACKs on individual append operations.
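
One way the fail-fast behaviour described above could look is sketched below. This is not the SDK's API: `assertMutableMessages` and `probeAppend` are hypothetical names, and the sketch assumes the caller can supply a function that performs a single throwaway append and rejects with an error carrying `code` 93002 when the channel lacks mutableMessages. The SDK may well have a cheaper way to find this out at attach time.

```javascript
// Sketch only. `probeAppend` is a hypothetical caller-supplied function that
// performs one throwaway append and rejects with an Ably-style error on NACK.
const MUTABLE_MESSAGES_REQUIRED = 93002; // error code reported in this gist

async function assertMutableMessages(probeAppend, channelName) {
  try {
    await probeAppend();
  } catch (err) {
    if (err && err.code === MUTABLE_MESSAGES_REQUIRED) {
      // Replace the low-level NACK with one actionable error.
      const wrapped = new Error(
        `Channel "${channelName}" does not have mutableMessages enabled; ` +
          "create a channel rule for its namespace before streaming."
      );
      wrapped.code = err.code;
      wrapped.cause = err;
      throw wrapped;
    }
    throw err; // unrelated failure: surface it unchanged
  }
}
```

Called once before streaming begins, a check like this would turn the flood of per-token NACKs into one error that names the channel and the missing configuration.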

Issue 2: Encoder continues queuing appends after NACK

When streamResponse() is running and the server NACKs with error 93002, the encoder's fire-and-forget appendStream() keeps queuing new operations. Each token from the LLM stream generates an independent server round-trip and NACK, producing a flood of identical errors and PromiseRejectionHandledWarning messages.

Expected: The encoder core needs a circuit breaker. After the first NACK with a non-retryable error (93002), it should abort the stream and stop queuing further appends.

Root cause: appendStream() is fire-and-forget by design, and _flushPending() (which collects failures) runs only on closeStream/abort/close, never on appendStream itself. As a result there is no feedback loop from publish errors back to the stream consumer.

Observed: Every token produces an independent server round-trip and NACK for the entire duration of the stream.
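
The circuit breaker could be sketched roughly as follows. This is an illustration of the idea, not the encoder core's implementation: `createGuardedAppender`, `publishAppend` and `onTrip` are hypothetical names, and `publishAppend` is assumed to return a promise that rejects with an error carrying a `code` when the server NACKs.

```javascript
// Sketch only. `publishAppend` stands in for whatever sends one append and
// returns a promise that rejects on NACK; it is not an SDK function.
const NON_RETRYABLE_CODES = new Set([93002]);

function createGuardedAppender(publishAppend, onTrip) {
  let tripped = null; // first non-retryable error, once seen

  return {
    // Fire-and-forget like appendStream(), but refuses new work once tripped.
    // Returns true if the append was queued, false if the breaker is open.
    append(streamId, text) {
      if (tripped) return false;
      publishAppend(streamId, text).catch((err) => {
        if (!tripped && NON_RETRYABLE_CODES.has(err && err.code)) {
          tripped = err;
          onTrip(err); // e.g. abort the stream and cancel the token reader
        }
        // Other errors are ignored in this sketch; a real implementation
        // would still record them for collection at close time.
      });
      return true;
    },
    get error() {
      return tripped;
    },
  };
}
```

Appends already in flight when the first NACK arrives would still be rejected, so this bounds the flood to the in-flight window rather than eliminating it. The `onTrip` callback is where the missing feedback loop to the stream consumer would be wired in.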

Broader question: Why require explicit mutableMessages opt-in for AI Transport?

If AI Transport inherently requires mutable messages to function, why does a developer need to explicitly create a channel rule to enable it? The current flow is:

  1. Developer installs @ably/ai-transport
  2. Writes code following the docs
  3. Runs it — gets a flood of cryptic NACK errors
  4. Has to discover that a channel rule with mutableMessages is needed
  5. Goes to the dashboard or CLI to create one

For an AI Transport product, shouldn't channels used by the SDK automatically support the operations the SDK needs? If we know it's an agent doing agentic work via AI Transport, requiring explicit configuration adds friction that doesn't need to exist.

Observation: No built-in generic codec

The SDK ships one codec: UIMessageCodec for the Vercel AI SDK. Anyone not using Vercel (LangChain, custom LLM integrations, plain text streaming) must implement the full Codec interface from scratch — createEncoder, createDecoder, createAccumulator, isTerminal, getMessageKey — just to stream text.

This repro script needed a custom codec for a minimal use case (stream text tokens). A built-in PlainTextCodec or similar would significantly reduce the barrier for non-Vercel users and make the SDK genuinely framework-agnostic in practice, not just in architecture.

How to run the repro

mkdir /tmp/ait-nack-repro && cd /tmp/ait-nack-repro

# Download the script
curl -sL https://gist.githubusercontent.com/mattheworiordan/db9b9a811d89580a4122ff91e76d1956/raw/repro.mjs -o repro.mjs

# Install dependencies
echo '{"type":"module"}' > package.json
npm install ably @ably/ai-transport

# Run — make sure the app does NOT have mutableMessages on the "no-mutable" namespace
ABLY_API_KEY="your-app-id.key-id:key-secret" node repro.mjs

# If you have the Ably CLI:
ABLY_API_KEY=$(ably auth keys current --value-only) node repro.mjs

What you'll see

--- Issue 1: Early detection ---
turn.start() succeeded without detecting mutableMessages is missing.

--- Issue 2: Encoder NACK handling ---
Streaming 50 tokens at ~40ms each...

[AblySDK Error] Ably: Protocol.onNack(): serial = 2; count = 1; err = ... code=93002 ...
[AblySDK Error] Ably: Protocol.onNack(): serial = 3; count = 11; err = ... code=93002 ...
... (continues for every token in the stream)

ISSUE 1 CONFIRMED: No early detection of mutableMessages misconfiguration.
ISSUE 2 CONFIRMED: Encoder kept queuing appends after first NACK.

Context

Found while building an interactive AI Transport demo for the Ably CLI. When a developer runs the demo without mutableMessages configured, they should get a clear, immediate error — not a flood of NACKs.

#!/usr/bin/env node
// See README.md in this gist for full explanation and setup instructions.
import Ably from "ably";
import { createServerTransport, createEncoderCore } from "@ably/ai-transport";
const apiKey = process.env.ABLY_API_KEY;
if (!apiKey) {
  console.error("Error: ABLY_API_KEY is not set.\n");
  console.error(' ABLY_API_KEY="your-key" node repro.mjs');
  process.exit(1);
}
// Minimal codec — correctly forwards options to encoder core
const codec = {
  createEncoder(channelWriter, options) {
    const core = createEncoderCore(channelWriter, options);
    let streamId = null;
    return {
      async writeMessages() {},
      async writeEvent() {},
      async appendEvent(event) {
        if (event.type === "text") {
          if (!streamId) {
            streamId = `stream-${Date.now()}`;
            await core.startStream(streamId, { name: "text", data: event.text });
          } else {
            core.appendStream(streamId, event.text); // fire-and-forget
          }
        } else if (event.type === "finish" && streamId) {
          await core.closeStream(streamId, { name: "text", data: "" });
          streamId = null;
        }
      },
      async abort() { await core.abortAllStreams(); },
      async close() { await core.close(); },
    };
  },
  createDecoder: () => ({ decode: () => [] }),
  createAccumulator: () => ({
    processOutputs() {}, updateMessage() {},
    messages: [], completedMessages: [], hasActiveStream: false,
  }),
  isTerminal: (e) => e.type === "finish",
  getMessageKey: () => "unknown",
};
// Setup
const client = new Ably.Realtime({ key: apiKey });
const channelName = `no-mutable:nack-repro-${Date.now()}`;
const channel = client.channels.get(channelName);
let errorCount = 0;
const errorCodes = new Map();
process.on("uncaughtException", (err) => {
  errorCount++;
  const code = err.code || "unknown";
  errorCodes.set(code, (errorCodes.get(code) || 0) + 1);
});
console.log("@ably/ai-transport mutableMessages error handling repro");
console.log("=======================================================\n");
console.log(`Channel: ${channelName} (mutableMessages NOT enabled)\n`);
await new Promise((resolve, reject) => {
  client.connection.once("connected", resolve);
  client.connection.once("failed", reject);
});
console.log("Connected.\n");
// Issue 1: Does the SDK detect misconfiguration early?
console.log("--- Issue 1: Early detection ---");
const transport = createServerTransport({ channel, codec });
const turnId = `turn-${Date.now()}`;
const turn = transport.newTurn({ turnId, clientId: "repro" });
await turn.start();
console.log("turn.start() succeeded without detecting mutableMessages is missing.\n");
// Issue 2: Does the encoder stop after the first NACK?
console.log("--- Issue 2: Encoder NACK handling ---");
const TOKEN_COUNT = 50;
console.log(`Streaming ${TOKEN_COUNT} tokens at ~40ms each...\n`);
const stream = new ReadableStream({
  async start(controller) {
    for (let i = 0; i < TOKEN_COUNT; i++) {
      controller.enqueue({ type: "text", text: `token-${i} ` });
      await new Promise((r) => setTimeout(r, 40));
    }
    controller.enqueue({ type: "finish" });
    controller.close();
  },
});
const t0 = Date.now();
let endReason = "unknown";
try {
  const result = await turn.streamResponse(stream);
  endReason = result.reason;
  console.log(`streamResponse() returned: reason=${result.reason}`);
} catch (err) {
  endReason = "error";
  console.log(`streamResponse() threw: code=${err.code}`);
}
try {
  await turn.end(endReason);
} catch {
  // Connection may already be failing
}
await new Promise((r) => setTimeout(r, 3000));
console.log(`\n--- Results ---`);
console.log(`Tokens: ${TOKEN_COUNT}`);
console.log(`Errors: ${errorCount}`);
console.log(`Time: ${Date.now() - t0}ms`);
console.log(`Codes: ${[...errorCodes.entries()].map(([k, v]) => `${k} (x${v})`).join(", ")}`);
console.log("");
if (errorCount > 5) {
  console.log("ISSUE 2 CONFIRMED: Encoder kept queuing appends after first NACK.");
}
console.log("ISSUE 1 CONFIRMED: No early detection of mutableMessages misconfiguration.");
client.close();
setTimeout(() => process.exit(0), 500);