app: properly collect and merge /api/v1/status/tsdb info from vmstorage nodes

The collection has been broken in f2754c3e90

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2233
This commit is contained in:
Aliaksandr Valialkin 2022-06-08 19:25:59 +03:00
parent 8d98b8ae10
commit 2b343d8bd0
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 41 additions and 53 deletions

View file

@ -1170,6 +1170,8 @@ func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBSt
seriesCountByMetricName := make(map[string]uint64)
labelValueCountByLabelName := make(map[string]uint64)
seriesCountByLabelValuePair := make(map[string]uint64)
totalSeries := uint64(0)
totalLabelValuePairs := uint64(0)
for _, st := range statuses {
for _, e := range st.SeriesCountByMetricName {
seriesCountByMetricName[e.Name] += e.Count
@ -1184,11 +1186,15 @@ func mergeTSDBStatuses(statuses []*storage.TSDBStatus, topN int) *storage.TSDBSt
for _, e := range st.SeriesCountByLabelValuePair {
seriesCountByLabelValuePair[e.Name] += e.Count
}
totalSeries += st.TotalSeries
totalLabelValuePairs += st.TotalLabelValuePairs
}
return &storage.TSDBStatus{
SeriesCountByMetricName: toTopHeapEntries(seriesCountByMetricName, topN),
LabelValueCountByLabelName: toTopHeapEntries(labelValueCountByLabelName, topN),
SeriesCountByLabelValuePair: toTopHeapEntries(seriesCountByLabelValuePair, topN),
TotalSeries: totalSeries,
TotalLabelValuePairs: totalLabelValuePairs,
}
}
@ -2395,6 +2401,36 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac
}
// Read response
return readTSDBStatus(bc)
}
func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) {
// Send the request to sn.
if err := writeBytes(bc, requestData); err != nil {
return nil, fmt.Errorf("cannot write requestData: %w", err)
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(topN)); err != nil {
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return nil, newErrRemote(buf)
}
// Read response
return readTSDBStatus(bc)
}
func readTSDBStatus(bc *handshake.BufferedConn) (*storage.TSDBStatus, error) {
seriesCountByMetricName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err)
@ -2425,49 +2461,6 @@ func (sn *storageNode) getTSDBStatusForDateOnConn(bc *handshake.BufferedConn, ac
return status, nil
}
func (sn *storageNode) getTSDBStatusWithFiltersOnConn(bc *handshake.BufferedConn, requestData []byte, topN int) (*storage.TSDBStatus, error) {
// Send the request to sn.
if err := writeBytes(bc, requestData); err != nil {
return nil, fmt.Errorf("cannot write requestData: %w", err)
}
// topN shouldn't exceed 32 bits, so send it as uint32.
if err := writeUint32(bc, uint32(topN)); err != nil {
return nil, fmt.Errorf("cannot send topN=%d to conn: %w", topN, err)
}
if err := bc.Flush(); err != nil {
return nil, fmt.Errorf("cannot flush tsdbStatusWithFilters args to conn: %w", err)
}
// Read response error.
buf, err := readBytes(nil, bc, maxErrorMessageSize)
if err != nil {
return nil, fmt.Errorf("cannot read error message: %w", err)
}
if len(buf) > 0 {
return nil, newErrRemote(buf)
}
// Read response
seriesCountByMetricName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByMetricName: %w", err)
}
labelValueCountByLabelName, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read labelValueCountByLabelName: %w", err)
}
seriesCountByLabelValuePair, err := readTopHeapEntries(bc)
if err != nil {
return nil, fmt.Errorf("cannot read seriesCountByLabelValuePair: %w", err)
}
status := &storage.TSDBStatus{
SeriesCountByMetricName: seriesCountByMetricName,
LabelValueCountByLabelName: labelValueCountByLabelName,
SeriesCountByLabelValuePair: seriesCountByLabelValuePair,
}
return status, nil
}
func readTopHeapEntries(bc *handshake.BufferedConn) ([]storage.TopHeapEntry, error) {
n, err := readUint64(bc)
if err != nil {

View file

@ -950,6 +950,10 @@ func (s *Server) processVMSelectTSDBStatus(ctx *vmselectRequestCtx) error {
}
// Send status to vmselect.
return writeTSDBStatus(ctx, status)
}
func writeTSDBStatus(ctx *vmselectRequestCtx, status *storage.TSDBStatus) error {
if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
}
@ -1001,16 +1005,7 @@ func (s *Server) processVMSelectTSDBStatusWithFilters(ctx *vmselectRequestCtx) e
}
// Send status to vmselect.
if err := writeTopHeapEntries(ctx, status.SeriesCountByMetricName); err != nil {
return fmt.Errorf("cannot write seriesCountByMetricName to vmselect: %w", err)
}
if err := writeTopHeapEntries(ctx, status.LabelValueCountByLabelName); err != nil {
return fmt.Errorf("cannot write labelValueCountByLabelName to vmselect: %w", err)
}
if err := writeTopHeapEntries(ctx, status.SeriesCountByLabelValuePair); err != nil {
return fmt.Errorf("cannot write seriesCountByLabelValuePair to vmselect: %w", err)
}
return nil
return writeTSDBStatus(ctx, status)
}
func writeTopHeapEntries(ctx *vmselectRequestCtx, a []storage.TopHeapEntry) error {