diff --git a/app/vmselect/promql/timeseries.go b/app/vmselect/promql/timeseries.go index 0b1ac66dc..b13b27327 100644 --- a/app/vmselect/promql/timeseries.go +++ b/app/vmselect/promql/timeseries.go @@ -82,15 +82,17 @@ func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int6 logger.Panicf("BUG: tss cannot be empty") } - // Calculate the required size for marshaled tss. - size := 0 - for _, ts := range tss { - size += ts.marshaledFastSizeNoTimestamps() - } - // timestamps are stored only once for all the tss, since they are identical. + // timestamps are stored only once for all the tss, since they must be identical assertIdenticalTimestamps(tss, step) - size += 8 * len(tss[0].Timestamps) + timestamps := tss[0].Timestamps + // Calculate the required size for marshaled tss. + size := 8 + 8 // 8 bytes for len(tss) and 8 bytes for len(timestamps) + size += 8 * len(timestamps) // encoded timestamps + size += 8 * len(tss) * len(timestamps) // encoded values + for _, ts := range tss { + size += marshaledFastMetricNameSize(&ts.MetricName) + } if size > maxSize { // Do not marshal tss, since it would occupy too much space return dst @@ -98,181 +100,147 @@ func marshalTimeseriesFast(dst []byte, tss []*timeseries, maxSize int, step int6 // Allocate the buffer for the marshaled tss before its' marshaling. // This should reduce memory fragmentation and memory usage. - dst = bytesutil.ResizeNoCopyMayOverallocate(dst, size) - dst = marshalFastTimestamps(dst[:0], tss[0].Timestamps) + dstLen := len(dst) + dst = bytesutil.ResizeWithCopyMayOverallocate(dst, size+dstLen) + dst = dst[:dstLen] + + // Marshal timestamps and values at first, so they are 8-byte aligned. + // This prevents from SIGBUS error on arm architectures. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3927 + dst = encoding.MarshalUint64(dst, uint64(len(tss))) + dst = encoding.MarshalUint64(dst, uint64(len(timestamps))) + dst = marshalTimestampsFast(dst, timestamps) for _, ts := range tss { - dst = ts.marshalFastNoTimestamps(dst) + dst = marshalValuesFast(dst, ts.Values) + } + for _, ts := range tss { + dst = marshalMetricNameFast(dst, &ts.MetricName) } return dst } // unmarshalTimeseriesFast unmarshals timeseries from src. // -// The returned timeseries refer to src, so it is unsafe to modify it -// until timeseries are in use. +// The returned timeseries refer to src, so it is unsafe to modify it while timeseries are in use. func unmarshalTimeseriesFast(src []byte) ([]*timeseries, error) { - tail, timestamps, err := unmarshalFastTimestamps(src) + if len(src) < 16 { + return nil, fmt.Errorf("cannot unmarshal timeseries from %d bytes; need at least 16 bytes", len(src)) + } + tssLen := encoding.UnmarshalUint64(src) + timestampsLen := encoding.UnmarshalUint64(src[8:]) + src = src[16:] + + // Unmarshal timestamps + tail, timestamps, err := unmarshalTimestampsFast(src, timestampsLen) if err != nil { return nil, err } src = tail - var tss []*timeseries - for len(src) > 0 { + tss := make([]*timeseries, tssLen) + for i := range tss { var ts timeseries - ts.denyReuse = false + ts.denyReuse = true ts.Timestamps = timestamps + tss[i] = &ts + } - tail, err := ts.unmarshalFastNoTimestamps(src) + // Unmarshal values + for _, ts := range tss { + tail, values, err := unmarshalValuesFast(src, timestampsLen) + if err != nil { + return nil, err + } + ts.Values = values + src = tail + } + + // Unmarshal metric names for the time series + for _, ts := range tss { + tail, err := unmarshalMetricNameFast(&ts.MetricName, src) if err != nil { return nil, err } src = tail + } - tss = append(tss, &ts) + if len(src) > 0 { + return nil, fmt.Errorf("unexpected non-empty tail left after unmarshaling %d timeseries; len(tail)=%d", len(tss), len(src)) } return tss, nil } -// marshaledFastSizeNoTimestamps returns the size of marshaled ts -// returned from marshalFastNoTimestamps. -func (ts *timeseries) marshaledFastSizeNoTimestamps() int { - mn := &ts.MetricName - n := 2 + len(mn.MetricGroup) +// marshaledFastMetricNameSize returns the size of marshaled mn returned from marshalMetricNameFast. +func marshaledFastMetricNameSize(mn *storage.MetricName) int { + n := 8 // AccountID, ProjectID + n += 2 + len(mn.MetricGroup) n += 2 // Length of tags. for i := range mn.Tags { tag := &mn.Tags[i] n += 2 + len(tag.Key) n += 2 + len(tag.Value) } - n += 8 * len(ts.Values) return n } -// marshalFastNoTimestamps appends marshaled ts to dst and returns the result. -// -// It doesn't marshal timestamps. -// -// The result must be unmarshaled with unmarshalFastNoTimestamps. -func (ts *timeseries) marshalFastNoTimestamps(dst []byte) []byte { - mn := &ts.MetricName - dst = marshalBytesFast(dst, mn.MetricGroup) - dst = encoding.MarshalUint16(dst, uint16(len(mn.Tags))) - // There is no need in tags' sorting - they must be sorted after unmarshaling. - for i := range mn.Tags { - tag := &mn.Tags[i] - dst = marshalBytesFast(dst, tag.Key) - dst = marshalBytesFast(dst, tag.Value) - } - - // Do not marshal len(ts.Values), since it is already encoded as len(ts.Timestamps) - // during marshalFastTimestamps. - var valuesBuf []byte - if len(ts.Values) > 0 { - valuesBuf = float64ToByteSlice(ts.Values) - } +func marshalValuesFast(dst []byte, values []float64) []byte { + // Do not marshal len(values), since it is already encoded as len(timestamps) at marshalTimestampsFast. + valuesBuf := float64ToByteSlice(values) dst = append(dst, valuesBuf...) return dst } -func marshalFastTimestamps(dst []byte, timestamps []int64) []byte { - dst = encoding.MarshalUint32(dst, uint32(len(timestamps))) - var timestampsBuf []byte - if len(timestamps) > 0 { - timestampsBuf = int64ToByteSlice(timestamps) +// it is unsafe modifying src while the returned values is in use. +func unmarshalValuesFast(src []byte, valuesLen uint64) ([]byte, []float64, error) { + bufSize := valuesLen * 8 + if uint64(len(src)) < bufSize { + return src, nil, fmt.Errorf("cannot unmarshal values; got %d ytes; want at least %d bytes", uint64(len(src)), bufSize) } + values := byteSliceToFloat64(src[:bufSize]) + return src[bufSize:], values, nil +} + +func marshalTimestampsFast(dst []byte, timestamps []int64) []byte { + timestampsBuf := int64ToByteSlice(timestamps) dst = append(dst, timestampsBuf...) return dst } // it is unsafe modifying src while the returned timestamps is in use. -func unmarshalFastTimestamps(src []byte) ([]byte, []int64, error) { - if len(src) < 4 { - return src, nil, fmt.Errorf("cannot decode len(timestamps); got %d bytes; want at least %d bytes", len(src), 4) - } - timestampsCount := int(encoding.UnmarshalUint32(src)) - src = src[4:] - if timestampsCount == 0 { - return src, nil, nil - } - - bufSize := timestampsCount * 8 - if len(src) < bufSize { +func unmarshalTimestampsFast(src []byte, timestampsLen uint64) ([]byte, []int64, error) { + bufSize := timestampsLen * 8 + if uint64(len(src)) < bufSize { return src, nil, fmt.Errorf("cannot unmarshal timestamps; got %d bytes; want at least %d bytes", len(src), bufSize) } timestamps := byteSliceToInt64(src[:bufSize]) - src = src[bufSize:] - - return src, timestamps, nil + return src[bufSize:], timestamps, nil } -// unmarshalFastNoTimestamps unmarshals ts from src, so ts members reference src. +// marshalMetricNameFast appends marshaled mn to dst and returns the result. // -// It is expected that ts.Timestamps is already unmarshaled. -// -// It is unsafe to modify src while ts is in use. -func (ts *timeseries) unmarshalFastNoTimestamps(src []byte) ([]byte, error) { - // ts members point to src, so they cannot be re-used. - ts.denyReuse = true - - tail, err := unmarshalMetricNameFast(&ts.MetricName, src) - if err != nil { - return tail, fmt.Errorf("cannot unmarshal MetricName: %w", err) - } - src = tail - - valuesCount := len(ts.Timestamps) - if valuesCount == 0 { - return src, nil - } - bufSize := valuesCount * 8 - if len(src) < bufSize { - return src, fmt.Errorf("cannot unmarshal values; got %d bytes; want at least %d bytes", len(src), bufSize) - } - ts.Values = byteSliceToFloat64(src[:bufSize]) - - return src[bufSize:], nil +// The result must be unmarshaled with unmarshalMetricNameFast. +func marshalMetricNameFast(dst []byte, mn *storage.MetricName) []byte { + dst = encoding.MarshalUint32(dst, mn.AccountID) + dst = encoding.MarshalUint32(dst, mn.ProjectID) + dst = marshalBytesFast(dst, mn.MetricGroup) + dst = encoding.MarshalUint16(dst, uint16(len(mn.Tags))) + // There is no need in tags' sorting - they must be sorted after unmarshaling. + return marshalMetricTagsFast(dst, mn.Tags) } -func float64ToByteSlice(a []float64) (b []byte) { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - sh.Data = uintptr(unsafe.Pointer(&a[0])) - sh.Len = len(a) * int(unsafe.Sizeof(a[0])) - sh.Cap = sh.Len - return -} - -func int64ToByteSlice(a []int64) (b []byte) { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) - sh.Data = uintptr(unsafe.Pointer(&a[0])) - sh.Len = len(a) * int(unsafe.Sizeof(a[0])) - sh.Cap = sh.Len - return -} - -func byteSliceToInt64(b []byte) (a []int64) { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&a)) - sh.Data = uintptr(unsafe.Pointer(&b[0])) - sh.Len = len(b) / int(unsafe.Sizeof(a[0])) - sh.Cap = sh.Len - return -} - -func byteSliceToFloat64(b []byte) (a []float64) { - sh := (*reflect.SliceHeader)(unsafe.Pointer(&a)) - sh.Data = uintptr(unsafe.Pointer(&b[0])) - sh.Len = len(b) / int(unsafe.Sizeof(a[0])) - sh.Cap = sh.Len - return -} - -// unmarshalMetricNameFast unmarshals mn from src, so mn members -// hold references to src. +// unmarshalMetricNameFast unmarshals mn from src, so mn members hold references to src. // // It is unsafe modifying src while mn is in use. func unmarshalMetricNameFast(mn *storage.MetricName, src []byte) ([]byte, error) { mn.Reset() + if len(src) < 8 { + return src, fmt.Errorf("cannot unmarshal AccountID, ProjectID from %d bytes; need at least 8 bytes", len(src)) + } + mn.AccountID = encoding.UnmarshalUint32(src) + mn.ProjectID = encoding.UnmarshalUint32(src[4:]) + src = src[8:] + tail, metricGroup, err := unmarshalBytesFast(src) if err != nil { return tail, fmt.Errorf("cannot unmarshal MetricGroup: %w", err) @@ -321,9 +289,7 @@ func marshalMetricTagsFast(dst []byte, tags []storage.Tag) []byte { func marshalMetricNameSorted(dst []byte, mn *storage.MetricName) []byte { // Do not marshal AccountID and ProjectID, since they are unused. dst = marshalBytesFast(dst, mn.MetricGroup) - sortMetricTags(mn) - dst = marshalMetricTagsFast(dst, mn.Tags) - return dst + return marshalMetricTagsSorted(dst, mn) } func marshalMetricTagsSorted(dst []byte, mn *storage.MetricName) []byte { @@ -349,6 +315,62 @@ func unmarshalBytesFast(src []byte) ([]byte, []byte, error) { return src[n:], src[:n], nil } +func float64ToByteSlice(a []float64) (b []byte) { + if len(a) == 0 { + return nil + } + sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + sh.Data = uintptr(unsafe.Pointer(&a[0])) + sh.Len = len(a) * int(unsafe.Sizeof(a[0])) + sh.Cap = sh.Len + return +} + +func int64ToByteSlice(a []int64) (b []byte) { + if len(a) == 0 { + return nil + } + sh := (*reflect.SliceHeader)(unsafe.Pointer(&b)) + sh.Data = uintptr(unsafe.Pointer(&a[0])) + sh.Len = len(a) * int(unsafe.Sizeof(a[0])) + sh.Cap = sh.Len + return +} + +func byteSliceToInt64(b []byte) (a []int64) { + if len(b) == 0 { + return nil + } + sh := (*reflect.SliceHeader)(unsafe.Pointer(&a)) + sh.Data = uintptr(unsafe.Pointer(&b[0])) + sh.Len = len(b) / int(unsafe.Sizeof(a[0])) + sh.Cap = sh.Len + // Make sure that the returned slice is properly aligned to 8 bytes. + // This prevents from SIGBUS error on arm architectures, which deny unaligned access. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3927 + if sh.Data%8 != 0 { + logger.Panicf("BUG: the input byte slice b must be aligned to 8 bytes") + } + return +} + +func byteSliceToFloat64(b []byte) (a []float64) { + if len(b) == 0 { + return nil + } + sh := (*reflect.SliceHeader)(unsafe.Pointer(&a)) + sh.Data = uintptr(unsafe.Pointer(&b[0])) + sh.Len = len(b) / int(unsafe.Sizeof(a[0])) + sh.Cap = sh.Len + // Make sure that the returned slice is properly aligned to 8 bytes. + // This prevents from SIGBUS error on arm architectures, which deny unaligned access. + // See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3927 + if sh.Data%8 != 0 { + logger.Panicf("BUG: the input byte slice b must be aligned to 8 bytes") + } + return +} + func stringMetricName(mn *storage.MetricName) string { var dst []byte dst = append(dst, mn.MetricGroup...) diff --git a/app/vmselect/promql/timeseries_test.go b/app/vmselect/promql/timeseries_test.go index 87a9618a7..b1b08ed53 100644 --- a/app/vmselect/promql/timeseries_test.go +++ b/app/vmselect/promql/timeseries_test.go @@ -1,7 +1,6 @@ package promql import ( - "fmt" "os" "reflect" "testing" @@ -14,91 +13,82 @@ func TestMain(m *testing.M) { os.Exit(n) } -func TestTimeseriesMarshalUnmarshalFast(t *testing.T) { - t.Run("single", func(t *testing.T) { - var tsOrig timeseries - buf := tsOrig.marshalFastNoTimestamps(nil) - n := tsOrig.marshaledFastSizeNoTimestamps() - if n != len(buf) { - t.Fatalf("unexpected marshaled size; got %d; want %d", n, len(buf)) - } - - var tsGot timeseries - tail, err := tsGot.unmarshalFastNoTimestamps(buf) +func TestMarshalTimeseriesFast(t *testing.T) { + f := func(tss []*timeseries) { + t.Helper() + data := marshalTimeseriesFast(nil, tss, 1e9, 10) + tss2, err := unmarshalTimeseriesFast(data) if err != nil { t.Fatalf("cannot unmarshal timeseries: %s", err) } - if len(tail) > 0 { - t.Fatalf("unexpected non-empty tail left: len(tail)=%d; tail=%X", len(tail), tail) + if !reflect.DeepEqual(tss, tss2) { + t.Fatalf("unexpected timeseries unmarshaled\ngot\n%#v\nwant\n%#v", tss2[0], tss[0]) } - tsOrig.denyReuse = true - tsOrig.MetricName.MetricGroup = []byte{} - if !reflect.DeepEqual(&tsOrig, &tsGot) { - t.Fatalf("unexpected ts\ngot:\n%s\nwant:\n%s", &tsGot, &tsOrig) - } - }) - t.Run("multiple", func(t *testing.T) { - var dst []byte - var tssOrig []*timeseries - timestamps := []int64{2} - for i := 0; i < 10; i++ { - var ts timeseries - ts.denyReuse = true - ts.MetricName.MetricGroup = []byte(fmt.Sprintf("metricGroup %d", i)) - ts.MetricName.Tags = []storage.Tag{{ - Key: []byte(fmt.Sprintf("key %d", i)), - Value: []byte(fmt.Sprintf("value %d", i)), - }} - ts.Values = []float64{float64(i) + 0.2} - ts.Timestamps = timestamps + } - dstLen := len(dst) - dst = ts.marshalFastNoTimestamps(dst) - n := ts.marshaledFastSizeNoTimestamps() - if n != len(dst)-dstLen { - t.Fatalf("unexpected marshaled size on iteration %d; got %d; want %d", i, n, len(dst)-dstLen) - } + // Single series + f([]*timeseries{{ + MetricName: storage.MetricName{ + MetricGroup: []byte{}, + }, + denyReuse: true, + }}) + f([]*timeseries{{ + MetricName: storage.MetricName{ + AccountID: 8934, + ProjectID: 8984, + MetricGroup: []byte("foobar"), + Tags: []storage.Tag{ + { + Key: []byte("tag1"), + Value: []byte("value1"), + }, + { + Key: []byte("tag2"), + Value: []byte("value2"), + }, + }, + }, + Values: []float64{1, 2, 3.234}, + Timestamps: []int64{10, 20, 30}, + denyReuse: true, + }}) - var tsGot timeseries - tsGot.Timestamps = ts.Timestamps - tail, err := tsGot.unmarshalFastNoTimestamps(dst[dstLen:]) - if err != nil { - t.Fatalf("cannot unmarshal timeseries on iteration %d: %s", i, err) - } - if len(tail) > 0 { - t.Fatalf("unexpected non-empty tail left on iteration %d: len(tail)=%d; tail=%x", i, len(tail), tail) - } - if !reflect.DeepEqual(&ts, &tsGot) { - t.Fatalf("unexpected ts on iteration %d\ngot:\n%s\nwant:\n%s", i, &tsGot, &ts) - } - - tssOrig = append(tssOrig, &ts) - } - buf := marshalTimeseriesFast(nil, tssOrig, 1e6, 123) - tssGot, err := unmarshalTimeseriesFast(buf) - if err != nil { - t.Fatalf("error in unmarshalTimeseriesFast: %s", err) - } - if !reflect.DeepEqual(tssOrig, tssGot) { - t.Fatalf("unexpected unmarshaled timeseries\ngot:\n%s\nwant:\n%s", tssGot, tssOrig) - } - - src := dst - for i := 0; i < 10; i++ { - tsOrig := tssOrig[i] - var ts timeseries - ts.Timestamps = tsOrig.Timestamps - tail, err := ts.unmarshalFastNoTimestamps(src) - if err != nil { - t.Fatalf("cannot unmarshal timeseries[%d]: %s", i, err) - } - src = tail - if !reflect.DeepEqual(tsOrig, &ts) { - t.Fatalf("unexpected ts on iteration %d:\n%+v\nwant:\n%+v", i, &ts, tsOrig) - } - } - if len(src) > 0 { - t.Fatalf("unexpected tail left; len(tail)=%d; tail=%X", len(src), src) - } + // Multiple series + f([]*timeseries{ + { + MetricName: storage.MetricName{ + AccountID: 898, + ProjectID: 9899889, + MetricGroup: []byte("foobar"), + Tags: []storage.Tag{ + { + Key: []byte("tag1"), + Value: []byte("value1"), + }, + { + Key: []byte("tag2"), + Value: []byte("value2"), + }, + }, + }, + Values: []float64{1, 2.34, -33}, + Timestamps: []int64{-10, 0, 10}, + denyReuse: true, + }, + { + MetricName: storage.MetricName{ + MetricGroup: []byte("baz"), + Tags: []storage.Tag{ + { + Key: []byte("tag12"), + Value: []byte("value13"), + }, + }, + }, + Values: []float64{4, 1, 2.34}, + Timestamps: []int64{-10, 0, 10}, + denyReuse: true, + }, }) } diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index b0662b317..f049b49e0 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -27,9 +27,10 @@ The following tip changes can be tested by building VictoriaMetrics components f * BUGFIX: vmstorage: fix a bug, which could lead to incomplete or empty results for heavy queries selecting tens of thousands of time series. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3946). * BUGFIX: vmselect: reduce memory usage and CPU usage when performing heavy queries. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3692). +* BUGFIX: prevent from possible `invalid memory address or nil pointer dereference` panic during [background merge](https://docs.victoriametrics.com/#storage). The issue has been introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3897). +* BUGFIX: prevent from possible `SIGBUS` crash on ARM architectures (Raspberry Pi), which deny unaligned access to 8-byte words. Thanks to @oliverpool for narrowing down the issue and for [the initial attempt to fix it](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3927). * BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): properly take into account `-rpc.disableCompression` command-line flag at `vmstorage`. It was ignored since [v1.78.0](https://docs.victoriametrics.com/CHANGELOG.html#v1780). See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3932). * BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): fix panic when [writing data to Kafka](https://docs.victoriametrics.com/vmagent.html#writing-metrics-to-kafka). The panic has been introduced in [v1.88.0](https://docs.victoriametrics.com/CHANGELOG.html#v1880). -* BUGFIX: prevent from possible `invalid memory address or nil pointer dereference` panic during [background merge](https://docs.victoriametrics.com/#storage). The issue has been introduced at [v1.85.0](https://docs.victoriametrics.com/CHANGELOG.html#v1850). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3897). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): stop showing `Please enter a valid Query and execute it` error message on the first load of vmui. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): properly process `Run in VMUI` button click in [VictoriaMetrics datasource plugin for Grafana](https://github.com/VictoriaMetrics/grafana-datasource). * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): fix the display of the selected value for dropdowns on `Explore` page.