lib/querytracer: makes package concurrent safe to use (#5322)

* lib/querytracer: makes package concurrent safe to use
it must fix various issues with concurrent code usage.
Especially, when it's not reasonable to wait for all goroutines to be finished

* wip

---------

Co-authored-by: Aliaksandr Valialkin <valyala@victoriametrics.com>
This commit is contained in:
Nikolay 2023-11-14 20:58:28 +01:00 committed by Aliaksandr Valialkin
parent cb106bdf39
commit 3121d76bee
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
3 changed files with 65 additions and 22 deletions

View file

@ -118,6 +118,7 @@ The sandbox cluster installation is running under the constant load generated by
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): correctly display query errors on [Explore Prometheus Metrics](https://docs.victoriametrics.com/#metrics-explorer) page. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5202) for details.
* BUGFIX: [vmui](https://docs.victoriametrics.com/#vmui): properly handle trailing slash in the server URL. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5203).
* BUGFIX: [vmbackupmanager](https://docs.victoriametrics.com/vmbackupmanager.html): correctly print error in logs when copying backup fails. Previously, error was displayed in metrics but was missing in logs.
* BUGFIX: fix panic, which could occur when [query tracing](https://docs.victoriametrics.com/#query-tracing) is enabled. See [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319).
## [v1.94.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.94.0)

View file

@ -7,6 +7,7 @@ import (
"fmt"
"io"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
@ -22,6 +23,14 @@ var denyQueryTracing = flag.Bool("denyQueryTracing", false, "Whether to disable
// Tracer may contain sub-tracers (branches) in order to build tree-like execution order.
// Call Tracer.NewChild func for adding sub-tracer.
type Tracer struct {
// isDone is set to true after Done* call.
//
// It is used for determining whether it is safe to print the trace.
// It is unsafe to print the trace when it isn't closed yet, since it may be modified
// by concurrently running goroutines.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319
isDone atomic.Bool
// startTime is the time when Tracer was created
startTime time.Time
// doneTime is the time when Done or Donef was called
@ -30,7 +39,7 @@ type Tracer struct {
message string
// children is a list of children Tracer objects
children []*Tracer
// span contains span for the given Tracer. It is added via Tracer.AddSpan().
// span contains span for the given Tracer. It is added via Tracer.AddJSON().
// If span is non-nil, then the remaining fields aren't used.
span *span
}
@ -68,7 +77,7 @@ func (t *Tracer) NewChild(format string, args ...interface{}) *Tracer {
if t == nil {
return nil
}
if !t.doneTime.IsZero() {
if t.isDone.Load() {
panic(fmt.Errorf("BUG: NewChild() cannot be called after Donef(%q) call", t.message))
}
child := &Tracer{
@ -87,10 +96,11 @@ func (t *Tracer) Done() {
if t == nil {
return
}
if !t.doneTime.IsZero() {
if t.isDone.Load() {
panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
}
t.doneTime = time.Now()
t.isDone.Store(true)
}
// Donef appends the given fmt.Sprintf(format, args..) message to t and finished it.
@ -101,11 +111,12 @@ func (t *Tracer) Donef(format string, args ...interface{}) {
if t == nil {
return
}
if !t.doneTime.IsZero() {
if t.isDone.Load() {
panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
}
t.message += ": " + fmt.Sprintf(format, args...)
t.doneTime = time.Now()
t.isDone.Store(true)
}
// Printf adds new fmt.Sprintf(format, args...) message to t.
@ -115,7 +126,7 @@ func (t *Tracer) Printf(format string, args ...interface{}) {
if t == nil {
return
}
if !t.doneTime.IsZero() {
if t.isDone.Load() {
panic(fmt.Errorf("BUG: Printf() cannot be called after Done(%q) call", t.message))
}
now := time.Now()
@ -124,6 +135,7 @@ func (t *Tracer) Printf(format string, args ...interface{}) {
doneTime: now,
message: fmt.Sprintf(format, args...),
}
child.isDone.Store(true)
t.children = append(t.children, child)
}
@ -153,6 +165,9 @@ func (t *Tracer) AddJSON(jsonTrace []byte) error {
// String returns string representation of t.
//
// String must be called when t methods aren't called by other goroutines.
//
// It is safe calling String() when child tracers aren't finished yet.
// In this case they will contain the corresponding message.
func (t *Tracer) String() string {
if t == nil {
return ""
@ -166,6 +181,9 @@ func (t *Tracer) String() string {
// ToJSON returns JSON representation of t.
//
// ToJSON must be called when t methods aren't called by other goroutines.
// It is safe calling ToJSON() when child tracers aren't finished yet.
// In this case they will contain the corresponding message.
func (t *Tracer) ToJSON() string {
if t == nil {
return ""
@ -187,6 +205,12 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
if t.span != nil {
return t.span, prevTime
}
if !t.isDone.Load() {
s := &span{
Message: fmt.Sprintf("missing Tracer.Done() call for the trace with message=%s", t.message),
}
return s, prevTime
}
if t.doneTime == t.startTime {
// a single-line trace
d := t.startTime.Sub(prevTime)
@ -199,10 +223,6 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
// tracer with children
msg := t.message
doneTime := t.doneTime
if doneTime.IsZero() {
msg += ": missing Tracer.Done() call"
doneTime = t.getLastChildDoneTime(t.startTime)
}
d := doneTime.Sub(t.startTime)
var children []*span
var sChild *span
@ -219,14 +239,6 @@ func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
return s, doneTime
}
func (t *Tracer) getLastChildDoneTime(defaultTime time.Time) time.Time {
if len(t.children) == 0 {
return defaultTime
}
lastChild := t.children[len(t.children)-1]
return lastChild.getLastChildDoneTime(lastChild.startTime)
}
// span represents a single trace span
type span struct {
// DurationMsec is the duration for the current trace span in milliseconds.

View file

@ -1,7 +1,9 @@
package querytracer
import (
"fmt"
"regexp"
"sync"
"testing"
)
@ -137,11 +139,39 @@ func TestTraceMissingDonef(t *testing.T) {
qtChild.Printf("child printf")
qt.Printf("another parent printf")
s := qt.String()
sExpected := `- 0ms: : parent: missing Tracer.Done() call
| - 0ms: parent printf
| - 0ms: child: missing Tracer.Done() call
| | - 0ms: child printf
| - 0ms: another parent printf
sExpected := `- 0.000ms: missing Tracer.Done() call for the trace with message=: parent
`
if !areEqualTracesSkipDuration(s, sExpected) {
t.Fatalf("unexpected trace\ngot\n%s\nwant\n%s", s, sExpected)
}
}
func TestTraceConcurrent(t *testing.T) {
qt := New(true, "parent")
childLocal := qt.NewChild("local")
childLocal.Printf("abc")
childLocal.Done()
var wg sync.WaitGroup
for i := 0; i < 3; i++ {
child := qt.NewChild(fmt.Sprintf("child %d", i))
wg.Add(1)
go func() {
for j := 0; j < 100; j++ {
child.Printf(fmt.Sprintf("message %d", j))
}
wg.Done()
}()
}
qt.Done()
// Verify that it is safe to call qt.String() when child traces aren't done yet
s := qt.String()
wg.Wait()
sExpected := `- 0.008ms: : parent
| - 0.002ms: local
| | - 0.000ms: abc
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 0
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 1
| - 0.000ms: missing Tracer.Done() call for the trace with message=child 2
`
if !areEqualTracesSkipDuration(s, sExpected) {
t.Fatalf("unexpected trace\ngot\n%s\nwant\n%s", s, sExpected)