test("Should aggregate call events") {
  implicit val sqlContext: SQLContext = spark.sqlContext
  import spark.implicits._

  // Load the recorded call-event scenario and sanity-check that it is non-empty
  // before feeding it into the stream.
  val scenario = TestHelper.loadScenario[CallEvent](s"$pathToTestScenarios/pdd_events.json")
  val scenarioIter = scenario.toIterator
  scenario.nonEmpty shouldBe true

  // Drive the aggregation pipeline from an in-memory Kafka stand-in and sink the
  // results into an in-memory table ("calleventaggs") that we can query with SQL.
  val kafkaData = MemoryStream[MockKafkaDataFrame]
  val processingStream = EventAggregation(appConfig).process(kafkaData.toDF())(session)
    .writeStream
    .format("memory")
    .queryName("calleventaggs")
    .outputMode(eventAggregation.outputMode)
    .start()

  // Always stop the streaming query, even when an assertion below fails;
  // otherwise the query keeps running and leaks into subsequent tests.
  try {
    // send 11 events into the streaming application
    kafkaData.addData(scenarioIter.take(11).map(TestHelper.asMockKafkaDataFrame))

    // force spark to trigger
    processingStream.processAllAvailable()

    // Query the in-memory sink for the average p99 across the aggregated batch.
    val res = spark.sql("select avg(stats.p99) from calleventaggs")
      .collect()
      .map(_.getAs[Double](0))
      .head

    DiscoveryUtils.round(res) shouldEqual 7.56
  } finally {
    processingStream.stop()
  }
}