mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-03-11 15:34:56 +00:00
wip
Signed-off-by: Zakhar Bessarab <z.bessarab@victoriametrics.com>
This commit is contained in:
parent
1d4a0796f4
commit
783dc8fd88
1 changed files with 12 additions and 6 deletions
|
@ -30,10 +30,16 @@ const defaultPartsToMerge = 15
|
|||
// The 1.7 is good enough for production workloads.
|
||||
const minMergeMultiplier = 1.7
|
||||
|
||||
// The maximum number of inmemory parts in the partition.
|
||||
// The maximum number of inmemory parts in the partition created per second.
|
||||
//
|
||||
// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
|
||||
const maxInmemoryPartsPerPartition = 20
|
||||
const maxInmemoryPartsPerPartitionPerSecond = 6
|
||||
|
||||
func getMaxInmemoryPartsPerPartition(fi time.Duration) int {
|
||||
sec := fi.Seconds()
|
||||
|
||||
return int(sec * maxInmemoryPartsPerPartitionPerSecond)
|
||||
}
|
||||
|
||||
// datadb represents a database with log data
|
||||
type datadb struct {
|
||||
|
@ -455,7 +461,7 @@ func (ddb *datadb) getDstPartType(pws []*partWrapper, isFinal bool) partType {
|
|||
return partFile
|
||||
}
|
||||
dstPartSize := getCompressedSize(pws)
|
||||
if dstPartSize > getMaxInmemoryPartSize() {
|
||||
if dstPartSize > getMaxInmemoryPartSize(ddb.flushInterval) {
|
||||
return partFile
|
||||
}
|
||||
if !areAllInmemoryParts(pws) {
|
||||
|
@ -515,7 +521,7 @@ func (ddb *datadb) mustAddRows(lr *LogRows) {
|
|||
if len(ddb.inmemoryParts) > defaultPartsToMerge {
|
||||
ddb.startMergeWorkerLocked()
|
||||
}
|
||||
for len(ddb.inmemoryParts) > maxInmemoryPartsPerPartition {
|
||||
for len(ddb.inmemoryParts) > getMaxInmemoryPartsPerPartition(ddb.flushInterval) {
|
||||
// limit the pace for data ingestion if too many inmemory parts are created
|
||||
ddb.mergeDoneCond.Wait()
|
||||
}
|
||||
|
@ -741,9 +747,9 @@ func (ddb *datadb) getFlushToDiskDeadline(pws []*partWrapper) time.Time {
|
|||
return d
|
||||
}
|
||||
|
||||
func getMaxInmemoryPartSize() uint64 {
|
||||
func getMaxInmemoryPartSize(fl time.Duration) uint64 {
|
||||
// Allocate 10% of allowed memory for in-memory parts.
|
||||
n := uint64(0.1 * float64(memory.Allowed()) / maxInmemoryPartsPerPartition)
|
||||
n := uint64(0.1 * float64(memory.Allowed()) / float64(getMaxInmemoryPartsPerPartition(fl)))
|
||||
if n < 1e6 {
|
||||
n = 1e6
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue