From 864fbf91250b13389f8a195849db241fca72ed77 Mon Sep 17 00:00:00 2001
From: hagen1778 <roman@victoriametrics.com>
Date: Fri, 10 May 2024 14:28:37 +0200
Subject: [PATCH] Statsd protocol compatibility (#5053)

In this PR I added compatibility with [statsd
protocol](https://github.com/b/statsd_spec) with tags to be able to send
metrics directly from statsd clients to vmagent or directly to VM.
For example its compatible with
[statsd-instrument](https://github.com/Shopify/statsd-instrument) and
[dogstatsd-ruby](https://github.com/DataDog/dogstatsd-ruby) gems

Related issues: #5052, #206, #4600

(cherry picked from commit c6c5a5a18695ab5f0be1f68bbb63bf90f6f15657)
Signed-off-by: hagen1778 <roman@victoriametrics.com>
---
 lib/ingestserver/statsd/server.go             | 173 +++++++++
 lib/protoparser/statsd/parser.go              | 226 +++++++++++
 lib/protoparser/statsd/parser_test.go         | 367 ++++++++++++++++++
 lib/protoparser/statsd/parser_timing_test.go  |  25 ++
 lib/protoparser/statsd/stream/streamparser.go | 218 +++++++++++
 .../statsd/stream/streamparser_test.go        |  60 +++
 6 files changed, 1069 insertions(+)
 create mode 100644 lib/ingestserver/statsd/server.go
 create mode 100644 lib/protoparser/statsd/parser.go
 create mode 100644 lib/protoparser/statsd/parser_test.go
 create mode 100644 lib/protoparser/statsd/parser_timing_test.go
 create mode 100644 lib/protoparser/statsd/stream/streamparser.go
 create mode 100644 lib/protoparser/statsd/stream/streamparser_test.go

diff --git a/lib/ingestserver/statsd/server.go b/lib/ingestserver/statsd/server.go
new file mode 100644
index 0000000000..533d2ed1be
--- /dev/null
+++ b/lib/ingestserver/statsd/server.go
@@ -0,0 +1,173 @@
+package statsd
+
+import (
+	"errors"
+	"io"
+	"net"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/ingestserver"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var (
+	writeRequestsTCP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="tcp"}`)
+	writeErrorsTCP   = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="tcp"}`)
+
+	writeRequestsUDP = metrics.NewCounter(`vm_ingestserver_requests_total{type="statsd", name="write", net="udp"}`)
+	writeErrorsUDP   = metrics.NewCounter(`vm_ingestserver_request_errors_total{type="statsd", name="write", net="udp"}`)
+)
+
+// Server accepts Statsd plaintext lines over TCP and UDP.
+type Server struct {
+	addr  string
+	lnTCP net.Listener
+	lnUDP net.PacketConn
+	wg    sync.WaitGroup
+	cm    ingestserver.ConnsMap
+}
+
+// MustStart starts statsd server on the given addr.
+//
+// The incoming connections are processed with insertHandler.
+//
+// If useProxyProtocol is set to true, then the incoming connections are accepted via proxy protocol.
+// See https://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
+//
+// MustStop must be called on the returned server when it is no longer needed.
+func MustStart(addr string, useProxyProtocol bool, insertHandler func(r io.Reader) error) *Server {
+	logger.Infof("starting TCP Statsd server at %q", addr)
+	lnTCP, err := netutil.NewTCPListener("statsd", addr, useProxyProtocol, nil)
+	if err != nil {
+		logger.Fatalf("cannot start TCP Statsd server at %q: %s", addr, err)
+	}
+
+	logger.Infof("starting UDP Statsd server at %q", addr)
+	lnUDP, err := net.ListenPacket(netutil.GetUDPNetwork(), addr)
+	if err != nil {
+		logger.Fatalf("cannot start UDP Statsd server at %q: %s", addr, err)
+	}
+
+	s := &Server{
+		addr:  addr,
+		lnTCP: lnTCP,
+		lnUDP: lnUDP,
+	}
+	s.cm.Init("statsd")
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		s.serveTCP(insertHandler)
+		logger.Infof("stopped TCP Statsd server at %q", addr)
+	}()
+	s.wg.Add(1)
+	go func() {
+		defer s.wg.Done()
+		s.serveUDP(insertHandler)
+		logger.Infof("stopped UDP Statsd server at %q", addr)
+	}()
+	return s
+}
+
+// MustStop stops the server.
+func (s *Server) MustStop() {
+	logger.Infof("stopping TCP Statsd server at %q...", s.addr)
+	if err := s.lnTCP.Close(); err != nil {
+		logger.Errorf("cannot close TCP Statsd server: %s", err)
+	}
+	logger.Infof("stopping UDP Statsd server at %q...", s.addr)
+	if err := s.lnUDP.Close(); err != nil {
+		logger.Errorf("cannot close UDP Statsd server: %s", err)
+	}
+	s.cm.CloseAll(0)
+	s.wg.Wait()
+	logger.Infof("TCP and UDP Statsd servers at %q have been stopped", s.addr)
+}
+
+func (s *Server) serveTCP(insertHandler func(r io.Reader) error) {
+	var wg sync.WaitGroup
+	for {
+		c, err := s.lnTCP.Accept()
+		if err != nil {
+			var ne net.Error
+			if errors.As(err, &ne) {
+				if ne.Temporary() {
+					logger.Errorf("statsd: temporary error when listening for TCP addr %q: %s", s.lnTCP.Addr(), err)
+					time.Sleep(time.Second)
+					continue
+				}
+				if strings.Contains(err.Error(), "use of closed network connection") {
+					break
+				}
+				logger.Fatalf("unrecoverable error when accepting TCP Statsd connections: %s", err)
+			}
+			logger.Fatalf("unexpected error when accepting TCP Statsd connections: %s", err)
+		}
+		if !s.cm.Add(c) {
+			_ = c.Close()
+			break
+		}
+		wg.Add(1)
+		go func() {
+			defer func() {
+				s.cm.Delete(c)
+				_ = c.Close()
+				wg.Done()
+			}()
+			writeRequestsTCP.Inc()
+			if err := insertHandler(c); err != nil {
+				writeErrorsTCP.Inc()
+				logger.Errorf("error in TCP Statsd conn %q<->%q: %s", c.LocalAddr(), c.RemoteAddr(), err)
+			}
+		}()
+	}
+	wg.Wait()
+}
+
+func (s *Server) serveUDP(insertHandler func(r io.Reader) error) {
+	gomaxprocs := cgroup.AvailableCPUs()
+	var wg sync.WaitGroup
+	for i := 0; i < gomaxprocs; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			var bb bytesutil.ByteBuffer
+			bb.B = bytesutil.ResizeNoCopyNoOverallocate(bb.B, 64*1024)
+			for {
+				bb.Reset()
+				bb.B = bb.B[:cap(bb.B)]
+				n, addr, err := s.lnUDP.ReadFrom(bb.B)
+				if err != nil {
+					writeErrorsUDP.Inc()
+					var ne net.Error
+					if errors.As(err, &ne) {
+						if ne.Temporary() {
+							logger.Errorf("statsd: temporary error when listening for UDP addr %q: %s", s.lnUDP.LocalAddr(), err)
+							time.Sleep(time.Second)
+							continue
+						}
+						if strings.Contains(err.Error(), "use of closed network connection") {
+							break
+						}
+					}
+					logger.Errorf("cannot read Statsd UDP data: %s", err)
+					continue
+				}
+				bb.B = bb.B[:n]
+				writeRequestsUDP.Inc()
+				if err := insertHandler(bb.NewReader()); err != nil {
+					writeErrorsUDP.Inc()
+					logger.Errorf("error in UDP Statsd conn %q<->%q: %s", s.lnUDP.LocalAddr(), addr, err)
+					continue
+				}
+			}
+		}()
+	}
+	wg.Wait()
+}
diff --git a/lib/protoparser/statsd/parser.go b/lib/protoparser/statsd/parser.go
new file mode 100644
index 0000000000..3ae9800d6d
--- /dev/null
+++ b/lib/protoparser/statsd/parser.go
@@ -0,0 +1,226 @@
+package statsd
+
+import (
+	"fmt"
+	"strings"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
+	"github.com/VictoriaMetrics/metrics"
+	"github.com/valyala/fastjson/fastfloat"
+)
+
+// Statsd metric format with tags: MetricName:value|type|@sample_rate|#tag1:value,tag1...
+const statsdSeparator = '|'
+const statsdPairsSeparator = ':'
+const statsdTagsStartSeparator = '#'
+const statsdTagsSeparator = ','
+
+// Rows contains parsed statsd rows.
+type Rows struct {
+	Rows []Row
+
+	tagsPool []Tag
+}
+
+// Reset resets rs.
+func (rs *Rows) Reset() {
+	// Reset items, so they can be GC'ed
+
+	for i := range rs.Rows {
+		rs.Rows[i].reset()
+	}
+	rs.Rows = rs.Rows[:0]
+
+	for i := range rs.tagsPool {
+		rs.tagsPool[i].reset()
+	}
+	rs.tagsPool = rs.tagsPool[:0]
+}
+
+// Unmarshal unmarshals statsd plaintext protocol rows from s.
+//
+// s shouldn't be modified when rs is in use.
+func (rs *Rows) Unmarshal(s string) {
+	rs.Rows, rs.tagsPool = unmarshalRows(rs.Rows[:0], s, rs.tagsPool[:0])
+}
+
+// Row is a single statsd row.
+type Row struct {
+	Metric    string
+	Tags      []Tag
+	Value     float64
+	Timestamp int64
+}
+
+func (r *Row) reset() {
+	r.Metric = ""
+	r.Tags = nil
+	r.Value = 0
+	r.Timestamp = 0
+}
+
+func (r *Row) unmarshal(s string, tagsPool []Tag) ([]Tag, error) {
+	r.reset()
+	originalString := s
+	s = stripTrailingWhitespace(s)
+	separatorPosition := strings.IndexByte(s, statsdSeparator)
+	if separatorPosition < 0 {
+		s = stripTrailingWhitespace(s)
+	} else {
+		s = stripTrailingWhitespace(s[:separatorPosition])
+	}
+
+	valuesSeparatorPosition := strings.LastIndexByte(s, statsdPairsSeparator)
+
+	if valuesSeparatorPosition == 0 {
+		return tagsPool, fmt.Errorf("cannot find metric name for %q", s)
+	}
+
+	if valuesSeparatorPosition < 0 {
+		return tagsPool, fmt.Errorf("cannot find separator for %q", s)
+	}
+
+	r.Metric = s[:valuesSeparatorPosition]
+	valueStr := s[valuesSeparatorPosition+1:]
+
+	v, err := fastfloat.Parse(valueStr)
+	if err != nil {
+		return tagsPool, fmt.Errorf("cannot unmarshal value from %q: %w; original line: %q", valueStr, err, originalString)
+	}
+	r.Value = v
+
+	// parsing tags
+	tagsSeparatorPosition := strings.LastIndexByte(originalString, statsdTagsStartSeparator)
+
+	if tagsSeparatorPosition < 0 {
+		// no tags
+		return tagsPool, nil
+	}
+
+	tagsStart := len(tagsPool)
+	tagsPool = unmarshalTags(tagsPool, originalString[tagsSeparatorPosition+1:])
+	tags := tagsPool[tagsStart:]
+	r.Tags = tags[:len(tags):len(tags)]
+
+	return tagsPool, nil
+}
+
+func unmarshalRows(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
+	for len(s) > 0 {
+		n := strings.IndexByte(s, '\n')
+		if n < 0 {
+			// The last line.
+			return unmarshalRow(dst, s, tagsPool)
+		}
+		dst, tagsPool = unmarshalRow(dst, s[:n], tagsPool)
+		s = s[n+1:]
+	}
+	return dst, tagsPool
+}
+
+func unmarshalRow(dst []Row, s string, tagsPool []Tag) ([]Row, []Tag) {
+	if len(s) > 0 && s[len(s)-1] == '\r' {
+		s = s[:len(s)-1]
+	}
+	s = stripLeadingWhitespace(s)
+	if len(s) == 0 {
+		// Skip empty line
+		return dst, tagsPool
+	}
+	if cap(dst) > len(dst) {
+		dst = dst[:len(dst)+1]
+	} else {
+		dst = append(dst, Row{})
+	}
+	r := &dst[len(dst)-1]
+	var err error
+	tagsPool, err = r.unmarshal(s, tagsPool)
+	if err != nil {
+		dst = dst[:len(dst)-1]
+		logger.Errorf("cannot unmarshal Statsd line %q: %s", s, err)
+		invalidLines.Inc()
+	}
+	return dst, tagsPool
+}
+
+var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="statsd"}`)
+
+func unmarshalTags(dst []Tag, s string) []Tag {
+	for {
+		if cap(dst) > len(dst) {
+			dst = dst[:len(dst)+1]
+		} else {
+			dst = append(dst, Tag{})
+		}
+		tag := &dst[len(dst)-1]
+
+		n := strings.IndexByte(s, statsdTagsSeparator)
+
+		if n < 0 {
+			// The last tag found
+			tag.unmarshal(s)
+			if len(tag.Key) == 0 || len(tag.Value) == 0 {
+				// Skip empty tag
+				dst = dst[:len(dst)-1]
+			}
+			return dst
+		}
+		tag.unmarshal(s[:n])
+		s = s[n+1:]
+		if len(tag.Key) == 0 || len(tag.Value) == 0 {
+			// Skip empty tag
+			dst = dst[:len(dst)-1]
+		}
+	}
+}
+
+// Tag is a statsd tag.
+type Tag struct {
+	Key   string
+	Value string
+}
+
+func (t *Tag) reset() {
+	t.Key = ""
+	t.Value = ""
+}
+
+func (t *Tag) unmarshal(s string) {
+	t.reset()
+	n := strings.IndexByte(s, statsdPairsSeparator)
+	if n < 0 {
+		// Empty tag value.
+		// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1100
+		t.Key = s
+		t.Value = s[len(s):]
+	} else {
+		t.Key = s[:n]
+		t.Value = s[n+1:]
+	}
+}
+
+func stripTrailingWhitespace(s string) string {
+	n := len(s)
+	for {
+		n--
+		if n < 0 {
+			return ""
+		}
+		ch := s[n]
+
+		if ch != ' ' && ch != '\t' {
+			return s[:n+1]
+		}
+	}
+}
+
+func stripLeadingWhitespace(s string) string {
+	for len(s) > 0 {
+		ch := s[0]
+		if ch != ' ' && ch != '\t' {
+			return s
+		}
+		s = s[1:]
+	}
+	return ""
+}
diff --git a/lib/protoparser/statsd/parser_test.go b/lib/protoparser/statsd/parser_test.go
new file mode 100644
index 0000000000..aeb498bb75
--- /dev/null
+++ b/lib/protoparser/statsd/parser_test.go
@@ -0,0 +1,367 @@
+package statsd
+
+import (
+	"reflect"
+	"testing"
+)
+
+func TestUnmarshalTagsSuccess(t *testing.T) {
+	f := func(dst []Tag, s string, tagsPoolExpected []Tag) {
+		t.Helper()
+
+		tagsPool := unmarshalTags(dst, s)
+		if !reflect.DeepEqual(tagsPool, tagsPoolExpected) {
+			t.Fatalf("unexpected tags;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected)
+		}
+
+		// Try unmarshaling again
+		tagsPool = unmarshalTags(dst, s)
+		if !reflect.DeepEqual(tagsPool, tagsPoolExpected) {
+			t.Fatalf("unexpected tags on second unmarshal;\ngot\n%+v;\nwant\n%+v", tagsPool, tagsPoolExpected)
+		}
+	}
+
+	f([]Tag{}, "foo:bar", []Tag{
+		{
+			Key:   "foo",
+			Value: "bar",
+		},
+	})
+
+	f([]Tag{}, "foo:bar,qwe:123", []Tag{
+		{
+			Key:   "foo",
+			Value: "bar",
+		},
+		{
+			Key:   "qwe",
+			Value: "123",
+		},
+	})
+
+	f([]Tag{}, "foo.qwe:bar", []Tag{
+		{
+			Key:   "foo.qwe",
+			Value: "bar",
+		},
+	})
+
+	f([]Tag{}, "foo:10", []Tag{
+		{
+			Key:   "foo",
+			Value: "10",
+		},
+	})
+
+	f([]Tag{}, "foo: _qwe", []Tag{
+		{
+			Key:   "foo",
+			Value: " _qwe",
+		},
+	})
+
+	f([]Tag{}, "foo:qwe    ", []Tag{
+		{
+			Key:   "foo",
+			Value: "qwe    ",
+		},
+	})
+
+	f([]Tag{}, "foo  asd:qwe    ", []Tag{
+		{
+			Key:   "foo  asd",
+			Value: "qwe    ",
+		},
+	})
+
+	f([]Tag{}, "foo:var:123", []Tag{
+		{
+			Key:   "foo",
+			Value: "var:123",
+		},
+	})
+
+	// invalid tags
+	f([]Tag{}, ":bar", []Tag{})
+	f([]Tag{}, "foo:", []Tag{})
+	f([]Tag{}, "   ", []Tag{})
+}
+
+func TestRowsUnmarshalSuccess(t *testing.T) {
+	f := func(s string, rowsExpected *Rows) {
+		t.Helper()
+		var rows Rows
+		rows.Unmarshal(s)
+		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
+			t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
+		}
+
+		// Try unmarshaling again
+		rows.Unmarshal(s)
+		if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) {
+			t.Fatalf("unexpected rows on second unmarshal;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows)
+		}
+
+		rows.Reset()
+		if len(rows.Rows) != 0 {
+			t.Fatalf("non-empty rows after reset: %+v", rows.Rows)
+		}
+	}
+
+	// Empty line
+	f("", &Rows{})
+	f("\r", &Rows{})
+	f("\n\n", &Rows{})
+	f("\n\r\n", &Rows{})
+
+	// Single line
+	f(" 123:455", &Rows{
+		Rows: []Row{{
+			Metric: "123",
+			Value:  455,
+		}},
+	})
+	f("123:455 |c", &Rows{
+		Rows: []Row{{
+			Metric: "123",
+			Value:  455,
+		}},
+	})
+	f("foobar:-123.456|c", &Rows{
+		Rows: []Row{{
+			Metric: "foobar",
+			Value:  -123.456,
+		}},
+	})
+	f("foo.bar:123.456|c\n", &Rows{
+		Rows: []Row{{
+			Metric: "foo.bar",
+			Value:  123.456,
+		}},
+	})
+
+	// with sample rate
+	f("foo.bar:1|c|@0.1", &Rows{
+		Rows: []Row{{
+			Metric: "foo.bar",
+			Value:  1,
+		}},
+	})
+
+	// without specifying metric unit
+	f("foo.bar:123", &Rows{
+		Rows: []Row{{
+			Metric: "foo.bar",
+			Value:  123,
+		}},
+	})
+	// without specifying metric unit but with tags
+	f("foo.bar:123|#foo:bar", &Rows{
+		Rows: []Row{{
+			Metric: "foo.bar",
+			Value:  123,
+			Tags: []Tag{
+				{
+					Key:   "foo",
+					Value: "bar",
+				},
+			},
+		}},
+	})
+
+	f("foo.bar:123.456|c|#foo:bar,qwe:asd", &Rows{
+		Rows: []Row{{
+			Metric: "foo.bar",
+			Value:  123.456,
+			Tags: []Tag{
+				{
+					Key:   "foo",
+					Value: "bar",
+				},
+				{
+					Key:   "qwe",
+					Value: "asd",
+				},
+			},
+		}},
+	})
+
+	// Whitespace in metric name, tag name and tag value
+	f("s a:1|c|#ta g1:aaa1,tag2:bb b2", &Rows{
+		Rows: []Row{{
+			Metric: "s a",
+			Value:  1,
+			Tags: []Tag{
+				{
+					Key:   "ta g1",
+					Value: "aaa1",
+				},
+				{
+					Key:   "tag2",
+					Value: "bb b2",
+				},
+			},
+		}},
+	})
+
+	// Tags
+	f("foo:1|c", &Rows{
+		Rows: []Row{{
+			Metric: "foo",
+			Value:  1,
+		}},
+	})
+	// Empty tag name
+	f("foo:1|#:123", &Rows{
+		Rows: []Row{{
+			Metric: "foo",
+			Tags:   []Tag{},
+			Value:  1,
+		}},
+	})
+	// Empty tag value
+	f("foo:1|#tag1:", &Rows{
+		Rows: []Row{{
+			Metric: "foo",
+			Tags:   []Tag{},
+			Value:  1,
+		}},
+	})
+	f("foo:1|#bar:baz,aa:,x:y,:z", &Rows{
+		Rows: []Row{{
+			Metric: "foo",
+			Tags: []Tag{
+				{
+					Key:   "bar",
+					Value: "baz",
+				},
+				{
+					Key:   "x",
+					Value: "y",
+				},
+			},
+			Value: 1,
+		}},
+	})
+
+	// Multi lines
+	f("foo:0.3|c\naaa:3|g\nbar.baz:0.34|c\n", &Rows{
+		Rows: []Row{
+			{
+				Metric: "foo",
+				Value:  0.3,
+			},
+			{
+				Metric: "aaa",
+				Value:  3,
+			},
+			{
+				Metric: "bar.baz",
+				Value:  0.34,
+			},
+		},
+	})
+
+	f("foo:0.3|c|#tag1:1,tag2:2\naaa:3|g|#tag3:3,tag4:4", &Rows{
+		Rows: []Row{
+			{
+				Metric: "foo",
+				Value:  0.3,
+				Tags: []Tag{
+					{
+						Key:   "tag1",
+						Value: "1",
+					},
+					{
+						Key:   "tag2",
+						Value: "2",
+					},
+				},
+			},
+			{
+				Metric: "aaa",
+				Value:  3,
+				Tags: []Tag{
+					{
+						Key:   "tag3",
+						Value: "3",
+					},
+					{
+						Key:   "tag4",
+						Value: "4",
+					},
+				},
+			},
+		},
+	})
+
+	// Multi lines with invalid line
+	f("foo:0.3|c\naaa\nbar.baz:0.34\n", &Rows{
+		Rows: []Row{
+			{
+				Metric: "foo",
+				Value:  0.3,
+			},
+			{
+				Metric: "bar.baz",
+				Value:  0.34,
+			},
+		},
+	})
+
+	// Whitespace after at the end
+	f("foo.baz:125|c\na:1.34\t  ", &Rows{
+		Rows: []Row{
+			{
+				Metric: "foo.baz",
+				Value:  125,
+			},
+			{
+				Metric: "a",
+				Value:  1.34,
+			},
+		},
+	})
+
+	// ignores sample rate
+	f("foo.baz:125|c|@0.5#tag1:12", &Rows{
+		Rows: []Row{
+			{
+				Metric: "foo.baz",
+				Value:  125,
+				Tags: []Tag{
+					{
+						Key:   "tag1",
+						Value: "12",
+					},
+				},
+			},
+		},
+	})
+}
+
+func TestRowsUnmarshalFailure(t *testing.T) {
+	f := func(s string) {
+		t.Helper()
+		var rows Rows
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
+		}
+
+		// Try again
+		rows.Unmarshal(s)
+		if len(rows.Rows) != 0 {
+			t.Fatalf("unexpected number of rows parsed; got %d; want 0", len(rows.Rows))
+		}
+	}
+
+	// random string
+	f("aaa")
+
+	// empty value
+	f("foo:")
+
+	// empty metric name
+	f(":12")
+}
diff --git a/lib/protoparser/statsd/parser_timing_test.go b/lib/protoparser/statsd/parser_timing_test.go
new file mode 100644
index 0000000000..d9e3c64792
--- /dev/null
+++ b/lib/protoparser/statsd/parser_timing_test.go
@@ -0,0 +1,25 @@
+package statsd
+
+import (
+	"fmt"
+	"testing"
+)
+
+func BenchmarkRowsUnmarshal(b *testing.B) {
+	s := `cpu.usage_user:1.23|c
+cpu.usage_system:23.344|c
+cpu.usage_iowait:3.3443|c
+cpu.usage_irq:0.34432|c
+`
+	b.SetBytes(int64(len(s)))
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		var rows Rows
+		for pb.Next() {
+			rows.Unmarshal(s)
+			if len(rows.Rows) != 4 {
+				panic(fmt.Errorf("unexpected number of rows unmarshaled: got %d; want 4", len(rows.Rows)))
+			}
+		}
+	})
+}
diff --git a/lib/protoparser/statsd/stream/streamparser.go b/lib/protoparser/statsd/stream/streamparser.go
new file mode 100644
index 0000000000..27d9e40279
--- /dev/null
+++ b/lib/protoparser/statsd/stream/streamparser.go
@@ -0,0 +1,218 @@
+package stream
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"io"
+	"sync"
+	"time"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/writeconcurrencylimiter"
+	"github.com/VictoriaMetrics/metrics"
+)
+
+var (
+	trimTimestamp = flag.Duration("statsdTrimTimestamp", time.Second, "Trim timestamps for Statsd data to this duration. "+
+		"Minimum practical duration is 1s. Higher duration (i.e. 1m) may be used for reducing disk space usage for timestamp data")
+)
+
+// Parse parses Statsd lines from r and calls callback for the parsed rows.
+//
+// The callback can be called concurrently multiple times for streamed data from r.
+//
+// callback shouldn't hold rows after returning.
+func Parse(r io.Reader, isGzipped bool, callback func(rows []statsd.Row) error) error {
+	wcr := writeconcurrencylimiter.GetReader(r)
+	defer writeconcurrencylimiter.PutReader(wcr)
+	r = wcr
+
+	if isGzipped {
+		zr, err := common.GetGzipReader(r)
+		if err != nil {
+			return fmt.Errorf("cannot read gzipped statsd data: %w", err)
+		}
+		defer common.PutGzipReader(zr)
+		r = zr
+	}
+
+	ctx := getStreamContext(r)
+	defer putStreamContext(ctx)
+
+	for ctx.Read() {
+		uw := getUnmarshalWork()
+		uw.ctx = ctx
+		uw.callback = callback
+		uw.reqBuf, ctx.reqBuf = ctx.reqBuf, uw.reqBuf
+		ctx.wg.Add(1)
+		common.ScheduleUnmarshalWork(uw)
+		wcr.DecConcurrency()
+	}
+	ctx.wg.Wait()
+	if err := ctx.Error(); err != nil {
+		return err
+	}
+	return ctx.callbackErr
+}
+
+func (ctx *streamContext) Read() bool {
+	readCalls.Inc()
+	if ctx.err != nil || ctx.hasCallbackError() {
+		return false
+	}
+	ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlock(ctx.br, ctx.reqBuf, ctx.tailBuf)
+	if ctx.err != nil {
+		if ctx.err != io.EOF {
+			readErrors.Inc()
+			ctx.err = fmt.Errorf("cannot read statsd plaintext protocol data: %w", ctx.err)
+		}
+		return false
+	}
+	return true
+}
+
+type streamContext struct {
+	br      *bufio.Reader
+	reqBuf  []byte
+	tailBuf []byte
+	err     error
+
+	wg              sync.WaitGroup
+	callbackErrLock sync.Mutex
+	callbackErr     error
+}
+
+func (ctx *streamContext) Error() error {
+	if ctx.err == io.EOF {
+		return nil
+	}
+	return ctx.err
+}
+
+func (ctx *streamContext) hasCallbackError() bool {
+	ctx.callbackErrLock.Lock()
+	ok := ctx.callbackErr != nil
+	ctx.callbackErrLock.Unlock()
+	return ok
+}
+
+func (ctx *streamContext) reset() {
+	ctx.br.Reset(nil)
+	ctx.reqBuf = ctx.reqBuf[:0]
+	ctx.tailBuf = ctx.tailBuf[:0]
+	ctx.err = nil
+	ctx.callbackErr = nil
+}
+
+var (
+	readCalls  = metrics.NewCounter(`vm_protoparser_read_calls_total{type="statsd"}`)
+	readErrors = metrics.NewCounter(`vm_protoparser_read_errors_total{type="statsd"}`)
+	rowsRead   = metrics.NewCounter(`vm_protoparser_rows_read_total{type="statsd"}`)
+)
+
+func getStreamContext(r io.Reader) *streamContext {
+	select {
+	case ctx := <-streamContextPoolCh:
+		ctx.br.Reset(r)
+		return ctx
+	default:
+		if v := streamContextPool.Get(); v != nil {
+			ctx := v.(*streamContext)
+			ctx.br.Reset(r)
+			return ctx
+		}
+		return &streamContext{
+			br: bufio.NewReaderSize(r, 64*1024),
+		}
+	}
+}
+
+func putStreamContext(ctx *streamContext) {
+	ctx.reset()
+	select {
+	case streamContextPoolCh <- ctx:
+	default:
+		streamContextPool.Put(ctx)
+	}
+}
+
+var streamContextPool sync.Pool
+var streamContextPoolCh = make(chan *streamContext, cgroup.AvailableCPUs())
+
+type unmarshalWork struct {
+	rows     statsd.Rows
+	ctx      *streamContext
+	callback func(rows []statsd.Row) error
+	reqBuf   []byte
+}
+
+func (uw *unmarshalWork) reset() {
+	uw.rows.Reset()
+	uw.ctx = nil
+	uw.callback = nil
+	uw.reqBuf = uw.reqBuf[:0]
+}
+
+func (uw *unmarshalWork) runCallback(rows []statsd.Row) {
+	ctx := uw.ctx
+	if err := uw.callback(rows); err != nil {
+		ctx.callbackErrLock.Lock()
+		if ctx.callbackErr == nil {
+			ctx.callbackErr = fmt.Errorf("error when processing imported data: %w", err)
+		}
+		ctx.callbackErrLock.Unlock()
+	}
+	ctx.wg.Done()
+}
+
+// Unmarshal implements common.UnmarshalWork
+func (uw *unmarshalWork) Unmarshal() {
+	uw.rows.Unmarshal(bytesutil.ToUnsafeString(uw.reqBuf))
+	rows := uw.rows.Rows
+	rowsRead.Add(len(rows))
+
+	// Fill missing timestamps with the current timestamp rounded to seconds.
+	currentTimestamp := int64(fasttime.UnixTimestamp())
+	for i := range rows {
+		r := &rows[i]
+		if r.Timestamp == 0 || r.Timestamp == -1 {
+			r.Timestamp = currentTimestamp
+		}
+	}
+
+	// Convert timestamps from seconds to milliseconds.
+	for i := range rows {
+		rows[i].Timestamp *= 1e3
+	}
+
+	// Trim timestamps if required.
+	if tsTrim := trimTimestamp.Milliseconds(); tsTrim > 1000 {
+		for i := range rows {
+			row := &rows[i]
+			row.Timestamp -= row.Timestamp % tsTrim
+		}
+	}
+
+	uw.runCallback(rows)
+	putUnmarshalWork(uw)
+}
+
+func getUnmarshalWork() *unmarshalWork {
+	v := unmarshalWorkPool.Get()
+	if v == nil {
+		return &unmarshalWork{}
+	}
+	return v.(*unmarshalWork)
+}
+
+func putUnmarshalWork(uw *unmarshalWork) {
+	uw.reset()
+	unmarshalWorkPool.Put(uw)
+}
+
+var unmarshalWorkPool sync.Pool
diff --git a/lib/protoparser/statsd/stream/streamparser_test.go b/lib/protoparser/statsd/stream/streamparser_test.go
new file mode 100644
index 0000000000..8800dfd455
--- /dev/null
+++ b/lib/protoparser/statsd/stream/streamparser_test.go
@@ -0,0 +1,60 @@
+package stream
+
+import (
+	"reflect"
+	"strings"
+	"testing"
+
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/statsd"
+)
+
+func Test_streamContext_Read(t *testing.T) {
+	f := func(s string, rowsExpected *statsd.Rows) {
+		t.Helper()
+		ctx := getStreamContext(strings.NewReader(s))
+		if !ctx.Read() {
+			t.Fatalf("expecting successful read")
+		}
+		uw := getUnmarshalWork()
+		callbackCalls := 0
+		uw.ctx = ctx
+		uw.callback = func(rows []statsd.Row) error {
+			callbackCalls++
+			if len(rows) != len(rowsExpected.Rows) {
+				t.Fatalf("different len of expected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
+			}
+			if !reflect.DeepEqual(rows, rowsExpected.Rows) {
+				t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows, rowsExpected.Rows)
+			}
+			return nil
+		}
+		uw.reqBuf = append(uw.reqBuf[:0], ctx.reqBuf...)
+		ctx.wg.Add(1)
+		uw.Unmarshal()
+		if callbackCalls != 1 {
+			t.Fatalf("unexpected number of callback calls; got %d; want 1", callbackCalls)
+		}
+	}
+
+	// Full line without tags
+	f("aaa:1123|c", &statsd.Rows{
+		Rows: []statsd.Row{{
+			Metric:    "aaa",
+			Value:     1123,
+			Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
+		}},
+	})
+	// Full line with tags
+	f("aaa:1123|c|#x:y", &statsd.Rows{
+		Rows: []statsd.Row{{
+			Metric: "aaa",
+			Tags: []statsd.Tag{{
+				Key:   "x",
+				Value: "y",
+			}},
+			Value:     1123,
+			Timestamp: int64(fasttime.UnixTimestamp()) * 1000,
+		}},
+	})
+}