all: add support for GOARCH=386 and fix all the issues related to 32-bit architectures such as GOARCH=arm

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
Aliaksandr Valialkin 2019-10-17 18:22:56 +03:00
parent a398343bb6
commit 97ce4e03a5
25 changed files with 217 additions and 139 deletions
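Most of the struct changes below follow from one rule in the sync/atomic documentation: on 32-bit architectures (GOARCH=386, GOARCH=arm) atomic operations on 64-bit words panic unless the word is 8-byte aligned, and only the first word of an allocated struct, global, or local variable is guaranteed to be so aligned. Hence the recurring pattern of moving uint64 counters to the top of structs. A minimal sketch of the rule (illustrative, not code from this commit):

package main

import (
	"fmt"
	"sync/atomic"
)

type cacheStats struct {
	// 64-bit counters go first: the first word of an allocated struct
	// is guaranteed to be 8-byte aligned even on 32-bit architectures.
	requests uint64
	misses   uint64

	// Fields that are not updated atomically may follow in any order.
	name string
}

func main() {
	var cs cacheStats
	atomic.AddUint64(&cs.requests, 1) // could panic on 386/arm if the field were misaligned
	fmt.Println(atomic.LoadUint64(&cs.requests))
}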

View file

@@ -29,6 +29,7 @@ jobs:
git diff --exit-code
make test-full
make test-pure
make test-full-386
make victoria-metrics
make victoria-metrics-pure
make victoria-metrics-arm

View file

@@ -61,6 +61,9 @@ test-pure:
test-full:
GO111MODULE=on go test -tags=integration -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
test-full-386:
GO111MODULE=on GOARCH=386 go test -tags=integration -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
benchmark:
GO111MODULE=on go test -mod=vendor -bench=. ./lib/...
GO111MODULE=on go test -mod=vendor -bench=. ./app/...

View file

@@ -32,6 +32,12 @@ victoria-metrics-arm64:
victoria-metrics-arm64-prod:
APP_NAME=victoria-metrics APP_SUFFIX='-arm64' DOCKER_OPTS='--env CGO_ENABLED=0 --env GOARCH=arm64' $(MAKE) app-via-docker
victoria-metrics-386:
CGO_ENABLED=0 GOOS=linux GOARCH=386 GO111MODULE=on go build -mod=vendor -ldflags "$(GO_BUILDINFO)" -o bin/victoria-metrics-386 ./app/victoria-metrics
victoria-metrics-386-prod:
APP_NAME=victoria-metrics APP_SUFFIX='-386' DOCKER_OPTS='--env CGO_ENABLED=0 --env GOARCH=386' $(MAKE) app-via-docker
victoria-metrics-pure:
APP_NAME=victoria-metrics $(MAKE) app-local-pure

View file

@@ -85,6 +85,15 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
}},
})
// Timestamp bigger than 1<<31
f("aaa 1123 429496729600", &Rows{
Rows: []Row{{
Metric: "aaa",
Value: 1123,
Timestamp: 429496729600,
}},
})
// Tags
f("foo;bar=baz 1 2", &Rows{
Rows: []Row{{

View file

@@ -13,7 +13,7 @@
{% for i, ts := range rs.Timestamps %}
{%z= bb.B %}{% space %}
{%f= rs.Values[i] %}{% space %}
{%d= int(ts) %}{% newline %}
{%dl= ts %}{% newline %}
{% endfor %}
{% code quicktemplate.ReleaseByteBuffer(bb) %}
{% endfunc %}
@@ -35,10 +35,10 @@
"timestamps":[
{% if len(rs.Timestamps) > 0 %}
{% code timestamps := rs.Timestamps %}
{%d= int(timestamps[0]) %}
{%dl= timestamps[0] %}
{% code timestamps = timestamps[1:] %}
{% for _, ts := range timestamps %}
,{%d= int(ts) %}
,{%dl= ts %}
{% endfor %}
{% endif %}
]
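The {%d= ... %} tag prints a Go int, so wrapping an int64 timestamp in int() truncates it on 32-bit builds where int is 32 bits; the new {%dl= ... %} tag prints the int64 directly (via N().DL in the generated code below). A quick sanity check of the magnitude involved (a sketch, not from this commit):

package main

import "fmt"

func main() {
	ts := int64(1571322176000) // roughly this commit's date as Unix milliseconds
	fmt.Println(ts > 1<<31-1)  // true: does not fit a 32-bit int
	fmt.Println(int32(ts))     // -635854336: the kind of garbage int() truncation yields on 386
}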

View file

@@ -49,7 +49,7 @@ func StreamExportPrometheusLine(qw422016 *qt422016.Writer, rs *netstorage.Result
//line app/vmselect/prometheus/export.qtpl:15
qw422016.N().S(` `)
//line app/vmselect/prometheus/export.qtpl:16
qw422016.N().D(int(ts))
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:16
qw422016.N().S(`
`)
@@ -129,7 +129,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
timestamps := rs.Timestamps
//line app/vmselect/prometheus/export.qtpl:38
qw422016.N().D(int(timestamps[0]))
qw422016.N().DL(timestamps[0])
//line app/vmselect/prometheus/export.qtpl:39
timestamps = timestamps[1:]
@@ -138,7 +138,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/export.qtpl:40
qw422016.N().S(`,`)
//line app/vmselect/prometheus/export.qtpl:41
qw422016.N().D(int(ts))
qw422016.N().DL(ts)
//line app/vmselect/prometheus/export.qtpl:42
}
//line app/vmselect/prometheus/export.qtpl:43

View file

@@ -10,7 +10,7 @@
{% if len(rs.Timestamps) == 0 || len(rs.Values) == 0 %}{% return %}{% endif %}
{%= prometheusMetricName(&rs.MetricName) %}{% space %}
{%f= rs.Values[len(rs.Values)-1] %}{% space %}
{%d= int(rs.Timestamps[len(rs.Timestamps)-1]) %}{% newline %}
{%dl= rs.Timestamps[len(rs.Timestamps)-1] %}{% newline %}
{% endfunc %}
{% endstripspace %}

View file

@@ -41,7 +41,7 @@ func StreamFederate(qw422016 *qt422016.Writer, rs *netstorage.Result) {
//line app/vmselect/prometheus/federate.qtpl:12
qw422016.N().S(` `)
//line app/vmselect/prometheus/federate.qtpl:13
qw422016.N().D(int(rs.Timestamps[len(rs.Timestamps)-1]))
qw422016.N().DL(rs.Timestamps[len(rs.Timestamps)-1])
//line app/vmselect/prometheus/federate.qtpl:13
qw422016.N().S(`
`)

View file

@@ -3,7 +3,7 @@ SeriesCountResponse generates response for /api/v1/series/count .
{% func SeriesCountResponse(n uint64) %}
{
"status":"success",
"data":[{%d int(n) %}]
"data":[{%dl int64(n) %}]
}
{% endfunc %}
{% endstripspace %}

View file

@@ -24,7 +24,7 @@ func StreamSeriesCountResponse(qw422016 *qt422016.Writer, n uint64) {
//line app/vmselect/prometheus/series_count_response.qtpl:3
qw422016.N().S(`{"status":"success","data":[`)
//line app/vmselect/prometheus/series_count_response.qtpl:6
qw422016.N().D(int(n))
qw422016.N().DL(int64(n))
//line app/vmselect/prometheus/series_count_response.qtpl:6
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/series_count_response.qtpl:8

View file

@@ -0,0 +1,3 @@
package promql
const maxByteSliceLen = 1<<31 - 1
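This new constant is the largest length a []byte can have when int is 32 bits, since len() returns an int. A hedged sketch of the kind of guard such a cap enables — the helper name and signature are illustrative, not taken from this commit:

// Assumes "fmt" is imported; n typically arrives as an int64 decoded from data.
func newByteSlice(n int64) ([]byte, error) {
	if n < 0 || n > maxByteSliceLen {
		return nil, fmt.Errorf("cannot allocate byte slice of %d bytes; the limit is %d", n, maxByteSliceLen)
	}
	// The conversion to int is now safe on every GOARCH.
	return make([]byte, int(n)), nil
}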

View file

@@ -194,11 +194,14 @@ type parseCacheValue struct {
}
type parseCache struct {
m map[string]*parseCacheValue
mu sync.RWMutex
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[string]*parseCacheValue
mu sync.RWMutex
}
func (pc *parseCache) Requests() uint64 {

View file

@@ -51,11 +51,14 @@ type regexpCacheValue struct {
}
type regexpCache struct {
m map[string]*regexpCacheValue
mu sync.RWMutex
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[string]*regexpCacheValue
mu sync.RWMutex
}
func (rc *regexpCache) Requests() uint64 {

View file

@@ -858,7 +858,7 @@ func testRowsEqual(t *testing.T, values []float64, timestamps []int64, valuesExp
}
continue
}
if v != vExpected {
if math.Abs(v-vExpected) > 1e-15 {
t.Fatalf("unexpected value at values[%d]; got %f; want %f\nvalues=\n%v\nvaluesExpected=\n%v",
i, v, vExpected, values, valuesExpected)
}
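Exact float64 equality is fragile across architectures: a 386 build may round intermediate results differently from an amd64 build (for example via 80-bit x87 temporaries when GO386=387), so two mathematically identical computations can differ in the last bit. The test therefore switches to an absolute tolerance of 1e-15. The comparison in isolation, as a sketch:

// almostEqual reports whether a and b differ by at most an absolute
// epsilon of 1e-15. An absolute epsilon is the simple choice here; for
// values of large magnitude a relative epsilon would be preferable.
func almostEqual(a, b float64) bool {
	return math.Abs(a-b) <= 1e-15
}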

View file

@@ -5,6 +5,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -43,7 +44,7 @@ var (
maxCachedInmemoryBlocksPerPartOnce sync.Once
)
type part struct {
type partInternals struct {
ph partHeader
path string
@@ -55,7 +56,14 @@ type part struct {
indexFile fs.ReadAtCloser
itemsFile fs.ReadAtCloser
lensFile fs.ReadAtCloser
}
type part struct {
partInternals
// Align atomic counters inside caches by 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
idxbCache indexBlockCache
ibCache inmemoryBlockCache
}
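The anonymous padding field above is worth unpacking: unsafe.Sizeof(partInternals{}) is a compile-time constant, so the array length (8 - size%8) % 8 evaluates to between 0 and 7 bytes — exactly enough to round the embedded struct up to the next 8-byte boundary before the caches (whose first fields are atomically updated counters) begin. A standalone sketch of the idiom with an illustrative struct:

package main

import (
	"fmt"
	"unsafe"
)

type header struct {
	flag uint32 // Sizeof(header{}) == 4
}

type padded struct {
	header
	// 0..7 pad bytes so counter starts on an 8-byte boundary even on
	// 32-bit architectures, where uint64 is otherwise only 4-byte aligned.
	_       [(8 - unsafe.Sizeof(header{})%8) % 8]byte
	counter uint64
}

func main() {
	var p padded
	fmt.Println(unsafe.Offsetof(p.counter)) // 8, on 32-bit and 64-bit alike
}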
@@ -114,15 +122,15 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
}
metaindexReader.MustClose()
p := &part{
path: path,
size: size,
mrs: mrs,
indexFile: indexFile,
itemsFile: itemsFile,
lensFile: lensFile,
}
var p part
p.path = path
p.size = size
p.mrs = mrs
p.indexFile = indexFile
p.itemsFile = itemsFile
p.lensFile = lensFile
p.ph.CopyFrom(ph)
p.idxbCache.Init()
p.ibCache.Init()
@@ -133,7 +141,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
p.MustClose()
return nil, err
}
return p, nil
return &p, nil
}
func (p *part) MustClose() {
@@ -165,12 +173,15 @@ func putIndexBlock(idxb *indexBlock) {
var indexBlockPool sync.Pool
type indexBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[uint64]*indexBlock
missesMap map[uint64]uint64
mu sync.RWMutex
requests uint64
misses uint64
}
func (idxbc *indexBlockCache) Init() {
@@ -274,12 +285,15 @@ func (idxbc *indexBlockCache) Misses() uint64 {
}
type inmemoryBlockCache struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[inmemoryBlockCacheKey]*inmemoryBlock
missesMap map[inmemoryBlockCacheKey]uint64
mu sync.RWMutex
requests uint64
misses uint64
}
type inmemoryBlockCacheKey struct {

View file

@@ -70,6 +70,17 @@ const rawItemsFlushInterval = time.Second
// Table represents mergeset table.
type Table struct {
// Atomically updated counters must go first in the struct, so they are properly
// aligned to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
activeMerges uint64
mergesCount uint64
itemsMerged uint64
assistedMerges uint64
mergeIdx uint64
path string
flushCallback func()
@@ -83,8 +94,6 @@ type Table struct {
rawItemsLock sync.Mutex
rawItemsLastFlushTime time.Time
mergeIdx uint64
snapshotLock sync.RWMutex
flockF *os.File
@@ -100,11 +109,6 @@ type Table struct {
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
rawItemsPendingFlushesWG syncwg.WaitGroup
activeMerges uint64
mergesCount uint64
itemsMerged uint64
assistedMerges uint64
}
type partWrapper struct {

View file

@@ -43,6 +43,11 @@ func (cm *connMetrics) init(group, name, addr string) {
}
type statConn struct {
// Move atomic counters to the top of struct in order to properly align them on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
closeCalls uint64
readTimeout time.Duration
lastReadTime time.Time
@@ -52,8 +57,6 @@ type statConn struct {
net.Conn
cm *connMetrics
closeCalls uint64
}
func (sc *statConn) Read(p []byte) (int, error) {

View file

@@ -59,7 +59,7 @@ func TestBlockStreamReaderManyTSIDManyRows(t *testing.T) {
r.PrecisionBits = defaultPrecisionBits
const blocks = 123
for i := 0; i < 3210; i++ {
r.TSID.MetricID = uint64((1e12 - i) % blocks)
r.TSID.MetricID = uint64((1e9 - i) % blocks)
r.Value = rand.Float64()
r.Timestamp = int64(rand.Float64() * 1e9)
rows = append(rows, r)
@@ -73,7 +73,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) {
r.PrecisionBits = defaultPrecisionBits
const blocks = 123
for i := 0; i < 3210; i++ {
r.TSID.MetricID = uint64((1e12 - i) % blocks)
r.TSID.MetricID = uint64((1e9 - i) % blocks)
r.Value = rand.Float64()
r.Timestamp = int64(rand.Float64() * 1e9)
rows = append(rows, r)
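The shrunken constants here (and in the merge tests below) are compile fixes rather than behavioral tweaks: in (1e12 - i) % blocks, the untyped constant must convert to i's type, int, and 1e12 does not fit a 32-bit int, so the file would not even compile on GOARCH=386. A minimal reproduction, as a sketch:

package main

import "fmt"

func main() {
	i := 5
	// On GOARCH=386 the commented line fails to compile with an error
	// along the lines of "constant 1000000000000 overflows int":
	// n := (1e12 - i) % 123
	n := (1e9 - i) % 123 // 1e9 fits a 32-bit int on every platform
	fmt.Println(n)
}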

View file

@@ -68,8 +68,30 @@ func shouldCacheBlock(item []byte) bool {
// indexDB represents an index db.
type indexDB struct {
name string
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
refCount uint64
// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
// The number of calls to search for metric ids for recent hours.
recentHourMetricIDsSearchCalls uint64
// The number of cache hits during search for metric ids in recent hours.
recentHourMetricIDsSearchHits uint64
// The number of searches for metric ids by days.
dateMetricIDsSearchCalls uint64
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
mustDrop uint64
name string
tb *mergeset.Table
extDB *indexDB
@@ -104,24 +126,6 @@ type indexDB struct {
// up to two last hours.
currHourMetricIDs *atomic.Value
prevHourMetricIDs *atomic.Value
// The number of missing MetricID -> TSID entries.
// High rate for this value means corrupted indexDB.
missingTSIDsForMetricID uint64
// The number of calls to search for metric ids for recent hours.
recentHourMetricIDsSearchCalls uint64
// The number of cache hits during search for metric ids in recent hours.
recentHourMetricIDsSearchHits uint64
// The number of searches for metric ids by days.
dateMetricIDsSearchCalls uint64
// The number of successful searches for metric ids by days.
dateMetricIDsSearchHits uint64
mustDrop uint64
}
// openIndexDB opens index db from the given path with the given caches.

View file

@@ -25,7 +25,7 @@ func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
minTimestamp := int64(1<<63 - 1)
maxTimestamp := int64(-1 << 63)
for i := 0; i < maxRowsPerBlock; i++ {
r.Timestamp = int64(rand.Intn(1e15))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64() * 2332
rows = append(rows, r)
@@ -51,7 +51,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
for i := 0; i < blocksCount; i++ {
initTestTSID(&r.TSID)
r.TSID.MetricID = uint64(i * 123)
r.Timestamp = int64(rand.Intn(1e15))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64() * 2332
rows = append(rows, r)
@@ -78,7 +78,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
maxTimestamp := int64(-1 << 63)
for i := 0; i < rowsCount; i++ {
r.TSID.MetricID = uint64(i % blocksCount)
r.Timestamp = int64(rand.Intn(1e15))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64() * 2332
rows = append(rows, r)
@@ -175,7 +175,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
const rowsCount1 = 4938
for i := 0; i < rowsCount1; i++ {
r.TSID.MetricID = uint64(i % blocksCount)
r.Timestamp = int64(rand.Intn(1e15))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64() * 2332
rows = append(rows, r)
@@ -192,7 +192,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
const rowsCount2 = 3281
for i := 0; i < rowsCount2; i++ {
r.TSID.MetricID = uint64((i + 17) % blocksCount)
r.Timestamp = int64(rand.Intn(1e15))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64() * 2332
rows = append(rows, r)
@@ -310,7 +310,7 @@ func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
var rows []rawRow
for j := 0; j < rowsPerStream; j++ {
r.TSID.MetricID = uint64(j % blocksCount)
r.Timestamp = int64(rand.Intn(1e10))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64()
rows = append(rows, r)
@@ -343,7 +343,7 @@ func TestMergeForciblyStop(t *testing.T) {
var rows []rawRow
for j := 0; j < rowsPerStream; j++ {
r.TSID.MetricID = uint64(j % blocksCount)
r.Timestamp = int64(rand.Intn(1e10))
r.Timestamp = int64(rand.Intn(1e9))
r.Value = rand.NormFloat64()
rows = append(rows, r)
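Same story for rand.Intn, whose signature is func Intn(n int) int: rand.Intn(1e15), and even rand.Intn(1e10), cannot compile where int is 32 bits, while 1e9 still gives the tests a wide timestamp spread. In sketch form:

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	// rand.Intn takes an int, which is 32 bits on GOARCH=386 and GOARCH=arm.
	// ts := int64(rand.Intn(1e15)) // constant overflows int there; does not compile
	ts := int64(rand.Intn(1e9)) // fits everywhere; widen to int64 afterwards
	fmt.Println(ts)
}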

View file

@@ -5,6 +5,7 @@ import (
"path/filepath"
"sync"
"sync/atomic"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
@@ -27,8 +28,7 @@ var (
maxCachedIndexBlocksPerPartOnce sync.Once
)
// part represents a searchable part containing time series data.
type part struct {
type partInternals struct {
ph partHeader
// Filesystem path to the part.
@@ -44,7 +44,15 @@ type part struct {
indexFile fs.ReadAtCloser
metaindex []metaindexRow
}
// part represents a searchable part containing time series data.
type part struct {
partInternals
// Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
ibCache indexBlockCache
}
@@ -107,27 +115,26 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
}
metaindexReader.MustClose()
p := &part{
ph: *ph,
path: path,
size: size,
timestampsFile: timestampsFile,
valuesFile: valuesFile,
indexFile: indexFile,
metaindex: metaindex,
}
var p part
p.ph = *ph
p.path = path
p.size = size
p.timestampsFile = timestampsFile
p.valuesFile = valuesFile
p.indexFile = indexFile
p.metaindex = metaindex
if len(errors) > 0 {
// Return only the first error, since returning all of them makes no sense.
err = fmt.Errorf("cannot initialize part %q: %s", p, errors[0])
err = fmt.Errorf("cannot initialize part %q: %s", &p, errors[0])
p.MustClose()
return nil, err
}
p.ibCache.Init()
return p, nil
return &p, nil
}
// String returns human-readable representation of p.
@@ -168,12 +175,14 @@ func putIndexBlock(ib *indexBlock) {
var indexBlockPool sync.Pool
type indexBlockCache struct {
// Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
requests uint64
misses uint64
m map[uint64]*indexBlock
missesMap map[uint64]uint64
mu sync.RWMutex
requests uint64
misses uint64
}
func (ibc *indexBlockCache) Init() {

View file

@@ -90,6 +90,22 @@ const inmemoryPartsFlushInterval = 5 * time.Second
// partition represents a partition.
type partition struct {
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
activeBigMerges uint64
activeSmallMerges uint64
bigMergesCount uint64
smallMergesCount uint64
bigRowsMerged uint64
smallRowsMerged uint64
bigRowsDeleted uint64
smallRowsDeleted uint64
smallAssistedMerges uint64
mergeIdx uint64
smallPartsPath string
bigPartsPath string
@@ -123,8 +139,6 @@ type partition struct {
// rawRowsLastFlushTime is the last time rawRows are flushed.
rawRowsLastFlushTime time.Time
mergeIdx uint64
snapshotLock sync.RWMutex
stopCh chan struct{}
@@ -134,29 +148,22 @@ type partition struct {
rawRowsFlusherWG sync.WaitGroup
inmemoryPartsFlusherWG sync.WaitGroup
activeBigMerges uint64
activeSmallMerges uint64
bigMergesCount uint64
smallMergesCount uint64
bigRowsMerged uint64
smallRowsMerged uint64
bigRowsDeleted uint64
smallRowsDeleted uint64
smallAssistedMerges uint64
}
// partWrapper is a wrapper for the part.
type partWrapper struct {
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
// The number of references to the part.
refCount uint64
// The part itself.
p *part
// non-nil if the part is inmemoryPart.
mp *inmemoryPart
// The number of references to the part.
refCount uint64
// Whether the part is in merge now.
isInMerge bool
}

View file

@@ -14,34 +14,34 @@ func TestPartitionMaxRowsByPath(t *testing.T) {
}
func TestAppendPartsToMerge(t *testing.T) {
testAppendPartsToMerge(t, 2, []int{}, nil)
testAppendPartsToMerge(t, 2, []int{123}, nil)
testAppendPartsToMerge(t, 2, []int{4, 2}, nil)
testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 1}, nil)
testAppendPartsToMerge(t, 4, []int{128, 64, 32, 10, 9, 7, 2, 1}, []int{2, 7, 9, 10})
testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2})
testAppendPartsToMerge(t, 4, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2, 4, 8})
testAppendPartsToMerge(t, 2, []int{1, 1}, []int{1, 1})
testAppendPartsToMerge(t, 2, []int{2, 2, 2}, []int{2, 2})
testAppendPartsToMerge(t, 2, []int{4, 2, 4}, []int{4, 4})
testAppendPartsToMerge(t, 2, []int{1, 3, 7, 2}, nil)
testAppendPartsToMerge(t, 3, []int{1, 3, 7, 2}, []int{1, 2, 3})
testAppendPartsToMerge(t, 4, []int{1, 3, 7, 2}, []int{1, 2, 3})
testAppendPartsToMerge(t, 3, []int{11, 1, 10, 100, 10}, []int{10, 10, 11})
testAppendPartsToMerge(t, 2, []uint64{}, nil)
testAppendPartsToMerge(t, 2, []uint64{123}, nil)
testAppendPartsToMerge(t, 2, []uint64{4, 2}, nil)
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil)
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 2, 1}, []uint64{2, 7, 9, 10})
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2})
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8})
testAppendPartsToMerge(t, 2, []uint64{1, 1}, []uint64{1, 1})
testAppendPartsToMerge(t, 2, []uint64{2, 2, 2}, []uint64{2, 2})
testAppendPartsToMerge(t, 2, []uint64{4, 2, 4}, []uint64{4, 4})
testAppendPartsToMerge(t, 2, []uint64{1, 3, 7, 2}, nil)
testAppendPartsToMerge(t, 3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
}
func TestAppendPartsToMergeManyParts(t *testing.T) {
// Verify that a big number of parts is merged into a minimal number of parts
// using the minimum number of merges.
var a []int
var a []uint64
maxOutPartRows := uint64(0)
for i := 0; i < 1024; i++ {
n := int(rand.NormFloat64() * 1e9)
n := uint64(uint32(rand.NormFloat64() * 1e9))
if n < 0 {
n = -n
}
n++
maxOutPartRows += uint64(n)
maxOutPartRows += n
a = append(a, n)
}
pws := newTestPartWrappersForRowsCount(a)
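The []int to []uint64 switch in these helpers is also a 32-bit correctness fix: the generated row counts approach 1e9, and sums of them overflow a 32-bit int. A small illustration of the wraparound being avoided (values are illustrative, not from the test):

package main

import "fmt"

func main() {
	// With 32-bit arithmetic, two large row counts wrap around on addition;
	// uint64 keeps the sum exact regardless of GOARCH.
	a, b := int32(1500000000), int32(1500000000)
	fmt.Println(a + b)                 // -1294967296: wrapped
	fmt.Println(uint64(a) + uint64(b)) // 3000000000
}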
@@ -67,11 +67,10 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
}
}
pw := &partWrapper{
p: &part{
ph: partHeader{
p: &part{},
}
pw.p.ph = partHeader{
RowsCount: rowsCount,
},
},
}
rowsMerged += rowsCount
pwsNew = append(pwsNew, pw)
@@ -94,7 +93,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
}
}
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []int) {
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
t.Helper()
pws := newTestPartWrappersForRowsCount(initialRowsCount)
@@ -111,11 +110,13 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
prefix := []*partWrapper{
{
p: &part{
partInternals: partInternals{
ph: partHeader{
RowsCount: 1234,
},
},
},
},
{},
{},
}
@@ -132,21 +133,23 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
}
}
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []int {
var rowsCount []int
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
var rowsCount []uint64
for _, pw := range pws {
rowsCount = append(rowsCount, int(pw.p.ph.RowsCount))
rowsCount = append(rowsCount, pw.p.ph.RowsCount)
}
return rowsCount
}
func newTestPartWrappersForRowsCount(rowsCount []int) []*partWrapper {
func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
var pws []*partWrapper
for _, rc := range rowsCount {
pw := &partWrapper{
p: &part{
partInternals: partInternals{
ph: partHeader{
RowsCount: uint64(rc),
RowsCount: rc,
},
},
},
}

View file

@@ -29,6 +29,15 @@ const maxRetentionMonths = 12 * 100
// Storage represents TSDB storage.
type Storage struct {
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
path string
cachePath string
retentionMonths int
@@ -66,13 +75,6 @@ type Storage struct {
currHourMetricIDsUpdaterWG sync.WaitGroup
retentionWatcherWG sync.WaitGroup
tooSmallTimestampRows uint64
tooBigTimestampRows uint64
addRowsConcurrencyLimitReached uint64
addRowsConcurrencyLimitTimeout uint64
addRowsConcurrencyDroppedRows uint64
}
// OpenStorage opens storage on the given path with the given number of retention months.

View file

@@ -34,11 +34,15 @@ type table struct {
// partitionWrapper provides refcounting mechanism for the partition.
type partitionWrapper struct {
pt *partition
// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
refCount uint64
// The partition must be dropped if mustDrop > 0
mustDrop uint64
pt *partition
}
func (ptw *partitionWrapper) incRef() {