Created
October 1, 2018 20:27
-
-
Save joao-parana/b9bf9aac71ea9a4c5d86bbb938f2c3b4 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.SparkSession | |
import org.apache.spark.sql.SparkSession.Builder | |
import org.apache.spark.SparkContext | |
import org.apache.log4j.{Level, Logger} | |
// A sparkSession é provida pelo proprio Spark Shell | |
// O nivel de log também já é configurado pela Spark Shell | |
def boolean_udf_wrapper(a:String, b:String, t:Any): Boolean = { true } | |
def string_udf_wrapper(a:String, b:String, t:Any): String = { "••••" } | |
import org.apache.spark.sql.functions.expr | |
import org.apache.spark.sql.functions.sum | |
import org.apache.spark.sql.catalyst.dsl.plans.table | |
import org.apache.spark.sql.catalyst.dsl.expressions.{sum,max,min,first,last,count,avg} | |
// | |
// O código acima é constante. A parte mutável aparece abaixo. | |
// | |
import org.apache.spark.sql.Row | |
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} | |
import org.apache.spark.sql.types.{DataType, LongType, StructType} | |
class MyCountUDAF extends UserDefinedAggregateFunction { | |
// UserDefinedAggregateFunction is the contract to define | |
// user-defined aggregate functions (UDAFs). | |
// Este método abaixo define pode ser invocado apenas assim: inputSchema(0) | |
// Isto é feito via inversão de dependência pelo Spark | |
// o retorno é um objeto StructField assim: | |
// StructField("id", LongType, true, {}) | |
// o objeto StructField é do pacote org.apache.spark.sql.types | |
override def inputSchema: StructType = { | |
new StructType().add("id", LongType, nullable = true) | |
} | |
// O buffer para resultado temporário possui um único atributo | |
// no caso da funcionalidade de contagem. | |
// Este método abaixo define pode ser invocado apenas assim: bufferSchema(0) | |
// Isto é feito via inversão de dependência pelo Spark | |
// o retorno é um objeto StructField assim: | |
// StructField("count", LongType, true, {}) | |
override def bufferSchema: StructType = { | |
new StructType().add("count", LongType, nullable = true) | |
} | |
// O método abaixo deve ser invocado sem parênteses em Scala. | |
// refere-se ao tipo do atributo de saida | |
override def dataType: DataType = LongType | |
override def deterministic: Boolean = true | |
// O método abaixo inicializa o buffer. | |
// Isto é feito via inversão de dependência pelo Spark | |
// Observe que a única coisa a ser feita é inicializar o contador com Zero. | |
override def initialize(buffer: MutableAggregationBuffer): Unit = { | |
println(s">>> initialize (buffer: $buffer)") | |
// NOTE: Scala's update used under the covers | |
buffer(0) = 0L | |
} | |
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = { | |
println(s">>> update (buffer: $buffer -> input: $input)") | |
buffer(0) = buffer.getLong(0) + 1 | |
} | |
override def merge(buffer: MutableAggregationBuffer, row: Row): Unit = { | |
println(s">>> merge (buffer: $buffer -> row: $row)") | |
buffer(0) = buffer.getLong(0) + row.getLong(0) | |
} | |
override def evaluate(buffer: Row): Any = { | |
println(s">>> evaluate (buffer: $buffer)") | |
buffer.getLong(0) | |
} | |
} | |
// Criando o objeto MyCountUDAF para ser usada com a API de Dataset/DataFrame | |
val myCountUDAF = new MyCountUDAF | |
// | |
case class R7_Tuple(deptId:Long,deptName:String){} | |
val R7_Dataset = spark.read.json("DATA/depto.json").as[R7_Tuple] | |
case class R6_Tuple(deptId: Long, deptName: String){} | |
val R6_Dataset = R7_Dataset.filter(t => boolean_udf_wrapper("scala", "oldestDeptos", t)).as[R6_Tuple] | |
case class R8_Tuple(deptId:Long){} | |
val R8_Dataset = spark.read.json("DATA/depto_ids.json").as[R8_Tuple] | |
case class R2_Tuple( deptId:Long, deptName:String){} | |
val R2_Dataset = R8_Dataset.join(R6_Dataset, "deptId").as[R2_Tuple] | |
case class R0_Tuple(deptId:Long,name:String,salary:Double){} | |
val R0_Dataset = spark.read.json("DATA/employees.json").as[R0_Tuple] | |
case class R1_Tuple( deptId: Long, name: String, salary: Double, nameSmartCased: String ){} | |
val R1_Dataset = R0_Dataset.map(t => R1_Tuple(t.deptId, t.name, t.salary, string_udf_wrapper("scala", "smartTextCase", t) )) | |
case class R3_Tuple( deptId:Long, name:String, salary:Double, nameSmartCased:String, deptName: String){} | |
val R3_Dataset = R1_Dataset.join(R2_Dataset, "deptId").as[R3_Tuple] | |
case class R4_Tuple(deptId: Long, name: String, salary: Double, nameSmartCased: String, deptName: String){} | |
val R4_Dataset = R3_Dataset.filter(t => boolean_udf_wrapper("scala", "happyEmployees", t)).as[R4_Tuple] | |
case class R5_Tuple( deptId: Long, sum_salary: Double ){} | |
val R5_Dataset = R4_Dataset.groupBy("deptId"). | |
agg(expr("sum(salary)").alias("sum_salary")).as[R5_Tuple] | |
// | |
val agregated_1 = R4_Dataset.groupBy('deptId).agg(myCountUDAF('name) as "count") | |
agregated_1.show(10) | |
val agregated_2 = R4_Dataset.groupBy('deptId).agg(myCountUDAF.distinct('salary) as "count") | |
agregated_2.show(10) | |
R5_Dataset.show(10) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment