package opentsdb import ( "errors" "flag" "fmt" "io" "net" "runtime" "sync" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" "github.com/VictoriaMetrics/metrics" ) var ( trimTimestamp = flag.Duration("opentsdbTrimTimestamp", time.Second, "Trim timestamps for OpenTSDB 'telnet put' data to this duration. "+ "Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data") ) // ParseStream parses OpenTSDB lines from r and calls callback for the parsed rows. // // The callback can be called multiple times for streamed data from r. // // callback shouldn't hold rows after returning. func ParseStream(r io.Reader, callback func(rows []Row) error) error { ctx := getStreamContext() defer putStreamContext(ctx) for ctx.Read(r) { if err := callback(ctx.Rows.Rows); err != nil { return err } } return ctx.Error() } const flushTimeout = 3 * time.Second func (ctx *streamContext) Read(r io.Reader) bool { readCalls.Inc() if ctx.err != nil { return false } if c, ok := r.(net.Conn); ok { if err := c.SetReadDeadline(time.Now().Add(flushTimeout)); err != nil { readErrors.Inc() ctx.err = fmt.Errorf("cannot set read deadline: %w", err) return false } } ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(r, ctx.reqBuf, ctx.tailBuf) if ctx.err != nil { var ne net.Error if errors.As(ctx.err, &ne) && ne.Timeout() { // Flush the read data on timeout and try reading again. ctx.err = nil } else { if ctx.err != io.EOF { readErrors.Inc() ctx.err = fmt.Errorf("cannot read OpenTSDB put protocol data: %w", ctx.err) } return false } } ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) rowsRead.Add(len(ctx.Rows.Rows)) rows := ctx.Rows.Rows // Fill in missing timestamps currentTimestamp := int64(fasttime.UnixTimestamp()) for i := range rows { r := &rows[i] if r.Timestamp == 0 { r.Timestamp = currentTimestamp } } // Convert timestamps from seconds to milliseconds for i := range rows { rows[i].Timestamp *= 1e3 } // Trim timestamps if required. if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 { for i := range rows { row := &rows[i] row.Timestamp -= row.Timestamp % tsTrim } } return true } type streamContext struct { Rows Rows reqBuf []byte tailBuf []byte err error } func (ctx *streamContext) Error() error { if ctx.err == io.EOF { return nil } return ctx.err } func (ctx *streamContext) reset() { ctx.Rows.Reset() ctx.reqBuf = ctx.reqBuf[:0] ctx.tailBuf = ctx.tailBuf[:0] ctx.err = nil } var ( readCalls = metrics.NewCounter(`vm_protoparser_read_calls_total{type="opentsdb"}`) readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="opentsdb"}`) rowsRead = metrics.NewCounter(`vm_protoparser_rows_read_total{type="opentsdb"}`) ) func getStreamContext() *streamContext { select { case ctx := <-streamContextPoolCh: return ctx default: if v := streamContextPool.Get(); v != nil { return v.(*streamContext) } return &streamContext{} } } func putStreamContext(ctx *streamContext) { ctx.reset() select { case streamContextPoolCh <- ctx: default: streamContextPool.Put(ctx) } } var streamContextPool sync.Pool var streamContextPoolCh = make(chan *streamContext, runtime.GOMAXPROCS(-1))