import * as restate from '@restatedev/restate-sdk';
import { execa } from 'execa';
import which from 'which';
import { GenericContainer, StartedTestContainer, TestContainers, Wait } from 'testcontainers';
import * as http2 from 'http2';
import * as net from 'net';
// Prepare the restate server
class ReassignableRestateEndpointServer {
currentHandler!: (req: http2.Http2ServerRequest, res: http2.Http2ServerResponse) => void;
restateServer!: http2.Http2Server;
endpointPort!: number;
serverUrl!: string;
deploymentId?: string;
restateCliPath: string | null | undefined;
registered = false;
constructor() {}
async init() {
// Start HTTP2 server on random port
this.restateServer = http2.createServer((req, res) => this.currentHandler(req, res));
await new Promise((resolve, reject) => {
this.restateServer.listen(0).once('listening', resolve).once('error', reject);
});
this.endpointPort = (this.restateServer.address() as net.AddressInfo).port;
this.restateCliPath = await which('restate', { nothrow: true });
}
setServerUrl(serverUrl: string) {
this.serverUrl = serverUrl;
}
/** Replace the current set of endpoints with a new one. */
async registerEndpoints(
mountServicesFn: (server: restate.RestateEndpoint) => restate.RestateEndpoint,
reregister = false
) {
const restateEndpoint = mountServicesFn(restate.endpoint());
this.currentHandler = restateEndpoint.http2Handler();
if (reregister || !this.registered) {
if (this.deploymentId) {
await this.deleteEndpointWithRestate();
}
this.deploymentId = await registerEndpointWithRestate(this.serverUrl, this.endpointPort);
this.registered = true;
}
}
async stop() {
if (this.deploymentId) {
await this.deleteEndpointWithRestate();
}
this.restateServer?.close();
}
static async start(): Promise<ReassignableRestateEndpointServer> {
const server = new ReassignableRestateEndpointServer();
await server.init();
console.info(`Restate endpoint listening on port ${server.endpointPort}`);
return server;
}
async cleanUpInvocations(serviceNames: string[]) {
const cliPath = this.restateCliPath;
if (!cliPath) {
return;
}
await Promise.all(
serviceNames.map(async (serviceName) => {
// Remove all invocations for the service.
// This can be done through the postgres interface as well but it's easier to
// use the CLI.
// This doesn't always find the invocations for reasons that aren't yet clear.
// You can use just cleanup-restate after the test to remove anything left over.
await execa({
reject: false,
all: true,
env: {
RESTATE_ADMIN_URL: this.serverUrl,
},
})`${cliPath} --yes invocations cancel --kill ${serviceName}`;
})
);
}
async deleteEndpointWithRestate() {
const serviceDesc = await fetch(`${this.serverUrl}/deployments/${this.deploymentId}`).then(
(r) => r.json()
);
// eslint-disable-next-line @typescript-eslint/no-unsafe-call
const serviceNames: string[] = serviceDesc.services.map((s: { name: string }) => s.name);
const res = await fetch(`${this.serverUrl}/deployments/${this.deploymentId}?force=true`, {
method: 'DELETE',
});
if (!res.ok) {
const badResponse = await res.text();
throw new Error(`Error ${res.status} during registration: ${badResponse}`);
}
await this.cleanUpInvocations(serviceNames);
}
}
async function registerEndpointWithRestate(serverUrl: string, endpointPort: number) {
const endpointHost = startedAdminUrl ? 'host.docker.internal' : 'host.testcontainers.internal';
// Register this service endpoint
const res = await fetch(`${serverUrl}/deployments`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
// See https://node.testcontainers.org/features/networking/#expose-host-ports-to-container
uri: `http://${endpointHost}:${endpointPort}`,
// Set force for testing
force: true,
}),
});
if (!res.ok) {
const badResponse = await res.text();
throw new Error(`Error ${res.status} during registration: ${badResponse}`);
}
const result = await res.json();
console.info('Registered', result.id);
return result.id;
}
// Prepare the restate testcontainer
async function prepareRestateTestContainer(
restateServerPort: number
): Promise<StartedTestContainer> {
const restateContainer = new GenericContainer('docker.io/restatedev/restate:1.1.2')
.withEnvironment({})
// Just 1 partition for testing to start up faster
.withCommand(['--bootstrap-num-partitions', '1'])
// Expose ports
.withExposedPorts(8080, 9070)
// Wait start on health checks
.withWaitStrategy(Wait.forAll([Wait.forHttp('/health', 9070)]));
// This MUST be executed before starting the restate container
// Expose host port to access the restate server
await TestContainers.exposeHostPorts(restateServerPort);
// Start restate container
return await restateContainer.start();
}
function adminBaseUrl(startedRestateContainer: StartedTestContainer) {
return `http://${startedRestateContainer.getHost()}:${startedRestateContainer.getMappedPort(
9070
)}`;
}
// See libs/dev-services/restate.sh for these ports
const startedAdminUrl = process.env.USE_TEST_CONTAINERS
? ''
: process.env.TEST_RESTATE_ADMIN_URL || 'http://localhost:9080';
const startedClientUrl = process.env.USE_TEST_CONTAINERS
? ''
: process.env.TEST_RESTATE_CLIENT_URL || 'http://localhost:8099';
export class RestateTestEnvironment {
logs = '';
constructor(
readonly startedRestateHttpServer: ReassignableRestateEndpointServer,
readonly startedRestateContainer?: StartedTestContainer
) {}
public baseUrl(): string {
if (this.startedRestateContainer) {
return `http://${this.startedRestateContainer.getHost()}:${this.startedRestateContainer.getMappedPort(
8080
)}`;
} else if (startedClientUrl) {
return startedClientUrl;
} else {
throw new Error('Restate container not started, but we also had no client url');
}
}
public adminAPIBaseUrl(): string {
if (this.startedRestateContainer) {
return adminBaseUrl(this.startedRestateContainer);
} else if (startedAdminUrl) {
return startedAdminUrl;
} else {
throw new Error('Restate container not started, but we also had no admin url');
}
}
public async stop() {
await this.startedRestateContainer?.stop();
await this.startedRestateHttpServer.stop();
}
public static async start(): Promise<RestateTestEnvironment> {
let startedRestateHttpServer = await ReassignableRestateEndpointServer.start();
if ((startedAdminUrl && !startedClientUrl) || (!startedAdminUrl && startedClientUrl)) {
throw new Error('Saw TEST_RESTATE_ADMIN_URL or TEST_RESTATE_CLIENT_URL, but not both');
}
if (!startedAdminUrl) {
let startedRestateContainer = await prepareRestateTestContainer(
startedRestateHttpServer.endpointPort
);
const env = new RestateTestEnvironment(startedRestateHttpServer, startedRestateContainer);
let logStream = await startedRestateContainer.logs();
logStream.on('data', (data: string | Buffer) => (env.logs += data.toString()));
logStream.on('error', (err) => (env.logs += err.toString()));
startedRestateHttpServer.setServerUrl(adminBaseUrl(startedRestateContainer));
return env;
} else {
startedRestateHttpServer.setServerUrl(startedAdminUrl);
return new RestateTestEnvironment(startedRestateHttpServer, undefined);
}
}
/** Replace the current set of endpoints with a new one. This should be run before every test with the appropriate
* endpoints if you are using a new database for each test. */
async registerEndpoints(
mountServicesFn: (server: restate.RestateEndpoint) => restate.RestateEndpoint,
reregister = false
) {
await this.startedRestateHttpServer.registerEndpoints(mountServicesFn, reregister);
}
}