diff --git a/lib/streamaggr/max.go b/lib/streamaggr/max.go index 9197d3add..eb185dfe9 100644 --- a/lib/streamaggr/max.go +++ b/lib/streamaggr/max.go @@ -1,9 +1,10 @@ package streamaggr import ( + "fmt" "sync" - - "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "time" + // "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" ) // maxAggrState calculates output=max, e.g. the maximum value over input samples. @@ -29,19 +30,22 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { again: v, ok := as.m.Load(outputKey) if !ok { + fmt.Printf("wang The entry is missing in the map: %d\n", time.Now().UnixMicro()) // The entry is missing in the map. Try creating it. v = &maxStateValue{ max: s.value, } - outputKey = bytesutil.InternString(outputKey) + // outputKey = bytesutil.InternString(outputKey) vNew, loaded := as.m.LoadOrStore(outputKey, v) if !loaded { + fmt.Printf("wang new entry has been successfully created: %d\n", time.Now().UnixMicro()) // The new entry has been successfully created. continue } // Use the entry created by a concurrent goroutine. v = vNew } + fmt.Printf("wang the right way %d\n", time.Now().Unix()) sv := v.(*maxStateValue) sv.mu.Lock() deleted := sv.deleted @@ -57,13 +61,22 @@ func (as *maxAggrState) pushSamples(samples []pushSample) { goto again } } + length := 0 + as.m.Range(func(_, _ interface{}) bool { + length++ + return true + }) + fmt.Printf("wang sync map length %d\n", length) } func (as *maxAggrState) flushState(ctx *flushCtx) { m := &as.m + // fmt.Println("Im outside %d", time.Now().Unix()) + m.Range(func(k, v any) bool { // Atomically delete the entry from the map, so new entry is created for the next flush. m.Delete(k) + // fmt.Println("wang I have deleted the entry: %d", time.Now().UnixMicro()) sv := v.(*maxStateValue) sv.mu.Lock() @@ -73,6 +86,8 @@ func (as *maxAggrState) flushState(ctx *flushCtx) { sv.mu.Unlock() key := k.(string) + // fmt.Printf("Im here %d, %v", time.Now().UnixMicro(), max) + ctx.appendSeries(key, "max", max) return true }) diff --git a/lib/streamaggr/max_test.go b/lib/streamaggr/max_test.go new file mode 100644 index 000000000..f3f3dc61e --- /dev/null +++ b/lib/streamaggr/max_test.go @@ -0,0 +1,89 @@ +package streamaggr + +import ( + // "fmt" + "testing" + "time" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" + // "time" + // "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" +) + +func TestPushSampleRace(t *testing.T) { + as := maxAggrState{} + for i := 0; i < 2; i++ { + go func() { + inputLabels := []prompbmarshal.Label{ + { + Name: "id", + Value: "1", + // Value: fmt.Sprintf("%d", i), + }, + } + outputLabels := []prompbmarshal.Label{ + + { + Name: "label", + Value: "label", + }, + { + Name: "label1", + Value: "label1", + }, + { + Name: "label2", + Value: "label2", + }, + { + Name: "label3", + Value: "label3", + }, + { + Name: "label4", + Value: "label4", + }, + { + Name: "label5", + Value: "label5", + }, + { + Name: "label6", + Value: "label6", + }, + { + Name: "label7", + Value: "label7", + }, + { + Name: "label8", + Value: "label8", + }, + { + Name: "label9", + Value: "label9", + }, + { + Name: "label11", + Value: "label11", + }, + { + Name: "label12", + Value: "label12", + }, + } + buf := compressLabels([]byte{}, inputLabels, outputLabels) + key := bytesutil.ToUnsafeString(buf) + sample := []pushSample{ + { + key: key, + value: 1, + timestamp: 123, + }, + } + as.pushSamples(sample) + }() + } + time.Sleep(1 * time.Second) +}