first commit
Signed-off-by: Frank Villaro-Dixon <frank@villaro-dixon.eu>
This commit is contained in:
commit
c8950355cf
7 changed files with 2239 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
@ -0,0 +1 @@
|
||||||
|
/target
|
1949
Cargo.lock
generated
Normal file
1949
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load diff
13
Cargo.toml
Normal file
13
Cargo.toml
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
[package]
|
||||||
|
name = "opentsdb-auth-proxy"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
actix-web = "4.6.0"
|
||||||
|
glob-match = "0.2.1"
|
||||||
|
reqwest = "0.12.4"
|
||||||
|
serde = { version = "1.0.202", features = ["serde_derive"] }
|
||||||
|
serde_json = "1.0.117"
|
||||||
|
serde_yaml = "0.9.34"
|
||||||
|
sha2 = "0.10.8"
|
54
README.md
Normal file
54
README.md
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
# OpenTSDB Auth Proxy
|
||||||
|
|
||||||
|
This is a simple proxy for the [OpenTSDB](https://github.com/OpenTSDB/opentsdb)
|
||||||
|
time series database. It handles authentication and authorization.
|
||||||
|
|
||||||
|
This proxy can be publicly exposed. When sending data to opentsdb, set the endpoint
|
||||||
|
to this proxy instead. Each client will send the data alongside an authentication
|
||||||
|
token.
|
||||||
|
|
||||||
|
If the token matches the host and the metric matches the list of allowed metrics,
|
||||||
|
then the request is forwarded to the opentsdb server.
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
Take a look at the provided [sample configuration](./example-cfg.yml)
|
||||||
|
|
||||||
|
### Authentication tokens
|
||||||
|
|
||||||
|
Right now, two authentication tokens are supported:
|
||||||
|
|
||||||
|
- sha256
|
||||||
|
- plain (not recommended)
|
||||||
|
|
||||||
|
#### Sha256
|
||||||
|
|
||||||
|
To generate a sha256 token for a specific producer, do the following:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Generate a token
|
||||||
|
TOKEN=$(openssl rand -hex 20)
|
||||||
|
SHA=$(echo -n $TOKEN | sha256sum - | awk '{print $1}')
|
||||||
|
|
||||||
|
echo "Token for the device is $TOKEN . Sha256 is $SHA"
|
||||||
|
# Token for the device is 7a5becc5b5bb581522fd0bb8891bb99a70275620 . Sha256 is ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154
|
||||||
|
```
|
||||||
|
|
||||||
|
The producer will need to send its token on query string:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
curl -X POST https://my-proxy/api/put?token=7a5....
|
||||||
|
```
|
||||||
|
|
||||||
|
You then need to specify the hash in the config file. This file is then "safe"
|
||||||
|
if the token is reasonably random.
|
||||||
|
|
||||||
|
#### Plain
|
||||||
|
|
||||||
|
To be implemented; but don't do it.
|
||||||
|
|
||||||
|
## Notes about exposing OpenTSDB
|
||||||
|
|
||||||
|
Currently, OpenTSDB does not support authentication. If you run opentsdb in a k8s
|
||||||
|
cluster, protect its ingress too. Either via a different ingress class, or with
|
||||||
|
specific per-ingress-ctrl anotations.
|
21
example-cfg.yml
Normal file
21
example-cfg.yml
Normal file
|
@ -0,0 +1,21 @@
|
||||||
|
clients:
|
||||||
|
- name: pyranometer
|
||||||
|
metrics:
|
||||||
|
- irradiance
|
||||||
|
- temperature
|
||||||
|
auth:
|
||||||
|
type: sha256
|
||||||
|
hash: ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb # a
|
||||||
|
- name: tgbt
|
||||||
|
metrics:
|
||||||
|
- testproxy.*
|
||||||
|
auth:
|
||||||
|
type: sha256
|
||||||
|
hash: ac790471b321143716e7773d589af923236ebdd435ba17c671df3558becc5154 # 7a5becc5b5bb581522fd0bb8891bb99a70275620
|
||||||
|
|
||||||
|
|
||||||
|
config:
|
||||||
|
opentsdb:
|
||||||
|
url: http://opentsdb/api
|
||||||
|
server:
|
||||||
|
port: 8080
|
93
src/config.rs
Normal file
93
src/config.rs
Normal file
|
@ -0,0 +1,93 @@
|
||||||
|
use glob_match::glob_match;
|
||||||
|
use serde::Deserialize;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use std::env;
|
||||||
|
use std::fs;
|
||||||
|
|
||||||
|
const DEFAULT_LISTEN_PORT: &str = "8080";
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Config {
|
||||||
|
pub clients: Vec<Client>,
|
||||||
|
pub config: ConfigOptions,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Client {
|
||||||
|
pub name: String,
|
||||||
|
pub metrics: Vec<String>,
|
||||||
|
pub auth: Auth,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Client {
|
||||||
|
pub fn can_write(&self, metric: &str) -> bool {
|
||||||
|
for m in &self.metrics {
|
||||||
|
if glob_match(m, metric) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Auth {
|
||||||
|
#[serde(rename = "type")]
|
||||||
|
pub auth_type: String,
|
||||||
|
pub hash: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Auth {
|
||||||
|
pub fn is_valid_token(&self, token: &str) -> bool {
|
||||||
|
match self.auth_type.as_str() {
|
||||||
|
"sha256" => {
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
hasher.update(token);
|
||||||
|
let result = hasher.finalize();
|
||||||
|
return format!("{:x}", result) == self.hash;
|
||||||
|
}
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct ConfigOptions {
|
||||||
|
pub opentsdb: Opentsdb,
|
||||||
|
pub server: Server,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Opentsdb {
|
||||||
|
#[serde(default = "default_opentsdb_url")]
|
||||||
|
pub url: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Clone)]
|
||||||
|
pub struct Server {
|
||||||
|
#[serde(default = "default_listen_port")]
|
||||||
|
pub port: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_listen_port() -> String {
|
||||||
|
DEFAULT_LISTEN_PORT.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_opentsdb_url() -> String {
|
||||||
|
env::var("OPENTSDB_URL")
|
||||||
|
.expect("OPENTSDB_URL must be set or defined in config file")
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn load_config_file(filename: &str) -> Config {
|
||||||
|
let yaml_content = fs::read_to_string(filename)
|
||||||
|
.expect(format!("Unable to read config file {}", filename).as_str());
|
||||||
|
let config: Config = serde_yaml::from_str(&yaml_content).expect("Unable to parse YAML");
|
||||||
|
config
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_authenticate_client<'a>(clients: &'a [Client], token: &str) -> Option<&'a Client> {
|
||||||
|
clients
|
||||||
|
.iter()
|
||||||
|
.find(|client| client.auth.is_valid_token(token))
|
||||||
|
}
|
108
src/main.rs
Normal file
108
src/main.rs
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
use actix_web::http::StatusCode;
|
||||||
|
use actix_web::middleware::Logger;
|
||||||
|
use actix_web::{web, App, HttpResponse, HttpServer, Responder};
|
||||||
|
use reqwest::Client;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::env;
|
||||||
|
|
||||||
|
mod config;
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
struct ClientData {
|
||||||
|
web_client: Client,
|
||||||
|
cfg: config::Config,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct QSParams {
|
||||||
|
token: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize, Serialize)]
|
||||||
|
struct OtsdbData {
|
||||||
|
metric: String,
|
||||||
|
value: String,
|
||||||
|
timestamp: f64,
|
||||||
|
tags: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
const CONFIG_FILE: &str = "config.yaml";
|
||||||
|
|
||||||
|
#[actix_web::post("/put")]
|
||||||
|
async fn put_post(
|
||||||
|
shared: web::Data<ClientData>,
|
||||||
|
qs: web::Query<QSParams>,
|
||||||
|
body: web::Json<OtsdbData>,
|
||||||
|
) -> impl Responder {
|
||||||
|
println!("Body: {:?}", body);
|
||||||
|
let authenticated_client = config::try_authenticate_client(&shared.cfg.clients, &qs.token);
|
||||||
|
|
||||||
|
if authenticated_client.is_none() {
|
||||||
|
return HttpResponse::Unauthorized().body("Unauthorized. Please specify a valid token.");
|
||||||
|
}
|
||||||
|
|
||||||
|
let client = authenticated_client.unwrap();
|
||||||
|
|
||||||
|
if !client.can_write(&body.metric) {
|
||||||
|
return HttpResponse::Forbidden().body(format!(
|
||||||
|
"Not allowed to write metric `{}`. Allowed metrics: {}",
|
||||||
|
body.metric,
|
||||||
|
client.metrics.join(", ")
|
||||||
|
));
|
||||||
|
}
|
||||||
|
println!("Client: {:?}", client);
|
||||||
|
|
||||||
|
let post_url = format!("{}put", shared.cfg.config.opentsdb.url);
|
||||||
|
let otsdb_body = serde_json::to_string(&body).unwrap();
|
||||||
|
|
||||||
|
println!("POST URL: {}", post_url);
|
||||||
|
|
||||||
|
let response = shared
|
||||||
|
.web_client
|
||||||
|
.post(post_url)
|
||||||
|
.body(otsdb_body)
|
||||||
|
.send()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
match response {
|
||||||
|
Ok(resp) => {
|
||||||
|
let status = resp.status();
|
||||||
|
|
||||||
|
let body = resp.text().await.unwrap_or_else(|_| "".to_string());
|
||||||
|
let sstatus =
|
||||||
|
StatusCode::from_u16(status.as_u16()).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
|
||||||
|
HttpResponse::Ok().status(sstatus).body(body)
|
||||||
|
}
|
||||||
|
Err(err) => HttpResponse::InternalServerError().body(format!("Error: {}", err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[actix_web::main]
|
||||||
|
async fn main() -> std::io::Result<()> {
|
||||||
|
let cfg_file = env::var("CONFIG_FILE").unwrap_or(CONFIG_FILE.to_string());
|
||||||
|
let cfg = config::load_config_file(&cfg_file);
|
||||||
|
|
||||||
|
println!("Config: {:?}", cfg);
|
||||||
|
let server_port = cfg.config.server.port.clone();
|
||||||
|
|
||||||
|
let web_client = Client::new();
|
||||||
|
|
||||||
|
let shared = ClientData {
|
||||||
|
web_client: web_client,
|
||||||
|
cfg: cfg,
|
||||||
|
};
|
||||||
|
let client_data = web::Data::new(shared);
|
||||||
|
|
||||||
|
HttpServer::new(move || {
|
||||||
|
App::new()
|
||||||
|
.app_data(client_data.clone()) //.client_data.clone())
|
||||||
|
.app_data(web::JsonConfig::default().content_type_required(false))
|
||||||
|
.wrap(Logger::default())
|
||||||
|
.service(put_post)
|
||||||
|
//.route("/put", web::post().to(put_post))
|
||||||
|
})
|
||||||
|
.bind(format!("[::]:{}", server_port))?
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
}
|
Loading…
Reference in a new issue