diff --git a/README.md b/README.md index 616219e60a..eb9056a28c 100644 --- a/README.md +++ b/README.md @@ -135,6 +135,7 @@ with [the official Grafana dashboard for VictoriaMetrics cluster](https://grafan - `` may have the following values: - `prometheus` - for inserting data with [Prometheus remote write API](https://prometheus.io/docs/prometheus/latest/configuration/configuration/#remote_write) - `influx/write` or `influx/api/v2/write` - for inserting data with [Influx line protocol](https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/) + - `prometheus/api/v1/import` - for importing data obtained via `api/v1/export` on `vmselect` (see below). * URLs for querying: `http://:8481/select//prometheus/`, where: - `` is an arbitrary number identifying data namespace for the query (aka tenant) diff --git a/app/vminsert/common/lines_reader.go b/app/vminsert/common/lines_reader.go index 74973f1260..260cdc4eb5 100644 --- a/app/vminsert/common/lines_reader.go +++ b/app/vminsert/common/lines_reader.go @@ -20,6 +20,17 @@ const defaultBlockSize = 64 * 1024 // // Returns (dstBuf, tailBuf). func ReadLinesBlock(r io.Reader, dstBuf, tailBuf []byte) ([]byte, []byte, error) { + return ReadLinesBlockExt(r, dstBuf, tailBuf, maxLineSize) +} + +// ReadLinesBlockExt reads a block of lines delimited by '\n' from tailBuf and r into dstBuf. +// +// Trailing chars after the last newline are put into tailBuf. +// +// Returns (dstBuf, tailBuf). +// +// maxLineLen limits the maximum length of a single line. +func ReadLinesBlockExt(r io.Reader, dstBuf, tailBuf []byte, maxLineLen int) ([]byte, []byte, error) { if cap(dstBuf) < defaultBlockSize { dstBuf = bytesutil.Resize(dstBuf, defaultBlockSize) } @@ -48,8 +59,8 @@ again: nn := bytes.LastIndexByte(dstBuf[len(dstBuf)-n:], '\n') if nn < 0 { // Didn't found at least a single line. - if len(dstBuf) > maxLineSize { - return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineSize) + if len(dstBuf) > maxLineLen { + return dstBuf, tailBuf, fmt.Errorf("too long line: more than %d bytes", maxLineLen) } if cap(dstBuf) < 2*len(dstBuf) { // Increase dsbBuf capacity, so more data could be read into it. diff --git a/app/vminsert/main.go b/app/vminsert/main.go index d412911e38..22998068b4 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdb" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/opentsdbhttp" "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/vmimport" "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" @@ -120,6 +121,14 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool { } w.WriteHeader(http.StatusNoContent) return true + case "prometheus/api/v1/import": + vmimportRequests.Inc() + if err := vmimport.InsertHandler(at, r); err != nil { + vmimportErrors.Inc() + httpserver.Errorf(w, "error in %q: %s", r.URL.Path, err) + return true + } + return true case "influx/write", "influx/api/v2/write": influxWriteRequests.Inc() if err := influx.InsertHandler(at, r); err != nil { @@ -145,6 +154,9 @@ var ( prometheusWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/", protocol="prometheus"}`) prometheusWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/", protocol="prometheus"}`) + vmimportRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/prometheus/api/v1/import", protocol="vm"}`) + vmimportErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/prometheus/api/v1/import", protocol="vm"}`) + influxWriteRequests = metrics.NewCounter(`vm_http_requests_total{path="/insert/{}/influx/", protocol="influx"}`) influxWriteErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/insert/{}/influx/", protocol="influx"}`) diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index 5b89f01d0c..b618291e25 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -73,6 +73,26 @@ func (ctx *InsertCtx) Reset() { } } +// AddLabelBytes adds (name, value) label to ctx.Labels. +// +// name and value must exist until ctx.Labels is used. +func (ctx *InsertCtx) AddLabelBytes(name, value []byte) { + labels := ctx.Labels + if cap(labels) > len(labels) { + labels = labels[:len(labels)+1] + } else { + labels = append(labels, prompb.Label{}) + } + label := &labels[len(labels)-1] + + // Do not copy name and value contents for performance reasons. + // This reduces GC overhead on the number of objects and allocations. + label.Name = name + label.Value = value + + ctx.Labels = labels +} + // AddLabel adds (name, value) label to ctx.Labels. // // name and value must exist until ctx.Labels is used. diff --git a/app/vminsert/vmimport/parser.go b/app/vminsert/vmimport/parser.go new file mode 100644 index 0000000000..785b0845db --- /dev/null +++ b/app/vminsert/vmimport/parser.go @@ -0,0 +1,202 @@ +package vmimport + +import ( + "fmt" + "strings" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/metrics" + "github.com/valyala/fastjson" +) + +// Rows contains parsed rows from `/api/v1/import` request. +type Rows struct { + Rows []Row + + tu tagsUnmarshaler +} + +// Reset resets rs. +func (rs *Rows) Reset() { + for i := range rs.Rows { + rs.Rows[i].reset() + } + rs.Rows = rs.Rows[:0] + + rs.tu.reset() +} + +// Unmarshal unmarshals influx line protocol rows from s. +// +// See https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/ +// +// s must be unchanged until rs is in use. +func (rs *Rows) Unmarshal(s string) { + rs.tu.reset() + rs.Rows = unmarshalRows(rs.Rows[:0], s, &rs.tu) +} + +// Row is a single row from `/api/v1/import` request. +type Row struct { + Tags []Tag + Values []float64 + Timestamps []int64 +} + +func (r *Row) reset() { + r.Tags = nil + r.Values = r.Values[:0] + r.Timestamps = r.Timestamps[:0] +} + +func (r *Row) unmarshal(s string, tu *tagsUnmarshaler) error { + r.reset() + v, err := tu.p.Parse(s) + if err != nil { + return fmt.Errorf("cannot parse json line: %s", err) + } + + // Unmarshal tags + metric := v.GetObject("metric") + if metric == nil { + return fmt.Errorf("missing `metric` object") + } + tagsStart := len(tu.tagsPool) + if err := tu.unmarshalTags(metric); err != nil { + return fmt.Errorf("cannot unmarshal `metric`: %s", err) + } + tags := tu.tagsPool[tagsStart:] + r.Tags = tags[:len(tags):len(tags)] + if len(r.Tags) == 0 { + return fmt.Errorf("missing tags") + } + + // Unmarshal values + values := v.GetArray("values") + if len(values) == 0 { + return fmt.Errorf("missing `values` array") + } + for i, v := range values { + f, err := v.Float64() + if err != nil { + return fmt.Errorf("cannot unmarshal value at position %d: %s", i, err) + } + r.Values = append(r.Values, f) + } + + // Unmarshal timestamps + timestamps := v.GetArray("timestamps") + if len(timestamps) == 0 { + return fmt.Errorf("missing `timestamps` array") + } + for i, v := range timestamps { + ts, err := v.Int64() + if err != nil { + return fmt.Errorf("cannot unmarshal timestamp at position %d: %s", i, err) + } + r.Timestamps = append(r.Timestamps, ts) + } + + if len(r.Timestamps) != len(r.Values) { + return fmt.Errorf("`timestamps` array size must match `values` array size; got %d; want %d", len(r.Timestamps), len(r.Values)) + } + return nil +} + +// Tag represents `/api/v1/import` tag. +type Tag struct { + Key []byte + Value []byte +} + +func (tag *Tag) reset() { + // tag.Key and tag.Value point to tu.bytesPool, so there is no need in keeping these byte slices here. + tag.Key = nil + tag.Value = nil +} + +type tagsUnmarshaler struct { + p fastjson.Parser + tagsPool []Tag + bytesPool []byte + err error +} + +func (tu *tagsUnmarshaler) reset() { + for i := range tu.tagsPool { + tu.tagsPool[i].reset() + } + tu.tagsPool = tu.tagsPool[:0] + + tu.bytesPool = tu.bytesPool[:0] + tu.err = nil +} + +func (tu *tagsUnmarshaler) addTag() *Tag { + dst := tu.tagsPool + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Tag{}) + } + tag := &dst[len(dst)-1] + tu.tagsPool = dst + return tag +} + +func (tu *tagsUnmarshaler) addBytes(b []byte) []byte { + bytesPoolLen := len(tu.bytesPool) + tu.bytesPool = append(tu.bytesPool, b...) + bCopy := tu.bytesPool[bytesPoolLen:] + return bCopy[:len(bCopy):len(bCopy)] +} + +func (tu *tagsUnmarshaler) unmarshalTags(o *fastjson.Object) error { + tu.err = nil + o.Visit(func(key []byte, v *fastjson.Value) { + tag := tu.addTag() + tag.Key = tu.addBytes(key) + sb, err := v.StringBytes() + if err != nil && tu.err != nil { + tu.err = fmt.Errorf("cannot parse value for tag %q: %s", tag.Key, err) + } + tag.Value = tu.addBytes(sb) + }) + return tu.err +} + +func unmarshalRows(dst []Row, s string, tu *tagsUnmarshaler) []Row { + for len(s) > 0 { + n := strings.IndexByte(s, '\n') + if n < 0 { + // The last line. + return unmarshalRow(dst, s, tu) + } + dst = unmarshalRow(dst, s[:n], tu) + s = s[n+1:] + } + return dst +} + +func unmarshalRow(dst []Row, s string, tu *tagsUnmarshaler) []Row { + if len(s) > 0 && s[len(s)-1] == '\r' { + s = s[:len(s)-1] + } + if len(s) == 0 { + return dst + } + if cap(dst) > len(dst) { + dst = dst[:len(dst)+1] + } else { + dst = append(dst, Row{}) + } + r := &dst[len(dst)-1] + if err := r.unmarshal(s, tu); err != nil { + dst = dst[:len(dst)-1] + logger.Errorf("cannot unmarshal json line %q: %s; skipping it", s, err) + invalidLines.Inc() + } + return dst +} + +var invalidLines = metrics.NewCounter(`vm_rows_invalid_total{type="vmimport"}`) diff --git a/app/vminsert/vmimport/parser_test.go b/app/vminsert/vmimport/parser_test.go new file mode 100644 index 0000000000..7ed445af14 --- /dev/null +++ b/app/vminsert/vmimport/parser_test.go @@ -0,0 +1,216 @@ +package vmimport + +import ( + "reflect" + "testing" +) + +func TestRowsUnmarshalFailure(t *testing.T) { + f := func(s string) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) + } + + // Try again + rows.Unmarshal(s) + if len(rows.Rows) != 0 { + t.Fatalf("expecting zero rows; got %d rows", len(rows.Rows)) + } + } + + // Invalid json line + f("") + f("\n") + f("foo\n") + f("123") + f("[1,3]") + f("{}") + f("[]") + f(`{"foo":"bar"}`) + + // Invalid metric + f(`{"metric":123,"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":[123],"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":[],"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":{},"values":[1,2],"timestamps":[3,4]}`) + f(`{"metric":null,"values":[1,2],"timestamps":[3,4]}`) + f(`{"values":[1,2],"timestamps":[3,4]}`) + + // Invalid values + f(`{"metric":{"foo":"bar"},"values":1,"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":{"x":1},"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":null,"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"timestamps":[3,4]}`) + + // Invalid timestamps + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":3}`) + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":false}`) + f(`{"metric":{"foo":"bar"},"values":[1,2],"timestamps":{}}`) + f(`{"metric":{"foo":"bar"},"values":[1,2]}`) + + // values and timestamps count mismatch + f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[]}`) + f(`{"metric":{"foo":"bar"},"values":[],"timestamps":[1]}`) + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[]}`) + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[3,4]}`) + f(`{"metric":{"foo":"bar"},"values":[2,3],"timestamps":[4]}`) + + // Garbage after the line + f(`{"metric":{"foo":"bar"},"values":[2],"timestamps":[4]}{}`) +} + +func TestRowsUnmarshalSuccess(t *testing.T) { + f := func(s string, rowsExpected *Rows) { + t.Helper() + var rows Rows + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + // Try unmarshaling again + rows.Unmarshal(s) + if !reflect.DeepEqual(rows.Rows, rowsExpected.Rows) { + t.Fatalf("unexpected rows;\ngot\n%+v;\nwant\n%+v", rows.Rows, rowsExpected.Rows) + } + + rows.Reset() + if len(rows.Rows) != 0 { + t.Fatalf("non-empty rows after reset: %+v", rows.Rows) + } + } + + // Empty line + f("", &Rows{}) + f("\n\n", &Rows{}) + f("\n\r\n", &Rows{}) + + // Single line with a single tag + f(`{"metric":{"foo":"bar"},"values":[1.23],"timestamps":[456]}`, &Rows{ + Rows: []Row{{ + Tags: []Tag{{ + Key: []byte("foo"), + Value: []byte("bar"), + }}, + Values: []float64{1.23}, + Timestamps: []int64{456}, + }}, + }) + + // Line with multiple tags + f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]}`, &Rows{ + Rows: []Row{{ + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }}, + }) + + // Multiple lines + f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]} +{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]} +`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xx"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{11}, + }, + }, + }) + + // Multiple lines with invalid line in the middle. + f(`{"metric":{"xfoo":"bar","baz":"xx"},"values":[1.232, -3.21],"timestamps" : [456,7890]} +garbage here +{"metric":{"__name__":"xxy"},"values":[34],"timestamps" : [111]}`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("xfoo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.232, -3.21}, + Timestamps: []int64{456, 7890}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xxy"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{111}, + }, + }, + }) + + // No newline after the second line. + f(`{"metric":{"foo":"bar","baz":"xx"},"values":[1.23, -3.21],"timestamps" : [456,789]} +{"metric":{"__name__":"xx"},"values":[34],"timestamps" : [11]}`, &Rows{ + Rows: []Row{ + { + Tags: []Tag{ + { + Key: []byte("foo"), + Value: []byte("bar"), + }, + { + Key: []byte("baz"), + Value: []byte("xx"), + }, + }, + Values: []float64{1.23, -3.21}, + Timestamps: []int64{456, 789}, + }, + { + Tags: []Tag{ + { + Key: []byte("__name__"), + Value: []byte("xx"), + }, + }, + Values: []float64{34}, + Timestamps: []int64{11}, + }, + }, + }) +} diff --git a/app/vminsert/vmimport/parser_timing_test.go b/app/vminsert/vmimport/parser_timing_test.go new file mode 100644 index 0000000000..e512a3a1d3 --- /dev/null +++ b/app/vminsert/vmimport/parser_timing_test.go @@ -0,0 +1,25 @@ +package vmimport + +import ( + "fmt" + "testing" +) + +func BenchmarkRowsUnmarshal(b *testing.B) { + s := `{"metric":{"__name__":"up","job":"node_exporter","instance":"localhost:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]} +{"metric":{"__name__":"up","job":"prometheus","instance":"localhost:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]} +{"metric":{"__name__":"up","job":"node_exporter","instance":"foobar.com:9100"},"values":[0,0,0],"timestamps":[1549891472010,1549891487724,1549891503438]} +{"metric":{"__name__":"up","job":"prometheus","instance":"xxx.yyy.zzz:9090"},"values":[1,1,1],"timestamps":[1549891461511,1549891476511,1549891491511]} +` + b.SetBytes(int64(len(s))) + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + var rows Rows + for pb.Next() { + rows.Unmarshal(s) + if len(rows.Rows) != 4 { + panic(fmt.Errorf("unexpected number of rows parsed; got %d; want 4", len(rows.Rows))) + } + } + }) +} diff --git a/app/vminsert/vmimport/request_handler.go b/app/vminsert/vmimport/request_handler.go new file mode 100644 index 0000000000..ba8ed40dc7 --- /dev/null +++ b/app/vminsert/vmimport/request_handler.go @@ -0,0 +1,159 @@ +package vmimport + +import ( + "flag" + "fmt" + "io" + "net/http" + "runtime" + "sync" + + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/common" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/concurrencylimiter" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vminsert/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/auth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/tenantmetrics" + "github.com/VictoriaMetrics/metrics" +) + +var maxLineLen = flag.Int("import.maxLineLen", 100*1024*1024, "The maximum length in bytes of a single line accepted by `/api/v1/import`") + +var ( + rowsInserted = tenantmetrics.NewCounterMap(`vm_rows_inserted_total{type="vmimport"}`) + rowsPerInsert = metrics.NewSummary(`vm_rows_per_insert{type="vmimport"}`) +) + +// InsertHandler processes `/api/v1/import` request. +// +// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6 +func InsertHandler(at *auth.Token, req *http.Request) error { + return concurrencylimiter.Do(func() error { + return insertHandlerInternal(at, req) + }) +} + +func insertHandlerInternal(at *auth.Token, req *http.Request) error { + readCalls.Inc() + + r := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + zr, err := common.GetGzipReader(r) + if err != nil { + return fmt.Errorf("cannot read gzipped vmimport data: %s", err) + } + defer common.PutGzipReader(zr) + r = zr + } + + ctx := getPushCtx() + defer putPushCtx(ctx) + for ctx.Read(r) { + if err := ctx.InsertRows(at); err != nil { + return err + } + } + return ctx.Error() +} + +func (ctx *pushCtx) InsertRows(at *auth.Token) error { + rows := ctx.Rows.Rows + ic := &ctx.Common + ic.Reset() + rowsTotal := 0 + for i := range rows { + r := &rows[i] + ic.Labels = ic.Labels[:0] + for j := range r.Tags { + tag := &r.Tags[j] + ic.AddLabelBytes(tag.Key, tag.Value) + } + ic.MetricNameBuf = storage.MarshalMetricNameRaw(ic.MetricNameBuf[:0], at.AccountID, at.ProjectID, ic.Labels) + storageNodeIdx := ic.GetStorageNodeIdx(at, ic.Labels) + values := r.Values + timestamps := r.Timestamps + _ = timestamps[len(values)-1] + for j, value := range values { + timestamp := timestamps[j] + if err := ic.WriteDataPointExt(at, storageNodeIdx, ic.MetricNameBuf, timestamp, value); err != nil { + return err + } + } + rowsTotal += len(values) + } + rowsInserted.Get(at).Add(rowsTotal) + rowsPerInsert.Update(float64(rowsTotal)) + return ic.FlushBufs() +} + +func (ctx *pushCtx) Read(r io.Reader) bool { + if ctx.err != nil { + return false + } + ctx.reqBuf, ctx.tailBuf, ctx.err = common.ReadLinesBlockExt(r, ctx.reqBuf, ctx.tailBuf, *maxLineLen) + if ctx.err != nil { + if ctx.err != io.EOF { + readErrors.Inc() + ctx.err = fmt.Errorf("cannot read vmimport data: %s", ctx.err) + } + return false + } + ctx.Rows.Unmarshal(bytesutil.ToUnsafeString(ctx.reqBuf)) + return true +} + +var ( + readCalls = metrics.NewCounter(`vm_read_calls_total{name="vmimport"}`) + readErrors = metrics.NewCounter(`vm_read_errors_total{name="vmimport"}`) +) + +type pushCtx struct { + Rows Rows + Common netstorage.InsertCtx + + reqBuf []byte + tailBuf []byte + + err error +} + +func (ctx *pushCtx) Error() error { + if ctx.err == io.EOF { + return nil + } + return ctx.err +} + +func (ctx *pushCtx) reset() { + ctx.Rows.Reset() + ctx.Common.Reset() + ctx.reqBuf = ctx.reqBuf[:0] + ctx.tailBuf = ctx.tailBuf[:0] + + ctx.err = nil +} + +func getPushCtx() *pushCtx { + select { + case ctx := <-pushCtxPoolCh: + return ctx + default: + if v := pushCtxPool.Get(); v != nil { + return v.(*pushCtx) + } + return &pushCtx{} + } +} + +func putPushCtx(ctx *pushCtx) { + ctx.reset() + select { + case pushCtxPoolCh <- ctx: + default: + pushCtxPool.Put(ctx) + } +} + +var pushCtxPool sync.Pool +var pushCtxPoolCh = make(chan *pushCtx, runtime.GOMAXPROCS(-1))