diff --git a/src/config.rs b/src/config.rs index 875be10..0ee900b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -57,7 +57,7 @@ impl Client { pub struct Auth { #[serde(rename = "type")] pub auth_type: String, - pub hash: Option, + pub hash: String, } impl Auth { @@ -67,10 +67,7 @@ impl Auth { let mut hasher = Sha256::new(); hasher.update(token); let result = hasher.finalize(); - if let Some(hash) = &self.hash { - return format!("{:x}", result) == *hash; - } - false + format!("{:x}", result) == self.hash } _ => false, } diff --git a/src/main.rs b/src/main.rs index f263bd0..a987069 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,7 +30,18 @@ struct OtsdbPutData { #[derive(Debug, Deserialize)] struct QQueryParams { + #[serde(default)] token: String, + + #[serde(flatten)] + q: OpentsdbQuery, +} + +#[derive(Debug, Deserialize, Serialize)] +struct OpentsdbQuery { + start: StringInt, + end: Option, + m: String, } #[derive(Debug, Deserialize, Serialize)] @@ -41,8 +52,21 @@ enum StringIntFloat { Float(f64), } +#[derive(Debug, Deserialize, Serialize)] +#[serde(untagged)] +enum StringInt { + String(String), + Integer(i64), +} + const CONFIG_FILE: &str = "config.yaml"; +fn get_metric(m: &String) -> String { + let mut metric = m.clone(); + let pts: Vec<&str> = metric.split(":").collect(); + pts[1].to_string() +} + #[actix_web::post("/put")] async fn put_post( shared: web::Data, @@ -64,12 +88,13 @@ async fn put_post( if !client.can_write(&body.metric) { let emsg = format!( - "Not allowed to write metric `{}`. Allowed metrics: {}", + "Not allowed to write metric `{}`. Allowed metrics: {} and {}", body.metric, - client.metrics.join(", ") + client.metrics.join(", "), + client.write_metrics.join(", ") // XXX make it nicer ); error!("{}", emsg); - return HttpResponse::Forbidden().body(emsg.to_string()); + return HttpResponse::Forbidden().body(emsg); } let post_url = format!("{}put", shared.cfg.config.opentsdb.url); @@ -106,6 +131,57 @@ async fn put_post( } } +#[actix_web::get("/query")] +async fn query_get(shared: web::Data, qs: web::Query) -> impl Responder { + let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token); + + if authenticated_client.is_none() { + let emsg = format!( + "Unauthorized. Unknown token: {}. Please specify a valid tokne.", + qs.token + ); + error!("{}", emsg); + return HttpResponse::Unauthorized().body(emsg); + } + + let client = authenticated_client.unwrap(); + + println!("Query get: {:?}", qs); + let metric = get_metric(&qs.q.m); + + if !client.can_read(&metric) { + let emsg = format!("Not allowed to read metric `{}`", metric); + error!("{}", emsg); + return HttpResponse::Forbidden().body(emsg); + } + + let get_url = format!("{}query", shared.cfg.config.opentsdb.url); + //let otsdb_body = serde_json::to_string(&body).unwrap(); + //let query_string + + info!("{} get metric {}", client.name, metric); + debug!("GET {} with qs: {:?}", get_url, qs.q); + + let response = shared.web_client.get(get_url).query(&qs.q).send().await; + + match response { + Ok(resp) => { + let status = resp.status(); + + let body = resp.text().await.unwrap_or_else(|_| "".to_string()); + debug!("OpenTSDB response {}: {}", status, body); + let sstatus = + StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + + HttpResponse::Ok().status(sstatus).body(body) + } + Err(err) => { + error!("OpenTSDB error: {}", err); + HttpResponse::InternalServerError().body(format!("Proxy error: {}", err)) + } + } +} + #[actix_web::main] async fn main() -> std::io::Result<()> { let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string()); @@ -127,6 +203,7 @@ async fn main() -> std::io::Result<()> { .app_data(web::JsonConfig::default().content_type_required(false)) .wrap(Logger::new("%r %s")) // k8s already logs timestamp .service(put_post) + .service(query_get) }) .bind(format!("[::]:{}", server_port))? .run()