2024-06-28 17:14:29 +00:00
package logstorage
import (
2024-09-26 14:52:55 +00:00
"container/heap"
2024-06-28 17:14:29 +00:00
"fmt"
"math"
2024-09-26 14:52:55 +00:00
"slices"
2024-06-28 17:14:29 +00:00
"sort"
"strings"
"sync"
"sync/atomic"
"unsafe"
2024-09-26 14:52:55 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/contextutil"
2024-06-28 17:14:29 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
)
// pipeStreamContext processes '| stream_context ...' queries.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#stream_context-pipe
type pipeStreamContext struct {
// linesBefore is the number of lines to return before the matching line
linesBefore int
// linesAfter is the number of lines to return after the matching line
linesAfter int
}
func ( pc * pipeStreamContext ) String ( ) string {
s := "stream_context"
if pc . linesBefore > 0 {
s += fmt . Sprintf ( " before %d" , pc . linesBefore )
}
if pc . linesAfter > 0 {
s += fmt . Sprintf ( " after %d" , pc . linesAfter )
}
2024-09-26 20:22:21 +00:00
if pc . linesBefore <= 0 && pc . linesAfter <= 0 {
s += " after 0"
}
2024-06-28 17:14:29 +00:00
return s
}
func ( pc * pipeStreamContext ) canLiveTail ( ) bool {
return false
}
var neededFieldsForStreamContext = [ ] string {
"_time" ,
"_stream_id" ,
}
func ( pc * pipeStreamContext ) updateNeededFields ( neededFields , unneededFields fieldsSet ) {
neededFields . addFields ( neededFieldsForStreamContext )
unneededFields . removeFields ( neededFieldsForStreamContext )
}
func ( pc * pipeStreamContext ) optimize ( ) {
// nothing to do
}
func ( pc * pipeStreamContext ) hasFilterInWithQuery ( ) bool {
return false
}
func ( pc * pipeStreamContext ) initFilterInValues ( _ map [ string ] [ ] string , _ getFieldValuesFunc ) ( pipe , error ) {
return pc , nil
}
func ( pc * pipeStreamContext ) newPipeProcessor ( workersCount int , stopCh <- chan struct { } , cancel func ( ) , ppNext pipeProcessor ) pipeProcessor {
maxStateSize := int64 ( float64 ( memory . Allowed ( ) ) * 0.2 )
shards := make ( [ ] pipeStreamContextProcessorShard , workersCount )
for i := range shards {
shards [ i ] = pipeStreamContextProcessorShard {
pipeStreamContextProcessorShardNopad : pipeStreamContextProcessorShardNopad {
pc : pc ,
stateSizeBudget : stateSizeBudgetChunk ,
} ,
}
maxStateSize -= stateSizeBudgetChunk
}
pcp := & pipeStreamContextProcessor {
pc : pc ,
stopCh : stopCh ,
cancel : cancel ,
ppNext : ppNext ,
shards : shards ,
maxStateSize : maxStateSize ,
}
pcp . stateSizeBudget . Store ( maxStateSize )
return pcp
}
type pipeStreamContextProcessor struct {
pc * pipeStreamContext
stopCh <- chan struct { }
cancel func ( )
ppNext pipeProcessor
2024-09-26 14:52:55 +00:00
s * Storage
neededColumnNames [ ] string
unneededColumnNames [ ] string
2024-06-28 17:14:29 +00:00
2024-09-26 14:52:55 +00:00
shards [ ] pipeStreamContextProcessorShard
2024-06-28 17:14:29 +00:00
maxStateSize int64
stateSizeBudget atomic . Int64
}
2024-09-26 14:52:55 +00:00
func ( pcp * pipeStreamContextProcessor ) init ( s * Storage , neededColumnNames , unneededColumnNames [ ] string ) {
pcp . s = s
pcp . neededColumnNames = neededColumnNames
pcp . unneededColumnNames = unneededColumnNames
2024-06-28 17:14:29 +00:00
}
2024-09-26 14:52:55 +00:00
func ( pcp * pipeStreamContextProcessor ) getStreamRowss ( streamID string , neededRows [ ] streamContextRow , stateSizeBudget int ) ( [ ] [ ] * streamContextRow , error ) {
2024-06-28 17:14:29 +00:00
tenantID , ok := getTenantIDFromStreamIDString ( streamID )
if ! ok {
logger . Panicf ( "BUG: cannot obtain tenantID from streamID %q" , streamID )
}
2024-09-26 14:52:55 +00:00
// construct the query for selecting all the rows for the given streamID
2024-06-28 17:14:29 +00:00
qStr := "_stream_id:" + streamID
2024-09-26 14:52:55 +00:00
if slices . Contains ( pcp . neededColumnNames , "*" ) {
if len ( pcp . unneededColumnNames ) > 0 {
qStr += " | delete " + fieldNamesString ( pcp . unneededColumnNames )
}
} else {
if len ( pcp . neededColumnNames ) > 0 {
qStr += " | fields " + fieldNamesString ( pcp . neededColumnNames )
}
}
2024-06-28 17:14:29 +00:00
q , err := ParseQuery ( qStr )
if err != nil {
logger . Panicf ( "BUG: cannot parse query [%s]: %s" , qStr , err )
}
2024-09-26 14:52:55 +00:00
// mu protects contextRows and stateSize inside writeBlock callback.
2024-06-28 17:14:29 +00:00
var mu sync . Mutex
2024-09-26 14:52:55 +00:00
contextRows := make ( [ ] streamContextRows , len ( neededRows ) )
for i := range neededRows {
contextRows [ i ] = streamContextRows {
neededTimestamp : neededRows [ i ] . timestamp ,
linesBefore : pcp . pc . linesBefore ,
linesAfter : pcp . pc . linesAfter ,
}
}
sort . Slice ( contextRows , func ( i , j int ) bool {
return contextRows [ i ] . neededTimestamp < contextRows [ j ] . neededTimestamp
} )
2024-06-28 17:14:29 +00:00
stateSize := 0
2024-09-26 14:52:55 +00:00
ctxWithCancel , cancel := contextutil . NewStopChanContext ( pcp . stopCh )
defer cancel ( )
2024-06-28 17:14:29 +00:00
writeBlock := func ( _ uint , br * blockResult ) {
mu . Lock ( )
defer mu . Unlock ( )
if stateSize > stateSizeBudget {
cancel ( )
2024-09-26 20:22:21 +00:00
return
2024-06-28 17:14:29 +00:00
}
2024-09-26 20:22:21 +00:00
for i := range contextRows {
2024-09-26 14:52:55 +00:00
if needStop ( pcp . stopCh ) {
break
2024-06-28 17:14:29 +00:00
}
2024-09-26 20:22:21 +00:00
if ! contextRows [ i ] . canUpdate ( br ) {
// Fast path - skip reading block timestamps for the given ctx.
continue
}
timestamps := br . getTimestamps ( )
for j , timestamp := range timestamps {
if i > 0 && timestamp <= contextRows [ i - 1 ] . neededTimestamp {
2024-09-26 14:52:55 +00:00
continue
}
2024-09-26 20:22:21 +00:00
if i + 1 < len ( contextRows ) && timestamp >= contextRows [ i + 1 ] . neededTimestamp {
2024-09-26 14:52:55 +00:00
continue
}
2024-09-26 20:22:21 +00:00
stateSize += contextRows [ i ] . update ( br , j , timestamp )
2024-06-28 17:14:29 +00:00
}
}
}
2024-09-26 14:52:55 +00:00
if err := pcp . s . runQuery ( ctxWithCancel , [ ] TenantID { tenantID } , q , writeBlock ) ; err != nil {
2024-06-28 17:14:29 +00:00
return nil , err
}
if stateSize > stateSizeBudget {
2024-09-26 14:52:55 +00:00
return nil , fmt . Errorf ( "more than %dMB of memory is needed for fetching the surrounding logs for %d matching logs" , stateSizeBudget / ( 1 << 20 ) , len ( neededRows ) )
}
// return sorted results from contextRows
rowss := make ( [ ] [ ] * streamContextRow , len ( contextRows ) )
for i , ctx := range contextRows {
rowss [ i ] = ctx . getSortedRows ( )
}
rowss = deduplicateStreamRowss ( rowss )
return rowss , nil
}
func deduplicateStreamRowss ( streamRowss [ ] [ ] * streamContextRow ) [ ] [ ] * streamContextRow {
var lastSeenRow * streamContextRow
for _ , streamRows := range streamRowss {
if len ( streamRows ) > 0 {
lastSeenRow = streamRows [ len ( streamRows ) - 1 ]
break
}
}
if lastSeenRow == nil {
return nil
2024-06-28 17:14:29 +00:00
}
2024-09-26 14:52:55 +00:00
resultRowss := streamRowss [ : 1 ]
for _ , streamRows := range streamRowss [ 1 : ] {
i := 0
for i < len ( streamRows ) && ! lastSeenRow . less ( streamRows [ i ] ) {
i ++
}
streamRows = streamRows [ i : ]
if len ( streamRows ) == 0 {
continue
}
resultRowss = append ( resultRowss , streamRows )
lastSeenRow = streamRows [ len ( streamRows ) - 1 ]
}
return resultRowss
}
type streamContextRows struct {
neededTimestamp int64
linesBefore int
linesAfter int
rowsBefore streamContextRowsHeapMin
rowsAfter streamContextRowsHeapMax
rowsMatched [ ] * streamContextRow
}
func ( ctx * streamContextRows ) getSortedRows ( ) [ ] * streamContextRow {
var rows [ ] * streamContextRow
rows = append ( rows , ctx . rowsBefore ... )
rows = append ( rows , ctx . rowsMatched ... )
rows = append ( rows , ctx . rowsAfter ... )
sort . Slice ( rows , func ( i , j int ) bool {
return rows [ i ] . less ( rows [ j ] )
} )
return rows
}
2024-09-26 20:22:21 +00:00
func ( ctx * streamContextRows ) canUpdate ( br * blockResult ) bool {
if ctx . linesBefore > 0 {
if len ( ctx . rowsBefore ) < ctx . linesBefore {
return true
}
minTimestamp := ctx . rowsBefore [ 0 ] . timestamp - 1
maxTimestamp := ctx . neededTimestamp
if br . intersectsTimeRange ( minTimestamp , maxTimestamp ) {
return true
}
}
if ctx . linesAfter > 0 {
if len ( ctx . rowsAfter ) < ctx . linesAfter {
return true
}
minTimestamp := ctx . neededTimestamp
maxTimestamp := ctx . rowsAfter [ 0 ] . timestamp + 1
if br . intersectsTimeRange ( minTimestamp , maxTimestamp ) {
return true
}
}
if ctx . linesBefore <= 0 && ctx . linesAfter <= 0 {
if len ( ctx . rowsMatched ) == 0 {
return true
}
timestamp := ctx . rowsMatched [ 0 ] . timestamp
if br . intersectsTimeRange ( timestamp - 1 , timestamp + 1 ) {
return true
}
}
return false
}
2024-09-26 14:52:55 +00:00
func ( ctx * streamContextRows ) update ( br * blockResult , rowIdx int , rowTimestamp int64 ) int {
if rowTimestamp < ctx . neededTimestamp {
if ctx . linesBefore <= 0 {
return 0
}
if len ( ctx . rowsBefore ) < ctx . linesBefore {
r := ctx . copyRowAtIdx ( br , rowIdx , rowTimestamp )
heap . Push ( & ctx . rowsBefore , r )
return r . sizeBytes ( )
}
if rowTimestamp <= ctx . rowsBefore [ 0 ] . timestamp {
return 0
}
r := ctx . copyRowAtIdx ( br , rowIdx , rowTimestamp )
stateSizeChange := r . sizeBytes ( ) - ctx . rowsBefore [ 0 ] . sizeBytes ( )
ctx . rowsBefore [ 0 ] = r
heap . Fix ( & ctx . rowsBefore , 0 )
return stateSizeChange
}
if rowTimestamp > ctx . neededTimestamp {
if ctx . linesAfter <= 0 {
return 0
}
if len ( ctx . rowsAfter ) < ctx . linesAfter {
r := ctx . copyRowAtIdx ( br , rowIdx , rowTimestamp )
heap . Push ( & ctx . rowsAfter , r )
return r . sizeBytes ( )
}
if rowTimestamp >= ctx . rowsAfter [ 0 ] . timestamp {
return 0
}
r := ctx . copyRowAtIdx ( br , rowIdx , rowTimestamp )
stateSizeChange := r . sizeBytes ( ) - ctx . rowsAfter [ 0 ] . sizeBytes ( )
ctx . rowsAfter [ 0 ] = r
heap . Fix ( & ctx . rowsAfter , 0 )
return stateSizeChange
}
// rowTimestamp == ctx.neededTimestamp
r := ctx . copyRowAtIdx ( br , rowIdx , rowTimestamp )
ctx . rowsMatched = append ( ctx . rowsMatched , r )
return r . sizeBytes ( )
}
func ( ctx * streamContextRows ) copyRowAtIdx ( br * blockResult , rowIdx int , rowTimestamp int64 ) * streamContextRow {
cs := br . getColumns ( )
fields := make ( [ ] Field , len ( cs ) )
for i , c := range cs {
v := c . getValueAtRow ( br , rowIdx )
fields [ i ] = Field {
Name : strings . Clone ( c . name ) ,
Value : strings . Clone ( v ) ,
}
}
return & streamContextRow {
timestamp : rowTimestamp ,
fields : fields ,
}
2024-06-28 17:14:29 +00:00
}
func getTenantIDFromStreamIDString ( s string ) ( TenantID , bool ) {
var sid streamID
if ! sid . tryUnmarshalFromString ( s ) {
return TenantID { } , false
}
return sid . tenantID , true
}
type pipeStreamContextProcessorShard struct {
pipeStreamContextProcessorShardNopad
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
_ [ 128 - unsafe . Sizeof ( pipeStreamContextProcessorShardNopad { } ) % 128 ] byte
}
type streamContextRow struct {
timestamp int64
fields [ ] Field
}
2024-09-26 14:52:55 +00:00
func ( r * streamContextRow ) sizeBytes ( ) int {
n := 0
fields := r . fields
for _ , f := range fields {
n += len ( f . Name ) + len ( f . Value ) + int ( unsafe . Sizeof ( f ) )
}
n += int ( unsafe . Sizeof ( * r ) + unsafe . Sizeof ( r ) )
return n
}
func ( r * streamContextRow ) less ( other * streamContextRow ) bool {
// compare timestamps at first
if r . timestamp != other . timestamp {
return r . timestamp < other . timestamp
}
// compare fields then
i := 0
aFields := r . fields
bFields := other . fields
for i < len ( aFields ) && i < len ( bFields ) {
af := & aFields [ i ]
bf := & bFields [ i ]
if af . Name != bf . Name {
return af . Name < bf . Name
}
if af . Value != bf . Value {
return af . Value < bf . Value
}
i ++
}
if len ( aFields ) != len ( bFields ) {
return len ( aFields ) < len ( bFields )
}
return false
}
2024-06-28 17:14:29 +00:00
type pipeStreamContextProcessorShardNopad struct {
// pc points to the parent pipeStreamContext.
pc * pipeStreamContext
// m holds per-stream matching rows
m map [ string ] [ ] streamContextRow
// stateSizeBudget is the remaining budget for the whole state size for the shard.
// The per-shard budget is provided in chunks from the parent pipeStreamContextProcessor.
stateSizeBudget int
}
// writeBlock writes br to shard.
func ( shard * pipeStreamContextProcessorShard ) writeBlock ( br * blockResult ) {
m := shard . getM ( )
cs := br . getColumns ( )
cStreamID := br . getColumnByName ( "_stream_id" )
stateSize := 0
2024-09-25 14:16:53 +00:00
timestamps := br . getTimestamps ( )
for i , timestamp := range timestamps {
2024-06-28 17:14:29 +00:00
fields := make ( [ ] Field , len ( cs ) )
stateSize += int ( unsafe . Sizeof ( fields [ 0 ] ) ) * len ( fields )
for j , c := range cs {
v := c . getValueAtRow ( br , i )
fields [ j ] = Field {
Name : strings . Clone ( c . name ) ,
Value : strings . Clone ( v ) ,
}
stateSize += len ( c . name ) + len ( v )
}
row := streamContextRow {
timestamp : timestamp ,
fields : fields ,
}
stateSize += int ( unsafe . Sizeof ( row ) )
streamID := cStreamID . getValueAtRow ( br , i )
rows , ok := m [ streamID ]
if ! ok {
stateSize += len ( streamID )
}
rows = append ( rows , row )
streamID = strings . Clone ( streamID )
m [ streamID ] = rows
}
shard . stateSizeBudget -= stateSize
}
func ( shard * pipeStreamContextProcessorShard ) getM ( ) map [ string ] [ ] streamContextRow {
if shard . m == nil {
shard . m = make ( map [ string ] [ ] streamContextRow )
}
return shard . m
}
func ( pcp * pipeStreamContextProcessor ) writeBlock ( workerID uint , br * blockResult ) {
2024-09-25 14:16:53 +00:00
if br . rowsLen == 0 {
2024-06-28 17:14:29 +00:00
return
}
shard := & pcp . shards [ workerID ]
for shard . stateSizeBudget < 0 {
// steal some budget for the state size from the global budget.
remaining := pcp . stateSizeBudget . Add ( - stateSizeBudgetChunk )
if remaining < 0 {
// The state size is too big. Stop processing data in order to avoid OOM crash.
if remaining + stateSizeBudgetChunk >= 0 {
// Notify worker goroutines to stop calling writeBlock() in order to save CPU time.
pcp . cancel ( )
}
return
}
shard . stateSizeBudget += stateSizeBudgetChunk
}
shard . writeBlock ( br )
}
func ( pcp * pipeStreamContextProcessor ) flush ( ) error {
n := pcp . stateSizeBudget . Load ( )
if n <= 0 {
return fmt . Errorf ( "cannot calculate [%s], since it requires more than %dMB of memory" , pcp . pc . String ( ) , pcp . maxStateSize / ( 1 << 20 ) )
}
if n > math . MaxInt {
logger . Panicf ( "BUG: stateSizeBudget shouldn't exceed math.MaxInt=%v; got %d" , math . MaxInt , n )
}
stateSizeBudget := int ( n )
// merge state across shards
shards := pcp . shards
m := shards [ 0 ] . getM ( )
shards = shards [ 1 : ]
for i := range shards {
if needStop ( pcp . stopCh ) {
return nil
}
for streamID , rowsSrc := range shards [ i ] . getM ( ) {
rows , ok := m [ streamID ]
if ! ok {
m [ streamID ] = rowsSrc
} else {
m [ streamID ] = append ( rows , rowsSrc ... )
}
}
}
// write result
wctx := & pipeStreamContextWriteContext {
pcp : pcp ,
}
2024-09-27 09:15:43 +00:00
// write output contexts in the ascending order of rows
streamIDs := getStreamIDsSortedByMinRowTimestamp ( m )
for _ , streamID := range streamIDs {
rows := m [ streamID ]
2024-09-26 14:52:55 +00:00
streamRowss , err := pcp . getStreamRowss ( streamID , rows , stateSizeBudget )
2024-06-28 17:14:29 +00:00
if err != nil {
2024-09-26 14:52:55 +00:00
return err
2024-06-28 17:14:29 +00:00
}
if needStop ( pcp . stopCh ) {
return nil
}
2024-09-26 14:52:55 +00:00
// Write streamRows to the output.
for _ , streamRows := range streamRowss {
for _ , streamRow := range streamRows {
wctx . writeRow ( streamRow . fields )
}
2024-09-27 09:00:28 +00:00
if len ( m ) > 1 || len ( streamRowss ) > 1 {
2024-09-26 14:52:55 +00:00
lastRow := streamRows [ len ( streamRows ) - 1 ]
fields := newDelimiterRowFields ( lastRow , streamID )
wctx . writeRow ( fields )
2024-06-30 22:49:31 +00:00
}
2024-06-28 17:14:29 +00:00
}
}
2024-09-26 14:52:55 +00:00
wctx . flush ( )
2024-06-30 22:49:31 +00:00
return nil
2024-06-28 17:14:29 +00:00
}
2024-09-27 09:15:43 +00:00
func getStreamIDsSortedByMinRowTimestamp ( m map [ string ] [ ] streamContextRow ) [ ] string {
type streamTimestamp struct {
streamID string
timestamp int64
}
streamTimestamps := make ( [ ] streamTimestamp , 0 , len ( m ) )
for streamID , rows := range m {
minTimestamp := rows [ 0 ] . timestamp
for _ , r := range rows [ 1 : ] {
if r . timestamp < minTimestamp {
minTimestamp = r . timestamp
}
}
streamTimestamps = append ( streamTimestamps , streamTimestamp {
streamID : streamID ,
timestamp : minTimestamp ,
} )
}
sort . Slice ( streamTimestamps , func ( i , j int ) bool {
return streamTimestamps [ i ] . timestamp < streamTimestamps [ j ] . timestamp
} )
streamIDs := make ( [ ] string , len ( streamTimestamps ) )
for i := range streamIDs {
streamIDs [ i ] = streamTimestamps [ i ] . streamID
}
return streamIDs
}
2024-09-26 14:52:55 +00:00
func newDelimiterRowFields ( r * streamContextRow , streamID string ) [ ] Field {
return [ ] Field {
{
Name : "_time" ,
Value : string ( marshalTimestampRFC3339NanoString ( nil , r . timestamp + 1 ) ) ,
} ,
{
Name : "_stream_id" ,
Value : streamID ,
} ,
{
Name : "_stream" ,
Value : getFieldValue ( r . fields , "_stream" ) ,
} ,
{
Name : "_msg" ,
Value : "---" ,
} ,
2024-06-28 17:14:29 +00:00
}
}
type pipeStreamContextWriteContext struct {
pcp * pipeStreamContextProcessor
rcs [ ] resultColumn
br blockResult
// rowsCount is the number of rows in the current block
rowsCount int
// valuesLen is the total length of values in the current block
valuesLen int
}
func ( wctx * pipeStreamContextWriteContext ) writeRow ( rowFields [ ] Field ) {
rcs := wctx . rcs
areEqualColumns := len ( rcs ) == len ( rowFields )
if areEqualColumns {
for i , f := range rowFields {
if rcs [ i ] . name != f . Name {
areEqualColumns = false
break
}
}
}
if ! areEqualColumns {
// send the current block to ppNext and construct a block with new set of columns
wctx . flush ( )
rcs = wctx . rcs [ : 0 ]
for _ , f := range rowFields {
rcs = appendResultColumnWithName ( rcs , f . Name )
}
wctx . rcs = rcs
}
for i , f := range rowFields {
v := f . Value
rcs [ i ] . addValue ( v )
wctx . valuesLen += len ( v )
}
wctx . rowsCount ++
if wctx . valuesLen >= 1_000_000 {
wctx . flush ( )
}
}
func ( wctx * pipeStreamContextWriteContext ) flush ( ) {
rcs := wctx . rcs
br := & wctx . br
wctx . valuesLen = 0
// Flush rcs to ppNext
br . setResultColumns ( rcs , wctx . rowsCount )
wctx . rowsCount = 0
wctx . pcp . ppNext . writeBlock ( 0 , br )
br . reset ( )
for i := range rcs {
rcs [ i ] . resetValues ( )
}
}
func parsePipeStreamContext ( lex * lexer ) ( * pipeStreamContext , error ) {
if ! lex . isKeyword ( "stream_context" ) {
return nil , fmt . Errorf ( "expecting 'stream_context'; got %q" , lex . token )
}
lex . nextToken ( )
2024-07-01 23:37:46 +00:00
linesBefore , linesAfter , err := parsePipeStreamContextBeforeAfter ( lex )
if err != nil {
return nil , err
2024-06-28 17:14:29 +00:00
}
pc := & pipeStreamContext {
linesBefore : linesBefore ,
linesAfter : linesAfter ,
}
return pc , nil
}
2024-07-01 23:37:46 +00:00
func parsePipeStreamContextBeforeAfter ( lex * lexer ) ( int , int , error ) {
linesBefore := 0
linesAfter := 0
beforeSet := false
afterSet := false
for {
switch {
case lex . isKeyword ( "before" ) :
lex . nextToken ( )
f , s , err := parseNumber ( lex )
if err != nil {
return 0 , 0 , fmt . Errorf ( "cannot parse 'before' value in 'stream_context': %w" , err )
}
if f < 0 {
return 0 , 0 , fmt . Errorf ( "'before' value cannot be smaller than 0; got %q" , s )
}
linesBefore = int ( f )
beforeSet = true
case lex . isKeyword ( "after" ) :
lex . nextToken ( )
f , s , err := parseNumber ( lex )
if err != nil {
return 0 , 0 , fmt . Errorf ( "cannot parse 'after' value in 'stream_context': %w" , err )
}
if f < 0 {
return 0 , 0 , fmt . Errorf ( "'after' value cannot be smaller than 0; got %q" , s )
}
linesAfter = int ( f )
afterSet = true
default :
if ! beforeSet && ! afterSet {
return 0 , 0 , fmt . Errorf ( "missing 'before N' or 'after N' in 'stream_context'" )
}
return linesBefore , linesAfter , nil
}
}
}
2024-09-26 14:52:55 +00:00
type streamContextRowsHeapMax [ ] * streamContextRow
func ( h * streamContextRowsHeapMax ) Len ( ) int {
return len ( * h )
}
func ( h * streamContextRowsHeapMax ) Less ( i , j int ) bool {
a := * h
return a [ i ] . timestamp > a [ j ] . timestamp
}
func ( h * streamContextRowsHeapMax ) Swap ( i , j int ) {
a := * h
a [ i ] , a [ j ] = a [ j ] , a [ i ]
}
func ( h * streamContextRowsHeapMax ) Push ( v any ) {
x := v . ( * streamContextRow )
* h = append ( * h , x )
}
func ( h * streamContextRowsHeapMax ) Pop ( ) any {
a := * h
x := a [ len ( a ) - 1 ]
a [ len ( a ) - 1 ] = nil
* h = a [ : len ( a ) - 1 ]
return x
}
type streamContextRowsHeapMin streamContextRowsHeapMax
func ( h * streamContextRowsHeapMin ) Len ( ) int {
return len ( * h )
}
func ( h * streamContextRowsHeapMin ) Less ( i , j int ) bool {
a := * h
return a [ i ] . timestamp < a [ j ] . timestamp
}
func ( h * streamContextRowsHeapMin ) Swap ( i , j int ) {
a := * h
a [ i ] , a [ j ] = a [ j ] , a [ i ]
}
func ( h * streamContextRowsHeapMin ) Push ( v any ) {
x := v . ( * streamContextRow )
* h = append ( * h , x )
}
func ( h * streamContextRowsHeapMin ) Pop ( ) any {
a := * h
x := a [ len ( a ) - 1 ]
a [ len ( a ) - 1 ] = nil
* h = a [ : len ( a ) - 1 ]
return x
}