Last active
August 29, 2015 14:05
-
-
Save byF/588a7a2153b0a459d9a1 to your computer and use it in GitHub Desktop.
Scala script for the aggregation task http://www.aproint.com/aggregation-with-spark-sql
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
(c) 2014 Zdenek Farana; use as is; no guarantees
Because of a bug in the Spark SQL parser, test B works only with the fork https://github.com/aproint/spark
*/
/** Evaluates `f` exactly once and reports how long the evaluation took.
  *
  * @param f by-name block to time; its result is discarded
  * @return elapsed time in seconds
  */
def measure(f: => Any): Double = {
  // System.nanoTime is a monotonic elapsed-time source; wall-clock
  // currentTimeMillis can jump if the system clock is adjusted mid-run.
  val start = System.nanoTime()
  f
  val end = System.nanoTime()
  (end - start) / 1e9
}
// SQL entry point built on `sc`, which this script does not define
// (presumably the SparkContext provided by the Spark shell).
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Brings createSchemaRDD into scope; the script later calls registerAsTable
// directly on an RDD of RandomData.
import sqlContext.createSchemaRDD
/** One parsed row of the input data.
  *
  * The field names double as the column names referenced by the SQL queries
  * in this script, so they must not be renamed.
  *
  * @param year       calendar year of `created_on`
  * @param dayOfYear  day-of-year of `created_on`
  * @param hour       hour component of `created_on`
  * @param created_on original timestamp of the row
  * @param value      measured value
  */
case class RandomData(
  year: Int,
  dayOfYear: Int,
  hour: Int,
  created_on: java.sql.Timestamp,
  value: Double)
// Preparation step, timed as P: read randomData.csv, parse every row into a
// RandomData record, register the result as table "RandomData" and mark it
// for caching.
val P = measure {
  import java.text.SimpleDateFormat
  import java.util.Calendar

  val sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  val calendar = Calendar.getInstance()

  // Rows whose first column is the literal "created_on" are dropped
  // (presumably the CSV header line).
  sc.textFile("randomData.csv").map(_.split(",")).filter(_(0) != "created_on").map { r =>
    val date = sdf.parse(r(0))
    calendar.setTime(date)
    RandomData(
      calendar.get(Calendar.YEAR),
      calendar.get(Calendar.DAY_OF_YEAR),
      // HOUR_OF_DAY is the 24-hour field (0-23). Calendar.HOUR is the 12-hour
      // field (0-11) and would merge AM and PM rows when grouping by hour.
      calendar.get(Calendar.HOUR_OF_DAY),
      new java.sql.Timestamp(date.getTime),
      r(1).toDouble)
  }.registerAsTable("RandomData")
  sqlContext.cacheTable("RandomData") //could be also done as sqlContext.sql("CACHE TABLE RandomData")
}
// Test A: COUNT/AVG/MIN/MAX of `value` per (year, dayOfYear) over the whole
// RandomData table, ordered by the grouping keys.
val testA = sqlContext.sql(
  """SELECT year,
    |dayOfYear,
    |COUNT(*),
    |AVG(value),
    |MIN(value),
    |MAX(value) FROM RandomData GROUP BY year,dayOfYear ORDER BY year,dayOfYear""".stripMargin)
// The query is collected twice and each run is timed separately.
val A1 = measure(testA.collect)
val A2 = measure(testA.collect)
// Test B: COUNT/AVG/MIN/MAX of `value` per (year, dayOfYear, hour), restricted
// to rows with created_on between 2012-07-16 00:00:00 and 2012-07-16 01:00:00
// (both bounds inclusive), ordered by the grouping keys.
val testB = sqlContext.sql(
  """SELECT year,
    |dayOfYear,
    |hour,
    |COUNT(*),
    |AVG(value),
    |MIN(value),
    |MAX(value) FROM RandomData
    |WHERE created_on >= CAST('2012-07-16 00:00:00' AS TIMESTAMP) AND created_on <= CAST('2012-07-16 01:00:00' AS TIMESTAMP)
    |GROUP BY year,dayOfYear,hour ORDER BY year,dayOfYear,hour""".stripMargin)
// The query is collected twice and each run is timed separately.
val B1 = measure(testB.collect)
val B2 = measure(testB.collect)
// Report: preparation time, then both timed runs of test A and of test B,
// each in seconds with three decimals.
println(
  """Prep time: %.3fs
    |A: %.3fs; %.3fs
    |B: %.3fs; %.3fs""".stripMargin.format(P, A1, A2, B1, B2))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment