Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-11-04 19:10:41 +02:00
commit 8d8e2ccf5f
19 changed files with 398 additions and 28 deletions

View file

@ -1,6 +1,12 @@
# tip
* FEATURE: reduce memory usage when query touches big number of time series.
* FEATURE: vmagent: reduce memory usage when `kubernetes_sd_config` discovers big number of scrape targets (e.g. hundreds of thouthands) and the majority of these targets (99%)
are dropped during relabeling. Previously labels for all the dropped targets were displayed at `/api/v1/targets` page. Now only up to `-promscrape.maxDroppedTargets` such
targets are displayed. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/878 for details.
# [v1.45.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.45.0)
* FEATURE: allow setting `-retentionPeriod` smaller than one month. I.e. `-retentionPeriod=3d`, `-retentionPeriod=2w`, etc. is supported now.

View file

@ -231,6 +231,9 @@ This information may be useful for debugging target relabeling.
by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
and `http://vmagent-host:8429/api/v1/targets`.
* The `/api/v1/targets` page could be useful for debugging relabeling process for scrape targets.
This page contains original labels for targets dropped during relabeling (see "droppedTargets" section in the page output). By default up to `-promscrape.maxDroppedTargets` targets are shown here. If your setup drops more targets during relabeling, then increase `-promscrape.maxDroppedTargets` command-line flag value in order to see all the dropped targets. Note that tracking each dropped target requires up to 10Kb of RAM, so big values for `-promscrape.maxDroppedTargets` may result in increased memory usage if big number of scrape targets are dropped during relabeling.
* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:

View file

@ -231,12 +231,12 @@ again:
return
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_requests_total{url=%q, status_code="%d"}`, c.sanitizedURL, statusCode)).Inc()
if statusCode/100 == 4 {
// Just drop block on 4xx status code like Prometheus does.
if statusCode == 409 {
// Just drop block on 409 status code like Prometheus does.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/873
body, _ := ioutil.ReadAll(resp.Body)
_ = resp.Body.Close()
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block for 4XX status code like Prometheus does; "+
logger.Errorf("unexpected status code received when sending a block with size %d bytes to %q: #%d; dropping the block like Prometheus does; "+
"response body=%q", len(block), c.sanitizedURL, statusCode, body)
c.packetsDropped.Inc()
return

View file

@ -10,10 +10,12 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/graphite"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/prometheus"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
@ -45,6 +47,9 @@ func getDefaultMaxConcurrentRequests() int {
// Init initializes vmselect
func Init() {
tmpDirPath := *vmstorage.DataPath + "/tmp"
fs.RemoveDirContents(tmpDirPath)
netstorage.InitTmpBlocksDir(tmpDirPath)
promql.InitRollupResultCache(*vmstorage.DataPath + "/cache/rollupResult")
concurrencyCh = make(chan struct{}, *maxConcurrentRequests)

View file

@ -58,6 +58,7 @@ type Results struct {
packedTimeseries []packedTimeseries
sr *storage.Search
tbf *tmpBlocksFile
}
// Len returns the number of results in rss.
@ -73,6 +74,8 @@ func (rss *Results) Cancel() {
func (rss *Results) mustClose() {
putStorageSearch(rss.sr)
rss.sr = nil
putTmpBlocksFile(rss.tbf)
rss.tbf = nil
}
var timeseriesWorkCh = make(chan *timeseriesWork, gomaxprocs*16)
@ -106,7 +109,7 @@ func timeseriesWorker(workerID uint) {
tsw.doneCh <- nil
continue
}
if err := tsw.pts.Unpack(&rs, rss.tr, rss.fetchData); err != nil {
if err := tsw.pts.Unpack(&rs, rss.tbf, rss.tr, rss.fetchData); err != nil {
tsw.doneCh <- fmt.Errorf("error during time series unpacking: %w", err)
continue
}
@ -180,7 +183,7 @@ var gomaxprocs = runtime.GOMAXPROCS(-1)
type packedTimeseries struct {
metricName string
brs []storage.BlockRef
brs []blockRef
pd *promData
}
@ -192,21 +195,23 @@ type promData struct {
var unpackWorkCh = make(chan *unpackWork, gomaxprocs*128)
type unpackWorkItem struct {
br storage.BlockRef
br blockRef
tr storage.TimeRange
}
type unpackWork struct {
tbf *tmpBlocksFile
ws []unpackWorkItem
sbs []*sortBlock
doneCh chan error
}
func (upw *unpackWork) reset() {
upw.tbf = nil
ws := upw.ws
for i := range ws {
w := &ws[i]
w.br = storage.BlockRef{}
w.br = blockRef{}
w.tr = storage.TimeRange{}
}
upw.ws = upw.ws[:0]
@ -223,7 +228,7 @@ func (upw *unpackWork) reset() {
func (upw *unpackWork) unpack(tmpBlock *storage.Block) {
for _, w := range upw.ws {
sb := getSortBlock()
if err := sb.unpackFrom(tmpBlock, w.br, w.tr); err != nil {
if err := sb.unpackFrom(tmpBlock, upw.tbf, w.br, w.tr); err != nil {
putSortBlock(sb)
upw.doneCh <- fmt.Errorf("cannot unpack block: %w", err)
return
@ -269,7 +274,7 @@ func unpackWorker() {
var unpackBatchSize = 8 * runtime.GOMAXPROCS(-1)
// Unpack unpacks pts to dst.
func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData bool) error {
func (pts *packedTimeseries) Unpack(dst *Result, tbf *tmpBlocksFile, tr storage.TimeRange, fetchData bool) error {
dst.reset()
if err := dst.MetricName.Unmarshal(bytesutil.ToUnsafeBytes(pts.metricName)); err != nil {
return fmt.Errorf("cannot unmarshal metricName %q: %w", pts.metricName, err)
@ -283,11 +288,13 @@ func (pts *packedTimeseries) Unpack(dst *Result, tr storage.TimeRange, fetchData
brsLen := len(pts.brs)
upws := make([]*unpackWork, 0, 1+brsLen/unpackBatchSize)
upw := getUnpackWork()
upw.tbf = tbf
for _, br := range pts.brs {
if len(upw.ws) >= unpackBatchSize {
unpackWorkCh <- upw
upws = append(upws, upw)
upw = getUnpackWork()
upw.tbf = tbf
}
upw.ws = append(upw.ws, unpackWorkItem{
br: br,
@ -439,9 +446,10 @@ func (sb *sortBlock) reset() {
sb.NextIdx = 0
}
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, br storage.BlockRef, tr storage.TimeRange) error {
func (sb *sortBlock) unpackFrom(tmpBlock *storage.Block, tbf *tmpBlocksFile, br blockRef, tr storage.TimeRange) error {
tmpBlock.Reset()
br.MustReadBlock(tmpBlock, true)
brReal := tbf.MustReadBlockRefAt(br.partRef, br.addr)
brReal.MustReadBlock(tmpBlock, true)
if err := tmpBlock.UnmarshalData(); err != nil {
return fmt.Errorf("cannot unmarshal block: %w", err)
}
@ -786,19 +794,31 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
sr := getStorageSearch()
maxSeriesCount := sr.Init(vmstorage.Storage, tfss, tr, *maxMetricsPerSearch, deadline.Deadline())
m := make(map[string][]storage.BlockRef, maxSeriesCount)
m := make(map[string][]blockRef, maxSeriesCount)
orderedMetricNames := make([]string, 0, maxSeriesCount)
blocksRead := 0
tbf := getTmpBlocksFile()
var buf []byte
for sr.NextMetricBlock() {
blocksRead++
if deadline.Exceeded() {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
return nil, fmt.Errorf("timeout exceeded while fetching data block #%d from storage: %s", blocksRead, deadline.String())
}
buf = sr.MetricBlockRef.BlockRef.Marshal(buf[:0])
addr, err := tbf.WriteBlockRefData(buf)
if err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
return nil, fmt.Errorf("cannot write %d bytes to temporary file: %w", len(buf), err)
}
metricName := sr.MetricBlockRef.MetricName
brs := m[string(metricName)]
brs = append(brs, *sr.MetricBlockRef.BlockRef)
brs = append(brs, blockRef{
partRef: sr.MetricBlockRef.BlockRef.PartRef(),
addr: addr,
})
if len(brs) > 1 {
// An optimization: do not allocate a string for already existing metricName key in m
m[string(metricName)] = brs
@ -810,12 +830,18 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
}
}
if err := sr.Error(); err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
if errors.Is(err, storage.ErrDeadlineExceeded) {
return nil, fmt.Errorf("timeout exceeded during the query: %s", deadline.String())
}
return nil, fmt.Errorf("search error after reading %d data blocks: %w", blocksRead, err)
}
if err := tbf.Finalize(); err != nil {
putTmpBlocksFile(tbf)
putStorageSearch(sr)
return nil, fmt.Errorf("cannot finalize temporary file: %w", err)
}
// Fetch data from promdb.
pm := make(map[string]*promData)
@ -850,9 +876,15 @@ func ProcessSearchQuery(sq *storage.SearchQuery, fetchData bool, deadline search
}
rss.packedTimeseries = pts
rss.sr = sr
rss.tbf = tbf
return &rss, nil
}
type blockRef struct {
partRef storage.PartRef
addr tmpBlockAddr
}
func setupTfss(tagFilterss [][]storage.TagFilter) ([]*storage.TagFilters, error) {
tfss := make([]*storage.TagFilters, 0, len(tagFilterss))
for _, tagFilters := range tagFilterss {

View file

@ -0,0 +1,188 @@
package netstorage
import (
"fmt"
"io/ioutil"
"os"
"sync"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/metrics"
)
// InitTmpBlocksDir initializes directory to store temporary search results.
//
// It stores data in system-defined temporary directory if tmpDirPath is empty.
func InitTmpBlocksDir(tmpDirPath string) {
if len(tmpDirPath) == 0 {
tmpDirPath = os.TempDir()
}
tmpBlocksDir = tmpDirPath + "/searchResults"
fs.MustRemoveAll(tmpBlocksDir)
if err := fs.MkdirAllIfNotExist(tmpBlocksDir); err != nil {
logger.Panicf("FATAL: cannot create %q: %s", tmpBlocksDir, err)
}
}
var tmpBlocksDir string
func maxInmemoryTmpBlocksFile() int {
mem := memory.Allowed()
maxLen := mem / 1024
if maxLen < 64*1024 {
return 64 * 1024
}
if maxLen > 4*1024*1024 {
return 4 * 1024 * 1024
}
return maxLen
}
var _ = metrics.NewGauge(`vm_tmp_blocks_max_inmemory_file_size_bytes`, func() float64 {
return float64(maxInmemoryTmpBlocksFile())
})
type tmpBlocksFile struct {
buf []byte
f *os.File
r *fs.ReaderAt
offset uint64
}
func getTmpBlocksFile() *tmpBlocksFile {
v := tmpBlocksFilePool.Get()
if v == nil {
return &tmpBlocksFile{
buf: make([]byte, 0, maxInmemoryTmpBlocksFile()),
}
}
return v.(*tmpBlocksFile)
}
func putTmpBlocksFile(tbf *tmpBlocksFile) {
tbf.MustClose()
tbf.buf = tbf.buf[:0]
tbf.f = nil
tbf.r = nil
tbf.offset = 0
tmpBlocksFilePool.Put(tbf)
}
var tmpBlocksFilePool sync.Pool
type tmpBlockAddr struct {
offset uint64
size int
}
func (addr tmpBlockAddr) String() string {
return fmt.Sprintf("offset %d, size %d", addr.offset, addr.size)
}
var (
tmpBlocksFilesCreated = metrics.NewCounter(`vm_tmp_blocks_files_created_total`)
_ = metrics.NewGauge(`vm_tmp_blocks_files_directory_free_bytes`, func() float64 {
return float64(fs.MustGetFreeSpace(tmpBlocksDir))
})
)
// WriteBlockRefData writes br to tbf.
//
// It returns errors since the operation may fail on space shortage
// and this must be handled.
func (tbf *tmpBlocksFile) WriteBlockRefData(b []byte) (tmpBlockAddr, error) {
var addr tmpBlockAddr
addr.offset = tbf.offset
addr.size = len(b)
tbf.offset += uint64(addr.size)
if len(tbf.buf)+len(b) <= cap(tbf.buf) {
// Fast path - the data fits tbf.buf
tbf.buf = append(tbf.buf, b...)
return addr, nil
}
// Slow path: flush the data from tbf.buf to file.
if tbf.f == nil {
f, err := ioutil.TempFile(tmpBlocksDir, "")
if err != nil {
return addr, err
}
tbf.f = f
tmpBlocksFilesCreated.Inc()
}
_, err := tbf.f.Write(tbf.buf)
tbf.buf = append(tbf.buf[:0], b...)
if err != nil {
return addr, fmt.Errorf("cannot write block to %q: %w", tbf.f.Name(), err)
}
return addr, nil
}
func (tbf *tmpBlocksFile) Finalize() error {
if tbf.f == nil {
return nil
}
fname := tbf.f.Name()
if _, err := tbf.f.Write(tbf.buf); err != nil {
return fmt.Errorf("cannot write the remaining %d bytes to %q: %w", len(tbf.buf), fname, err)
}
tbf.buf = tbf.buf[:0]
r, err := fs.OpenReaderAt(fname)
if err != nil {
logger.Panicf("FATAL: cannot open %q: %s", fname, err)
}
// Hint the OS that the file is read almost sequentiallly.
// This should reduce the number of disk seeks, which is important
// for HDDs.
r.MustFadviseSequentialRead(true)
tbf.r = r
return nil
}
func (tbf *tmpBlocksFile) MustReadBlockRefAt(partRef storage.PartRef, addr tmpBlockAddr) storage.BlockRef {
var buf []byte
if tbf.f == nil {
buf = tbf.buf[addr.offset : addr.offset+uint64(addr.size)]
} else {
bb := tmpBufPool.Get()
defer tmpBufPool.Put(bb)
bb.B = bytesutil.Resize(bb.B, addr.size)
tbf.r.MustReadAt(bb.B, int64(addr.offset))
buf = bb.B
}
var br storage.BlockRef
if err := br.Init(partRef, buf); err != nil {
logger.Panicf("FATAL: cannot initialize BlockRef: %s", err)
}
return br
}
var tmpBufPool bytesutil.ByteBufferPool
func (tbf *tmpBlocksFile) MustClose() {
if tbf.f == nil {
return
}
if tbf.r != nil {
// tbf.r could be nil if Finalize wasn't called.
tbf.r.MustClose()
}
fname := tbf.f.Name()
// Remove the file at first, then close it.
// This way the OS shouldn't try to flush file contents to storage
// on close.
if err := os.Remove(fname); err != nil {
logger.Panicf("FATAL: cannot remove %q: %s", fname, err)
}
if err := tbf.f.Close(); err != nil {
logger.Panicf("FATAL: cannot close %q: %s", fname, err)
}
tbf.f = nil
}

View file

@ -231,6 +231,9 @@ This information may be useful for debugging target relabeling.
by passing `-promscrape.suppressScrapeErrors` command-line flag to `vmagent`. The most recent scrape error per each target can be observed at `http://vmagent-host:8429/targets`
and `http://vmagent-host:8429/api/v1/targets`.
* The `/api/v1/targets` page could be useful for debugging relabeling process for scrape targets.
This page contains original labels for targets dropped during relabeling (see "droppedTargets" section in the page output). By default up to `-promscrape.maxDroppedTargets` targets are shown here. If your setup drops more targets during relabeling, then increase `-promscrape.maxDroppedTargets` command-line flag value in order to see all the dropped targets. Note that tracking each dropped target requires up to 10Kb of RAM, so big values for `-promscrape.maxDroppedTargets` may result in increased memory usage if big number of scrape targets are dropped during relabeling.
* If `vmagent` scrapes targets with millions of metrics per each target (for instance, when scraping [federation endpoints](https://prometheus.io/docs/prometheus/latest/federation/)),
then it is recommended enabling `stream parsing mode` in order to reduce memory usage during scraping. This mode may be enabled either globally for all the scrape targets
by passing `-promscrape.streamParse` command-line flag or on a per-scrape target basis with `stream_parse: true` option. For example:

10
lib/fs/fadvise_darwin.go Normal file
View file

@ -0,0 +1,10 @@
package fs
import (
"os"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
// TODO: implement this properly
return nil
}

10
lib/fs/fadvise_openbsd.go Normal file
View file

@ -0,0 +1,10 @@
package fs
import (
"os"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
// TODO: implement this properly
return nil
}

22
lib/fs/fadvise_unix.go Normal file
View file

@ -0,0 +1,22 @@
// +build linux freebsd
package fs
import (
"fmt"
"os"
"golang.org/x/sys/unix"
)
func fadviseSequentialRead(f *os.File, prefetch bool) error {
fd := int(f.Fd())
mode := unix.FADV_SEQUENTIAL
if prefetch {
mode |= unix.FADV_WILLNEED
}
if err := unix.Fadvise(int(fd), 0, 0, mode); err != nil {
return fmt.Errorf("error returned from unix.Fadvise(%d): %w", mode, err)
}
return nil
}

View file

@ -149,6 +149,15 @@ func (r *ReaderAt) MustClose() {
readersCount.Dec()
}
// MustFadviseSequentialRead hints the OS that f is read mostly sequentially.
//
// if prefetch is set, then the OS is hinted to prefetch f data.
func (r *ReaderAt) MustFadviseSequentialRead(prefetch bool) {
if err := fadviseSequentialRead(r.f, prefetch); err != nil {
logger.Panicf("FATAL: error in fadviseSequentialRead(%q, %v): %s", r.f.Name(), prefetch, err)
}
}
// OpenReaderAt opens ReaderAt for reading from filename.
//
// MustClose must be called on the returned ReaderAt when it is no longer needed.

View file

@ -231,8 +231,8 @@ func (idxbc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
idxbc.mu.Lock()
for k, idxbe := range idxbc.m {
// Delete items accessed more than 10 minutes ago.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 10*60 {
// Delete items accessed more than a minute ago.
if currentTime-atomic.LoadUint64(&idxbe.lastAccessTime) > 60 {
delete(idxbc.m, k)
}
}
@ -378,8 +378,8 @@ func (ibc *inmemoryBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than 10 minutes ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 10*60 {
// Delete items accessed more than a minute ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 60 {
delete(ibc.m, k)
}
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"math/bits"
"strconv"
"strings"
"sync"
"time"
@ -121,11 +122,24 @@ func (sw *ScrapeWork) LabelsString() string {
}
func promLabelsString(labels []prompbmarshal.Label) string {
a := make([]string, 0, len(labels))
// Calculate the required memory for storing serialized labels.
n := 2 // for `{...}`
for _, label := range labels {
a = append(a, fmt.Sprintf("%s=%q", label.Name, label.Value))
n += len(label.Name) + len(label.Value)
n += 4 // for `="...",`
}
return "{" + strings.Join(a, ", ") + "}"
b := make([]byte, 0, n)
b = append(b, '{')
for i, label := range labels {
b = append(b, label.Name...)
b = append(b, '=')
b = strconv.AppendQuote(b, label.Value)
if i+1 < len(labels) {
b = append(b, ',')
}
}
b = append(b, '}')
return bytesutil.ToUnsafeString(b)
}
type scrapeWork struct {

View file

@ -11,6 +11,33 @@ import (
parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus"
)
func TestPromLabelsString(t *testing.T) {
f := func(labels []prompbmarshal.Label, resultExpected string) {
t.Helper()
result := promLabelsString(labels)
if result != resultExpected {
t.Fatalf("unexpected result; got\n%s\nwant\n%s", result, resultExpected)
}
}
f([]prompbmarshal.Label{}, "{}")
f([]prompbmarshal.Label{
{
Name: "foo",
Value: "bar",
},
}, `{foo="bar"}`)
f([]prompbmarshal.Label{
{
Name: "foo",
Value: "bar",
},
{
Name: "a",
Value: `"b"`,
},
}, `{foo="bar",a="\"b\""}`)
}
func TestScrapeWorkScrapeInternalFailure(t *testing.T) {
dataExpected := `
up 0 123

View file

@ -1,6 +1,7 @@
package promscrape
import (
"flag"
"fmt"
"io"
"sort"
@ -12,6 +13,10 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
var maxDroppedTargets = flag.Int("promscrape.maxDroppedTargets", 1000, "The maximum number of `droppedTargets` shown at /api/v1/targets page. "+
"Increase this value if your setup drops more scrape targets during relabeling and you need investigating labels for all the dropped targets. "+
"Note that the increased number of tracked dropped targets may result in increased memory usage")
var tsmGlobal = newTargetStatusMap()
// WriteHumanReadableTargetsStatus writes human-readable status for all the scrape targets to w.
@ -240,12 +245,18 @@ type droppedTarget struct {
}
func (dt *droppedTargets) Register(originalLabels []prompbmarshal.Label) {
key := promLabelsString(originalLabels)
currentTime := fasttime.UnixTimestamp()
dt.mu.Lock()
dt.m[key] = droppedTarget{
originalLabels: originalLabels,
deadline: currentTime + 10*60,
if k, ok := dt.m[key]; ok {
k.deadline = currentTime + 10*60
dt.m[key] = k
} else if len(dt.m) < *maxDroppedTargets {
dt.m[key] = droppedTarget{
originalLabels: originalLabels,
deadline: currentTime + 10*60,
}
}
if currentTime-dt.lastCleanupTime > 60 {
for k, v := range dt.m {

View file

@ -232,8 +232,8 @@ func (ibc *indexBlockCache) cleanByTimeout() {
currentTime := fasttime.UnixTimestamp()
ibc.mu.Lock()
for k, ibe := range ibc.m {
// Delete items accessed more than 10 minutes ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 10*60 {
// Delete items accessed more than a minute ago.
if currentTime-atomic.LoadUint64(&ibe.lastAccessTime) > 60 {
delete(ibc.m, k)
}
}

View file

@ -1381,7 +1381,7 @@ func appendPartsToMerge(dst, src []*partWrapper, maxPartsToMerge int, maxRows ui
}
if maxM < minM {
// There is no sense in merging parts with too small m.
return dst, needFreeSpace
return dst, false
}
return append(dst, pws...), needFreeSpace
}

View file

@ -30,6 +30,36 @@ func (br *BlockRef) init(p *part, bh *blockHeader) {
br.bh = *bh
}
// Init initializes br from pr and data
func (br *BlockRef) Init(pr PartRef, data []byte) error {
br.p = pr.p
tail, err := br.bh.Unmarshal(data)
if err != nil {
return err
}
if len(tail) > 0 {
return fmt.Errorf("unexpected non-empty tail left after unmarshaling blockHeader; len(tail)=%d; tail=%q", len(tail), tail)
}
return nil
}
// Marshal marshals br to dst.
func (br *BlockRef) Marshal(dst []byte) []byte {
return br.bh.Marshal(dst)
}
// PartRef returns PartRef from br.
func (br *BlockRef) PartRef() PartRef {
return PartRef{
p: br.p,
}
}
// PartRef is Part reference.
type PartRef struct {
p *part
}
// MustReadBlock reads block from br to dst.
//
// if fetchData is false, then only block header is read, otherwise all the data is read.

View file

@ -71,7 +71,7 @@ func TestSearchQueryMarshalUnmarshal(t *testing.T) {
}
func TestSearch(t *testing.T) {
path := fmt.Sprintf("TestSearch")
path := "TestSearch"
st, err := OpenStorage(path, 0)
if err != nil {
t.Fatalf("cannot open storage %q: %s", path, err)