mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
lib/querytracer: makes package concurrent safe to use (#5322)
* lib/querytracer: makes package concurrent safe to use it must fix various issues with concurrent code usage. Especially, when it's not reasonable to wait for all goroutines to be finished * wip --------- Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
parent
236404eb1f
commit
a9ad7f45d4
3 changed files with 66 additions and 22 deletions
|
@ -15,6 +15,8 @@ The following `tip` changes can be tested by building VictoriaMetrics components
|
|||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly apply [relabeling](https://docs.victoriametrics.com/vmagent.html#relabeling) with `regex`, which start and end with `.+` or `.*` and which contain alternate sub-regexps. For example, `.+;|;.+` or `.*foo|bar|baz.*`. Previously such regexps were improperly parsed, which could result in undexpected relabeling results. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5297).
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/vmagent.html): properly decode Snappy-encoded data blocks received via [VictoriaMetrics remote_write protocol](https://docs.victoriametrics.com/vmagent.html#victoriametrics-remote-write-protocol). See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5301).
|
||||
* BUGFIX: fix panic, which could occur when [query tracing](https://docs.victoriametrics.com/#query-tracing) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319).
|
||||
|
||||
|
||||
## [v1.93.7](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.93.7)
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
|
||||
|
@ -22,6 +23,14 @@ var denyQueryTracing = flag.Bool("denyQueryTracing", false, "Whether to disable
|
|||
// Tracer may contain sub-tracers (branches) in order to build tree-like execution order.
|
||||
// Call Tracer.NewChild func for adding sub-tracer.
|
||||
type Tracer struct {
|
||||
// isDone is set to true after Done* call.
|
||||
//
|
||||
// It is used for determining whether it is safe to print the trace.
|
||||
// It is unsafe to print the trace when it isn't closed yet, since it may be modified
|
||||
// by concurrently running goroutines.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319
|
||||
isDone atomic.Bool
|
||||
|
||||
// startTime is the time when Tracer was created
|
||||
startTime time.Time
|
||||
// doneTime is the time when Done or Donef was called
|
||||
|
@ -30,7 +39,7 @@ type Tracer struct {
|
|||
message string
|
||||
// children is a list of children Tracer objects
|
||||
children []*Tracer
|
||||
// span contains span for the given Tracer. It is added via Tracer.AddSpan().
|
||||
// span contains span for the given Tracer. It is added via Tracer.AddJSON().
|
||||
// If span is non-nil, then the remaining fields aren't used.
|
||||
span *span
|
||||
}
|
||||
|
@ -68,7 +77,7 @@ func (t *Tracer) NewChild(format string, args ...interface{}) *Tracer {
|
|||
if t == nil {
|
||||
return nil
|
||||
}
|
||||
if !t.doneTime.IsZero() {
|
||||
if t.isDone.Load() {
|
||||
panic(fmt.Errorf("BUG: NewChild() cannot be called after Donef(%q) call", t.message))
|
||||
}
|
||||
child := &Tracer{
|
||||
|
@ -87,10 +96,11 @@ func (t *Tracer) Done() {
|
|||
if t == nil {
|
||||
return
|
||||
}
|
||||
if !t.doneTime.IsZero() {
|
||||
if t.isDone.Load() {
|
||||
panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
|
||||
}
|
||||
t.doneTime = time.Now()
|
||||
t.isDone.Store(true)
|
||||
}
|
||||
|
||||
// Donef appends the given fmt.Sprintf(format, args..) message to t and finished it.
|
||||
|
@ -101,11 +111,12 @@ func (t *Tracer) Donef(format string, args ...interface{}) {
|
|||
if t == nil {
|
||||
return
|
||||
}
|
||||
if !t.doneTime.IsZero() {
|
||||
if t.isDone.Load() {
|
||||
panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
|
||||
}
|
||||
t.message += ": " + fmt.Sprintf(format, args...)
|
||||
t.doneTime = time.Now()
|
||||
t.isDone.Store(true)
|
||||
}
|
||||
|
||||
// Printf adds new fmt.Sprintf(format, args...) message to t.
|
||||
|
@ -115,7 +126,7 @@ func (t *Tracer) Printf(format string, args ...interface{}) {
|
|||
if t == nil {
|
||||
return
|
||||
}
|
||||
if !t.doneTime.IsZero() {
|
||||
if t.isDone.Load() {
|
||||
panic(fmt.Errorf("BUG: Printf() cannot be called after Done(%q) call", t.message))
|
||||
}
|
||||
now := time.Now()
|
||||
|
@ -124,6 +135,7 @@ func (t *Tracer) Printf(format string, args ...interface{}) {
|
|||
doneTime: now,
|
||||
message: fmt.Sprintf(format, args...),
|
||||
}
|
||||
child.isDone.Store(true)
|
||||
t.children = append(t.children, child)
|
||||
}
|
||||
|
||||
|
@ -153,6 +165,9 @@ func (t *Tracer) AddJSON(jsonTrace []byte) error {
|
|||
// String returns string representation of t.
|
||||
//
|
||||
// String must be called when t methods aren't called by other goroutines.
|
||||
//
|
||||
// It is safe calling String() when child tracers aren't finished yet.
|
||||
// In this case they will contain the corresponding message.
|
||||
func (t *Tracer) String() string {
|
||||
if t == nil {
|
||||
return ""
|
||||
|
@ -166,6 +181,9 @@ func (t *Tracer) String() string {
|
|||
// ToJSON returns JSON representation of t.
|
||||
//
|
||||
// ToJSON must be called when t methods aren't called by other goroutines.
|
||||
|
||||
// It is safe calling ToJSON() when child tracers aren't finished yet.
|
||||
// In this case they will contain the corresponding message.
|
||||
func (t *Tracer) ToJSON() string {
|
||||
if t == nil {
|
||||
return ""
|
||||
|
@ -187,6 +205,12 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
|
|||
if t.span != nil {
|
||||
return t.span, prevTime
|
||||
}
|
||||
if !t.isDone.Load() {
|
||||
s := &span{
|
||||
Message: fmt.Sprintf("missing Tracer.Done() call for the trace with message=%s", t.message),
|
||||
}
|
||||
return s, prevTime
|
||||
}
|
||||
if t.doneTime == t.startTime {
|
||||
// a single-line trace
|
||||
d := t.startTime.Sub(prevTime)
|
||||
|
@ -199,10 +223,6 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
|
|||
// tracer with children
|
||||
msg := t.message
|
||||
doneTime := t.doneTime
|
||||
if doneTime.IsZero() {
|
||||
msg += ": missing Tracer.Done() call"
|
||||
doneTime = t.getLastChildDoneTime(t.startTime)
|
||||
}
|
||||
d := doneTime.Sub(t.startTime)
|
||||
var children []*span
|
||||
var sChild *span
|
||||
|
@ -219,14 +239,6 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
|
|||
return s, doneTime
|
||||
}
|
||||
|
||||
func (t *Tracer) getLastChildDoneTime(defaultTime time.Time) time.Time {
|
||||
if len(t.children) == 0 {
|
||||
return defaultTime
|
||||
}
|
||||
lastChild := t.children[len(t.children)-1]
|
||||
return lastChild.getLastChildDoneTime(lastChild.startTime)
|
||||
}
|
||||
|
||||
// span represents a single trace span
|
||||
type span struct {
|
||||
// DurationMsec is the duration for the current trace span in milliseconds.
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
package querytracer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
|
@ -137,11 +139,39 @@ func TestTraceMissingDonef(t *testing.T) {
|
|||
qtChild.Printf("child printf")
|
||||
qt.Printf("another parent printf")
|
||||
s := qt.String()
|
||||
sExpected := `- 0ms: : parent: missing Tracer.Done() call
|
||||
| - 0ms: parent printf
|
||||
| - 0ms: child: missing Tracer.Done() call
|
||||
| | - 0ms: child printf
|
||||
| - 0ms: another parent printf
|
||||
sExpected := `- 0.000ms: missing Tracer.Done() call for the trace with message=: parent
|
||||
`
|
||||
if !areEqualTracesSkipDuration(s, sExpected) {
|
||||
t.Fatalf("unexpected trace\ngot\n%s\nwant\n%s", s, sExpected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestTraceConcurrent(t *testing.T) {
|
||||
qt := New(true, "parent")
|
||||
childLocal := qt.NewChild("local")
|
||||
childLocal.Printf("abc")
|
||||
childLocal.Done()
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 3; i++ {
|
||||
child := qt.NewChild(fmt.Sprintf("child %d", i))
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
for j := 0; j < 100; j++ {
|
||||
child.Printf(fmt.Sprintf("message %d", j))
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
qt.Done()
|
||||
// Verify that it is safe to call qt.String() when child traces aren't done yet
|
||||
s := qt.String()
|
||||
wg.Wait()
|
||||
sExpected := `- 0.008ms: : parent
|
||||
| - 0.002ms: local
|
||||
| | - 0.000ms: abc
|
||||
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 0
|
||||
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 1
|
||||
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 2
|
||||
`
|
||||
if !areEqualTracesSkipDuration(s, sExpected) {
|
||||
t.Fatalf("unexpected trace\ngot\n%s\nwant\n%s", s, sExpected)
|
||||
|
|
Loading…
Reference in a new issue