diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go
index 1098a49fc..0de158b03 100644
--- a/app/vmagent/remotewrite/remotewrite.go
+++ b/app/vmagent/remotewrite/remotewrite.go
@@ -96,8 +96,8 @@ var (
 )
 
 var (
-	// rwctxs contains statically populated entries when -remoteWrite.url is specified.
-	rwctxs []*remoteWriteCtx
+	// rwctxsGlobal contains statically populated entries when -remoteWrite.url is specified.
+	rwctxsGlobal []*remoteWriteCtx
 
 	// Data without tenant id is written to defaultAuthToken if -enableMultitenantHandlers is specified.
 	defaultAuthToken = &auth.Token{}
@@ -110,8 +110,8 @@ var (
 		StatusCode: http.StatusTooManyRequests,
 	}
 
-	// disableOnDiskQueueAll is set to true if all -remoteWrite.url were configured to disable persistent queue via -remoteWrite.disableOnDiskQueue
-	disableOnDiskQueueAll bool
+	// disableOnDiskQueueAny is set to true if at least a single -remoteWrite.url is configured with -remoteWrite.disableOnDiskQueue
+	disableOnDiskQueueAny bool
 
 	// dropSamplesOnFailureGlobal is set to true if -remoteWrite.dropSamplesOnOverload is set or if multiple -remoteWrite.disableOnDiskQueue options are set.
 	dropSamplesOnFailureGlobal bool
@@ -218,16 +218,16 @@ func Init() {
 		deduplicatorGlobal = streamaggr.NewDeduplicator(pushToRemoteStoragesDropFailed, sasOpts.DedupInterval, sasOpts.DropInputLabels, sasOpts.Alias)
 	}
 
-	rwctxs = newRemoteWriteCtxs(nil, *remoteWriteURLs)
+	rwctxsGlobal = newRemoteWriteCtxs(nil, *remoteWriteURLs)
 
 	disableOnDiskQueues := []bool(*disableOnDiskQueue)
-	disableOnDiskQueueAll = !slices.Contains(disableOnDiskQueues, false)
+	disableOnDiskQueueAny = slices.Contains(disableOnDiskQueues, true)
 
 	// Samples must be dropped if multiple -remoteWrite.disableOnDiskQueue options are configured and at least a single is set to true.
 	// In this case it is impossible to prevent from sending many duplicates of samples passed to TryPush() to all the configured -remoteWrite.url
 	// if these samples couldn't be sent to the -remoteWrite.url with the disabled persistent queue. So it is better sending samples
 	// to the remaining -remoteWrite.url and dropping them on the blocked queue.
-	dropSamplesOnFailureGlobal = *dropSamplesOnOverload || len(disableOnDiskQueues) > 1 && slices.Contains(disableOnDiskQueues, true)
+	dropSamplesOnFailureGlobal = *dropSamplesOnOverload || disableOnDiskQueueAny && len(disableOnDiskQueues) > 1
 
 	dropDanglingQueues()
 
@@ -258,8 +258,8 @@ func dropDanglingQueues() {
 	// In case if there were many persistent queues with identical *remoteWriteURLs
 	// the queue with the last index will be dropped.
 	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6140
-	existingQueues := make(map[string]struct{}, len(rwctxs))
-	for _, rwctx := range rwctxs {
+	existingQueues := make(map[string]struct{}, len(rwctxsGlobal))
+	for _, rwctx := range rwctxsGlobal {
 		existingQueues[rwctx.fq.Dirname()] = struct{}{}
 	}
 
@@ -276,7 +276,7 @@ func dropDanglingQueues() {
 		}
 	}
 	if removed > 0 {
-		logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxs))
+		logger.Infof("removed %d dangling queues from %q, active queues: %d", removed, *tmpDataPath, len(rwctxsGlobal))
 	}
 }
 
@@ -385,10 +385,10 @@ func Stop() {
 		deduplicatorGlobal = nil
 	}
 
-	for _, rwctx := range rwctxs {
+	for _, rwctx := range rwctxsGlobal {
 		rwctx.MustStop()
 	}
-	rwctxs = nil
+	rwctxsGlobal = nil
 
 	if sl := hourlySeriesLimiter; sl != nil {
 		sl.MustStop()
@@ -431,29 +431,20 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
 		tenantRctx = getRelabelCtx()
 		defer putRelabelCtx(tenantRctx)
 	}
-	rowsCount := getRowsCount(tss)
 
 	// Quick check whether writes to configured remote storage systems are blocked.
 	// This allows saving CPU time spent on relabeling and block compression
 	// if some of remote storage systems cannot keep up with the data ingestion rate.
-	// this shortcut is only applicable if all remote writes have disableOnDiskQueue = true
-	if disableOnDiskQueueAll {
-		skippedQueues := 0
-		for _, rwctx := range rwctxs {
-			if rwctx.fq.IsWriteBlocked() {
-				rwctx.pushFailures.Inc()
-				if !forceDropSamplesOnFailure {
-					return false
-				}
-				rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
-				skippedQueues++
-			}
-		}
-		if skippedQueues == len(rwctxs) {
-			// All the queues are skipped because they are blocked and dropSamplesOnFailure is set to true.
-			// Return true to the caller, so it doesn't re-send the samples again.
-			return true
-		}
+	rwctxs, ok := getEligibleRemoteWriteCtxs(tss, forceDropSamplesOnFailure)
+	if !ok {
+		// At least a single remote write queue is blocked and dropSamplesOnFailure isn't set.
+		// Return false to the caller, so it could re-send samples again.
+		return false
+	}
+	if len(rwctxs) == 0 {
+		// All the remote write queues are skipped because they are blocked and dropSamplesOnFailure is set to true.
+		// Return true to the caller, so it doesn't re-send the samples again.
+		return true
 	}
 
 	var rctx *relabelCtx
@@ -463,6 +454,7 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
 		rctx = getRelabelCtx()
 		defer putRelabelCtx(rctx)
 	}
+	rowsCount := getRowsCount(tss)
 	globalRowsPushedBeforeRelabel.Add(rowsCount)
 	maxSamplesPerBlock := *maxRowsPerBlock
 	// Allow up to 10x of labels per each block on average.
@@ -515,20 +507,46 @@ func tryPush(at *auth.Token, wr *prompbmarshal.WriteRequest, forceDropSamplesOnF
 			deduplicatorGlobal.Push(tssBlock)
 			tssBlock = tssBlock[:0]
 		}
-		if !tryPushBlockToRemoteStorages(tssBlock, forceDropSamplesOnFailure) {
+		if !tryPushBlockToRemoteStorages(rwctxs, tssBlock, forceDropSamplesOnFailure) {
 			return false
 		}
 	}
 	return true
 }
 
+func getEligibleRemoteWriteCtxs(tss []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) ([]*remoteWriteCtx, bool) {
+	if !disableOnDiskQueueAny {
+		return rwctxsGlobal, true
+	}
+
+	// This code is applicable if at least a single remote storage has -disableOnDiskQueue
+	rwctxs := make([]*remoteWriteCtx, 0, len(rwctxsGlobal))
+	for _, rwctx := range rwctxsGlobal {
+		if !rwctx.fq.IsWriteBlocked() {
+			rwctxs = append(rwctxs, rwctx)
+		} else {
+			rwctx.pushFailures.Inc()
+			if !forceDropSamplesOnFailure {
+				return nil, false
+			}
+			rowsCount := getRowsCount(tss)
+			rwctx.rowsDroppedOnPushFailure.Add(rowsCount)
+		}
+	}
+	return rwctxs, true
+}
+
 func pushToRemoteStoragesDropFailed(tss []prompbmarshal.TimeSeries) {
-	if tryPushBlockToRemoteStorages(tss, true) {
+	rwctxs, _ := getEligibleRemoteWriteCtxs(tss, true)
+	if len(rwctxs) == 0 {
 		return
 	}
+	if !tryPushBlockToRemoteStorages(rwctxs, tss, true) {
+		logger.Panicf("BUG: tryPushBlockToRemoteStorages() must return true when forceDropSamplesOnFailure=true")
+	}
 }
 
-func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
+func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, forceDropSamplesOnFailure bool) bool {
 	if len(tssBlock) == 0 {
 		// Nothing to push
 		return true
@@ -547,7 +565,7 @@ func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDrop
 		if replicas <= 0 {
 			replicas = 1
 		}
-		return tryShardingBlockAmongRemoteStorages(tssBlock, replicas, forceDropSamplesOnFailure)
+		return tryShardingBlockAmongRemoteStorages(rwctxs, tssBlock, replicas, forceDropSamplesOnFailure)
 	}
 
 	// Replicate tssBlock samples among rwctxs.
@@ -568,7 +586,7 @@ func tryPushBlockToRemoteStorages(tssBlock []prompbmarshal.TimeSeries, forceDrop
 	return !anyPushFailed.Load()
 }
 
-func tryShardingBlockAmongRemoteStorages(tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
+func tryShardingBlockAmongRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmarshal.TimeSeries, replicas int, forceDropSamplesOnFailure bool) bool {
 	x := getTSSShards(len(rwctxs))
 	defer putTSSShards(x)
 
diff --git a/app/vmagent/remotewrite/streamaggr.go b/app/vmagent/remotewrite/streamaggr.go
index c1cba412e..c07fc46d8 100644
--- a/app/vmagent/remotewrite/streamaggr.go
+++ b/app/vmagent/remotewrite/streamaggr.go
@@ -80,7 +80,7 @@ func CheckStreamAggrConfigs() error {
 
 func reloadStreamAggrConfigs() {
 	reloadStreamAggrConfig(-1, pushToRemoteStoragesDropFailed)
-	for idx, rwctx := range rwctxs {
+	for idx, rwctx := range rwctxsGlobal {
 		reloadStreamAggrConfig(idx, rwctx.pushInternalTrackDropped)
 	}
 }
@@ -102,7 +102,7 @@ func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
 	if idx < 0 {
 		sas = sasGlobal.Load()
 	} else {
-		sas = rwctxs[idx].sas.Load()
+		sas = rwctxsGlobal[idx].sas.Load()
 	}
 
 	if !sasNew.Equal(sas) {
@@ -110,7 +110,7 @@ func reloadStreamAggrConfig(idx int, pushFunc streamaggr.PushFunc) {
 		if idx < 0 {
 			sasOld = sasGlobal.Swap(sasNew)
 		} else {
-			sasOld = rwctxs[idx].sas.Swap(sasNew)
+			sasOld = rwctxsGlobal[idx].sas.Swap(sasNew)
 		}
 		sasOld.MustStop()
 		logger.Infof("successfully reloaded stream aggregation configs at %q", path)
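Below is a minimal, self-contained Go sketch of the filtering idea behind the new getEligibleRemoteWriteCtxs() helper introduced by this diff. The queueCtx type and eligibleQueues() function are hypothetical stand-ins, not vmagent's remoteWriteCtx or its filestream queue: when at least one remote-write queue with the persistent queue disabled is currently blocked, the push is either rejected so the caller retries it later, or the rows aimed at the blocked queue are counted as dropped while the remaining queues keep receiving data.

// sketch.go: a simplified model of the queue-filtering behavior; the names
// and types here are illustrative stand-ins, not vmagent's actual code.
package main

import "fmt"

// queueCtx stands in for a single remote-write context.
type queueCtx struct {
	name         string
	writeBlocked bool // models rwctx.fq.IsWriteBlocked()
	droppedRows  int  // models rwctx.rowsDroppedOnPushFailure
	pushFailures int  // models rwctx.pushFailures
}

// eligibleQueues mirrors the shape of the new helper: it returns the queues
// that can accept the push right now. The boolean result is false when some
// queue is blocked and dropping isn't allowed, signalling the caller to retry
// the whole block later instead of sending partial duplicates.
func eligibleQueues(all []*queueCtx, rows int, forceDrop bool) ([]*queueCtx, bool) {
	eligible := make([]*queueCtx, 0, len(all))
	for _, q := range all {
		if !q.writeBlocked {
			eligible = append(eligible, q)
			continue
		}
		q.pushFailures++
		if !forceDrop {
			return nil, false // caller must re-send the samples later
		}
		q.droppedRows += rows // account for rows dropped on the blocked queue
	}
	return eligible, true
}

func main() {
	queues := []*queueCtx{
		{name: "vm-a"},
		{name: "vm-b", writeBlocked: true},
	}
	eligible, ok := eligibleQueues(queues, 100, true)
	fmt.Println(ok, len(eligible), queues[1].droppedRows) // true 1 100
}

Centralizing this check in one helper lets tryPush() and pushToRemoteStoragesDropFailed() share the same blocked-queue handling instead of duplicating the loop, which is what the diff above does with getEligibleRemoteWriteCtxs().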