From 86942cb46cbdc554fd1a5c6c7a1e2c95e644c27a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Fri, 10 May 2024 15:12:19 +0200 Subject: [PATCH] wip --- lib/logstorage/consts.go | 2 +- lib/logstorage/datadb.go | 13 ++++++++----- lib/storage/partition.go | 10 +++------- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/lib/logstorage/consts.go b/lib/logstorage/consts.go index 50fc18422..1bd17af95 100644 --- a/lib/logstorage/consts.go +++ b/lib/logstorage/consts.go @@ -14,7 +14,7 @@ const maxUncompressedBlockSize = 2 * 1024 * 1024 const maxRowsPerBlock = 8 * 1024 * 1024 // maxColumnsPerBlock is the maximum number of columns per block. -const maxColumnsPerBlock = 10000 +const maxColumnsPerBlock = 2_000 // MaxFieldNameSize is the maximum size in bytes for field name. // diff --git a/lib/logstorage/datadb.go b/lib/logstorage/datadb.go index f3c9a5acb..5d559bf43 100644 --- a/lib/logstorage/datadb.go +++ b/lib/logstorage/datadb.go @@ -23,6 +23,12 @@ import ( // This time shouldn't exceed a few days. const maxBigPartSize = 1e12 +// The maximum expected number of in-memory parts in the partition. +// +// The actual number of in-memory parts may exceed this value if in-memory mergers +// cannot keep up with the rate of creating new in-memory parts. +const maxInmemoryPartsPerPartition = 20 + // The interval for guaranteed flush of recently ingested data from memory to on-disk parts, // so they survive process crash. var dataFlushInterval = 5 * time.Second @@ -41,11 +47,6 @@ const defaultPartsToMerge = 15 // The 1.7 is good enough for production workloads. const minMergeMultiplier = 1.7 -// The maximum number of inmemory parts in the partition. -// -// If the number of inmemory parts reaches this value, then assisted merge runs during data ingestion.
-const maxInmemoryPartsPerPartition = 20 - // datadb represents a database with log data type datadb struct { // mergeIdx is used for generating unique directory names for parts @@ -663,9 +664,11 @@ func (ddb *datadb) mustAddRows(lr *LogRows) { return } + inmemoryPartsConcurrencyCh <- struct{}{} mp := getInmemoryPart() mp.mustInitFromRows(lr) p := mustOpenInmemoryPart(ddb.pt, mp) + <-inmemoryPartsConcurrencyCh flushDeadline := time.Now().Add(ddb.flushInterval) pw := newPartWrapper(p, mp, flushDeadline) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index a0b9527ad..9c30185ea 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -28,14 +28,10 @@ import ( // This time shouldn't exceed a few days. const maxBigPartSize = 1e12 -// The maximum number of inmemory parts per partition. +// The maximum expected number of in-memory parts per partition. // -// This limit allows reducing querying CPU usage under high ingestion rate. -// See https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5212 -// -// This number may be reached when the insertion pace outreaches merger pace. -// If this number is reached, then the data ingestion is paused until background -// mergers reduce the number of parts below this number. +// The actual number of in-memory parts may exceed this value if in-memory mergers +// cannot keep up with the rate of creating new in-memory parts. const maxInmemoryParts = 60 // Default number of parts to merge at once.