Skip to content

Instantly share code, notes, and snippets.

@slinkydeveloper
Last active June 2, 2025 18:45
Show Gist options
  • Save slinkydeveloper/ab844bc9413f2ce812db182aa84fdfa4 to your computer and use it in GitHub Desktop.
Save slinkydeveloper/ab844bc9413f2ce812db182aa84fdfa4 to your computer and use it in GitHub Desktop.
// Gradle plugins for this build, spelled out with their full plugin ids:
//  - application: lets the project be run/packaged as a JVM application
//  - Kotlin JVM + kotlinx.serialization compiler plugin
//  - KSP: runs the annotation processor declared with `ksp(...)` in dependencies
//  - Protobuf Gradle plugin: configured in the `protobuf { }` block
plugins {
    id("application")
    id("org.jetbrains.kotlin.jvm") version "2.1.20"
    id("org.jetbrains.kotlin.plugin.serialization") version "2.1.20"
    id("com.google.devtools.ksp") version "2.1.20-1.0.32"
    id("com.google.protobuf") version "0.9.4"
}
// Where dependencies are resolved from, in lookup order.
repositories {
    mavenCentral()
    // Sonatype snapshots repository (presumably the source of the -SNAPSHOT Restate artifact).
    maven("https://s01.oss.sonatype.org/content/repositories/snapshots/")
    // Confluent's own Maven repository (presumably the source of the io.confluent artifacts).
    maven("https://packages.confluent.io/maven/")
}
// Version used for the Restate SDK artifacts that follow the regular release.
val restateVersion = "2.1.0"
// Single version shared by all Confluent artifacts below.
val confluentVersion = "7.9.1"

dependencies {
    // Restate annotation processor, executed through KSP
    ksp("dev.restate:sdk-api-kotlin-gen:$restateVersion")

    // Restate SDK (HTTP endpoint flavour)
    implementation("dev.restate:sdk-kotlin-http:$restateVersion")
    // A snapshot build is needed here because it is the one that makes KtTypeTag public
    implementation("dev.restate:sdk-serde-kotlinx:2.2.0-SNAPSHOT")

    // Logging API
    implementation("org.apache.logging.log4j:log4j-api:2.24.3")

    // Confluent: Protobuf serializer, schema serializer base and schema registry client
    implementation("io.confluent:kafka-protobuf-serializer:$confluentVersion")
    implementation("io.confluent:kafka-schema-serializer:$confluentVersion")
    implementation("io.confluent:kafka-schema-registry-client:$confluentVersion")

    // Protobuf runtime
    implementation("com.google.protobuf:protobuf-java:3.6.1")

    // On Java 9+ the @javax.annotation.Generated annotation has to be supplied explicitly.
    // see: https://github.com/grpc/grpc-java/issues/3633
    if (JavaVersion.current().isJava9Compatible) {
        implementation("javax.annotation:javax.annotation-api:1.3.1")
    }
}
// Kotlin plugin configuration: pins the JVM toolchain to version 21.
kotlin {
jvmToolchain(21)
}
// Configure main class
// "GreeterKt" is the JVM class name Kotlin generates for top-level declarations in a file
// named Greeter.kt.
// NOTE(review): no top-level main() is visible in the Greeter snippet shown here — confirm
// that the real Greeter.kt in package my.example declares one.
application {
mainClass.set("my.example.GreeterKt")
}
// Protobuf Gradle plugin configuration: selects which protoc binary compiles the .proto files.
protobuf {
protoc {
// The artifact spec for the Protobuf Compiler
// Uses the same 3.6.1 version as the protobuf-java runtime dependency declared above;
// presumably the two are meant to be bumped together.
artifact = "com.google.protobuf:protoc:3.6.1"
}
}
/**
 * Example Restate service whose serdes are supplied by [ProtobufConfluentSerdeFactory],
 * registered through the `@CustomSerdeFactory` annotation.
 */
@Service
@CustomSerdeFactory(ProtobufConfluentSerdeFactory::class)
class Greeter {

    /**
     * Placeholder handler taking and returning a [MyMessage]
     * (presumably a Protobuf-generated class). Not implemented yet: calling it throws
     * [NotImplementedError].
     */
    @Handler
    suspend fun greet(ctx: Context, name: MyMessage): MyMessage = TODO("")
}
package my.example
import com.google.protobuf.Message
import com.google.protobuf.MessageLite
import com.google.protobuf.Parser
import com.google.protobuf.UnsafeByteOperations
import dev.restate.common.Slice
import dev.restate.serde.Serde
import dev.restate.serde.SerdeFactory
import dev.restate.serde.TypeRef
import dev.restate.serde.TypeTag
import dev.restate.serde.kotlinx.KotlinSerializationSerdeFactory.KtTypeTag
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer
/**
 * [SerdeFactory] that hands out [ProtobufConfluentSerde] instances for Protobuf [Message] classes.
 *
 * A schema registry client is created once per factory instance from environment variables
 * (see [createSchemaRegistryFromEnv]). When no registry is configured, the produced serdes
 * only use the plain Protobuf parser.
 *
 * The class keeps a no-argument constructor on purpose: that is a requirement for it to be
 * usable as a custom serde factory.
 */
class ProtobufConfluentSerdeFactory : SerdeFactory {

    /** Schema registry client, or `null` when `SCHEMA_REGISTRY_URL` is unset or blank. */
    val schemaRegistryClient: SchemaRegistryClient? = createSchemaRegistryFromEnv()

    /**
     * Creates a serde from a [TypeRef]; only type refs wrapping a plain [Class] are supported.
     *
     * @throws IllegalStateException if the referenced type is not a [Class].
     * @throws IllegalArgumentException if the class is not a Protobuf [Message].
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T : Any?> create(typeRef: TypeRef<T>) =
        create(typeRef.type as? Class<*> ?: throw IllegalStateException("Expecting type to be class")) as Serde<T>

    /**
     * Creates a serde from a [TypeTag]. [KtTypeTag] (the tag emitted by the Restate KSP
     * generator) is unwrapped to its Java class; every other tag is delegated to the
     * default [SerdeFactory] behaviour.
     */
    @Suppress("UNCHECKED_CAST")
    override fun <T : Any?> create(typeTag: TypeTag<T>): Serde<T> {
        // This KtTypeTag is what the Restate KSP generator uses
        if (typeTag is KtTypeTag) {
            // Resolves to the Class-based overload below
            return create(typeTag.type.java) as Serde<T>
        }
        return super.create(typeTag)
    }

    /**
     * Creates a serde for the given Protobuf message class.
     *
     * The message parser is obtained reflectively through the static `parser()` method
     * of the class.
     *
     * @throws IllegalArgumentException if [clazz] is not assignable to [Message].
     */
    override fun <T : Any?> create(clazz: Class<T>): Serde<T> {
        // Only accept Protobuf Messages
        require(Message::class.java.isAssignableFrom(clazz)) { "Class must be a Protobuf Message" }

        // Static method, hence the null receiver
        val parser = clazz.getDeclaredMethod("parser").invoke(null)

        @Suppress("UNCHECKED_CAST")
        return ProtobufConfluentSerde(
            schemaRegistryClient?.let { KafkaProtobufDeserializer<Message>(it) },
            parser as Parser<Message>
        ) as Serde<T>
    }

    companion object {
        /** Number of schemas the registry client keeps cached. */
        private const val SCHEMA_CACHE_CAPACITY = 100

        /**
         * Builds the schema registry client from the environment:
         * `SCHEMA_REGISTRY_URL` (required to enable the registry) and, optionally,
         * `SCHEMA_REGISTRY_USERNAME` + `SCHEMA_REGISTRY_PASSWORD` for basic auth.
         *
         * How this is configured is up to the application; the only constraint is that
         * the factory itself must keep an empty constructor.
         *
         * @return the client, or `null` when the URL variable is unset or blank.
         */
        private fun createSchemaRegistryFromEnv(): SchemaRegistryClient? {
            // A blank value (e.g. `SCHEMA_REGISTRY_URL=` in a container spec) means "not configured",
            // it must not reach the client as an empty URL.
            val url = System.getenv("SCHEMA_REGISTRY_URL")?.takeIf { it.isNotBlank() } ?: return null
            val username = System.getenv("SCHEMA_REGISTRY_USERNAME")
            val password = System.getenv("SCHEMA_REGISTRY_PASSWORD")

            val config = mutableMapOf(
                "schema.registry.url" to url
            )
            // Basic auth is only enabled when both halves of the credentials are present
            if (username != null && password != null) {
                config["basic.auth.credentials.source"] = "USER_INFO"
                config["basic.auth.user.info"] = "$username:$password"
            }

            return io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient(
                url,
                SCHEMA_CACHE_CAPACITY,
                config
            )
        }
    }
}
/**
 * [Serde] for Protobuf messages that can read both Confluent schema-registry framed payloads
 * and plain Protobuf bytes.
 *
 * @param kafkaProtobufDeserializer optional Confluent deserializer, tried first when present.
 * @param messageParser Protobuf parser of the concrete message type, used as the fallback
 *   and to materialise dynamic messages into [T].
 */
class ProtobufConfluentSerde<T : Message>(
    val kafkaProtobufDeserializer: KafkaProtobufDeserializer<T>?,
    val messageParser: Parser<T>
) : Serde<T> {

    /** Serializes with the plain Protobuf API (no schema-registry framing). */
    override fun serialize(value: T): Slice =
        Slice.wrap(value.toByteString().asReadOnlyByteBuffer())

    /**
     * Deserializes [value], trying the Confluent deserializer first (when configured) and
     * falling back to the plain Protobuf parser.
     *
     * If both attempts fail, the parser's exception is thrown and the Confluent
     * deserializer's exception is attached to it as a suppressed exception, so the first
     * failure is not lost.
     */
    override fun deserialize(value: Slice): T {
        var registryFailure: Exception? = null

        val registryDeserializer = kafkaProtobufDeserializer
        if (registryDeserializer != null) {
            try {
                val decoded: T? = registryDeserializer.deserialize("mock", value.toByteArray())
                // Without a configured specific type the Confluent deserializer yields a
                // DynamicMessage; because of erasure that would leave here typed as T and blow up
                // later with a ClassCastException. Re-encode it and parse it into the real type.
                if (decoded is com.google.protobuf.DynamicMessage) {
                    return messageParser.parseFrom(decoded.toByteString())
                }
                if (decoded != null) {
                    return decoded
                }
                // A null result falls through to the plain parser below
            } catch (e: Exception) {
                // Best effort: remember the failure and move on to the parser
                registryFailure = e
            }
        }

        // Plain Protobuf payload
        return try {
            messageParser.parseFrom(UnsafeByteOperations.unsafeWrap(value.asReadOnlyByteBuffer()))
        } catch (parseFailure: Exception) {
            registryFailure?.let { parseFailure.addSuppressed(it) }
            throw parseFailure
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment