From 021ee53ba8cac976730fb669af54f60751181c91 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Sun, 6 Feb 2022 20:20:02 +0200 Subject: [PATCH] app/vminsert: improve re-routing logic in order to spread rows more evenly among the available storage nodes --- app/vminsert/main.go | 4 +- app/vminsert/netstorage/consistent_hash.go | 51 ++++ .../netstorage/consistent_hash_test.go | 65 ++++ .../netstorage/consistent_hash_timing_test.go | 40 +++ app/vminsert/netstorage/insert_ctx.go | 7 +- app/vminsert/netstorage/netstorage.go | 282 +++++++++++------- docs/CHANGELOG.md | 1 + go.mod | 1 - go.sum | 2 - .../lithammer/go-jump-consistent-hash/LICENSE | 21 -- .../go-jump-consistent-hash/README.md | 61 ---- .../go-jump-consistent-hash/crc32.go | 38 --- .../lithammer/go-jump-consistent-hash/doc.go | 135 --------- .../lithammer/go-jump-consistent-hash/jump.go | 95 ------ vendor/modules.txt | 3 - 15 files changed, 335 insertions(+), 471 deletions(-) create mode 100644 app/vminsert/netstorage/consistent_hash.go create mode 100644 app/vminsert/netstorage/consistent_hash_test.go create mode 100644 app/vminsert/netstorage/consistent_hash_timing_test.go delete mode 100644 vendor/github.com/lithammer/go-jump-consistent-hash/LICENSE delete mode 100644 vendor/github.com/lithammer/go-jump-consistent-hash/README.md delete mode 100644 vendor/github.com/lithammer/go-jump-consistent-hash/crc32.go delete mode 100644 vendor/github.com/lithammer/go-jump-consistent-hash/doc.go delete mode 100644 vendor/github.com/lithammer/go-jump-consistent-hash/jump.go diff --git a/app/vminsert/main.go b/app/vminsert/main.go index ce9fae2c5..67f750384 100644 --- a/app/vminsert/main.go +++ b/app/vminsert/main.go @@ -80,12 +80,12 @@ func main() { if len(*storageNodes) == 0 { logger.Fatalf("missing -storageNode arg") } - hashSeed := byte(0) + hashSeed := uint64(0) if *clusternativeListenAddr != "" { // Use different hash seed for the second level of vminsert nodes in multi-level cluster setup. // This should fix uneven distribution of time series among storage nodes. 
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1672 - hashSeed = 42 + hashSeed = 0xabcdef0123456789 } netstorage.InitStorageNodes(*storageNodes, hashSeed) logger.Infof("successfully initialized netstorage in %.3f seconds", time.Since(startTime).Seconds()) diff --git a/app/vminsert/netstorage/consistent_hash.go b/app/vminsert/netstorage/consistent_hash.go new file mode 100644 index 000000000..2d8844da9 --- /dev/null +++ b/app/vminsert/netstorage/consistent_hash.go @@ -0,0 +1,51 @@ +package netstorage + +import ( + xxhash "github.com/cespare/xxhash/v2" +) + +// See the following docs: +// - https://www.eecs.umich.edu/techreports/cse/96/CSE-TR-316-96.pdf +// - https://github.com/dgryski/go-rendezvous +// - https://dgryski.medium.com/consistent-hashing-algorithmic-tradeoffs-ef6b8e2fcae8 +type consistentHash struct { + hashSeed uint64 + nodeHashes []uint64 +} + +func newConsistentHash(nodes []string, hashSeed uint64) *consistentHash { + nodeHashes := make([]uint64, len(nodes)) + for i, node := range nodes { + nodeHashes[i] = xxhash.Sum64([]byte(node)) + } + return &consistentHash{ + hashSeed: hashSeed, + nodeHashes: nodeHashes, + } +} + +func (rh *consistentHash) getNodeIdx(h uint64, excludeIdxs []int) int { + var mMax uint64 + var idx int + h ^= rh.hashSeed +next: + for i, nh := range rh.nodeHashes { + for _, j := range excludeIdxs { + if i == j { + continue next + } + } + if m := fastHashUint64(nh ^ h); m > mMax { + mMax = m + idx = i + } + } + return idx +} + +func fastHashUint64(x uint64) uint64 { + x ^= x >> 12 // a + x ^= x << 25 // b + x ^= x >> 27 // c + return x * 2685821657736338717 +} diff --git a/app/vminsert/netstorage/consistent_hash_test.go b/app/vminsert/netstorage/consistent_hash_test.go new file mode 100644 index 000000000..405c20894 --- /dev/null +++ b/app/vminsert/netstorage/consistent_hash_test.go @@ -0,0 +1,65 @@ +package netstorage + +import ( + "math" + "math/rand" + "testing" +) + +func TestConsistentHash(t *testing.T) { + r := rand.New(rand.NewSource(1)) + + nodes := []string{ + "node1", + "node2", + "node3", + "node4", + } + rh := newConsistentHash(nodes, 0) + + keys := make([]uint64, 100000) + for i := 0; i < len(keys); i++ { + keys[i] = r.Uint64() + } + perIdxCounts := make([]int, len(nodes)) + keyIndexes := make([]int, len(keys)) + for i, k := range keys { + idx := rh.getNodeIdx(k, nil) + perIdxCounts[idx]++ + keyIndexes[i] = idx + } + // verify that the number of selected node indexes per each node is roughly the same + expectedPerIdxCount := float64(len(keys)) / float64(len(nodes)) + for _, perIdxCount := range perIdxCounts { + if p := math.Abs(float64(perIdxCount)-expectedPerIdxCount) / expectedPerIdxCount; p > 0.005 { + t.Fatalf("uneven number of per-index items %f: %d", p, perIdxCounts) + } + } + // Ignore a single node and verify that the selection for the remaining nodes is even + perIdxCounts = make([]int, len(nodes)) + idxsExclude := []int{1} + indexMismatches := 0 + for i, k := range keys { + idx := rh.getNodeIdx(k, idxsExclude) + perIdxCounts[idx]++ + if keyIndexes[i] != idx { + indexMismatches++ + } + } + maxIndexMismatches := float64(len(keys)) / float64(len(nodes)) + if float64(indexMismatches) > maxIndexMismatches { + t.Fatalf("too many index mismtaches after excluding a node; got %d; want no more than %f", indexMismatches, maxIndexMismatches) + } + expectedPerIdxCount = float64(len(keys)) / float64(len(nodes)-1) + for i, perIdxCount := range perIdxCounts { + if i == idxsExclude[0] { + if perIdxCount != 0 { + t.Fatalf("unexpected 
non-zero items for excluded index %d: %d items", idxsExclude[0], perIdxCount) + } + continue + } + if p := math.Abs(float64(perIdxCount)-expectedPerIdxCount) / expectedPerIdxCount; p > 0.005 { + t.Fatalf("uneven number of per-index items %f: %d", p, perIdxCounts) + } + } +} diff --git a/app/vminsert/netstorage/consistent_hash_timing_test.go b/app/vminsert/netstorage/consistent_hash_timing_test.go new file mode 100644 index 000000000..16406ceb6 --- /dev/null +++ b/app/vminsert/netstorage/consistent_hash_timing_test.go @@ -0,0 +1,40 @@ +package netstorage + +import ( + "math/rand" + "sync/atomic" + "testing" +) + +func BenchmarkConsistentHash(b *testing.B) { + nodes := []string{ + "node1", + "node2", + "node3", + "node4", + } + rh := newConsistentHash(nodes, 0) + b.ReportAllocs() + b.SetBytes(int64(len(benchKeys))) + b.RunParallel(func(pb *testing.PB) { + sum := 0 + for pb.Next() { + for _, k := range benchKeys { + idx := rh.getNodeIdx(k, nil) + sum += idx + } + } + atomic.AddUint64(&BenchSink, uint64(sum)) + }) +} + +var benchKeys = func() []uint64 { + r := rand.New(rand.NewSource(1)) + keys := make([]uint64, 10000) + for i := 0; i < len(keys); i++ { + keys[i] = r.Uint64() + } + return keys +}() + +var BenchSink uint64 diff --git a/app/vminsert/netstorage/insert_ctx.go b/app/vminsert/netstorage/insert_ctx.go index dbd5af1f4..b61bb41d6 100644 --- a/app/vminsert/netstorage/insert_ctx.go +++ b/app/vminsert/netstorage/insert_ctx.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" xxhash "github.com/cespare/xxhash/v2" - jump "github.com/lithammer/go-jump-consistent-hash" ) // InsertCtx is a generic context for inserting data. @@ -162,9 +161,6 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i } buf := ctx.labelsBuf[:0] - if hashSeed != 0 { - buf = append(buf, hashSeed) - } buf = encoding.MarshalUint32(buf, at.AccountID) buf = encoding.MarshalUint32(buf, at.ProjectID) for i := range labels { @@ -175,7 +171,8 @@ func (ctx *InsertCtx) GetStorageNodeIdx(at *auth.Token, labels []prompb.Label) i h := xxhash.Sum64(buf) ctx.labelsBuf = buf - idx := int(jump.Hash(h, int32(len(storageNodes)))) + // Do not exclude unavailable storage nodes in order to properly account for rerouted rows in storageNode.push(). + idx := nodesHash.getNodeIdx(h, nil) return idx } diff --git a/app/vminsert/netstorage/netstorage.go b/app/vminsert/netstorage/netstorage.go index 56986e8ba..12c0cba52 100644 --- a/app/vminsert/netstorage/netstorage.go +++ b/app/vminsert/netstorage/netstorage.go @@ -6,7 +6,6 @@ import ( "fmt" "io" "net" - "sort" "sync" "sync/atomic" "time" @@ -30,31 +29,42 @@ var ( replicationFactor = flag.Int("replicationFactor", 1, "Replication factor for the ingested data, i.e. how many copies to make among distinct -storageNode instances. "+ "Note that vmselect must run with -dedup.minScrapeInterval=1ms for data de-duplication when replicationFactor is greater than 1. "+ "Higher values for -dedup.minScrapeInterval at vmselect is OK") - disableRerouting = flag.Bool(`disableRerouting`, true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. 
On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate") + disableRerouting = flag.Bool("disableRerouting", true, "Whether to disable re-routing when some of vmstorage nodes accept incoming data at slower speed compared to other storage nodes. Disabled re-routing limits the ingestion rate by the slowest vmstorage node. On the other side, disabled re-routing minimizes the number of active time series in the cluster during rolling restarts and during spikes in series churn rate") ) var errStorageReadOnly = errors.New("storage node is read only") -func (sn *storageNode) isNotReady() bool { - return atomic.LoadUint32(&sn.broken) != 0 || atomic.LoadUint32(&sn.isReadOnly) != 0 +func (sn *storageNode) isReady() bool { + return atomic.LoadUint32(&sn.broken) == 0 && atomic.LoadUint32(&sn.isReadOnly) == 0 } // push pushes buf to sn internal bufs. // // This function doesn't block on fast path. -// It may block only if all the storageNodes cannot handle the incoming ingestion rate. +// It may block only if storageNodes cannot handle the incoming ingestion rate. // This blocking provides backpressure to the caller. // // The function falls back to sending data to other vmstorage nodes // if sn is currently unavailable or overloaded. // -// rows is the number of rows in the buf. +// rows must match the number of rows in the buf. func (sn *storageNode) push(buf []byte, rows int) error { if len(buf) > maxBufSizePerStorageNode { logger.Panicf("BUG: len(buf)=%d cannot exceed %d", len(buf), maxBufSizePerStorageNode) } sn.rowsPushed.Add(rows) + if sn.trySendBuf(buf, rows) { + // Fast path - the buffer is successfully sent to sn. + return nil + } + // Slow path - sn cannot accept buf now, so re-route it to other vmstorage nodes. + if err := sn.rerouteBufToOtherStorageNodes(buf, rows); err != nil { + return fmt.Errorf("error when re-routing rows from %s: %w", sn.dialer.Addr(), err) + } + return nil +} +func (sn *storageNode) rerouteBufToOtherStorageNodes(buf []byte, rows int) error { sn.brLock.Lock() again: select { @@ -63,15 +73,17 @@ again: return fmt.Errorf("cannot send %d rows because of graceful shutdown", rows) default: } - if sn.isNotReady() { + if !sn.isReady() { if len(storageNodes) == 1 { // There are no other storage nodes to re-route to. So wait until the current node becomes healthy. sn.brCond.Wait() goto again } sn.brLock.Unlock() - // The vmstorage node is temporarily broken. Re-route buf to healthy vmstorage nodes even if *disableRerouting==true. - if err := rerouteRowsMayBlock(sn, false, buf, rows); err != nil { + // The vmstorage node isn't ready for data processing. Re-route buf to healthy vmstorage nodes even if disableRerouting is set. + rowsProcessed, err := rerouteRowsToReadyStorageNodes(sn, buf) + rows -= rowsProcessed + if err != nil { return fmt.Errorf("%d rows dropped because the current vsmtorage is unavailable and %w", rows, err) } return nil @@ -83,16 +95,15 @@ again: sn.brLock.Unlock() return nil } + // Slow path: the buf contents doesn't fit sn.buf, so try re-routing it to other vmstorage nodes. if *disableRerouting || len(storageNodes) == 1 { sn.brCond.Wait() goto again } sn.brLock.Unlock() - - // The buf contents doesn't fit sn.buf. - // This means that the current vmstorage is slow or will become broken soon. - // Spread buf among all the vmstorage nodes. 
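The reworked `push()` above now has an explicit fast path and slow path: `trySendBuf` appends the rows to the node's bounded buffer without blocking, and only when that fails does `rerouteBufToOtherStorageNodes` spread the data to other vmstorage nodes. A minimal, self-contained sketch of that try-then-reroute shape, using made-up types rather than the actual `storageNode` machinery:

```go
package main

import (
	"fmt"
	"sync"
)

const maxBufSize = 1024 // stand-in for maxBufSizePerStorageNode

type node struct {
	mu    sync.Mutex
	buf   []byte
	ready bool
}

// trySend appends buf to the node's bounded buffer without blocking.
// It fails when the node is not ready or the buffer has no room,
// leaving the caller to decide how to re-route.
func (n *node) trySend(buf []byte) bool {
	n.mu.Lock()
	defer n.mu.Unlock()
	if !n.ready || len(n.buf)+len(buf) > maxBufSize {
		return false
	}
	n.buf = append(n.buf, buf...)
	return true
}

func push(n, fallback *node, buf []byte) error {
	if n.trySend(buf) {
		return nil // fast path: the rows fit into n's buffer
	}
	// Slow path: re-route to another node. The real code picks the target
	// per row via the consistent hash and blocks for backpressure instead
	// of erroring out when every node is overloaded.
	if fallback.trySend(buf) {
		return nil
	}
	return fmt.Errorf("no node can accept %d bytes", len(buf))
}

func main() {
	a := &node{ready: false} // temporarily unavailable vmstorage
	b := &node{ready: true}
	fmt.Println(push(a, b, []byte("metric_row"))) // <nil>: re-routed to b
}
```

The real slow path differs in that it blocks on `sendBufMayBlock` to provide backpressure rather than returning an error when all the nodes are overloaded.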
- if err := rerouteRowsMayBlock(sn, true, buf, rows); err != nil { + rowsProcessed, err := rerouteRowsToFreeStorageNodes(sn, buf) + rows -= rowsProcessed + if err != nil { return fmt.Errorf("%d rows dropped because the current vmstorage buf is full and %w", rows, err) } return nil @@ -244,7 +255,7 @@ func (sn *storageNode) checkHealth() { } func (sn *storageNode) sendBufRowsNonblocking(br *bufRows) bool { - if sn.isNotReady() { + if !sn.isReady() { return false } sn.bcLock.Lock() @@ -363,9 +374,6 @@ func (sn *storageNode) dial() (*handshake.BufferedConn, error) { // storageNode is a client sending data to vmstorage node. type storageNode struct { - // The last time for the re-routing. - lastRerouteTime uint64 - // broken is set to non-zero if the given vmstorage node is temporarily unhealthy. // In this case the data is re-routed to the remaining healthy vmstorage nodes. broken uint32 @@ -434,23 +442,17 @@ var storageNodesWG sync.WaitGroup var storageNodesStopCh = make(chan struct{}) -// hashSeed is a seed for distributing time series amont storage nodes. -var hashSeed byte +// nodesHash is used for consistently selecting a storage node by key. +var nodesHash *consistentHash // InitStorageNodes initializes vmstorage nodes' connections to the given addrs. // -// eed is used for changing the distribution of input time series among addrs. -func InitStorageNodes(addrs []string, seed byte) { +// hashSeed is used for changing the distribution of input time series among addrs. +func InitStorageNodes(addrs []string, hashSeed uint64) { if len(addrs) == 0 { logger.Panicf("BUG: addrs must be non-empty") } - hashSeed = seed - - // Sort addrs in order to guarantee identical series->vmstorage mapping across all the vminsert nodes. - addrsCopy := append([]string{}, addrs...) - sort.Strings(addrsCopy) - addrs = addrsCopy - + nodesHash = newConsistentHash(addrs, hashSeed) storageNodes = storageNodes[:0] for _, addr := range addrs { if _, _, err := net.SplitHostPort(addr); err != nil { @@ -517,33 +519,13 @@ func Stop() { storageNodesWG.Wait() } -// rerouteRowsMayBlock re-routes rows from buf among healthy storage nodes. +// rerouteRowsToReadyStorageNodes reroutes src from not ready snSource to ready storage nodes. // -// It waits until healthy storage nodes have enough space for the re-routed rows. -// This guarantees backpressure if the ingestion rate exceeds vmstorage nodes' -// ingestion rate capacity. -// -// It returns non-nil error only if Stop is called. -func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, rows int) error { - if len(storageNodes) < 2 { - logger.Panicf("BUG: re-routing can work only if at least 2 storage nodes are configured; got %d nodes", len(storageNodes)) - } - reroutesTotal.Inc() - atomic.StoreUint64(&snSource.lastRerouteTime, fasttime.UnixTimestamp()) - sns := getStorageNodesMapForRerouting(snSource, mayUseSNSource) - if areStorageNodesEqual(sns) { - // Fast path - all the storage nodes are the same - send the buf to them. - sn := sns[0] - if !sn.sendBufMayBlock(buf) { - return fmt.Errorf("cannot re-route data because of graceful shutdown") - } - if sn != snSource { - snSource.rowsReroutedFromHere.Add(rows) - sn.rowsReroutedToHere.Add(rows) - } - return nil - } - src := buf +// The function blocks until src is fully re-routed. 
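The function defined next works row by row rather than forwarding the whole blocked buffer to a single fallback node: each `MetricRow` is unmarshaled, its metric name is hashed, and a ready node is picked through `nodesHash.getNodeIdx` with the not-ready indexes excluded, so re-routed rows are spread across all the healthy nodes. A rough standalone sketch of that per-row fan-out, with a hypothetical `fakeNode` type, made-up rows, and a simplified selection rule standing in for `getNodeIdx`:

```go
package main

import (
	"fmt"

	"github.com/cespare/xxhash/v2"
)

type fakeNode struct {
	name  string
	ready bool
	rows  int
}

// pickReady deterministically picks a ready node for the given row hash by
// taking the highest combined hash among ready nodes, similar in spirit to
// getNodeIdx() called with the not-ready indexes excluded.
func pickReady(nodes []*fakeNode, h uint64) *fakeNode {
	var best *fakeNode
	var bestWeight uint64
	for _, n := range nodes {
		if !n.ready {
			continue
		}
		w := (h ^ xxhash.Sum64([]byte(n.name))) * 2685821657736338717
		if best == nil || w > bestWeight {
			best, bestWeight = n, w
		}
	}
	return best
}

func main() {
	nodes := []*fakeNode{
		{name: "node1", ready: true},
		{name: "node2", ready: false}, // not ready: never selected
		{name: "node3", ready: true},
	}
	rows := [][]byte{
		[]byte(`cpu_usage{host="a"}`),
		[]byte(`cpu_usage{host="b"}`),
		[]byte(`mem_usage{host="a"}`),
		[]byte(`mem_usage{host="b"}`),
	}
	// Every row is routed independently by the hash of its metric name,
	// so the re-routed load is spread across all the ready nodes.
	for _, row := range rows {
		pickReady(nodes, xxhash.Sum64(row)).rows++
	}
	for _, n := range nodes {
		fmt.Printf("%s: %d rows\n", n.name, n.rows)
	}
}
```

Running it prints a per-node row count in which only the ready nodes receive rows.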
+func rerouteRowsToReadyStorageNodes(snSource *storageNode, src []byte) (int, error) { + rowsProcessed := 0 + var idxsExclude, idxsExcludeNew []int + idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil) var mr storage.MetricRow for len(src) > 0 { tail, err := mr.UnmarshalX(src) @@ -555,17 +537,149 @@ func rerouteRowsMayBlock(snSource *storageNode, mayUseSNSource bool, buf []byte, reroutedRowsProcessed.Inc() h := xxhash.Sum64(mr.MetricNameRaw) mr.ResetX() - idx := h % uint64(len(sns)) - sn := sns[idx] - if !sn.sendBufMayBlock(rowBuf) { - return fmt.Errorf("cannot re-route data because of graceful shutdown") + var sn *storageNode + for { + idx := nodesHash.getNodeIdx(h, idxsExclude) + sn = storageNodes[idx] + if sn.isReady() { + break + } + // re-generate idxsExclude list, since sn must be put there. + idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], nil) } + if *disableRerouting { + if !sn.sendBufMayBlock(rowBuf) { + return rowsProcessed, fmt.Errorf("graceful shutdown started") + } + rowsProcessed++ + if sn != snSource { + snSource.rowsReroutedFromHere.Inc() + sn.rowsReroutedToHere.Inc() + } + continue + } + if sn.trySendBuf(rowBuf, 1) { + rowsProcessed++ + if sn != snSource { + snSource.rowsReroutedFromHere.Inc() + sn.rowsReroutedToHere.Inc() + } + continue + } + // If the re-routing is enabled, then try sending the row to another storage node. + idxsExcludeNew = getNotReadyStorageNodeIdxsBlocking(idxsExcludeNew[:0], sn) + idx := nodesHash.getNodeIdx(h, idxsExcludeNew) + snNew := storageNodes[idx] + if snNew.trySendBuf(rowBuf, 1) { + rowsProcessed++ + if snNew != snSource { + snSource.rowsReroutedFromHere.Inc() + snNew.rowsReroutedToHere.Inc() + } + continue + } + // Fall back to sending the row to sn in order to minimize re-routing. + if !sn.sendBufMayBlock(rowBuf) { + return rowsProcessed, fmt.Errorf("graceful shutdown started") + } + rowsProcessed++ if sn != snSource { snSource.rowsReroutedFromHere.Inc() sn.rowsReroutedToHere.Inc() } } - return nil + return rowsProcessed, nil +} + +// rerouteRowsToFreeStorageNodes re-routes src from snSource to other storage nodes. +// +// It is expected that snSource doesn't have enough buffer space for sending src. +// It is expected that *disableRerouting isn't set when calling this function. +func rerouteRowsToFreeStorageNodes(snSource *storageNode, src []byte) (int, error) { + if *disableRerouting { + logger.Panicf("BUG: disableRerouting must be disabled when calling rerouteRowsToFreeStorageNodes") + } + rowsProcessed := 0 + var idxsExclude []int + idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource) + var mr storage.MetricRow + for len(src) > 0 { + tail, err := mr.UnmarshalX(src) + if err != nil { + logger.Panicf("BUG: cannot unmarshal MetricRow: %s", err) + } + rowBuf := src[:len(src)-len(tail)] + src = tail + reroutedRowsProcessed.Inc() + h := xxhash.Sum64(mr.MetricNameRaw) + mr.ResetX() + // Try sending the row to snSource in order to minimize re-routing. + if snSource.trySendBuf(rowBuf, 1) { + rowsProcessed++ + continue + } + // The row couldn't be sent to snSource. Try re-routing it to other nodes. + var sn *storageNode + for { + idx := nodesHash.getNodeIdx(h, idxsExclude) + sn = storageNodes[idx] + if sn.isReady() { + break + } + // re-generate idxsExclude list, since sn must be put there.
+ idxsExclude = getNotReadyStorageNodeIdxsBlocking(idxsExclude[:0], snSource) + } + if sn.trySendBuf(rowBuf, 1) { + rowsProcessed++ + snSource.rowsReroutedFromHere.Inc() + sn.rowsReroutedToHere.Inc() + continue + } + // Fall back to sending the row to snSource in order to minimize re-routing. + if !snSource.sendBufMayBlock(rowBuf) { + return rowsProcessed, fmt.Errorf("graceful shutdown started") + } + rowsProcessed++ + } + return rowsProcessed, nil +} + +func getNotReadyStorageNodeIdxsBlocking(dst []int, snExtra *storageNode) []int { + dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra) + if len(dst) < len(storageNodes) { + return dst + } + logger.Warnf("all the vmstorage nodes are unavailable; stopping data processing until at least a single node becomes available") + for { + time.Sleep(time.Second) + dst = getNotReadyStorageNodeIdxs(dst[:0], snExtra) + if availableNodes := len(storageNodes) - len(dst); availableNodes > 0 { + logger.Warnf("%d vmstorage nodes became available, so continue data processing", availableNodes) + return dst + } + } +} + +func getNotReadyStorageNodeIdxs(dst []int, snExtra *storageNode) []int { + dst = dst[:0] + for i, sn := range storageNodes { + if sn == snExtra || !sn.isReady() { + dst = append(dst, i) + } + } + return dst +} + +func (sn *storageNode) trySendBuf(buf []byte, rows int) bool { + sent := false + sn.brLock.Lock() + if sn.isReady() && len(sn.br.buf)+len(buf) <= maxBufSizePerStorageNode { + sn.br.buf = append(sn.br.buf, buf...) + sn.br.rows += rows + sent = true + } + sn.brLock.Unlock() + return sent } func (sn *storageNode) sendBufMayBlock(buf []byte) bool { @@ -621,54 +735,6 @@ func (sn *storageNode) checkReadOnlyMode() { } } -func getStorageNodesMapForRerouting(snExclude *storageNode, mayUseSNExclude bool) []*storageNode { - sns := getStorageNodesForRerouting(snExclude, true) - if len(sns) == len(storageNodes) { - return sns - } - if !mayUseSNExclude { - sns = getStorageNodesForRerouting(snExclude, false) - } - for len(sns) < len(storageNodes) { - sns = append(sns, snExclude) - } - return sns -} - -func areStorageNodesEqual(sns []*storageNode) bool { - snOrigin := sns[0] - for _, sn := range sns[1:] { - if sn != snOrigin { - return false - } - } - return true -} - -func getStorageNodesForRerouting(snExclude *storageNode, skipRecentlyReroutedNodes bool) []*storageNode { - sns := make([]*storageNode, 0, len(storageNodes)) - currentTime := fasttime.UnixTimestamp() - for i, sn := range storageNodes { - if sn == snExclude || sn.isNotReady() { - // Skip snExclude and broken storage nodes. - continue - } - if skipRecentlyReroutedNodes && currentTime <= atomic.LoadUint64(&sn.lastRerouteTime)+5 { - // Skip nodes, which were re-routed recently. - continue - } - for len(sns) <= i { - sns = append(sns, sn) - } - } - if len(sns) > 0 { - for len(sns) < len(storageNodes) { - sns = append(sns, sns[0]) - } - } - return sns -} - var ( maxBufSizePerStorageNode int diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 78c98a4ce..0d8580476 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,7 @@ sort: 15 ## tip +* FEATURE: [VictoriaMetrics cluster](https://docs.victoriametrics.com/Cluster-VictoriaMetrics.html): improve re-routing logic, so it re-routes incoming data more evenly if some of `vmstorage` nodes are temporarily unavailable and/or accept data at a slower rate than other `vmstorage` nodes.
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/MetricsQL.html): cover more cases with the [label filters' propagation optimization](https://utcc.utoronto.ca/~cks/space/blog/sysadmin/PrometheusLabelNonOptimization). This should improve the average performance for practical queries. The following cases are additionally covered: * Multi-level [transform functions](https://docs.victoriametrics.com/MetricsQL.html#transform-functions). For example, `abs(round(foo{a="b"})) + bar{x="y"}` is now optimized to `abs(round(foo{a="b",x="y"})) + bar{a="b",x="y"}` * Binary operations with `on()`, `without()`, `group_left()` and `group_right()` modifiers. For example, `foo{a="b"} on (a) + bar` is now optimized to `foo{a="b"} on (a) + bar{a="b"}` diff --git a/go.mod b/go.mod index 9a24e48e4..3c2e48268 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/golang/snappy v0.0.4 github.com/influxdata/influxdb v1.9.5 github.com/klauspost/compress v1.14.2 - github.com/lithammer/go-jump-consistent-hash v1.0.2 github.com/prometheus/prometheus v1.8.2-0.20201119142752-3ad25a6dc3d9 github.com/urfave/cli/v2 v2.3.0 github.com/valyala/fastjson v1.6.3 diff --git a/go.sum b/go.sum index 90c3c83e3..77165a794 100644 --- a/go.sum +++ b/go.sum @@ -678,8 +678,6 @@ github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM= github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4= -github.com/lithammer/go-jump-consistent-hash v1.0.2 h1:w74N9XiMa4dWZdoVnfLbnDhfpGOMCxlrudzt2e7wtyk= -github.com/lithammer/go-jump-consistent-hash v1.0.2/go.mod h1:4MD1WDikNGnb9D56hAtscaZaOWOiCG+lLbRR5ZN9JL0= github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= diff --git a/vendor/github.com/lithammer/go-jump-consistent-hash/LICENSE b/vendor/github.com/lithammer/go-jump-consistent-hash/LICENSE deleted file mode 100644 index dee3d1de2..000000000 --- a/vendor/github.com/lithammer/go-jump-consistent-hash/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2018 Peter Lithammer - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/vendor/github.com/lithammer/go-jump-consistent-hash/README.md b/vendor/github.com/lithammer/go-jump-consistent-hash/README.md deleted file mode 100644 index 38c171d9a..000000000 --- a/vendor/github.com/lithammer/go-jump-consistent-hash/README.md +++ /dev/null @@ -1,61 +0,0 @@ -# Jump Consistent Hash - -[![Build Status](https://github.com/lithammer/go-jump-consistent-hash/workflows/Go/badge.svg)](https://github.com/lithammer/go-jump-consistent-hash/actions) -[![Godoc](https://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/lithammer/go-jump-consistent-hash) - -Go implementation of the jump consistent hash algorithm[1] by John Lamping and Eric Veach. - -[1] http://arxiv.org/pdf/1406.2294v1.pdf - -## Usage - -```go -import jump "github.com/lithammer/go-jump-consistent-hash" - -func main() { - h := jump.Hash(256, 1024) // h = 520 -} -``` - -Includes a helper function for using a `string` as key instead of an `uint64`. This requires a hasher that computes the string into a format accepted by `Hash()`. Such a hasher that uses [CRC-64 (ECMA)](https://en.wikipedia.org/wiki/Cyclic_redundancy_check) is also included for convenience. - -```go -h := jump.HashString("127.0.0.1", 8, jump.NewCRC64()) // h = 7 -``` - -In reality though you probably want to use a `Hasher` so you won't have to repeat the bucket size and which key hasher used. It also uses more convenient types, like `int` instead of `int32`. - -```go -hasher := jump.New(8, jump.NewCRC64()) -h := hasher.Hash("127.0.0.1") // h = 7 -``` - -If you want to use your own algorithm, you must implement the `KeyHasher` interface, which is a subset of the `hash.Hash64` interface available in the standard library. - -Here's an example of a custom `KeyHasher` that uses Google's [FarmHash](https://github.com/google/farmhash) algorithm (the successor of CityHash) to compute the final key. 
- -```go -type FarmHash struct { - buf bytes.Buffer -} - -func (f *FarmHash) Write(p []byte) (n int, err error) { - return f.buf.Write(p) -} - -func (f *FarmHash) Reset() { - f.buf.Reset() -} - -func (f *FarmHash) Sum64() uint64 { - // https://github.com/dgryski/go-farm - return farm.Hash64(f.buf.Bytes()) -} - -hasher := jump.New(8, &FarmHash{}) -h := hasher.Hash("127.0.0.1") // h = 5 -``` - -## License - -MIT diff --git a/vendor/github.com/lithammer/go-jump-consistent-hash/crc32.go b/vendor/github.com/lithammer/go-jump-consistent-hash/crc32.go deleted file mode 100644 index a2b7843df..000000000 --- a/vendor/github.com/lithammer/go-jump-consistent-hash/crc32.go +++ /dev/null @@ -1,38 +0,0 @@ -package jump - -import "hash" - -type crc32Hasher struct { - crc32 hash.Hash32 -} - -func (h *crc32Hasher) Write(p []byte) (n int, err error) { - return h.crc32.Write(p) -} - -func (h *crc32Hasher) Sum(b []byte) []byte { - return h.crc32.Sum(b) -} - -func (h *crc32Hasher) Reset() { - h.crc32.Reset() -} - -func (h *crc32Hasher) Size() int { - return h.crc32.Size() -} - -func (h *crc32Hasher) BlockSize() int { - return h.crc32.BlockSize() -} - -func (h *crc32Hasher) Sum32() uint32 { - return h.crc32.Sum32() -} - -func (h *crc32Hasher) Sum64() uint64 { - return uint64(h.crc32.Sum32()) -} - -var _ hash.Hash32 = (*crc32Hasher)(nil) -var _ hash.Hash64 = (*crc32Hasher)(nil) diff --git a/vendor/github.com/lithammer/go-jump-consistent-hash/doc.go b/vendor/github.com/lithammer/go-jump-consistent-hash/doc.go deleted file mode 100644 index 53464ac0c..000000000 --- a/vendor/github.com/lithammer/go-jump-consistent-hash/doc.go +++ /dev/null @@ -1,135 +0,0 @@ -/* -Package jump implements the "jump consistent hash" algorithm. - -Example - - h := jump.Hash(256, 1024) // h = 520 - -Reference C++ implementation[1] - - int32_t JumpConsistentHash(uint64_t key, int32_t num_buckets) { - int64_t b = -1, j = 0; - while (j < num_buckets) { - b = j; - key = key * 2862933555777941757ULL + 1; - j = (b + 1) * (double(1LL << 31) / double((key >> 33) + 1)); - } - return b; - } - -Explanation of the algorithm - -Jump consistent hash works by computing when its output changes as the -number of buckets increases. Let ch(key, num_buckets) be the consistent hash -for the key when there are num_buckets buckets. Clearly, for any key, k, -ch(k, 1) is 0, since there is only the one bucket. In order for the -consistent hash function to balanced, ch(k, 2) will have to stay at 0 for -half the keys, k, while it will have to jump to 1 for the other half. In -general, ch(k, n+1) has to stay the same as ch(k, n) for n/(n+1) of the -keys, and jump to n for the other 1/(n+1) of the keys. - -Here are examples of the consistent hash values for three keys, k1, k2, and -k3, as num_buckets goes up: - - │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │ 8 │ 9 │ 10 │ 11 │ 12 │ 13 │ 14 - ───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┼────┼────┼────┼──── - k1 │ 0 │ 0 │ 2 │ 2 │ 4 │ 4 │ 4 │ 4 │ 4 │ 4 │ 4 │ 4 │ 4 │ 4 - ───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┼────┼────┼────┼──── - k2 │ 0 │ 1 │ 1 │ 1 │ 1 │ 1 │ 1 │ 7 │ 7 │ 7 │ 7 │ 7 │ 7 │ 7 - ───┼───┼───┼───┼───┼───┼───┼───┼───┼───┼────┼────┼────┼────┼──── - k3 │ 0 │ 1 │ 1 │ 1 │ 1 │ 5 │ 5 │ 7 │ 7 │ 7 │ 10 │ 10 │ 10 │ 10 - -A linear time algorithm can be defined by using the formula for the -probability of ch(key, j) jumping when j increases. It essentially walks -across a row of this table. 
Given a key and number of buckets, the algorithm -considers each successive bucket, j, from 1 to num_buckets­1, and uses -ch(key, j) to compute ch(key, j+1). At each bucket, j, it decides whether to -keep ch(k, j+1) the same as ch(k, j), or to jump its value to j. In order to -jump for the right fraction of keys, it uses a pseudo­random number -generator with the key as its seed. To jump for 1/(j+1) of keys, it -generates a uniform random number between 0.0 and 1.0, and jumps if the -value is less than 1/(j+1). At the end of the loop, it has computed -ch(k, num_buckets), which is the desired answer. In code: - - int ch(int key, int num_buckets) { - random.seed(key); - int b = 0; // This will track ch(key,j+1). - for (int j = 1; j < num_buckets; j++) { - if (random.next() < 1.0 / (j + 1)) b = j; - } - return b; - } - -We can convert this to a logarithmic time algorithm by exploiting that -ch(key, j+1) is usually unchanged as j increases, only jumping occasionally. -The algorithm will only compute the destinations of jumps ­­ the j’s for -which ch(key, j+1) ≠ ch(key, j). Also notice that for these j’s, ch(key, -j+1) = j. To develop the algorithm, we will treat ch(key, j) as a random -variable, so that we can use the notation for random variables to analyze -the fractions of keys for which various propositions are true. That will -lead us to a closed form expression for a pseudo­random variable whose value -gives the destination of the next jump. - -Suppose that the algorithm is tracking the bucket numbers of the jumps for a -particular key, k. And suppose that b was the destination of the last jump, -that is, ch(k, b) ≠ ch(k, b+1), and ch(k, b+1) = b. Now, we want to find the -next jump, the smallest j such that ch(k, j+1) ≠ ch(k, b+1), or -equivalently, the largest j such that ch(k, j) = ch(k, b+1). We will make a -pseudo­random variable whose value is that j. To get a probabilistic -constraint on j, note that for any bucket number, i, we have j ≥ i if and -only if the consistent hash hasn’t changed by i, that is, if and only if -ch(k, i) = ch(k, b+1). Hence, the distribution of j must satisfy - - P(j ≥ i) = P( ch(k, i) = ch(k, b+1) ) - -Fortunately, it is easy to compute that probability. Notice that since P( -ch(k, 10) = ch(k, 11) ) is 10/11, and P( ch(k, 11) = ch(k, 12) ) is 11/12, -then P( ch(k, 10) = ch(k, 12) ) is 10/11 * 11/12 = 10/12. In general, if n ≥ -m, P( ch(k, n) = ch(k, m) ) = m / n. Thus for any i > b, - - P(j ≥ i) = P( ch(k, i) = ch(k, b+1) ) = (b+1) / i . - -Now, we generate a pseudo­random variable, r, (depending on k and j) that is -uniformly distributed between 0 and 1. Since we want P(j ≥ i) = (b+1) / i, -we set P(j ≥ i) iff r ≤ (b+1) / i. Solving the inequality for i yields P(j ≥ -i) iff i ≤ (b+1) / r. Since i is a lower bound on j, j will equal the -largest i for which P(j ≥ i), thus the largest i satisfying i ≤ (b+1) / r. -Thus, by the definition of the floor function, j = floor((b+1) / r). - -Using this formula, jump consistent hash finds ch(key, num_buckets) by -choosing successive jump destinations until it finds a position at or past -num_buckets. It then knows that the previous jump destination is the answer. - - int ch(int key, int num_buckets) { - random.seed(key); - int b = -1; // bucket number before the previous jump - int j = 0; // bucket number before the current jump - while (j < num_buckets) { - b = j; - r = random.next(); - j = floor((b + 1) / r); - } - return = b; - } - -To turn this into the actual code of figure 1, we need to implement random. 
-We want it to be fast, and yet to also to have well distributed successive -values. We use a 64­bit linear congruential generator; the particular -multiplier we use produces random numbers that are especially well -distributed in higher dimensions (i.e., when successive random values are -used to form tuples). We use the key as the seed. (For keys that don’t fit -into 64 bits, a 64 bit hash of the key should be used.) The congruential -generator updates the seed on each iteration, and the code derives a double -from the current seed. Tests show that this generator has good speed and -distribution. - -It is worth noting that unlike the algorithm of Karger et al., jump -consistent hash does not require the key to be hashed if it is already an -integer. This is because jump consistent hash has an embedded pseudorandom -number generator that essentially rehashes the key on every iteration. The -hash is not especially good (i.e., linear congruential), but since it is -applied repeatedly, additional hashing of the input key is not necessary. - -[1] http://arxiv.org/pdf/1406.2294v1.pdf -*/ -package jump diff --git a/vendor/github.com/lithammer/go-jump-consistent-hash/jump.go b/vendor/github.com/lithammer/go-jump-consistent-hash/jump.go deleted file mode 100644 index 3fe1a2972..000000000 --- a/vendor/github.com/lithammer/go-jump-consistent-hash/jump.go +++ /dev/null @@ -1,95 +0,0 @@ -package jump - -import ( - "hash" - "hash/crc32" - "hash/crc64" - "hash/fnv" - "io" -) - -// Hash takes a 64 bit key and the number of buckets. It outputs a bucket -// number in the range [0, buckets). -// If the number of buckets is less than or equal to 0 then one 1 is used. -func Hash(key uint64, buckets int32) int32 { - var b, j int64 - - if buckets <= 0 { - buckets = 1 - } - - for j < int64(buckets) { - b = j - key = key*2862933555777941757 + 1 - j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1))) - } - - return int32(b) -} - -// HashString takes string as key instead of an int and uses a KeyHasher to -// generate a key compatible with Hash(). -func HashString(key string, buckets int32, h KeyHasher) int32 { - h.Reset() - _, err := io.WriteString(h, key) - if err != nil { - panic(err) - } - return Hash(h.Sum64(), buckets) -} - -// KeyHasher is a subset of hash.Hash64 in the standard library. -type KeyHasher interface { - // Write (via the embedded io.Writer interface) adds more data to the - // running hash. - // It never returns an error. - io.Writer - - // Reset resets the KeyHasher to its initial state. - Reset() - - // Return the result of the added bytes (via io.Writer). - Sum64() uint64 -} - -// Hasher represents a jump consistent hasher using a string as key. -type Hasher struct { - n int32 - h KeyHasher -} - -// New returns a new instance of of Hasher. -func New(n int, h KeyHasher) *Hasher { - return &Hasher{int32(n), h} -} - -// N returns the number of buckets the hasher can assign to. -func (h *Hasher) N() int { - return int(h.n) -} - -// Hash returns the integer hash for the given key. -func (h *Hasher) Hash(key string) int { - return int(HashString(key, h.n, h.h)) -} - -// KeyHashers available in the standard library for use with HashString() and Hasher. -var ( - // CRC32 uses the 32-bit Cyclic Redundancy Check (CRC-32) with the IEEE - // polynomial. - NewCRC32 func() hash.Hash64 = func() hash.Hash64 { return &crc32Hasher{crc32.NewIEEE()} } - // CRC64 uses the 64-bit Cyclic Redundancy Check (CRC-64) with the ECMA - // polynomial. 
- NewCRC64 func() hash.Hash64 = func() hash.Hash64 { return crc64.New(crc64.MakeTable(crc64.ECMA)) } - // FNV1 uses the non-cryptographic hash function FNV-1. - NewFNV1 func() hash.Hash64 = func() hash.Hash64 { return fnv.New64() } - // FNV1a uses the non-cryptographic hash function FNV-1a. - NewFNV1a func() hash.Hash64 = func() hash.Hash64 { return fnv.New64a() } - - // These are deprecated because they're not safe for concurrent use. Please - // use the New* functions instead. - CRC32 hash.Hash64 = &crc32Hasher{crc32.NewIEEE()} - CRC64 hash.Hash64 = crc64.New(crc64.MakeTable(crc64.ECMA)) - FNV1 hash.Hash64 = fnv.New64() - FNV1a hash.Hash64 = fnv.New64a() -) diff --git a/vendor/modules.txt b/vendor/modules.txt index 059516948..c9e4cf0a8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -157,9 +157,6 @@ github.com/klauspost/compress/internal/snapref github.com/klauspost/compress/zlib github.com/klauspost/compress/zstd github.com/klauspost/compress/zstd/internal/xxhash -# github.com/lithammer/go-jump-consistent-hash v1.0.2 -## explicit; go 1.16 -github.com/lithammer/go-jump-consistent-hash # github.com/mattn/go-colorable v0.1.12 ## explicit; go 1.13 github.com/mattn/go-colorable
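One more note on the dependency swap: the removed jump-consistent-hash package addresses buckets purely by position, so the key-to-node mapping only stays stable when nodes are added or removed at the end of the list, and there is no way to skip an arbitrary unavailable node. The rendezvous-style `consistentHash` added in `consistent_hash.go` can exclude any node index while leaving every other key's assignment untouched, which is what the new per-row re-routing relies on. A rough, self-contained comparison of the churn caused by taking the first node out of service (node names and key count are made up; `jumpHash` is copied from the deleted `jump.go`):

```go
package main

import (
	"fmt"
	"math/rand"

	"github.com/cespare/xxhash/v2"
)

// jumpHash is the algorithm from the deleted vendor/.../jump.go.
func jumpHash(key uint64, buckets int32) int32 {
	var b, j int64
	for j < int64(buckets) {
		b = j
		key = key*2862933555777941757 + 1
		j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
	}
	return int32(b)
}

// rendezvous picks the node with the highest mixed hash, optionally skipping
// one excluded index, like consistentHash.getNodeIdx in this patch.
func rendezvous(nodeHashes []uint64, h uint64, exclude int) int {
	best, bestWeight := -1, uint64(0)
	for i, nh := range nodeHashes {
		if i == exclude {
			continue
		}
		if w := mix(nh ^ h); best == -1 || w > bestWeight {
			best, bestWeight = i, w
		}
	}
	return best
}

func mix(x uint64) uint64 {
	x ^= x >> 12
	x ^= x << 25
	x ^= x >> 27
	return x * 2685821657736338717
}

func main() {
	nodes := []string{"node0", "node1", "node2", "node3"}
	nodeHashes := make([]uint64, len(nodes))
	for i, n := range nodes {
		nodeHashes[i] = xxhash.Sum64([]byte(n))
	}
	r := rand.New(rand.NewSource(1))
	keys := make([]uint64, 100000)
	for i := range keys {
		keys[i] = r.Uint64()
	}
	// Take node0 out of service and count how many keys land on a different node.
	jumpMoved, rendezvousMoved := 0, 0
	for _, k := range keys {
		// Jump hash addresses buckets by position, so dropping node0 means
		// renumbering the survivors: bucket i now refers to nodes[i+1].
		if nodes[jumpHash(k, 4)] != nodes[jumpHash(k, 3)+1] {
			jumpMoved++
		}
		// Rendezvous hashing simply excludes node0's index.
		if rendezvous(nodeHashes, k, -1) != rendezvous(nodeHashes, k, 0) {
			rendezvousMoved++
		}
	}
	fmt.Printf("jump hash (renumbered): %d of %d keys moved\n", jumpMoved, len(keys))
	fmt.Printf("rendezvous (excluded):  %d of %d keys moved\n", rendezvousMoved, len(keys))
}
```

With rendezvous hashing only the keys that were assigned to the excluded node move, roughly 1/n of them, while naive renumbering under jump hash moves most keys. The price is an O(n) scan per lookup instead of jump hash's O(log n) loop, which is what the added `BenchmarkConsistentHash` keeps an eye on.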