mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-31 15:06:26 +00:00
wip
This commit is contained in:
parent
30a6aee52a
commit
38b9213a0c
1 changed files with 20 additions and 5 deletions
|
@ -92,17 +92,32 @@ func (pep *pipeExtractProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
}
|
||||
|
||||
shard := &pep.shards[workerID]
|
||||
c := br.getColumnByName(pep.pe.fromField)
|
||||
values := c.getValues(br)
|
||||
|
||||
ef := shard.ef
|
||||
rcs := shard.rcs
|
||||
for _, v := range values {
|
||||
|
||||
c := br.getColumnByName(pep.pe.fromField)
|
||||
if c.isConst {
|
||||
v := c.valuesEncoded[0]
|
||||
ef.apply(v)
|
||||
for i, f := range ef.fields {
|
||||
rcs[i].addValue(*f.value)
|
||||
fieldValue := *f.value
|
||||
rc := &rcs[i]
|
||||
for range br.timestamps {
|
||||
rc.addValue(fieldValue)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
values := c.getValues(br)
|
||||
for i, v := range values {
|
||||
if i == 0 || values[i-1] != v {
|
||||
ef.apply(v)
|
||||
}
|
||||
for j, f := range ef.fields {
|
||||
rcs[j].addValue(*f.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
br.addResultColumns(rcs)
|
||||
pep.ppBase.writeBlock(workerID, br)
|
||||
|
||||
|
|
Loading…
Reference in a new issue