From 0f99c1afb1abf5954bb522d13f30712f55d94dd9 Mon Sep 17 00:00:00 2001 From: Artem Navoiev <tenmozes@gmail.com> Date: Sun, 20 Dec 2020 20:06:48 +0200 Subject: [PATCH 01/10] add linkedin to release announcement --- docs/Release-Guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/Release-Guide.md b/docs/Release-Guide.md index a3b33518c7..0b05382b1a 100644 --- a/docs/Release-Guide.md +++ b/docs/Release-Guide.md @@ -15,6 +15,7 @@ Release process guidance 1. Publish message in slack (victoriametrics.slack.com, general channel) 2. Post twit with release notes URL 3. Post in subreddit https://www.reddit.com/r/VictoriaMetrics/ +4. Post in linkedin ## Helm Charts From de89bcddaefde38b8e20e1ea1cfb91b61a5a218a Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Mon, 21 Dec 2020 08:53:37 +0200 Subject: [PATCH 02/10] vendor: upgrade github.com/klauspost/compress from v1.11.3 to v1.11.4 --- go.mod | 2 +- go.sum | 3 +- .../klauspost/compress/zstd/README.md | 25 +- .../klauspost/compress/zstd/blockdec.go | 2 +- .../klauspost/compress/zstd/blockenc.go | 1 + .../klauspost/compress/zstd/decodeheader.go | 202 ++++++++ .../klauspost/compress/zstd/enc_best.go | 484 ++++++++++++++++++ .../compress/zstd/encoder_options.go | 20 +- vendor/modules.txt | 2 +- 9 files changed, 724 insertions(+), 17 deletions(-) create mode 100644 vendor/github.com/klauspost/compress/zstd/decodeheader.go create mode 100644 vendor/github.com/klauspost/compress/zstd/enc_best.go diff --git a/go.mod b/go.mod index eefee133b1..b30654dc3d 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/aws/aws-sdk-go v1.36.12 github.com/cespare/xxhash/v2 v2.1.1 github.com/golang/snappy v0.0.2 - github.com/klauspost/compress v1.11.3 + github.com/klauspost/compress v1.11.4 github.com/valyala/fastjson v1.6.3 github.com/valyala/fastrand v1.0.0 github.com/valyala/fasttemplate v1.2.1 diff --git a/go.sum b/go.sum index 25d2a5d043..1b27cdbda9 100644 --- a/go.sum +++ b/go.sum @@ -149,8 +149,9 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.10.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.0/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= -github.com/klauspost/compress v1.11.3 h1:dB4Bn0tN3wdCzQxnS8r06kV74qN/TAfaIS0bVE8h3jc= github.com/klauspost/compress v1.11.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= +github.com/klauspost/compress v1.11.4 h1:kz40R/YWls3iqT9zX9AHN3WoVsrAWVyui5sxuLqiXqU= +github.com/klauspost/compress v1.11.4/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= diff --git a/vendor/github.com/klauspost/compress/zstd/README.md b/vendor/github.com/klauspost/compress/zstd/README.md index 08e553f753..7680bfe1dd 100644 --- a/vendor/github.com/klauspost/compress/zstd/README.md +++ b/vendor/github.com/klauspost/compress/zstd/README.md @@ -24,22 +24,21 @@ Godoc Documentation: https://godoc.org/github.com/klauspost/compress/zstd ### Status: STABLE - there may always be subtle bugs, a wide variety of content has been tested and the library is actively -used by several projects. 
This library is being continuously [fuzz-tested](https://github.com/klauspost/compress-fuzz), -kindly supplied by [fuzzit.dev](https://fuzzit.dev/). +used by several projects. This library is being [fuzz-tested](https://github.com/klauspost/compress-fuzz) for all updates. There may still be specific combinations of data types/size/settings that could lead to edge cases, so as always, testing is recommended. For now, a high speed (fastest) and medium-fast (default) compressor has been implemented. -The "Fastest" compression ratio is roughly equivalent to zstd level 1. -The "Default" compression ratio is roughly equivalent to zstd level 3 (default). +* The "Fastest" compression ratio is roughly equivalent to zstd level 1. +* The "Default" compression ratio is roughly equivalent to zstd level 3 (default). +* The "Better" compression ratio is roughly equivalent to zstd level 7. +* The "Best" compression ratio is roughly equivalent to zstd level 11. In terms of speed, it is typically 2x as fast as the stdlib deflate/gzip in its fastest mode. The compression ratio compared to stdlib is around level 3, but usually 3x as fast. -Compared to cgo zstd, the speed is around level 3 (default), but compression slightly worse, between level 1&2. - ### Usage @@ -140,7 +139,7 @@ I have collected some speed examples to compare speed and compression against ot * `file` is the input file. * `out` is the compressor used. `zskp` is this package. `zstd` is the Datadog cgo library. `gzstd/gzkp` is gzip standard and this library. -* `level` is the compression level used. For `zskp` level 1 is "fastest", level 2 is "default". +* `level` is the compression level used. For `zskp` level 1 is "fastest", level 2 is "default"; 3 is "better", 4 is "best". * `insize`/`outsize` is the input/output size. * `millis` is the number of milliseconds used for compression. * `mb/s` is megabytes (2^20 bytes) per second. 
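For reference, `zskp` level 4 in the tables below maps to the new `SpeedBestCompression` level in the Go API. A minimal usage sketch, not part of the patch (the sample payload is invented; `NewWriter` with a nil writer plus `EncodeAll` is the package's stateless one-shot pattern):

```go
package main

import (
	"fmt"

	"github.com/klauspost/compress/zstd"
)

func main() {
	// Build an encoder at the new "best" level (roughly zstd level 11).
	enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
	if err != nil {
		panic(err)
	}
	defer enc.Close()

	src := []byte("payload that compresses well: aaaaaaaaaabbbbbbbbbbcccccccccc")
	dst := enc.EncodeAll(src, nil) // stateless one-shot compression
	fmt.Printf("compressed %d bytes to %d bytes\n", len(src), len(dst))
}
```

A single `Encoder` is safe for concurrent `EncodeAll` calls, so one "best"-level encoder can be shared across many blobs.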
@@ -154,11 +153,13 @@ file out level insize outsize millis mb/s silesia.tar zskp 1 211947520 73101992 643 313.87 silesia.tar zskp 2 211947520 67504318 969 208.38 silesia.tar zskp 3 211947520 65177448 1899 106.44 +silesia.tar zskp 4 211947520 61381950 8115 24.91 cgo zstd: silesia.tar zstd 1 211947520 73605392 543 371.56 silesia.tar zstd 3 211947520 66793289 864 233.68 silesia.tar zstd 6 211947520 62916450 1913 105.66 +silesia.tar zstd 9 211947520 60212393 5063 39.92 gzip, stdlib/this package: silesia.tar gzstd 1 211947520 80007735 1654 122.21 @@ -171,9 +172,11 @@ file out level insize outsize millis mb/s gob-stream zskp 1 1911399616 235022249 3088 590.30 gob-stream zskp 2 1911399616 205669791 3786 481.34 gob-stream zskp 3 1911399616 185792019 9324 195.48 +gob-stream zskp 4 1911399616 171537212 32113 56.76 gob-stream zstd 1 1911399616 249810424 2637 691.26 gob-stream zstd 3 1911399616 208192146 3490 522.31 gob-stream zstd 6 1911399616 193632038 6687 272.56 +gob-stream zstd 9 1911399616 177620386 16175 112.70 gob-stream gzstd 1 1911399616 357382641 10251 177.82 gob-stream gzkp 1 1911399616 362156523 5695 320.08 @@ -185,9 +188,11 @@ file out level insize outsize millis mb/s enwik9 zskp 1 1000000000 343848582 3609 264.18 enwik9 zskp 2 1000000000 317276632 5746 165.97 enwik9 zskp 3 1000000000 294540704 11725 81.34 +enwik9 zskp 4 1000000000 276609671 44029 21.66 enwik9 zstd 1 1000000000 358072021 3110 306.65 enwik9 zstd 3 1000000000 313734672 4784 199.35 enwik9 zstd 6 1000000000 295138875 10290 92.68 +enwik9 zstd 9 1000000000 278348700 28549 33.40 enwik9 gzstd 1 1000000000 382578136 9604 99.30 enwik9 gzkp 1 1000000000 383825945 6544 145.73 @@ -198,9 +203,11 @@ file out level insize outsize millis mb/s github-june-2days-2019.json zskp 1 6273951764 699045015 10620 563.40 github-june-2days-2019.json zskp 2 6273951764 617881763 11687 511.96 github-june-2days-2019.json zskp 3 6273951764 537511906 29252 204.54 +github-june-2days-2019.json zskp 4 6273951764 512796117 97791 61.18 github-june-2days-2019.json zstd 1 6273951764 766284037 8450 708.00 github-june-2days-2019.json zstd 3 6273951764 661889476 10927 547.57 github-june-2days-2019.json zstd 6 6273951764 642756859 22996 260.18 +github-june-2days-2019.json zstd 9 6273951764 601974523 52413 114.16 github-june-2days-2019.json gzstd 1 6273951764 1164400847 29948 199.79 github-june-2days-2019.json gzkp 1 6273951764 1128755542 19236 311.03 @@ -211,9 +218,11 @@ file out level insize outsize millis mb/s rawstudio-mint14.tar zskp 1 8558382592 3667489370 20210 403.84 rawstudio-mint14.tar zskp 2 8558382592 3364592300 31873 256.07 rawstudio-mint14.tar zskp 3 8558382592 3224594213 71751 113.75 +rawstudio-mint14.tar zskp 4 8558382592 3027332295 486243 16.79 rawstudio-mint14.tar zstd 1 8558382592 3609250104 17136 476.27 rawstudio-mint14.tar zstd 3 8558382592 3341679997 29262 278.92 rawstudio-mint14.tar zstd 6 8558382592 3235846406 77904 104.77 +rawstudio-mint14.tar zstd 9 8558382592 3160778861 140946 57.91 rawstudio-mint14.tar gzstd 1 8558382592 3926257486 57722 141.40 rawstudio-mint14.tar gzkp 1 8558382592 3970463184 41749 195.49 @@ -224,9 +233,11 @@ file out level insize outsize millis mb/s nyc-taxi-data-10M.csv zskp 1 3325605752 641339945 8925 355.35 nyc-taxi-data-10M.csv zskp 2 3325605752 591748091 11268 281.44 nyc-taxi-data-10M.csv zskp 3 3325605752 538490114 19880 159.53 +nyc-taxi-data-10M.csv zskp 4 3325605752 495986829 89368 35.49 nyc-taxi-data-10M.csv zstd 1 3325605752 687399637 8233 385.18 nyc-taxi-data-10M.csv zstd 3 3325605752 598514411 10065 315.07 
nyc-taxi-data-10M.csv zstd 6 3325605752 570522953 20038 158.27 +nyc-taxi-data-10M.csv zstd 9 3325605752 517554797 64565 49.12 nyc-taxi-data-10M.csv gzstd 1 3325605752 928656485 23876 132.83 nyc-taxi-data-10M.csv gzkp 1 3325605752 924718719 16388 193.53 ``` diff --git a/vendor/github.com/klauspost/compress/zstd/blockdec.go b/vendor/github.com/klauspost/compress/zstd/blockdec.go index 4733ea876a..b51d922bda 100644 --- a/vendor/github.com/klauspost/compress/zstd/blockdec.go +++ b/vendor/github.com/klauspost/compress/zstd/blockdec.go @@ -613,7 +613,7 @@ func (b *blockDec) decodeCompressed(hist *history) error { // Decode treeless literal block. if litType == literalsBlockTreeless { // TODO: We could send the history early WITHOUT the stream history. - // This would allow decoding treeless literials before the byte history is available. + // This would allow decoding treeless literals before the byte history is available. // Silencia stats: Treeless 4393, with: 32775, total: 37168, 11% treeless. // So not much obvious gain here. diff --git a/vendor/github.com/klauspost/compress/zstd/blockenc.go b/vendor/github.com/klauspost/compress/zstd/blockenc.go index 083fbb502f..c85c40255d 100644 --- a/vendor/github.com/klauspost/compress/zstd/blockenc.go +++ b/vendor/github.com/klauspost/compress/zstd/blockenc.go @@ -76,6 +76,7 @@ func (b *blockEnc) reset(prev *blockEnc) { if prev != nil { b.recentOffsets = prev.prevRecentOffsets } + b.dictLitEnc = nil } // reset will reset the block for a new encode, but in the same stream, diff --git a/vendor/github.com/klauspost/compress/zstd/decodeheader.go b/vendor/github.com/klauspost/compress/zstd/decodeheader.go new file mode 100644 index 0000000000..87896c5eaa --- /dev/null +++ b/vendor/github.com/klauspost/compress/zstd/decodeheader.go @@ -0,0 +1,202 @@ +// Copyright 2020+ Klaus Post. All rights reserved. +// License information can be found in the LICENSE file. + +package zstd + +import ( + "bytes" + "errors" + "io" +) + +// HeaderMaxSize is the maximum size of a Frame and Block Header. +// If less is sent to Header.Decode it *may* still contain enough information. +const HeaderMaxSize = 14 + 3 + +// Header contains information about the first frame and block within that. +type Header struct { + // Window Size the window of data to keep while decoding. + // Will only be set if HasFCS is false. + WindowSize uint64 + + // Frame content size. + // Expected size of the entire frame. + FrameContentSize uint64 + + // Dictionary ID. + // If 0, no dictionary. + DictionaryID uint32 + + // First block information. + FirstBlock struct { + // OK will be set if first block could be decoded. + OK bool + + // Is this the last block of a frame? + Last bool + + // Is the data compressed? + // If true CompressedSize will be populated. + // Unfortunately DecompressedSize cannot be determined + // without decoding the blocks. + Compressed bool + + // DecompressedSize is the expected decompressed size of the block. + // Will be 0 if it cannot be determined. + DecompressedSize int + + // CompressedSize of the data in the block. + // Does not include the block header. + // Will be equal to DecompressedSize if not Compressed. + CompressedSize int + } + + // Skippable will be true if the frame is meant to be skipped. + // No other information will be populated. + Skippable bool + + // If set there is a checksum present for the block content. 
+ HasCheckSum bool + + // If this is true FrameContentSize will have a valid value + HasFCS bool + + SingleSegment bool +} + +// Decode the header from the beginning of the stream. +// This will decode the frame header and the first block header if enough bytes are provided. +// It is recommended to provide at least HeaderMaxSize bytes. +// If the frame header cannot be read an error will be returned. +// If there isn't enough input, io.ErrUnexpectedEOF is returned. +// The FirstBlock.OK will indicate if enough information was available to decode the first block header. +func (h *Header) Decode(in []byte) error { + if len(in) < 4 { + return io.ErrUnexpectedEOF + } + b, in := in[:4], in[4:] + if !bytes.Equal(b, frameMagic) { + if !bytes.Equal(b[1:4], skippableFrameMagic) || b[0]&0xf0 != 0x50 { + return ErrMagicMismatch + } + *h = Header{Skippable: true} + return nil + } + if len(in) < 1 { + return io.ErrUnexpectedEOF + } + + // Clear output + *h = Header{} + fhd, in := in[0], in[1:] + h.SingleSegment = fhd&(1<<5) != 0 + h.HasCheckSum = fhd&(1<<2) != 0 + + if fhd&(1<<3) != 0 { + return errors.New("Reserved bit set on frame header") + } + + // Read Window_Descriptor + // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#window_descriptor + if !h.SingleSegment { + if len(in) < 1 { + return io.ErrUnexpectedEOF + } + var wd byte + wd, in = in[0], in[1:] + windowLog := 10 + (wd >> 3) + windowBase := uint64(1) << windowLog + windowAdd := (windowBase / 8) * uint64(wd&0x7) + h.WindowSize = windowBase + windowAdd + } + + // Read Dictionary_ID + // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#dictionary_id + if size := fhd & 3; size != 0 { + if size == 3 { + size = 4 + } + if len(in) < int(size) { + return io.ErrUnexpectedEOF + } + b, in = in[:size], in[size:] + if b == nil { + return io.ErrUnexpectedEOF + } + switch size { + case 1: + h.DictionaryID = uint32(b[0]) + case 2: + h.DictionaryID = uint32(b[0]) | (uint32(b[1]) << 8) + case 4: + h.DictionaryID = uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24) + } + } + + // Read Frame_Content_Size + // https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#frame_content_size + var fcsSize int + v := fhd >> 6 + switch v { + case 0: + if h.SingleSegment { + fcsSize = 1 + } + default: + fcsSize = 1 << v + } + + if fcsSize > 0 { + h.HasFCS = true + if len(in) < fcsSize { + return io.ErrUnexpectedEOF + } + b, in = in[:fcsSize], in[fcsSize:] + if b == nil { + return io.ErrUnexpectedEOF + } + switch fcsSize { + case 1: + h.FrameContentSize = uint64(b[0]) + case 2: + // When FCS_Field_Size is 2, the offset of 256 is added. + h.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) + 256 + case 4: + h.FrameContentSize = uint64(b[0]) | (uint64(b[1]) << 8) | (uint64(b[2]) << 16) | (uint64(b[3]) << 24) + case 8: + d1 := uint32(b[0]) | (uint32(b[1]) << 8) | (uint32(b[2]) << 16) | (uint32(b[3]) << 24) + d2 := uint32(b[4]) | (uint32(b[5]) << 8) | (uint32(b[6]) << 16) | (uint32(b[7]) << 24) + h.FrameContentSize = uint64(d1) | (uint64(d2) << 32) + } + } + + // Frame Header done, we will not fail from now on. + if len(in) < 3 { + return nil + } + tmp, in := in[:3], in[3:] + bh := uint32(tmp[0]) | (uint32(tmp[1]) << 8) | (uint32(tmp[2]) << 16) + h.FirstBlock.Last = bh&1 != 0 + blockType := blockType((bh >> 1) & 3) + // find size. 
+ cSize := int(bh >> 3) + switch blockType { + case blockTypeReserved: + return nil + case blockTypeRLE: + h.FirstBlock.Compressed = true + h.FirstBlock.DecompressedSize = cSize + h.FirstBlock.CompressedSize = 1 + case blockTypeCompressed: + h.FirstBlock.Compressed = true + h.FirstBlock.CompressedSize = cSize + case blockTypeRaw: + h.FirstBlock.DecompressedSize = cSize + h.FirstBlock.CompressedSize = cSize + default: + panic("Invalid block type") + } + + h.FirstBlock.OK = true + return nil +} diff --git a/vendor/github.com/klauspost/compress/zstd/enc_best.go b/vendor/github.com/klauspost/compress/zstd/enc_best.go new file mode 100644 index 0000000000..c4baa42c64 --- /dev/null +++ b/vendor/github.com/klauspost/compress/zstd/enc_best.go @@ -0,0 +1,484 @@ +// Copyright 2019+ Klaus Post. All rights reserved. +// License information can be found in the LICENSE file. +// Based on work by Yann Collet, released under BSD License. + +package zstd + +import ( + "fmt" + "math/bits" +) + +const ( + bestLongTableBits = 20 // Bits used in the long match table + bestLongTableSize = 1 << bestLongTableBits // Size of the table + + // Note: Increasing the short table bits or making the hash shorter + // can actually lead to compression degradation since it will 'steal' more from the + // long match table and match offsets are quite big. + // This greatly depends on the type of input. + bestShortTableBits = 16 // Bits used in the short match table + bestShortTableSize = 1 << bestShortTableBits // Size of the table +) + +// bestFastEncoder uses 2 tables, one for short matches (5 bytes) and one for long matches. +// The long match table contains the previous entry with the same hash, +// effectively making it a "chain" of length 2. +// When we find a long match we choose between the two values and select the longest. +// When we find a short match, after checking the long, we check if we can find a long at n+1 +// and that it is longer (lazy matching). +type bestFastEncoder struct { + fastBase + table [bestShortTableSize]prevEntry + longTable [bestLongTableSize]prevEntry + dictTable []prevEntry + dictLongTable []prevEntry +} + +// Encode improves compression... +func (e *bestFastEncoder) Encode(blk *blockEnc, src []byte) { + const ( + // Input margin is the number of bytes we read (8) + // and the maximum we will read ahead (2) + inputMargin = 8 + 4 + minNonLiteralBlockSize = 16 + ) + + // Protect against e.cur wraparound. + for e.cur >= bufferReset { + if len(e.hist) == 0 { + for i := range e.table[:] { + e.table[i] = prevEntry{} + } + for i := range e.longTable[:] { + e.longTable[i] = prevEntry{} + } + e.cur = e.maxMatchOff + break + } + // Shift down everything in the table that isn't already too far away. 
+ minOff := e.cur + int32(len(e.hist)) - e.maxMatchOff + for i := range e.table[:] { + v := e.table[i].offset + v2 := e.table[i].prev + if v < minOff { + v = 0 + v2 = 0 + } else { + v = v - e.cur + e.maxMatchOff + if v2 < minOff { + v2 = 0 + } else { + v2 = v2 - e.cur + e.maxMatchOff + } + } + e.table[i] = prevEntry{ + offset: v, + prev: v2, + } + } + for i := range e.longTable[:] { + v := e.longTable[i].offset + v2 := e.longTable[i].prev + if v < minOff { + v = 0 + v2 = 0 + } else { + v = v - e.cur + e.maxMatchOff + if v2 < minOff { + v2 = 0 + } else { + v2 = v2 - e.cur + e.maxMatchOff + } + } + e.longTable[i] = prevEntry{ + offset: v, + prev: v2, + } + } + e.cur = e.maxMatchOff + break + } + + s := e.addBlock(src) + blk.size = len(src) + if len(src) < minNonLiteralBlockSize { + blk.extraLits = len(src) + blk.literals = blk.literals[:len(src)] + copy(blk.literals, src) + return + } + + // Override src + src = e.hist + sLimit := int32(len(src)) - inputMargin + const kSearchStrength = 12 + + // nextEmit is where in src the next emitLiteral should start from. + nextEmit := s + cv := load6432(src, s) + + // Relative offsets + offset1 := int32(blk.recentOffsets[0]) + offset2 := int32(blk.recentOffsets[1]) + offset3 := int32(blk.recentOffsets[2]) + + addLiterals := func(s *seq, until int32) { + if until == nextEmit { + return + } + blk.literals = append(blk.literals, src[nextEmit:until]...) + s.litLen = uint32(until - nextEmit) + } + _ = addLiterals + + if debug { + println("recent offsets:", blk.recentOffsets) + } + +encodeLoop: + for { + // We allow the encoder to optionally turn off repeat offsets across blocks + canRepeat := len(blk.sequences) > 2 + + if debugAsserts && canRepeat && offset1 == 0 { + panic("offset0 was 0") + } + + type match struct { + offset int32 + s int32 + length int32 + rep int32 + } + matchAt := func(offset int32, s int32, first uint32, rep int32) match { + if s-offset >= e.maxMatchOff || load3232(src, offset) != first { + return match{offset: offset, s: s} + } + return match{offset: offset, s: s, length: 4 + e.matchlen(s+4, offset+4, src), rep: rep} + } + + bestOf := func(a, b match) match { + aScore := b.s - a.s + a.length + bScore := a.s - b.s + b.length + if a.rep < 0 { + aScore = aScore - int32(bits.Len32(uint32(a.offset)))/8 + } + if b.rep < 0 { + bScore = bScore - int32(bits.Len32(uint32(b.offset)))/8 + } + if aScore >= bScore { + return a + } + return b + } + const goodEnough = 100 + + nextHashL := hash8(cv, bestLongTableBits) + nextHashS := hash4x64(cv, bestShortTableBits) + candidateL := e.longTable[nextHashL] + candidateS := e.table[nextHashS] + + best := bestOf(matchAt(candidateL.offset-e.cur, s, uint32(cv), -1), matchAt(candidateL.prev-e.cur, s, uint32(cv), -1)) + best = bestOf(best, matchAt(candidateS.offset-e.cur, s, uint32(cv), -1)) + best = bestOf(best, matchAt(candidateS.prev-e.cur, s, uint32(cv), -1)) + if canRepeat && best.length < goodEnough { + best = bestOf(best, matchAt(s-offset1+1, s+1, uint32(cv>>8), 1)) + best = bestOf(best, matchAt(s-offset2+1, s+1, uint32(cv>>8), 2)) + best = bestOf(best, matchAt(s-offset3+1, s+1, uint32(cv>>8), 3)) + best = bestOf(best, matchAt(s-offset1+3, s+3, uint32(cv>>24), 1)) + best = bestOf(best, matchAt(s-offset2+3, s+3, uint32(cv>>24), 2)) + best = bestOf(best, matchAt(s-offset3+3, s+3, uint32(cv>>24), 3)) + } + // Load next and check... 
+ e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: candidateL.offset} + e.table[nextHashS] = prevEntry{offset: s + e.cur, prev: candidateS.offset} + + // Look far ahead, unless we have a really long match already... + if best.length < goodEnough { + // No match found, move forward on input, no need to check forward... + if best.length < 4 { + s += 1 + (s-nextEmit)>>(kSearchStrength-1) + if s >= sLimit { + break encodeLoop + } + cv = load6432(src, s) + continue + } + + s++ + candidateS = e.table[hash4x64(cv>>8, bestShortTableBits)] + cv = load6432(src, s) + cv2 := load6432(src, s+1) + candidateL = e.longTable[hash8(cv, bestLongTableBits)] + candidateL2 := e.longTable[hash8(cv2, bestLongTableBits)] + + best = bestOf(best, matchAt(candidateS.offset-e.cur, s, uint32(cv), -1)) + best = bestOf(best, matchAt(candidateL.offset-e.cur, s, uint32(cv), -1)) + best = bestOf(best, matchAt(candidateL.prev-e.cur, s, uint32(cv), -1)) + best = bestOf(best, matchAt(candidateL2.offset-e.cur, s+1, uint32(cv2), -1)) + best = bestOf(best, matchAt(candidateL2.prev-e.cur, s+1, uint32(cv2), -1)) + } + + // We have a match, we can store the forward value + if best.rep > 0 { + s = best.s + var seq seq + seq.matchLen = uint32(best.length - zstdMinMatch) + + // We might be able to match backwards. + // Extend as long as we can. + start := best.s + // We end the search early, so we don't risk 0 literals + // and have to do special offset treatment. + startLimit := nextEmit + 1 + + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + repIndex := best.offset + for repIndex > tMin && start > startLimit && src[repIndex-1] == src[start-1] && seq.matchLen < maxMatchLength-zstdMinMatch-1 { + repIndex-- + start-- + seq.matchLen++ + } + addLiterals(&seq, start) + + // rep 0 + seq.offset = uint32(best.rep) + if debugSequences { + println("repeat sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Index match start+1 (long) -> s - 1 + index0 := s + s = best.s + best.length + + nextEmit = s + if s >= sLimit { + if debug { + println("repeat ended", s, best.length) + + } + break encodeLoop + } + // Index skipped... + off := index0 + e.cur + for index0 < s-1 { + cv0 := load6432(src, index0) + h0 := hash8(cv0, bestLongTableBits) + h1 := hash4x64(cv0, bestShortTableBits) + e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset} + e.table[h1] = prevEntry{offset: off, prev: e.table[h1].offset} + off++ + index0++ + } + switch best.rep { + case 2: + offset1, offset2 = offset2, offset1 + case 3: + offset1, offset2, offset3 = offset3, offset1, offset2 + } + cv = load6432(src, s) + continue + } + + // A 4-byte match has been found. Update recent offsets. + // We'll later see if more than 4 bytes. + s = best.s + t := best.offset + offset1, offset2, offset3 = s-t, offset1, offset2 + + if debugAsserts && s <= t { + panic(fmt.Sprintf("s (%d) <= t (%d)", s, t)) + } + + if debugAsserts && canRepeat && int(offset1) > len(src) { + panic("invalid offset") + } + + // Extend the n-byte match as long as possible. + l := best.length + + // Extend backwards + tMin := s - e.maxMatchOff + if tMin < 0 { + tMin = 0 + } + for t > tMin && s > nextEmit && src[t-1] == src[s-1] && l < maxMatchLength { + s-- + t-- + l++ + } + + // Write our sequence + var seq seq + seq.litLen = uint32(s - nextEmit) + seq.matchLen = uint32(l - zstdMinMatch) + if seq.litLen > 0 { + blk.literals = append(blk.literals, src[nextEmit:s]...) 
+ } + seq.offset = uint32(s-t) + 3 + s += l + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + nextEmit = s + if s >= sLimit { + break encodeLoop + } + + // Index match start+1 (long) -> s - 1 + index0 := s - l + 1 + // every entry + for index0 < s-1 { + cv0 := load6432(src, index0) + h0 := hash8(cv0, bestLongTableBits) + h1 := hash4x64(cv0, bestShortTableBits) + off := index0 + e.cur + e.longTable[h0] = prevEntry{offset: off, prev: e.longTable[h0].offset} + e.table[h1] = prevEntry{offset: off, prev: e.table[h1].offset} + index0++ + } + + cv = load6432(src, s) + if !canRepeat { + continue + } + + // Check offset 2 + for { + o2 := s - offset2 + if load3232(src, o2) != uint32(cv) { + // Do regular search + break + } + + // Store this, since we have it. + nextHashS := hash4x64(cv, bestShortTableBits) + nextHashL := hash8(cv, bestLongTableBits) + + // We have at least 4 byte match. + // No need to check backwards. We come straight from a match + l := 4 + e.matchlen(s+4, o2+4, src) + + e.longTable[nextHashL] = prevEntry{offset: s + e.cur, prev: e.longTable[nextHashL].offset} + e.table[nextHashS] = prevEntry{offset: s + e.cur, prev: e.table[nextHashS].offset} + seq.matchLen = uint32(l) - zstdMinMatch + seq.litLen = 0 + + // Since litlen is always 0, this is offset 1. + seq.offset = 1 + s += l + nextEmit = s + if debugSequences { + println("sequence", seq, "next s:", s) + } + blk.sequences = append(blk.sequences, seq) + + // Swap offset 1 and 2. + offset1, offset2 = offset2, offset1 + if s >= sLimit { + // Finished + break encodeLoop + } + cv = load6432(src, s) + } + } + + if int(nextEmit) < len(src) { + blk.literals = append(blk.literals, src[nextEmit:]...) + blk.extraLits = len(src) - int(nextEmit) + } + blk.recentOffsets[0] = uint32(offset1) + blk.recentOffsets[1] = uint32(offset2) + blk.recentOffsets[2] = uint32(offset3) + if debug { + println("returning, recent offsets:", blk.recentOffsets, "extra literals:", blk.extraLits) + } +} + +// EncodeNoHist will encode a block with no history and no following blocks. +// Most notable difference is that src will not be copied for history and +// we do not need to check for max match length. 
+func (e *bestFastEncoder) EncodeNoHist(blk *blockEnc, src []byte) { + e.Encode(blk, src) +} + +// ResetDict will reset and set a dictionary if not nil +func (e *bestFastEncoder) Reset(d *dict, singleBlock bool) { + e.resetBase(d, singleBlock) + if d == nil { + return + } + // Init or copy dict table + if len(e.dictTable) != len(e.table) || d.id != e.lastDictID { + if len(e.dictTable) != len(e.table) { + e.dictTable = make([]prevEntry, len(e.table)) + } + end := int32(len(d.content)) - 8 + e.maxMatchOff + for i := e.maxMatchOff; i < end; i += 4 { + const hashLog = bestShortTableBits + + cv := load6432(d.content, i-e.maxMatchOff) + nextHash := hash4x64(cv, hashLog) // 0 -> 4 + nextHash1 := hash4x64(cv>>8, hashLog) // 1 -> 5 + nextHash2 := hash4x64(cv>>16, hashLog) // 2 -> 6 + nextHash3 := hash4x64(cv>>24, hashLog) // 3 -> 7 + e.dictTable[nextHash] = prevEntry{ + prev: e.dictTable[nextHash].offset, + offset: i, + } + e.dictTable[nextHash1] = prevEntry{ + prev: e.dictTable[nextHash1].offset, + offset: i + 1, + } + e.dictTable[nextHash2] = prevEntry{ + prev: e.dictTable[nextHash2].offset, + offset: i + 2, + } + e.dictTable[nextHash3] = prevEntry{ + prev: e.dictTable[nextHash3].offset, + offset: i + 3, + } + } + e.lastDictID = d.id + } + + // Init or copy dict table + if len(e.dictLongTable) != len(e.longTable) || d.id != e.lastDictID { + if len(e.dictLongTable) != len(e.longTable) { + e.dictLongTable = make([]prevEntry, len(e.longTable)) + } + if len(d.content) >= 8 { + cv := load6432(d.content, 0) + h := hash8(cv, bestLongTableBits) + e.dictLongTable[h] = prevEntry{ + offset: e.maxMatchOff, + prev: e.dictLongTable[h].offset, + } + + end := int32(len(d.content)) - 8 + e.maxMatchOff + off := 8 // First to read + for i := e.maxMatchOff + 1; i < end; i++ { + cv = cv>>8 | (uint64(d.content[off]) << 56) + h := hash8(cv, bestLongTableBits) + e.dictLongTable[h] = prevEntry{ + offset: i, + prev: e.dictLongTable[h].offset, + } + off++ + } + } + e.lastDictID = d.id + } + // Reset table to initial state + copy(e.longTable[:], e.dictLongTable) + + e.cur = e.maxMatchOff + // Reset table to initial state + copy(e.table[:], e.dictTable) +} diff --git a/vendor/github.com/klauspost/compress/zstd/encoder_options.go b/vendor/github.com/klauspost/compress/zstd/encoder_options.go index 1209915bcc..a7312f42af 100644 --- a/vendor/github.com/klauspost/compress/zstd/encoder_options.go +++ b/vendor/github.com/klauspost/compress/zstd/encoder_options.go @@ -47,6 +47,8 @@ func (o encoderOptions) encoder() encoder { return &doubleFastEncoder{fastEncoder: fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}}} case SpeedBetterCompression: return &betterFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} + case SpeedBestCompression: + return &bestFastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} case SpeedFastest: return &fastEncoder{fastBase: fastBase{maxMatchOff: int32(o.windowSize)}} } @@ -143,20 +145,20 @@ const ( // By using this, notice that CPU usage may go up in the future. SpeedBetterCompression + // SpeedBestCompression will choose the best available compression option. + // This will offer the best compression no matter the CPU cost. + SpeedBestCompression + // speedLast should be kept as the last actual compression option. // The is not for external usage, but is used to keep track of the valid options. speedLast - - // SpeedBestCompression will choose the best available compression option. - // For now this is not implemented. 
- SpeedBestCompression = SpeedBetterCompression ) // EncoderLevelFromString will convert a string representation of an encoding level back // to a compression level. The compare is not case sensitive. // If the string wasn't recognized, (false, SpeedDefault) will be returned. func EncoderLevelFromString(s string) (bool, EncoderLevel) { - for l := EncoderLevel(speedNotSet + 1); l < speedLast; l++ { + for l := speedNotSet + 1; l < speedLast; l++ { if strings.EqualFold(s, l.String()) { return true, l } @@ -173,7 +175,9 @@ func EncoderLevelFromZstd(level int) EncoderLevel { return SpeedFastest case level >= 3 && level < 6: return SpeedDefault - case level > 5: + case level >= 6 && level < 10: + return SpeedBetterCompression + case level >= 10: return SpeedBetterCompression } return SpeedDefault @@ -188,6 +192,8 @@ func (e EncoderLevel) String() string { return "default" case SpeedBetterCompression: return "better" + case SpeedBestCompression: + return "best" default: return "invalid" } @@ -209,6 +215,8 @@ func WithEncoderLevel(l EncoderLevel) EOption { o.windowSize = 8 << 20 case SpeedBetterCompression: o.windowSize = 16 << 20 + case SpeedBestCompression: + o.windowSize = 32 << 20 } } if !o.customALEntropy { diff --git a/vendor/modules.txt b/vendor/modules.txt index 3fc0378303..eba5b9de73 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -88,7 +88,7 @@ github.com/jmespath/go-jmespath github.com/jstemmer/go-junit-report github.com/jstemmer/go-junit-report/formatter github.com/jstemmer/go-junit-report/parser -# github.com/klauspost/compress v1.11.3 +# github.com/klauspost/compress v1.11.4 github.com/klauspost/compress/flate github.com/klauspost/compress/fse github.com/klauspost/compress/gzip From 46dba00756ece9527eb2df6cc7ab227350e63839 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Tue, 22 Dec 2020 19:48:27 +0200 Subject: [PATCH 03/10] lib/storage: remove stale parts as soon as they go outside the configured retention Previously such parts could remain undeleted for long durations until they are merged with other parts. This should help for `-retentionPeriod` values smaller than one month. --- docs/CHANGELOG.md | 2 ++ lib/storage/partition.go | 67 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index f3d339a83f..bb4677cc34 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,8 @@ # tip +* FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. + # [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index 4369ec1169..c049c0812c 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -167,6 +167,7 @@ type partition struct { bigPartsMergerWG sync.WaitGroup rawRowsFlusherWG sync.WaitGroup inmemoryPartsFlusherWG sync.WaitGroup + stalePartsRemoverWG sync.WaitGroup } // partWrapper is a wrapper for the part. 
@@ -278,6 +279,7 @@ func openPartition(smallPartsPath, bigPartsPath string, getDeletedMetricIDs func
 	pt.startMergeWorkers()
 	pt.startRawRowsFlusher()
 	pt.startInmemoryPartsFlusher()
+	pt.startStalePartsRemover()
 	return pt, nil
 }
 
@@ -641,8 +643,13 @@ func (pt *partition) PutParts(pws []*partWrapper) {
 func (pt *partition) MustClose() {
 	close(pt.stopCh)
 
-	logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
+	logger.Infof("waiting for stale parts remover to stop on %q...", pt.smallPartsPath)
 	startTime := time.Now()
+	pt.stalePartsRemoverWG.Wait()
+	logger.Infof("stale parts remover stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
+
+	logger.Infof("waiting for inmemory parts flusher to stop on %q...", pt.smallPartsPath)
+	startTime = time.Now()
 	pt.inmemoryPartsFlusherWG.Wait()
 	logger.Infof("inmemory parts flusher stopped in %.3f seconds on %q", time.Since(startTime).Seconds(), pt.smallPartsPath)
 
@@ -1289,6 +1296,64 @@ func removeParts(pws []*partWrapper, partsToRemove map[*partWrapper]bool, isBig
 	return dst, removedParts
 }
 
+func (pt *partition) startStalePartsRemover() {
+	pt.stalePartsRemoverWG.Add(1)
+	go func() {
+		pt.stalePartsRemover()
+		pt.stalePartsRemoverWG.Done()
+	}()
+}
+
+func (pt *partition) stalePartsRemover() {
+	ticker := time.NewTicker(7 * time.Minute)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-pt.stopCh:
+			return
+		case <-ticker.C:
+			pt.removeStaleParts()
+		}
+	}
+}
+
+func (pt *partition) removeStaleParts() {
+	m := make(map[*partWrapper]bool)
+	startTime := time.Now()
+	retentionDeadline := timestampFromTime(startTime) - pt.retentionMsecs
+
+	pt.partsLock.Lock()
+	for _, pw := range pt.bigParts {
+		if pw.p.ph.MaxTimestamp < retentionDeadline {
+			atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount)
+			m[pw] = true
+		}
+	}
+	for _, pw := range pt.smallParts {
+		if pw.p.ph.MaxTimestamp < retentionDeadline {
+			atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount)
+			m[pw] = true
+		}
+	}
+	removedSmallParts := 0
+	removedBigParts := 0
+	if len(m) > 0 {
+		pt.smallParts, removedSmallParts = removeParts(pt.smallParts, m, false)
+		pt.bigParts, removedBigParts = removeParts(pt.bigParts, m, true)
+	}
+	pt.partsLock.Unlock()
+
+	if removedSmallParts+removedBigParts != len(m) {
+		logger.Panicf("BUG: unexpected number of stale parts removed; got %d, want %d", removedSmallParts+removedBigParts, len(m))
+	}
+
+	// Remove partition references from removed parts, so they are eventually deleted when nobody reads from them.
+	for pw := range m {
+		logger.Infof("removing part %q, since its data is out of the configured retention (%d secs)", pw.p.path, retentionDeadline/1000)
+		pw.decRef()
+	}
+}
+
 // getPartsToMerge returns optimal parts to merge from pws.
 //
 // The returned parts will contain less than maxRows rows.

From c270f8f3e6eaea23c837dc37dfc3c0a17e94bb34 Mon Sep 17 00:00:00 2001
From: Nikolay <nik@victoriametrics.com>
Date: Tue, 22 Dec 2020 23:23:04 +0300
Subject: [PATCH 04/10] changes vmalert notifier flag, (#978)

fixes issue with notifier insecure setting; it is now possible to
specify notifier.tlsInsecureSkipVerify multiple times, once per
notifier.url.
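For illustration (the Alertmanager addresses below are hypothetical, and the usual -rule/-datasource.url flags are omitted), the array flag accepts one value per -notifier.url, either repeated or comma-separated, as the updated flag docs state:

```
vmalert -notifier.url=https://alertmanager-1:9093 \
        -notifier.url=https://alertmanager-2:9093 \
        -notifier.tlsInsecureSkipVerify=true,false
```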
--- app/vmalert/notifier/init.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/app/vmalert/notifier/init.go b/app/vmalert/notifier/init.go index 35aaae5ab6..696e398e53 100644 --- a/app/vmalert/notifier/init.go +++ b/app/vmalert/notifier/init.go @@ -1,7 +1,6 @@ package notifier import ( - "flag" "fmt" "net/http" @@ -14,7 +13,7 @@ var ( basicAuthUsername = flagutil.NewArray("notifier.basicAuth.username", "Optional basic auth username for -datasource.url") basicAuthPassword = flagutil.NewArray("notifier.basicAuth.password", "Optional basic auth password for -datasource.url") - tlsInsecureSkipVerify = flag.Bool("notifier.tlsInsecureSkipVerify", false, "Whether to skip tls verification when connecting to -notifier.url") + tlsInsecureSkipVerify = flagutil.NewArrayBool("notifier.tlsInsecureSkipVerify", "Whether to skip tls verification when connecting to -notifier.url") tlsCertFile = flagutil.NewArray("notifier.tlsCertFile", "Optional path to client-side TLS certificate file to use when connecting to -notifier.url") tlsKeyFile = flagutil.NewArray("notifier.tlsKeyFile", "Optional path to client-side TLS certificate key to use when connecting to -notifier.url") tlsCAFile = flagutil.NewArray("notifier.tlsCAFile", "Optional path to TLS CA file to use for verifying connections to -notifier.url. "+ @@ -33,7 +32,7 @@ func Init(gen AlertURLGenerator) ([]Notifier, error) { for i, addr := range *addrs { cert, key := tlsCertFile.GetOptionalArg(i), tlsKeyFile.GetOptionalArg(i) ca, serverName := tlsCAFile.GetOptionalArg(i), tlsServerName.GetOptionalArg(i) - tr, err := utils.Transport(addr, cert, key, ca, serverName, *tlsInsecureSkipVerify) + tr, err := utils.Transport(addr, cert, key, ca, serverName, tlsInsecureSkipVerify.GetOptionalArg(i)) if err != nil { return nil, fmt.Errorf("failed to create transport: %w", err) } From 9df60518bbf928727da549c824a662e22d29fd63 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Tue, 22 Dec 2020 22:32:10 +0200 Subject: [PATCH 05/10] docs: mention that it is possible to set multiple `-notifier.tlsInsecureSkipVerify` command-line flags for vmalert See c3a92968343c2b3619f1ab935702d0e9b3a46733 --- app/vmalert/README.md | 15 +++++++++++---- docs/CHANGELOG.md | 1 + docs/vmalert.md | 15 +++++++++++---- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/app/vmalert/README.md b/app/vmalert/README.md index 773873039f..7c72894d5e 100644 --- a/app/vmalert/README.md +++ b/app/vmalert/README.md @@ -175,9 +175,9 @@ The shortlist of configuration flags is the following: -datasource.basicAuth.username string Optional basic auth username for -datasource.url -datasource.lookback duration - Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query. + Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query. -datasource.maxIdleConnections int - Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100) + Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. 
Too low value may result into high number of sockets in TIME_WAIT state. (default 100) -datasource.tlsCAFile string Optional path to TLS CA file to use for verifying connections to -datasource.url. By default system CA is used -datasource.tlsCertFile string @@ -190,6 +190,8 @@ The shortlist of configuration flags is the following: Optional TLS server name to use for connections to -datasource.url. By default the server name from -datasource.url is used -datasource.url string Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428 + -dryRun -rule + Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified. -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used -envflag.enable @@ -224,14 +226,18 @@ The shortlist of configuration flags is the following: Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8880") + -loggerDisableTimestamps + Whether to disable writing timestamps in logs -loggerErrorsPerSecondLimit int - Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit (default 10) + Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit -loggerFormat string Format for logs. Possible values: default, json (default "default") -loggerLevel string Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO") -loggerOutput string Output for the logs. Supported values: stderr, stdout (default "stderr") + -loggerWarnsPerSecondLimit int + Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero value disables the rate limit -memory.allowedBytes value Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to non-zero value. Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. Too high value may evict too much data from OS page cache, which will result in higher disk IO usage Supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB (default 0) @@ -251,8 +257,9 @@ The shortlist of configuration flags is the following: -notifier.tlsCertFile array Optional path to client-side TLS certificate file to use when connecting to -notifier.url Supports array of values separated by comma or specified via multiple flags. - -notifier.tlsInsecureSkipVerify + -notifier.tlsInsecureSkipVerify array Whether to skip tls verification when connecting to -notifier.url + Supports array of values separated by comma or specified via multiple flags. -notifier.tlsKeyFile array Optional path to client-side TLS certificate key to use when connecting to -notifier.url Supports array of values separated by comma or specified via multiple flags. diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index bb4677cc34..aba2c78f95 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -3,6 +3,7 @@ # tip * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. 
Previously such parts may remain active for long periods of time. This should help reducing disk usage for `-retentionPeriod` smaller than one month. +* FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag per each `-notifier.url`. # [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2) diff --git a/docs/vmalert.md b/docs/vmalert.md index 773873039f..7c72894d5e 100644 --- a/docs/vmalert.md +++ b/docs/vmalert.md @@ -175,9 +175,9 @@ The shortlist of configuration flags is the following: -datasource.basicAuth.username string Optional basic auth username for -datasource.url -datasource.lookback duration - Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query. + Lookback defines how far to look into past when evaluating queries. For example, if datasource.lookback=5m then param "time" with value now()-5m will be added to every query. -datasource.maxIdleConnections int - Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100) + Defines the number of idle (keep-alive connections) to configured datasource.Consider to set this value equal to the value: groups_total * group.concurrency. Too low value may result into high number of sockets in TIME_WAIT state. (default 100) -datasource.tlsCAFile string Optional path to TLS CA file to use for verifying connections to -datasource.url. By default system CA is used -datasource.tlsCertFile string @@ -190,6 +190,8 @@ The shortlist of configuration flags is the following: Optional TLS server name to use for connections to -datasource.url. By default the server name from -datasource.url is used -datasource.url string Victoria Metrics or VMSelect url. Required parameter. E.g. http://127.0.0.1:8428 + -dryRun -rule + Whether to check only config files without running vmalert. The rules file are validated. The -rule flag must be specified. -enableTCP6 Whether to enable IPv6 for listening and dialing. By default only IPv4 TCP is used -envflag.enable @@ -224,14 +226,18 @@ The shortlist of configuration flags is the following: Username for HTTP Basic Auth. The authentication is disabled if empty. See also -httpAuth.password -httpListenAddr string Address to listen for http connections (default ":8880") + -loggerDisableTimestamps + Whether to disable writing timestamps in logs -loggerErrorsPerSecondLimit int - Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit (default 10) + Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, then the remaining errors are suppressed. Zero value disables the rate limit -loggerFormat string Format for logs. Possible values: default, json (default "default") -loggerLevel string Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO") -loggerOutput string Output for the logs. Supported values: stderr, stdout (default "stderr") + -loggerWarnsPerSecondLimit int + Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. 
Zero value disables the rate limit -memory.allowedBytes value Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to non-zero value. Too low value may increase cache miss rate, which usually results in higher CPU and disk IO usage. Too high value may evict too much data from OS page cache, which will result in higher disk IO usage Supports the following optional suffixes for values: KB, MB, GB, KiB, MiB, GiB (default 0) @@ -251,8 +257,9 @@ The shortlist of configuration flags is the following: -notifier.tlsCertFile array Optional path to client-side TLS certificate file to use when connecting to -notifier.url Supports array of values separated by comma or specified via multiple flags. - -notifier.tlsInsecureSkipVerify + -notifier.tlsInsecureSkipVerify array Whether to skip tls verification when connecting to -notifier.url + Supports array of values separated by comma or specified via multiple flags. -notifier.tlsKeyFile array Optional path to client-side TLS certificate key to use when connecting to -notifier.url Supports array of values separated by comma or specified via multiple flags. From 9e4ed5e591c9a3aac2fe6ff7978e0b41067f50a0 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Thu, 24 Dec 2020 08:50:10 +0200 Subject: [PATCH 06/10] lib/storage: do not remove parts outside the configured retention if they are currently merged These parts are automatically removed after the merge is complete. --- lib/storage/partition.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/storage/partition.go b/lib/storage/partition.go index c049c0812c..2be544d231 100644 --- a/lib/storage/partition.go +++ b/lib/storage/partition.go @@ -1324,13 +1324,13 @@ func (pt *partition) removeStaleParts() { pt.partsLock.Lock() for _, pw := range pt.bigParts { - if pw.p.ph.MaxTimestamp < retentionDeadline { + if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline { atomic.AddUint64(&pt.bigRowsDeleted, pw.p.ph.RowsCount) m[pw] = true } } for _, pw := range pt.smallParts { - if pw.p.ph.MaxTimestamp < retentionDeadline { + if !pw.isInMerge && pw.p.ph.MaxTimestamp < retentionDeadline { atomic.AddUint64(&pt.smallRowsDeleted, pw.p.ph.RowsCount) m[pw] = true } From 8dd03ecf1979903a0d99df6df03104c7eaab8501 Mon Sep 17 00:00:00 2001 From: Nikolay <nik@victoriametrics.com> Date: Thu, 24 Dec 2020 11:52:37 +0300 Subject: [PATCH 07/10] adds proxy_url support, (#980) * adds proxy_url support, adds proxy_url to the dockerswarm, eureka, kubernetes and consul service discovery, adds proxy_url to the scrape_config for targets scrapping, http based proxy is supported atm, https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * fixes imports --- lib/netutil/proxy.go | 124 ++++++++++++++++++ lib/promscrape/client.go | 8 +- lib/promscrape/config.go | 5 + lib/promscrape/discovery/consul/api.go | 2 +- lib/promscrape/discovery/consul/consul.go | 2 + lib/promscrape/discovery/dockerswarm/api.go | 3 +- .../discovery/dockerswarm/dockerswarm.go | 3 +- lib/promscrape/discovery/eureka/api.go | 2 +- lib/promscrape/discovery/eureka/eureka.go | 5 +- lib/promscrape/discovery/kubernetes/api.go | 2 +- .../discovery/kubernetes/kubernetes.go | 2 + lib/promscrape/discoveryutils/client.go | 11 +- lib/promscrape/scrapework.go | 4 + lib/promscrape/statconn.go | 48 ++++--- 14 files changed, 192 insertions(+), 29 deletions(-) create mode 100644 lib/netutil/proxy.go diff --git a/lib/netutil/proxy.go b/lib/netutil/proxy.go new file mode 
100644 index 0000000000..d2a899cb2e --- /dev/null +++ b/lib/netutil/proxy.go @@ -0,0 +1,124 @@ +package netutil + +import ( + "bufio" + "encoding/base64" + "fmt" + "net" + "net/url" + "strings" + + "github.com/VictoriaMetrics/fasthttp" +) + +// ProxyURL implements marshal interfaces for url.URL. +type ProxyURL struct { + url *url.URL +} + +// URL returns *url.URL. +func (pu ProxyURL) URL() *url.URL { + return pu.url +} + +// String implements String interface. +func (pu ProxyURL) String() string { + if pu.url == nil { + return "" + } + return pu.url.String() +} + +// MarshalYAML implements yaml.Marshaler interface. +func (pu ProxyURL) MarshalYAML() (interface{}, error) { + if pu.url == nil { + return nil, nil + } + return pu.url.String(), nil +} + +// UnmarshalYAML implements yaml.Unmarshaler interface. +func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + parsedURL, err := url.Parse(s) + if err != nil { + return fmt.Errorf("failed parse proxy_url=%q as *url.URL, err=%w", s, err) + } + pu.url = parsedURL + return nil +} + +// GetProxyDialFunc returns dial proxy func for the given proxy url. +// currently only http based proxy is supported. +func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) { + if strings.HasPrefix(proxyURL.Scheme, "http") { + return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil + } + return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL) +} + +func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc { + return func(addr string) (net.Conn, error) { + var ( + conn net.Conn + err error + ) + if TCP6Enabled() { + conn, err = fasthttp.DialDualStack(proxyAddr) + } else { + conn, err = fasthttp.Dial(proxyAddr) + } + if err != nil { + return nil, fmt.Errorf("cannot connect to the proxy=%q,err=%w", proxyAddr, err) + } + if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { + _ = conn.Close() + return nil, err + } + return conn, nil + } +} + +// MakeBasicAuthHeader encodes and writes basic auth http header from url into given dst and returns it. +func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte { + if url == nil || url.User == nil { + return dst + } + if len(url.User.Username()) > 0 { + dst = append(dst, "Proxy-Authorization: Basic "...) + dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...) + } + return dst +} + +// MakeProxyConnectCall execute CONNECT method to proxy with given destination address. +func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error { + conReq := make([]byte, 0, 10) + conReq = append(conReq, []byte("CONNECT ")...) + conReq = append(conReq, dstAddr...) + conReq = append(conReq, []byte(" HTTP/1.1\r\n")...) + if len(auth) > 0 { + conReq = append(conReq, auth...) + conReq = append(conReq, []byte("\r\n")...) + } + conReq = append(conReq, []byte("\r\n")...) 
+ + res := fasthttp.AcquireResponse() + defer fasthttp.ReleaseResponse(res) + res.SkipBody = true + if _, err := conn.Write(conReq); err != nil { + return err + } + if err := res.Read(bufio.NewReader(conn)); err != nil { + _ = conn.Close() + return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err) + } + if res.Header.StatusCode() != 200 { + _ = conn.Close() + return fmt.Errorf("unexpected proxy response status code, want: 200, get: %d", res.Header.StatusCode()) + } + return nil +} diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 98e1ff230b..2e24e8a9fb 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -69,7 +69,7 @@ func newClient(sw *ScrapeWork) *client { hc := &fasthttp.HostClient{ Addr: host, Name: "vm_promscrape", - Dial: statDial, + Dial: getDialStatConn(sw.ProxyURL), IsTLS: isTLS, TLSConfig: tlsCfg, MaxIdleConnDuration: 2 * sw.ScrapeInterval, @@ -83,6 +83,7 @@ func newClient(sw *ScrapeWork) *client { sc = &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, + Proxy: http.ProxyURL(sw.ProxyURL), TLSHandshakeTimeout: 10 * time.Second, IdleConnTimeout: 2 * sw.ScrapeInterval, DisableCompression: *disableCompression || sw.DisableCompression, @@ -93,9 +94,8 @@ func newClient(sw *ScrapeWork) *client { } } return &client{ - hc: hc, - sc: sc, - + hc: hc, + sc: sc, scrapeURL: sw.ScrapeURL, host: host, requestURI: requestURI, diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index e4aaca37e2..65cd03cfc3 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -12,6 +12,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -71,6 +72,7 @@ type ScrapeConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"` FileSDConfigs []FileSDConfig `yaml:"file_sd_configs,omitempty"` @@ -495,6 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf metricsPath: metricsPath, scheme: scheme, params: params, + proxyURL: sc.ProxyURL.URL(), authConfig: ac, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -516,6 +519,7 @@ type scrapeWorkConfig struct { metricsPath string scheme string params map[string][]string + proxyURL *url.URL authConfig *promauth.Config honorLabels bool honorTimestamps bool @@ -750,6 +754,7 @@ func appendScrapeWork(dst []*ScrapeWork, swc *scrapeWorkConfig, target string, e HonorTimestamps: swc.honorTimestamps, OriginalLabels: originalLabels, Labels: labels, + ProxyURL: swc.proxyURL, AuthConfig: swc.authConfig, MetricRelabelConfigs: swc.metricRelabelConfigs, SampleLimit: swc.sampleLimit, diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index 6a888eabba..a487eda610 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + 
apiServer } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 6376d40626..2e59f9747e 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -3,6 +3,7 @@ package consul import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -16,6 +17,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Services []string `yaml:"services,omitempty"` Tags []string `yaml:"tags,omitempty"` diff --git a/lib/promscrape/discovery/dockerswarm/api.go b/lib/promscrape/discovery/dockerswarm/api.go index 3853600a41..fd72a13eee 100644 --- a/lib/promscrape/discovery/dockerswarm/api.go +++ b/lib/promscrape/discovery/dockerswarm/api.go @@ -34,11 +34,12 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { port: sdc.Port, filtersQueryArg: getFiltersQueryArg(sdc.Filters), } + ac, err := promauth.NewConfig(baseDir, sdc.BasicAuth, sdc.BearerToken, sdc.BearerTokenFile, sdc.TLSConfig) if err != nil { return nil, err } - client, err := discoveryutils.NewClient(sdc.Host, ac) + client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err) } diff --git a/lib/promscrape/discovery/dockerswarm/dockerswarm.go b/lib/promscrape/discovery/dockerswarm/dockerswarm.go index 61b4b02f67..e09f273159 100644 --- a/lib/promscrape/discovery/dockerswarm/dockerswarm.go +++ b/lib/promscrape/discovery/dockerswarm/dockerswarm.go @@ -3,6 +3,7 @@ package dockerswarm import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -15,7 +16,7 @@ type SDConfig struct { Port int `yaml:"port,omitempty"` Filters []Filter `yaml:"filters,omitempty"` - // TODO: add support for proxy_url + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // refresh_interval is obtained from `-promscrape.dockerswarmSDCheckInterval` command-line option BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` diff --git a/lib/promscrape/discovery/eureka/api.go b/lib/promscrape/discovery/eureka/api.go index f1a319bc36..cfb1214973 100644 --- a/lib/promscrape/discovery/eureka/api.go +++ b/lib/promscrape/discovery/eureka/api.go @@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/eureka/eureka.go b/lib/promscrape/discovery/eureka/eureka.go index 5dca819374..53b68fbb34 100644 --- a/lib/promscrape/discovery/eureka/eureka.go +++ b/lib/promscrape/discovery/eureka/eureka.go @@ -5,9 +5,9 @@ import ( "fmt" "strconv" - 
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" - + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" ) const appsAPIPath = "/apps" @@ -22,6 +22,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // RefreshInterval time.Duration `yaml:"refresh_interval"` // refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option. diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index caa3306439..39c09e8d8e 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } ac = acNew } - client, err := discoveryutils.NewClient(apiServer, ac) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index 87c3819633..f983436942 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -3,6 +3,7 @@ package kubernetes import ( "fmt" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" ) @@ -15,6 +16,7 @@ type SDConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` + ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Namespaces Namespaces `yaml:"namespaces,omitempty"` Selectors []Selector `yaml:"selectors,omitempty"` diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 9469fe06d0..10963894ac 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -6,6 +6,7 @@ import ( "fmt" "net" "net/http" + "net/url" "strings" "sync" "time" @@ -45,11 +46,12 @@ type Client struct { } // NewClient returns new Client for the given apiServer and the given ac. 
-func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { +func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) { var ( dialFunc fasthttp.DialFunc tlsCfg *tls.Config u fasthttp.URI + err error ) u.Update(apiServer) @@ -61,6 +63,13 @@ func NewClient(apiServer string, ac *promauth.Config) (*Client, error) { return net.Dial("unix", dialAddr) } } + if proxyURL != nil { + dialFunc, err = netutil.GetProxyDialFunc(proxyURL) + if err != nil { + return nil, err + } + } + hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" if isTLS && ac != nil { diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 62f759d58a..59d84a5101 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "math/bits" + "net/url" "strconv" "strings" "sync" @@ -70,6 +71,9 @@ type ScrapeWork struct { // Auth config AuthConfig *promauth.Config + // ProxyURL HTTP proxy url + ProxyURL *url.URL + // Optional `metric_relabel_configs`. MetricRelabelConfigs []promrelabel.ParsedRelabelConfig diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 73c6dcc94b..893beefca5 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net" + "net/url" "sync" "sync/atomic" "time" @@ -47,25 +48,38 @@ var ( stdDialerOnce sync.Once ) -func statDial(addr string) (conn net.Conn, err error) { - if netutil.TCP6Enabled() { - conn, err = fasthttp.DialDualStack(addr) - } else { - conn, err = fasthttp.Dial(addr) - } - dialsTotal.Inc() - if err != nil { - dialErrors.Inc() - if !netutil.TCP6Enabled() { - err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) +func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { + auth := netutil.MakeBasicAuthHeader(nil, proxyURL) + return func(addr string) (conn net.Conn, err error) { + dialAddr := addr + if proxyURL != nil { + dialAddr = proxyURL.Host } - return nil, err + if netutil.TCP6Enabled() { + conn, err = fasthttp.DialDualStack(dialAddr) + } else { + conn, err = fasthttp.Dial(dialAddr) + } + dialsTotal.Inc() + if err != nil { + dialErrors.Inc() + if !netutil.TCP6Enabled() { + err = fmt.Errorf("%w; try -enableTCP6 command-line flag if you scrape ipv6 addresses", err) + } + return nil, err + } + conns.Inc() + if proxyURL != nil { + if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { + _ = conn.Close() + return nil, err + } + } + sc := &statConn{ + Conn: conn, + } + return sc, nil } - conns.Inc() - sc := &statConn{ - Conn: conn, - } - return sc, nil } var ( From 820669da697353166cd54adb4bbe73e2f8c29587 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Thu, 24 Dec 2020 10:56:10 +0200 Subject: [PATCH 08/10] lib/promscrape: code prettifying for 8dd03ecf1979903a0d99df6df03104c7eaab8501 --- docs/CHANGELOG.md | 1 + lib/netutil/proxy.go | 124 ------------------ lib/promscrape/client.go | 13 +- lib/promscrape/config.go | 8 +- lib/promscrape/discovery/consul/api.go | 2 +- lib/promscrape/discovery/consul/consul.go | 4 +- lib/promscrape/discovery/dockerswarm/api.go | 2 +- .../discovery/dockerswarm/dockerswarm.go | 4 +- lib/promscrape/discovery/eureka/api.go | 2 +- lib/promscrape/discovery/eureka/eureka.go | 4 +- lib/promscrape/discovery/kubernetes/api.go | 2 +- .../discovery/kubernetes/kubernetes.go | 4 +- lib/promscrape/discoveryutils/client.go | 19 ++- lib/promscrape/scrapework.go | 4 +- 
lib/promscrape/statconn.go | 29 ++-- lib/proxy/proxy.go | 117 +++++++++++++++++ 16 files changed, 165 insertions(+), 174 deletions(-) delete mode 100644 lib/netutil/proxy.go create mode 100644 lib/proxy/proxy.go diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index aba2c78f95..3a05f66772 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -2,6 +2,7 @@ # tip +* FEATURE: vmagent: add support for `proxy_url` config option in Prometheus scrape configs. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/503 * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts could remain active for long periods of time. This should help reduce disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag for each `-notifier.url`. diff --git a/lib/netutil/proxy.go b/lib/netutil/proxy.go deleted file mode 100644 index d2a899cb2e..0000000000 --- a/lib/netutil/proxy.go +++ /dev/null @@ -1,124 +0,0 @@ -package netutil - -import ( - "bufio" - "encoding/base64" - "fmt" - "net" - "net/url" - "strings" - - "github.com/VictoriaMetrics/fasthttp" -) - -// ProxyURL implements yaml.Marshaler and yaml.Unmarshaler interfaces for url.URL. -type ProxyURL struct { - url *url.URL -} - -// URL returns *url.URL. -func (pu ProxyURL) URL() *url.URL { - return pu.url -} - -// String implements the fmt.Stringer interface. -func (pu ProxyURL) String() string { - if pu.url == nil { - return "" - } - return pu.url.String() -} - -// MarshalYAML implements yaml.Marshaler interface. -func (pu ProxyURL) MarshalYAML() (interface{}, error) { - if pu.url == nil { - return nil, nil - } - return pu.url.String(), nil -} - -// UnmarshalYAML implements yaml.Unmarshaler interface. -func (pu *ProxyURL) UnmarshalYAML(unmarshal func(interface{}) error) error { - var s string - if err := unmarshal(&s); err != nil { - return err - } - parsedURL, err := url.Parse(s) - if err != nil { - return fmt.Errorf("failed to parse proxy_url=%q as *url.URL, err=%w", s, err) - } - pu.url = parsedURL - return nil -} - -// GetProxyDialFunc returns a proxy dial func for the given proxy url. -// Currently only HTTP-based proxies are supported. -func GetProxyDialFunc(proxyURL *url.URL) (fasthttp.DialFunc, error) { - if strings.HasPrefix(proxyURL.Scheme, "http") { - return httpProxy(proxyURL.Host, MakeBasicAuthHeader(nil, proxyURL)), nil - } - return nil, fmt.Errorf("unknown scheme=%q for proxy_url: %q, must be http or https", proxyURL.Scheme, proxyURL) -} - -func httpProxy(proxyAddr string, auth []byte) fasthttp.DialFunc { - return func(addr string) (net.Conn, error) { - var ( - conn net.Conn - err error - ) - if TCP6Enabled() { - conn, err = fasthttp.DialDualStack(proxyAddr) - } else { - conn, err = fasthttp.Dial(proxyAddr) - } - if err != nil { - return nil, fmt.Errorf("cannot connect to the proxy=%q, err=%w", proxyAddr, err) - } - if err := MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { - _ = conn.Close() - return nil, err - } - return conn, nil - } -} - -// MakeBasicAuthHeader appends the basic auth Proxy-Authorization header built from url to dst and returns the result. -func MakeBasicAuthHeader(dst []byte, url *url.URL) []byte { - if url == nil || url.User == nil { - return dst - } - if len(url.User.Username()) > 0 { - dst = append(dst, "Proxy-Authorization: Basic "...) - dst = append(dst, base64.StdEncoding.EncodeToString([]byte(url.User.String()))...)
- } - return dst -} - -// MakeProxyConnectCall executes the CONNECT method on the given proxy connection for the given destination address. -func MakeProxyConnectCall(conn net.Conn, dstAddr, auth []byte) error { - conReq := make([]byte, 0, 10) - conReq = append(conReq, []byte("CONNECT ")...) - conReq = append(conReq, dstAddr...) - conReq = append(conReq, []byte(" HTTP/1.1\r\n")...) - if len(auth) > 0 { - conReq = append(conReq, auth...) - conReq = append(conReq, []byte("\r\n")...) - } - conReq = append(conReq, []byte("\r\n")...) - - res := fasthttp.AcquireResponse() - defer fasthttp.ReleaseResponse(res) - res.SkipBody = true - if _, err := conn.Write(conReq); err != nil { - return err - } - if err := res.Read(bufio.NewReader(conn)); err != nil { - _ = conn.Close() - return fmt.Errorf("cannot read CONNECT response from proxy, err=%w", err) - } - if res.Header.StatusCode() != 200 { - _ = conn.Close() - return fmt.Errorf("unexpected proxy response status code, want: 200, got: %d", res.Header.StatusCode()) - } - return nil -} diff --git a/lib/promscrape/client.go b/lib/promscrape/client.go index 2e24e8a9fb..bec3ef23e4 100644 --- a/lib/promscrape/client.go +++ b/lib/promscrape/client.go @@ -8,6 +8,7 @@ import ( "io" "io/ioutil" "net/http" + "net/url" "strings" "time" @@ -66,10 +67,14 @@ func newClient(sw *ScrapeWork) *client { host += ":443" } } + dialFunc, err := newStatDialFunc(sw.ProxyURL, tlsCfg) + if err != nil { + logger.Fatalf("cannot create dial func: %s", err) + } hc := &fasthttp.HostClient{ Addr: host, Name: "vm_promscrape", - Dial: getDialStatConn(sw.ProxyURL), + Dial: dialFunc, IsTLS: isTLS, TLSConfig: tlsCfg, MaxIdleConnDuration: 2 * sw.ScrapeInterval, @@ -80,10 +85,14 @@ func newClient(sw *ScrapeWork) *client { } var sc *http.Client if *streamParse || sw.StreamParse { + var proxy func(*http.Request) (*url.URL, error) + if proxyURL := sw.ProxyURL.URL(); proxyURL != nil { + proxy = http.ProxyURL(proxyURL) + } sc = &http.Client{ Transport: &http.Transport{ TLSClientConfig: tlsCfg, - Proxy: http.ProxyURL(sw.ProxyURL), + Proxy: proxy, TLSHandshakeTimeout: 10 * time.Second, IdleConnTimeout: 2 * sw.ScrapeInterval, DisableCompression: *disableCompression || sw.DisableCompression, diff --git a/lib/promscrape/config.go b/lib/promscrape/config.go index 65cd03cfc3..31e1ce858d 100644 --- a/lib/promscrape/config.go +++ b/lib/promscrape/config.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" @@ -24,6 +23,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "gopkg.in/yaml.v2" ) @@ -72,7 +72,7 @@ type ScrapeConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` StaticConfigs []StaticConfig `yaml:"static_configs,omitempty"` FileSDConfigs
[]FileSDConfig `yaml:"file_sd_configs,omitempty"` @@ -497,7 +497,7 @@ func getScrapeWorkConfig(sc *ScrapeConfig, baseDir string, globalCfg *GlobalConf metricsPath: metricsPath, scheme: scheme, params: params, - proxyURL: sc.ProxyURL.URL(), + proxyURL: sc.ProxyURL, authConfig: ac, honorLabels: honorLabels, honorTimestamps: honorTimestamps, @@ -519,7 +519,7 @@ type scrapeWorkConfig struct { metricsPath string scheme string params map[string][]string - proxyURL *url.URL + proxyURL proxy.URL authConfig *promauth.Config honorLabels bool honorTimestamps bool diff --git a/lib/promscrape/discovery/consul/api.go b/lib/promscrape/discovery/consul/api.go index a487eda610..eec5a3c9a0 100644 --- a/lib/promscrape/discovery/consul/api.go +++ b/lib/promscrape/discovery/consul/api.go @@ -58,7 +58,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/consul/consul.go b/lib/promscrape/discovery/consul/consul.go index 2e59f9747e..9c980f7793 100644 --- a/lib/promscrape/discovery/consul/consul.go +++ b/lib/promscrape/discovery/consul/consul.go @@ -3,8 +3,8 @@ package consul import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents service discovery config for Consul. @@ -17,7 +17,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Services []string `yaml:"services,omitempty"` Tags []string `yaml:"tags,omitempty"` diff --git a/lib/promscrape/discovery/dockerswarm/api.go b/lib/promscrape/discovery/dockerswarm/api.go index fd72a13eee..c79d12fc7d 100644 --- a/lib/promscrape/discovery/dockerswarm/api.go +++ b/lib/promscrape/discovery/dockerswarm/api.go @@ -39,7 +39,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { if err != nil { return nil, err } - client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(sdc.Host, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", sdc.Host, err) } diff --git a/lib/promscrape/discovery/dockerswarm/dockerswarm.go b/lib/promscrape/discovery/dockerswarm/dockerswarm.go index e09f273159..bd95643066 100644 --- a/lib/promscrape/discovery/dockerswarm/dockerswarm.go +++ b/lib/promscrape/discovery/dockerswarm/dockerswarm.go @@ -3,8 +3,8 @@ package dockerswarm import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents docker swarm service discovery configuration @@ -16,7 +16,7 @@ type SDConfig struct { Port int `yaml:"port,omitempty"` Filters []Filter `yaml:"filters,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // refresh_interval is obtained 
from `-promscrape.dockerswarmSDCheckInterval` command-line option BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` diff --git a/lib/promscrape/discovery/eureka/api.go b/lib/promscrape/discovery/eureka/api.go index cfb1214973..255c9005d5 100644 --- a/lib/promscrape/discovery/eureka/api.go +++ b/lib/promscrape/discovery/eureka/api.go @@ -43,7 +43,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } apiServer = scheme + "://" + apiServer } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/eureka/eureka.go b/lib/promscrape/discovery/eureka/eureka.go index 53b68fbb34..c8ebc55efc 100644 --- a/lib/promscrape/discovery/eureka/eureka.go +++ b/lib/promscrape/discovery/eureka/eureka.go @@ -5,9 +5,9 @@ import ( "fmt" "strconv" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discoveryutils" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) const appsAPIPath = "/apps" @@ -22,7 +22,7 @@ type SDConfig struct { Scheme string `yaml:"scheme,omitempty"` Username string `yaml:"username"` Password string `yaml:"password"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` // RefreshInterval time.Duration `yaml:"refresh_interval"` // refresh_interval is obtained from `-promscrape.ec2SDCheckInterval` command-line option. diff --git a/lib/promscrape/discovery/kubernetes/api.go b/lib/promscrape/discovery/kubernetes/api.go index 39c09e8d8e..2c0463214f 100644 --- a/lib/promscrape/discovery/kubernetes/api.go +++ b/lib/promscrape/discovery/kubernetes/api.go @@ -56,7 +56,7 @@ func newAPIConfig(sdc *SDConfig, baseDir string) (*apiConfig, error) { } ac = acNew } - client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL.URL()) + client, err := discoveryutils.NewClient(apiServer, ac, sdc.ProxyURL) if err != nil { return nil, fmt.Errorf("cannot create HTTP client for %q: %w", apiServer, err) } diff --git a/lib/promscrape/discovery/kubernetes/kubernetes.go b/lib/promscrape/discovery/kubernetes/kubernetes.go index f983436942..2c7a489952 100644 --- a/lib/promscrape/discovery/kubernetes/kubernetes.go +++ b/lib/promscrape/discovery/kubernetes/kubernetes.go @@ -3,8 +3,8 @@ package kubernetes import ( "fmt" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" ) // SDConfig represents kubernetes-based service discovery config. 
@@ -16,7 +16,7 @@ type SDConfig struct { BasicAuth *promauth.BasicAuthConfig `yaml:"basic_auth,omitempty"` BearerToken string `yaml:"bearer_token,omitempty"` BearerTokenFile string `yaml:"bearer_token_file,omitempty"` - ProxyURL netutil.ProxyURL `yaml:"proxy_url,omitempty"` + ProxyURL proxy.URL `yaml:"proxy_url,omitempty"` TLSConfig *promauth.TLSConfig `yaml:"tls_config,omitempty"` Namespaces Namespaces `yaml:"namespaces,omitempty"` Selectors []Selector `yaml:"selectors,omitempty"` diff --git a/lib/promscrape/discoveryutils/client.go b/lib/promscrape/discoveryutils/client.go index 10963894ac..b971be8ec9 100644 --- a/lib/promscrape/discoveryutils/client.go +++ b/lib/promscrape/discoveryutils/client.go @@ -6,13 +6,12 @@ import ( "fmt" "net" "net/http" - "net/url" "strings" "sync" "time" - "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/fasthttp" ) @@ -46,7 +45,7 @@ type Client struct { } // NewClient returns new Client for the given apiServer and the given ac. -func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Client, error) { +func NewClient(apiServer string, ac *promauth.Config, proxyURL proxy.URL) (*Client, error) { var ( dialFunc fasthttp.DialFunc tlsCfg *tls.Config @@ -63,12 +62,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien return net.Dial("unix", dialAddr) } } - if proxyURL != nil { - dialFunc, err = netutil.GetProxyDialFunc(proxyURL) - if err != nil { - return nil, err - } - } hostPort := string(u.Host()) isTLS := string(u.Scheme()) == "https" @@ -82,10 +75,15 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien } hostPort = net.JoinHostPort(hostPort, port) } + if dialFunc == nil { + dialFunc, err = proxyURL.NewDialFunc(tlsCfg) + if err != nil { + return nil, err + } + } hc := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", - DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: time.Minute, @@ -97,7 +95,6 @@ func NewClient(apiServer string, ac *promauth.Config, proxyURL *url.URL) (*Clien blockingClient := &fasthttp.HostClient{ Addr: hostPort, Name: "vm_promscrape/discovery", - DialDualStack: netutil.TCP6Enabled(), IsTLS: isTLS, TLSConfig: tlsCfg, ReadTimeout: BlockingClientReadTimeout, diff --git a/lib/promscrape/scrapework.go b/lib/promscrape/scrapework.go index 59d84a5101..30f28e3a30 100644 --- a/lib/promscrape/scrapework.go +++ b/lib/promscrape/scrapework.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "math/bits" - "net/url" "strconv" "strings" "sync" @@ -18,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" parser "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/prometheus" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/metrics" xxhash "github.com/cespare/xxhash/v2" ) @@ -72,7 +72,7 @@ type ScrapeWork struct { AuthConfig *promauth.Config // ProxyURL HTTP proxy url - ProxyURL *url.URL + ProxyURL proxy.URL // Optional `metric_relabel_configs`. 
MetricRelabelConfigs []promrelabel.ParsedRelabelConfig diff --git a/lib/promscrape/statconn.go b/lib/promscrape/statconn.go index 893beefca5..2fefaf4202 100644 --- a/lib/promscrape/statconn.go +++ b/lib/promscrape/statconn.go @@ -2,14 +2,15 @@ package promscrape import ( "context" + "crypto/tls" "fmt" "net" - "net/url" "sync" "sync/atomic" "time" "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/proxy" "github.com/VictoriaMetrics/fasthttp" "github.com/VictoriaMetrics/metrics" ) @@ -48,18 +49,13 @@ var ( stdDialerOnce sync.Once ) -func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { - auth := netutil.MakeBasicAuthHeader(nil, proxyURL) - return func(addr string) (conn net.Conn, err error) { - dialAddr := addr - if proxyURL != nil { - dialAddr = proxyURL.Host - } - if netutil.TCP6Enabled() { - conn, err = fasthttp.DialDualStack(dialAddr) - } else { - conn, err = fasthttp.Dial(dialAddr) - } +func newStatDialFunc(proxyURL proxy.URL, tlsConfig *tls.Config) (fasthttp.DialFunc, error) { + dialFunc, err := proxyURL.NewDialFunc(tlsConfig) + if err != nil { + return nil, err + } + statDialFunc := func(addr string) (net.Conn, error) { + conn, err := dialFunc(addr) dialsTotal.Inc() if err != nil { dialErrors.Inc() @@ -69,17 +65,12 @@ func getDialStatConn(proxyURL *url.URL) fasthttp.DialFunc { return nil, err } conns.Inc() - if proxyURL != nil { - if err := netutil.MakeProxyConnectCall(conn, []byte(addr), auth); err != nil { - _ = conn.Close() - return nil, err - } - } sc := &statConn{ Conn: conn, } return sc, nil } + return statDialFunc, nil } var ( diff --git a/lib/proxy/proxy.go b/lib/proxy/proxy.go new file mode 100644 index 0000000000..82cb7b46a8 --- /dev/null +++ b/lib/proxy/proxy.go @@ -0,0 +1,117 @@ +package proxy + +import ( + "bufio" + "crypto/tls" + "encoding/base64" + "fmt" + "net" + "net/url" + + "github.com/VictoriaMetrics/VictoriaMetrics/lib/netutil" + "github.com/VictoriaMetrics/fasthttp" +) + +// URL implements yaml.Marshaler and yaml.Unmarshaler interfaces for url.URL. +type URL struct { + url *url.URL +} + +// URL returns the underlying *url.URL. +func (u *URL) URL() *url.URL { + if u == nil || u.url == nil { + return nil + } + return u.url +} + +// MarshalYAML implements yaml.Marshaler interface. +func (u *URL) MarshalYAML() (interface{}, error) { + if u.url == nil { + return nil, nil + } + return u.url.String(), nil +} + +// UnmarshalYAML implements yaml.Unmarshaler interface. +func (u *URL) UnmarshalYAML(unmarshal func(interface{}) error) error { + var s string + if err := unmarshal(&s); err != nil { + return err + } + parsedURL, err := url.Parse(s) + if err != nil { + return fmt.Errorf("cannot parse proxy_url=%q as *url.URL: %w", s, err) + } + u.url = parsedURL + return nil } + +// NewDialFunc returns a dial func for u and the given tlsConfig.
+func (u *URL) NewDialFunc(tlsConfig *tls.Config) (fasthttp.DialFunc, error) { + if u == nil || u.url == nil { + return defaultDialFunc, nil + } + pu := u.url + if pu.Scheme != "http" && pu.Scheme != "https" { + return nil, fmt.Errorf("unknown scheme=%q for proxy_url=%q, must be http or https", pu.Scheme, pu) + } + var authHeader string + if pu.User != nil && len(pu.User.Username()) > 0 { + userPasswordEncoded := base64.StdEncoding.EncodeToString([]byte(pu.User.String())) + authHeader = "Proxy-Authorization: Basic " + userPasswordEncoded + "\r\n" + } + dialFunc := func(addr string) (net.Conn, error) { + proxyConn, err := defaultDialFunc(pu.Host) + if err != nil { + return nil, fmt.Errorf("cannot connect to proxy %q: %w", pu, err) + } + if pu.Scheme == "https" { + proxyConn = tls.Client(proxyConn, tlsConfig) + } + conn, err := sendConnectRequest(proxyConn, addr, authHeader) + if err != nil { + _ = proxyConn.Close() + return nil, fmt.Errorf("error when sending CONNECT request to proxy %q: %w", pu, err) + } + return conn, nil + } + return dialFunc, nil +} + +func defaultDialFunc(addr string) (net.Conn, error) { + if netutil.TCP6Enabled() { + return fasthttp.DialDualStack(addr) + } + return fasthttp.Dial(addr) +} + +// sendConnectRequest sends a CONNECT request to proxyConn for the given dstAddr and authHeader and returns the established connection to dstAddr. +func sendConnectRequest(proxyConn net.Conn, dstAddr, authHeader string) (net.Conn, error) { + req := "CONNECT " + dstAddr + " HTTP/1.1\r\nHost: " + dstAddr + "\r\n" + authHeader + "\r\n" + if _, err := proxyConn.Write([]byte(req)); err != nil { + return nil, fmt.Errorf("cannot send CONNECT request for dstAddr=%q: %w", dstAddr, err) + } + var res fasthttp.Response + res.SkipBody = true + conn := &bufferedReaderConn{ + br: bufio.NewReader(proxyConn), + Conn: proxyConn, + } + if err := res.Read(conn.br); err != nil { + return nil, fmt.Errorf("cannot read CONNECT response for dstAddr=%q: %w", dstAddr, err) + } + if statusCode := res.Header.StatusCode(); statusCode != 200 { + return nil, fmt.Errorf("unexpected status code received: %d; want: 200", statusCode) + } + return conn, nil +} + +type bufferedReaderConn struct { + net.Conn + br *bufio.Reader +} + +func (brc *bufferedReaderConn) Read(p []byte) (int, error) { + return brc.br.Read(p) +} From b21e16ad0c0556e804d48e9c42d63a7c81e71e0d Mon Sep 17 00:00:00 2001 From: Nikolay <nik@victoriametrics.com> Date: Thu, 24 Dec 2020 12:26:14 +0300 Subject: [PATCH 09/10] fixes kubernetes_sd (#983) * fixes kubernetes_sd, adds missing service metadata for pod ports without endpoint https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982 * fix test --- lib/promscrape/discovery/kubernetes/endpoints.go | 3 +++ lib/promscrape/discovery/kubernetes/endpointslices.go | 3 +++ lib/promscrape/discovery/kubernetes/endpointslices_test.go | 7 +++++++ 3 files changed, 13 insertions(+) diff --git a/lib/promscrape/discovery/kubernetes/endpoints.go b/lib/promscrape/discovery/kubernetes/endpoints.go index 1bb3aaeb8f..edd5387463 100644 --- a/lib/promscrape/discovery/kubernetes/endpoints.go +++ b/lib/promscrape/discovery/kubernetes/endpoints.go @@ -158,6 +158,9 @@ func (eps *Endpoints) appendTargetLabels(ms []map[string]string, pods []Pod, svc } p.appendCommonLabels(m) p.appendContainerLabels(m, c, &cp) + if svc != nil { + svc.appendCommonLabels(m) + } ms = append(ms, m) } } diff --git a/lib/promscrape/discovery/kubernetes/endpointslices.go b/lib/promscrape/discovery/kubernetes/endpointslices.go index
48a06a25ce..f1f9da6459 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices.go @@ -113,6 +113,9 @@ func (eps *EndpointSlice) appendTargetLabels(ms []map[string]string, pods []Pod, } p.appendCommonLabels(m) p.appendContainerLabels(m, c, &cp) + if svc != nil { + svc.appendCommonLabels(m) + } ms = append(ms, m) } } diff --git a/lib/promscrape/discovery/kubernetes/endpointslices_test.go b/lib/promscrape/discovery/kubernetes/endpointslices_test.go index 44acef3d77..900751b748 100644 --- a/lib/promscrape/discovery/kubernetes/endpointslices_test.go +++ b/lib/promscrape/discovery/kubernetes/endpointslices_test.go @@ -420,6 +420,13 @@ func TestEndpointSlice_appendTargetLabels(t *testing.T) { "__meta_kubernetes_pod_phase": "", "__meta_kubernetes_pod_ready": "unknown", "__meta_kubernetes_pod_uid": "some-pod-uuid", + "__meta_kubernetes_service_cluster_ip": "", + "__meta_kubernetes_service_label_service_label_1": "value-1", + "__meta_kubernetes_service_label_service_label_2": "value-2", + "__meta_kubernetes_service_labelpresent_service_label_1": "true", + "__meta_kubernetes_service_labelpresent_service_label_2": "true", + "__meta_kubernetes_service_name": "custom-esl", + "__meta_kubernetes_service_type": "ClusterIP", }), }, }, From 4eb520a3427ca7123217cad83af197c58781457d Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin <valyala@gmail.com> Date: Thu, 24 Dec 2020 11:32:58 +0200 Subject: [PATCH 10/10] docs/CHANGELOG.md: mention adding missing __meta_kubernetes_service_* labels for `endpoints` and `endpointslices` roles in `kubernetes_sd_config` See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982 --- docs/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 3a05f66772..1935299811 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -6,6 +6,8 @@ * FEATURE: remove parts with stale data as soon as they go outside the configured `-retentionPeriod`. Previously such parts could remain active for long periods of time. This should help reduce disk usage for `-retentionPeriod` smaller than one month. * FEATURE: vmalert: allow setting multiple values for `-notifier.tlsInsecureSkipVerify` command-line flag for each `-notifier.url`. +* BUGFIX: vmagent: set missing `__meta_kubernetes_service_*` labels in `kubernetes_sd_config` for `endpoints` and `endpointslices` roles. See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/982 + # [v1.50.2](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.50.2)
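
For reference, the `proxy_url` option introduced by this series is set per scrape job, and is also available in the consul, dockerswarm, eureka and kubernetes SD configs. Below is a minimal sketch of a vmagent scrape config using it; the proxy address, credentials and target are hypothetical:

```yaml
scrape_configs:
  - job_name: node
    # Scrape requests for this job are tunneled through the HTTP proxy
    # via CONNECT requests (hypothetical proxy address and credentials).
    proxy_url: http://scraper:secret@proxy.example.com:3128
    static_configs:
      - targets: ["10.0.0.1:9100"]
```

When the URL carries userinfo, `proxy.URL.NewDialFunc` sends it as a `Proxy-Authorization: Basic ...` header together with the CONNECT request, and an `https` proxy scheme wraps the proxy connection in TLS before the CONNECT call is issued.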