Skip to content

Instantly share code, notes, and snippets.

@atooni
Created November 25, 2020 15:39
Show Gist options
  • Save atooni/2b70c28be1e322fc223a59a0639411cb to your computer and use it in GitHub Desktop.
Save atooni/2b70c28be1e322fc223a59a0639411cb to your computer and use it in GitHub Desktop.
Refactor Prometheus DataModel
package zio.zmx.prometheus
import zio.Chunk
sealed abstract case class Metric[A <: Metric.Details](
name: String,
help: String,
labels: Chunk[(String, String)],
details: A
)
object Metric {
sealed trait Details
sealed trait Counter extends Details {
def count: Double
def inc(v: Double): Counter
}
sealed trait Gauge extends Details {
def value: Double
def inc(v: Double): Gauge
def set(v: Double): Gauge
}
sealed trait BucketType {
def boundaries: Chunk[Double]
def buckets(
maxAge: java.time.Duration = TimeSeries.defaultMaxAge,
maxSize: Int = TimeSeries.defaultMaxSize
): Chunk[(Double, TimeSeries)] =
boundaries.map(d => (d, TimeSeries(maxAge, maxSize)))
}
object BucketType {
final case class Manual(limits: Double*) extends BucketType {
override def boundaries: Chunk[Double] =
Chunk.fromArray(limits.toArray.sorted) ++ Chunk(Double.MaxValue).distinct
}
final case class Linear(start: Double, width: Double, count: Int) extends BucketType {
override def boundaries: Chunk[Double] =
Chunk.fromArray(0.until(count).map(i => start + i * width).toArray) ++ Chunk(Double.MaxValue)
}
final case class Exponential(start: Double, factor: Double, count: Int) extends BucketType {
override def boundaries: Chunk[Double] =
Chunk.fromArray(0.until(count).map(i => start * Math.pow(factor, i.toDouble)).toArray) ++ Chunk(Double.MaxValue)
}
}
final case class TimeSeries(
maxAge: java.time.Duration,
maxSize: Int,
samples: Chunk[(Double, java.time.Instant)] = Chunk.empty
) {
def observe(v: Double, t: java.time.Instant): TimeSeries = {
val filtered = filterSamples(t, maxAge)
copy(samples =
if (filtered.length == maxSize) samples.take(maxSize - 1) ++ Chunk((v, t)) else filtered ++ Chunk((v, t))
)
}
def timedSamples(i: java.time.Instant, t: Option[java.time.Duration]): Chunk[Double] =
filterSamples(i, t.getOrElse(maxAge)).map(_._1)
private def filterSamples(t: java.time.Instant, d: java.time.Duration): Chunk[(Double, java.time.Instant)] =
samples.dropWhile(_._2.toEpochMilli < t.toEpochMilli - d.toMillis)
}
object TimeSeries {
val defaultMaxAge: java.time.Duration = java.time.Duration.ofHours(1)
val defaultMaxSize: Int = 1024
}
sealed trait Histogram extends Details {
def buckets: Chunk[(Double, TimeSeries)]
def observe(v: Double, t: java.time.Instant): Histogram
def count: Double
def sum: Double
}
case class SampleResult(
count: Double,
sum: Double,
buckets: Chunk[(String, Option[Double])]
)
sealed abstract class Quantile private (
val phi: Double, // The quantile
val error: Double // The error margin
)
object Quantile {
def apply(phi: Double, error: Double): Option[Quantile] =
if (phi >= 0 && phi <= 1 && error >= 0 && error <= 1) Some(new Quantile(phi, error) {}) else None
}
sealed trait Summary extends Details {
def samples: TimeSeries
def quantiles: Chunk[Quantile]
def observe(v: Double, t: java.time.Instant): Summary
def count: Double
def sum: Double
}
private object Details {
final case class CounterImpl(override val count: Double) extends Counter {
override def inc(v: Double): Counter = copy(count = count + v)
}
final case class GaugeImpl(override val value: Double) extends Gauge {
override def inc(v: Double): Gauge = copy(value = value + v)
override def set(v: Double): Gauge = copy(value = v)
}
final case class HistogramImpl(
override val buckets: Chunk[(Double, TimeSeries)],
override val count: Double,
override val sum: Double
) extends Histogram { self =>
override def observe(v: Double, t: java.time.Instant): Histogram = copy(
buckets = self.buckets.map { case (b, ts) => if (v <= b) (b, ts.observe(v, t)) else (b, ts) },
count = self.count + 1,
sum = self.sum + v
)
}
final case class SummaryImpl(
override val samples: TimeSeries,
override val quantiles: Chunk[Quantile],
override val count: Double,
override val sum: Double
) extends Summary { self =>
override def observe(v: Double, t: java.time.Instant): Summary = copy(
count = self.count + 1,
sum = self.sum + v,
samples = self.samples.observe(v, t)
)
}
}
// --------- Methods creating and using Prometheus counters
def counter(name: String, help: String, labels: Chunk[(String, String)] = Chunk.empty): Metric[Counter] =
new Metric[Counter](name, help, labels, Details.CounterImpl(0)) {}
// The error case is a negative increment and is reflected by returning a null value
def incCounter(c: Metric[Counter], v: Double = 1.0d): Option[Metric[Counter]] =
if (v < 0) None else Some(new Metric[Counter](c.name, c.help, c.labels, c.details.inc(v)) {})
// --------- Methods creating and using Prometheus Gauges
def gauge(
name: String,
help: String,
labels: Chunk[(String, String)] = Chunk.empty,
startAt: Double = 0.0
): Metric[Gauge] =
new Metric[Gauge](name, help, labels, Details.GaugeImpl(startAt)) {}
def incGauge(g: Metric[Gauge], v: Double = 1.0d): Metric[Gauge] =
new Metric[Gauge](g.name, g.help, g.labels, g.details.inc(v)) {}
def dec(g: Metric[Gauge], v: Double = 1.0d): Metric[Gauge] = incGauge(g, -v)
// Set the value of the Gauge to the seconds corresponding to the given Instant
def setToInstant(g: Metric[Gauge], t: java.time.Instant): Metric[Gauge] =
new Metric[Gauge](g.name, g.help, g.labels, g.details.set((t.toEpochMilli / 1000L).toDouble)) {}
// --------- Methods creating and using Prometheus Histograms
def histogram(
name: String,
help: String,
labels: Chunk[(String, String)] = Chunk.empty,
buckets: BucketType,
maxAge: java.time.Duration,
maxSize: Int
): Option[Metric[Histogram]] =
if (labels.find(_._1.equals("le")).isDefined) None
else
Some(
new Metric[Histogram](
name,
help,
labels,
Details.HistogramImpl(
buckets.buckets(maxAge, maxSize),
0,
0
)
) {}
)
def observe(h: Metric[Histogram], v: Double, t: java.time.Instant): Metric[Histogram] =
new Metric[Histogram](h.name, h.help, h.labels, h.details.observe(v, t)) {}
def sampleHistogram(h: Metric[Histogram], i: java.time.Instant, d: Option[java.time.Duration]): SampleResult = {
val samples = h.details.buckets.map { case (k, ts) => (k, ts.timedSamples(i, d).length.toDouble) }
SampleResult(
count = h.details.count,
sum = h.details.sum,
buckets = samples.map { case (k, v) => (if (k == Double.MaxValue) "+Inf" else s"$k", Some(v)) }
)
}
// --------- Methods creating and using Prometheus Histograms
def summary(
name: String,
help: String,
labels: Chunk[(String, String)],
maxAge: java.time.Duration = TimeSeries.defaultMaxAge,
maxSize: Int = TimeSeries.defaultMaxSize
)(
quantiles: Quantile*
): Option[Metric[Summary]] =
if (labels.find(_._1.equals("quantile")).isDefined) None
else
Some(
new Metric[Summary](
name,
help,
labels,
Details.SummaryImpl(TimeSeries(maxAge, maxSize), Chunk.fromIterable(quantiles), 0, 0)
) {}
)
def sampleSummary(s: Metric[Summary], i: java.time.Instant, d: Option[java.time.Duration]): SampleResult = {
val qs = calculateQuantiles(s.details.samples.timedSamples(i, d), s.details.quantiles)
SampleResult(
count = s.details.count,
sum = s.details.sum,
buckets = qs.map { case (k, v) => (s"(phi=${k.phi}, error=${k.error})", v) }
)
}
private def calculateQuantiles(
samples: Chunk[Double],
quantiles: Chunk[Quantile]
): Chunk[(Quantile, Option[Double])] = ???
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment