Skip to content

Instantly share code, notes, and snippets.

@martyphee
Created March 20, 2019 08:58
Show Gist options
  • Save martyphee/df632696796e3484205e486422c08450 to your computer and use it in GitHub Desktop.
Save martyphee/df632696796e3484205e486422c08450 to your computer and use it in GitHub Desktop.
package com.martyphee.sqs
import akka.NotUsed
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import akka.stream.alpakka.sqs.{MessageAction, SqsAckSettings, SqsSourceSettings}
import akka.stream.scaladsl.{Flow, Keep, RestartSink, RestartSource, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches, Supervision, UniqueKillSwitch}
import com.martyphee.repo.PlayRepo
import com.typesafe.scalalogging.LazyLogging
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
/**
  * Streams messages from an SQS queue, records each one through [[PlayRepo]] and
  * emits a delete acknowledgement for every message whose insert succeeded.
  *
  * @param queueUrl         URL of the queue; used for both the source and the ack sink
  * @param maxRatePerSecond maximum number of messages handed to processing per second;
  *                         a value of zero or less disables rate limiting
  * @param sqs              SQS client required by the Alpakka source and sink
  * @param mat              materializer used to run the stream
  * @param ec               execution context used to map the result of the insert
  */
class Consumer(queueUrl: String, maxRatePerSecond: Int)(implicit sqs: SqsAsyncClient,
                                                         mat: ActorMaterializer,
                                                         ec: ExecutionContext) extends LazyLogging {

  val playRepo: PlayRepo = new PlayRepo()

  /**
    * Supervision decider for the processing stage: non-fatal failures are logged and the
    * failing element is dropped (Resume). A dropped message never reaches the ack sink,
    * so no delete is issued for it. Anything else stops the stream instead of surfacing
    * as a MatchError from a partial decider.
    */
  private val warnAndResume: Supervision.Decider = {
    case NonFatal(e) =>
      logger.warn("Failed to process queue", e)
      Supervision.Resume
    case _ =>
      Supervision.Stop
  }

  /**
    * Materializes and starts the source -> processing -> ack pipeline.
    *
    * @return the kill switch materialized by the processing flow, which can be used to
    *         shut the running stream down
    */
  def listen: UniqueKillSwitch =
    sqsSource
      .viaMat(flow)(Keep.right)
      .to(sqsAckSink)
      .run()

  /**
    * Processing flow: kill switch, optional rate limit, one-at-a-time processing, then
    * conversion of each successfully processed message into a delete action.
    */
  private def flow: Flow[Message, MessageAction, UniqueKillSwitch] =
    Flow[Message]
      .viaMat(KillSwitches.single)(Keep.right)
      .via(rateLimit)
      .mapAsync(parallelism = 1)(processMessage)
      // Attributes apply to the stages composed so far, i.e. including mapAsync.
      .withAttributes(supervisionStrategy(warnAndResume))
      .map(processed => MessageAction.Delete(processed))

  /**
    * Limits throughput to `maxRatePerSecond` elements per second. `throttle` rejects
    * non-positive element counts, so such values fall back to a pass-through flow.
    */
  private def rateLimit: Flow[Message, Message, NotUsed] =
    if (maxRatePerSecond > 0) Flow[Message].throttle(maxRatePerSecond, 1.second)
    else Flow[Message]

  /** SQS source for `queueUrl`, recreated with backoff on failure (at most 2 restarts). */
  private def sqsSource: Source[Message, NotUsed] =
    RestartSource.onFailuresWithBackoff(1.second, 30.seconds, randomFactor = 0.2, maxRestarts = 2) { () =>
      SqsSource(
        queueUrl,
        SqsSourceSettings()
          .withMaxBatchSize(10)
          .withMaxBufferSize(10)
      )
    }

  /** Ack sink for `queueUrl`, recreated with backoff (at most 2 restarts). */
  private def sqsAckSink: Sink[MessageAction, NotUsed] =
    RestartSink.withBackoff(1.second, 30.seconds, randomFactor = 0.2, maxRestarts = 2) { () =>
      SqsAckSink(queueUrl, SqsAckSettings().withMaxInFlight(10))
    }

  /**
    * Logs the message body and stores the message id via `playRepo.insert`.
    *
    * @param message the received SQS message
    * @return a future completing with the same message once the insert has completed
    */
  private def processMessage(message: Message): Future[Message] = {
    logger.info(s"Message received ${message.body()}")
    playRepo.insert(message.messageId()).map(_ => message)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment