Created
March 2, 2020 17:55
-
-
Save marquesds/880c6ed08bee14122523da637513a690 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import io.circe.{Decoder, Encoder} | |
import io.circe.parser.decode | |
import io.circe.syntax._ | |
import org.apache.flink.api.common.serialization.{DeserializationSchema, SerializationSchema} | |
import org.apache.flink.api.java.typeutils.TypeExtractor | |
import scala.reflect.ClassTag | |
/**
 * Generic Flink schema that encodes and decodes JSON with Circe.
 *
 * Payloads are always read and written as UTF-8, independent of the JVM's
 * default charset, so the wire format does not change with host configuration.
 *
 * @tparam A      element type; its runtime class (obtained through the `ClassTag`)
 *                is used to build the produced `TypeInformation`
 * @param decoder Circe decoder used by [[deserialize]]
 * @param encoder Circe encoder used by [[serialize]]
 */
case class FlinkCirceSchema[A <: AnyRef : ClassTag]()(implicit decoder: Decoder[A], encoder: Encoder[A])
  extends SerializationSchema[A]
  with DeserializationSchema[A] {

  import java.nio.charset.StandardCharsets
  import org.apache.flink.api.common.typeinfo.TypeInformation

  /**
   * Parses `bytes` as UTF-8 JSON and decodes it into an `A`.
   *
   * @param bytes raw message payload
   * @return the decoded value, or `null` when the payload is not valid JSON or
   *         does not decode to `A`
   */
  override def deserialize(bytes: Array[Byte]): A =
    decode[A](new String(bytes, StandardCharsets.UTF_8)) match {
      case Right(value) => value
      // Best-effort: the decoding error is dropped and `null` is returned.
      // NOTE(review): presumably relies on the Flink source skipping null
      // records - confirm for the connector in use.
      case Left(_) => null.asInstanceOf[A]
    }

  /**
   * Renders `element` as compact (no-whitespace) JSON encoded as UTF-8.
   *
   * @param element value to serialize
   * @return UTF-8 bytes of the JSON document
   */
  override def serialize(element: A): Array[Byte] =
    element.asJson.noSpaces.getBytes(StandardCharsets.UTF_8)

  /** Always `false`: no element marks the end of the stream. */
  override def isEndOfStream(nextElement: A): Boolean = false

  /** Builds the Flink type information from the runtime class of `A`. */
  override def getProducedType: TypeInformation[A] = {
    import scala.reflect._
    // The ClassTag only exposes an erased Class[_]; the cast restores the static type.
    TypeExtractor.getForClass(classTag[A].runtimeClass.asInstanceOf[Class[A]])
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment