Structured Streaming unions
package spark.poc

import java.io.File
import java.nio.file.{Files, Path, Paths}

import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.scalatest.{Outcome, fixture}

import scala.collection.JavaConverters._
class StreamUnionTest extends fixture.FlatSpec {

  private val logger = Logger.getLogger(this.getClass)

  case class FixtureParam(sparkSession: SparkSession)
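  // Loan-fixture pattern: each test gets its own SparkSession, which is stopped once the test body finishes.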
  override def withFixture(test: OneArgTest): Outcome = {
    val sparkSession = getSparkSession(test.name, "non_prod")
    try {
      withFixture(test.toNoArgTest(FixtureParam(sparkSession)))
    } finally {
      sparkSession.stop()
    }
  }
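  // Implicit view from the fixture to SparkSession, so tests can call session methods directly on the fixture parameter.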
  implicit def fixtureParamToSparkSession(fp: FixtureParam): SparkSession = {
    fp.sparkSession
  }
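  // Builds a SparkSession; outside of prod it runs locally with a small shuffle-partition count.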
  private def getSparkSession(appname: String, env: String, sparkConf: Option[SparkConf] = None) = {
    logger.info("Creating SparkSession for " + appname)
    val conf = sparkConf.getOrElse(new SparkConf())
    conf
      .setAppName(appname)
      .set("spark.io.compression.codec", "lz4")
      .set("spark.hadoop.fs.s3a.server-side-encryption-algorithm", "AES256")
      .set("spark.hadoop.fs.s3a.connection.timeout", "1800000")
      .set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    if (env != "prod") {
      conf.set("quantum.jsonSchemaLocation", "file")
      conf.set("spark.master", "local[*]")
      conf.set("spark.sql.shuffle.partitions", "10")
    }
    val spark = SparkSession.builder().config(conf).getOrCreate()
    spark.sparkContext.hadoopConfiguration.set("mapreduce.input.fileinputformat.input.dir.recursive", "true")
    spark
  }
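  // Recursively deletes a directory tree; files are sorted in reverse path order so children go before their parent directories.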
  private def delete(dir: Path) = {
    if (Files.exists(dir)) {
      Files.walk(dir).iterator().asScala.toList
        .map(p => p.toFile)
        .sortWith((o1, o2) => o1.compareTo(o2) > 0)
        .foreach(_.delete)
    }
  }
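  // Test 1: union a streaming DataFrame with the result of joining a static DataFrame against it.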
"A streaming data frame" should "union with a streaming data frame" in { sparkSession => | |
val oldEvents = Paths.get("target/oldEvents/").toAbsolutePath | |
delete(oldEvents) | |
Files.createDirectories(oldEvents) | |
val oldEventFile1=oldEvents.resolve("event1.json") | |
Files.createFile(oldEventFile1) | |
Files.write(oldEventFile1, List("""{"eventId":1, "eventType":"A", "acctId":"X"}""", | |
"""{"eventId":2, "eventType":"B", "acctId":"X"}""", | |
"""{"eventId":3, "eventType":"C", "acctId":"X"}""", | |
"""{"eventId":4, "eventType":"D", "acctId":"X"}""", | |
"""{"eventId":5, "eventType":"A", "acctId":"Y"}""", | |
"""{"eventId":6, "eventType":"B", "acctId":"Y"}""", | |
"""{"eventId":7, "eventType":"C", "acctId":"Y"}""" | |
).toIterable.asJava) | |
val dfAllOldEvents = sparkSession.read.json(oldEvents.toString) | |
dfAllOldEvents.createOrReplaceTempView("allOldEvents") | |
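    // Set up an empty newEvents directory and read it as a file-based streaming source,
    // reusing the schema inferred from the static read of oldEvents.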
    val newEvents = Paths.get("target/newEvents/").toAbsolutePath
    delete(newEvents)
    Files.createDirectories(newEvents)

    val dfNewEvents = sparkSession.readStream.schema(dfAllOldEvents.schema).json(newEvents.toString)
    dfNewEvents.createOrReplaceTempView("newEvents")

    val dfOldEvents = sparkSession.sql(
      "Select allOldEvents.* from allOldEvents join newEvents on allOldEvents.acctId = newEvents.acctId")
    dfOldEvents.createOrReplaceTempView("oldEvents")
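    // Union the streaming new events with the static/stream join result (itself a streaming DataFrame).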
    val dfAllEvents = dfNewEvents.union(dfOldEvents)
    //val dfAllEvents = sparkSession.sql("select * from oldEvents union select * from newEvents")

    dfAllEvents.writeStream
      .outputMode("append")
      .format("console")
      .start()
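    // Drop a single new event into the watched directory to trigger a micro-batch, then wait up to 10 seconds for output.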
    val newEventFile1 = newEvents.resolve("eventNew1.json")
    //Files.createFile(newEventFile1)
    Files.write(newEventFile1, List("""{"eventId":8, "eventType":"E", "acctId":"X"}""").toIterable.asJava)

    sparkSession.streams.awaitAnyTermination(10000)
  }
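  // Test 2: union a streaming DataFrame with itself; each incoming event should then appear twice in the output.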
"A streaming data frame" should "union with itself" in { sparkSession => | |
val oldEvents = Paths.get("target/oldEvents/").toAbsolutePath | |
delete(oldEvents) | |
Files.createDirectories(oldEvents) | |
val oldEventFile1=oldEvents.resolve("event1.json") | |
Files.createFile(oldEventFile1) | |
Files.write(oldEventFile1, List("""{"eventId":1, "eventType":"A", "acctId":"X"}""", | |
"""{"eventId":2, "eventType":"B", "acctId":"X"}""", | |
"""{"eventId":3, "eventType":"C", "acctId":"X"}""", | |
"""{"eventId":4, "eventType":"D", "acctId":"X"}""", | |
"""{"eventId":5, "eventType":"A", "acctId":"Y"}""", | |
"""{"eventId":6, "eventType":"B", "acctId":"Y"}""", | |
"""{"eventId":7, "eventType":"C", "acctId":"Y"}""" | |
).toIterable.asJava) | |
val dfAllOldEvents = sparkSession.read.json(oldEvents.toString) | |
val newEvents = Paths.get("target/newEvents/").toAbsolutePath | |
delete(newEvents) | |
Files.createDirectories(newEvents) | |
val dfNewEvents = sparkSession.readStream.schema(dfAllOldEvents.schema).json(newEvents.toString) | |
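    // Union the streaming source with itself.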
    val dfSelfUnionEvents = dfNewEvents.union(dfNewEvents)

    dfSelfUnionEvents.writeStream
      .outputMode("append")
      .format("console")
      .start()
    val newEventFile1 = newEvents.resolve("eventNew1.json")
    //Files.createFile(newEventFile1)
    Files.write(newEventFile1, List("""{"eventId":8, "eventType":"E", "acctId":"X"}""").toIterable.asJava)

    sparkSession.streams.awaitAnyTermination(10000)
  }
}