lib/vmselectapi: propagate timeout errors from vmselect to vmstorage instead of closing the connection established from vmselect to vmstorage

This is a follow-up for 20e9598254
This commit is contained in:
Aliaksandr Valialkin 2023-01-20 19:30:19 -08:00
parent b046af8a4d
commit d8329e47cf
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 58 additions and 4 deletions

View file

@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f
## tip
* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680)
## [v1.86.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.2)

View file

@ -488,7 +488,7 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error {
ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout)
// Process the rpcName call.
if err := s.processRPCWithConcurrencyLimit(ctx, rpcName); err != nil {
if err := s.processRPC(ctx, rpcName); err != nil {
return fmt.Errorf("cannot execute %q: %s", rpcName, err)
}
@ -501,9 +501,10 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error {
return nil
}
func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName string) error {
func (s *Server) beginConcurrentRequest(ctx *vmselectRequestCtx) error {
select {
case s.concurrencyLimitCh <- struct{}{}:
return nil
default:
d := time.Duration(ctx.timeout) * time.Second
if d > s.limits.MaxQueueDuration {
@ -515,6 +516,7 @@ func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName
case s.concurrencyLimitCh <- struct{}{}:
timerpool.Put(t)
ctx.qt.Printf("wait in queue because -%s=%d concurrent requests are executed", s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests)
return nil
case <-t.C:
timerpool.Put(t)
s.concurrencyLimitTimeout.Inc()
@ -525,9 +527,10 @@ func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName
s.limits.MaxQueueDurationFlagName, s.limits.MaxQueueDuration, s.limits.MaxConcurrentRequestsFlagName)
}
}
err := s.processRPC(ctx, rpcName)
}
func (s *Server) endConcurrentRequest() {
<-s.concurrencyLimitCh
return err
}
func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error {
@ -585,6 +588,11 @@ func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error {
mr.Timestamp = int64(n)
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Register metric names from mrs.
if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil {
return ctx.writeErrorMessage(err)
@ -605,6 +613,11 @@ func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error {
return err
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request.
deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline)
if err != nil {
@ -637,6 +650,11 @@ func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error {
maxLabelNames = s.limits.MaxLabelNames
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline)
if err != nil {
@ -682,6 +700,11 @@ func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error {
maxLabelValues = s.limits.MaxLabelValues
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline)
if err != nil {
@ -742,6 +765,11 @@ func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error {
maxSuffixes = s.limits.MaxTagValueSuffixes
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline)
if err != nil {
@ -783,6 +811,11 @@ func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error {
return err
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
n, err := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline)
if err != nil {
@ -817,6 +850,11 @@ func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error {
return fmt.Errorf("cannot read topN: %w", err)
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline)
if err != nil {
@ -841,6 +879,11 @@ func (s *Server) processTenants(ctx *vmselectRequestCtx) error {
return err
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute the request
tenants, err := s.api.Tenants(ctx.qt, tr, ctx.deadline)
if err != nil {
@ -913,6 +956,11 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error {
return err
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Execute request.
metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline)
if err != nil {
@ -946,6 +994,11 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error {
return err
}
if err := s.beginConcurrentRequest(ctx); err != nil {
return ctx.writeErrorMessage(err)
}
defer s.endConcurrentRequest()
// Initiaialize the search.
startTime := time.Now()
bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)