package natewave

import akka.Done

import scala.concurrent.Future
import scala.util.{Failure, Success}

object GraphTest extends App {

  import akka.actor.ActorSystem
  import akka.stream._
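  import akka.stream.scaladsl.{ Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source }

  // The gist is cut off here. What follows is a minimal sketch of a runnable graph,
  // assuming the standard Akka Streams GraphDSL; the fan-out/fan-in topology below is
  // illustrative and not taken from the original file.
  implicit val system: ActorSystem = ActorSystem("GraphTest")
  // Needed on Akka 2.5 and earlier; on 2.6+ the implicit ActorSystem alone is enough.
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  // Numbers are broadcast to two flows and merged back into a single printing sink.
  val graph: RunnableGraph[Future[Done]] =
    RunnableGraph.fromGraph(GraphDSL.create(Sink.foreach[Int](println)) { implicit b => out =>
      import GraphDSL.Implicits._

      val in        = Source(1 to 10)
      val broadcast = b.add(Broadcast[Int](2))
      val merge     = b.add(Merge[Int](2))
      val doubled   = Flow[Int].map(_ * 2)
      val squared   = Flow[Int].map(x => x * x)

      in ~> broadcast ~> doubled ~> merge ~> out
            broadcast ~> squared ~> merge

      ClosedShape
    })

  graph.run().onComplete {
    case Success(Done) => system.terminate()
    case Failure(err)  => println(s"Graph failed: $err"); system.terminate()
  }
}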
  def upsert[Pk, A](dbContext: DbContext[Pk, A], x: A): Query[Unit, Connection] =
    queryBuilder.write { implicit connection => implicit ec =>
      // Extract the named parameters for `x`, then derive the column list and the
      // matching Anorm-style "{name}" placeholders from the parameter names.
      val params = dbContext.params(x)
      val keys = params.map(_.name)
      val cols = keys.mkString("(", ",", ")")
      val placeholders = keys.mkString("({", "},{", "})")
      val values = keys.flatMap { key =>
        params.find(_.name == key).map(DbContext.anormNamedParameter)
      }
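      // The rest of the method is not in the gist. A sketch of how these pieces could be
      // assembled into an Anorm statement, assuming a PostgreSQL-style ON CONFLICT upsert
      // and that `dbContext` exposes the table name and primary-key column (both of these
      // helpers are assumptions, not part of the original code).
      val updates = keys.map(k => s"$k = {$k}").mkString(", ")
      val sql =
        s"""INSERT INTO ${dbContext.tableName} $cols
           |VALUES $placeholders
           |ON CONFLICT (${dbContext.pkName}) DO UPDATE SET $updates""".stripMargin

      SQL(sql).on(values: _*).execute()
      ()
    }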
  def getCommittedOffsetsForConsumer(consumerConfig: KafkaConsumerConfig, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
    val result = withCustomStringConsumer { consumer =>
      consumer.assign(topicPartitions.toList.asJava)
      val tps = topicPartitions.map { tp =>
        val offsetAndMetadata = consumer.committed(tp)
        val startOffset: Long = KafkaCluster.getLogBeginningOffset(consumerConfig.topic).values.head
        Option(offsetAndMetadata) match {
          case None =>
            // log "TopicPartition $tp doesn't have committed offsets"
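            // Assumed continuation: with no committed offset, the consumer group has never
            // committed for this partition, so fall back to the beginning of the log;
            // otherwise resume from the committed position. The pairing below is a sketch
            // consistent with the declared Map[TopicPartition, Long] result type.
            tp -> startOffset
          case Some(offsetAndMeta) =>
            tp -> offsetAndMeta.offset()
        }
      }
      tps.toMap
    }
    result
  }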
// Phantom-type encoding of Raft roles: each "NotX" class stands for "any role except X",
// so a Node[NotLeader] is known at compile time not to be the leader.
sealed trait NodeState
trait Follower extends NodeState
trait Candidate extends NodeState
trait Leader extends NodeState

final class NotLeader extends NodeState with Follower with Candidate
final class NotFollower extends NodeState with Candidate with Leader
final class NotCandidate extends NodeState with Follower with Leader

class Node[State <: NodeState] private () {
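  // The class body is not included in the gist. A sketch of how the phantom `State`
  // parameter could gate Raft-style transitions at compile time; the method names and the
  // `initial` constructor below are illustrative assumptions, not the original API.
  def becomeCandidate(implicit ev: State <:< Follower): Node[NotFollower] =
    new Node[NotFollower]()

  def becomeLeader(implicit ev: State <:< Candidate): Node[NotCandidate] =
    new Node[NotCandidate]()

  def stepDown(implicit ev: State <:< Leader): Node[NotLeader] =
    new Node[NotLeader]()
}

object Node {
  // Every node starts out as a follower, i.e. provably not the leader.
  def initial: Node[NotLeader] = new Node[NotLeader]()
}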
import scala.concurrent.{ Future, ExecutionContext }

object GroupedAsync {

  def sequencePar[K, V, R](fs: ((K, V)) => Future[(K, R)])(input: Seq[(K, V)])(implicit ec: ExecutionContext): Future[Seq[(K, R)]] = {
    val result: Future[Seq[(K, R)]] = {
      // Group the input by key: elements under the same key are chained one after another
      // by the fold below, while different keys run concurrently.
      val grouped: Map[K, Seq[(K, V)]] = input.groupBy(_._1)
      val futurePerKeyMap: Map[K, Future[Seq[(K, R)]]] = grouped.mapValues { elements =>
        val l: Future[Seq[(K, R)]] = elements.foldLeft(Future(Seq.empty[(K, R)])) {
          (previousFuture, next) =>
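            // Assumed continuation: run `fs` for `next` only after the previous element
            // with the same key has finished, then append its result to the accumulator.
            previousFuture.flatMap { acc =>
              fs(next).map(acc :+ _)
            }
        }
        l
      }
      // Keys are independent, so their per-key futures can simply be gathered back together.
      Future.sequence(futurePerKeyMap.values).map(_.flatten.toSeq)
    }
    result
  }
}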
// Spark listener that tracks which Kafka offset ranges were consumed by each job so that
// progress can be persisted once the job has finished.
class KafkaListener(saveProgress: Array[OffsetRange] => Boolean) extends SparkListener with Logging {

  private val runningJobs = collection.mutable.Set.empty[SparkListenerJobStart]
  private val kafkaRDDs = collection.mutable.Map.empty[Int, Array[OffsetRange]]

  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
    runningJobs += jobStart
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    runningJobs.find(_.jobId == jobEnd.jobId).map { job =>
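      // Assumed continuation: commit the offset ranges of every Kafka RDD this job touched,
      // then forget the job. This presumes `kafkaRDDs` is populated (keyed by RDD id) when
      // each batch is created, a step not shown in the gist, and that the mixed-in Logging
      // trait provides Spark-style `logWarning`.
      val offsets: Array[OffsetRange] =
        job.stageInfos
          .flatMap(_.rddInfos)
          .flatMap(rddInfo => kafkaRDDs.get(rddInfo.id))
          .flatten
          .toArray

      if (offsets.nonEmpty && !saveProgress(offsets)) {
        logWarning(s"Failed to save Kafka offsets for job ${jobEnd.jobId}")
      }
      runningJobs -= job
    }
  }
}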
  // Validation rules that turn raw Avro values into the typed key and value.
  val sRule = implicitly[RuleLike[AvroValue, String]]
  val drRule = implicitly[RuleLike[AvroValue, DefectResult]]

  def kTransform(g: GenericRecord) = RecordTransformations.convert(g, sRule)
  def vTransform(g: GenericRecord) = RecordTransformations.convert(g, drRule)

  val defectResults: DStream[(String, DefectResult)] =
    KafkaCommons.read[String, DefectResult](consumerConfig, ssc)(kTransform, vTransform)
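  // Illustrative use of the typed stream (not part of the original snippet): any per-batch
  // processing hangs off foreachRDD once the records have been decoded.
  defectResults.foreachRDD { rdd =>
    val count = rdd.count()
    println(s"Received $count defect results in this batch")
  }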
  def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
                (keyRule: GenericRecord => VA[A], valueRule: GenericRecord => VA[B]): DStream[(A, B)] = {
    val sparkConf = streamingContext.sparkContext.getConf
    val appName = sparkConf.get("spark.app.name")
    // Coordinator used to look up and commit this consumer group's offsets in Kafka.
    val offsetsCoordinator = OffsetsCoordinator.get(
      channel = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
      clientId = consumerConfig.clientId,
      groupId = consumerConfig.groupId)
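    // The rest of the method is not in the gist. Broadly, a read like this would build the
    // raw stream of Avro records from the committed offsets and then apply the two rules;
    // the `createRawStream` helper and the `asOpt` conversion below are assumptions made
    // only for this sketch.
    val raw: DStream[(GenericRecord, GenericRecord)] =
      createRawStream(streamingContext, consumerConfig, offsetsCoordinator, maxRate) // hypothetical helper

    raw.flatMap { case (k, v) =>
      for {
        key   <- keyRule(k).asOpt   // `asOpt` assumed: records failing validation are dropped
        value <- valueRule(v).asOpt
      } yield key -> value
    }
  }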
class KafkaCustomDecoder[T](implicit keyRule: RuleLike[AvroValue, T]) extends kafka.serializer.Decoder[VA[T]] with Logging {

  import kafka.message.Message
  import java.util.Properties

  // Validate an already-deserialized Avro record into the target type T.
  def fromAvro(value: GenericRecord) =
    Avro.fromAvro[T](Avro.wrap(value))

  def fromBytes(bytes: Array[Byte]): VA[T] = {
    val props: Properties = new Properties()
    props.put("schema.registry.url", "http://localhost:8081") // schema registry URL is hard-coded here
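    // The method is cut off here. The schema.registry.url property suggests Confluent's
    // KafkaAvroDecoder does the byte-level work; the continuation below is a sketch under
    // that assumption, and treating non-record payloads as an error is also an assumption.
    val avroDecoder = new io.confluent.kafka.serializers.KafkaAvroDecoder(
      new kafka.utils.VerifiableProperties(props))

    avroDecoder.fromBytes(bytes) match {
      case record: GenericRecord => fromAvro(record)
      case other =>
        throw new IllegalArgumentException(s"Expected an Avro record, got: $other")
    }
  }
}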
  // Variant of `read` that keeps the validation results instead of unwrapping them, so
  // callers can inspect or log per-record decoding failures themselves.
  def read[A, B](consumerConfig: ConsumerConfig, streamingContext: StreamingContext, maxRate: Int = DEFAULT_MAX_RATE)
                (implicit keyRule: RuleLike[AvroValue, A], valueRule: RuleLike[AvroValue, B]): DStream[(VA[A], VA[B])] = {
    val sparkConf = streamingContext.sparkContext.getConf
    val appName = sparkConf.get("spark.app.name")
    val offsetsCoordinator = OffsetsCoordinator.get(
      channel = OffsetsCoordinator.newChannel(consumerConfig.host, consumerConfig.port),
      clientId = consumerConfig.clientId,
      groupId = consumerConfig.groupId)
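    // Not shown in the gist. The presence of KafkaCustomDecoder above suggests this overload
    // turns each record into a pair of validation results; the wiring below is a sketch under
    // that assumption, using the spark-streaming-kafka-0-8 direct stream API and decoding the
    // raw bytes in a plain map step.
    val kafkaParams = Map(
      "metadata.broker.list" -> s"${consumerConfig.host}:${consumerConfig.port}",
      "group.id"             -> consumerConfig.groupId,
      "client.id"            -> consumerConfig.clientId)

    val keyDecoder   = new KafkaCustomDecoder[A]
    val valueDecoder = new KafkaCustomDecoder[B]

    KafkaUtils
      .createDirectStream[Array[Byte], Array[Byte], DefaultDecoder, DefaultDecoder](
        streamingContext, kafkaParams, Set(consumerConfig.topic))
      .map { case (k, v) => (keyDecoder.fromBytes(k), valueDecoder.fromBytes(v)) }
  }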