Compare commits
5 commits
7ab7caa50e
...
459620820f
Author | SHA1 | Date | |
---|---|---|---|
459620820f | |||
b99572f4e1 | |||
09879a3cee | |||
bab21a9bae | |||
104de8e87e |
4 changed files with 169 additions and 28 deletions
26
README.md
26
README.md
|
@ -1,8 +1,11 @@
|
||||||
# OpenTSDB Auth Proxy
|
# OpenTSDB Auth Proxy
|
||||||
|
|
||||||
This is a simple proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
|
This is a simple read/write proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
|
||||||
time series database. It handles authentication and authorization.
|
time series database. It handles authentication and authorization.
|
||||||
|
|
||||||
|
**Warning**: This proxy is currently half baked. It works for my needs though.
|
||||||
|
if you need more features, don't hesitate to make a PR ;-)
|
||||||
|
|
||||||
This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint
|
This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint
|
||||||
to this proxy instead. Each client will send the data alongside an authentication
|
to this proxy instead. Each client will send the data alongside an authentication
|
||||||
token.
|
token.
|
||||||
|
@ -10,8 +13,24 @@ token.
|
||||||
If the token matches the host and the metric matches the list of allowed metrics,
|
If the token matches the host and the metric matches the list of allowed metrics,
|
||||||
then the request is forwarded to the opentsdb server.
|
then the request is forwarded to the opentsdb server.
|
||||||
|
|
||||||
|
Supported routes:
|
||||||
|
|
||||||
## Images
|
- POST `/put`
|
||||||
|
- GET `/query`
|
||||||
|
|
||||||
|
Supported authentications:
|
||||||
|
|
||||||
|
- sha256
|
||||||
|
|
||||||
|
Supported authorizations:
|
||||||
|
|
||||||
|
- `metrics`: read & write
|
||||||
|
- `read_metrics`
|
||||||
|
- `write_metrics`
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
## Container images
|
||||||
|
|
||||||
You can find the images on:
|
You can find the images on:
|
||||||
|
|
||||||
|
@ -25,10 +44,9 @@ Take a look at the provided [sample configuration](./example-cfg.yml)
|
||||||
|
|
||||||
### Authentication tokens
|
### Authentication tokens
|
||||||
|
|
||||||
Right now, two authentication tokens are supported:
|
Right now, one authentication token is supported:
|
||||||
|
|
||||||
- sha256
|
- sha256
|
||||||
- plain (not recommended)
|
|
||||||
|
|
||||||
#### Sha256
|
#### Sha256
|
||||||
|
|
||||||
|
|
|
@ -1,11 +1,16 @@
|
||||||
clients:
|
clients:
|
||||||
|
# Exclusive writers
|
||||||
- name: pyranometer
|
- name: pyranometer
|
||||||
metrics:
|
write_metrics:
|
||||||
- irradiance
|
- irradiance
|
||||||
- temperature
|
- temperature
|
||||||
|
read_metrics:
|
||||||
|
- weather.*
|
||||||
auth:
|
auth:
|
||||||
type: sha256
|
type: sha256
|
||||||
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
|
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
|
||||||
|
|
||||||
|
# Reader and writer on the same metric
|
||||||
- name: tgbt
|
- name: tgbt
|
||||||
metrics:
|
metrics:
|
||||||
- testproxy.*
|
- testproxy.*
|
||||||
|
@ -13,9 +18,25 @@ clients:
|
||||||
type: sha256
|
type: sha256
|
||||||
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
|
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
|
||||||
|
|
||||||
|
# Reader and writer on different metrics
|
||||||
|
- name: tgbt
|
||||||
|
write_metrics:
|
||||||
|
- barfoo
|
||||||
|
read_metrics:
|
||||||
|
- foobar.*
|
||||||
|
auth:
|
||||||
|
# ...
|
||||||
|
|
||||||
|
# Reader only
|
||||||
|
- name: consumer
|
||||||
|
read_metrics:
|
||||||
|
- irradiance
|
||||||
|
auth:
|
||||||
|
# ...
|
||||||
|
|
||||||
|
|
||||||
config:
|
config:
|
||||||
opentsdb:
|
opentsdb:
|
||||||
url: http://opentsdb/api/
|
url: http://192.168.30.2/api/
|
||||||
server:
|
server:
|
||||||
port: 8080
|
port: 8080
|
||||||
|
|
|
@ -15,8 +15,13 @@ pub struct Config {
|
||||||
#[derive(Debug, Deserialize, Clone)]
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
#[serde(default)]
|
||||||
pub metrics: Vec<String>,
|
pub metrics: Vec<String>,
|
||||||
pub auth: Auth,
|
#[serde(default)]
|
||||||
|
pub read_metrics: Vec<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub write_metrics: Vec<String>,
|
||||||
|
pub auth: Option<Auth>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
|
@ -26,6 +31,24 @@ impl Client {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for m in &self.write_metrics {
|
||||||
|
if glob_match(m, metric) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
pub fn can_read(&self, metric: &str) -> bool {
|
||||||
|
for m in &self.metrics {
|
||||||
|
if glob_match(m, metric) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for m in &self.read_metrics {
|
||||||
|
if glob_match(m, metric) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,7 +67,7 @@ impl Auth {
|
||||||
let mut hasher = Sha256::new();
|
let mut hasher = Sha256::new();
|
||||||
hasher.update(token);
|
hasher.update(token);
|
||||||
let result = hasher.finalize();
|
let result = hasher.finalize();
|
||||||
return format!("{:x}", result) == self.hash;
|
format!("{:x}", result) == self.hash
|
||||||
}
|
}
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
|
@ -81,7 +104,7 @@ fn default_opentsdb_url() -> String {
|
||||||
|
|
||||||
pub fn load_config_file(filename: &str) -> Config {
|
pub fn load_config_file(filename: &str) -> Config {
|
||||||
let yaml_content = fs::read_to_string(filename)
|
let yaml_content = fs::read_to_string(filename)
|
||||||
.expect(format!("Unable to read config file {}", filename).as_str());
|
.unwrap_or_else(|_| panic!("Unable to read config file `{}`", filename));
|
||||||
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
|
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
|
||||||
config
|
config
|
||||||
}
|
}
|
||||||
|
@ -89,5 +112,5 @@ pub fn load_config_file(filename: &str) -> Config {
|
||||||
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
|
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
|
||||||
clients
|
clients
|
||||||
.iter()
|
.iter()
|
||||||
.find(|client| client.auth.is_valid_token(token))
|
.find(|client| client.auth.is_some() && client.auth.as_ref().unwrap().is_valid_token(token))
|
||||||
}
|
}
|
||||||
|
|
115
src/main.rs
115
src/main.rs
|
@ -1,7 +1,7 @@
|
||||||
use actix_web::http::StatusCode;
|
use actix_web::http::StatusCode;
|
||||||
use actix_web::middleware::Logger;
|
use actix_web::middleware::Logger;
|
||||||
use actix_web::{error, web, App, HttpResponse, HttpServer, Responder};
|
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
|
||||||
use log::{debug, error, info, log_enabled, Level};
|
use log::{debug, error, info};
|
||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
@ -16,33 +16,62 @@ struct ClientData {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize)]
|
#[derive(Debug, Deserialize)]
|
||||||
struct QSParams {
|
struct QPutParams {
|
||||||
token: String,
|
token: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
struct OtsdbPutData {
|
||||||
|
metric: String,
|
||||||
|
value: StringIntFloat,
|
||||||
|
timestamp: i64,
|
||||||
|
tags: HashMap<String, StringIntFloat>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct QQueryParams {
|
||||||
|
#[serde(default)]
|
||||||
|
token: String,
|
||||||
|
|
||||||
|
#[serde(flatten)]
|
||||||
|
q: OpentsdbQuery,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
struct OpentsdbQuery {
|
||||||
|
start: StringInt,
|
||||||
|
end: Option<StringInt>,
|
||||||
|
m: String,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
#[serde(untagged)]
|
#[serde(untagged)]
|
||||||
enum OtsdbValue {
|
enum StringIntFloat {
|
||||||
String(String),
|
String(String),
|
||||||
Integer(i64),
|
Integer(i64),
|
||||||
Float(f64),
|
Float(f64),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Deserialize, Serialize)]
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
struct OtsdbData {
|
#[serde(untagged)]
|
||||||
metric: String,
|
enum StringInt {
|
||||||
value: OtsdbValue,
|
String(String),
|
||||||
timestamp: i64,
|
Integer(i64),
|
||||||
tags: HashMap<String, OtsdbValue>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const CONFIG_FILE: &str = "config.yaml";
|
const CONFIG_FILE: &str = "config.yaml";
|
||||||
|
|
||||||
|
fn get_metric(m: &String) -> String {
|
||||||
|
let mut metric = m.clone();
|
||||||
|
let pts: Vec<&str> = metric.split(":").collect();
|
||||||
|
pts[1].to_string()
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_web::post("/put")]
|
#[actix_web::post("/put")]
|
||||||
async fn put_post(
|
async fn put_post(
|
||||||
shared: web::Data<ClientData>,
|
shared: web::Data<ClientData>,
|
||||||
qs: web::Query<QSParams>,
|
qs: web::Query<QPutParams>,
|
||||||
body: web::Json<OtsdbData>,
|
body: web::Json<OtsdbPutData>,
|
||||||
) -> impl Responder {
|
) -> impl Responder {
|
||||||
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
||||||
|
|
||||||
|
@ -59,12 +88,13 @@ async fn put_post(
|
||||||
|
|
||||||
if !client.can_write(&body.metric) {
|
if !client.can_write(&body.metric) {
|
||||||
let emsg = format!(
|
let emsg = format!(
|
||||||
"Not allowed to write metric `{}`. Allowed metrics: {}",
|
"Not allowed to write metric `{}`. Allowed metrics: {} and {}",
|
||||||
body.metric,
|
body.metric,
|
||||||
client.metrics.join(", ")
|
client.metrics.join(", "),
|
||||||
|
client.write_metrics.join(", ") // XXX make it nicer
|
||||||
);
|
);
|
||||||
error!("{}", emsg);
|
error!("{}", emsg);
|
||||||
return HttpResponse::Forbidden().body(format!("{}", emsg));
|
return HttpResponse::Forbidden().body(emsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
|
let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
|
||||||
|
@ -101,6 +131,57 @@ async fn put_post(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[actix_web::get("/query")]
|
||||||
|
async fn query_get(shared: web::Data<ClientData>, qs: web::Query<QQueryParams>) -> impl Responder {
|
||||||
|
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
||||||
|
|
||||||
|
if authenticated_client.is_none() {
|
||||||
|
let emsg = format!(
|
||||||
|
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
|
||||||
|
qs.token
|
||||||
|
);
|
||||||
|
error!("{}", emsg);
|
||||||
|
return HttpResponse::Unauthorized().body(emsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = authenticated_client.unwrap();
|
||||||
|
|
||||||
|
println!("Query get: {:?}", qs);
|
||||||
|
let metric = get_metric(&qs.q.m);
|
||||||
|
|
||||||
|
if !client.can_read(&metric) {
|
||||||
|
let emsg = format!("Not allowed to read metric `{}`", metric);
|
||||||
|
error!("{}", emsg);
|
||||||
|
return HttpResponse::Forbidden().body(emsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
let get_url = format!("{}query", shared.cfg.config.opentsdb.url);
|
||||||
|
//let otsdb_body = serde_json::to_string(&body).unwrap();
|
||||||
|
//let query_string
|
||||||
|
|
||||||
|
info!("{} get metric {}", client.name, metric);
|
||||||
|
debug!("GET {} with qs: {:?}", get_url, qs.q);
|
||||||
|
|
||||||
|
let response = shared.web_client.get(get_url).query(&qs.q).send().await;
|
||||||
|
|
||||||
|
match response {
|
||||||
|
Ok(resp) => {
|
||||||
|
let status = resp.status();
|
||||||
|
|
||||||
|
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
|
||||||
|
debug!("OpenTSDB response {}: {}", status, body);
|
||||||
|
let sstatus =
|
||||||
|
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||||
|
|
||||||
|
HttpResponse::Ok().status(sstatus).body(body)
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
error!("OpenTSDB error: {}", err);
|
||||||
|
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[actix_web::main]
|
#[actix_web::main]
|
||||||
async fn main() -> std::io::Result<()> {
|
async fn main() -> std::io::Result<()> {
|
||||||
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
|
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
|
||||||
|
@ -113,10 +194,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
|
|
||||||
let web_client = Client::new();
|
let web_client = Client::new();
|
||||||
|
|
||||||
let shared = ClientData {
|
let shared = ClientData { web_client, cfg };
|
||||||
web_client: web_client,
|
|
||||||
cfg: cfg,
|
|
||||||
};
|
|
||||||
let client_data = web::Data::new(shared);
|
let client_data = web::Data::new(shared);
|
||||||
|
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
|
@ -125,6 +203,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
.app_data(web::JsonConfig::default().content_type_required(false))
|
.app_data(web::JsonConfig::default().content_type_required(false))
|
||||||
.wrap(Logger::new("%r %s")) // k8s already logs timestamp
|
.wrap(Logger::new("%r %s")) // k8s already logs timestamp
|
||||||
.service(put_post)
|
.service(put_post)
|
||||||
|
.service(query_get)
|
||||||
})
|
})
|
||||||
.bind(format!("[::]:{}", server_port))?
|
.bind(format!("[::]:{}", server_port))?
|
||||||
.run()
|
.run()
|
||||||
|
|
Loading…
Reference in a new issue