Implement read

Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
This commit is contained in:
Frank Villaro-Dixon 2024-05-29 23:43:39 +02:00
parent 09879a3cee
commit b99572f4e1
2 changed files with 82 additions and 8 deletions

View file

@ -57,7 +57,7 @@ impl Client {
pub struct Auth { pub struct Auth {
#[serde(rename = "type")] #[serde(rename = "type")]
pub auth_type: String, pub auth_type: String,
pub hash: Option<String>, pub hash: String,
} }
impl Auth { impl Auth {
@ -67,10 +67,7 @@ impl Auth {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(token); hasher.update(token);
let result = hasher.finalize(); let result = hasher.finalize();
if let Some(hash) = &self.hash { format!("{:x}", result) == self.hash
return format!("{:x}", result) == *hash;
}
false
} }
_ => false, _ => false,
} }

View file

@ -30,7 +30,18 @@ struct OtsdbPutData {
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct QQueryParams { struct QQueryParams {
#[serde(default)]
token: String, token: String,
#[serde(flatten)]
q: OpentsdbQuery,
}
#[derive(Debug, Deserialize, Serialize)]
struct OpentsdbQuery {
start: StringInt,
end: Option<StringInt>,
m: String,
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
@ -41,8 +52,21 @@ enum StringIntFloat {
Float(f64), Float(f64),
} }
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
enum StringInt {
String(String),
Integer(i64),
}
const CONFIG_FILE: &str = "config.yaml"; const CONFIG_FILE: &str = "config.yaml";
fn get_metric(m: &String) -> String {
let mut metric = m.clone();
let pts: Vec<&str> = metric.split(":").collect();
pts[1].to_string()
}
#[actix_web::post("/put")] #[actix_web::post("/put")]
async fn put_post( async fn put_post(
shared: web::Data<ClientData>, shared: web::Data<ClientData>,
@ -64,12 +88,13 @@ async fn put_post(
if !client.can_write(&body.metric) { if !client.can_write(&body.metric) {
let emsg = format!( let emsg = format!(
"Not allowed to write metric `{}`. Allowed metrics: {}", "Not allowed to write metric `{}`. Allowed metrics: {} and {}",
body.metric, body.metric,
client.metrics.join(", ") client.metrics.join(", "),
client.write_metrics.join(", ") // XXX make it nicer
); );
error!("{}", emsg); error!("{}", emsg);
return HttpResponse::Forbidden().body(emsg.to_string()); return HttpResponse::Forbidden().body(emsg);
} }
let post_url = format!("{}put", shared.cfg.config.opentsdb.url); let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
@ -106,6 +131,57 @@ async fn put_post(
} }
} }
#[actix_web::get("/query")]
async fn query_get(shared: web::Data<ClientData>, qs: web::Query<QQueryParams>) -> impl Responder {
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
if authenticated_client.is_none() {
let emsg = format!(
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
qs.token
);
error!("{}", emsg);
return HttpResponse::Unauthorized().body(emsg);
}
let client = authenticated_client.unwrap();
println!("Query get: {:?}", qs);
let metric = get_metric(&qs.q.m);
if !client.can_read(&metric) {
let emsg = format!("Not allowed to read metric `{}`", metric);
error!("{}", emsg);
return HttpResponse::Forbidden().body(emsg);
}
let get_url = format!("{}query", shared.cfg.config.opentsdb.url);
//let otsdb_body = serde_json::to_string(&body).unwrap();
//let query_string
info!("{} get metric {}", client.name, metric);
debug!("GET {} with qs: {:?}", get_url, qs.q);
let response = shared.web_client.get(get_url).query(&qs.q).send().await;
match response {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
debug!("OpenTSDB response {}: {}", status, body);
let sstatus =
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
HttpResponse::Ok().status(sstatus).body(body)
}
Err(err) => {
error!("OpenTSDB error: {}", err);
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
}
}
}
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string()); let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
@ -127,6 +203,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::JsonConfig::default().content_type_required(false)) .app_data(web::JsonConfig::default().content_type_required(false))
.wrap(Logger::new("%r %s")) // k8s already logs timestamp .wrap(Logger::new("%r %s")) // k8s already logs timestamp
.service(put_post) .service(put_post)
.service(query_get)
}) })
.bind(format!("[::]:{}", server_port))? .bind(format!("[::]:{}", server_port))?
.run() .run()