2020-09-28 01:11:55 +00:00
|
|
|
package common
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
|
2020-12-08 18:49:32 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
2020-09-28 01:11:55 +00:00
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
|
|
)
|
|
|
|
|
|
|
|
// ScheduleUnmarshalWork schedules uw to run in the worker pool.
|
|
|
|
//
|
|
|
|
// It is expected that StartUnmarshalWorkers is already called.
|
|
|
|
func ScheduleUnmarshalWork(uw UnmarshalWork) {
|
|
|
|
unmarshalWorkCh <- uw
|
|
|
|
}
|
|
|
|
|
|
|
|
// UnmarshalWork describes a single unit of unmarshal work that can be
// submitted to the worker pool via ScheduleUnmarshalWork.
type UnmarshalWork interface {
	// Unmarshal performs the work. Implementations are expected to do
	// CPU-bound unmarshaling here; a worker goroutine calls it once for
	// every scheduled item.
	Unmarshal()
}
|
|
|
|
|
|
|
|
// StartUnmarshalWorkers starts unmarshal workers.
|
|
|
|
func StartUnmarshalWorkers() {
|
|
|
|
if unmarshalWorkCh != nil {
|
2023-03-03 10:02:13 +00:00
|
|
|
logger.Panicf("BUG: it looks like startUnmarshalWorkers() has been already called without stopUnmarshalWorkers()")
|
2020-09-28 01:11:55 +00:00
|
|
|
}
|
2020-12-08 18:49:32 +00:00
|
|
|
gomaxprocs := cgroup.AvailableCPUs()
|
2021-05-08 14:55:44 +00:00
|
|
|
unmarshalWorkCh = make(chan UnmarshalWork, gomaxprocs)
|
2020-09-28 01:11:55 +00:00
|
|
|
unmarshalWorkersWG.Add(gomaxprocs)
|
|
|
|
for i := 0; i < gomaxprocs; i++ {
|
|
|
|
go func() {
|
|
|
|
defer unmarshalWorkersWG.Done()
|
|
|
|
for uw := range unmarshalWorkCh {
|
|
|
|
uw.Unmarshal()
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// StopUnmarshalWorkers stops unmarshal workers.
|
|
|
|
//
|
2023-02-13 12:27:13 +00:00
|
|
|
// No more calls to ScheduleUnmarshalWork are allowed after calling stopUnmarshalWorkers
|
2020-09-28 01:11:55 +00:00
|
|
|
func StopUnmarshalWorkers() {
|
|
|
|
close(unmarshalWorkCh)
|
|
|
|
unmarshalWorkersWG.Wait()
|
|
|
|
unmarshalWorkCh = nil
|
|
|
|
}
|
|
|
|
|
|
|
|
var (
|
|
|
|
unmarshalWorkCh chan UnmarshalWork
|
|
|
|
unmarshalWorkersWG sync.WaitGroup
|
|
|
|
)
|