vmagent: support sharding by excluded labels (#5938)

To horizontally scale streaming aggregation, you might want to deploy a separate hashing tier 
of vmagents that route to a separate aggregation tier. The hashing tier should shard by all labels 
except the instance-level labels, to ensure the input metrics are routed correctly to the aggregator 
instance responsible for those labels.
For this to achieve we introduce `remoteWrite.shardByURL.inverseLabels` flag to inverse logic of `remoteWrite.shardByURL.labels`

---------

Co-authored-by: Eugene Ma <eugene.ma@airbnb.com>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
This commit is contained in:
Eugene Ma 2024-03-29 05:26:02 -07:00 committed by GitHub
parent ac9c2a796f
commit 166b97b8d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -51,6 +51,7 @@ var (
shardByURLLabels = flagutil.NewArrayString("remoteWrite.shardByURL.labels", "Optional list of labels, which must be used for sharding outgoing samples "+
"among remote storage systems if -remoteWrite.shardByURL command-line flag is set. By default all the labels are used for sharding in order to gain "+
"even distribution of series over the specified -remoteWrite.url systems")
shardByURLInverseLabels = flag.Bool("remoteWrite.shardByURL.inverseLabels", false, "Inverse the behavior of remoteWrite.shardByURL.labels so that series are sharded using all labels except the ones specified in remoteWrite.shardByURL.labels.")
tmpDataPath = flag.String("remoteWrite.tmpDataPath", "vmagent-remotewrite-data", "Path to directory for storing pending data, which isn't sent to the configured -remoteWrite.url . "+
"See also -remoteWrite.maxDiskUsagePerURL and -remoteWrite.disableOnDiskQueue")
keepDanglingQueues = flag.Bool("remoteWrite.keepDanglingQueues", false, "Keep persistent queues contents at -remoteWrite.tmpDataPath in case there are no matching -remoteWrite.url. "+
@ -537,12 +538,14 @@ func tryPushBlockToRemoteStorages(rwctxs []*remoteWriteCtx, tssBlock []prompbmar
// Shard the data among rwctxs
tssByURL := make([][]prompbmarshal.TimeSeries, len(rwctxs))
tmpLabels := promutils.GetLabels()
inverseLabels := *shardByURLInverseLabels
for _, ts := range tssBlock {
hashLabels := ts.Labels
if len(shardByURLLabelsMap) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLLabelsMap[label.Name]; ok {
_, ok := shardByURLLabelsMap[label.Name]
if ok && !inverseLabels || !ok && inverseLabels {
hashLabels = append(hashLabels, label)
}
}