VictoriaMetrics/lib/querytracer/tracer.go
Nikolay bfd83e3cca
app/vmselect: fixes panic data race at query tracing
Previously, NewChild elements of querytracer could be referenced by concurrent
storageNode goroutines. After an early return (if search.skipSlowReplicas is set), it is
possible that tracer objects are still in use by concurrent workers.
This may cause panics and data races. The most probable case is when the parent tracer is finished, but children
can still write data to themselves via the Donef() method. This triggers a read-write data race at trace
formatting.

This commit adds new methods to the querytracer package that allow
creating children not referenced by the parent and adding them to the parent later.

An orphaned child must be registered at the parent when its goroutine returns. This is done synchronously by a single caller via the finishQueryTracer call.
If the child didn't finish its work and a reference to it is still used by a concurrent goroutine, a new child must be created instead with
a context message.
This prevents panics and possible data races.

Related issue:
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8114

---------

Signed-off-by: f41gh7 <nik@victoriametrics.com>
Co-authored-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
Co-authored-by: Roman Khavronenko <roman@victoriametrics.com>
2025-01-24 13:56:09 +01:00

311 lines
8.2 KiB
Go

package querytracer
import (
"bytes"
"encoding/json"
"flag"
"fmt"
"io"
"strings"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/buildinfo"
)
var (
	// denyQueryTracing is the -denyQueryTracing command-line flag. When it is set,
	// New always returns a nil (disabled) Tracer regardless of its enabled argument.
	denyQueryTracing = flag.Bool("denyQueryTracing", false, "Whether to disable the ability to trace queries. See https://docs.victoriametrics.com/#query-tracing")
)
// Tracer represents query tracer.
//
// A root Tracer must be created via New call. Child tracers are created via
// Tracer.NewChild or NewOrphan. Each tracer obtained from New, NewChild or
// NewOrphan must be finalized via Done or Donef call.
//
// Tracer may contain sub-tracers (branches) in order to build tree-like execution order.
// Call Tracer.NewChild func for adding sub-tracer. Alternatively, create a detached
// sub-tracer via NewOrphan and register it later via Tracer.AddChild.
//
// A nil *Tracer means that tracing is disabled. Exported methods accept a nil
// receiver and then do nothing or return a zero value.
//
// Apart from isDone, the fields are plain (non-atomic, unlocked) fields. Because
// of that, tree-building calls on a single Tracer (NewChild, AddChild, Printf,
// AddJSON) must not run concurrently.
type Tracer struct {
	// isDone is set to true after Done* call.
	//
	// It is used for determining whether it is safe to print the trace.
	// It is unsafe to print the trace when it isn't closed yet, since it may be modified
	// by concurrently running goroutines.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/5319
	isDone atomic.Bool
	// startTime is the time when Tracer was created
	startTime time.Time
	// doneTime is the time when Done or Donef was called.
	// For entries created via Printf it equals startTime.
	doneTime time.Time
	// message is the message set by New, NewChild, NewOrphan or Printf.
	// Donef appends ": " followed by its own formatted message to it.
	message string
	// children is a list of children Tracer objects in the order they were added.
	children []*Tracer
	// span contains span for the given Tracer. It is added via Tracer.AddJSON().
	// If span is non-nil, then the remaining fields aren't used.
	span *span
}
// New returns a root Tracer whose message is fmt.Sprintf(format, args...)
// prefixed with the build version.
//
// The result is nil (a disabled tracer) when enabled is false or when query
// tracing is forbidden via -denyQueryTracing. Exported Tracer methods accept a
// nil receiver, so callers may use the result unconditionally.
//
// Done or Donef must be called when the tracer should be finished.
func New(enabled bool, format string, args ...any) *Tracer {
	if !enabled || *denyQueryTracing {
		return nil
	}
	msg := buildinfo.Version + ": " + fmt.Sprintf(format, args...)
	return &Tracer{
		startTime: time.Now(),
		message:   msg,
	}
}
// Enabled reports whether tracing is active for t.
//
// A disabled tracer is represented by a nil *Tracer (see New and NewChild), so
// enabled simply means non-nil.
func (t *Tracer) Enabled() bool {
	enabled := t != nil
	return enabled
}
// NewChild creates a sub-tracer with the fmt.Sprintf(format, args...) message
// and appends it to the children of t.
//
// The returned child must be finished via Done or Donef. NewChild returns nil
// when t is nil, and it panics if t has already been finished.
//
// NewChild appends to t's children without synchronization, so it cannot be
// called from concurrent goroutines. Create children from a single goroutine
// and then pass them to concurrent goroutines.
func (t *Tracer) NewChild(format string, args ...any) *Tracer {
	switch {
	case t == nil:
		return nil
	case t.isDone.Load():
		panic(fmt.Errorf("BUG: NewChild() cannot be called after Donef(%q) call", t.message))
	}
	c := &Tracer{
		message:   fmt.Sprintf(format, args...),
		startTime: time.Now(),
	}
	t.children = append(t.children, c)
	return c
}
// NewOrphan creates a tracer with the fmt.Sprintf(format, args...) message for
// use as a child of t. Unlike Tracer.NewChild, it does not add the result to
// the children of t.
//
// The caller is responsible for attaching the returned tracer to t via
// t.AddChild, for example after the goroutine that used it has returned.
// NewOrphan returns nil when t is nil, and it panics if t has already been
// finished.
//
// NewOrphan cannot be called from concurrent goroutines. Create orphaned
// tracers from a single goroutine and then pass them to concurrent goroutines.
func NewOrphan(t *Tracer, format string, args ...any) *Tracer {
	if t == nil {
		return nil
	}
	if done := t.isDone.Load(); done {
		panic(fmt.Errorf("BUG: NewOrphan() cannot be called after Donef(%q) call", t.message))
	}
	return &Tracer{
		message:   fmt.Sprintf(format, args...),
		startTime: time.Now(),
	}
}
// AddChild registers the given child tracer at t.
//
// It is meant for tracers created via NewOrphan, which are not attached to
// their parent on creation. AddChild does nothing when t is nil, and it panics
// if t has already been finished via Done or Donef.
//
// A nil child (a disabled tracer) is ignored. A nil entry in t's children
// would otherwise make String and ToJSON panic with a nil pointer dereference.
//
// AddChild appends to t's children without synchronization, so it cannot be
// called from concurrent goroutines.
func (t *Tracer) AddChild(child *Tracer) {
	if t == nil {
		return
	}
	if t.isDone.Load() {
		panic(fmt.Errorf("BUG: AddChild() cannot be called after Donef(%q) call", t.message))
	}
	if child == nil {
		return
	}
	t.children = append(t.children, child)
}
// Done marks t as finished and records the finish time.
//
// Done does nothing for a nil t. It panics if t has already been finished,
// so it cannot be called multiple times. Other Tracer functions cannot be
// called after Done call.
func (t *Tracer) Done() {
	switch {
	case t == nil:
		return
	case t.isDone.Load():
		panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
	}
	t.doneTime = time.Now()
	// The flag is set last, after doneTime has been written. A reader that
	// observes isDone == true via Load therefore also observes doneTime.
	t.isDone.Store(true)
}
// Donef appends ": " followed by the fmt.Sprintf(format, args...) message to
// t's message, then marks t as finished.
//
// Donef does nothing for a nil t. It panics if t has already been finished,
// so it cannot be called multiple times. Other Tracer functions cannot be
// called after Donef call.
func (t *Tracer) Donef(format string, args ...any) {
	if t == nil {
		return
	}
	if already := t.isDone.Load(); already {
		panic(fmt.Errorf("BUG: Donef(%q) already called", t.message))
	}
	suffix := fmt.Sprintf(format, args...)
	t.message = t.message + ": " + suffix
	t.doneTime = time.Now()
	// The flag is set last, after message and doneTime have been written.
	t.isDone.Store(true)
}
// Printf records the fmt.Sprintf(format, args...) message as an already
// finished, zero-length child entry of t.
//
// Printf does nothing for a nil t, and it panics if t has already been
// finished. It appends to t's children without synchronization, so it
// cannot be called from concurrent goroutines.
func (t *Tracer) Printf(format string, args ...any) {
	if t == nil {
		return
	}
	if t.isDone.Load() {
		panic(fmt.Errorf("BUG: Printf() cannot be called after Done(%q) call", t.message))
	}
	ts := time.Now()
	msg := fmt.Sprintf(format, args...)
	// Equal start and done times mark the entry as a single-line trace.
	entry := &Tracer{message: msg, startTime: ts, doneTime: ts}
	entry.isDone.Store(true)
	t.children = append(t.children, entry)
}
// AddJSON adds the sub-trace encoded in jsonTrace as a child of t.
//
// The jsonTrace must be encoded with ToJSON. The decoded sub-trace is stored
// as is, and String and ToJSON render it with its own recorded durations.
//
// AddJSON ignores an empty jsonTrace and the JSON literal null. It returns an
// error if jsonTrace cannot be decoded.
//
// AddJSON cannot be called from concurrent goroutines.
func (t *Tracer) AddJSON(jsonTrace []byte) error {
	if t == nil {
		return nil
	}
	if len(jsonTrace) == 0 {
		return nil
	}
	var s *span
	if err := json.Unmarshal(jsonTrace, &s); err != nil {
		return fmt.Errorf("cannot unmarshal json trace: %w", err)
	}
	if s == nil {
		// jsonTrace was the JSON literal null, so there is nothing to add.
		// Without this check the child would have no span and no isDone flag.
		// It would then be rendered as a bogus "missing Tracer.Done() call" entry.
		return nil
	}
	t.children = append(t.children, &Tracer{span: s})
	return nil
}
// String renders t and all of its descendants as an indented plain-text tree.
// It returns an empty string for a nil t.
//
// String must be called when t methods aren't called by other goroutines.
//
// It is safe calling String() when child tracers aren't finished yet.
// In this case they will contain the corresponding message.
func (t *Tracer) String() string {
	if t == nil {
		return ""
	}
	var buf bytes.Buffer
	t.toSpan().writePlaintextWithIndent(&buf, 0)
	return buf.String()
}
// ToJSON encodes t and all of its descendants as a JSON span tree. The result
// can be attached to another tracer via AddJSON. It returns an empty string
// for a nil t.
//
// ToJSON must be called when t methods aren't called by other goroutines.
//
// It is safe calling ToJSON() when child tracers aren't finished yet.
// In this case they will contain the corresponding message.
func (t *Tracer) ToJSON() string {
	if t == nil {
		return ""
	}
	encoded, err := json.Marshal(t.toSpan())
	if err != nil {
		// A span only has float, string and slice-of-span fields, so
		// marshaling it is not expected to fail.
		panic(fmt.Errorf("BUG: unexpected error from json.Marshal: %w", err))
	}
	return string(encoded)
}
// toSpan converts the whole tree rooted at t into spans, using the current
// time as the reference point for the root.
func (t *Tracer) toSpan() *span {
	// The end time returned for the root has no following sibling to feed,
	// so it is discarded.
	root, _ := t.toSpanInternal(time.Now())
	return root
}
// toSpanInternal converts t and its descendants into a span tree.
//
// prevTime is the reference point for single-line entries created via Printf.
// Their duration is the time elapsed since prevTime. For the first child,
// prevTime is the parent's start time. For later children, it is the time
// returned for the previous sibling. The returned time is the reference point
// for the next sibling of t.
//
// Spans imported via AddJSON are returned as is. Unfinished tracers are
// rendered as a placeholder message. Neither advances the reference time.
func (t *Tracer) toSpanInternal(prevTime time.Time) (*span, time.Time) {
	if t.span != nil {
		return t.span, prevTime
	}
	if !t.isDone.Load() {
		// NOTE(review): t.message is read while t is unfinished, and Donef
		// writes t.message. This read is race-free only if no goroutine can
		// still be finishing t. For example, tracers handed to workers can be
		// created via NewOrphan and registered via AddChild after the worker
		// has returned.
		s := &span{
			Message: fmt.Sprintf("missing Tracer.Done() call for the trace with message=%s", t.message),
		}
		return s, prevTime
	}
	if len(t.children) == 0 && t.doneTime.Equal(t.startTime) {
		// A single-line trace (see Printf). The children check keeps a tracer
		// that has children from losing them when it is finished within the
		// same clock tick it was started in.
		d := t.startTime.Sub(prevTime)
		s := &span{
			DurationMsec: float64(d.Microseconds()) / 1000,
			Message:      t.message,
		}
		return s, t.doneTime
	}
	// A tracer with children, or a leaf that took measurable time. Its duration
	// is the time between its creation and its Done/Donef call.
	d := t.doneTime.Sub(t.startTime)
	children := make([]*span, 0, len(t.children))
	prevChildTime := t.startTime
	for _, child := range t.children {
		var sChild *span
		sChild, prevChildTime = child.toSpanInternal(prevChildTime)
		children = append(children, sChild)
	}
	s := &span{
		DurationMsec: float64(d.Microseconds()) / 1000,
		Message:      t.message,
		Children:     children,
	}
	return s, t.doneTime
}
// span is the serializable form of a single trace node.
//
// Tracer.ToJSON encodes spans, and Tracer.AddJSON decodes them back. The JSON
// field names therefore form the exchange format between the two.
type span struct {
	// DurationMsec is the duration of the span in milliseconds.
	DurationMsec float64 `json:"duration_msec"`
	// Message is the trace message. It may contain multiple lines.
	Message string `json:"message"`
	// Children contains nested spans. It is omitted from JSON when empty.
	Children []*span `json:"children,omitempty"`
}

// writePlaintextWithIndent writes s and, recursively, its children to w as an
// indented plain-text tree.
//
// Each nesting level adds a "| " column before the "- " bullet.
// messageWithPrefix aligns the continuation lines of multi-line messages under
// that column. Write errors are ignored.
func (s *span) writePlaintextWithIndent(w io.Writer, indent int) {
	levels := ""
	if indent > 0 {
		levels = strings.Repeat("| ", indent)
	}
	bullet := levels + "- "
	fmt.Fprintf(w, "%s%.03fms: %s\n", bullet, s.DurationMsec, s.messageWithPrefix(bullet))
	for _, child := range s.Children {
		child.writePlaintextWithIndent(w, indent+1)
	}
}

// messageWithPrefix returns s.Message prepared for plain-text output.
//
// The first line is kept as is. Empty continuation lines are dropped. Every
// other continuation line is prefixed with prefix after its first "-" has been
// replaced by "|", so the tree column continues below the bullet.
func (s *span) messageWithPrefix(prefix string) string {
	first, rest, multiline := strings.Cut(s.Message, "\n")
	if !multiline {
		return first
	}
	linePrefix := strings.Replace(prefix, "-", "|", 1)
	var sb strings.Builder
	sb.WriteString(first)
	for _, line := range strings.Split(rest, "\n") {
		if line == "" {
			continue
		}
		sb.WriteString("\n")
		sb.WriteString(linePrefix)
		sb.WriteString(line)
	}
	return sb.String()
}