app/vmselect: fixes possible panics for multitenant queries

This commit fixes panic for multitenant requests and empty storage node responses for tenants api.

 It also optimizes `populateSqTenantTokensIfNeeded` function calls, by making it only once for query request. Previously it was incorrectly called multiple times per each storage node request.

Related issue: 
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549
---------
Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: f41gh7 <nik@victoriametrics.com>
This commit is contained in:
Andrei Baidarov 2024-11-15 16:12:30 +01:00 committed by GitHub
parent c73a9cbdaa
commit 3120dc2054
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 35 additions and 50 deletions

View file

@ -852,15 +852,12 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
deletedCount int deletedCount int
err error err error
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
err := populateSqTenantTokensIfNeeded(sq) err := populateSqTenantTokensIfNeeded(sq)
if err != nil { if err != nil {
return []*nodeResult{{ return 0, err
err: err,
}}
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, true, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any { return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.deleteSeriesRequests.Inc() sn.deleteSeriesRequests.Inc()
deletedCount, err := sn.deleteSeries(qt, requestData, deadline) deletedCount, err := sn.deleteSeries(qt, requestData, deadline)
@ -876,7 +873,7 @@ func DeleteSeries(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline sear
// Collect results // Collect results
deletedTotal := 0 deletedTotal := 0
err := snr.collectAllResults(func(result any) error { err = snr.collectAllResults(func(result any) error {
for _, cr := range result.([]any) { for _, cr := range result.([]any) {
nr := cr.(*nodeResult) nr := cr.(*nodeResult)
if nr.err != nil { if nr.err != nil {
@ -904,15 +901,12 @@ func LabelNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
labelNames []string labelNames []string
err error err error
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
err := populateSqTenantTokensIfNeeded(sq) err := populateSqTenantTokensIfNeeded(sq)
if err != nil { if err != nil {
return []*nodeResult{{ return nil, false, err
err: err,
}}
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any { return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.labelNamesRequests.Inc() sn.labelNamesRequests.Inc()
labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline) labelNames, err := sn.getLabelNames(qt, requestData, maxLabelNames, deadline)
@ -1051,15 +1045,12 @@ func LabelValues(qt *querytracer.Tracer, denyPartialResponse bool, labelName str
labelValues []string labelValues []string
err error err error
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
err := populateSqTenantTokensIfNeeded(sq) err := populateSqTenantTokensIfNeeded(sq)
if err != nil { if err != nil {
return []*nodeResult{{ return nil, false, err
err: err,
}}
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any { return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.labelValuesRequests.Inc() sn.labelValuesRequests.Inc()
labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline) labelValues, err := sn.getLabelValues(qt, labelName, requestData, maxLabelValues, deadline)
@ -1267,15 +1258,12 @@ func TSDBStatus(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.Se
status *storage.TSDBStatus status *storage.TSDBStatus
err error err error
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
err := populateSqTenantTokensIfNeeded(sq) err := populateSqTenantTokensIfNeeded(sq)
if err != nil { if err != nil {
return []*nodeResult{{ return nil, false, err
err: err,
}}
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any { return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, _ storage.TenantToken) any {
sn.tsdbStatusRequests.Inc() sn.tsdbStatusRequests.Inc()
status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline) status, err := sn.getTSDBStatus(qt, requestData, focusLabel, topN, deadline)
@ -1703,14 +1691,12 @@ func SearchMetricNames(qt *querytracer.Tracer, denyPartialResponse bool, sq *sto
metricNames []string metricNames []string
err error err error
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
err := populateSqTenantTokensIfNeeded(sq) err := populateSqTenantTokensIfNeeded(sq)
if err != nil { if err != nil {
return []*nodeResult{{ return nil, false, err
err: err,
}}
} }
sns := getStorageNodes()
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, _ uint, sn *storageNode) any {
return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any { return execSearchQuery(qt, sq, func(qt *querytracer.Tracer, requestData []byte, t storage.TenantToken) any {
sn.searchMetricNamesRequests.Inc() sn.searchMetricNamesRequests.Inc()
metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline) metricNames, err := sn.processSearchMetricNames(qt, requestData, deadline)
@ -1896,14 +1882,12 @@ func processBlocks(qt *querytracer.Tracer, sns []*storageNode, denyPartialRespon
return err return err
} }
err := populateSqTenantTokensIfNeeded(sq)
if err != nil {
return false, err
}
// Send the query to all the storage nodes in parallel. // Send the query to all the storage nodes in parallel.
snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any { snr := startStorageNodesRequest(qt, sns, denyPartialResponse, func(qt *querytracer.Tracer, workerID uint, sn *storageNode) any {
var err error
err = populateSqTenantTokensIfNeeded(sq)
if err != nil {
return &err
}
res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any { res := execSearchQuery(qt, sq, func(qt *querytracer.Tracer, rd []byte, _ storage.TenantToken) any {
sn.searchRequests.Inc() sn.searchRequests.Inc()
err = sn.processSearchQuery(qt, rd, f, workerID, deadline) err = sn.processSearchQuery(qt, rd, f, workerID, deadline)

View file

@ -1112,9 +1112,9 @@ func evalInstantRollup(qt *querytracer.Tracer, ec *EvalConfig, funcName string,
return offset >= maxOffset return offset >= maxOffset
} }
at := ec.AuthTokens[0] var at *auth.Token
if ec.IsMultiTenant { if !ec.IsMultiTenant {
at = nil at = ec.AuthTokens[0]
} }
deleteCachedSeries := func(qt *querytracer.Tracer) { deleteCachedSeries := func(qt *querytracer.Tracer) {
rollupResultCacheV.DeleteInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss) rollupResultCacheV.DeleteInstantValues(qt, at, expr, window, ec.Step, ec.EnforcedTagFilterss)

View file

@ -236,9 +236,9 @@ func (rrc *rollupResultCache) GetSeries(qt *querytracer.Tracer, ec *EvalConfig,
// Obtain tss from the cache. // Obtain tss from the cache.
bb := bbPool.Get() bb := bbPool.Get()
defer bbPool.Put(bb) defer bbPool.Put(bb)
at := ec.AuthTokens[0] var at *auth.Token
if ec.IsMultiTenant { if !ec.IsMultiTenant {
at = nil at = ec.AuthTokens[0]
} }
bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss) bb.B = marshalRollupResultCacheKeyForSeries(bb.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
@ -369,9 +369,9 @@ func (rrc *rollupResultCache) PutSeries(qt *querytracer.Tracer, ec *EvalConfig,
metainfoBuf := bbPool.Get() metainfoBuf := bbPool.Get()
defer bbPool.Put(metainfoBuf) defer bbPool.Put(metainfoBuf)
at := ec.AuthTokens[0] var at *auth.Token
if ec.IsMultiTenant { if !ec.IsMultiTenant {
at = nil at = ec.AuthTokens[0]
} }
metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss) metainfoKey.B = marshalRollupResultCacheKeyForSeries(metainfoKey.B[:0], at, expr, window, ec.Step, ec.EnforcedTagFilterss)
metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B) metainfoBuf.B = rrc.c.Get(metainfoBuf.B[:0], metainfoKey.B)

View file

@ -26,6 +26,7 @@ See also [LTS releases](https://docs.victoriametrics.com/lts-releases/).
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details. * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly check availability of all the backends before giving up when proxying requests. Previously, vmauth could return an error even if there were healthy backends available. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3061) for details.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly inherit [`drop_src_path_prefix_parts`](https://docs.victoriametrics.com/vmauth/#dropping-request-path-prefix), [`load_balancing_policy`](https://docs.victoriametrics.com/vmauth/#high-availability), [`retry_status_codes`](https://docs.victoriametrics.com/vmauth/#load-balancing) and [`discover_backend_ips`](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) options by `url_map` entries if `url_prefix` option isn't set at the [user config level](https://docs.victoriametrics.com/vmauth/#auth-config). These options were inherited only when the `url_prefix` option was set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7519). * BUGFIX: [vmauth](https://docs.victoriametrics.com/vmauth/): properly inherit [`drop_src_path_prefix_parts`](https://docs.victoriametrics.com/vmauth/#dropping-request-path-prefix), [`load_balancing_policy`](https://docs.victoriametrics.com/vmauth/#high-availability), [`retry_status_codes`](https://docs.victoriametrics.com/vmauth/#load-balancing) and [`discover_backend_ips`](https://docs.victoriametrics.com/vmauth/#discovering-backend-ips) options by `url_map` entries if `url_prefix` option isn't set at the [user config level](https://docs.victoriametrics.com/vmauth/#auth-config). These options were inherited only when the `url_prefix` option was set. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7519).
* BUGFIX: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards): add `file` label filter to vmalert dashboard panels. Previously, metrics from groups with the same name but different rule files could be mixed in the results. * BUGFIX: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/blob/master/dashboards): add `file` label filter to vmalert dashboard panels. Previously, metrics from groups with the same name but different rule files could be mixed in the results.
BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/cluster-victoriametrics/): Properly handle [multitenant](https://docs.victoriametrics.com/cluster-victoriametrics/#multitenancy-via-labels) query request errors and correctly perform search for available tenants. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7549) for details.
## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0) ## [v1.106.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.106.0)