lib/logstorage: improve performance for stream_context pipe over streams with big number of log entries

Do not read timestamps for blocks, which cannot contain surrounding logs.
This should improve performance for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6730 .

Also optimize min(_time) and max(_time) calculations a bit by avoiding conversion
of timestamp to string when it isn't needed.
This should improve performance for https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7070 .
This commit is contained in:
Aliaksandr Valialkin 2024-09-26 22:22:21 +02:00
parent 3646724c6f
commit b82bd0c2ec
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
7 changed files with 118 additions and 105 deletions

View file

@ -387,35 +387,46 @@ func (br *blockResult) mustInit(bs *blockSearch, bm *bitmap) {
br.bm = bm br.bm = bm
} }
func (br *blockResult) getMinTimestamp() int64 { // intersectsTimeRange returns true if br timestamps intersect (minTimestamp .. maxTimestamp) time range.
if br.bm != nil && br.bm.bitsLen == br.rowsLen { func (br *blockResult) intersectsTimeRange(minTimestamp, maxTimestamp int64) bool {
return br.bs.bsw.bh.timestampsHeader.minTimestamp return minTimestamp < br.getMaxTimestamp(minTimestamp) && maxTimestamp > br.getMinTimestamp(maxTimestamp)
}
func (br *blockResult) getMinTimestamp(minTimestamp int64) int64 {
if br.bs != nil {
bh := &br.bs.bsw.bh
if bh.rowsCount == uint64(br.rowsLen) {
return min(minTimestamp, bh.timestampsHeader.minTimestamp)
}
if minTimestamp <= bh.timestampsHeader.minTimestamp {
return minTimestamp
}
} }
// Slow path - need to scan timestamps
timestamps := br.getTimestamps() timestamps := br.getTimestamps()
if len(timestamps) == 0 { for _, timestamp := range timestamps {
return -1 << 63 if timestamp < minTimestamp {
} minTimestamp = timestamp
minTimestamp := timestamps[0]
for i := 1; i < len(timestamps); i++ {
if timestamps[i] < minTimestamp {
minTimestamp = timestamps[i]
} }
} }
return minTimestamp return minTimestamp
} }
func (br *blockResult) getMaxTimestamp() int64 { func (br *blockResult) getMaxTimestamp(maxTimestamp int64) int64 {
if br.bm != nil && br.bm.bitsLen == br.rowsLen { if br.bs != nil {
return br.bs.bsw.bh.timestampsHeader.maxTimestamp bh := &br.bs.bsw.bh
if bh.rowsCount == uint64(br.rowsLen) {
return max(maxTimestamp, bh.timestampsHeader.maxTimestamp)
}
if maxTimestamp >= bh.timestampsHeader.maxTimestamp {
return maxTimestamp
}
} }
// Slow path - need to scan timestamps
timestamps := br.getTimestamps() timestamps := br.getTimestamps()
if len(timestamps) == 0 { for i := len(timestamps) - 1; i >= 0; i-- {
return (1 << 63) - 1
}
maxTimestamp := timestamps[len(timestamps)-1]
for i := len(timestamps) - 2; i >= 0; i-- {
if timestamps[i] > maxTimestamp { if timestamps[i] > maxTimestamp {
maxTimestamp = timestamps[i] maxTimestamp = timestamps[i]
} }

View file

@ -35,6 +35,9 @@ func (pc *pipeStreamContext) String() string {
if pc.linesAfter > 0 { if pc.linesAfter > 0 {
s += fmt.Sprintf(" after %d", pc.linesAfter) s += fmt.Sprintf(" after %d", pc.linesAfter)
} }
if pc.linesBefore <= 0 && pc.linesAfter <= 0 {
s += " after 0"
}
return s return s
} }
@ -163,21 +166,28 @@ func (pcp *pipeStreamContextProcessor) getStreamRowss(streamID string, neededRow
if stateSize > stateSizeBudget { if stateSize > stateSizeBudget {
cancel() cancel()
return
} }
timestamps := br.getTimestamps() for i := range contextRows {
for i, timestamp := range timestamps {
if needStop(pcp.stopCh) { if needStop(pcp.stopCh) {
break break
} }
for j := range contextRows {
if j > 0 && timestamp <= contextRows[j-1].neededTimestamp { if !contextRows[i].canUpdate(br) {
// Fast path - skip reading block timestamps for the given ctx.
continue continue
} }
if j+1 < len(contextRows) && timestamp >= contextRows[j+1].neededTimestamp {
timestamps := br.getTimestamps()
for j, timestamp := range timestamps {
if i > 0 && timestamp <= contextRows[i-1].neededTimestamp {
continue continue
} }
stateSize += contextRows[j].update(br, i, timestamp) if i+1 < len(contextRows) && timestamp >= contextRows[i+1].neededTimestamp {
continue
}
stateSize += contextRows[i].update(br, j, timestamp)
} }
} }
} }
@ -247,6 +257,42 @@ func (ctx *streamContextRows) getSortedRows() []*streamContextRow {
return rows return rows
} }
// canUpdate reports whether br may contain rows that could change ctx.
//
// The caller uses a false result to skip reading br timestamps for this ctx.
// The check relies only on br.intersectsTimeRange, which compares against
// the given bounds with strict inequalities.
func (ctx *streamContextRows) canUpdate(br *blockResult) bool {
if ctx.linesBefore > 0 {
// The "before" window isn't full yet, so br cannot be ruled out.
if len(ctx.rowsBefore) < ctx.linesBefore {
return true
}
// Widen the lower bound by 1, since intersectsTimeRange uses strict comparisons.
// NOTE(review): presumably rowsBefore[0] holds the smallest collected timestamp - confirm against update().
minTimestamp := ctx.rowsBefore[0].timestamp - 1
maxTimestamp := ctx.neededTimestamp
if br.intersectsTimeRange(minTimestamp, maxTimestamp) {
return true
}
}
if ctx.linesAfter > 0 {
// The "after" window isn't full yet, so br cannot be ruled out.
if len(ctx.rowsAfter) < ctx.linesAfter {
return true
}
// Widen the upper bound by 1, since intersectsTimeRange uses strict comparisons.
// NOTE(review): presumably rowsAfter[0] holds the biggest collected timestamp - confirm against update().
minTimestamp := ctx.neededTimestamp
maxTimestamp := ctx.rowsAfter[0].timestamp + 1
if br.intersectsTimeRange(minTimestamp, maxTimestamp) {
return true
}
}
if ctx.linesBefore <= 0 && ctx.linesAfter <= 0 {
// No surrounding lines are requested; only ctx.rowsMatched is consulted here.
if len(ctx.rowsMatched) == 0 {
return true
}
// Check a range widened by 1 on both sides around the first matched row timestamp.
timestamp := ctx.rowsMatched[0].timestamp
if br.intersectsTimeRange(timestamp-1, timestamp+1) {
return true
}
}
// None of the checks above matched, so the caller may skip br for this ctx.
return false
}
func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int { func (ctx *streamContextRows) update(br *blockResult, rowIdx int, rowTimestamp int64) int {
if rowTimestamp < ctx.neededTimestamp { if rowTimestamp < ctx.neededTimestamp {
if ctx.linesBefore <= 0 { if ctx.linesBefore <= 0 {
@ -430,11 +476,6 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult
if br.rowsLen == 0 { if br.rowsLen == 0 {
return return
} }
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - there is no need to fetch stream context.
pcp.ppNext.writeBlock(workerID, br)
return
}
shard := &pcp.shards[workerID] shard := &pcp.shards[workerID]
@ -456,11 +497,6 @@ func (pcp *pipeStreamContextProcessor) writeBlock(workerID uint, br *blockResult
} }
func (pcp *pipeStreamContextProcessor) flush() error { func (pcp *pipeStreamContextProcessor) flush() error {
if pcp.pc.linesBefore <= 0 && pcp.pc.linesAfter <= 0 {
// Fast path - nothing to do.
return nil
}
n := pcp.stateSizeBudget.Load() n := pcp.stateSizeBudget.Load()
if n <= 0 { if n <= 0 {
return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20)) return fmt.Errorf("cannot calculate [%s], since it requires more than %dMB of memory", pcp.pc.String(), pcp.maxStateSize/(1<<20))

View file

@ -12,6 +12,7 @@ func TestParsePipeStreamContextSuccess(t *testing.T) {
f(`stream_context before 5`) f(`stream_context before 5`)
f(`stream_context after 10`) f(`stream_context after 10`)
f(`stream_context after 0`)
f(`stream_context before 10 after 20`) f(`stream_context before 10 after 20`)
} }
@ -30,73 +31,6 @@ func TestParsePipeStreamContextFailure(t *testing.T) {
f(`stream_context after -4`) f(`stream_context after -4`)
} }
// TestPipeStreamContext runs the stream_context pipe with zero "before" and
// zero "after" lines via expectPipeResults. In both cases the expected rows
// are identical to the input rows.
func TestPipeStreamContext(t *testing.T) {
// f passes pipeStr, the input rows and the expected rows to expectPipeResults.
f := func(pipeStr string, rows, rowsExpected [][]Field) {
t.Helper()
expectPipeResults(t, pipeStr, rows, rowsExpected)
}
// before 0 - the expected rows equal the input rows
f("stream_context before 0", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
})
// after 0 - the expected rows equal the input rows
f("stream_context after 0", [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
}, [][]Field{
{
{"a", `2`},
{"b", `3`},
},
{
{"a", "2"},
{"b", "3"},
},
{
{"a", `2`},
{"b", `54`},
{"c", "d"},
},
})
}
func TestPipeStreamContextUpdateNeededFields(t *testing.T) { func TestPipeStreamContextUpdateNeededFields(t *testing.T) {
f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) { f := func(s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
t.Helper() t.Helper()

View file

@ -85,7 +85,15 @@ func (smp *statsMaxProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
if c.isTime { if c.isTime {
maxTimestamp := br.getMaxTimestamp() timestamp, ok := TryParseTimestampRFC3339Nano(smp.max)
if !ok {
timestamp = -1 << 63
}
maxTimestamp := br.getMaxTimestamp(timestamp)
if maxTimestamp <= timestamp {
return
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)

View file

@ -87,7 +87,15 @@ func (smp *statsMinProcessor) updateStateForColumn(br *blockResult, c *blockResu
} }
if c.isTime { if c.isTime {
minTimestamp := br.getMinTimestamp() timestamp, ok := TryParseTimestampRFC3339Nano(smp.min)
if !ok {
timestamp = (1 << 63) - 1
}
minTimestamp := br.getMinTimestamp(timestamp)
if minTimestamp >= timestamp {
return
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp)
smp.updateStateBytes(bb.B) smp.updateStateBytes(bb.B)

View file

@ -60,7 +60,15 @@ func (smp *statsRowMaxProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease return stateSizeIncrease
} }
if c.isTime { if c.isTime {
maxTimestamp := br.getMaxTimestamp() timestamp, ok := TryParseTimestampRFC3339Nano(smp.max)
if !ok {
timestamp = -1 << 63
}
maxTimestamp := br.getMaxTimestamp(timestamp)
if maxTimestamp <= timestamp {
return stateSizeIncrease
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp) bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], maxTimestamp)
v := bytesutil.ToUnsafeString(bb.B) v := bytesutil.ToUnsafeString(bb.B)

View file

@ -60,7 +60,15 @@ func (smp *statsRowMinProcessor) updateStatsForAllRows(br *blockResult) int {
return stateSizeIncrease return stateSizeIncrease
} }
if c.isTime { if c.isTime {
minTimestamp := br.getMinTimestamp() timestamp, ok := TryParseTimestampRFC3339Nano(smp.min)
if !ok {
timestamp = (1 << 63) - 1
}
minTimestamp := br.getMinTimestamp(timestamp)
if minTimestamp >= timestamp {
return stateSizeIncrease
}
bb := bbPool.Get() bb := bbPool.Get()
bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp) bb.B = marshalTimestampRFC3339NanoString(bb.B[:0], minTimestamp)
v := bytesutil.ToUnsafeString(bb.B) v := bytesutil.ToUnsafeString(bb.B)