Merge branch 'public-single-node' into pmm-6401-read-prometheus-data-files

This commit is contained in:
Aliaksandr Valialkin 2022-10-21 15:03:12 +03:00
commit 07fe2c5361
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
2 changed files with 36 additions and 31 deletions

View file

@ -644,7 +644,11 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i
flushSeconds = 1
}
lastFlushTime := atomic.LoadUint64(&ris.lastFlushTime)
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) {
if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) {
// Fast path - nothing to flush
return dst
}
// Slow path - move ris.ibs to dst
ris.mu.Lock()
ibs := ris.ibs
dst = append(dst, ibs...)
@ -654,7 +658,6 @@ func (ris *rawItemsShard) appendBlocksToFlush(dst []*inmemoryBlock, tb *Table, i
ris.ibs = ibs[:0]
atomic.StoreUint64(&ris.lastFlushTime, currentTime)
ris.mu.Unlock()
}
return dst
}

View file

@ -473,23 +473,17 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
rrs.mu.Lock()
if cap(rrs.rows) == 0 {
n := getMaxRawRowsPerShard()
rrs.rows = make([]rawRow, 0, n)
rrs.rows = newRawRowsBlock()
}
maxRowsCount := cap(rrs.rows)
capacity := maxRowsCount - len(rrs.rows)
if capacity >= len(rows) {
// Fast path - rows fit rrs.rows capacity.
rrs.rows = append(rrs.rows, rows...)
} else {
// Slow path - rows don't fit rrs.rows capacity.
// Fill rrs.rows with rows until capacity,
// then put rrs.rows to rowsToFlush and convert it to a part.
n := copy(rrs.rows[:cap(rrs.rows)], rows)
n := copy(rrs.rows[len(rrs.rows):cap(rrs.rows)], rows)
rrs.rows = rrs.rows[:len(rrs.rows)+n]
rows = rows[n:]
if len(rows) > 0 {
// Slow path - rows did't fit rrs.rows capacity.
// Convert rrs.rows to rowsToFlush and convert it to a part,
// then try moving the remaining rows to rrs.rows.
rowsToFlush = rrs.rows
n = getMaxRawRowsPerShard()
rrs.rows = make([]rawRow, 0, n)
rrs.rows = newRawRowsBlock()
if len(rows) <= n {
rrs.rows = append(rrs.rows[:0], rows...)
} else {
@ -504,6 +498,11 @@ func (rrs *rawRowsShard) addRows(pt *partition, rows []rawRow) {
pt.flushRowsToParts(rowsToFlush)
}
func newRawRowsBlock() []rawRow {
n := getMaxRawRowsPerShard()
return make([]rawRow, 0, n)
}
func (pt *partition) flushRowsToParts(rows []rawRow) {
maxRows := getMaxRawRowsPerShard()
wg := getWaitGroup()
@ -747,13 +746,16 @@ func (rrs *rawRowsShard) appendRawRowsToFlush(dst []rawRow, pt *partition, isFin
flushSeconds = 1
}
lastFlushTime := atomic.LoadUint64(&rrs.lastFlushTime)
if isFinal || currentTime-lastFlushTime > uint64(flushSeconds) {
if !isFinal && currentTime <= lastFlushTime+uint64(flushSeconds) {
// Fast path - nothing to flush
return dst
}
// Slow path - move rrs.rows to dst.
rrs.mu.Lock()
dst = append(dst, rrs.rows...)
rrs.rows = rrs.rows[:0]
atomic.StoreUint64(&rrs.lastFlushTime, currentTime)
rrs.mu.Unlock()
}
return dst
}