lib/protoparser: add -*TrimTimstamp command-line flags for Influx, Graphite, OpenTSDB and CSV data

These flags can be used for reducing disk space usage for timestamps data ingested over the given protocols
This commit is contained in:
Aliaksandr Valialkin 2020-04-10 12:43:51 +03:00
parent 0681b4c27a
commit e62afc7366
5 changed files with 89 additions and 11 deletions

View file

@ -1,6 +1,7 @@
package csvimport package csvimport
import ( import (
"flag"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -13,6 +14,11 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var (
trimTimestamp = flag.Duration("csvTrimTimestamp", time.Millisecond, "Trim timestamps when importing csv data to this duration. "+
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
)
// ParseStream parses csv from req and calls callback for the parsed rows. // ParseStream parses csv from req and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from req. // The callback can be called multiple times for streamed data from req.
@ -61,14 +67,25 @@ func (ctx *streamContext) Read(r io.Reader, cds []ColumnDescriptor) bool {
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf), cds)
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Set missing timestamps // Set missing timestamps
currentTs := time.Now().UnixNano() / 1e6 currentTs := time.Now().UnixNano() / 1e6
for i := range ctx.Rows.Rows { for i := range rows {
row := &ctx.Rows.Rows[i] row := &rows[i]
if row.Timestamp == 0 { if row.Timestamp == 0 {
row.Timestamp = currentTs row.Timestamp = currentTs
} }
} }
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }

View file

@ -1,6 +1,7 @@
package graphite package graphite
import ( import (
"flag"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -13,6 +14,11 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var (
trimTimestamp = flag.Duration("graphiteTrimTimestamp", time.Second, "Trim timestamps for Graphite data to this duration. "+
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
)
// ParseStream parses Graphite lines from r and calls callback for the parsed rows. // ParseStream parses Graphite lines from r and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called multiple times for streamed data from r.
@ -60,9 +66,10 @@ func (ctx *streamContext) Read(r io.Reader) bool {
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill missing timestamps with the current timestamp rounded to seconds. // Fill missing timestamps with the current timestamp rounded to seconds.
currentTimestamp := time.Now().Unix() currentTimestamp := time.Now().Unix()
rows := ctx.Rows.Rows
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
if r.Timestamp == 0 { if r.Timestamp == 0 {
@ -75,6 +82,14 @@ func (ctx *streamContext) Read(r io.Reader) bool {
rows[i].Timestamp *= 1e3 rows[i].Timestamp *= 1e3
} }
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }

View file

@ -1,6 +1,7 @@
package influx package influx
import ( import (
"flag"
"fmt" "fmt"
"io" "io"
"runtime" "runtime"
@ -12,6 +13,11 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var (
trimTimestamp = flag.Duration("influxTrimTimestamp", time.Millisecond, "Trim timestamps for Influx line protocol data to this duration. "+
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
)
// ParseStream parses r with the given args and calls callback for the parsed rows. // ParseStream parses r with the given args and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called multiple times for streamed data from r.
@ -70,11 +76,13 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool {
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Adjust timestamps according to tsMultiplier // Adjust timestamps according to tsMultiplier
currentTs := time.Now().UnixNano() / 1e6 currentTs := time.Now().UnixNano() / 1e6
if tsMultiplier >= 1 { if tsMultiplier >= 1 {
for i := range ctx.Rows.Rows { for i := range rows {
row := &ctx.Rows.Rows[i] row := &rows[i]
if row.Timestamp == 0 { if row.Timestamp == 0 {
row.Timestamp = currentTs row.Timestamp = currentTs
} else { } else {
@ -84,8 +92,8 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool {
} else if tsMultiplier < 0 { } else if tsMultiplier < 0 {
tsMultiplier = -tsMultiplier tsMultiplier = -tsMultiplier
currentTs -= currentTs % tsMultiplier currentTs -= currentTs % tsMultiplier
for i := range ctx.Rows.Rows { for i := range rows {
row := &ctx.Rows.Rows[i] row := &rows[i]
if row.Timestamp == 0 { if row.Timestamp == 0 {
row.Timestamp = currentTs row.Timestamp = currentTs
} else { } else {
@ -93,6 +101,15 @@ func (ctx *streamContext) Read(r io.Reader, tsMultiplier int64) bool {
} }
} }
} }
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }

View file

@ -1,6 +1,7 @@
package opentsdb package opentsdb
import ( import (
"flag"
"fmt" "fmt"
"io" "io"
"net" "net"
@ -13,6 +14,11 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var (
trimTimestamp = flag.Duration("opentsdbTrimTimestamp", time.Second, "Trim timestamps for OpenTSDB 'telnet put' data to this duration. "+
"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
)
// ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows.
// //
// The callback can be called multiple times for streamed data from r. // The callback can be called multiple times for streamed data from r.
@ -59,9 +65,10 @@ func (ctx *streamContext) Read(r io.Reader) bool {
ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf))
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill in missing timestamps // Fill in missing timestamps
currentTimestamp := time.Now().Unix() currentTimestamp := time.Now().Unix()
rows := ctx.Rows.Rows
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
if r.Timestamp == 0 { if r.Timestamp == 0 {
@ -73,6 +80,15 @@ func (ctx *streamContext) Read(r io.Reader) bool {
for i := range rows { for i := range rows {
rows[i].Timestamp *= 1e3 rows[i].Timestamp *= 1e3
} }
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
return true return true
} }

View file

@ -14,7 +14,11 @@ import (
"github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metrics"
) )
var maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request") var (
maxInsertRequestSize = flag.Int("opentsdbhttp.maxInsertRequestSize", 32*1024*1024, "The maximum size of OpenTSDB HTTP put request")
trimTimestamp = flag.Duration("opentsdbhttpTrimTimestamp", time.Millisecond, "Trim timestamps for OpenTSDB HTTP data to this duration. "+
"Minimum practical duration is 1ms. Higher duration (i.e. 1s) may be used for reducing disk space usage for timestamp data")
)
// ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows. // ParseStream parses OpenTSDB http lines from req and calls callback for the parsed rows.
// //
@ -60,9 +64,10 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
ctx.Rows.Unmarshal(v) ctx.Rows.Unmarshal(v)
rowsRead.Add(len(ctx.Rows.Rows)) rowsRead.Add(len(ctx.Rows.Rows))
rows := ctx.Rows.Rows
// Fill in missing timestamps // Fill in missing timestamps
currentTimestamp := time.Now().Unix() currentTimestamp := time.Now().Unix()
rows := ctx.Rows.Rows
for i := range rows { for i := range rows {
r := &rows[i] r := &rows[i]
if r.Timestamp == 0 { if r.Timestamp == 0 {
@ -79,8 +84,16 @@ func ParseStream(req *http.Request, callback func(rows []Row) error) error {
} }
} }
// Trim timestamps if required.
if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1 {
for i := range rows {
row := &rows[i]
row.Timestamp -= row.Timestamp % tsTrim
}
}
// Insert ctx.Rows to db. // Insert ctx.Rows to db.
return callback(ctx.Rows.Rows) return callback(rows)
} }
const secondMask int64 = 0x7FFFFFFF00000000 const secondMask int64 = 0x7FFFFFFF00000000