Skip to content

Instantly share code, notes, and snippets.

@autrilla
Created April 5, 2020 18:40
Show Gist options
  • Select an option

  • Save autrilla/78dd21523567d49b176e8af55d902376 to your computer and use it in GitHub Desktop.

Select an option

Save autrilla/78dd21523567d49b176e8af55d902376 to your computer and use it in GitHub Desktop.
use http;
use hyper;
use hyper_tls::HttpsConnector;
use async_trait::async_trait;
use regex;
use thiserror::Error;
use crate::lib;
/// An HTTP probe: the request to send plus the checks its response must pass.
#[derive(Debug)]
pub struct Probe {
    /// HTTP method used for the request.
    pub method: hyper::Method,
    /// Target URI, kept as text and handed to the request builder on each execution.
    pub uri: String,
    /// Request body sent with every execution.
    pub body: String,
    /// `(name, value)` header pairs added to the request in order.
    pub headers: Vec<(String, String)>,
    /// Validators applied to the response, in order.
    pub validators: Vec<Box<dyn Validator + Send + Sync>>,
}
#[derive(Error, Debug)]
pub enum ValidationError {
#[error("Invalid status code (expected {start}-{end}, got {got})")]
InvalidStatusCode {
start: u16,
end: u16,
got: hyper::StatusCode,
},
#[error("Body does not match")]
BodyDoesNotMatch,
}
/// A check applied to an HTTP response.
///
/// Implementors must also implement `Debug`.
#[async_trait]
pub trait Validator: std::fmt::Debug {
    /// Inspects the response head (`parts`) and the response `body` bytes.
    ///
    /// Returns `Ok(())` when the response is acceptable and an error otherwise.
    async fn validate(&self, parts: &http::response::Parts, body: &[u8]) -> lib::Result;
}
/// Validator that accepts responses whose status code lies in `range`.
#[derive(Debug)]
pub struct StatusCodeValidator {
    /// Accepted status codes; the range is half-open, so `range.end` itself is rejected.
    pub range: std::ops::Range<u16>,
}
#[async_trait]
impl Validator for StatusCodeValidator {
    /// Succeeds when the response status lies in `self.range`; the body is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`ValidationError::InvalidStatusCode`] carrying the range bounds and the
    /// status that was received.
    async fn validate(&self, parts: &http::response::Parts, _body: &[u8]) -> lib::Result {
        // `StatusCode` is `Copy`, so it is read by value instead of being cloned.
        let status = parts.status;
        if self.range.contains(&status.as_u16()) {
            return Ok(());
        }
        Err(ValidationError::InvalidStatusCode {
            start: self.range.start,
            end: self.range.end,
            got: status,
        }
        .into())
    }
}
/// Validator that accepts responses whose body matches a regular expression.
#[derive(Debug)]
pub struct BodyRegexValidator {
    /// Byte-oriented pattern tested against the raw response body.
    pub re: regex::bytes::Regex,
}
#[async_trait]
impl Validator for BodyRegexValidator {
    /// Succeeds when `self.re` matches the response body; the response head is ignored.
    ///
    /// # Errors
    ///
    /// Returns [`ValidationError::BodyDoesNotMatch`] when the pattern does not match.
    async fn validate(&self, _parts: &http::response::Parts, body: &[u8]) -> lib::Result {
        let matched = self.re.is_match(body);
        if !matched {
            return Err(ValidationError::BodyDoesNotMatch.into());
        }
        Ok(())
    }
}
#[async_trait]
impl lib::Probe for Probe {
    /// Sends the configured request and runs every validator against the response.
    ///
    /// The whole response body is buffered before validation. Validators run in order and
    /// the first failure is returned.
    ///
    /// # Errors
    ///
    /// Returns an error when the request cannot be built (for example an invalid URI or
    /// header), when sending it or reading the body fails, or when a validator rejects the
    /// response.
    async fn execute(&self) -> lib::Result {
        // NOTE: a new connector and client are created on every call.
        let client = hyper::Client::builder().build::<_, hyper::Body>(HttpsConnector::new());

        let mut builder = hyper::Request::builder()
            .method(self.method.clone())
            .uri(self.uri.as_str());
        for (name, value) in &self.headers {
            builder = builder.header(name.as_str(), value.as_str());
        }
        // The builder reports bad URIs and headers only here. These values come from
        // configuration, so the error is returned to the caller instead of panicking.
        let request = builder.body(hyper::Body::from(self.body.clone()))?;

        let response = client.request(request).await?;
        let (parts, body) = response.into_parts();
        let body = hyper::body::to_bytes(body).await?;

        for validator in &self.validators {
            validator.validate(&parts, &body).await?;
        }
        Ok(())
    }
}
use anyhow;
use async_trait::async_trait;
pub type Result = anyhow::Result<()>;
/// A check that can be executed to test a service.
///
/// Implementors must also implement `Debug`.
#[async_trait]
pub trait Probe: std::fmt::Debug {
    /// Runs the probe once, returning `Ok(())` on success or an error describing the failure.
    async fn execute(&self) -> Result;
}
/// A named probe together with how often it should run.
// `Debug` can be derived because `Probe` has `Debug` as a supertrait, which makes
// `Box<dyn Probe>` debuggable.
#[derive(Debug)]
pub struct Config {
    /// Name identifying this probe.
    pub name: String,
    /// The probe to execute.
    pub probe: Box<dyn Probe>,
    /// Time between successive executions.
    pub interval: std::time::Duration,
}
use futures_util::stream::StreamExt;
use prometheus;
use prometheus::Encoder;
use regex;
use tokio;
use warp::Filter;
mod http;
mod lib;
#[tokio::main]
async fn main() {
let configs = vec![
lib::Config {
name: "insert_temperature".into(),
probe: Box::new(http::Probe {
method: hyper::Method::POST,
uri: "http://farmnet.favega.com/temperature-probes/FAVEGA-TEST/temperatures/"
.into(),
body: "temperature=20.0".into(),
headers: vec![
(
"Content-Type".into(),
"application/x-www-form-urlencoded".into(),
),
(
"Authorization".into(),
"TemperatureProbeToken REDACTED".into(),
),
],
validators: vec![
Box::new(http::StatusCodeValidator {
range: std::ops::Range {
start: 200,
end: 300,
},
}),
Box::new(http::BodyRegexValidator {
re: regex::bytes::Regex::new("temperature.*20\\.0").expect(""),
}),
],
}),
interval: std::time::Duration::from_secs(1),
},
lib::Config {
name: "favega".into(),
probe: Box::new(http::Probe {
method: hyper::Method::GET,
uri: "https://favega.com/es/".into(),
body: "".into(),
headers: vec![],
validators: vec![
Box::new(http::StatusCodeValidator {
range: std::ops::Range {
start: 200,
end: 300,
},
}),
Box::new(http::BodyRegexValidator {
re: regex::bytes::Regex::new("Diseñamos y fabricamos nuestros productos")
.expect(""),
}),
],
}),
interval: std::time::Duration::from_secs(10),
},
lib::Config {
name: "tiendaganadera".into(),
probe: Box::new(http::Probe {
method: hyper::Method::GET,
uri: "https://tiendaganadera.com/home.php".into(),
body: "".into(),
headers: vec![],
validators: vec![
Box::new(http::StatusCodeValidator {
range: std::ops::Range {
start: 200,
end: 300,
},
}),
Box::new(http::BodyRegexValidator {
re: regex::bytes::Regex::new("Tienda").expect(""),
}),
],
}),
interval: std::time::Duration::from_secs(10),
},
lib::Config {
name: "tiendaagricola".into(),
probe: Box::new(http::Probe {
method: hyper::Method::GET,
uri: "https://tiendaganadera.com/home.php".into(),
body: "".into(),
headers: vec![],
validators: vec![
Box::new(http::StatusCodeValidator {
range: std::ops::Range {
start: 200,
end: 300,
},
}),
Box::new(http::BodyRegexValidator {
re: regex::bytes::Regex::new("Tienda").expect(""),
}),
],
}),
interval: std::time::Duration::from_secs(10),
},
];
let requests_counter = prometheus::CounterVec::new(
prometheus::Opts::new("prober_requests_total", "Total requests made by the prober"),
&["probe_name", "status"],
)
.unwrap();
// Create a Registry and register Counter.
let r = prometheus::Registry::new();
r.register(Box::new(requests_counter.clone())).unwrap();
let streams = configs
.iter()
.map(|config| tokio::time::interval(config.interval).map(move |_| config))
.collect::<Vec<_>>();
let mut combined_stream = futures_util::stream::select_all(streams);
let hello = warp::path!("metrics").map(move || {
let mut buffer = vec![];
let encoder = prometheus::TextEncoder::new();
let metric_families = r.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
});
let handle = tokio::runtime::Handle::current();
handle.spawn(async {
println!("Metrics serving started");
warp::serve(hello).run(([0, 0, 0, 0], 8000)).await;
});
println!("Prober service started");
loop {
let config = combined_stream.next().await;
if let Some(config) = config {
let rsp = config.probe.execute().await;
if let Err(ref e) = rsp {
println!("Error executing probe {}: {:?}", &config.name, e);
}
let status = if rsp.is_ok() { "ok" } else { "error" };
requests_counter
.with_label_values(&[&config.name, status])
.inc();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment