Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2020-12-25 17:41:24 +02:00
commit 4cdbc4642d
14 changed files with 359 additions and 16 deletions

View file

@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178)
Additionally VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
* the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series;
* the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions;
* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - it returns a list of currently running queries.
* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - returns a list of currently running queries.
* `/api/v1/status/top_queries` - returns the following query lists:
* the most frequently executed queries - `topByCount`
* queries with the biggest average execution duration - `topByAvgDuration`
* queries that took the most time for execution - `topBySumDuration`
The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg.
For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds.
## Graphite API usage

View file

@ -15,6 +15,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envflag"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@ -41,6 +42,7 @@ var customAPIPathList = [][]string{
func main() {
// Write flags and help message to stdout, since it is easier to grep or pipe.
flag.CommandLine.SetOutput(os.Stdout)
flag.Usage = usage
envflag.Parse()
buildinfo.Init()
logger.Init()
@ -125,3 +127,12 @@ func writeAPIHelp(w io.Writer, pathList [][]string) {
fmt.Fprintf(w, "<a href='%s'>%q</a> - %s<br/>", p, p, doc)
}
}
func usage() {
const s = `
victoria-metrics is a time series database and monitoring solution.
See the docs at https://victoriametrics.github.io/
`
flagutil.Usage(s)
}

View file

@ -202,7 +202,7 @@ The shortlist of configuration flags is the following:
How often to evaluate the rules (default 1m0s)
-external.alert.source string
External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service.
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used
-external.label array
Optional label in the form 'name=value' to add to all generated recording rules and alerts. Pass multiple -label flags in order to add multiple label sets.
Supports array of values separated by comma or specified via multiple flags.

View file

@ -41,7 +41,7 @@ Rule files may contain %{ENV_VAR} placeholders, which are substituted by the cor
validateExpressions = flag.Bool("rule.validateExpressions", true, "Whether to validate rules expressions via MetricsQL engine")
externalURL = flag.String("external.url", "", "External URL is used as alert's source for sent alerts to the notifier")
externalAlertSource = flag.String("external.alert.source", "", `External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service.
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`)
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used`)
externalLabels = flagutil.NewArray("external.label", "Optional label in the form 'name=value' to add to all generated recording rules and alerts. "+
"Pass multiple -label flags in order to add multiple label sets.")

View file

@ -167,6 +167,10 @@ func InitTemplateFunc(externalURL *url.URL) {
"queryEscape": func(q string) string {
return url.QueryEscape(q)
},
"crlfEscape": func(q string) string {
q = strings.Replace(q, "\n", `\n`, -1)
return strings.Replace(q, "\r", `\r`, -1)
},
"quotesEscape": func(q string) string {
return strings.Replace(q, `"`, `\"`, -1)
},

View file

@ -198,6 +198,14 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/api/v1/status/top_queries":
topQueriesRequests.Inc()
if err := prometheus.QueryStatsHandler(startTime, w, r); err != nil {
topQueriesErrors.Inc()
sendPrometheusError(w, r, fmt.Errorf("cannot query status endpoint: %w", err))
return true
}
return true
case "/api/v1/status/tsdb":
statusTSDBRequests.Inc()
if err := prometheus.TSDBStatusHandler(startTime, w, r); err != nil {
@ -416,6 +424,9 @@ var (
labelsCountRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/labels/count"}`)
labelsCountErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/labels/count"}`)
topQueriesRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/top_queries"}`)
topQueriesErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/top_queries"}`)
statusTSDBRequests = metrics.NewCounter(`vm_http_requests_total{path="/api/v1/status/tsdb"}`)
statusTSDBErrors = metrics.NewCounter(`vm_http_request_errors_total{path="/api/v1/status/tsdb"}`)

View file

@ -14,6 +14,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
@ -1250,3 +1251,34 @@ func getLatencyOffsetMilliseconds() int64 {
}
return d
}
// QueryStatsHandler returns query stats at `/api/v1/status/top_queries`
func QueryStatsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
if err := r.ParseForm(); err != nil {
return fmt.Errorf("cannot parse form values: %w", err)
}
topN := 20
topNStr := r.FormValue("topN")
if len(topNStr) > 0 {
n, err := strconv.Atoi(topNStr)
if err != nil {
return fmt.Errorf("cannot parse `topN` arg %q: %w", topNStr, err)
}
topN = n
}
maxLifetimeMsecs, err := searchutils.GetDuration(r, "maxLifetime", 10*60*1000)
if err != nil {
return fmt.Errorf("cannot parse `maxLifetime` arg: %w", err)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
querystats.WriteJSONQueryStats(bw, topN, time.Duration(maxLifetimeMsecs)*time.Millisecond)
if err := bw.Flush(); err != nil {
return err
}
queryStatsDuration.UpdateDuration(startTime)
return nil
}
var queryStatsDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/top_queries"}`)

View file

@ -11,6 +11,7 @@ import (
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
@ -39,6 +40,12 @@ func Exec(ec *EvalConfig, q string, isFirstPointOnly bool) ([]netstorage.Result,
}
}()
}
if querystats.Enabled() {
startTime := time.Now()
defer func() {
querystats.RegisterQuery(q, ec.End-ec.Start, startTime)
}()
}
ec.validate()

View file

@ -0,0 +1,247 @@
package querystats
import (
"flag"
"fmt"
"io"
"sort"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
var (
lastQueriesCount = flag.Int("search.queryStats.lastQueriesCount", 20000, "Query stats for `/api/v1/status/top_queries` is tracked on this number of last queries. "+
"Zero value disables query stats tracking")
minQueryDuration = flag.Duration("search.queryStats.minQueryDuration", 0, "The minimum duration for queries to track in query stats at `/api/v1/status/top_queries`. "+
"Queries with lower duration are ignored in query stats")
)
var (
qsTracker *queryStatsTracker
initOnce sync.Once
)
// Enabled returns true of query stats tracking is enabled.
func Enabled() bool {
return *lastQueriesCount > 0
}
// RegisterQuery registers the query on the given timeRangeMsecs, which has been started at startTime.
//
// RegisterQuery must be called when the query is finished.
func RegisterQuery(query string, timeRangeMsecs int64, startTime time.Time) {
initOnce.Do(initQueryStats)
qsTracker.registerQuery(query, timeRangeMsecs, startTime)
}
// WriteJSONQueryStats writes query stats to given writer in json format.
func WriteJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) {
initOnce.Do(initQueryStats)
qsTracker.writeJSONQueryStats(w, topN, maxLifetime)
}
// queryStatsTracker holds statistics for queries
type queryStatsTracker struct {
mu sync.Mutex
a []queryStatRecord
nextIdx uint
}
type queryStatRecord struct {
query string
timeRangeSecs int64
registerTime time.Time
duration time.Duration
}
type queryStatKey struct {
query string
timeRangeSecs int64
}
func initQueryStats() {
recordsCount := *lastQueriesCount
if recordsCount <= 0 {
recordsCount = 1
} else {
logger.Infof("enabled query stats tracking at `/api/v1/status/top_queries` with -search.queryStats.lastQueriesCount=%d, -search.queryStats.minQueryDuration=%s",
*lastQueriesCount, *minQueryDuration)
}
qsTracker = &queryStatsTracker{
a: make([]queryStatRecord, recordsCount),
}
}
func (qst *queryStatsTracker) writeJSONQueryStats(w io.Writer, topN int, maxLifetime time.Duration) {
fmt.Fprintf(w, `{"topN":"%d","maxLifetime":%q,`, topN, maxLifetime)
fmt.Fprintf(w, `"search.queryStats.lastQueriesCount":%d,`, *lastQueriesCount)
fmt.Fprintf(w, `"search.queryStats.minQueryDuration":%q,`, *minQueryDuration)
fmt.Fprintf(w, `"topByCount":[`)
topByCount := qst.getTopByCount(topN, maxLifetime)
for i, r := range topByCount {
fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"count":%d}`, r.query, r.timeRangeSecs, r.count)
if i+1 < len(topByCount) {
fmt.Fprintf(w, `,`)
}
}
fmt.Fprintf(w, `],"topByAvgDuration":[`)
topByAvgDuration := qst.getTopByAvgDuration(topN, maxLifetime)
for i, r := range topByAvgDuration {
fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"avgDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds())
if i+1 < len(topByAvgDuration) {
fmt.Fprintf(w, `,`)
}
}
fmt.Fprintf(w, `],"topBySumDuration":[`)
topBySumDuration := qst.getTopBySumDuration(topN, maxLifetime)
for i, r := range topBySumDuration {
fmt.Fprintf(w, `{"query":%q,"timeRangeSeconds":%d,"sumDurationSeconds":%.3f}`, r.query, r.timeRangeSecs, r.duration.Seconds())
if i+1 < len(topBySumDuration) {
fmt.Fprintf(w, `,`)
}
}
fmt.Fprintf(w, `]}`)
}
func (qst *queryStatsTracker) registerQuery(query string, timeRangeMsecs int64, startTime time.Time) {
registerTime := time.Now()
duration := registerTime.Sub(startTime)
if duration < *minQueryDuration {
return
}
qst.mu.Lock()
defer qst.mu.Unlock()
a := qst.a
idx := qst.nextIdx
if idx >= uint(len(a)) {
idx = 0
}
qst.nextIdx = idx + 1
r := &a[idx]
r.query = query
r.timeRangeSecs = timeRangeMsecs / 1000
r.registerTime = registerTime
r.duration = duration
}
func (qst *queryStatsTracker) getTopByCount(topN int, maxLifetime time.Duration) []queryStatByCount {
currentTime := time.Now()
qst.mu.Lock()
m := make(map[queryStatKey]int)
for _, r := range qst.a {
if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime {
continue
}
k := queryStatKey{
query: r.query,
timeRangeSecs: r.timeRangeSecs,
}
m[k] = m[k] + 1
}
qst.mu.Unlock()
var a []queryStatByCount
for k, count := range m {
a = append(a, queryStatByCount{
query: k.query,
timeRangeSecs: k.timeRangeSecs,
count: count,
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].count > a[j].count
})
if len(a) > topN {
a = a[:topN]
}
return a
}
type queryStatByCount struct {
query string
timeRangeSecs int64
count int
}
func (qst *queryStatsTracker) getTopByAvgDuration(topN int, maxLifetime time.Duration) []queryStatByDuration {
currentTime := time.Now()
qst.mu.Lock()
type countSum struct {
count int
sum time.Duration
}
m := make(map[queryStatKey]countSum)
for _, r := range qst.a {
if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime {
continue
}
k := queryStatKey{
query: r.query,
timeRangeSecs: r.timeRangeSecs,
}
ks := m[k]
ks.count++
ks.sum += r.duration
m[k] = ks
}
qst.mu.Unlock()
var a []queryStatByDuration
for k, ks := range m {
a = append(a, queryStatByDuration{
query: k.query,
timeRangeSecs: k.timeRangeSecs,
duration: ks.sum / time.Duration(ks.count),
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].duration > a[j].duration
})
if len(a) > topN {
a = a[:topN]
}
return a
}
type queryStatByDuration struct {
query string
timeRangeSecs int64
duration time.Duration
}
func (qst *queryStatsTracker) getTopBySumDuration(topN int, maxLifetime time.Duration) []queryStatByDuration {
currentTime := time.Now()
qst.mu.Lock()
m := make(map[queryStatKey]time.Duration)
for _, r := range qst.a {
if r.query == "" || currentTime.Sub(r.registerTime) > maxLifetime {
continue
}
k := queryStatKey{
query: r.query,
timeRangeSecs: r.timeRangeSecs,
}
m[k] = m[k] + r.duration
}
qst.mu.Unlock()
var a []queryStatByDuration
for k, d := range m {
a = append(a, queryStatByDuration{
query: k.query,
timeRangeSecs: k.timeRangeSecs,
duration: d,
})
}
sort.Slice(a, func(i, j int) bool {
return a[i].duration > a[j].duration
})
if len(a) > topN {
a = a[:topN]
}
return a
}

View file

@ -2,10 +2,12 @@
# tip
* FEATURE: add `/api/v1/status/top_queries` handler, which returns the most frequently executed queries and queries that took the most time for execution. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/907
* FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503
* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month.
* FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`.
* BUGFIX: vmalert: properly escape multiline queries when passing them to Grafana. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/890
* BUGFIX: vmagent: set missing `__meta_kubernetes_service_*` labels in `kubernetes_sd_config` for `endpoints` and `endpointslices` roles. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982

View file

@ -535,11 +535,17 @@ See [this feature request](https://github.com/prometheus/prometheus/issues/6178)
Additionally VictoriaMetrics provides the following handlers:
* `/api/v1/series/count` - it returns the total number of time series in the database. Some notes:
* `/api/v1/series/count` - returns the total number of time series in the database. Some notes:
* the handler scans all the inverted index, so it can be slow if the database contains tens of millions of time series;
* the handler may count [deleted time series](#how-to-delete-time-series) additionally to normal time series due to internal implementation restrictions;
* `/api/v1/labels/count` - it returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - it returns a list of currently running queries.
* `/api/v1/labels/count` - returns a list of `label: values_count` entries. It can be used for determining labels with the maximum number of values.
* `/api/v1/status/active_queries` - returns a list of currently running queries.
* `/api/v1/status/top_queries` - returns the following query lists:
* the most frequently executed queries - `topByCount`
* queries with the biggest average execution duration - `topByAvgDuration`
* queries that took the most time for execution - `topBySumDuration`
The number of returned queries can be limited via `topN` query arg. Old queries can be filtered out with `maxLifetime` query arg.
For example, request to `/api/v1/status/top_queries?topN=5&maxLifetime=30s` would return up to 5 queries per list, which were executed during the last 30 seconds.
## Graphite API usage

View file

@ -202,7 +202,7 @@ The shortlist of configuration flags is the following:
How often to evaluate the rules (default 1m0s)
-external.alert.source string
External Alert Source allows to override the Source link for alerts sent to AlertManager for cases where you want to build a custom link to Grafana, Prometheus or any other service.
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used
eg. 'explore?orgId=1&left=[\"now-1h\",\"now\",\"VictoriaMetrics\",{\"expr\": \"{{$expr|quotesEscape|crlfEscape|pathEscape}}\"},{\"mode\":\"Metrics\"},{\"ui\":[true,true,true,\"none\"]}]'.If empty '/api/v1/:groupID/alertID/status' is used
-external.label array
Optional label in the form 'name=value' to add to all generated recording rules and alerts. Pass multiple -label flags in order to add multiple label sets.
Supports array of values separated by comma or specified via multiple flags.

View file

@ -186,6 +186,8 @@ func mustSyncParentDirIfExists(path string) {
// MustRemoveAll removes path with all the contents.
//
// It properly fsyncs the parent directory after path removal.
//
// It properly handles NFS issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
func MustRemoveAll(path string) {
_ = mustRemoveAll(path, func() {})
@ -193,6 +195,8 @@ func MustRemoveAll(path string) {
// MustRemoveAllWithDoneCallback removes path with all the contents.
//
// It properly fsyncs the parent directory after path removal.
//
// done is called after the path is successfully removed.
//
// done may be called after the function returns for NFS path.

View file

@ -240,6 +240,9 @@ func createPartition(timestamp int64, smallPartitionsPath, bigPartitionsPath str
// The pt must be detached from table before calling pt.Drop.
func (pt *partition) Drop() {
logger.Infof("dropping partition %q at smallPartsPath=%q, bigPartsPath=%q", pt.name, pt.smallPartsPath, pt.bigPartsPath)
// Wait until all the pending transaction deletions are finished before removing partition directories.
pendingTxnDeletionsWG.Wait()
fs.MustRemoveAll(pt.smallPartsPath)
fs.MustRemoveAll(pt.bigPartsPath)
logger.Infof("partition %q has been dropped", pt.name)
@ -643,6 +646,9 @@ func (pt *partition) PutParts(pws []*partWrapper) {
func (pt *partition) MustClose() {
close(pt.stopCh)
// Wait until all the pending transaction deletions are finished.
pendingTxnDeletionsWG.Wait()
logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
startTime := time.Now()
pt.stalePartsRemoverWG.Wait()
@ -1352,13 +1358,14 @@ func (pt *partition) removeStaleParts() {
pt.snapshotLock.RLock()
var removeWG sync.WaitGroup
for pw := range m {
removeWG.Add(1)
logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(pw.p.path, removeWG.Done)
}
removeWG.Wait()
fs.MustSyncPath(pt.smallPartsPath)
fs.MustSyncPath(pt.bigPartsPath)
// There is no need in calling fs.MustSyncPath() on pt.smallPartsPath and pt.bigPartsPath,
// since they should be automatically called inside fs.MustRemoveAllWithDoneCallback.
pt.snapshotLock.RUnlock()
// Remove partition references from removed parts.
@ -1738,9 +1745,15 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
}
} else {
// Just remove srcPath.
fs.MustRemoveAll(srcPath)
removeWG.Add(1)
fs.MustRemoveAllWithDoneCallback(srcPath, removeWG.Done)
}
// Flush pathPrefix* directory metadata to the underying storage,
// so the moved files become visible there.
fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2)
pendingTxnDeletionsWG.Add(1)
go func() {
defer pendingTxnDeletionsWG.Done()
@ -1748,9 +1761,9 @@ func runTransaction(txnLock *sync.RWMutex, pathPrefix1, pathPrefix2, txnPath str
// This is required for NFS mounts. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/61 .
removeWG.Wait()
// Flush pathPrefix* directory metadata to the underying storage.
fs.MustSyncPath(pathPrefix1)
fs.MustSyncPath(pathPrefix2)
// There is no need in calling fs.MustSyncPath for pathPrefix* after parts' removal,
// since it is already called by fs.MustRemoveAllWithDoneCallback.
if err := os.Remove(txnPath); err != nil {
logger.Errorf("cannot remove transaction file %q: %s", txnPath, err)
}