Last active
June 2, 2025 18:45
-
-
Save slinkydeveloper/ab844bc9413f2ce812db182aa84fdfa4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
plugins { | |
application | |
kotlin("jvm") version "2.1.20" | |
kotlin("plugin.serialization") version "2.1.20" | |
id("com.google.devtools.ksp") version "2.1.20-1.0.32" | |
id("com.google.protobuf") version "0.9.4" | |
} | |
repositories { | |
mavenCentral() | |
maven { url = uri("https://s01.oss.sonatype.org/content/repositories/snapshots/") } | |
maven { url = uri("https://packages.confluent.io/maven/") } | |
} | |
val restateVersion = "2.1.0" | |
dependencies { | |
// Annotation processor | |
ksp("dev.restate:sdk-api-kotlin-gen:$restateVersion") | |
// Restate SDK | |
implementation("dev.restate:sdk-kotlin-http:$restateVersion") | |
// Need snapshot for making KtTypeTag public | |
implementation("dev.restate:sdk-serde-kotlinx:2.2.0-SNAPSHOT") | |
// Logging | |
implementation("org.apache.logging.log4j:log4j-api:2.24.3") | |
// Confluent stuff | |
implementation("io.confluent:kafka-protobuf-serializer:7.9.1") | |
implementation("io.confluent:kafka-schema-serializer:7.9.1") | |
implementation("io.confluent:kafka-schema-registry-client:7.9.1") | |
// Protobuf | |
implementation("com.google.protobuf:protobuf-java:3.6.1") | |
if (JavaVersion.current().isJava9Compatible()) { | |
// Workaround for @javax.annotation.Generated | |
// see: https://github.com/grpc/grpc-java/issues/3633 | |
implementation("javax.annotation:javax.annotation-api:1.3.1") | |
} | |
} | |
kotlin { | |
jvmToolchain(21) | |
} | |
// Configure main class | |
application { | |
mainClass.set("my.example.GreeterKt") | |
} | |
protobuf { | |
protoc { | |
// The artifact spec for the Protobuf Compiler | |
artifact = "com.google.protobuf:protoc:3.6.1" | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Service | |
@CustomSerdeFactory(ProtobufConfluentSerdeFactory::class) | |
class Greeter { | |
@Handler | |
suspend fun greet(ctx: Context, name: MyMessage): MyMessage { | |
TODO("") | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package my.example | |
import com.google.protobuf.Message | |
import com.google.protobuf.MessageLite | |
import com.google.protobuf.Parser | |
import com.google.protobuf.UnsafeByteOperations | |
import dev.restate.common.Slice | |
import dev.restate.serde.Serde | |
import dev.restate.serde.SerdeFactory | |
import dev.restate.serde.TypeRef | |
import dev.restate.serde.TypeTag | |
import dev.restate.serde.kotlinx.KotlinSerializationSerdeFactory.KtTypeTag | |
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient | |
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer | |
class ProtobufConfluentSerdeFactory : SerdeFactory { | |
val schemaRegistryClient: SchemaRegistryClient? = createSchemaRegistryFromEnv() | |
@Suppress("UNCHECKED_CAST") | |
override fun <T : Any?> create(typeRef: TypeRef<T>) = | |
create(typeRef.type as? Class<*> ?: throw IllegalStateException("Expecting type to be class")) as Serde<T> | |
override fun <T : Any?> create(typeTag: TypeTag<T>): Serde<T> { | |
// This KtTypeTag is what the Restate KSP generator uses | |
if (typeTag is KtTypeTag) { | |
// Calls the overload below | |
return create(typeTag.type.java) as Serde<T> | |
} | |
return super.create(typeTag) | |
} | |
override fun <T : Any?> create(clazz: Class<T>): Serde<T> { | |
// Only accept Protobuf Messages | |
if (!Message::class.java.isAssignableFrom(clazz)) { | |
throw IllegalArgumentException("Class must be a Protobuf Message") | |
} | |
// Retrieve the parser using reflections | |
val parserMethod = clazz.getDeclaredMethod("parser") | |
val parser = parserMethod.invoke(null) | |
// Create the serde | |
@Suppress("UNCHECKED_CAST") | |
return ProtobufConfluentSerde( | |
schemaRegistryClient?.let { KafkaProtobufDeserializer<Message>(it) }, | |
parser as Parser<Message> | |
) as Serde<T> | |
} | |
companion object { | |
private fun createSchemaRegistryFromEnv(): SchemaRegistryClient? { | |
// Up to you to how you wanna configure this, | |
// but it's a requirement for ProtobufConfluentSerdeFactory to have an empty constructor | |
val url = System.getenv("SCHEMA_REGISTRY_URL") ?: return null | |
val username = System.getenv("SCHEMA_REGISTRY_USERNAME") | |
val password = System.getenv("SCHEMA_REGISTRY_PASSWORD") | |
val config = mutableMapOf( | |
"schema.registry.url" to url | |
) | |
if (username != null && password != null) { | |
config["basic.auth.credentials.source"] = "USER_INFO" | |
config["basic.auth.user.info"] = "$username:$password" | |
} | |
return io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient( | |
url, | |
100, // Schema cache size | |
config | |
) | |
} | |
} | |
} | |
class ProtobufConfluentSerde<T: Message>( | |
val kafkaProtobufDeserializer: KafkaProtobufDeserializer<T>?, | |
val messageParser: Parser<T> | |
) : Serde<T> { | |
override fun serialize(value: T): Slice = Slice.wrap( | |
// Serialize using Protobuf API, no fuzz | |
(value as MessageLite).toByteString().asReadOnlyByteBuffer() | |
) | |
override fun deserialize(value: Slice): T { | |
// When deserializing, try first the KafkaProtobufDeserializer if configured | |
if (kafkaProtobufDeserializer != null) { | |
try { | |
return kafkaProtobufDeserializer.deserialize("mock", value.toByteArray()) | |
} catch (_: Exception) { | |
// Bad luck, move on to the parser | |
} | |
} | |
// Try the Protobuf Parser | |
return messageParser.parseFrom(UnsafeByteOperations.unsafeWrap(value.asReadOnlyByteBuffer())) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment