Compare commits

...

5 commits

Author SHA1 Message Date
459620820f improve doc
All checks were successful
ci / docker-build (push) Successful in 16m49s
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-29 23:47:24 +02:00
b99572f4e1 Implement read
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-29 23:43:39 +02:00
09879a3cee structs: reorganize
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-29 23:43:29 +02:00
bab21a9bae cfg: add read/write opts
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-28 12:26:40 +02:00
104de8e87e clippy
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
2024-05-28 11:24:38 +02:00
4 changed files with 169 additions and 28 deletions

View file

@ -1,8 +1,11 @@
# OpenTSDB Auth Proxy # OpenTSDB Auth Proxy
This is a simple proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb) This is a simple read/write proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
time series database. It handles authentication and authorization. time series database. It handles authentication and authorization.
**Warning**: This proxy is currently half baked. It works for my needs though.
if you need more features, don't hesitate to make a PR ;-)
This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint
to this proxy instead. Each client will send the data alongside an authentication to this proxy instead. Each client will send the data alongside an authentication
token. token.
@ -10,8 +13,24 @@ token.
If the token matches the host and the metric matches the list of allowed metrics, If the token matches the host and the metric matches the list of allowed metrics,
then the request is forwarded to the opentsdb server. then the request is forwarded to the opentsdb server.
Supported routes:
## Images - POST `/put`
- GET `/query`
Supported authentications:
- sha256
Supported authorizations:
- `metrics`: read & write
- `read_metrics`
- `write_metrics`
## Container images
You can find the images on: You can find the images on:
@ -25,10 +44,9 @@ Take a look at the provided [sample configuration](./example-cfg.yml)
### Authentication tokens ### Authentication tokens
Right now, two authentication tokens are supported: Right now, one authentication token is supported:
- sha256 - sha256
- plain (not recommended)
#### Sha256 #### Sha256

View file

@ -1,11 +1,16 @@
clients: clients:
# Exclusive writers
- name: pyranometer - name: pyranometer
metrics: write_metrics:
- irradiance - irradiance
- temperature - temperature
read_metrics:
- weather.*
auth: auth:
type: sha256 type: sha256
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
# Reader and writer on the same metric
- name: tgbt - name: tgbt
metrics: metrics:
- testproxy.* - testproxy.*
@ -13,9 +18,25 @@ clients:
type: sha256 type: sha256
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620 hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
# Reader and writer on different metrics
- name: tgbt
write_metrics:
- barfoo
read_metrics:
- foobar.*
auth:
# ...
# Reader only
- name: consumer
read_metrics:
- irradiance
auth:
# ...
config: config:
opentsdb: opentsdb:
url: http://opentsdb/api/ url: http://192.168.30.2/api/
server: server:
port: 8080 port: 8080

View file

@ -15,8 +15,13 @@ pub struct Config {
#[derive(Debug, Deserialize, Clone)] #[derive(Debug, Deserialize, Clone)]
pub struct Client { pub struct Client {
pub name: String, pub name: String,
#[serde(default)]
pub metrics: Vec<String>, pub metrics: Vec<String>,
pub auth: Auth, #[serde(default)]
pub read_metrics: Vec<String>,
#[serde(default)]
pub write_metrics: Vec<String>,
pub auth: Option<Auth>,
} }
impl Client { impl Client {
@ -26,6 +31,24 @@ impl Client {
return true; return true;
} }
} }
for m in &self.write_metrics {
if glob_match(m, metric) {
return true;
}
}
false
}
pub fn can_read(&self, metric: &str) -> bool {
for m in &self.metrics {
if glob_match(m, metric) {
return true;
}
}
for m in &self.read_metrics {
if glob_match(m, metric) {
return true;
}
}
false false
} }
} }
@ -44,7 +67,7 @@ impl Auth {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(token); hasher.update(token);
let result = hasher.finalize(); let result = hasher.finalize();
return format!("{:x}", result) == self.hash; format!("{:x}", result) == self.hash
} }
_ => false, _ => false,
} }
@ -81,7 +104,7 @@ fn default_opentsdb_url() -> String {
pub fn load_config_file(filename: &str) -> Config { pub fn load_config_file(filename: &str) -> Config {
let yaml_content = fs::read_to_string(filename) let yaml_content = fs::read_to_string(filename)
.expect(format!("Unable to read config file {}", filename).as_str()); .unwrap_or_else(|_| panic!("Unable to read config file `{}`", filename));
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML"); let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
config config
} }
@ -89,5 +112,5 @@ pub fn load_config_file(filename: &str) -> Config {
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> { pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
clients clients
.iter() .iter()
.find(|client| client.auth.is_valid_token(token)) .find(|client| client.auth.is_some() && client.auth.as_ref().unwrap().is_valid_token(token))
} }

View file

@ -1,7 +1,7 @@
use actix_web::http::StatusCode; use actix_web::http::StatusCode;
use actix_web::middleware::Logger; use actix_web::middleware::Logger;
use actix_web::{error, web, App, HttpResponse, HttpServer, Responder}; use actix_web::{web, App, HttpResponse, HttpServer, Responder};
use log::{debug, error, info, log_enabled, Level}; use log::{debug, error, info};
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
@ -16,33 +16,62 @@ struct ClientData {
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
struct QSParams { struct QPutParams {
token: String, token: String,
} }
#[derive(Debug, Deserialize, Serialize)]
struct OtsdbPutData {
metric: String,
value: StringIntFloat,
timestamp: i64,
tags: HashMap<String, StringIntFloat>,
}
#[derive(Debug, Deserialize)]
struct QQueryParams {
#[serde(default)]
token: String,
#[serde(flatten)]
q: OpentsdbQuery,
}
#[derive(Debug, Deserialize, Serialize)]
struct OpentsdbQuery {
start: StringInt,
end: Option<StringInt>,
m: String,
}
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
#[serde(untagged)] #[serde(untagged)]
enum OtsdbValue { enum StringIntFloat {
String(String), String(String),
Integer(i64), Integer(i64),
Float(f64), Float(f64),
} }
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
struct OtsdbData { #[serde(untagged)]
metric: String, enum StringInt {
value: OtsdbValue, String(String),
timestamp: i64, Integer(i64),
tags: HashMap<String, OtsdbValue>,
} }
const CONFIG_FILE: &str = "config.yaml"; const CONFIG_FILE: &str = "config.yaml";
fn get_metric(m: &String) -> String {
let mut metric = m.clone();
let pts: Vec<&str> = metric.split(":").collect();
pts[1].to_string()
}
#[actix_web::post("/put")] #[actix_web::post("/put")]
async fn put_post( async fn put_post(
shared: web::Data<ClientData>, shared: web::Data<ClientData>,
qs: web::Query<QSParams>, qs: web::Query<QPutParams>,
body: web::Json<OtsdbData>, body: web::Json<OtsdbPutData>,
) -> impl Responder { ) -> impl Responder {
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token); let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
@ -59,12 +88,13 @@ async fn put_post(
if !client.can_write(&body.metric) { if !client.can_write(&body.metric) {
let emsg = format!( let emsg = format!(
"Not allowed to write metric `{}`. Allowed metrics: {}", "Not allowed to write metric `{}`. Allowed metrics: {} and {}",
body.metric, body.metric,
client.metrics.join(", ") client.metrics.join(", "),
client.write_metrics.join(", ") // XXX make it nicer
); );
error!("{}", emsg); error!("{}", emsg);
return HttpResponse::Forbidden().body(format!("{}", emsg)); return HttpResponse::Forbidden().body(emsg);
} }
let post_url = format!("{}put", shared.cfg.config.opentsdb.url); let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
@ -101,6 +131,57 @@ async fn put_post(
} }
} }
#[actix_web::get("/query")]
async fn query_get(shared: web::Data<ClientData>, qs: web::Query<QQueryParams>) -> impl Responder {
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
if authenticated_client.is_none() {
let emsg = format!(
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
qs.token
);
error!("{}", emsg);
return HttpResponse::Unauthorized().body(emsg);
}
let client = authenticated_client.unwrap();
println!("Query get: {:?}", qs);
let metric = get_metric(&qs.q.m);
if !client.can_read(&metric) {
let emsg = format!("Not allowed to read metric `{}`", metric);
error!("{}", emsg);
return HttpResponse::Forbidden().body(emsg);
}
let get_url = format!("{}query", shared.cfg.config.opentsdb.url);
//let otsdb_body = serde_json::to_string(&body).unwrap();
//let query_string
info!("{} get metric {}", client.name, metric);
debug!("GET {} with qs: {:?}", get_url, qs.q);
let response = shared.web_client.get(get_url).query(&qs.q).send().await;
match response {
Ok(resp) => {
let status = resp.status();
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
debug!("OpenTSDB response {}: {}", status, body);
let sstatus =
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
HttpResponse::Ok().status(sstatus).body(body)
}
Err(err) => {
error!("OpenTSDB error: {}", err);
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
}
}
}
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string()); let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
@ -113,10 +194,7 @@ async fn main() -> std::io::Result<()> {
let web_client = Client::new(); let web_client = Client::new();
let shared = ClientData { let shared = ClientData { web_client, cfg };
web_client: web_client,
cfg: cfg,
};
let client_data = web::Data::new(shared); let client_data = web::Data::new(shared);
HttpServer::new(move || { HttpServer::new(move || {
@ -125,6 +203,7 @@ async fn main() -> std::io::Result<()> {
.app_data(web::JsonConfig::default().content_type_required(false)) .app_data(web::JsonConfig::default().content_type_required(false))
.wrap(Logger::new("%r %s")) // k8s already logs timestamp .wrap(Logger::new("%r %s")) // k8s already logs timestamp
.service(put_post) .service(put_post)
.service(query_get)
}) })
.bind(format!("[::]:{}", server_port))? .bind(format!("[::]:{}", server_port))?
.run() .run()