lib/storage: obtain all the time series ids from (tag->metricIDs) rows instead of (metricID->TSID) rows, since this much faster

This commit is contained in:
Aliaksandr Valialkin 2019-11-09 18:00:58 +02:00
parent cdbe848102
commit 695c1dc5eb

View file

@ -1212,20 +1212,35 @@ func (is *indexSearch) getTSIDByMetricID(dst *TSID, metricID uint64) error {
func (is *indexSearch) getSeriesCount() (uint64, error) {
ts := &is.ts
kb := &is.kb
var n uint64
kb.B = append(kb.B[:0], nsPrefixMetricIDToTSID)
mp := &is.mp
var metricIDsLen uint64
// Extract the number of series from ((__name__=value): metricIDs) rows
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
ts.Seek(kb.B)
for ts.NextItem() {
if !bytes.HasPrefix(ts.Item, kb.B) {
item := ts.Item
if !bytes.HasPrefix(item, kb.B) {
break
}
tail := item[len(kb.B):]
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return 0, fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return 0, err
}
// Take into account deleted timeseries too.
n++
// It is OK if series can be counted multiple times in rare cases -
// the returned number is an estimation.
metricIDsLen += uint64(mp.MetricIDsLen())
}
if err := ts.Error(); err != nil {
return 0, fmt.Errorf("error when counting unique timeseries: %s", err)
}
return n, nil
return metricIDsLen, nil
}
// updateMetricIDsByMetricNameMatch matches metricName values for the given srcMetricIDs against tfs
@ -2092,7 +2107,11 @@ func (is *indexSearch) containsTimeRange(tr TimeRange) (bool, error) {
func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics int) error {
ts := &is.ts
kb := &is.kb
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixMetricIDToTSID)
mp := &is.mp
// Extract all the mtricIDs from (__name__=value) metricIDs.
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
kb.B = marshalTagValue(kb.B, nil)
prefix := kb.B
ts.Seek(prefix)
for ts.NextItem() {
@ -2101,11 +2120,18 @@ func (is *indexSearch) updateMetricIDsAll(metricIDs *uint64set.Set, maxMetrics i
return nil
}
tail := item[len(prefix):]
if len(tail) < 8 {
return fmt.Errorf("cannot unmarshal metricID from item with size %d; need at least 9 bytes; item=%q", len(tail), tail)
n := bytes.IndexByte(tail, tagSeparatorChar)
if n < 0 {
return fmt.Errorf("invalid tag->metricIDs line %q: cannot find tagSeparatorChar %d", item, tagSeparatorChar)
}
tail = tail[n+1:]
if err := mp.InitOnlyTail(item, tail); err != nil {
return err
}
mp.ParseMetricIDs()
for _, metricID := range mp.MetricIDs {
metricIDs.Add(metricID)
}
metricID := encoding.UnmarshalUint64(tail)
metricIDs.Add(metricID)
if metricIDs.Len() >= maxMetrics {
return nil
}