diff --git a/app/vmselect/netstorage/netstorage.go b/app/vmselect/netstorage/netstorage.go index 1ad725939d..c252ffb0c7 100644 --- a/app/vmselect/netstorage/netstorage.go +++ b/app/vmselect/netstorage/netstorage.go @@ -1170,6 +1170,8 @@ func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBSt seriesCountByMetricName := make(map[string]uint64) labelValueCountByLabelName := make(map[string]uint64) seriesCountByLabelValuePair := make(map[string]uint64) + totalSeries := uint64(0) + totalLabelValuePairs := uint64(0) for _, st := range statuses { for _, e := range st.SeriesCountByMetricName { seriesCountByMetricName[e.Name] += e.Count @@ -1184,11 +1186,15 @@ func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBSt for _, e := range st.SeriesCountByLabelValuePair { seriesCountByLabelValuePair[e.Name] += e.Count } + totalSeries += st.TotalSeries + totalLabelValuePairs += st.TotalLabelValuePairs } return &storage.TSDBStatus{ SeriesCountByMetricName: toTopHeapEntries(seriesCountByMetricName, topN), LabelValueCountByLabelName: toTopHeapEntries(labelValueCountByLabelName, topN), SeriesCountByLabelValuePair: toTopHeapEntries(seriesCountByLabelValuePair, topN), + TotalSeries: totalSeries, + TotalLabelValuePairs: totalLabelValuePairs, } } @@ -2395,6 +2401,36 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac } // Read response + return readTSDBStatus(bc) +} + +func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) { + // Send the request to sn. + if err := writeBytes(bc, requestData); err != nil { + return nil, fmt.Errorf("cannot write requestData: %w", err) + } + // topN shouldn't exceed 32 bits, so send it as uint32. + if err := writeUint32(bc, uint32(topN)); err != nil { + return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err) + } + if err := bc.Flush(); err != nil { + return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err) + } + + // Read response error. + buf, err := readBytes(nil, bc, maxErrorMessageSize) + if err != nil { + return nil, fmt.Errorf("cannot read error message: %w", err) + } + if len(buf) > 0 { + return nil, newErrRemote(buf) + } + + // Read response + return readTSDBStatus(bc) +} + +func readTSDBStatus(bc *handshake.BufferedConn) (*storage.TSDBStatus, error) { seriesCountByMetricName, err := readTopHeapEntries(bc) if err != nil { return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err) @@ -2425,49 +2461,6 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac return status, nil } -func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) { - // Send the request to sn. - if err := writeBytes(bc, requestData); err != nil { - return nil, fmt.Errorf("cannot write requestData: %w", err) - } - // topN shouldn't exceed 32 bits, so send it as uint32. - if err := writeUint32(bc, uint32(topN)); err != nil { - return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err) - } - if err := bc.Flush(); err != nil { - return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err) - } - - // Read response error. - buf, err := readBytes(nil, bc, maxErrorMessageSize) - if err != nil { - return nil, fmt.Errorf("cannot read error message: %w", err) - } - if len(buf) > 0 { - return nil, newErrRemote(buf) - } - - // Read response - seriesCountByMetricName, err := readTopHeapEntries(bc) - if err != nil { - return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err) - } - labelValueCountByLabelName, err := readTopHeapEntries(bc) - if err != nil { - return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err) - } - seriesCountByLabelValuePair, err := readTopHeapEntries(bc) - if err != nil { - return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err) - } - status := &storage.TSDBStatus{ - SeriesCountByMetricName: seriesCountByMetricName, - LabelValueCountByLabelName: labelValueCountByLabelName, - SeriesCountByLabelValuePair: seriesCountByLabelValuePair, - } - return status, nil -} - func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) { n, err := readUint64(bc) if err != nil { diff --git a/app/vmstorage/transport/server.go b/app/vmstorage/transport/server.go index 901f28197c..7e8f4b0d34 100644 --- a/app/vmstorage/transport/server.go +++ b/app/vmstorage/transport/server.go @@ -950,6 +950,10 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error { } // Send status to vmselect. + return writeTSDBStatus(ctx, status) +} + +func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error { if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil { return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err) } @@ -1001,16 +1005,7 @@ func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) e } // Send status to vmselect. - if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil { - return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil { - return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err) - } - if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil { - return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err) - } - return nil + return writeTSDBStatus(ctx, status) } func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {