Last active
July 6, 2018 17:29
-
-
Save fubhy/d30944689ae6733ccbb37124e7f0d5ed to your computer and use it in GitHub Desktop.
Avro serializer with cached schema registry
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const MAGIC_BYTE = 0; | |
class AvroSerializer { | |
private schemaRegistry; | |
constructor(schemaRegistry) { | |
this.schemaRegistry = schemaRegistry; | |
} | |
public async decode(message): Promise<string> { | |
if (message[0] !== MAGIC_BYTE) { | |
throw new Error('Message does not start with magic byte.'); | |
} | |
const schemaId = message.readInt32BE(1); | |
const type = await this.schemaRegistry.getById(schemaId); | |
return type.decode(message, 5).value; | |
} | |
} | |
export default AvroSerializer; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import axios from 'axios'; | |
import { parse } from 'avsc'; | |
// HTTP methods the client is allowed to issue against the registry.
const VALID_METHODS = ['GET', 'POST', 'PUT', 'DELETE'];
// Content-negotiation header; media types match the schemaregistry vendor
// formats used elsewhere in this client.
const ACCEPT_HEADER = 'application/vnd.schemaregistry.v1+json, application/vnd.schemaregistry+json, application/json';
class CachedSchemaRegistryClient { | |
private url: string; | |
private idToSchema: Map<number, string> = new Map(); | |
private subjectToSchemaId: Map<string, Map<string, number>> = new Map(); | |
private subjectToSchemaVersion: Map<string, Map<string, number>> = new Map(); | |
constructor( | |
url, | |
caLocation?: string, | |
certLocation?: string, | |
keyLocation?: string, | |
) { | |
this.url = url; | |
// TODO: Handle certificates. | |
} | |
private sendRequest( | |
url: string, | |
method: string = 'GET', | |
body?: any, | |
headers: any = {} | |
) { | |
if (VALID_METHODS.indexOf(method) === -1) { | |
throw new Error(`Method "${method}" is invalid. Valid methods include "${VALID_METHODS.join(", ")}".`); | |
} | |
const headerDefaults = { | |
'Accept': ACCEPT_HEADER, | |
}; | |
const headerDefaultsForBody = body ? { | |
'Content-Length': Object.keys(body).length.toString() + '', | |
'Content-Type': 'application/vnd.schemaregistry.v1+json', | |
} : {}; | |
return axios.request({ | |
url, | |
method, | |
headers: { | |
headerDefaults, | |
headerDefaultsForBody, | |
...headers, | |
}, | |
data: body, | |
}); | |
} | |
private addToCache(cache, subject, schema, value): void { | |
if (!cache.has(subject)) { | |
cache.set(subject, new Map()); | |
} | |
cache.get(subject).set(schema, value); | |
} | |
private cacheSchema(schema, schemaId, subject?, version?): void { | |
// Make sure we don't overwrite anything. | |
let actualSchema = schema; | |
if (this.idToSchema.has(schemaId)) { | |
actualSchema = this.idToSchema.get(schemaId); | |
} | |
else { | |
this.idToSchema.set(schemaId, actualSchema); | |
} | |
if (subject) { | |
this.addToCache( | |
this.subjectToSchemaId, | |
subject, | |
actualSchema, | |
schemaId | |
); | |
if (version) { | |
this.addToCache( | |
this.subjectToSchemaVersion, | |
subject, | |
actualSchema, | |
version, | |
); | |
} | |
} | |
return actualSchema; | |
} | |
public async register(subject: string, avroSchema: string): Promise<number> { | |
const schemasToId = this.subjectToSchemaId.get(subject); | |
const schemaId = schemasToId && schemasToId.get(avroSchema); | |
if (typeof schemaId !== 'undefined') { | |
return schemaId; | |
} | |
const url = `${this.url}/subjects/${subject}/versions`; | |
const body = { | |
schema: JSON.stringify(avroSchema), | |
}; | |
const response = await this.sendRequest(url, 'POST', body); | |
if (response.status === 409) { | |
throw new Error('Incompatible Avro schema.'); | |
} | |
if (response.status === 422) { | |
throw new Error('Invalid Avro schema.'); | |
} | |
if (!(response.status >= 200 && response.status <= 299)) { | |
throw new Error('Unable to register schema.'); | |
} | |
const responseSchemaId = response.data['id'] as number; | |
this.cacheSchema(avroSchema, responseSchemaId, subject); | |
return responseSchemaId; | |
} | |
public async getById(schemaId: number) { | |
if (this.idToSchema.has(schemaId)) { | |
return this.idToSchema.get(schemaId); | |
} | |
const url = `${this.url}/schemas/ids/${schemaId}`; | |
const response = await this.sendRequest(url); | |
if (response.status === 404) { | |
// TODO: Add logging. | |
return null; | |
} | |
if (!(response.status >= 200 && response.status <= 299)) { | |
// TODO: Add logging. | |
return null; | |
} | |
try { | |
const schema = parse(response.data['schema'] as string); | |
return this.cacheSchema(schema, schemaId); | |
} | |
catch (e) { | |
throw new Error(`Received bad schema (id ${schemaId}) from registry.`); | |
} | |
} | |
public async getLatestSchema(subject: string): Promise<{ | |
id: number, | |
schema: string, | |
version: number, | |
} | null> { | |
const url = `${this.url}/subjects/${subject}/versions/latest`; | |
const response = await this.sendRequest(url); | |
if (response.status === 4040) { | |
// TODO: Add logging. | |
return null; | |
} | |
if (response.status === 422) { | |
// TODO: Add logging. | |
return null; | |
} | |
if (!(response.status >= 200 && response.status <= 299)) { | |
// TODO: Add logging. | |
return null; | |
} | |
const schemaId = parseInt(response.data['id'] as string, 10); | |
const version = parseInt(response.data['version'] as string, 10); | |
const schema = this.idToSchema.has(schemaId) ? | |
this.idToSchema.get(schemaId) : | |
parse(response.data['schema']); | |
this.cacheSchema(schema, schemaId, subject, version) | |
return { id: schemaId, schema, version }; | |
} | |
public async getVersion(subject: string, avroSchema: string): Promise<number | null> { | |
const schemasToVersion = this.subjectToSchemaVersion.get(subject); | |
const version = schemasToVersion && schemasToVersion.get(avroSchema); | |
if (typeof version !== 'undefined') { | |
return version; | |
} | |
const url = `${this.url}/subjects/${subject}`; | |
const body = { | |
schema: JSON.stringify(avroSchema), | |
}; | |
const response = await this.sendRequest(url, 'POST', body); | |
if (response.status === 404) { | |
// TODO: Add logging. | |
return null; | |
} | |
if (!(response.status >= 200 && response.status <= 299)) { | |
// TODO: Add logging. | |
return null; | |
} | |
const schemaId = parseInt(response.data['id'] as string, 10); | |
const responseVersion = parseInt(response.data['version'] as string, 10); | |
this.cacheSchema(avroSchema, schemaId, subject, responseVersion); | |
return responseVersion; | |
} | |
public async testCompatibility(subject: string, avroSchema: string, version: string = 'latest'): Promise<boolean> { | |
const url = `${this.url}/compatibility/subjects/${subject}/versions/${version}`; | |
const body = { | |
schema: JSON.stringify(avroSchema), | |
}; | |
const response = await this.sendRequest(url, 'POST', body); | |
if (response.status === 404) { | |
// TODO: Add logging. | |
return false; | |
} | |
if (response.status === 422) { | |
// TODO: Add logging. | |
return false; | |
} | |
if (response.status >= 200 && response.status <= 299) { | |
return !!response.data['is_compatible']; | |
} | |
// TODO: Add logging. | |
return false; | |
} | |
} | |
export default CachedSchemaRegistryClient; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import * as Kafka from 'node-rdkafka';
import * as Rx from 'rxjs';
import { concatMap, multicast, switchMap } from 'rxjs/operators';
import CachedSchemaRegistryClient from './kafka/CachedSchemaRegistryClient';
import AvroSerializer from './kafka/AvroSerializer';
// NOTE(review): this module-level serializer is shadowed by the `serializer`
// parameter of consumeRecords below and appears unused in this file —
// confirm whether it was meant to be the default argument for consumeRecords.
const serializer = new AvroSerializer(new CachedSchemaRegistryClient(
  process.env.SCHEMA_REGISTRY_URL,
));
export default function consumeRecords(serializer) { | |
const record$ = new Rx.Observable(subscriber => { | |
try { | |
const consumer = new Kafka.KafkaConsumer({ | |
'group.id': 'meltspot-graphql-records', | |
'socket.keepalive.enable': true, | |
'enable.auto.commit': true, | |
'metadata.broker.list': process.env.KAFKA_BROKER | |
}, {}); | |
consumer.on('ready', () => { | |
consumer.subscribe(['record']); | |
consumer.consume(); | |
}); | |
consumer.on('data', value => subscriber.next(value)); | |
consumer.on('error', error => subscriber.error(error)); | |
consumer.connect({}, error => error && subscriber.error(error)); | |
} | |
catch (error) { | |
subscriber.error(error); | |
} | |
}); | |
// Decode avro messages. | |
const decoded$ = record$.pipe(switchMap(async (item: any) => { | |
const value = await (item.value && serializer.decode(item.value)); | |
const key = await (item.key && serializer.decode(item.key)); | |
return { | |
...item, | |
parsedValue: value, | |
parsedKey: key, | |
}; | |
})); | |
// Share (multicast) the records instead of creating non-shared | |
// observables for each client. | |
const multi$ = decoded$.pipe(multicast(new Rx.ReplaySubject(1))) as Rx.ConnectableObservable<any>; | |
multi$.connect(); | |
return multi$; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment