diff --git a/lib/logstorage/block.go b/lib/logstorage/block.go index 5aae71a1b..2199611d9 100644 --- a/lib/logstorage/block.go +++ b/lib/logstorage/block.go @@ -392,8 +392,9 @@ func (b *block) resizeColumns(columnsLen int) []column { func (b *block) sortColumnsByName() { if len(b.columns)+len(b.constColumns) > maxColumnsPerBlock { - logger.Panicf("BUG: too big number of columns detected in the block: %d; the number of columns mustn't exceed %d", - len(b.columns)+len(b.constColumns), maxColumnsPerBlock) + columnNames := b.getColumnNames() + logger.Panicf("BUG: too big number of columns detected in the block: %d; the number of columns mustn't exceed %d; columns: %s", + len(b.columns)+len(b.constColumns), maxColumnsPerBlock, columnNames) } cs := getColumnsSorter() @@ -407,6 +408,17 @@ func (b *block) sortColumnsByName() { putConstColumnsSorter(ccs) } +func (b *block) getColumnNames() []string { + a := make([]string, 0, len(b.columns)+len(b.constColumns)) + for _, c := range b.columns { + a = append(a, c.name) + } + for _, c := range b.constColumns { + a = append(a, c.Name) + } + return a +} + // Len returns the number of log entries in b. func (b *block) Len() int { return len(b.timestamps) diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go index da620e3c0..d699b87b7 100644 --- a/lib/logstorage/block_header.go +++ b/lib/logstorage/block_header.go @@ -492,8 +492,8 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) e return fmt.Errorf("cannot unmarshal columnHeaders len") } src = src[nSize:] - if n > maxColumnsPerBlock { - return fmt.Errorf("too many column headers: %d; mustn't exceed %d", n, maxColumnsPerBlock) + if n > 1e6 { + return fmt.Errorf("too big number of columnHeaders: %d", n) } chs := csh.resizeColumnHeaders(int(n)) @@ -506,14 +506,19 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) e } csh.columnHeaders = chs + if len(chs) > maxColumnsPerBlock { + columnNames := getNamesFromColumnHeaders(chs) + return fmt.Errorf("too many column headers: %d; it mustn't exceed %d; columns: %s", len(chs), maxColumnsPerBlock, columnNames) + } + // unmarshal constColumns n, nSize = encoding.UnmarshalVarUint64(src) if nSize <= 0 { return fmt.Errorf("cannot unmarshal constColumns len") } src = src[nSize:] - if n+uint64(len(csh.columnHeaders)) > maxColumnsPerBlock { - return fmt.Errorf("too many columns: %d; mustn't exceed %d", n+uint64(len(csh.columnHeaders)), maxColumnsPerBlock) + if n > 1e6 { + return fmt.Errorf("too big number of constColumns: %d", n) } ccs := csh.resizeConstColumns(int(n)) @@ -525,6 +530,14 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) e src = tail } + if len(ccs)+len(csh.columnHeaders) > maxColumnsPerBlock { + columnNames := getNamesFromColumnHeaders(csh.columnHeaders) + for _, cc := range ccs { + columnNames = append(columnNames, cc.Name) + } + return fmt.Errorf("too many columns: %d; mustn't exceed %d; columns: %s", len(ccs)+len(csh.columnHeaders), maxColumnsPerBlock, columnNames) + } + // Verify that the src is empty if len(src) > 0 { return fmt.Errorf("unexpected non-empty tail left after unmarshaling columnsHeader: len(tail)=%d", len(src)) @@ -533,6 +546,14 @@ func (csh *columnsHeader) unmarshalNoArena(src []byte, partFormatVersion uint) e return nil } +func getNamesFromColumnHeaders(chs []columnHeader) []string { + a := make([]string, 0, len(chs)) + for _, ch := range chs { + a = append(a, ch.name) + } + return a +} + // columnHeaders contains information for values, which belong to a single label in a single block. // // The main column with an empty name is stored in messageValuesFilename,