Compare commits
14 commits
Author | SHA1 | Date | |
---|---|---|---|
459620820f | |||
b99572f4e1 | |||
09879a3cee | |||
bab21a9bae | |||
104de8e87e | |||
7ab7caa50e | |||
0f12bbf71e | |||
2c55f5eb54 | |||
816480d091 | |||
dcce9113d1 | |||
0c96891a46 | |||
5755f08798 | |||
bef08d99ef | |||
fc48b862a4 |
7 changed files with 309 additions and 35 deletions
|
@ -3,7 +3,7 @@ name: ci
|
|||
on:
|
||||
push:
|
||||
branches:
|
||||
- docker
|
||||
- main
|
||||
tags:
|
||||
- '*'
|
||||
|
||||
|
@ -41,7 +41,7 @@ jobs:
|
|||
with:
|
||||
username: ${{ secrets.DOCKERHUB_USERNAME }}
|
||||
password: ${{ secrets.DOCKERHUB_PASSWORD }}
|
||||
- name: Extract metadata (tags, labels) for Forge hub
|
||||
- name: Extract metadata (tags, labels) for Dockerhub
|
||||
id: metadockerhub
|
||||
uses: https://github.com/docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
|
||||
with:
|
||||
|
|
98
Cargo.lock
generated
98
Cargo.lock
generated
|
@ -236,6 +236,55 @@ dependencies = [
|
|||
"alloc-no-stdlib",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstream"
|
||||
version = "0.6.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "418c75fa768af9c03be99d17643f93f79bbba589895012a80e3452a19ddda15b"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"anstyle-parse",
|
||||
"anstyle-query",
|
||||
"anstyle-wincon",
|
||||
"colorchoice",
|
||||
"is_terminal_polyfill",
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle"
|
||||
version = "1.0.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "038dfcf04a5feb68e9c60b21c9625a54c2c0616e79b72b0fd87075a056ae1d1b"
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-parse"
|
||||
version = "0.2.4"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c03a11a9034d92058ceb6ee011ce58af4a9bf61491aa7e1e59ecd24bd40d22d4"
|
||||
dependencies = [
|
||||
"utf8parse",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-query"
|
||||
version = "1.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a64c907d4e79225ac72e2a354c9ce84d50ebb4586dee56c82b3ee73004f537f5"
|
||||
dependencies = [
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anstyle-wincon"
|
||||
version = "3.0.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61a38449feb7068f52bb06c12759005cf459ee52bb4adc1d5a7c4322d716fb19"
|
||||
dependencies = [
|
||||
"anstyle",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "atomic-waker"
|
||||
version = "1.1.2"
|
||||
|
@ -349,6 +398,12 @@ version = "1.0.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
|
||||
|
||||
[[package]]
|
||||
name = "colorchoice"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b6a852b24ab71dffc585bcb46eaf7959d175cb865a7152e35b348d1b2960422"
|
||||
|
||||
[[package]]
|
||||
name = "convert_case"
|
||||
version = "0.4.0"
|
||||
|
@ -451,6 +506,29 @@ dependencies = [
|
|||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_filter"
|
||||
version = "0.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a009aa4810eb158359dda09d0c87378e4bbb89b5a801f016885a4707ba24f7ea"
|
||||
dependencies = [
|
||||
"log",
|
||||
"regex",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "env_logger"
|
||||
version = "0.11.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "38b35839ba51819680ba087cd351788c9a3c476841207e0b8cee0b04722343b9"
|
||||
dependencies = [
|
||||
"anstream",
|
||||
"anstyle",
|
||||
"env_filter",
|
||||
"humantime",
|
||||
"log",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "equivalent"
|
||||
version = "1.0.1"
|
||||
|
@ -686,6 +764,12 @@ version = "1.0.3"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9"
|
||||
|
||||
[[package]]
|
||||
name = "humantime"
|
||||
version = "2.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
|
||||
|
||||
[[package]]
|
||||
name = "hyper"
|
||||
version = "1.3.1"
|
||||
|
@ -768,6 +852,12 @@ version = "2.9.0"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
|
||||
|
||||
[[package]]
|
||||
name = "is_terminal_polyfill"
|
||||
version = "1.70.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f8478577c03552c21db0e2724ffb8986a5ce7af88107e6be5d2ee6e158c12800"
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.11"
|
||||
|
@ -970,7 +1060,9 @@ name = "opentsdb-auth-proxy"
|
|||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"actix-web",
|
||||
"env_logger",
|
||||
"glob-match",
|
||||
"log",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
@ -1648,6 +1740,12 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "utf8parse"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
|
||||
|
||||
[[package]]
|
||||
name = "vcpkg"
|
||||
version = "0.2.15"
|
||||
|
|
|
@ -5,7 +5,9 @@ edition = "2021"
|
|||
|
||||
[dependencies]
|
||||
actix-web = "4.6.0"
|
||||
env_logger = "0.11.3"
|
||||
glob-match = "0.2.1"
|
||||
log = "0.4.21"
|
||||
reqwest = "0.12.4"
|
||||
serde = { version = "1.0.202", features = ["serde_derive"] }
|
||||
serde_json = "1.0.117"
|
||||
|
|
33
README.md
33
README.md
|
@ -1,8 +1,11 @@
|
|||
# OpenTSDB Auth Proxy
|
||||
|
||||
This is a simple proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
|
||||
This is a simple read/write proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
|
||||
time series database. It handles authentication and authorization.
|
||||
|
||||
**Warning**: This proxy is currently half baked. It works for my needs though.
|
||||
if you need more features, don't hesitate to make a PR ;-)
|
||||
|
||||
This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint
|
||||
to this proxy instead. Each client will send the data alongside an authentication
|
||||
token.
|
||||
|
@ -10,16 +13,40 @@ token.
|
|||
If the token matches the host and the metric matches the list of allowed metrics,
|
||||
then the request is forwarded to the opentsdb server.
|
||||
|
||||
Supported routes:
|
||||
|
||||
- POST `/put`
|
||||
- GET `/query`
|
||||
|
||||
Supported authentications:
|
||||
|
||||
- sha256
|
||||
|
||||
Supported authorizations:
|
||||
|
||||
- `metrics`: read & write
|
||||
- `read_metrics`
|
||||
- `write_metrics`
|
||||
|
||||
|
||||
|
||||
## Container images
|
||||
|
||||
You can find the images on:
|
||||
|
||||
- the dockerhub as [`frankkkkk/opentsdb-auth-proxy`](https://hub.docker.com/r/frankkkkk/opentsdb-auth-proxy)
|
||||
- my hub: [`forge.k3s.fr/frank/opentsdb-auth-proxy`](https://forge.k3s.fr/frank/-/packages/container/opentsdb-auth-proxy/main)
|
||||
|
||||
|
||||
## Configuration
|
||||
|
||||
Take a look at the provided [sample configuration](./example-cfg.yml)
|
||||
|
||||
### Authentication tokens
|
||||
|
||||
Right now, two authentication tokens are supported:
|
||||
Right now, one authentication token is supported:
|
||||
|
||||
- sha256
|
||||
- plain (not recommended)
|
||||
|
||||
#### Sha256
|
||||
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
clients:
|
||||
# Exclusive writers
|
||||
- name: pyranometer
|
||||
metrics:
|
||||
write_metrics:
|
||||
- irradiance
|
||||
- temperature
|
||||
read_metrics:
|
||||
- weather.*
|
||||
auth:
|
||||
type: sha256
|
||||
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
|
||||
|
||||
# Reader and writer on the same metric
|
||||
- name: tgbt
|
||||
metrics:
|
||||
- testproxy.*
|
||||
|
@ -13,9 +18,25 @@ clients:
|
|||
type: sha256
|
||||
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
|
||||
|
||||
# Reader and writer on different metrics
|
||||
- name: tgbt
|
||||
write_metrics:
|
||||
- barfoo
|
||||
read_metrics:
|
||||
- foobar.*
|
||||
auth:
|
||||
# ...
|
||||
|
||||
# Reader only
|
||||
- name: consumer
|
||||
read_metrics:
|
||||
- irradiance
|
||||
auth:
|
||||
# ...
|
||||
|
||||
|
||||
config:
|
||||
opentsdb:
|
||||
url: http://opentsdb/api
|
||||
url: http://192.168.30.2/api/
|
||||
server:
|
||||
port: 8080
|
||||
|
|
|
@ -15,8 +15,13 @@ pub struct Config {
|
|||
#[derive(Debug, Deserialize, Clone)]
|
||||
pub struct Client {
|
||||
pub name: String,
|
||||
#[serde(default)]
|
||||
pub metrics: Vec<String>,
|
||||
pub auth: Auth,
|
||||
#[serde(default)]
|
||||
pub read_metrics: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub write_metrics: Vec<String>,
|
||||
pub auth: Option<Auth>,
|
||||
}
|
||||
|
||||
impl Client {
|
||||
|
@ -26,6 +31,24 @@ impl Client {
|
|||
return true;
|
||||
}
|
||||
}
|
||||
for m in &self.write_metrics {
|
||||
if glob_match(m, metric) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
pub fn can_read(&self, metric: &str) -> bool {
|
||||
for m in &self.metrics {
|
||||
if glob_match(m, metric) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
for m in &self.read_metrics {
|
||||
if glob_match(m, metric) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
@ -44,7 +67,7 @@ impl Auth {
|
|||
let mut hasher = Sha256::new();
|
||||
hasher.update(token);
|
||||
let result = hasher.finalize();
|
||||
return format!("{:x}", result) == self.hash;
|
||||
format!("{:x}", result) == self.hash
|
||||
}
|
||||
_ => false,
|
||||
}
|
||||
|
@ -81,7 +104,7 @@ fn default_opentsdb_url() -> String {
|
|||
|
||||
pub fn load_config_file(filename: &str) -> Config {
|
||||
let yaml_content = fs::read_to_string(filename)
|
||||
.expect(format!("Unable to read config file {}", filename).as_str());
|
||||
.unwrap_or_else(|_| panic!("Unable to read config file `{}`", filename));
|
||||
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
|
||||
config
|
||||
}
|
||||
|
@ -89,5 +112,5 @@ pub fn load_config_file(filename: &str) -> Config {
|
|||
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
|
||||
clients
|
||||
.iter()
|
||||
.find(|client| client.auth.is_valid_token(token))
|
||||
.find(|client| client.auth.is_some() && client.auth.as_ref().unwrap().is_valid_token(token))
|
||||
}
|
||||
|
|
151
src/main.rs
151
src/main.rs
|
@ -1,6 +1,7 @@
|
|||
use actix_web::http::StatusCode;
|
||||
use actix_web::middleware::Logger;
|
||||
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
|
||||
use log::{debug, error, info};
|
||||
use reqwest::Client;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
@ -15,48 +16,95 @@ struct ClientData {
|
|||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct QSParams {
|
||||
struct QPutParams {
|
||||
token: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct OtsdbData {
|
||||
struct OtsdbPutData {
|
||||
metric: String,
|
||||
value: String,
|
||||
timestamp: f64,
|
||||
tags: HashMap<String, String>,
|
||||
value: StringIntFloat,
|
||||
timestamp: i64,
|
||||
tags: HashMap<String, StringIntFloat>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct QQueryParams {
|
||||
#[serde(default)]
|
||||
token: String,
|
||||
|
||||
#[serde(flatten)]
|
||||
q: OpentsdbQuery,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
struct OpentsdbQuery {
|
||||
start: StringInt,
|
||||
end: Option<StringInt>,
|
||||
m: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
enum StringIntFloat {
|
||||
String(String),
|
||||
Integer(i64),
|
||||
Float(f64),
|
||||
}
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
#[serde(untagged)]
|
||||
enum StringInt {
|
||||
String(String),
|
||||
Integer(i64),
|
||||
}
|
||||
|
||||
const CONFIG_FILE: &str = "config.yaml";
|
||||
|
||||
fn get_metric(m: &String) -> String {
|
||||
let mut metric = m.clone();
|
||||
let pts: Vec<&str> = metric.split(":").collect();
|
||||
pts[1].to_string()
|
||||
}
|
||||
|
||||
#[actix_web::post("/put")]
|
||||
async fn put_post(
|
||||
shared: web::Data<ClientData>,
|
||||
qs: web::Query<QSParams>,
|
||||
body: web::Json<OtsdbData>,
|
||||
qs: web::Query<QPutParams>,
|
||||
body: web::Json<OtsdbPutData>,
|
||||
) -> impl Responder {
|
||||
println!("Body: {:?}", body);
|
||||
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
||||
|
||||
if authenticated_client.is_none() {
|
||||
return HttpResponse::Unauthorized().body("Unauthorized. Please specify a valid token.");
|
||||
let emsg = format!(
|
||||
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
|
||||
qs.token
|
||||
);
|
||||
error!("{}", emsg);
|
||||
return HttpResponse::Unauthorized().body(emsg);
|
||||
}
|
||||
|
||||
let client = authenticated_client.unwrap();
|
||||
|
||||
if !client.can_write(&body.metric) {
|
||||
return HttpResponse::Forbidden().body(format!(
|
||||
"Not allowed to write metric `{}`. Allowed metrics: {}",
|
||||
let emsg = format!(
|
||||
"Not allowed to write metric `{}`. Allowed metrics: {} and {}",
|
||||
body.metric,
|
||||
client.metrics.join(", ")
|
||||
));
|
||||
client.metrics.join(", "),
|
||||
client.write_metrics.join(", ") // XXX make it nicer
|
||||
);
|
||||
error!("{}", emsg);
|
||||
return HttpResponse::Forbidden().body(emsg);
|
||||
}
|
||||
println!("Client: {:?}", client);
|
||||
|
||||
let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
|
||||
let otsdb_body = serde_json::to_string(&body).unwrap();
|
||||
|
||||
println!("POST URL: {}", post_url);
|
||||
info!(
|
||||
"{} sent metric {}={:?}",
|
||||
client.name, body.metric, body.value
|
||||
);
|
||||
debug!("POST {} with body: {}", post_url, otsdb_body);
|
||||
|
||||
let response = shared
|
||||
.web_client
|
||||
|
@ -70,11 +118,67 @@ async fn put_post(
|
|||
let status = resp.status();
|
||||
|
||||
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
|
||||
debug!("OpenTSDB response {}: {}", status, body);
|
||||
let sstatus =
|
||||
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
HttpResponse::Ok().status(sstatus).body(body)
|
||||
}
|
||||
Err(err) => HttpResponse::InternalServerError().body(format!("Error: {}", err)),
|
||||
Err(err) => {
|
||||
error!("OpenTSDB error: {}", err);
|
||||
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_web::get("/query")]
|
||||
async fn query_get(shared: web::Data<ClientData>, qs: web::Query<QQueryParams>) -> impl Responder {
|
||||
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
||||
|
||||
if authenticated_client.is_none() {
|
||||
let emsg = format!(
|
||||
"Unauthorized. Unknown token: {}. Please specify a valid tokne.",
|
||||
qs.token
|
||||
);
|
||||
error!("{}", emsg);
|
||||
return HttpResponse::Unauthorized().body(emsg);
|
||||
}
|
||||
|
||||
let client = authenticated_client.unwrap();
|
||||
|
||||
println!("Query get: {:?}", qs);
|
||||
let metric = get_metric(&qs.q.m);
|
||||
|
||||
if !client.can_read(&metric) {
|
||||
let emsg = format!("Not allowed to read metric `{}`", metric);
|
||||
error!("{}", emsg);
|
||||
return HttpResponse::Forbidden().body(emsg);
|
||||
}
|
||||
|
||||
let get_url = format!("{}query", shared.cfg.config.opentsdb.url);
|
||||
//let otsdb_body = serde_json::to_string(&body).unwrap();
|
||||
//let query_string
|
||||
|
||||
info!("{} get metric {}", client.name, metric);
|
||||
debug!("GET {} with qs: {:?}", get_url, qs.q);
|
||||
|
||||
let response = shared.web_client.get(get_url).query(&qs.q).send().await;
|
||||
|
||||
match response {
|
||||
Ok(resp) => {
|
||||
let status = resp.status();
|
||||
|
||||
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
|
||||
debug!("OpenTSDB response {}: {}", status, body);
|
||||
let sstatus =
|
||||
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
|
||||
HttpResponse::Ok().status(sstatus).body(body)
|
||||
}
|
||||
Err(err) => {
|
||||
error!("OpenTSDB error: {}", err);
|
||||
HttpResponse::InternalServerError().body(format!("Proxy error: {}", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -83,24 +187,23 @@ async fn main() -> std::io::Result<()> {
|
|||
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
|
||||
let cfg = config::load_config_file(&cfg_file);
|
||||
|
||||
println!("Config: {:?}", cfg);
|
||||
env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));
|
||||
|
||||
println!("Loaded config: {:#?}", cfg);
|
||||
let server_port = cfg.config.server.port.clone();
|
||||
|
||||
let web_client = Client::new();
|
||||
|
||||
let shared = ClientData {
|
||||
web_client: web_client,
|
||||
cfg: cfg,
|
||||
};
|
||||
let shared = ClientData { web_client, cfg };
|
||||
let client_data = web::Data::new(shared);
|
||||
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(client_data.clone()) //.client_data.clone())
|
||||
.app_data(client_data.clone())
|
||||
.app_data(web::JsonConfig::default().content_type_required(false))
|
||||
.wrap(Logger::default())
|
||||
.wrap(Logger::new("%r %s")) // k8s already logs timestamp
|
||||
.service(put_post)
|
||||
//.route("/put", web::post().to(put_post))
|
||||
.service(query_get)
|
||||
})
|
||||
.bind(format!("[::]:{}", server_port))?
|
||||
.run()
|
||||
|
|
Loading…
Reference in a new issue