app/vminsert: improve re-routing logic in order to spread rows more evenly among the available storage nodes

This commit is contained in:
Aliaksandr Valialkin 2022-02-06 20:20:02 +02:00
parent d24e5d9efd
commit 021ee53ba8
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1
15 changed files with 335 additions and 471 deletions

View file

@ -80,12 +80,12 @@ func main() {
if len(*storageNodes) == 0 { if len(*storageNodes) == 0 {
logger.Fatalf("missing -storageNode arg") logger.Fatalf("missing -storageNode arg")
} }
hashSeed := byte(0) hashSeed := uint64(0)
if *clusternativeListenAddr != "" { if *clusternativeListenAddr != "" {
// Use different hash seed for the second level of vminsert nodes in multi-level cluster setup. // Use different hash seed for the second level of vminsert nodes in multi-level cluster setup.
// This should fix uneven distribution of time series among storage nodes. // This should fix uneven distribution of time series among storage nodes.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672 // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672
hashSeed = 42 hashSeed = 0xabcdef0123456789
} }
netstorage.InitStorageNodes(*storageNodes, hashSeed) netstorage.InitStorageNodes(*storageNodes, hashSeed)
logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds()) logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds())

View file

@ -0,0 +1,51 @@
package netstorage
import (
xxhash "github.com/cespare/xxhash/v2"
)
// See the following docs:
// - https://www.eecs.umich.edu/techreports/cse/96/CSE-TR-316-96.pdf
// - https://github.com/dgryski/go-rendezvous
// - https://dgryski.medium.com/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8
type consistentHash struct {
hashSeed uint64
nodeHashes []uint64
}
func newConsistentHash(nodes []string, hashSeed uint64) *consistentHash {
nodeHashes := make([]uint64, len(nodes))
for i, node := range nodes {
nodeHashes[i] = xxhash.Sum64([]byte(node))
}
return &consistentHash{
hashSeed: hashSeed,
nodeHashes: nodeHashes,
}
}
func (rh *consistentHash) getNodeIdx(h uint64, excludeIdxs []int) int {
var mMax uint64
var idx int
h ^= rh.hashSeed
next:
for i, nh := range rh.nodeHashes {
for _, j := range excludeIdxs {
if i == j {
continue next
}
}
if m := fastHashUint64(nh ^ h); m > mMax {
mMax = m
idx = i
}
}
return idx
}
func fastHashUint64(x uint64) uint64 {
x ^= x >> 12 // a
x ^= x << 25 // b
x ^= x >> 27 // c
return x * 2685821657736338717
}

View file

@ -0,0 +1,65 @@
package netstorage
import (
"math"
"math/rand"
"testing"
)
func TestConsistentHash(t *testing.T) {
r := rand.New(rand.NewSource(1))
nodes := []string{
"node1",
"node2",
"node3",
"node4",
}
rh := newConsistentHash(nodes, 0)
keys := make([]uint64, 100000)
for i := 0; i < len(keys); i++ {
keys[i] = r.Uint64()
}
perIdxCounts := make([]int, len(nodes))
keyIndexes := make([]int, len(keys))
for i, k := range keys {
idx := rh.getNodeIdx(k, nil)
perIdxCounts[idx]++
keyIndexes[i] = idx
}
// verify that the number of selected node indexes per each node is roughly the same
expectedPerIdxCount := float64(len(keys)) / float64(len(nodes))
for _, perIdxCount := range perIdxCounts {
if p := math.Abs(float64(perIdxCount)-expectedPerIdxCount) / expectedPerIdxCount; p > 0.005 {
t.Fatalf("uneven number of per-index items %f: %d", p, perIdxCounts)
}
}
// Ignore a single node and verify that the selection for the remaining nodes is even
perIdxCounts = make([]int, len(nodes))
idxsExclude := []int{1}
indexMismatches := 0
for i, k := range keys {
idx := rh.getNodeIdx(k, idxsExclude)
perIdxCounts[idx]++
if keyIndexes[i] != idx {
indexMismatches++
}
}
maxIndexMismatches := float64(len(keys)) / float64(len(nodes))
if float64(indexMismatches) > maxIndexMismatches {
t.Fatalf("too many index mismtaches after excluding a node; got %d; want no more than %f", indexMismatches, maxIndexMismatches)
}
expectedPerIdxCount = float64(len(keys)) / float64(len(nodes)-1)
for i, perIdxCount := range perIdxCounts {
if i == idxsExclude[0] {
if perIdxCount != 0 {
t.Fatalf("unexpected non-zero items for excluded index %d: %d items", idxsExclude[0], perIdxCount)
}
continue
}
if p := math.Abs(float64(perIdxCount)-expectedPerIdxCount) / expectedPerIdxCount; p > 0.005 {
t.Fatalf("uneven number of per-index items %f: %d", p, perIdxCounts)
}
}
}

View file

@ -0,0 +1,40 @@
package netstorage
import (
"math/rand"
"sync/atomic"
"testing"
)
func BenchmarkConsistentHash(b *testing.B) {
nodes := []string{
"node1",
"node2",
"node3",
"node4",
}
rh := newConsistentHash(nodes, 0)
b.ReportAllocs()
b.SetBytes(int64(len(benchKeys)))
b.RunParallel(func(pb *testing.PB) {
sum := 0
for pb.Next() {
for _, k := range benchKeys {
idx := rh.getNodeIdx(k, nil)
sum += idx
}
}
atomic.AddUint64(&BenchSink, uint64(sum))
})
}
var benchKeys = func() []uint64 {
r := rand.New(rand.NewSource(1))
keys := make([]uint64, 10000)
for i := 0; i < len(keys); i++ {
keys[i] = r.Uint64()
}
return keys
}()
var BenchSink uint64

View file

@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
xxhash "github.com/cespare/xxhash/v2" xxhash "github.com/cespare/xxhash/v2"
jump "github.com/lithammer/go-jump-consistent-hash"
) )
// InsertCtx is a generic context for inserting data. // InsertCtx is a generic context for inserting data.
@ -162,9 +161,6 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i
} }
buf := ctx.labelsBuf[:0] buf := ctx.labelsBuf[:0]
if hashSeed != 0 {
buf = append(buf, hashSeed)
}
buf = encoding.MarshalUint32(buf, at.AccountID) buf = encoding.MarshalUint32(buf, at.AccountID)
buf = encoding.MarshalUint32(buf, at.ProjectID) buf = encoding.MarshalUint32(buf, at.ProjectID)
for i := range labels { for i := range labels {
@ -175,7 +171,8 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i
h := xxhash.Sum64(buf) h := xxhash.Sum64(buf)
ctx.labelsBuf = buf ctx.labelsBuf = buf
idx := int(jump.Hash(h, int32(len(storageNodes)))) // Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push().
idx := nodesHash.getNodeIdx(h, nil)
return idx return idx
} }

View file

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io" "io"
"net" "net"
"sort"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -30,31 +29,42 @@ var (
replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+ replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+
"Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+ "Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+
"Higher values for -dedup.minScrapeInterval at vmselect is OK") "Higher values for -dedup.minScrapeInterval at vmselect is OK")
disableRerouting = flag.Bool(`disableRerouting`, true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate") disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate")
) )
var errStorageReadOnly = errors.New("storage node is read only") var errStorageReadOnly = errors.New("storage node is read only")
func (sn *storageNode) isNotReady() bool { func (sn *storageNode) isReady() bool {
return atomic.LoadUint32(&sn.broken) != 0 || atomic.LoadUint32(&sn.isReadOnly) != 0 return atomic.LoadUint32(&sn.broken) == 0 && atomic.LoadUint32(&sn.isReadOnly) == 0
} }
// push pushes buf to sn internal bufs. // push pushes buf to sn internal bufs.
// //
// This function doesn't block on fast path. // This function doesn't block on fast path.
// It may block only if all the storageNodes cannot handle the incoming ingestion rate. // It may block only if storageNodes cannot handle the incoming ingestion rate.
// This blocking provides backpressure to the caller. // This blocking provides backpressure to the caller.
// //
// The function falls back to sending data to other vmstorage nodes // The function falls back to sending data to other vmstorage nodes
// if sn is currently unavailable or overloaded. // if sn is currently unavailable or overloaded.
// //
// rows is the number of rows in the buf. // rows must match the number of rows in the buf.
func (sn *storageNode) push(buf []byte, rows int) error { func (sn *storageNode) push(buf []byte, rows int) error {
if len(buf) > maxBufSizePerStorageNode { if len(buf) > maxBufSizePerStorageNode {
logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode) logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode)
} }
sn.rowsPushed.Add(rows) sn.rowsPushed.Add(rows)
if sn.trySendBuf(buf, rows) {
// Fast path - the buffer is successfully sent to sn.
return nil
}
// Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes.
if err := sn.rerouteBufToOtherStorageNodes(buf, rows); err != nil {
return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err)
}
return nil
}
func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error {
sn.brLock.Lock() sn.brLock.Lock()
again: again:
select { select {
@ -63,15 +73,17 @@ again:
return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows) return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows)
default: default:
} }
if sn.isNotReady() { if !sn.isReady() {
if len(storageNodes) == 1 { if len(storageNodes) == 1 {
// There are no other storage nodes to re-route to. So wait until the current node becomes healthy. // There are no other storage nodes to re-route to. So wait until the current node becomes healthy.
sn.brCond.Wait() sn.brCond.Wait()
goto again goto again
} }
sn.brLock.Unlock() sn.brLock.Unlock()
// The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true. // The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set.
if err := rerouteRowsMayBlock(sn, false, buf, rows); err != nil { rowsProcessed, err := rerouteRowsToReadyStorageNodes(sn, buf)
rows -= rowsProcessed
if err != nil {
return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err) return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err)
} }
return nil return nil
@ -83,16 +95,15 @@ again:
sn.brLock.Unlock() sn.brLock.Unlock()
return nil return nil
} }
// Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes.
if *disableRerouting || len(storageNodes) == 1 { if *disableRerouting || len(storageNodes) == 1 {
sn.brCond.Wait() sn.brCond.Wait()
goto again goto again
} }
sn.brLock.Unlock() sn.brLock.Unlock()
rowsProcessed, err := rerouteRowsToFreeStorageNodes(sn, buf)
// The buf contents doesn't fit sn.buf. rows -= rowsProcessed
// This means that the current vmstorage is slow or will become broken soon. if err != nil {
// Spread buf among all the vmstorage nodes.
if err := rerouteRowsMayBlock(sn, true, buf, rows); err != nil {
return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err) return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err)
} }
return nil return nil
@ -244,7 +255,7 @@ func (sn *storageNode) checkHealth() {
} }
func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool {
if sn.isNotReady() { if !sn.isReady() {
return false return false
} }
sn.bcLock.Lock() sn.bcLock.Lock()
@ -363,9 +374,6 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) {
// storageNode is a client sending data to vmstorage node. // storageNode is a client sending data to vmstorage node.
type storageNode struct { type storageNode struct {
// The last time for the re-routing.
lastRerouteTime uint64
// broken is set to non-zero if the given vmstorage node is temporarily unhealthy. // broken is set to non-zero if the given vmstorage node is temporarily unhealthy.
// In this case the data is re-routed to the remaining healthy vmstorage nodes. // In this case the data is re-routed to the remaining healthy vmstorage nodes.
broken uint32 broken uint32
@ -434,23 +442,17 @@ var storageNodesWG sync.WaitGroup
var storageNodesStopCh = make(chan struct{}) var storageNodesStopCh = make(chan struct{})
// hashSeed is a seed for distributing time series amont storage nodes. // nodesHash is used for consistently selecting a storage node by key.
var hashSeed byte var nodesHash *consistentHash
// InitStorageNodes initializes vmstorage nodes' connections to the given addrs. // InitStorageNodes initializes vmstorage nodes' connections to the given addrs.
// //
// eed is used for changing the distribution of input time series among addrs. // hashSeed is used for changing the distribution of input time series among addrs.
func InitStorageNodes(addrs []string, seed byte) { func InitStorageNodes(addrs []string, hashSeed uint64) {
if len(addrs) == 0 { if len(addrs) == 0 {
logger.Panicf("BUG: addrs must be non-empty") logger.Panicf("BUG: addrs must be non-empty")
} }
hashSeed = seed nodesHash = newConsistentHash(addrs, hashSeed)
// Sort addrs in order to guarantee identical series->vmstorage mapping across all the vminsert nodes.
addrsCopy := append([]string{}, addrs...)
sort.Strings(addrsCopy)
addrs = addrsCopy
storageNodes = storageNodes[:0] storageNodes = storageNodes[:0]
for _, addr := range addrs { for _, addr := range addrs {
if _, _, err := net.SplitHostPort(addr); err != nil { if _, _, err := net.SplitHostPort(addr); err != nil {
@ -517,33 +519,13 @@ func Stop() {
storageNodesWG.Wait() storageNodesWG.Wait()
} }
// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes. // rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes.
// //
// It waits until healthy storage nodes have enough space for the re-routed rows. // The function blocks until src is fully re-routed.
// This guarantees backpressure if the ingestion rate exceeds vmstorage nodes' func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, error) {
// ingestion rate capacity. rowsProcessed := 0
// var idxsExclude, idxsExcludeNew []int
// It returns non-nil error only if Stop is called. idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil)
func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, rows int) error {
if len(storageNodes) < 2 {
logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes))
}
reroutesTotal.Inc()
atomic.StoreUint64(&snSource.lastRerouteTime, fasttime.UnixTimestamp())
sns := getStorageNodesMapForRerouting(snSource, mayUseSNSource)
if areStorageNodesEqual(sns) {
// Fast path - all the storage nodes are the same - send the buf to them.
sn := sns[0]
if !sn.sendBufMayBlock(buf) {
return fmt.Errorf("cannot re-route data because of graceful shutdown")
}
if sn != snSource {
snSource.rowsReroutedFromHere.Add(rows)
sn.rowsReroutedToHere.Add(rows)
}
return nil
}
src := buf
var mr storage.MetricRow var mr storage.MetricRow
for len(src) > 0 { for len(src) > 0 {
tail, err := mr.UnmarshalX(src) tail, err := mr.UnmarshalX(src)
@ -555,17 +537,149 @@ func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte,
reroutedRowsProcessed.Inc() reroutedRowsProcessed.Inc()
h := xxhash.Sum64(mr.MetricNameRaw) h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX() mr.ResetX()
idx := h % uint64(len(sns)) var sn *storageNode
sn := sns[idx] for {
if !sn.sendBufMayBlock(rowBuf) { idx := nodesHash.getNodeIdx(h, idxsExclude)
return fmt.Errorf("cannot re-route data because of graceful shutdown") sn = storageNodes[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil)
} }
if *disableRerouting {
if !sn.sendBufMayBlock(rowBuf) {
return rowsProcessed, fmt.Errorf("graceful shutdown started")
}
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
continue
}
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
if sn != snSource {
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
}
continue
}
// If the re-routing is enabled, then try sending the row to another storage node.
idxsExcludeNew = getNotReadyStorageNodeIdxsBlocking(idxsExcludeNew[:0], sn)
idx := nodesHash.getNodeIdx(h, idxsExcludeNew)
snNew := storageNodes[idx]
if snNew.trySendBuf(rowBuf, 1) {
rowsProcessed++
if snNew != snSource {
snSource.rowsReroutedFromHere.Inc()
snNew.rowsReroutedToHere.Inc()
}
continue
}
// Fall back to sending the row to sn in order to minimize re-routing.
if !sn.sendBufMayBlock(rowBuf) {
return rowsProcessed, fmt.Errorf("graceful shutdown started")
}
rowsProcessed++
if sn != snSource { if sn != snSource {
snSource.rowsReroutedFromHere.Inc() snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc() sn.rowsReroutedToHere.Inc()
} }
} }
return nil return rowsProcessed, nil
}
// reouteRowsToFreeStorageNodes re-routes src from snSource to other storage nodes.
//
// It is expected that snSource has no enough buffer for sending src.
// It is expected than *dsableRerouting isn't set when calling this function.
func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, error) {
if *disableRerouting {
logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes")
}
rowsProcessed := 0
var idxsExclude []int
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource)
var mr storage.MetricRow
for len(src) > 0 {
tail, err := mr.UnmarshalX(src)
if err != nil {
logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err)
}
rowBuf := src[:len(src)-len(tail)]
src = tail
reroutedRowsProcessed.Inc()
h := xxhash.Sum64(mr.MetricNameRaw)
mr.ResetX()
// Try sending the row to snSource in order to minimize re-routing.
if snSource.trySendBuf(rowBuf, 1) {
rowsProcessed++
continue
}
// The row couldn't be sent to snSrouce. Try re-routing it to other nodes.
var sn *storageNode
for {
idx := nodesHash.getNodeIdx(h, idxsExclude)
sn = storageNodes[idx]
if sn.isReady() {
break
}
// re-generate idxsExclude list, since sn must be put there.
idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource)
}
if sn.trySendBuf(rowBuf, 1) {
rowsProcessed++
snSource.rowsReroutedFromHere.Inc()
sn.rowsReroutedToHere.Inc()
continue
}
// Fall back sending the row to snSource in order to minimize re-routing.
if !snSource.sendBufMayBlock(rowBuf) {
return rowsProcessed, fmt.Errorf("graceful shutdown started")
}
rowsProcessed++
}
return rowsProcessed, nil
}
func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int {
dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
if len(dst) < len(storageNodes) {
return dst
}
logger.Warnf("all the vmstorage nodes are unavailable; stopping data processing util at least a single node becomes available")
for {
time.Sleep(time.Second)
dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra)
if availableNodes := len(storageNodes) - len(dst); availableNodes > 0 {
logger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes)
return dst
}
}
}
func getNotReadyStorageNodeIdxs(dst []int, snExtra *storageNode) []int {
dst = dst[:0]
for i, sn := range storageNodes {
if sn == snExtra && !sn.isReady() {
dst = append(dst, i)
}
}
return dst
}
func (sn *storageNode) trySendBuf(buf []byte, rows int) bool {
sent := false
sn.brLock.Lock()
if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode {
sn.br.buf = append(sn.br.buf, buf...)
sn.br.rows += rows
sent = true
}
sn.brLock.Unlock()
return sent
} }
func (sn *storageNode) sendBufMayBlock(buf []byte) bool { func (sn *storageNode) sendBufMayBlock(buf []byte) bool {
@ -621,54 +735,6 @@ func (sn *storageNode) checkReadOnlyMode() {
} }
} }
func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode {
sns := getStorageNodesForRerouting(snExclude, true)
if len(sns) == len(storageNodes) {
return sns
}
if !mayUseSNExclude {
sns = getStorageNodesForRerouting(snExclude, false)
}
for len(sns) < len(storageNodes) {
sns = append(sns, snExclude)
}
return sns
}
func areStorageNodesEqual(sns []*storageNode) bool {
snOrigin := sns[0]
for _, sn := range sns[1:] {
if sn != snOrigin {
return false
}
}
return true
}
func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNodes bool) []*storageNode {
sns := make([]*storageNode, 0, len(storageNodes))
currentTime := fasttime.UnixTimestamp()
for i, sn := range storageNodes {
if sn == snExclude || sn.isNotReady() {
// Skip snExclude and broken storage nodes.
continue
}
if skipRecentlyReroutedNodes && currentTime <= atomic.LoadUint64(&sn.lastRerouteTime)+5 {
// Skip nodes, which were re-routed recently.
continue
}
for len(sns) <= i {
sns = append(sns, sn)
}
}
if len(sns) > 0 {
for len(sns) < len(storageNodes) {
sns = append(sns, sns[0])
}
}
return sns
}
var ( var (
maxBufSizePerStorageNode int maxBufSizePerStorageNode int

View file

@ -6,6 +6,7 @@ sort: 15
## tip ## tip
* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve re-routing logic, so it re-routes incoming data more evenly if some of `vmstorage` nodes are temporarily unavailable and/or accept data at slower rate than other `vmstorage` nodes.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): cover more cases with the [label filters' propagation optimization](https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization). This should improve the average performance for practical queries. The following cases are additionally covered: * FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): cover more cases with the [label filters' propagation optimization](https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization). This should improve the average performance for practical queries. The following cases are additionally covered:
* Multi-level [transform functions](https://docs.victoriametrics.com/MetricsQL.html#transform-functions). For example, `abs(round(foo{a="b"})) + bar{x="y"}` is now optimized to `abs(round(foo{a="b",x="y"})) + bar{a="b",x="y"}` * Multi-level [transform functions](https://docs.victoriametrics.com/MetricsQL.html#transform-functions). For example, `abs(round(foo{a="b"})) + bar{x="y"}` is now optimized to `abs(round(foo{a="b",x="y"})) + bar{a="b",x="y"}`
* Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}` * Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}`

1
go.mod
View file

@ -17,7 +17,6 @@ require (
github.com/golang/snappy v0.0.4 github.com/golang/snappy v0.0.4
github.com/influxdata/influxdb v1.9.5 github.com/influxdata/influxdb v1.9.5
github.com/klauspost/compress v1.14.2 github.com/klauspost/compress v1.14.2
github.com/lithammer/go-jump-consistent-hash v1.0.2
github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9 github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9
github.com/urfave/cli/v2 v2.3.0 github.com/urfave/cli/v2 v2.3.0
github.com/valyala/fastjson v1.6.3 github.com/valyala/fastjson v1.6.3

2
go.sum
View file

@ -678,8 +678,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lithammer/go-jump-consistent-hash v1.0.2 h1:w74N9XiMa4dWZdoVnfLbnDhfpGOMCxlrudzt2e7wtyk=
github.com/lithammer/go-jump-consistent-hash v1.0.2/go.mod h1:4MD1WDikNGnb9D56hAtscaZaOWOiCG+lLbRR5ZN9JL0=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=

View file

@ -1,21 +0,0 @@
The MIT License (MIT)
Copyright (c) 2018 Peter Lithammer
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View file

@ -1,61 +0,0 @@
# Jump Consistent Hash
[![Build Status](https://github.com/lithammer/go-jump-consistent-hash/workflows/Go/badge.svg)](https://github.com/lithammer/go-jump-consistent-hash/actions)
[![Godoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/lithammer/go-jump-consistent-hash)
Go implementation of the jump consistent hash algorithm[1] by John Lamping and Eric Veach.
[1] http://arxiv.org/pdf/1406.2294v1.pdf
## Usage
```go
import jump "github.com/lithammer/go-jump-consistent-hash"
func main() {
h := jump.Hash(256, 1024) // h = 520
}
```
Includes a helper function for using a `string` as key instead of an `uint64`. This requires a hasher that computes the string into a format accepted by `Hash()`. Such a hasher that uses [CRC-64 (ECMA)](https://en.wikipedia.org/wiki/Cyclic_redundancy_check) is also included for convenience.
```go
h := jump.HashString("127.0.0.1", 8, jump.NewCRC64()) // h = 7
```
In reality though you probably want to use a `Hasher` so you won't have to repeat the bucket size and which key hasher used. It also uses more convenient types, like `int` instead of `int32`.
```go
hasher := jump.New(8, jump.NewCRC64())
h := hasher.Hash("127.0.0.1") // h = 7
```
If you want to use your own algorithm, you must implement the `KeyHasher` interface, which is a subset of the `hash.Hash64` interface available in the standard library.
Here's an example of a custom `KeyHasher` that uses Google's [FarmHash](https://github.com/google/farmhash) algorithm (the successor of CityHash) to compute the final key.
```go
type FarmHash struct {
buf bytes.Buffer
}
func (f *FarmHash) Write(p []byte) (n int, err error) {
return f.buf.Write(p)
}
func (f *FarmHash) Reset() {
f.buf.Reset()
}
func (f *FarmHash) Sum64() uint64 {
// https://github.com/dgryski/go-farm
return farm.Hash64(f.buf.Bytes())
}
hasher := jump.New(8, &FarmHash{})
h := hasher.Hash("127.0.0.1") // h = 5
```
## License
MIT

View file

@ -1,38 +0,0 @@
package jump
import "hash"
type crc32Hasher struct {
crc32 hash.Hash32
}
func (h *crc32Hasher) Write(p []byte) (n int, err error) {
return h.crc32.Write(p)
}
func (h *crc32Hasher) Sum(b []byte) []byte {
return h.crc32.Sum(b)
}
func (h *crc32Hasher) Reset() {
h.crc32.Reset()
}
func (h *crc32Hasher) Size() int {
return h.crc32.Size()
}
func (h *crc32Hasher) BlockSize() int {
return h.crc32.BlockSize()
}
func (h *crc32Hasher) Sum32() uint32 {
return h.crc32.Sum32()
}
func (h *crc32Hasher) Sum64() uint64 {
return uint64(h.crc32.Sum32())
}
var _ hash.Hash32 = (*crc32Hasher)(nil)
var _ hash.Hash64 = (*crc32Hasher)(nil)

View file

@ -1,135 +0,0 @@
/*
Package jump implements the "jump consistent hash" algorithm.
Example
h := jump.Hash(256, 1024) // h = 520
Reference C++ implementation[1]
int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) {
int64_t b = -1, j = 0;
while (j < num_buckets) {
b = j;
key = key * 2862933555777941757ULL + 1;
j = (b + 1) * (double(1LL << 31) / double((key >> 33) + 1));
}
return b;
}
Explanation of the algorithm
Jump consistent hash works by computing when its output changes as the
number of buckets increases. Let ch(key, num_buckets) be the consistent hash
for the key when there are num_buckets buckets. Clearly, for any key, k,
ch(k, 1) is 0, since there is only the one bucket. In order for the
consistent hash function to balanced, ch(k, 2) will have to stay at 0 for
half the keys, k, while it will have to jump to 1 for the other half. In
general, ch(k, n+1) has to stay the same as ch(k, n) for n/(n+1) of the
keys, and jump to n for the other 1/(n+1) of the keys.
Here are examples of the consistent hash values for three keys, k1, k2, and
k3, as num_buckets goes up:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
k1 0 0 2 2 4 4 4 4 4 4 4 4 4 4
k2 0 1 1 1 1 1 1 7 7 7 7 7 7 7
k3 0 1 1 1 1 5 5 7 7 7 10 10 10 10
A linear time algorithm can be defined by using the formula for the
probability of ch(key, j) jumping when j increases. It essentially walks
across a row of this table. Given a key and number of buckets, the algorithm
considers each successive bucket, j, from 1 to num_buckets­1, and uses
ch(key, j) to compute ch(key, j+1). At each bucket, j, it decides whether to
keep ch(k, j+1) the same as ch(k, j), or to jump its value to j. In order to
jump for the right fraction of keys, it uses a pseudo­random number
generator with the key as its seed. To jump for 1/(j+1) of keys, it
generates a uniform random number between 0.0 and 1.0, and jumps if the
value is less than 1/(j+1). At the end of the loop, it has computed
ch(k, num_buckets), which is the desired answer. In code:
int ch(int key, int num_buckets) {
random.seed(key);
int b = 0; // This will track ch(key,j+1).
for (int j = 1; j < num_buckets; j++) {
if (random.next() < 1.0 / (j + 1)) b = j;
}
return b;
}
We can convert this to a logarithmic time algorithm by exploiting that
ch(key, j+1) is usually unchanged as j increases, only jumping occasionally.
The algorithm will only compute the destinations of jumps ­­ the js for
which ch(key, j+1) ≠ ch(key, j). Also notice that for these js, ch(key,
j+1) = j. To develop the algorithm, we will treat ch(key, j) as a random
variable, so that we can use the notation for random variables to analyze
the fractions of keys for which various propositions are true. That will
lead us to a closed form expression for a pseudo­random variable whose value
gives the destination of the next jump.
Suppose that the algorithm is tracking the bucket numbers of the jumps for a
particular key, k. And suppose that b was the destination of the last jump,
that is, ch(k, b) ≠ ch(k, b+1), and ch(k, b+1) = b. Now, we want to find the
next jump, the smallest j such that ch(k, j+1) ≠ ch(k, b+1), or
equivalently, the largest j such that ch(k, j) = ch(k, b+1). We will make a
pseudo­random variable whose value is that j. To get a probabilistic
constraint on j, note that for any bucket number, i, we have j i if and
only if the consistent hash hasnt changed by i, that is, if and only if
ch(k, i) = ch(k, b+1). Hence, the distribution of j must satisfy
P(j i) = P( ch(k, i) = ch(k, b+1) )
Fortunately, it is easy to compute that probability. Notice that since P(
ch(k, 10) = ch(k, 11) ) is 10/11, and P( ch(k, 11) = ch(k, 12) ) is 11/12,
then P( ch(k, 10) = ch(k, 12) ) is 10/11 * 11/12 = 10/12. In general, if n
m, P( ch(k, n) = ch(k, m) ) = m / n. Thus for any i > b,
P(j i) = P( ch(k, i) = ch(k, b+1) ) = (b+1) / i .
Now, we generate a pseudo­random variable, r, (depending on k and j) that is
uniformly distributed between 0 and 1. Since we want P(j i) = (b+1) / i,
we set P(j i) iff r (b+1) / i. Solving the inequality for i yields P(j
i) iff i (b+1) / r. Since i is a lower bound on j, j will equal the
largest i for which P(j i), thus the largest i satisfying i (b+1) / r.
Thus, by the definition of the floor function, j = floor((b+1) / r).
Using this formula, jump consistent hash finds ch(key, num_buckets) by
choosing successive jump destinations until it finds a position at or past
num_buckets. It then knows that the previous jump destination is the answer.
int ch(int key, int num_buckets) {
random.seed(key);
int b = -1; // bucket number before the previous jump
int j = 0; // bucket number before the current jump
while (j < num_buckets) {
b = j;
r = random.next();
j = floor((b + 1) / r);
}
return = b;
}
To turn this into the actual code of figure 1, we need to implement random.
We want it to be fast, and yet to also to have well distributed successive
values. We use a 64­bit linear congruential generator; the particular
multiplier we use produces random numbers that are especially well
distributed in higher dimensions (i.e., when successive random values are
used to form tuples). We use the key as the seed. (For keys that dont fit
into 64 bits, a 64 bit hash of the key should be used.) The congruential
generator updates the seed on each iteration, and the code derives a double
from the current seed. Tests show that this generator has good speed and
distribution.
It is worth noting that unlike the algorithm of Karger et al., jump
consistent hash does not require the key to be hashed if it is already an
integer. This is because jump consistent hash has an embedded pseudorandom
number generator that essentially rehashes the key on every iteration. The
hash is not especially good (i.e., linear congruential), but since it is
applied repeatedly, additional hashing of the input key is not necessary.
[1] http://arxiv.org/pdf/1406.2294v1.pdf
*/
package jump

View file

@ -1,95 +0,0 @@
package jump
import (
"hash"
"hash/crc32"
"hash/crc64"
"hash/fnv"
"io"
)
// Hash takes a 64 bit key and the number of buckets. It outputs a bucket
// number in the range [0, buckets).
// If the number of buckets is less than or equal to 0 then one 1 is used.
func Hash(key uint64, buckets int32) int32 {
var b, j int64
if buckets <= 0 {
buckets = 1
}
for j < int64(buckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}
return int32(b)
}
// HashString takes string as key instead of an int and uses a KeyHasher to
// generate a key compatible with Hash().
func HashString(key string, buckets int32, h KeyHasher) int32 {
h.Reset()
_, err := io.WriteString(h, key)
if err != nil {
panic(err)
}
return Hash(h.Sum64(), buckets)
}
// KeyHasher is a subset of hash.Hash64 in the standard library.
type KeyHasher interface {
// Write (via the embedded io.Writer interface) adds more data to the
// running hash.
// It never returns an error.
io.Writer
// Reset resets the KeyHasher to its initial state.
Reset()
// Return the result of the added bytes (via io.Writer).
Sum64() uint64
}
// Hasher represents a jump consistent hasher using a string as key.
type Hasher struct {
n int32
h KeyHasher
}
// New returns a new instance of of Hasher.
func New(n int, h KeyHasher) *Hasher {
return &Hasher{int32(n), h}
}
// N returns the number of buckets the hasher can assign to.
func (h *Hasher) N() int {
return int(h.n)
}
// Hash returns the integer hash for the given key.
func (h *Hasher) Hash(key string) int {
return int(HashString(key, h.n, h.h))
}
// KeyHashers available in the standard library for use with HashString() and Hasher.
var (
// CRC32 uses the 32-bit Cyclic Redundancy Check (CRC-32) with the IEEE
// polynomial.
NewCRC32 func() hash.Hash64 = func() hash.Hash64 { return &crc32Hasher{crc32.NewIEEE()} }
// CRC64 uses the 64-bit Cyclic Redundancy Check (CRC-64) with the ECMA
// polynomial.
NewCRC64 func() hash.Hash64 = func() hash.Hash64 { return crc64.New(crc64.MakeTable(crc64.ECMA)) }
// FNV1 uses the non-cryptographic hash function FNV-1.
NewFNV1 func() hash.Hash64 = func() hash.Hash64 { return fnv.New64() }
// FNV1a uses the non-cryptographic hash function FNV-1a.
NewFNV1a func() hash.Hash64 = func() hash.Hash64 { return fnv.New64a() }
// These are deprecated because they're not safe for concurrent use. Please
// use the New* functions instead.
CRC32 hash.Hash64 = &crc32Hasher{crc32.NewIEEE()}
CRC64 hash.Hash64 = crc64.New(crc64.MakeTable(crc64.ECMA))
FNV1 hash.Hash64 = fnv.New64()
FNV1a hash.Hash64 = fnv.New64a()
)

3
vendor/modules.txt vendored
View file

@ -157,9 +157,6 @@ github.com/klauspost/compress/internal/snapref
github.com/klauspost/compress/zlib github.com/klauspost/compress/zlib
github.com/klauspost/compress/zstd github.com/klauspost/compress/zstd
github.com/klauspost/compress/zstd/internal/xxhash github.com/klauspost/compress/zstd/internal/xxhash
# github.com/lithammer/go-jump-consistent-hash v1.0.2
## explicit; go 1.16
github.com/lithammer/go-jump-consistent-hash
# github.com/mattn/go-colorable v0.1.12 # github.com/mattn/go-colorable v0.1.12
## explicit; go 1.13 ## explicit; go 1.13
github.com/mattn/go-colorable github.com/mattn/go-colorable