mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/logstorage: add block_stats
pipe for analyzing per-block storage stats
This commit is contained in:
parent
f9e23bf8e3
commit
5ed54ebadf
11 changed files with 366 additions and 7 deletions
|
@ -18,7 +18,9 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
|
|||
* FEATURE: add [`join` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#join-pipe), which can be used for performing SQL-like joins.
|
||||
* FEATURE: support returning historical logs from [live tailing API](https://docs.victoriametrics.com/victorialogs/querying/#live-tailing) via `start_offset` query arg. For example, a request to `/select/logsql/tail?query=*&start_offset=5m` returns logs for the last 5 minutes before it starts returning live tailing logs for the given `query`.
|
||||
* FEATURE: add an ability to specify extra fields for logs ingested via [HTTP-based data ingestion protocols](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-apis). See `extra_fields` query arg and `VL-Extra-Fields` HTTP header in [these docs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#http-parameters).
|
||||
* BUGFIX: Properly parse structured metadata when ingesting logs with Loki ingestion protocol. An issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
|
||||
* FEATURE: add [`block_stats` pipe](https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe) for returning various per-block stats. This pipe is useful for debugging.
|
||||
|
||||
* BUGFIX: properly parse structured metadata when ingesting logs with [Loki ingestion protocol](https://docs.victoriametrics.com/victorialogs/data-ingestion/promtail/). The issue has been introduced in [v0.3.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.3.0-victorialogs). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7431) for the details.
|
||||
|
||||
## [v0.40.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.40.0-victorialogs)
|
||||
|
||||
|
|
|
@ -1293,6 +1293,7 @@ _time:5m | stats by (_stream) count() per_stream_logs | sort by (per_stream_logs
|
|||
|
||||
LogsQL supports the following pipes:
|
||||
|
||||
- [`block_stats`](#block_stats-pipe) returns various stats for the selected blocks with logs.
|
||||
- [`blocks_count`](#blocks_count-pipe) counts the number of blocks with logs processed by the query.
|
||||
- [`copy`](#copy-pipe) copies [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`delete`](#delete-pipe) deletes [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
@ -1325,10 +1326,34 @@ LogsQL supports the following pipes:
|
|||
- [`unpack_syslog`](#unpack_syslog-pipe) unpacks [syslog](https://en.wikipedia.org/wiki/Syslog) messages from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
- [`unroll`](#unroll-pipe) unrolls JSON arrays from [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model).
|
||||
|
||||
### block_stats pipe
|
||||
|
||||
`<q> | block_stats` [pipe](#pipes) returns the following stats for each block processed by `<q>`. This pipe is needed mostly for debugging.
|
||||
|
||||
The returned per-block stats:
|
||||
|
||||
- `field` - [field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) name
|
||||
- `rows` - the number of rows for the given `field`
|
||||
- `type` - internal storage type for the given `field`
|
||||
- `values_bytes` - on-disk size of the data for the given `field`
|
||||
- `bloom_bytes` - on-disk size of bloom filter data for the given `field`
|
||||
- `dict_bytes` - on-disk size of the dictionary data for the given `field`
|
||||
- `dict_items` - the number of unique values in the dictionary for the given `field`
|
||||
|
||||
See also:
|
||||
|
||||
- [`blocks_count` pipe](#blocks_count-pipe)
|
||||
- [`len` pipe](#len-pipe)
|
||||
|
||||
### blocks_count pipe
|
||||
|
||||
`<q> | blocks_count` [pipe](#pipes) counts the number of blocks with logs processed by `<q>`. This pipe is needed mostly for debugging.
|
||||
|
||||
See also:
|
||||
|
||||
- [`block_stats` pipe](#block_stats-pipe)
|
||||
- [`len` pipe](#len-pipe)
|
||||
|
||||
### copy pipe
|
||||
|
||||
If some [log fields](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) must be copied, then `| copy src1 as dst1, ..., srcN as dstN` [pipe](#pipes) can be used.
|
||||
|
@ -1820,6 +1845,7 @@ See also:
|
|||
- [`sum_len` stats function](#sum_len-stats)
|
||||
- [`sort` pipe](#sort-pipe)
|
||||
- [`limit` pipe](#limit-pipe)
|
||||
- [`block_stats` pipe](#block_stats-pipe)
|
||||
|
||||
### limit pipe
|
||||
|
||||
|
|
|
@ -243,14 +243,25 @@ func marshalBytesBlock(dst, src []byte) []byte {
|
|||
|
||||
// Compress the block
|
||||
dst = append(dst, marshalBytesTypeZSTD)
|
||||
compressLevel := getCompressLevel(len(src))
|
||||
bb := bbPool.Get()
|
||||
bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, 1)
|
||||
bb.B = encoding.CompressZSTDLevel(bb.B[:0], src, compressLevel)
|
||||
dst = encoding.MarshalVarUint64(dst, uint64(len(bb.B)))
|
||||
dst = append(dst, bb.B...)
|
||||
bbPool.Put(bb)
|
||||
return dst
|
||||
}
|
||||
|
||||
// getCompressLevel returns the ZSTD compression level to use for a block of dataLen bytes.
//
// Blocks of up to 512 bytes are compressed with level 1, blocks of up to 4KiB
// with level 2, and all the bigger blocks with level 3.
func getCompressLevel(dataLen int) int {
	switch {
	case dataLen <= 512:
		return 1
	case dataLen <= 4*1024:
		return 2
	default:
		return 3
	}
}
|
||||
|
||||
func unmarshalBytesBlock(dst, src []byte) ([]byte, []byte, error) {
|
||||
if len(src) < 1 {
|
||||
return dst, src, fmt.Errorf("cannot unmarshal block type from empty src")
|
||||
|
|
|
@ -349,9 +349,11 @@ func (q *Query) Clone(timestamp int64) *Query {
|
|||
func (q *Query) CanReturnLastNResults() bool {
|
||||
for _, p := range q.pipes {
|
||||
switch p.(type) {
|
||||
case *pipeBlocksCount,
|
||||
case *pipeBlockStats,
|
||||
*pipeBlocksCount,
|
||||
*pipeFieldNames,
|
||||
*pipeFieldValues,
|
||||
*pipeJoin,
|
||||
*pipeLimit,
|
||||
*pipeOffset,
|
||||
*pipeTop,
|
||||
|
|
|
@ -982,6 +982,9 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
f(`* | field_values x`, `* | field_values x`)
|
||||
f(`* | field_values (x)`, `* | field_values x`)
|
||||
|
||||
// block_stats pipe
|
||||
f(`foo | block_stats`, `foo | block_stats`)
|
||||
|
||||
// blocks_count pipe
|
||||
f(`foo | blocks_count as x`, `foo | blocks_count as x`)
|
||||
f(`foo | blocks_count y`, `foo | blocks_count as y`)
|
||||
|
@ -1222,6 +1225,11 @@ func TestParseQuerySuccess(t *testing.T) {
|
|||
f(`* | unpack_logfmt from x`, `* | unpack_logfmt from x`)
|
||||
f(`* | unpack_logfmt from x result_prefix y`, `* | unpack_logfmt from x result_prefix y`)
|
||||
|
||||
// join pipe
|
||||
f(`* | join by (x) (foo:bar)`, `* | join by (x) (foo:bar)`)
|
||||
f(`* | join on (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
|
||||
f(`* | join (x, y) (foo:bar)`, `* | join by (x, y) (foo:bar)`)
|
||||
|
||||
// multiple different pipes
|
||||
f(`* | fields foo, bar | limit 100 | stats by(foo,bar) count(baz) as qwert`, `* | fields foo, bar | limit 100 | stats by (foo, bar) count(baz) as qwert`)
|
||||
f(`* | skip 100 | head 20 | skip 10`, `* | offset 100 | limit 20 | offset 10`)
|
||||
|
@ -1504,6 +1512,11 @@ func TestParseQueryFailure(t *testing.T) {
|
|||
f(`foo | field_names x y`)
|
||||
f(`foo | field_names x, y`)
|
||||
|
||||
// invalid block_stats
|
||||
f(`foo | block_stats foo`)
|
||||
f(`foo | block_stats ()`)
|
||||
f(`foo | block_stats (foo)`)
|
||||
|
||||
// invalid blocks_count
|
||||
f(`foo | blocks_count |`)
|
||||
f(`foo | blocks_count (`)
|
||||
|
@ -1894,6 +1907,10 @@ func TestQueryGetNeededColumns(t *testing.T) {
|
|||
f(`* | fields x,y | field_names as bar | fields baz`, `x,y`, ``)
|
||||
f(`* | rm x,y | field_names as bar | fields baz`, `*`, `x,y`)
|
||||
|
||||
f(`* | block_stats`, `*`, ``)
|
||||
f(`* | block_stats | fields foo`, `*`, ``)
|
||||
f(`* | block_stats | rm foo`, `*`, ``)
|
||||
|
||||
f(`* | blocks_count as foo`, ``, ``)
|
||||
f(`* | blocks_count foo | fields bar`, ``, ``)
|
||||
f(`* | blocks_count foo | fields foo`, ``, ``)
|
||||
|
@ -2014,6 +2031,7 @@ func TestQueryGetNeededColumns(t *testing.T) {
|
|||
f(`* | extract_regexp if (q:w p:a) "(?P<f1>.*)bar" from x | count() r1`, `p,q`, ``)
|
||||
f(`* | field_names | count() r1`, ``, ``)
|
||||
f(`* | limit 10 | field_names as abc | count() r1`, `*`, ``)
|
||||
f(`* | block_stats | count() r1`, `*`, ``)
|
||||
f(`* | blocks_count | count() r1`, ``, ``)
|
||||
f(`* | limit 10 | blocks_count as abc | count() r1`, ``, ``)
|
||||
f(`* | fields a, b | count() r1`, ``, ``)
|
||||
|
@ -2117,10 +2135,12 @@ func TestQueryCanReturnLastNResults(t *testing.T) {
|
|||
f("* | limit 10", false)
|
||||
f("* | offset 10", false)
|
||||
f("* | uniq (x)", false)
|
||||
f("* | block_stats", false)
|
||||
f("* | blocks_count", false)
|
||||
f("* | field_names", false)
|
||||
f("* | field_values x", false)
|
||||
f("* | top 5 by (x)", false)
|
||||
f("* | join by (x) (foo)", false)
|
||||
|
||||
}
|
||||
|
||||
|
@ -2144,6 +2164,7 @@ func TestQueryCanLiveTail(t *testing.T) {
|
|||
f("* | drop_empty_fields", true)
|
||||
f("* | extract 'foo<bar>baz'", true)
|
||||
f("* | extract_regexp 'foo(?P<bar>baz)'", true)
|
||||
f("* | block_stats", false)
|
||||
f("* | blocks_count a", false)
|
||||
f("* | field_names a", false)
|
||||
f("* | fields a, b", true)
|
||||
|
@ -2341,6 +2362,7 @@ func TestQueryGetStatsByFields_Failure(t *testing.T) {
|
|||
f(`foo | count() | drop_empty_fields`)
|
||||
f(`foo | count() | extract "foo<bar>baz"`)
|
||||
f(`foo | count() | extract_regexp "(?P<ip>([0-9]+[.]){3}[0-9]+)"`)
|
||||
f(`foo | count() | block_stats`)
|
||||
f(`foo | count() | blocks_count`)
|
||||
f(`foo | count() | field_names`)
|
||||
f(`foo | count() | field_values abc`)
|
||||
|
|
|
@ -99,6 +99,12 @@ func parsePipes(lex *lexer) ([]pipe, error) {
|
|||
|
||||
func parsePipe(lex *lexer) (pipe, error) {
|
||||
switch {
|
||||
case lex.isKeyword("block_stats"):
|
||||
ps, err := parsePipeBlockStats(lex)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot parse 'block_stats' pipe: %w", err)
|
||||
}
|
||||
return ps, nil
|
||||
case lex.isKeyword("blocks_count"):
|
||||
pc, err := parsePipeBlocksCount(lex)
|
||||
if err != nil {
|
||||
|
@ -302,6 +308,7 @@ func parsePipe(lex *lexer) (pipe, error) {
|
|||
|
||||
var pipeNames = func() map[string]struct{} {
|
||||
a := []string{
|
||||
"block_stats",
|
||||
"blocks_count",
|
||||
"copy", "cp",
|
||||
"delete", "del", "rm", "drop",
|
||||
|
|
218
lib/logstorage/pipe_block_stats.go
Normal file
218
lib/logstorage/pipe_block_stats.go
Normal file
|
@ -0,0 +1,218 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
)
|
||||
|
||||
// pipeBlockStats implements the '| block_stats' pipe.
//
// The pipe accepts no options, so the struct has no fields.
//
// See https://docs.victoriametrics.com/victorialogs/logsql/#block_stats-pipe
type pipeBlockStats struct{}
|
||||
|
||||
func (ps *pipeBlockStats) String() string {
|
||||
return "block_stats"
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) canLiveTail() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) optimize() {
|
||||
// nothing to do
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) hasFilterInWithQuery() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) updateNeededFields(neededFields, unneededFields fieldsSet) {
|
||||
unneededFields.reset()
|
||||
neededFields.add("*")
|
||||
}
|
||||
|
||||
func (ps *pipeBlockStats) newPipeProcessor(workersCount int, _ <-chan struct{}, _ func(), ppNext pipeProcessor) pipeProcessor {
|
||||
return &pipeBlockStatsProcessor{
|
||||
ppNext: ppNext,
|
||||
|
||||
shards: make([]pipeBlockStatsProcessorShard, workersCount),
|
||||
}
|
||||
}
|
||||
|
||||
type pipeBlockStatsProcessor struct {
|
||||
ppNext pipeProcessor
|
||||
|
||||
shards []pipeBlockStatsProcessorShard
|
||||
}
|
||||
|
||||
type pipeBlockStatsProcessorShard struct {
|
||||
pipeBlockStatsProcessorShardNopad
|
||||
|
||||
// The padding prevents false sharing on widespread platforms with 128 mod (cache line size) = 0 .
|
||||
_ [128 - unsafe.Sizeof(pipeBlockStatsProcessorShardNopad{})%128]byte
|
||||
}
|
||||
|
||||
type pipeBlockStatsProcessorShardNopad struct {
|
||||
wctx pipeBlockStatsWriteContext
|
||||
}
|
||||
|
||||
func (psp *pipeBlockStatsProcessor) writeBlock(workerID uint, br *blockResult) {
|
||||
if br.rowsLen == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
shard := &psp.shards[workerID]
|
||||
shard.wctx.init(workerID, psp.ppNext, br.rowsLen)
|
||||
|
||||
cs := br.getColumns()
|
||||
for _, c := range cs {
|
||||
if c.isConst {
|
||||
shard.wctx.writeRow(c.name, "const", uint64(len(c.valuesEncoded[0])), 0, 0, 0)
|
||||
continue
|
||||
}
|
||||
if c.isTime {
|
||||
var blockSize uint64
|
||||
if br.bs != nil {
|
||||
blockSize = br.bs.bsw.bh.timestampsHeader.blockSize
|
||||
}
|
||||
shard.wctx.writeRow(c.name, "time", blockSize, 0, 0, 0)
|
||||
continue
|
||||
}
|
||||
if br.bs == nil {
|
||||
shard.wctx.writeRow(c.name, "inmemory", 0, 0, 0, 0)
|
||||
continue
|
||||
}
|
||||
|
||||
typ := c.valueType.String()
|
||||
ch := br.bs.getColumnHeader(c.name)
|
||||
dictSize := 0
|
||||
dictItemsCount := len(ch.valuesDict.values)
|
||||
if c.valueType == valueTypeDict {
|
||||
for _, v := range ch.valuesDict.values {
|
||||
dictSize += len(v)
|
||||
}
|
||||
}
|
||||
shard.wctx.writeRow(c.name, typ, ch.valuesSize, ch.bloomFilterSize, uint64(dictItemsCount), uint64(dictSize))
|
||||
}
|
||||
|
||||
shard.wctx.flush()
|
||||
shard.wctx.reset()
|
||||
}
|
||||
|
||||
func (psp *pipeBlockStatsProcessor) flush() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func parsePipeBlockStats(lex *lexer) (*pipeBlockStats, error) {
|
||||
if !lex.isKeyword("block_stats") {
|
||||
return nil, fmt.Errorf("unexpected token: %q; want %q", lex.token, "block_stats")
|
||||
}
|
||||
lex.nextToken()
|
||||
|
||||
ps := &pipeBlockStats{}
|
||||
|
||||
return ps, nil
|
||||
}
|
||||
|
||||
type pipeBlockStatsWriteContext struct {
|
||||
workerID uint
|
||||
ppNext pipeProcessor
|
||||
|
||||
a arena
|
||||
rowsLen int
|
||||
tmpBuf []byte
|
||||
|
||||
rcs []resultColumn
|
||||
br blockResult
|
||||
|
||||
// rowsCount is the number of rows in the current block
|
||||
rowsCount int
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) reset() {
|
||||
wctx.workerID = 0
|
||||
wctx.ppNext = nil
|
||||
|
||||
wctx.a.reset()
|
||||
wctx.rowsLen = 0
|
||||
wctx.tmpBuf = wctx.tmpBuf[:0]
|
||||
|
||||
rcs := wctx.rcs
|
||||
for i := range rcs {
|
||||
rcs[i].reset()
|
||||
}
|
||||
wctx.rcs = rcs[:0]
|
||||
|
||||
wctx.rowsCount = 0
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) init(workerID uint, ppNext pipeProcessor, rowsLen int) {
|
||||
wctx.reset()
|
||||
|
||||
wctx.workerID = workerID
|
||||
wctx.ppNext = ppNext
|
||||
|
||||
wctx.rowsLen = rowsLen
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) writeRow(columnName, columnType string, valuesSize, bloomSize, dictItems, dictSize uint64) {
|
||||
rcs := wctx.rcs
|
||||
if len(rcs) == 0 {
|
||||
wctx.rcs = slicesutil.SetLength(wctx.rcs, 7)
|
||||
rcs = wctx.rcs
|
||||
|
||||
rcs[0].name = "field"
|
||||
rcs[1].name = "type"
|
||||
rcs[2].name = "values_bytes"
|
||||
rcs[3].name = "bloom_bytes"
|
||||
rcs[4].name = "dict_items"
|
||||
rcs[5].name = "dict_bytes"
|
||||
rcs[6].name = "rows"
|
||||
}
|
||||
|
||||
wctx.addValue(&rcs[0], columnName)
|
||||
wctx.addValue(&rcs[1], columnType)
|
||||
wctx.addUint64Value(&rcs[2], valuesSize)
|
||||
wctx.addUint64Value(&rcs[3], bloomSize)
|
||||
wctx.addUint64Value(&rcs[4], dictItems)
|
||||
wctx.addUint64Value(&rcs[5], dictSize)
|
||||
wctx.addUint64Value(&rcs[6], uint64(wctx.rowsLen))
|
||||
|
||||
wctx.rowsCount++
|
||||
if len(wctx.a.b) >= 1_000_000 {
|
||||
wctx.flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) addUint64Value(rc *resultColumn, n uint64) {
|
||||
wctx.tmpBuf = marshalUint64String(wctx.tmpBuf[:0], n)
|
||||
wctx.addValue(rc, bytesutil.ToUnsafeString(wctx.tmpBuf))
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) addValue(rc *resultColumn, v string) {
|
||||
vCopy := wctx.a.copyString(v)
|
||||
rc.addValue(vCopy)
|
||||
}
|
||||
|
||||
func (wctx *pipeBlockStatsWriteContext) flush() {
|
||||
rcs := wctx.rcs
|
||||
|
||||
// Flush rcs to ppNext
|
||||
br := &wctx.br
|
||||
br.setResultColumns(rcs, wctx.rowsCount)
|
||||
wctx.rowsCount = 0
|
||||
wctx.ppNext.writeBlock(wctx.workerID, br)
|
||||
br.reset()
|
||||
for i := range rcs {
|
||||
rcs[i].resetValues()
|
||||
}
|
||||
wctx.a.reset()
|
||||
}
|
41
lib/logstorage/pipe_block_stats_test.go
Normal file
41
lib/logstorage/pipe_block_stats_test.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package logstorage
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestParsePipeBlockStatsSuccess(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeSuccess(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`block_stats`)
|
||||
}
|
||||
|
||||
func TestParsePipeBlockStatsFailure(t *testing.T) {
|
||||
f := func(pipeStr string) {
|
||||
t.Helper()
|
||||
expectParsePipeFailure(t, pipeStr)
|
||||
}
|
||||
|
||||
f(`block_stats foo`)
|
||||
f(`block_stats ()`)
|
||||
f(`block_stats (foo)`)
|
||||
}
|
||||
|
||||
func TestPipeBlockStatsUpdateNeededFields(t *testing.T) {
|
||||
f := func(s string, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected string) {
|
||||
t.Helper()
|
||||
expectPipeNeededFields(t, s, neededFields, unneededFields, neededFieldsExpected, unneededFieldsExpected)
|
||||
}
|
||||
|
||||
// all the needed fields
|
||||
f("block_stats", "*", "", "*", "")
|
||||
|
||||
// all the needed fields, plus unneeded fields
|
||||
f("block_stats", "*", "f1,f2", "*", "")
|
||||
|
||||
// needed fields
|
||||
f("block_stats", "f1,f2", "", "*", "")
|
||||
}
|
|
@ -38,7 +38,7 @@ func (pj *pipeJoin) hasFilterInWithQuery() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (pj *pipeJoin) initFilterInValues(cache map[string][]string, getFieldValuesFunc getFieldValuesFunc) (pipe, error) {
|
||||
func (pj *pipeJoin) initFilterInValues(_ map[string][]string, _ getFieldValuesFunc) (pipe, error) {
|
||||
return pj, nil
|
||||
}
|
||||
|
||||
|
@ -122,6 +122,9 @@ func (pjp *pipeJoinProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
continue
|
||||
}
|
||||
for _, extraFields := range matchingRows {
|
||||
if needStop(pjp.stopCh) {
|
||||
return
|
||||
}
|
||||
shard.wctx.writeRow(rowIdx, extraFields)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -134,10 +134,10 @@ func (pup *pipeUnrollProcessor) writeBlock(workerID uint, br *blockResult) {
|
|||
|
||||
fields := shard.fields
|
||||
for rowIdx := 0; rowIdx < br.rowsLen; rowIdx++ {
|
||||
if needStop(pup.stopCh) {
|
||||
return
|
||||
}
|
||||
if bm.isSetBit(rowIdx) {
|
||||
if needStop(pup.stopCh) {
|
||||
return
|
||||
}
|
||||
shard.writeUnrolledFields(pu.fields, columnValues, rowIdx)
|
||||
} else {
|
||||
fields = fields[:0]
|
||||
|
|
|
@ -55,6 +55,33 @@ const (
|
|||
valueTypeTimestampISO8601 = valueType(9)
|
||||
)
|
||||
|
||||
func (t valueType) String() string {
|
||||
switch t {
|
||||
case valueTypeUnknown:
|
||||
return "unknown"
|
||||
case valueTypeString:
|
||||
return "string"
|
||||
case valueTypeDict:
|
||||
return "dict"
|
||||
case valueTypeUint8:
|
||||
return "uint8"
|
||||
case valueTypeUint16:
|
||||
return "uint16"
|
||||
case valueTypeUint32:
|
||||
return "uint32"
|
||||
case valueTypeUint64:
|
||||
return "uint64"
|
||||
case valueTypeFloat64:
|
||||
return "float64"
|
||||
case valueTypeIPv4:
|
||||
return "ipv4"
|
||||
case valueTypeTimestampISO8601:
|
||||
return "iso8601"
|
||||
default:
|
||||
return fmt.Sprintf("unknown valueType=%d", t)
|
||||
}
|
||||
}
|
||||
|
||||
type valuesEncoder struct {
|
||||
// buf contains data for values.
|
||||
buf []byte
|
||||
|
|
Loading…
Reference in a new issue