lib/mergeset: do not panic on too long items passed to Table.AddItems()

Instead, log a sample of these long items once per 5 seconds into error log,
so users could notice and fix the issue with too long labels or too many labels.

Previously this panic could occur in production when ingesting samples with too long labels.
This commit is contained in:
Aliaksandr Valialkin 2024-02-12 19:32:16 +02:00
parent a2a218e284
commit 7e8b772426
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
3 changed files with 27 additions and 6 deletions

View file

@@ -247,7 +247,13 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 			continue
 		}
 		putInmemoryBlock(ib)
-		logger.Panicf("BUG: cannot insert too big item into an empty inmemoryBlock len(item)=%d; the caller should be responsible for avoiding too big items", len(item))
+		// Skip too long item
+		itemPrefix := item
+		if len(itemPrefix) > 128 {
+			itemPrefix = itemPrefix[:128]
+		}
+		tooLongItemLogger.Errorf("skipping adding too long item to indexdb: len(item)=%d; it shouldn't exceed %d bytes; item prefix=%q", len(item), maxInmemoryBlockSize, itemPrefix)
 	}
 	ris.ibs = ibs
 	ris.mu.Unlock()
@@ -265,6 +271,8 @@ func (ris *rawItemsShard) addItems(tb *Table, items [][]byte) [][]byte {
 	return tailItems
 }
 
+var tooLongItemLogger = logger.WithThrottler("tooLongItem", 5*time.Second)
+
 type partWrapper struct {
 	p *part
@@ -515,8 +523,8 @@ func (tb *Table) UpdateMetrics(m *TableMetrics) {
 // AddItems adds the given items to the tb.
 //
-// The function panics when items contains an item with length exceeding maxInmemoryBlockSize.
-// It is caller's responsibility to make sure there are no too long items.
+// The function ignores items with length exceeding maxInmemoryBlockSize.
+// It logs the ignored items, so users could notice and fix the issue.
 func (tb *Table) AddItems(items [][]byte) {
 	tb.rawItems.addItems(tb, items)
 	atomic.AddUint64(&tb.itemsAdded, uint64(len(items)))

View file

@@ -33,6 +33,19 @@ func TestTableOpenClose(t *testing.T) {
 	}
 }
 
+func TestTableAddItemsTooLongItem(t *testing.T) {
+	const path = "TestTableAddItemsTooLongItem"
+	if err := os.RemoveAll(path); err != nil {
+		t.Fatalf("cannot remove %q: %s", path, err)
+	}
+	var isReadOnly uint32
+	tb := MustOpenTable(path, nil, nil, &isReadOnly)
+	tb.AddItems([][]byte{make([]byte, maxInmemoryBlockSize+1)})
+	tb.MustClose()
+	_ = os.RemoveAll(path)
+}
+
 func TestTableAddItemsSerial(t *testing.T) {
 	r := rand.New(rand.NewSource(1))
 	const path = "TestTableAddItemsSerial"

View file

@@ -651,9 +651,9 @@ func trackTruncatedLabels(labels []prompb.Label, truncated *prompb.Label) {
 	case <-truncatedLabelsLogTicker.C:
 		// Do not call logger.WithThrottler() here, since this will result in increased CPU usage
 		// because labelsToString() will be called with each trackTruncatedLabels call.
-		logger.Warnf("truncated label value as it exceeds configured maximal label value length: max %d, actual %d;"+
-			" truncated label: %s; original labels: %s; either reduce the label value length or increase -maxLabelValueLen=%d;",
-			maxLabelValueLen, len(truncated.Value), truncated.Name, labelsToString(labels), maxLabelValueLen)
+		logger.Warnf("truncate value for label %s because its length=%d exceeds -maxLabelValueLen=%d; "+
+			"original labels: %s; either reduce the label value length or increase -maxLabelValueLen command-line flag value",
+			truncated.Name, len(truncated.Value), maxLabelValueLen, labelsToString(labels))
 	default:
 	}
} }