Compare commits

...

4 commits

Author SHA1 Message Date
b99572f4e1 Implement read
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-29 23:43:39 +02:00
09879a3cee structs: reorganize
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-29 23:43:29 +02:00
bab21a9bae cfg: add read/write opts
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-28 12:26:40 +02:00
104de8e87e clippy
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-28 11:24:38 +02:00
3 changed files with 147 additions and 24 deletions

View file

@ -1,11 +1,16 @@
clients:
# Exclusive writers
- name: pyranometer
metrics:
write_metrics:
- irradiance
- temperature
read_metrics:
- weather.*
auth:
type: sha256
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
# Reader and writer on the same metric
- name: tgbt
metrics:
- testproxy.*
@ -13,9 +18,25 @@ clients:
type: sha256
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
# Reader and writer on different metrics
- name: tgbt
write_metrics:
- barfoo
read_metrics:
- foobar.*
auth:
# ...
# Reader only
- name: consumer
read_metrics:
- irradiance
auth:
# ...
config:
opentsdb:
url: http://opentsdb/api/
url: http://192.168.30.2/api/
server:
port: 8080

View file

@ -15,8 +15,13 @@ pub struct Config {
#[derive(Debug, Deserialize, Clone)]
pub struct Client {
pub name: String,
#[serde(default)]
pub metrics: Vec<String>,
pub auth: Auth,
#[serde(default)]
pub read_metrics: Vec<String>,
#[serde(default)]
pub write_metrics: Vec<String>,
pub auth: Option<Auth>,
}
impl Client {
@ -26,6 +31,24 @@ impl Client {
return true;
}
}
for m in &self.write_metrics {
if glob_match(m, metric) {
return true;
}
}
false
}
pub fn can_read(&self, metric: &str) -> bool {
for m in &self.metrics {
if glob_match(m, metric) {
return true;
}
}
for m in &self.read_metrics {
if glob_match(m, metric) {
return true;
}
}
false
}
}
@ -44,7 +67,7 @@ impl Auth {
let mut hasher = Sha256::new();
hasher.update(token);
let result = hasher.finalize();
return format!("{:x}", result) == self.hash;
format!("{:x}", result) == self.hash
}
_ => false,
}
@ -81,7 +104,7 @@ fn default_opentsdb_url() -> String {
pub fn load_config_file(filename: &str) -> Config {
let yaml_content = fs::read_to_string(filename)
.expect(format!("Unable to read config file {}", filename).as_str());
.unwrap_or_else(|_| panic!("Unable to read config file `{}`", filename));
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
config
}
@ -89,5 +112,5 @@ pub fn load_config_file(filename: &str) -> Config {
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
clients
.iter()
.find(|client| client.auth.is_valid_token(token))
.find(|client| client.auth.is_some() && client.auth.as_ref().unwrap().is_valid_token(token))
}

View file

@ -1,7 +1,7 @@
use actix_web::http::StatusCode;
use actix_web::middleware::Logger;
use actix_web::{error, web, App, HttpResponse, HttpServer, Responder};
use log::{debug, error, info, log_enabled, Level};
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use log::{debug, error, info};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -16,33 +16,62 @@ struct ClientData {
}
#[derive(Debug, Deserialize)]
struct QSParams {
struct QPutParams {
token: String,
}
#[derive(Debug, Deserialize, Serialize)]
struct OtsdbPutData {
metric: String,
value: StringIntFloat,
timestamp: i64,
tags: HashMap<String, StringIntFloat>,
}
#[derive(Debug, Deserialize)]
struct QQueryParams {
#[serde(default)]
token: String,
#[serde(flatten)]
q: OpentsdbQuery,
}
#[derive(Debug, Deserialize, Serialize)]
struct OpentsdbQuery {
start: StringInt,
end: Option<StringInt>,
m: String,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)]
enum OtsdbValue {
enum StringIntFloat {
String(String),
Integer(i64),
Float(f64),
}
#[derive(Debug, Deserialize, Serialize)]
struct OtsdbData {
metric: String,
value: OtsdbValue,
timestamp: i64,
tags: HashMap<String, OtsdbValue>,
#[serde(untagged)]
enum StringInt {
String(String),
Integer(i64),
}
const CONFIG_FILE: &str = "config.yaml";
fn get_metric(m: &String) -> String {
let mut metric = m.clone();
let pts: Vec<&str> = metric.split(":").collect();
pts[1].to_string()
}
#[actix_web::post("/put")]
async fn put_post(
shared: web::Data<ClientData>,
qs: web::Query<QSParams>,
body: web::Json<OtsdbData>,
qs: web::Query<QPutParams>,
body: web::Json<OtsdbPutData>,
) -> impl Responder {
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
@ -59,12 +88,13 @@ async fn put_post(
if !client.can_write(&body.metric) {
let emsg = format!(
"Not allowed to write metric `{}`. Allowed metrics: {}",
"Not allowed to write metric `{}`. Allowed metrics: {} and {}",
body.metric,
client.metrics.join(", ")
client.metrics.join(", "),
client.write_metrics.join(", ") // XXX make it nicer
);
error!("{}", emsg);
return HttpResponse::Forbidden().body(format!("{}", emsg));
return HttpResponse::Forbidden().body(emsg);
}
let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
@ -101,6 +131,57 @@ async fn put_post(
}
}
#[actix_web::get("/query")]
async fn query_get(shared: web::Data<ClientData>, qs: web::Query<QQueryParams>) -> impl Responder {
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
if authenticated_client.is_none() {
let emsg = format!(
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
qs.token
);
error!("{}", emsg);
return HttpResponse::Unauthorized().body(emsg);
}
let client = authenticated_client.unwrap();
println!("Query get: {:?}", qs);
let metric = get_metric(&qs.q.m);
if !client.can_read(&metric) {
let emsg = format!("Not allowed to read metric `{}`", metric);
error!("{}", emsg);
return HttpResponse::Forbidden().body(emsg);
}
let get_url = format!("{}query", shared.cfg.config.opentsdb.url);
//let otsdb_body = serde_json::to_string(&body).unwrap();
//let query_string
info!("{} get metric {}", client.name, metric);
debug!("GET {} with qs: {:?}", get_url, qs.q);
let response = shared.web_client.get(get_url).query(&qs.q).send().await;
match response {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
debug!("OpenTSDB response {}: {}", status, body);
let sstatus =
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
HttpResponse::Ok().status(sstatus).body(body)
}
Err(err) => {
error!("OpenTSDB error: {}", err);
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
}
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
@ -113,10 +194,7 @@ async fn main() -> std::io::Result<()> {
let web_client = Client::new();
let shared = ClientData {
web_client: web_client,
cfg: cfg,
};
let shared = ClientData { web_client, cfg };
let client_data = web::Data::new(shared);
HttpServer::new(move || {
@ -125,6 +203,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::JsonConfig::default().content_type_required(false))
.wrap(Logger::new("%r %s")) // k8s already logs timestamp
.service(put_post)
.service(query_get)
})
.bind(format!("[::]:{}", server_port))?
.run()