diff --git a/README.md b/README.md index a8544b656..d57e82571 100644 --- a/README.md +++ b/README.md @@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178) Additionally VictoriaMetrics provides the following handlers: -* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes: +* `/api/v1/series/count` - returns the total number of time series in the database. Some notes: * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series; * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions; -* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. -* `/api/v1/status/active_queries` - it returns a list of currently running queries. +* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. +* `/api/v1/status/active_queries` - returns a list of currently running queries. +* `/api/v1/status/top_queries` - returns the following query lists: + * the most frequently executed queries - `topByCount` + * queries with the biggest average execution duration - `topByAvgDuration` + * queries that took the most time for execution - `topBySumDuration` + The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg. + For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds. 
## Graphite API usage diff --git a/app/victoria-metrics/main.go b/app/victoria-metrics/main.go index a5881a722..79d7f0d10 100644 --- a/app/victoria-metrics/main.go +++ b/app/victoria-metrics/main.go @@ -15,6 +15,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo" "github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/fs" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" @@ -41,6 +42,7 @@ var customAPIPathList = [][]string{ func main() { // Write flags and help message to stdout, since it is easier to grep or pipe. flag.CommandLine.SetOutput(os.Stdout) + flag.Usage = usage envflag.Parse() buildinfo.Init() logger.Init() @@ -125,3 +127,12 @@ func writeAPIHelp(w io.Writer, pathList [][]string) { fmt.Fprintf(w, "%q - %s
", p, p, doc) } } + +func usage() { + const s = ` +victoria-metrics is a time series database and monitoring solution. + +See the docs at https://victoriametrics.github.io/ +` + flagutil.Usage(s) +} diff --git a/app/vmalert/README.md b/app/vmalert/README.md index 6f705db6f..aba8231d6 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -202,7 +202,7 @@ The shortlist of configuration flags is the following: How often to evaluate the rules (default 1m0s) -external.alert.source string External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. - eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used + eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used -external.label array Optional label in the form 'name=value' to add to all generated recording rules and alerts. Pass multiple -label flags in order to add multiple label sets. Supports array of values separated by comma or specified via multiple flags. 
diff --git a/app/vmalert/main.go b/app/vmalert/main.go index ea6922d86..d04584eb2 100644 --- a/app/vmalert/main.go +++ b/app/vmalert/main.go @@ -41,7 +41,7 @@ Rule files may contain %{ENV_VAR} placeholders, which are substituted by the cor validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions via MetricsQL engine") externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier") externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. -eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`) +eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`) externalLabels = flagutil.NewArray("external.label", "Optional label in the form 'name=value' to add to all generated recording rules and alerts. 
"+ "Pass multiple -label flags in order to add multiple label sets.") diff --git a/app/vmalert/notifier/template_func.go b/app/vmalert/notifier/template_func.go index e643d5880..043339b87 100644 --- a/app/vmalert/notifier/template_func.go +++ b/app/vmalert/notifier/template_func.go @@ -167,6 +167,10 @@ func InitTemplateFunc(externalURL *url.URL) { "queryEscape": func(q string) string { return url.QueryEscape(q) }, + "crlfEscape": func(q string) string { + q = strings.Replace(q, "\n", `\n`, -1) + return strings.Replace(q, "\r", `\r`, -1) + }, "quotesEscape": func(q string) string { return strings.Replace(q, `"`, `\"`, -1) }, diff --git a/app/vmselect/main.go b/app/vmselect/main.go index 5847113ca..4f78ab3cd 100644 --- a/app/vmselect/main.go +++ b/app/vmselect/main.go @@ -198,6 +198,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool { return true } return true + case "/api/v1/status/top_queries": + topQueriesRequests.Inc() + if err := prometheus.QueryStatsHandler(startTime, w, r); err != nil { + topQueriesErrors.Inc() + sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err)) + return true + } + return true case "/api/v1/status/tsdb": statusTSDBRequests.Inc() if err := prometheus.TSDBStatusHandler(startTime, w, r); err != nil { @@ -416,6 +424,9 @@ var ( labelsCountRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/labels/count"}`) labelsCountErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/labels/count"}`) + topQueriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/top_queries"}`) + topQueriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/top_queries"}`) + statusTSDBRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/tsdb"}`) statusTSDBErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/tsdb"}`) diff --git a/app/vmselect/prometheus/prometheus.go 
b/app/vmselect/prometheus/prometheus.go index e2c5cd704..d7a1a5a88 100644 --- a/app/vmselect/prometheus/prometheus.go +++ b/app/vmselect/prometheus/prometheus.go @@ -14,6 +14,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup" @@ -1250,3 +1251,34 @@ func getLatencyOffsetMilliseconds() int64 { } return d } + +// QueryStatsHandler returns query stats at `/api/v1/status/top_queries` +func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error { + if err := r.ParseForm(); err != nil { + return fmt.Errorf("cannot parse form values: %w", err) + } + topN := 20 + topNStr := r.FormValue("topN") + if len(topNStr) > 0 { + n, err := strconv.Atoi(topNStr) + if err != nil { + return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err) + } + topN = n + } + maxLifetimeMsecs, err := searchutils.GetDuration(r, "maxLifetime", 10*60*1000) + if err != nil { + return fmt.Errorf("cannot parse `maxLifetime` arg: %w", err) + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + bw := bufferedwriter.Get(w) + defer bufferedwriter.Put(bw) + querystats.WriteJSONQueryStats(bw, topN, time.Duration(maxLifetimeMsecs)*time.Millisecond) + if err := bw.Flush(); err != nil { + return err + } + queryStatsDuration.UpdateDuration(startTime) + return nil +} + +var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`) diff --git a/app/vmselect/promql/exec.go b/app/vmselect/promql/exec.go index c41fbac00..adb42b0a6 100644 --- a/app/vmselect/promql/exec.go +++ 
b/app/vmselect/promql/exec.go @@ -11,6 +11,7 @@ import ( "time" "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage" + "github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/metrics" "github.com/VictoriaMetrics/metricsql" @@ -39,6 +40,12 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result, } }() } + if querystats.Enabled() { + startTime := time.Now() + defer func() { + querystats.RegisterQuery(q, ec.End-ec.Start, startTime) + }() + } ec.validate() diff --git a/app/vmselect/querystats/querystats.go b/app/vmselect/querystats/querystats.go new file mode 100644 index 000000000..5f7a32559 --- /dev/null +++ b/app/vmselect/querystats/querystats.go @@ -0,0 +1,247 @@ +package querystats + +import ( + "flag" + "fmt" + "io" + "sort" + "sync" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" +) + +var ( + lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 20000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+ + "Zero value disables query stats tracking") + minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", 0, "The minimum duration for queries to track in query stats at `/api/v1/status/top_queries`. "+ + "Queries with lower duration are ignored in query stats") +) + +var ( + qsTracker *queryStatsTracker + initOnce sync.Once +) + +// Enabled returns true if query stats tracking is enabled. +func Enabled() bool { + return *lastQueriesCount > 0 +} + +// RegisterQuery registers the query on the given timeRangeMsecs, which has been started at startTime. +// +// RegisterQuery must be called when the query is finished. 
+func RegisterQuery(query string, timeRangeMsecs int64, startTime time.Time) { + initOnce.Do(initQueryStats) + qsTracker.registerQuery(query, timeRangeMsecs, startTime) +} + +// WriteJSONQueryStats writes query stats to given writer in json format. +func WriteJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + initOnce.Do(initQueryStats) + qsTracker.writeJSONQueryStats(w, topN, maxLifetime) +} + +// queryStatsTracker holds statistics for queries +type queryStatsTracker struct { + mu sync.Mutex + a []queryStatRecord + nextIdx uint +} + +type queryStatRecord struct { + query string + timeRangeSecs int64 + registerTime time.Time + duration time.Duration +} + +type queryStatKey struct { + query string + timeRangeSecs int64 +} + +func initQueryStats() { + recordsCount := *lastQueriesCount + if recordsCount <= 0 { + recordsCount = 1 + } else { + logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s", + *lastQueriesCount, *minQueryDuration) + } + qsTracker = &queryStatsTracker{ + a: make([]queryStatRecord, recordsCount), + } +} + +func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) { + fmt.Fprintf(w, `{"topN":"%d","maxLifetime":%q,`, topN, maxLifetime) + fmt.Fprintf(w, `"search.queryStats.lastQueriesCount":%d,`, *lastQueriesCount) + fmt.Fprintf(w, `"search.queryStats.minQueryDuration":%q,`, *minQueryDuration) + fmt.Fprintf(w, `"topByCount":[`) + topByCount := qst.getTopByCount(topN, maxLifetime) + for i, r := range topByCount { + fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"count":%d}`, r.query, r.timeRangeSecs, r.count) + if i+1 < len(topByCount) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topByAvgDuration":[`) + topByAvgDuration := qst.getTopByAvgDuration(topN, maxLifetime) + for i, r := range topByAvgDuration { + fmt.Fprintf(w, 
`{"query":%q,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topByAvgDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `],"topBySumDuration":[`) + topBySumDuration := qst.getTopBySumDuration(topN, maxLifetime) + for i, r := range topBySumDuration { + fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds()) + if i+1 < len(topBySumDuration) { + fmt.Fprintf(w, `,`) + } + } + fmt.Fprintf(w, `]}`) +} + +func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64, startTime time.Time) { + registerTime := time.Now() + duration := registerTime.Sub(startTime) + if duration < *minQueryDuration { + return + } + + qst.mu.Lock() + defer qst.mu.Unlock() + + a := qst.a + idx := qst.nextIdx + if idx >= uint(len(a)) { + idx = 0 + } + qst.nextIdx = idx + 1 + r := &a[idx] + r.query = query + r.timeRangeSecs = timeRangeMsecs / 1000 + r.registerTime = registerTime + r.duration = duration +} + +func (qst *queryStatsTracker) getTopByCount(topN int, maxLifetime time.Duration) []queryStatByCount { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]int) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + 1 + } + qst.mu.Unlock() + + var a []queryStatByCount + for k, count := range m { + a = append(a, queryStatByCount{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + count: count, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].count > a[j].count + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByCount struct { + query string + timeRangeSecs int64 + count int +} + +func (qst *queryStatsTracker) getTopByAvgDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + 
qst.mu.Lock() + type countSum struct { + count int + sum time.Duration + } + m := make(map[queryStatKey]countSum) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + ks := m[k] + ks.count++ + ks.sum += r.duration + m[k] = ks + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, ks := range m { + a = append(a, queryStatByDuration{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: ks.sum / time.Duration(ks.count), + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} + +type queryStatByDuration struct { + query string + timeRangeSecs int64 + duration time.Duration +} + +func (qst *queryStatsTracker) getTopBySumDuration(topN int, maxLifetime time.Duration) []queryStatByDuration { + currentTime := time.Now() + qst.mu.Lock() + m := make(map[queryStatKey]time.Duration) + for _, r := range qst.a { + if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime { + continue + } + k := queryStatKey{ + query: r.query, + timeRangeSecs: r.timeRangeSecs, + } + m[k] = m[k] + r.duration + } + qst.mu.Unlock() + + var a []queryStatByDuration + for k, d := range m { + a = append(a, queryStatByDuration{ + query: k.query, + timeRangeSecs: k.timeRangeSecs, + duration: d, + }) + } + sort.Slice(a, func(i, j int) bool { + return a[i].duration > a[j].duration + }) + if len(a) > topN { + a = a[:topN] + } + return a +} diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 193529981..df7d27cc9 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,10 +2,12 @@ # tip +* FEATURE: add `/api/v1/status/top_queries` handler, which returns the most frequently executed queries and queries that took the most time for execution. 
See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907 * FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. +* BUGFIX: vmalert: properly escape multiline queries when passing them to Grafana. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890 * BUGFIX: vmagent: set missing `__meta_kubernetes_service_*` labels in `kubernetes_sd_config` for `endpoints` and `endpointslices` roles. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982 diff --git a/docs/Single-server-VictoriaMetrics.md b/docs/Single-server-VictoriaMetrics.md index a8544b656..d57e82571 100644 --- a/docs/Single-server-VictoriaMetrics.md +++ b/docs/Single-server-VictoriaMetrics.md @@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178) Additionally VictoriaMetrics provides the following handlers: -* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes: +* `/api/v1/series/count` - returns the total number of time series in the database. Some notes: * the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series; * the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions; -* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. 
-* `/api/v1/status/active_queries` - it returns a list of currently running queries. +* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values. +* `/api/v1/status/active_queries` - returns a list of currently running queries. +* `/api/v1/status/top_queries` - returns the following query lists: + * the most frequently executed queries - `topByCount` + * queries with the biggest average execution duration - `topByAvgDuration` + * queries that took the most time for execution - `topBySumDuration` + The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg. + For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds. ## Graphite API usage diff --git a/docs/vmalert.md b/docs/vmalert.md index 6f705db6f..aba8231d6 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -202,7 +202,7 @@ The shortlist of configuration flags is the following: How often to evaluate the rules (default 1m0s) -external.alert.source string External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service. - eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used + eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used -external.label array Optional label in the form 'name=value' to add to all generated recording rules and alerts. 
Pass multiple -label flags in order to add multiple label sets. Supports array of values separated by comma or specified via multiple flags. diff --git a/lib/fs/fs.go b/lib/fs/fs.go index fb05fa876..9e0cefb13 100644 --- a/lib/fs/fs.go +++ b/lib/fs/fs.go @@ -186,6 +186,8 @@ func mustSyncParentDirIfExists(path string) { // MustRemoveAll removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . func MustRemoveAll(path string) { _ = mustRemoveAll(path, func() {}) @@ -193,6 +195,8 @@ func MustRemoveAll(path string) { // MustRemoveAllWithDoneCallback removes path with all the contents. // +// It properly fsyncs the parent directory after path removal. +// // done is called after the path is successfully removed. // // done may be called after the function returns for NFS path. diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 7ed6ca89f..472c532c8 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -240,6 +240,9 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str // The pt must be detached from table before calling pt.Drop. func (pt *partition) Drop() { logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath) + // Wait until all the pending transaction deletions are finished before removing partition directories. + pendingTxnDeletionsWG.Wait() + fs.MustRemoveAll(pt.smallPartsPath) fs.MustRemoveAll(pt.bigPartsPath) logger.Infof("partition %q has been dropped", pt.name) @@ -643,6 +646,9 @@ func (pt *partition) PutParts(pws []*partWrapper) { func (pt *partition) MustClose() { close(pt.stopCh) + // Wait until all the pending transaction deletions are finished. 
+ pendingTxnDeletionsWG.Wait() + logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath) startTime := time.Now() pt.stalePartsRemoverWG.Wait() @@ -1352,13 +1358,14 @@ func (pt *partition) removeStaleParts() { pt.snapshotLock.RLock() var removeWG sync.WaitGroup for pw := range m { - removeWG.Add(1) logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000) + removeWG.Add(1) fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done) } removeWG.Wait() - fs.MustSyncPath(pt.smallPartsPath) - fs.MustSyncPath(pt.bigPartsPath) + // There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath, + // since they should be automatically called inside fs.MustRemoveAllWithDoneCallback. + pt.snapshotLock.RUnlock() // Remove partition references from removed parts. @@ -1738,9 +1745,15 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str } } else { // Just remove srcPath. - fs.MustRemoveAll(srcPath) + removeWG.Add(1) + fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done) } + // Flush pathPrefix* directory metadata to the underlying storage, + // so the moved files become visible there. + fs.MustSyncPath(pathPrefix1) + fs.MustSyncPath(pathPrefix2) + pendingTxnDeletionsWG.Add(1) go func() { defer pendingTxnDeletionsWG.Done() @@ -1748,9 +1761,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str // This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 . removeWG.Wait() - // Flush pathPrefix* directory metadata to the underying storage. - fs.MustSyncPath(pathPrefix1) - fs.MustSyncPath(pathPrefix2) + // There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal, + // since it is already called by fs.MustRemoveAllWithDoneCallback. 
+ if err := os.Remove(txnPath); err != nil { logger.Errorf("cannot remove transaction file %q: %s", txnPath, err) }