mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-21 15:45:01 +00:00
add test case
This commit is contained in:
parent
c89a7a0b62
commit
0eb36e8f2e
2 changed files with 107 additions and 3 deletions
|
@ -1,9 +1,10 @@
|
||||||
package streamaggr
|
package streamaggr
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
// "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
|
// maxAggrState calculates output=max, e.g. the maximum value over input samples.
|
||||||
|
@ -29,19 +30,22 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
|
||||||
again:
|
again:
|
||||||
v, ok := as.m.Load(outputKey)
|
v, ok := as.m.Load(outputKey)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
fmt.Printf("wang The entry is missing in the map: %d\n", time.Now().UnixMicro())
|
||||||
// The entry is missing in the map. Try creating it.
|
// The entry is missing in the map. Try creating it.
|
||||||
v = &maxStateValue{
|
v = &maxStateValue{
|
||||||
max: s.value,
|
max: s.value,
|
||||||
}
|
}
|
||||||
outputKey = bytesutil.InternString(outputKey)
|
// outputKey = bytesutil.InternString(outputKey)
|
||||||
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
vNew, loaded := as.m.LoadOrStore(outputKey, v)
|
||||||
if !loaded {
|
if !loaded {
|
||||||
|
fmt.Printf("wang new entry has been successfully created: %d\n", time.Now().UnixMicro())
|
||||||
// The new entry has been successfully created.
|
// The new entry has been successfully created.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// Use the entry created by a concurrent goroutine.
|
// Use the entry created by a concurrent goroutine.
|
||||||
v = vNew
|
v = vNew
|
||||||
}
|
}
|
||||||
|
fmt.Printf("wang the right way %d\n", time.Now().Unix())
|
||||||
sv := v.(*maxStateValue)
|
sv := v.(*maxStateValue)
|
||||||
sv.mu.Lock()
|
sv.mu.Lock()
|
||||||
deleted := sv.deleted
|
deleted := sv.deleted
|
||||||
|
@ -57,13 +61,22 @@ func (as *maxAggrState) pushSamples(samples []pushSample) {
|
||||||
goto again
|
goto again
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
length := 0
|
||||||
|
as.m.Range(func(_, _ interface{}) bool {
|
||||||
|
length++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
fmt.Printf("wang sync map length %d\n", length)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (as *maxAggrState) flushState(ctx *flushCtx) {
|
func (as *maxAggrState) flushState(ctx *flushCtx) {
|
||||||
m := &as.m
|
m := &as.m
|
||||||
|
// fmt.Println("Im outside %d", time.Now().Unix())
|
||||||
|
|
||||||
m.Range(func(k, v any) bool {
|
m.Range(func(k, v any) bool {
|
||||||
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
// Atomically delete the entry from the map, so new entry is created for the next flush.
|
||||||
m.Delete(k)
|
m.Delete(k)
|
||||||
|
// fmt.Println("wang I have deleted the entry: %d", time.Now().UnixMicro())
|
||||||
|
|
||||||
sv := v.(*maxStateValue)
|
sv := v.(*maxStateValue)
|
||||||
sv.mu.Lock()
|
sv.mu.Lock()
|
||||||
|
@ -73,6 +86,8 @@ func (as *maxAggrState) flushState(ctx *flushCtx) {
|
||||||
sv.mu.Unlock()
|
sv.mu.Unlock()
|
||||||
|
|
||||||
key := k.(string)
|
key := k.(string)
|
||||||
|
// fmt.Printf("Im here %d, %v", time.Now().UnixMicro(), max)
|
||||||
|
|
||||||
ctx.appendSeries(key, "max", max)
|
ctx.appendSeries(key, "max", max)
|
||||||
return true
|
return true
|
||||||
})
|
})
|
||||||
|
|
89
lib/streamaggr/max_test.go
Normal file
89
lib/streamaggr/max_test.go
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
package streamaggr
|
||||||
|
|
||||||
|
import (
|
||||||
|
// "fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
|
||||||
|
// "time"
|
||||||
|
// "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPushSampleRace(t *testing.T) {
|
||||||
|
as := maxAggrState{}
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
go func() {
|
||||||
|
inputLabels := []prompbmarshal.Label{
|
||||||
|
{
|
||||||
|
Name: "id",
|
||||||
|
Value: "1",
|
||||||
|
// Value: fmt.Sprintf("%d", i),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
outputLabels := []prompbmarshal.Label{
|
||||||
|
|
||||||
|
{
|
||||||
|
Name: "label",
|
||||||
|
Value: "label",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label1",
|
||||||
|
Value: "label1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label2",
|
||||||
|
Value: "label2",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label3",
|
||||||
|
Value: "label3",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label4",
|
||||||
|
Value: "label4",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label5",
|
||||||
|
Value: "label5",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label6",
|
||||||
|
Value: "label6",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label7",
|
||||||
|
Value: "label7",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label8",
|
||||||
|
Value: "label8",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label9",
|
||||||
|
Value: "label9",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label11",
|
||||||
|
Value: "label11",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "label12",
|
||||||
|
Value: "label12",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
buf := compressLabels([]byte{}, inputLabels, outputLabels)
|
||||||
|
key := bytesutil.ToUnsafeString(buf)
|
||||||
|
sample := []pushSample{
|
||||||
|
{
|
||||||
|
key: key,
|
||||||
|
value: 1,
|
||||||
|
timestamp: 123,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
as.pushSamples(sample)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
Loading…
Reference in a new issue