lib/mergeset: do not panic on too long items passed to Table.AddItems()

Instead, log a sample of these long items once per 5 seconds into error log,
so users could notice and fix the issue with too long labels or too many labels.

Previously this panic could occur in production when ingesting samples with too long labels.
This commit is contained in:
Aliaksandr Valialkin 2024-02-12 19:32:16 +02:00
parent 5d69ba630e
commit a49a50701a
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
4 changed files with 87 additions and 16 deletions

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"log"
"math/rand"
"net"
"net/http"
"os"
@ -413,6 +414,7 @@ func httpReadMetrics(t *testing.T, address, query string) []Metric {
}
return rows
}
func httpReadStruct(t *testing.T, address, query string, dst interface{}) {
t.Helper()
s := newSuite(t)
@ -497,3 +499,73 @@ func (s *suite) greaterThan(a, b int) {
s.t.FailNow()
}
}
func TestImportJSONLines(t *testing.T) {
f := func(labelsCount, labelLen int) {
t.Helper()
reqURL := fmt.Sprintf("http://localhost%s/api/v1/import", testHTTPListenAddr)
line := generateJSONLine(labelsCount, labelLen)
req, err := http.NewRequest("POST", reqURL, bytes.NewBufferString(line))
if err != nil {
t.Fatalf("cannot create request: %s", err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("cannot perform request for labelsCount=%d, labelLen=%d: %s", labelsCount, labelLen, err)
}
if resp.StatusCode != 204 {
t.Fatalf("unexpected statusCode for labelsCount=%d, labelLen=%d; got %d; want 204", labelsCount, labelLen, resp.StatusCode)
}
}
// labels with various lengths
for i := 0; i < 500; i++ {
f(10, i*5)
}
// Too many labels
f(1000, 100)
// Too long labels
f(1, 100_000)
f(10, 100_000)
f(10, 10_000)
}
func generateJSONLine(labelsCount, labelLen int) string {
m := make(map[string]string, labelsCount)
m["__name__"] = generateSizedRandomString(labelLen)
for j := 1; j < labelsCount; j++ {
labelName := generateSizedRandomString(labelLen)
labelValue := generateSizedRandomString(labelLen)
m[labelName] = labelValue
}
type jsonLine struct {
Metric map[string]string `json:"metric"`
Values []float64 `json:"values"`
Timestamps []int64 `json:"timestamps"`
}
line := &jsonLine{
Metric: m,
Values: []float64{1.34},
Timestamps: []int64{time.Now().UnixNano() / 1e6},
}
data, err := json.Marshal(&line)
if err != nil {
panic(fmt.Errorf("cannot marshal JSON: %w", err))
}
data = append(data, '\n')
return string(data)
}
const alphabetSample = `qwertyuiopasdfghjklzxcvbnm`
func generateSizedRandomString(size int) string {
dst := make([]byte, size)
for i := range dst {
dst[i] = alphabetSample[rand.Intn(len(alphabetSample))]
}
return string(dst)
}

View file

@ -244,8 +244,13 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
ibs = append(ibs, ib)
continue
}
ris.mu.Unlock()
logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock; len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
// Skip too long item
itemPrefix := item
if len(itemPrefix) > 128 {
itemPrefix = itemPrefix[:128]
}
tooLongItemLogger.Errorf("skipping adding too long item to indexdb: len(item)=%d; it souldn't exceed %d bytes; item prefix=%q", len(item), maxInmemoryBlockSize, itemPrefix)
}
ris.ibs = ibs
ris.mu.Unlock()
@ -255,6 +260,8 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
return tailItems
}
var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
type partWrapper struct {
p *part
@ -600,8 +607,8 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
// AddItems adds the given items to the tb.
//
// The function panics when items contains an item with length exceeding maxInmemoryBlockSize.
// It is caller's responsibility to make sure there are no too long items.
// The function ignores items with length exceeding maxInmemoryBlockSize.
// It logs the ignored items, so users could notice and fix the issue.
func (tb *Table) AddItems(items [][]byte) {
tb.rawItems.addItems(tb, items)
atomic.AddUint64(&tb.itemsAdded, uint64(len(items)))

View file

@ -41,15 +41,7 @@ func TestTableAddItemsTooLongItem(t *testing.T) {
var isReadOnly uint32
tb := MustOpenTable(path, nil, nil, &isReadOnly)
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("expecting panic")
}
}()
tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)})
}()
t.Logf("foobar")
tb.MustClose()
_ = os.RemoveAll(path)
}

View file

@ -582,9 +582,9 @@ func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) {
case <-truncatedLabelsLogTicker.C:
// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
// because labelsToString() will be called with each trackTruncatedLabels call.
logger.Warnf("truncated label value as it exceeds configured maximal label value length: max %d, actual %d;"+
" truncated label: %s; original labels: %s; either reduce the label value length or increase -maxLabelValueLen=%d;",
maxLabelValueLen, len(truncated.Value), truncated.Name, labelsToString(labels), maxLabelValueLen)
logger.Warnf("truncate value for label %s because its length=%d exceeds -maxLabelValueLen=%d; "+
"original labels: %s; either reduce the label value length or increase -maxLabelValueLen command-line flag value",
truncated.Name, len(truncated.Value), maxLabelValueLen, labelsToString(labels))
default:
}
}