diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index d2c63feb0..153e003bd 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -144,6 +144,9 @@ func (ctx *InsertCtx) FlushBufs() error { return nil } } + // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, + // since the number of concurrent FlushBufs() calls should be already limited via writeconcurrencylimiter + // used at every ParseStream() call under lib/protoparser/*/streamparser.go err := vmstorage.AddRows(ctx.mrs) ctx.Reset(0) if err == nil { diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 9f4cbc233..ed028e089 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -111,6 +111,8 @@ func pushAggregateSeries(tss []prompbmarshal.TimeSeries) { return } } + // There is no need in limiting the number of concurrent calls to vmstorage.AddRows() here, + // since the number of concurrent pushAggregateSeries() calls should be already limited by lib/streamaggr. if err := vmstorage.AddRows(ctx.mrs); err != nil { logger.Errorf("cannot flush aggregate series: %s", err) } diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 4d3cf3793..d9efd46d9 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -147,6 +147,8 @@ var WG syncwg.WaitGroup var resetResponseCacheIfNeeded func(mrs []storage.MetricRow) // AddRows adds mrs to the storage. +// +// The caller should limit the number of concurrent calls to AddRows() in order to limit memory usage. func AddRows(mrs []storage.MetricRow) error { if Storage.IsReadOnly() { return errReadOnly