From e62afc7366fdeba69849424bc7ae5815c208531b Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 10 Apr 2020 12:43:51 +0300 Subject: [PATCH] lib/protoparser: add `-*TrimTimstamp` command-line flags for Influx, Graphite, OpenTSDB and CSV data These flags can be used for reducing disk space usage for timestamps data ingested over the given protocols --- lib/protoparser/csvimport/streamparser.go | 21 ++++++++++++++-- lib/protoparser/graphite/streamparser.go | 17 ++++++++++++- lib/protoparser/influx/streamparser.go | 25 ++++++++++++++++---- lib/protoparser/opentsdb/streamparser.go | 18 +++++++++++++- lib/protoparser/opentsdbhttp/streamparser.go | 19 ++++++++++++--- 5 files changed, 89 insertions(+), 11 deletions(-) diff --git a/lib/protoparser/csvimport/streamparser.go b/lib/protoparser/csvimport/streamparser.go index 9c6eb30b8..47f66c6d0 100644 --- a/lib/protoparser/csvimport/streamparser.go +++ b/lib/protoparser/csvimport/streamparser.go @@ -1,6 +1,7 @@ package csvimport import ( + "flag" "fmt" "io" "net/http" @@ -13,6 +14,11 @@ import ( "github.com/VictoriaMetrics/metrics" ) +var ( + trimTimestamp = flag.Duration("csvTrimTimestamp", time.Millisecond, "Trim timestamps when importing csv data to this duration. "+ + "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") +) + // ParseStream parses csv from req and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from req. @@ -61,14 +67,25 @@ func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool { ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds) rowsRead.Add(len(ctx.Rows.Rows)) + rows := ctx.Rows.Rows + // Set missing timestamps currentTs := time.Now().UnixNano() / 1e6 - for i := range ctx.Rows.Rows { - row := &ctx.Rows.Rows[i] + for i := range rows { + row := &rows[i] if row.Timestamp == 0 { row.Timestamp = currentTs } } + + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + return true } diff --git a/lib/protoparser/graphite/streamparser.go b/lib/protoparser/graphite/streamparser.go index faf6069b5..afda93c8c 100644 --- a/lib/protoparser/graphite/streamparser.go +++ b/lib/protoparser/graphite/streamparser.go @@ -1,6 +1,7 @@ package graphite import ( + "flag" "fmt" "io" "net" @@ -13,6 +14,11 @@ import ( "github.com/VictoriaMetrics/metrics" ) +var ( + trimTimestamp = flag.Duration("graphiteTrimTimestamp", time.Second, "Trim timestamps for Graphite data to this duration. "+ + "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") +) + // ParseStream parses Graphite lines from r and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from r. @@ -60,9 +66,10 @@ func (ctx *streamContext) Read(r io.Reader) bool { ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) + rows := ctx.Rows.Rows + // Fill missing timestamps with the current timestamp rounded to seconds. currentTimestamp := time.Now().Unix() - rows := ctx.Rows.Rows for i := range rows { r := &rows[i] if r.Timestamp == 0 { @@ -75,6 +82,14 @@ func (ctx *streamContext) Read(r io.Reader) bool { rows[i].Timestamp *= 1e3 } + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + return true } diff --git a/lib/protoparser/influx/streamparser.go b/lib/protoparser/influx/streamparser.go index 7e4482daf..918b4270a 100644 --- a/lib/protoparser/influx/streamparser.go +++ b/lib/protoparser/influx/streamparser.go @@ -1,6 +1,7 @@ package influx import ( + "flag" "fmt" "io" "runtime" @@ -12,6 +13,11 @@ import ( "github.com/VictoriaMetrics/metrics" ) +var ( + trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for Influx line protocol data to this duration. "+ + "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") +) + // ParseStream parses r with the given args and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from r. @@ -70,11 +76,13 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool { ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) + rows := ctx.Rows.Rows + // Adjust timestamps according to tsMultiplier currentTs := time.Now().UnixNano() / 1e6 if tsMultiplier >= 1 { - for i := range ctx.Rows.Rows { - row := &ctx.Rows.Rows[i] + for i := range rows { + row := &rows[i] if row.Timestamp == 0 { row.Timestamp = currentTs } else { @@ -84,8 +92,8 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool { } else if tsMultiplier < 0 { tsMultiplier = -tsMultiplier currentTs -= currentTs % tsMultiplier - for i := range ctx.Rows.Rows { - row := &ctx.Rows.Rows[i] + for i := range rows { + row := &rows[i] if row.Timestamp == 0 { row.Timestamp = currentTs } else { @@ -93,6 +101,15 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool { } } } + + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + return true } diff --git a/lib/protoparser/opentsdb/streamparser.go b/lib/protoparser/opentsdb/streamparser.go index f5bdf844d..031a986df 100644 --- a/lib/protoparser/opentsdb/streamparser.go +++ b/lib/protoparser/opentsdb/streamparser.go @@ -1,6 +1,7 @@ package opentsdb import ( + "flag" "fmt" "io" "net" @@ -13,6 +14,11 @@ import ( "github.com/VictoriaMetrics/metrics" ) +var ( + trimTimestamp = flag.Duration("opentsdbTrimTimestamp", time.Second, "Trim timestamps for OpenTSDB 'telnet put' data to this duration. "+ + "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") +) + // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from r. @@ -59,9 +65,10 @@ func (ctx *streamContext) Read(r io.Reader) bool { ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) + rows := ctx.Rows.Rows + // Fill in missing timestamps currentTimestamp := time.Now().Unix() - rows := ctx.Rows.Rows for i := range rows { r := &rows[i] if r.Timestamp == 0 { @@ -73,6 +80,15 @@ func (ctx *streamContext) Read(r io.Reader) bool { for i := range rows { rows[i].Timestamp *= 1e3 } + + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + return true } diff --git a/lib/protoparser/opentsdbhttp/streamparser.go b/lib/protoparser/opentsdbhttp/streamparser.go index f3c18c72f..28a842c5c 100644 --- a/lib/protoparser/opentsdbhttp/streamparser.go +++ b/lib/protoparser/opentsdbhttp/streamparser.go @@ -14,7 +14,11 @@ import ( "github.com/VictoriaMetrics/metrics" ) -var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") +var ( + maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") + trimTimestamp = flag.Duration("opentsdbhttpTrimTimestamp", time.Millisecond, "Trim timestamps for OpenTSDB HTTP data to this duration. "+ + "Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data") +) // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // @@ -60,9 +64,10 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { ctx.Rows.Unmarshal(v) rowsRead.Add(len(ctx.Rows.Rows)) + rows := ctx.Rows.Rows + // Fill in missing timestamps currentTimestamp := time.Now().Unix() - rows := ctx.Rows.Rows for i := range rows { r := &rows[i] if r.Timestamp == 0 { @@ -79,8 +84,16 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error { } } + // Trim timestamps if required. + if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 { + for i := range rows { + row := &rows[i] + row.Timestamp -= row.Timestamp % tsTrim + } + } + // Insert ctx.Rows to db. - return callback(ctx.Rows.Rows) + return callback(rows) } const secondMask int64 = 0x7FFFFFFF00000000