package logstorage import ( "fmt" "unsafe" "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" ) // pipeDropEmptyFields processes '| drop_empty_fields ...' pipe. // // See https://docs.victoriametrics.com/victorialogs/logsql/#drop_empty_fields-pipe type pipeDropEmptyFields struct { } func (pd *pipeDropEmptyFields) String() string { return "drop_empty_fields" } func (pd *pipeDropEmptyFields) canLiveTail() bool { return true } func (pd *pipeDropEmptyFields) optimize() { // nothing to do } func (pd *pipeDropEmptyFields) hasFilterInWithQuery() bool { return false } func (pd *pipeDropEmptyFields) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) { return pd, nil } func (pd *pipeDropEmptyFields) updateNeededFields(_, _ fieldsSet) { // nothing to do } func (pd *pipeDropEmptyFields) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor { return &pipeDropEmptyFieldsProcessor{ ppNext: ppNext, shards: make([]pipeDropEmptyFieldsProcessorShard, workersCount), } } type pipeDropEmptyFieldsProcessor struct { ppNext pipeProcessor shards []pipeDropEmptyFieldsProcessorShard } type pipeDropEmptyFieldsProcessorShard struct { pipeDropEmptyFieldsProcessorShardNopad // The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 . _ [128 - unsafe.Sizeof(pipeDropEmptyFieldsProcessorShardNopad{})%128]byte } type pipeDropEmptyFieldsProcessorShardNopad struct { columnValues [][]string fields []Field wctx pipeDropEmptyFieldsWriteContext } func (pdp *pipeDropEmptyFieldsProcessor) writeBlock(workerID uint, br *blockResult) { if len(br.timestamps) == 0 { return } shard := &pdp.shards[workerID] cs := br.getColumns() shard.columnValues = slicesutil.SetLength(shard.columnValues, len(cs)) columnValues := shard.columnValues for i, c := range cs { columnValues[i] = c.getValues(br) } if !hasEmptyValues(columnValues) { // Fast path - just write br to ppNext, since it has no empty values. pdp.ppNext.writeBlock(workerID, br) return } // Slow path - drop fields with empty values shard.wctx.init(workerID, pdp.ppNext) fields := shard.fields for rowIdx := range br.timestamps { fields = fields[:0] for i, values := range columnValues { v := values[rowIdx] if v == "" { continue } fields = append(fields, Field{ Name: cs[i].name, Value: values[rowIdx], }) } shard.wctx.writeRow(fields) } shard.fields = fields shard.wctx.flush() } func (pdp *pipeDropEmptyFieldsProcessor) flush() error { return nil } type pipeDropEmptyFieldsWriteContext struct { workerID uint ppNext pipeProcessor rcs []resultColumn br blockResult // rowsCount is the number of rows in the current block rowsCount int // valuesLen is the total length of values in the current block valuesLen int } func (wctx *pipeDropEmptyFieldsWriteContext) reset() { wctx.workerID = 0 wctx.ppNext = nil rcs := wctx.rcs for i := range rcs { rcs[i].reset() } wctx.rcs = rcs[:0] wctx.rowsCount = 0 wctx.valuesLen = 0 } func (wctx *pipeDropEmptyFieldsWriteContext) init(workerID uint, ppNext pipeProcessor) { wctx.reset() wctx.workerID = workerID wctx.ppNext = ppNext } func (wctx *pipeDropEmptyFieldsWriteContext) writeRow(fields []Field) { if len(fields) == 0 { // skip rows without non-empty fields return } rcs := wctx.rcs areEqualColumns := len(rcs) == len(fields) if areEqualColumns { for i, f := range fields { if rcs[i].name != f.Name { areEqualColumns = false break } } } if !areEqualColumns { // send the current block to ppNext and construct a block with new set of columns wctx.flush() rcs = wctx.rcs[:0] for _, f := range fields { rcs = appendResultColumnWithName(rcs, f.Name) } wctx.rcs = rcs } for i, f := range fields { v := f.Value rcs[i].addValue(v) wctx.valuesLen += len(v) } wctx.rowsCount++ if wctx.valuesLen >= 1_000_000 { wctx.flush() } } func (wctx *pipeDropEmptyFieldsWriteContext) flush() { rcs := wctx.rcs wctx.valuesLen = 0 // Flush rcs to ppNext br := &wctx.br br.setResultColumns(rcs, wctx.rowsCount) wctx.rowsCount = 0 wctx.ppNext.writeBlock(wctx.workerID, br) br.reset() for i := range rcs { rcs[i].resetValues() } } func parsePipeDropEmptyFields(lex *lexer) (*pipeDropEmptyFields, error) { if !lex.isKeyword("drop_empty_fields") { return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "drop_empty_fields") } lex.nextToken() pd := &pipeDropEmptyFields{} return pd, nil } func hasEmptyValues(columnValues [][]string) bool { for _, values := range columnValues { for _, v := range values { if v == "" { return true } } } return false }