mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
lib/{mergeset,storage}: compare errors with errors.Is()
This commit is contained in:
parent
067d7c1ea1
commit
8beb0da6ad
6 changed files with 42 additions and 28 deletions
|
@ -40,9 +40,6 @@ func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStre
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err == errForciblyStopped {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("cannot merge %d block streams: %s: %w", len(bsrs), bsrs, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package mergeset
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
|
@ -76,7 +77,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||
ch := make(chan struct{})
|
||||
var itemsMerged uint64
|
||||
close(ch)
|
||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); err != errForciblyStopped {
|
||||
if err := mergeBlockStreams(&dstIP.ph, &bsw, bsrs, nil, ch, &itemsMerged); !errors.Is(err, errForciblyStopped) {
|
||||
t.Fatalf("unexpected error during merge: got %v; want %v", err, errForciblyStopped)
|
||||
}
|
||||
if itemsMerged != 0 {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package mergeset
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
@ -480,16 +481,27 @@ func (tb *Table) convertToV1280() {
|
|||
}
|
||||
|
||||
func (tb *Table) mergePartsOptimal(pws []*partWrapper, stopCh <-chan struct{}) error {
|
||||
defer func() {
|
||||
// Remove isInMerge flag from pws.
|
||||
tb.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
// Do not check for pws.isInMerge set to false,
|
||||
// since it may be set to false in mergeParts below.
|
||||
pw.isInMerge = false
|
||||
}
|
||||
tb.partsLock.Unlock()
|
||||
}()
|
||||
for len(pws) > defaultPartsToMerge {
|
||||
if err := tb.mergeParts(pws[:defaultPartsToMerge], stopCh, false); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
||||
}
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
}
|
||||
if len(pws) > 0 {
|
||||
if err := tb.mergeParts(pws, stopCh, false); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
||||
}
|
||||
if len(pws) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := tb.mergeParts(pws, stopCh, false); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -576,7 +588,7 @@ func (tb *Table) mergeRawItemsBlocks(blocksToMerge []*inmemoryBlock) {
|
|||
atomic.AddUint64(&tb.assistedMerges, 1)
|
||||
continue
|
||||
}
|
||||
if err == errNothingToMerge || err == errForciblyStopped {
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
|
||||
return
|
||||
}
|
||||
logger.Panicf("FATAL: cannot merge small parts: %s", err)
|
||||
|
@ -696,11 +708,11 @@ func (tb *Table) partMerger() error {
|
|||
isFinal = false
|
||||
continue
|
||||
}
|
||||
if err == errForciblyStopped {
|
||||
if errors.Is(err, errForciblyStopped) {
|
||||
// The merger has been stopped.
|
||||
return nil
|
||||
}
|
||||
if err != errNothingToMerge {
|
||||
if !errors.Is(err, errNothingToMerge) {
|
||||
return err
|
||||
}
|
||||
if fasttime.UnixTimestamp()-lastMergeTime > 30 {
|
||||
|
@ -805,9 +817,6 @@ func (tb *Table) mergeParts(pws []*partWrapper, stopCh <-chan struct{}, isOuterP
|
|||
err := mergeBlockStreams(&ph, bsw, bsrs, tb.prepareBlock, stopCh, &tb.itemsMerged)
|
||||
putBlockStreamWriter(bsw)
|
||||
if err != nil {
|
||||
if err == errForciblyStopped {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("error when merging parts to %q: %w", tmpPartPath, err)
|
||||
}
|
||||
if err := ph.WriteMetadata(tmpPartPath); err != nil {
|
||||
|
|
|
@ -27,9 +27,6 @@ func mergeBlockStreams(ph *partHeader, bsw *blockStreamWriter, bsrs []*blockStre
|
|||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if err == errForciblyStopped {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("cannot merge %d streams: %s: %w", len(bsrs), bsrs, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"math/rand"
|
||||
"testing"
|
||||
)
|
||||
|
@ -364,7 +365,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||
ch := make(chan struct{})
|
||||
var rowsMerged, rowsDeleted uint64
|
||||
close(ch)
|
||||
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); err != errForciblyStopped {
|
||||
if err := mergeBlockStreams(&mp.ph, &bsw, bsrs, ch, nil, &rowsMerged, &rowsDeleted); !errors.Is(err, errForciblyStopped) {
|
||||
t.Fatalf("unexpected error in mergeBlockStreams: got %v; want %v", err, errForciblyStopped)
|
||||
}
|
||||
if rowsMerged != 0 {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"math/bits"
|
||||
|
@ -584,7 +585,7 @@ func (pt *partition) addRowsPart(rows []rawRow) {
|
|||
atomic.AddUint64(&pt.smallAssistedMerges, 1)
|
||||
return
|
||||
}
|
||||
if err == errNothingToMerge || err == errForciblyStopped {
|
||||
if errors.Is(err, errNothingToMerge) || errors.Is(err, errForciblyStopped) {
|
||||
return
|
||||
}
|
||||
logger.Panicf("FATAL: cannot merge small parts: %s", err)
|
||||
|
@ -800,16 +801,27 @@ func (pt *partition) flushInmemoryParts(dstPws []*partWrapper, force bool) ([]*p
|
|||
}
|
||||
|
||||
func (pt *partition) mergePartsOptimal(pws []*partWrapper) error {
|
||||
defer func() {
|
||||
// Remove isInMerge flag from pws.
|
||||
pt.partsLock.Lock()
|
||||
for _, pw := range pws {
|
||||
// Do not check for pws.isInMerge set to false,
|
||||
// since it may be set to false in mergeParts below.
|
||||
pw.isInMerge = false
|
||||
}
|
||||
pt.partsLock.Unlock()
|
||||
}()
|
||||
for len(pws) > defaultPartsToMerge {
|
||||
if err := pt.mergeParts(pws[:defaultPartsToMerge], nil); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", defaultPartsToMerge, err)
|
||||
}
|
||||
pws = pws[defaultPartsToMerge:]
|
||||
}
|
||||
if len(pws) > 0 {
|
||||
if err := pt.mergeParts(pws, nil); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
||||
}
|
||||
if len(pws) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := pt.mergeParts(pws, nil); err != nil {
|
||||
return fmt.Errorf("cannot merge %d parts: %w", len(pws), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -889,11 +901,11 @@ func (pt *partition) partsMerger(mergerFunc func(isFinal bool) error) error {
|
|||
isFinal = false
|
||||
continue
|
||||
}
|
||||
if err == errForciblyStopped {
|
||||
if errors.Is(err, errForciblyStopped) {
|
||||
// The merger has been stopped.
|
||||
return nil
|
||||
}
|
||||
if err != errNothingToMerge {
|
||||
if !errors.Is(err, errNothingToMerge) {
|
||||
return err
|
||||
}
|
||||
if fasttime.UnixTimestamp()-lastMergeTime > 30 {
|
||||
|
@ -1052,9 +1064,6 @@ func (pt *partition) mergeParts(pws []*partWrapper, stopCh <-chan struct{}) erro
|
|||
}
|
||||
putBlockStreamWriter(bsw)
|
||||
if err != nil {
|
||||
if err == errForciblyStopped {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("error when merging parts to %q: %w", tmpPartPath, err)
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue