test("Should aggregate call events") {
  implicit val sqlContext: SQLContext = spark.sqlContext
  import spark.implicits._
  
  val scenario = TestHelper.loadScenario[CallEvent](s"$pathToTestScenarios/pdd_events.json")
  val scenarioIter = scenario.toIterator
  scenario.nonEmpty shouldBe true

  val kafkaData = MemoryStream[MockKafkaDataFrame]
  val processingStream = EventAggregation(appConfig).process(kafkaData.toDF())(session)
    .writeStream
    .format("memory")
    .queryName("calleventaggs")
    .outputMode(eventAggregation.outputMode)
    .start()
  
  // send 11 events into the streaming application
  kafkaData.addData(scenarioIter.take(11).map(TestHelper.asMockKafkaDataFrame))
  // force spark to trigger
  processingStream.processAllAvailable()
  
  val res = spark.sql("select avg(stats.p99) from calleventaggs")
    .collect()
    .map(_.getAs[Double](0))
    .head
  
  DiscoveryUtils.round(res) shouldEqual 7.56
  processingStream.stop()
}