diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 9c7de0b6fc..02256ea297 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -15,6 +15,7 @@ The following tip changes can be tested by building VictoriaMetrics components f ## tip +* BUGFIX: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): propagate all the timeout-related errors from `vmstorage` to `vmselect` when `vmstorage`. Previously some timeout errors weren't returned from `vmselect` to `vmstorage`. Instead, `vmstorage` could log the error and close the connection to `vmselect`, so `vmselect` was logging cryptic errors such as `cannot execute funcName="..." on vmstorage "...": EOF`. * BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): add support for time zone selection for older versions of browsers. See [this pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/3680) ## [v1.86.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.86.2) diff --git a/lib/vmselectapi/server.go b/lib/vmselectapi/server.go index 8ef4f4f49d..b12620be5a 100644 --- a/lib/vmselectapi/server.go +++ b/lib/vmselectapi/server.go @@ -488,7 +488,7 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error { ctx.deadline = fasttime.UnixTimestamp() + uint64(timeout) // Process the rpcName call. - if err := s.processRPCWithConcurrencyLimit(ctx, rpcName); err != nil { + if err := s.processRPC(ctx, rpcName); err != nil { return fmt.Errorf("cannot execute %q: %s", rpcName, err) } @@ -501,9 +501,10 @@ func (s *Server) processRequest(ctx *vmselectRequestCtx) error { return nil } -func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName string) error { +func (s *Server) beginConcurrentRequest(ctx *vmselectRequestCtx) error { select { case s.concurrencyLimitCh <- struct{}{}: + return nil default: d := time.Duration(ctx.timeout) * time.Second if d > s.limits.MaxQueueDuration { @@ -515,6 +516,7 @@ func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName case s.concurrencyLimitCh <- struct{}{}: timerpool.Put(t) ctx.qt.Printf("wait in queue because -%s=%d concurrent requests are executed", s.limits.MaxConcurrentRequestsFlagName, s.limits.MaxConcurrentRequests) + return nil case <-t.C: timerpool.Put(t) s.concurrencyLimitTimeout.Inc() @@ -525,9 +527,10 @@ func (s *Server) processRPCWithConcurrencyLimit(ctx *vmselectRequestCtx, rpcName s.limits.MaxQueueDurationFlagName, s.limits.MaxQueueDuration, s.limits.MaxConcurrentRequestsFlagName) } } - err := s.processRPC(ctx, rpcName) +} + +func (s *Server) endConcurrentRequest() { <-s.concurrencyLimitCh - return err } func (s *Server) processRPC(ctx *vmselectRequestCtx, rpcName string) error { @@ -585,6 +588,11 @@ func (s *Server) processRegisterMetricNames(ctx *vmselectRequestCtx) error { mr.Timestamp = int64(n) } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Register metric names from mrs. if err := s.api.RegisterMetricNames(ctx.qt, mrs, ctx.deadline); err != nil { return ctx.writeErrorMessage(err) @@ -605,6 +613,11 @@ func (s *Server) processDeleteSeries(ctx *vmselectRequestCtx) error { return err } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request. deletedCount, err := s.api.DeleteSeries(ctx.qt, &ctx.sq, ctx.deadline) if err != nil { @@ -637,6 +650,11 @@ func (s *Server) processLabelNames(ctx *vmselectRequestCtx) error { maxLabelNames = s.limits.MaxLabelNames } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request labelNames, err := s.api.LabelNames(ctx.qt, &ctx.sq, maxLabelNames, ctx.deadline) if err != nil { @@ -682,6 +700,11 @@ func (s *Server) processLabelValues(ctx *vmselectRequestCtx) error { maxLabelValues = s.limits.MaxLabelValues } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request labelValues, err := s.api.LabelValues(ctx.qt, &ctx.sq, labelName, maxLabelValues, ctx.deadline) if err != nil { @@ -742,6 +765,11 @@ func (s *Server) processTagValueSuffixes(ctx *vmselectRequestCtx) error { maxSuffixes = s.limits.MaxTagValueSuffixes } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request suffixes, err := s.api.TagValueSuffixes(ctx.qt, accountID, projectID, tr, tagKey, tagValuePrefix, delimiter, maxSuffixes, ctx.deadline) if err != nil { @@ -783,6 +811,11 @@ func (s *Server) processSeriesCount(ctx *vmselectRequestCtx) error { return err } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request n, err := s.api.SeriesCount(ctx.qt, accountID, projectID, ctx.deadline) if err != nil { @@ -817,6 +850,11 @@ func (s *Server) processTSDBStatus(ctx *vmselectRequestCtx) error { return fmt.Errorf("cannot read topN: %w", err) } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request status, err := s.api.TSDBStatus(ctx.qt, &ctx.sq, focusLabel, int(topN), ctx.deadline) if err != nil { @@ -841,6 +879,11 @@ func (s *Server) processTenants(ctx *vmselectRequestCtx) error { return err } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute the request tenants, err := s.api.Tenants(ctx.qt, tr, ctx.deadline) if err != nil { @@ -913,6 +956,11 @@ func (s *Server) processSearchMetricNames(ctx *vmselectRequestCtx) error { return err } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Execute request. metricNames, err := s.api.SearchMetricNames(ctx.qt, &ctx.sq, ctx.deadline) if err != nil { @@ -946,6 +994,11 @@ func (s *Server) processSearch(ctx *vmselectRequestCtx) error { return err } + if err := s.beginConcurrentRequest(ctx); err != nil { + return ctx.writeErrorMessage(err) + } + defer s.endConcurrentRequest() + // Initiaialize the search. startTime := time.Now() bi, err := s.api.InitSearch(ctx.qt, &ctx.sq, ctx.deadline)