Created
February 26, 2020 20:44
-
-
Save fsarradin/88447deb2cf44c2f559ef3aa3fd0ee43 to your computer and use it in GitHub Desktop.
Read a fixed length format file by using ZIO, ZStream, and Magnolia
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Single source of truth for the ZIO artifacts, which must share a version.
val zioVersion = "1.0.0-RC17"

name := "zkafka"
version := "0.1"
scalaVersion := "2.13.1"

libraryDependencies ++= Seq(
  "dev.zio"        %% "zio"         % zioVersion,
  "dev.zio"        %% "zio-streams" % zioVersion,
  "com.propensive" %% "magnolia"    % "0.12.6"
)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
H | |
D12345 John 32 | |
A12345 5 RUE DU TEMPLES93170 BAGNOLET | |
D12346 Mary 30 | |
D12347 Sebastian 35 | |
D12348 Ahmed 21 | |
D12349 Tony 63 | |
F |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.univalence | |
import java.io.{File, FileInputStream, IOException} | |
import java.util.concurrent.TimeUnit | |
import scala.language.experimental.macros | |
import scala.util.Random | |
import zio._ | |
import zio.clock.Clock | |
import zio.console._ | |
import zio.duration.Duration | |
import zio.stream._ | |
/** Application entry point: reads the fixed-length file `data.flf`, decodes
  * its user (`D`) and address (`A`) records, merges each address into the
  * user with the same [[UserId]], and prints the resulting map.
  */
object zkafka extends zio.App {
  import FromFixedLengthData._

  /** Runs the pipeline and returns the process exit code: 0 on success,
    * 1 when an `IOException` occurred (its stack trace is printed first).
    */
  override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] = {
    val stream = open("data.flf")

    val program: ZIO[Console, IOException, Unit] =
      for {
        data <- stream.chunks
          .aggregate(ZSink.utf8DecodeChunk)
          .aggregate(ZSink.splitLines)
          .mapConcatChunk(identity)
          // Drop blank lines and the header ('H') / footer ('F') records.
          .filter(line => line.nonEmpty && !Set('H', 'F').contains(line.head))
          .map(parse)
          .fold(Map.empty[UserId, CompleteUser]) {
            case (users, None) => users
            case (users, Some(Left(u))) =>
              users.updated(u.id, CompleteUser(u.id, u.name, u.age, None))
            case (users, Some(Right(a))) =>
              // Attach the address only to a user that has already been read.
              // An address whose user is unknown at this point is skipped
              // rather than failing the whole stream with NoSuchElementException.
              users
                .get(a.userId)
                .fold(users)(u => users.updated(a.userId, u.copy(address = Some(a))))
          }
        _ <- putStrLn(data.toString())
      } yield ()

    program.fold({ failure =>
      failure.printStackTrace()
      1
    }, _ => 0)
  }

  /** A decoded line: `Left` for a user record, `Right` for an address record,
    * `None` for any other record type.
    */
  type ParseResult = Option[Either[User, Address]]

  /** Decodes one line according to its first character (`D` = user,
    * `A` = address). Lines with any other or no first character yield `None`.
    * Field decoders may throw on lines shorter than their layout.
    */
  def parse(line: String): ParseResult =
    line.headOption match {
      case Some('D') => Some(Left(fromFixedLength[User](line)))
      case Some('A') => Some(Right(fromFixedLength[Address](line)))
      case _         => None
    }

  /** Streams the bytes of `filename`.
    *
    * The file is opened and closed inside the stream's managed scope, which
    * has two effects:
    *  - A missing or unreadable file surfaces as a typed `IOException`
    *    failure when the stream runs, rather than as an exception thrown
    *    while the program is being built.
    *  - The file handle is released once the stream ends, fails or is
    *    interrupted.
    */
  def open(filename: String): ZStreamChunk[Any, IOException, Byte] =
    ZStreamChunk(
      ZStream
        .managed(
          ZManaged.make(
            ZIO
              .effect(new FileInputStream(new File(filename)))
              .refineOrDie { case e: IOException => e }
          )(fis => ZIO.effect(fis.close()).catchAll(_ => ZIO.unit))
        )
        .flatMap(fis => ZStream.fromInputStream(fis).chunks)
    )
}
/** Describes one column of a fixed-length record: the field occupies the
  * half-open character range `[offset, endOffset)` of a line.
  */
trait FixedLengthField {

  /** Zero-based index of the field's first character. */
  val offset: Int

  /** Number of characters reserved for the field. */
  val length: Int

  /** Exclusive end index, suitable as the second argument of `substring`.
    *
    * Evaluated on access instead of being stored eagerly: a strict `val`
    * here would be initialised before an implementing object assigns its
    * `offset` and `length`, and would therefore always be 0.
    */
  def endOffset: Int = length + offset
}
/** User identifier read from character indices 1 to 5 (zero-based) of a line. */
case class UserId(value: String)

object UserId extends FixedLengthField {
  override val offset: Int = 1
  override val length: Int = 5

  /** Extracts the identifier verbatim (no trimming) from `line`.
    * Throws `StringIndexOutOfBoundsException` if `line` is shorter than `endOffset`.
    */
  def from(line: String): UserId = {
    val raw = line.substring(offset, endOffset)
    UserId(raw)
  }
}
/** User name read from a 10-character column starting at index 6. */
case class UserName(value: String)

object UserName extends FixedLengthField {
  override val offset: Int = 6
  override val length: Int = 10

  /** Extracts the name from `line`, removing surrounding whitespace.
    * Throws `StringIndexOutOfBoundsException` if `line` is shorter than `endOffset`.
    */
  def from(line: String): UserName = {
    val column = line.substring(offset, endOffset)
    UserName(column.trim)
  }
}
/** User age read from a 3-character column starting at index 16. */
case class UserAge(value: Int)

object UserAge extends FixedLengthField {
  override val offset: Int = 16
  override val length: Int = 3

  /** Extracts and parses the age from `line`.
    * Throws `StringIndexOutOfBoundsException` if `line` is too short and
    * `NumberFormatException` if the trimmed column is not an integer.
    */
  def from(line: String): UserAge = {
    val digits = line.substring(offset, endOffset).trim
    UserAge(digits.toInt)
  }
}
/** A user record, built from the id, name and age columns of one line. */
case class User(
    id: UserId,
    name: UserName,
    age: UserAge
)
/** Street read from a 20-character column starting at index 6. */
case class AddressStreet(value: String)

object AddressStreet extends FixedLengthField {
  override val offset: Int = 6
  override val length: Int = 20

  /** Extracts the street from `line`, removing surrounding whitespace.
    * Throws `StringIndexOutOfBoundsException` if `line` is shorter than `endOffset`.
    */
  def from(line: String): AddressStreet = {
    val column = line.substring(offset, endOffset)
    AddressStreet(column.trim)
  }
}
/** Post code read from a 5-character column starting at index 26. */
case class AddressPostCode(value: String)

object AddressPostCode extends FixedLengthField {
  override val offset: Int = 26
  override val length: Int = 5

  /** Extracts the post code from `line`, removing surrounding whitespace.
    * Throws `StringIndexOutOfBoundsException` if `line` is shorter than `endOffset`.
    */
  def from(line: String): AddressPostCode = {
    val column = line.substring(offset, endOffset)
    AddressPostCode(column.trim)
  }
}
/** City read from a 20-character column starting at index 31. */
case class AddressCity(value: String)

object AddressCity extends FixedLengthField {
  override val offset: Int = 31
  override val length: Int = 20

  /** Extracts the city from `line`, removing surrounding whitespace.
    * Throws `StringIndexOutOfBoundsException` if `line` is shorter than `endOffset`.
    */
  def from(line: String): AddressCity = {
    val column = line.substring(offset, endOffset)
    AddressCity(column.trim)
  }
}
/** An address record: the owning user's id plus the street, post code and
  * city columns of the same line.
  */
case class Address(
    userId: UserId,
    street: AddressStreet,
    postCode: AddressPostCode,
    city: AddressCity
)
/** A user together with its optional address; `address` stays `None` until
  * an address record with the same id is merged in.
  */
case class CompleteUser(
    id: UserId,
    name: UserName,
    age: UserAge,
    address: Option[Address]
)
/** Type class for values that can be decoded from one fixed-length line.
  *
  * @tparam A the decoded type
  */
trait FromFixedLengthData[A] { def fromFixedLength(line: String): A }
/** Type-class instances and Magnolia-based derivation for [[FromFixedLengthData]].
  *
  * Each field wrapper (`UserId`, `UserName`, ...) has a hand-written instance
  * that knows its own column range. Case classes made of such fields get an
  * instance derived by Magnolia through [[combine]].
  */
object FromFixedLengthData {
  import magnolia._

  /** Summons the implicit instance for `A`. */
  @inline def apply[A](
      implicit ev: FromFixedLengthData[A]
  ): FromFixedLengthData[A] = ev

  /** Type-class alias looked up by Magnolia's macro. */
  type Typeclass[A] = FromFixedLengthData[A]

  implicit val userIdFromLength: FromFixedLengthData[UserId] = line =>
    UserId.from(line)
  implicit val userNameFromLength: FromFixedLengthData[UserName] = line =>
    UserName.from(line)
  implicit val userAgeFromLength: FromFixedLengthData[UserAge] = line =>
    UserAge.from(line)
  implicit val addressStreetFromLength: FromFixedLengthData[AddressStreet] =
    line => AddressStreet.from(line)
  implicit val addressPostCodeFromLength: FromFixedLengthData[AddressPostCode] =
    line => AddressPostCode.from(line)
  implicit val addressCityFromLength: FromFixedLengthData[AddressCity] = line =>
    AddressCity.from(line)

  /** Builds an instance for case class `T` by decoding every parameter from
    * the same, whole line. Each parameter's instance selects its own columns,
    * so no offset bookkeeping is needed here.
    */
  def combine[T](ctx: CaseClass[Typeclass, T]): Typeclass[T] =
    line =>
      ctx.construct { p =>
        p.typeclass.fromFixedLength(line)
      }

  implicit def gen[T]: Typeclass[T] = macro Magnolia.gen[T]

  /** Decodes `line` with the implicit instance for `A`. */
  def fromFixedLength[A: FromFixedLengthData](line: String): A =
    FromFixedLengthData[A].fromFixedLength(line)

  /** Manual check: decodes one sample user line and one sample address line. */
  def main(args: Array[String]): Unit = {
    // Samples are padded to the declared layouts (zero-based columns):
    //   user:    'D' | id 1-5 | name 6-15 (left-aligned) | age 16-18
    //   address: 'A' | id 1-5 | street 6-25 | post code 26-30 | city 31-50
    // Each line must reach its last column, otherwise `substring` throws.
    val dataUser = "D%5s%-10s%3s".format("12345", "John", "32")
    val dataAddress =
      "A%5s%-20s%5s%-20s".format("12345", "5 RUE DU TEMPLES", "93170", "BAGNOLET")
    println(fromFixedLength[User](dataUser))
    println(fromFixedLength[Address](dataAddress))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment