diff --git a/lib/logstorage/pipe_extract.go b/lib/logstorage/pipe_extract.go index 7a8631e8d..9501f9fcc 100644 --- a/lib/logstorage/pipe_extract.go +++ b/lib/logstorage/pipe_extract.go @@ -92,17 +92,32 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) { } shard := &pep.shards[workerID] - c := br.getColumnByName(pep.pe.fromField) - values := c.getValues(br) - ef := shard.ef rcs := shard.rcs - for _, v := range values { + + c := br.getColumnByName(pep.pe.fromField) + if c.isConst { + v := c.valuesEncoded[0] ef.apply(v) for i, f := range ef.fields { - rcs[i].addValue(*f.value) + fieldValue := *f.value + rc := &rcs[i] + for range br.timestamps { + rc.addValue(fieldValue) + } + } + } else { + values := c.getValues(br) + for i, v := range values { + if i == 0 || values[i-1] != v { + ef.apply(v) + } + for j, f := range ef.fields { + rcs[j].addValue(*f.value) + } } } + br.addResultColumns(rcs) pep.ppBase.writeBlock(workerID, br)