Created
January 4, 2016 07:09
-
-
Save RadoBuransky/dc20d72593ff732e74f7 to your computer and use it in GitHub Desktop.
The idea here is to first create a state stream containing strings and then try to access this state as integers, which should crash. However, the second test doesn't "see" the state from the first test. Why?
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.buransky | |
import _root_.kafka.serializer.StringDecoder | |
import net.manub.embeddedkafka.EmbeddedKafka | |
import org.apache.spark.SparkConf | |
import org.apache.spark.streaming._ | |
import org.apache.spark.streaming.dstream.DStream | |
import org.apache.spark.streaming.kafka.KafkaUtils | |
import org.scalatest.FunSuite | |
/**
 * The idea here is to first create a state stream containing strings and then try to access this state as
 * integers, which should crash. However, the second test doesn't "see" the state from the first test. Why?
 *
 * Answer: `ssc.checkpoint(dir)` only tells a newly built StreamingContext where to ''write'' checkpoints; it
 * never reads them back. State written by an earlier run is recovered only when the context itself is obtained
 * via `StreamingContext.getOrCreate(dir, creatingFunc)`, which is what `withSsc` does now.
 *
 * Caveats:
 *  - On recovery Spark restores the whole DStream graph from the checkpoint (including the `StateSpec`
 *    mapping function) and does not call `creatingFunc`. So the `intStateSpec` passed by the second test is
 *    not applied at all. NOTE(review): the expected "crash" will therefore most likely not happen, because
 *    changing the state type generally requires discarding the old checkpoint.
 *  - `checkpointDir` is a relative path and nothing in this file deletes it. A checkpoint left behind by an
 *    earlier run of this suite will be recovered by the first test as well.
 *  - NOTE(review): `withRunningKafka` presumably starts a fresh broker per call. If so, Kafka offsets recovered
 *    from the checkpoint refer to the previous broker's log. Verify.
 */
class CheckpointISpec extends FunSuite with EmbeddedKafka {
  import CheckpointISpec._

  /** Directory that streaming contexts checkpoint to and, when it already holds a checkpoint, recover from. */
  private val checkpointDir = "./tmp"

  /**
   * Create state with strings.
   */
  test("Create RDD with string state") {
    withSsc { inputStream =>
      inputStream.mapWithState(stringStateSpec)
    }
  }

  /**
   * Try to access the state created before as integers.
   */
  test("Create RDD with int state") {
    withSsc { inputStream =>
      inputStream.mapWithState(intStateSpec)
    }
  }

  /**
   * Runs one short streaming job against an embedded Kafka broker.
   *
   * Publishes "a", "b", "a", "b" to `kafkaTopic`, starts the streaming context and then stops it gracefully.
   * The context (including its SparkContext) is stopped even if something in between throws.
   *
   * @param action builds the DStream under test from the `(message, message)` input stream. It is only
   *               invoked when `checkpointDir` holds no checkpoint; otherwise the graph is recovered from it.
   */
  private def withSsc(action: (DStream[(String, String)]) => DStream[(String, String)]): Unit = {
    withRunningKafka {
      // Make sure the topic exists before the direct stream asks the broker for its offsets.
      publishStringMessageToKafka(kafkaTopic, "")

      // Recover the previous run's graph and state when a checkpoint exists; otherwise build a new context.
      val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext(action))
      try {
        // Published after the stream was created, so they fall after its starting offsets.
        publishStringMessageToKafka(kafkaTopic, "a")
        publishStringMessageToKafka(kafkaTopic, "b")
        publishStringMessageToKafka(kafkaTopic, "a")
        publishStringMessageToKafka(kafkaTopic, "b")

        ssc.start()
      } finally {
        // Always release the SparkContext: Spark allows only one active SparkContext per JVM by default,
        // so a leaked one would make every later test fail to create its own.
        ssc.stop(stopSparkContext = true, stopGracefully = true)
      }
    }
  }

  /**
   * Builds a fresh streaming context that checkpoints to `checkpointDir`.
   *
   * The context reads from Kafka and prints every record produced by `action`. It serves as the
   * `creatingFunc` of `StreamingContext.getOrCreate`.
   */
  private def createContext(action: (DStream[(String, String)]) => DStream[(String, String)]): StreamingContext = {
    val conf = new SparkConf().setAppName(appName).setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(3))
    ssc.checkpoint(checkpointDir)

    // Key each message by its own payload, so mapWithState keeps one state entry per distinct message.
    val kafkaStream = createKafkaStream(ssc).map(m => m._2 -> m._2)

    // Invoke the action under test and print its output.
    action(kafkaStream).foreachRDD { rdd =>
      rdd.foreach(println)
    }
    ssc
  }

  /**
   * Creates a direct (receiver-less) stream of `(key, message)` pairs from `kafkaTopic`.
   *
   * The stream reads from the embedded broker at `localhost:6001`.
   */
  private def createKafkaStream(ssc: StreamingContext): DStream[(String, String)] = {
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:6001")
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(kafkaTopic))
  }
}
object CheckpointISpec {
  /** Spark application name given to every streaming context created by the suite. */
  val appName: String = "CheckpointMigrationISpec"

  /** Kafka topic the tests publish to and consume from. */
  val kafkaTopic: String = "test"

  /** `mapWithState` spec whose mapping function sets each seen key's state to the string "abc". */
  lazy val stringStateSpec: StateSpec[String, String, String, (String, String)] =
    StateSpec.function[String, String, String, (String, String)](stateMapping[String]("abc") _)

  /** `mapWithState` spec whose mapping function sets each seen key's state to the integer 42. */
  lazy val intStateSpec: StateSpec[String, String, Int, (String, String)] =
    StateSpec.function[String, String, Int, (String, String)](stateMapping[Int](42) _)

  /**
   * Mapping function shared by both state specs.
   *
   * Prints the call, overwrites the key's state with `fixedState` and echoes the incoming value paired with
   * its key.
   *
   * @param fixedState value written as the key's new state on every call
   * @param time       batch time (not used)
   * @param key        key of the record
   * @param value      value that arrived for the key, if any
   * @param state      state handle for the key
   * @tparam StateType type of the state stored per key
   * @return `Some((key, v))` when a value `v` arrived, `None` otherwise
   */
  private def stateMapping[StateType](fixedState: StateType)(
      time: Time,
      key: String,
      value: Option[String],
      state: State[StateType]): Option[(String, String)] = {
    println(s"key: $key, value: $value, $state")
    state.update(fixedState)
    value.map(v => (key, v))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment