mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
all: add support for GOARCH=386 and fix all the issues related to 32-bit architectures such as GOARCH=arm
Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
This commit is contained in:
parent
5d2af2cfa2
commit
5b01b7fb01
24 changed files with 211 additions and 139 deletions
1
.github/workflows/main.yml
vendored
1
.github/workflows/main.yml
vendored
|
@ -29,6 +29,7 @@ jobs:
|
|||
git diff --exit-code
|
||||
make test-full
|
||||
make test-pure
|
||||
make test-full-386
|
||||
make vminsert vmselect vmstorage
|
||||
make vminsert-pure vmselect-pure vmstorage-pure
|
||||
GOOS=freebsd go build -mod=vendor ./app/vminsert
|
||||
|
|
3
Makefile
3
Makefile
|
@ -77,6 +77,9 @@ test-pure:
|
|||
test-full:
|
||||
GO111MODULE=on go test -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
||||
|
||||
test-full-386:
|
||||
GO111MODULE=on GOARCH=386 go test -tags=integration -mod=vendor -coverprofile=coverage.txt -covermode=atomic ./lib/... ./app/...
|
||||
|
||||
benchmark:
|
||||
GO111MODULE=on go test -mod=vendor -bench=. ./lib/...
|
||||
GO111MODULE=on go test -mod=vendor -bench=. ./app/...
|
||||
|
|
|
@ -85,6 +85,15 @@ func TestRowsUnmarshalSuccess(t *testing.T) {
|
|||
}},
|
||||
})
|
||||
|
||||
// Timestamp bigger than 1<<31
|
||||
f("aaa 1123 429496729600", &Rows{
|
||||
Rows: []Row{{
|
||||
Metric: "aaa",
|
||||
Value: 1123,
|
||||
Timestamp: 429496729600,
|
||||
}},
|
||||
})
|
||||
|
||||
// Tags
|
||||
f("foo;bar=baz 1 2", &Rows{
|
||||
Rows: []Row{{
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
{% for i, ts := range rs.Timestamps %}
|
||||
{%z= bb.B %}{% space %}
|
||||
{%f= rs.Values[i] %}{% space %}
|
||||
{%d= int(ts) %}{% newline %}
|
||||
{%dl= ts %}{% newline %}
|
||||
{% endfor %}
|
||||
{% code quicktemplate.ReleaseByteBuffer(bb) %}
|
||||
{% endfunc %}
|
||||
|
@ -35,10 +35,10 @@
|
|||
"timestamps":[
|
||||
{% if len(rs.Timestamps) > 0 %}
|
||||
{% code timestamps := rs.Timestamps %}
|
||||
{%d= int(timestamps[0]) %}
|
||||
{%dl= timestamps[0] %}
|
||||
{% code timestamps = timestamps[1:] %}
|
||||
{% for _, ts := range timestamps %}
|
||||
,{%d= int(ts) %}
|
||||
,{%dl= ts %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
|
|
|
@ -49,7 +49,7 @@ func StreamExportPrometheusLine(qw422016 *qt422016.Writer, rs *netstorage.Result
|
|||
//line app/vmselect/prometheus/export.qtpl:15
|
||||
qw422016.N().S(` `)
|
||||
//line app/vmselect/prometheus/export.qtpl:16
|
||||
qw422016.N().D(int(ts))
|
||||
qw422016.N().DL(ts)
|
||||
//line app/vmselect/prometheus/export.qtpl:16
|
||||
qw422016.N().S(`
|
||||
`)
|
||||
|
@ -129,7 +129,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
|
|||
timestamps := rs.Timestamps
|
||||
|
||||
//line app/vmselect/prometheus/export.qtpl:38
|
||||
qw422016.N().D(int(timestamps[0]))
|
||||
qw422016.N().DL(timestamps[0])
|
||||
//line app/vmselect/prometheus/export.qtpl:39
|
||||
timestamps = timestamps[1:]
|
||||
|
||||
|
@ -138,7 +138,7 @@ func StreamExportJSONLine(qw422016 *qt422016.Writer, rs *netstorage.Result) {
|
|||
//line app/vmselect/prometheus/export.qtpl:40
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/export.qtpl:41
|
||||
qw422016.N().D(int(ts))
|
||||
qw422016.N().DL(ts)
|
||||
//line app/vmselect/prometheus/export.qtpl:42
|
||||
}
|
||||
//line app/vmselect/prometheus/export.qtpl:43
|
||||
|
|
|
@ -10,7 +10,7 @@
|
|||
{% if len(rs.Timestamps) == 0 || len(rs.Values) == 0 %}{% return %}{% endif %}
|
||||
{%= prometheusMetricName(&rs.MetricName) %}{% space %}
|
||||
{%f= rs.Values[len(rs.Values)-1] %}{% space %}
|
||||
{%d= int(rs.Timestamps[len(rs.Timestamps)-1]) %}{% newline %}
|
||||
{%dl= rs.Timestamps[len(rs.Timestamps)-1] %}{% newline %}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
|
|
|
@ -41,7 +41,7 @@ func StreamFederate(qw422016 *qt422016.Writer, rs *netstorage.Result) {
|
|||
//line app/vmselect/prometheus/federate.qtpl:12
|
||||
qw422016.N().S(` `)
|
||||
//line app/vmselect/prometheus/federate.qtpl:13
|
||||
qw422016.N().D(int(rs.Timestamps[len(rs.Timestamps)-1]))
|
||||
qw422016.N().DL(rs.Timestamps[len(rs.Timestamps)-1])
|
||||
//line app/vmselect/prometheus/federate.qtpl:13
|
||||
qw422016.N().S(`
|
||||
`)
|
||||
|
|
|
@ -3,7 +3,7 @@ SeriesCountResponse generates response for /api/v1/series/count .
|
|||
{% func SeriesCountResponse(n uint64) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":[{%d int(n) %}]
|
||||
"data":[{%dl int64(n) %}]
|
||||
}
|
||||
{% endfunc %}
|
||||
{% endstripspace %}
|
||||
|
|
|
@ -24,7 +24,7 @@ func StreamSeriesCountResponse(qw422016 *qt422016.Writer, n uint64) {
|
|||
//line app/vmselect/prometheus/series_count_response.qtpl:3
|
||||
qw422016.N().S(`{"status":"success","data":[`)
|
||||
//line app/vmselect/prometheus/series_count_response.qtpl:6
|
||||
qw422016.N().D(int(n))
|
||||
qw422016.N().DL(int64(n))
|
||||
//line app/vmselect/prometheus/series_count_response.qtpl:6
|
||||
qw422016.N().S(`]}`)
|
||||
//line app/vmselect/prometheus/series_count_response.qtpl:8
|
||||
|
|
3
app/vmselect/promql/arch_386.go
Normal file
3
app/vmselect/promql/arch_386.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package promql
|
||||
|
||||
const maxByteSliceLen = 1<<31 - 1
|
|
@ -194,11 +194,14 @@ type parseCacheValue struct {
|
|||
}
|
||||
|
||||
type parseCache struct {
|
||||
m map[string]*parseCacheValue
|
||||
mu sync.RWMutex
|
||||
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
m map[string]*parseCacheValue
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (pc *parseCache) Requests() uint64 {
|
||||
|
|
|
@ -51,11 +51,14 @@ type regexpCacheValue struct {
|
|||
}
|
||||
|
||||
type regexpCache struct {
|
||||
m map[string]*regexpCacheValue
|
||||
mu sync.RWMutex
|
||||
// Move atomic counters to the top of struct for 8-byte alignment on 32-bit arch.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
m map[string]*regexpCacheValue
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func (rc *regexpCache) Requests() uint64 {
|
||||
|
|
|
@ -858,7 +858,7 @@ func testRowsEqual(t *testing.T, values []float64, timestamps []int64, valuesExp
|
|||
}
|
||||
continue
|
||||
}
|
||||
if v != vExpected {
|
||||
if math.Abs(v-vExpected) > 1e-15 {
|
||||
t.Fatalf("unexpected value at values[%d]; got %f; want %f\nvalues=\n%v\nvaluesExpected=\n%v",
|
||||
i, v, vExpected, values, valuesExpected)
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
|
@ -43,7 +44,7 @@ var (
|
|||
maxCachedInmemoryBlocksPerPartOnce sync.Once
|
||||
)
|
||||
|
||||
type part struct {
|
||||
type partInternals struct {
|
||||
ph partHeader
|
||||
|
||||
path string
|
||||
|
@ -55,7 +56,14 @@ type part struct {
|
|||
indexFile fs.ReadAtCloser
|
||||
itemsFile fs.ReadAtCloser
|
||||
lensFile fs.ReadAtCloser
|
||||
}
|
||||
|
||||
type part struct {
|
||||
partInternals
|
||||
|
||||
// Align atomic counters inside caches by 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
|
||||
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
|
||||
idxbCache indexBlockCache
|
||||
ibCache inmemoryBlockCache
|
||||
}
|
||||
|
@ -114,15 +122,15 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
|
|||
}
|
||||
metaindexReader.MustClose()
|
||||
|
||||
p := &part{
|
||||
path: path,
|
||||
size: size,
|
||||
mrs: mrs,
|
||||
var p part
|
||||
p.path = path
|
||||
p.size = size
|
||||
p.mrs = mrs
|
||||
|
||||
p.indexFile = indexFile
|
||||
p.itemsFile = itemsFile
|
||||
p.lensFile = lensFile
|
||||
|
||||
indexFile: indexFile,
|
||||
itemsFile: itemsFile,
|
||||
lensFile: lensFile,
|
||||
}
|
||||
p.ph.CopyFrom(ph)
|
||||
p.idxbCache.Init()
|
||||
p.ibCache.Init()
|
||||
|
@ -133,7 +141,7 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
|
|||
p.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
return p, nil
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
func (p *part) MustClose() {
|
||||
|
@ -165,12 +173,15 @@ func putIndexBlock(idxb *indexBlock) {
|
|||
var indexBlockPool sync.Pool
|
||||
|
||||
type indexBlockCache struct {
|
||||
// Atomically updated counters must go first in the struct, so they are properly
|
||||
// aligned to 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
m map[uint64]*indexBlock
|
||||
missesMap map[uint64]uint64
|
||||
mu sync.RWMutex
|
||||
|
||||
requests uint64
|
||||
misses uint64
|
||||
}
|
||||
|
||||
func (idxbc *indexBlockCache) Init() {
|
||||
|
@ -274,12 +285,15 @@ func (idxbc *indexBlockCache) Misses() uint64 {
|
|||
}
|
||||
|
||||
type inmemoryBlockCache struct {
|
||||
// Atomically updated counters must go first in the struct, so they are properly
|
||||
// aligned to 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
m map[inmemoryBlockCacheKey]*inmemoryBlock
|
||||
missesMap map[inmemoryBlockCacheKey]uint64
|
||||
mu sync.RWMutex
|
||||
|
||||
requests uint64
|
||||
misses uint64
|
||||
}
|
||||
|
||||
type inmemoryBlockCacheKey struct {
|
||||
|
|
|
@ -70,6 +70,17 @@ const rawItemsFlushInterval = time.Second
|
|||
|
||||
// Table represents mergeset table.
|
||||
type Table struct {
|
||||
// Atomically updated counters must go first in the struct, so they are properly
|
||||
// aligned to 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
activeMerges uint64
|
||||
mergesCount uint64
|
||||
itemsMerged uint64
|
||||
assistedMerges uint64
|
||||
|
||||
mergeIdx uint64
|
||||
|
||||
path string
|
||||
|
||||
flushCallback func()
|
||||
|
@ -83,8 +94,6 @@ type Table struct {
|
|||
rawItemsLock sync.Mutex
|
||||
rawItemsLastFlushTime time.Time
|
||||
|
||||
mergeIdx uint64
|
||||
|
||||
snapshotLock sync.RWMutex
|
||||
|
||||
flockF *os.File
|
||||
|
@ -100,11 +109,6 @@ type Table struct {
|
|||
|
||||
// Use syncwg instead of sync, since Add/Wait may be called from concurrent goroutines.
|
||||
rawItemsPendingFlushesWG syncwg.WaitGroup
|
||||
|
||||
activeMerges uint64
|
||||
mergesCount uint64
|
||||
itemsMerged uint64
|
||||
assistedMerges uint64
|
||||
}
|
||||
|
||||
type partWrapper struct {
|
||||
|
|
|
@ -43,6 +43,11 @@ func (cm *connMetrics) init(group, name, addr string) {
|
|||
}
|
||||
|
||||
type statConn struct {
|
||||
// Move atomic counters to the top of struct in order to properly align them on 32-bit arch.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
closeCalls uint64
|
||||
|
||||
readTimeout time.Duration
|
||||
lastReadTime time.Time
|
||||
|
||||
|
@ -52,8 +57,6 @@ type statConn struct {
|
|||
net.Conn
|
||||
|
||||
cm *connMetrics
|
||||
|
||||
closeCalls uint64
|
||||
}
|
||||
|
||||
func (sc *statConn) Read(p []byte) (int, error) {
|
||||
|
|
|
@ -59,7 +59,7 @@ func TestBlockStreamReaderManyTSIDManyRows(t *testing.T) {
|
|||
r.PrecisionBits = defaultPrecisionBits
|
||||
const blocks = 123
|
||||
for i := 0; i < 3210; i++ {
|
||||
r.TSID.MetricID = uint64((1e12 - i) % blocks)
|
||||
r.TSID.MetricID = uint64((1e9 - i) % blocks)
|
||||
r.Value = rand.Float64()
|
||||
r.Timestamp = int64(rand.Float64() * 1e9)
|
||||
rows = append(rows, r)
|
||||
|
@ -73,7 +73,7 @@ func TestBlockStreamReaderReadConcurrent(t *testing.T) {
|
|||
r.PrecisionBits = defaultPrecisionBits
|
||||
const blocks = 123
|
||||
for i := 0; i < 3210; i++ {
|
||||
r.TSID.MetricID = uint64((1e12 - i) % blocks)
|
||||
r.TSID.MetricID = uint64((1e9 - i) % blocks)
|
||||
r.Value = rand.Float64()
|
||||
r.Timestamp = int64(rand.Float64() * 1e9)
|
||||
rows = append(rows, r)
|
||||
|
|
|
@ -68,8 +68,30 @@ func shouldCacheBlock(item []byte) bool {
|
|||
|
||||
// indexDB represents an index db.
|
||||
type indexDB struct {
|
||||
name string
|
||||
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
|
||||
|
||||
refCount uint64
|
||||
|
||||
// The number of missing MetricID -> TSID entries.
|
||||
// High rate for this value means corrupted indexDB.
|
||||
missingTSIDsForMetricID uint64
|
||||
|
||||
// The number of calls to search for metric ids for recent hours.
|
||||
recentHourMetricIDsSearchCalls uint64
|
||||
|
||||
// The number of cache hits during search for metric ids in recent hours.
|
||||
recentHourMetricIDsSearchHits uint64
|
||||
|
||||
// The number of searches for metric ids by days.
|
||||
dateMetricIDsSearchCalls uint64
|
||||
|
||||
// The number of successful searches for metric ids by days.
|
||||
dateMetricIDsSearchHits uint64
|
||||
|
||||
mustDrop uint64
|
||||
|
||||
name string
|
||||
tb *mergeset.Table
|
||||
|
||||
extDB *indexDB
|
||||
|
@ -104,24 +126,6 @@ type indexDB struct {
|
|||
// up to two last hours.
|
||||
currHourMetricIDs *atomic.Value
|
||||
prevHourMetricIDs *atomic.Value
|
||||
|
||||
// The number of missing MetricID -> TSID entries.
|
||||
// High rate for this value means corrupted indexDB.
|
||||
missingTSIDsForMetricID uint64
|
||||
|
||||
// The number of calls to search for metric ids for recent hours.
|
||||
recentHourMetricIDsSearchCalls uint64
|
||||
|
||||
// The number of cache hits during search for metric ids in recent hours.
|
||||
recentHourMetricIDsSearchHits uint64
|
||||
|
||||
// The number of searches for metric ids by days.
|
||||
dateMetricIDsSearchCalls uint64
|
||||
|
||||
// The number of successful searches for metric ids by days.
|
||||
dateMetricIDsSearchHits uint64
|
||||
|
||||
mustDrop uint64
|
||||
}
|
||||
|
||||
// openIndexDB opens index db from the given path with the given caches.
|
||||
|
|
|
@ -25,7 +25,7 @@ func TestMergeBlockStreamsOneStreamOneBlockManyRows(t *testing.T) {
|
|||
minTimestamp := int64(1<<63 - 1)
|
||||
maxTimestamp := int64(-1 << 63)
|
||||
for i := 0; i < maxRowsPerBlock; i++ {
|
||||
r.Timestamp = int64(rand.Intn(1e15))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64() * 2332
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -51,7 +51,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksOneRow(t *testing.T) {
|
|||
for i := 0; i < blocksCount; i++ {
|
||||
initTestTSID(&r.TSID)
|
||||
r.TSID.MetricID = uint64(i * 123)
|
||||
r.Timestamp = int64(rand.Intn(1e15))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64() * 2332
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -78,7 +78,7 @@ func TestMergeBlockStreamsOneStreamManyBlocksManyRows(t *testing.T) {
|
|||
maxTimestamp := int64(-1 << 63)
|
||||
for i := 0; i < rowsCount; i++ {
|
||||
r.TSID.MetricID = uint64(i % blocksCount)
|
||||
r.Timestamp = int64(rand.Intn(1e15))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64() * 2332
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -175,7 +175,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
|
|||
const rowsCount1 = 4938
|
||||
for i := 0; i < rowsCount1; i++ {
|
||||
r.TSID.MetricID = uint64(i % blocksCount)
|
||||
r.Timestamp = int64(rand.Intn(1e15))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64() * 2332
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -192,7 +192,7 @@ func TestMergeBlockStreamsTwoStreamsManyBlocksManyRows(t *testing.T) {
|
|||
const rowsCount2 = 3281
|
||||
for i := 0; i < rowsCount2; i++ {
|
||||
r.TSID.MetricID = uint64((i + 17) % blocksCount)
|
||||
r.Timestamp = int64(rand.Intn(1e15))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64() * 2332
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -310,7 +310,7 @@ func TestMergeBlockStreamsManyStreamsManyBlocksManyRows(t *testing.T) {
|
|||
var rows []rawRow
|
||||
for j := 0; j < rowsPerStream; j++ {
|
||||
r.TSID.MetricID = uint64(j % blocksCount)
|
||||
r.Timestamp = int64(rand.Intn(1e10))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64()
|
||||
rows = append(rows, r)
|
||||
|
||||
|
@ -343,7 +343,7 @@ func TestMergeForciblyStop(t *testing.T) {
|
|||
var rows []rawRow
|
||||
for j := 0; j < rowsPerStream; j++ {
|
||||
r.TSID.MetricID = uint64(j % blocksCount)
|
||||
r.Timestamp = int64(rand.Intn(1e10))
|
||||
r.Timestamp = int64(rand.Intn(1e9))
|
||||
r.Value = rand.NormFloat64()
|
||||
rows = append(rows, r)
|
||||
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"path/filepath"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unsafe"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/filestream"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
|
@ -27,8 +28,7 @@ var (
|
|||
maxCachedIndexBlocksPerPartOnce sync.Once
|
||||
)
|
||||
|
||||
// part represents a searchable part containing time series data.
|
||||
type part struct {
|
||||
type partInternals struct {
|
||||
ph partHeader
|
||||
|
||||
// Filesystem path to the part.
|
||||
|
@ -44,7 +44,15 @@ type part struct {
|
|||
indexFile fs.ReadAtCloser
|
||||
|
||||
metaindex []metaindexRow
|
||||
}
|
||||
|
||||
// part represents a searchable part containing time series data.
|
||||
type part struct {
|
||||
partInternals
|
||||
|
||||
// Align ibCache to 8 bytes in order to align internal counters on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
_ [(8 - (unsafe.Sizeof(partInternals{}) % 8)) % 8]byte
|
||||
ibCache indexBlockCache
|
||||
}
|
||||
|
||||
|
@ -107,27 +115,26 @@ func newPart(ph *partHeader, path string, size uint64, metaindexReader filestrea
|
|||
}
|
||||
metaindexReader.MustClose()
|
||||
|
||||
p := &part{
|
||||
ph: *ph,
|
||||
path: path,
|
||||
size: size,
|
||||
timestampsFile: timestampsFile,
|
||||
valuesFile: valuesFile,
|
||||
indexFile: indexFile,
|
||||
var p part
|
||||
p.ph = *ph
|
||||
p.path = path
|
||||
p.size = size
|
||||
p.timestampsFile = timestampsFile
|
||||
p.valuesFile = valuesFile
|
||||
p.indexFile = indexFile
|
||||
|
||||
metaindex: metaindex,
|
||||
}
|
||||
p.metaindex = metaindex
|
||||
|
||||
if len(errors) > 0 {
|
||||
// Return only the first error, since it has no sense in returning all errors.
|
||||
err = fmt.Errorf("cannot initialize part %q: %s", p, errors[0])
|
||||
err = fmt.Errorf("cannot initialize part %q: %s", &p, errors[0])
|
||||
p.MustClose()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.ibCache.Init()
|
||||
|
||||
return p, nil
|
||||
return &p, nil
|
||||
}
|
||||
|
||||
// String returns human-readable representation of p.
|
||||
|
@ -168,12 +175,14 @@ func putIndexBlock(ib *indexBlock) {
|
|||
var indexBlockPool sync.Pool
|
||||
|
||||
type indexBlockCache struct {
|
||||
// Put atomic counters to the top of struct in order to align them to 8 bytes on 32-bit architectures.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
requests uint64
|
||||
misses uint64
|
||||
|
||||
m map[uint64]*indexBlock
|
||||
missesMap map[uint64]uint64
|
||||
mu sync.RWMutex
|
||||
|
||||
requests uint64
|
||||
misses uint64
|
||||
}
|
||||
|
||||
func (ibc *indexBlockCache) Init() {
|
||||
|
|
|
@ -90,6 +90,22 @@ const inmemoryPartsFlushInterval = 5 * time.Second
|
|||
|
||||
// partition represents a partition.
|
||||
type partition struct {
|
||||
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
activeBigMerges uint64
|
||||
activeSmallMerges uint64
|
||||
bigMergesCount uint64
|
||||
smallMergesCount uint64
|
||||
bigRowsMerged uint64
|
||||
smallRowsMerged uint64
|
||||
bigRowsDeleted uint64
|
||||
smallRowsDeleted uint64
|
||||
|
||||
smallAssistedMerges uint64
|
||||
|
||||
mergeIdx uint64
|
||||
|
||||
smallPartsPath string
|
||||
bigPartsPath string
|
||||
|
||||
|
@ -123,8 +139,6 @@ type partition struct {
|
|||
// rawRowsLastFlushTime is the last time rawRows are flushed.
|
||||
rawRowsLastFlushTime time.Time
|
||||
|
||||
mergeIdx uint64
|
||||
|
||||
snapshotLock sync.RWMutex
|
||||
|
||||
stopCh chan struct{}
|
||||
|
@ -134,29 +148,22 @@ type partition struct {
|
|||
rawRowsFlusherWG sync.WaitGroup
|
||||
inmemoryPartsFlusherWG sync.WaitGroup
|
||||
|
||||
activeBigMerges uint64
|
||||
activeSmallMerges uint64
|
||||
bigMergesCount uint64
|
||||
smallMergesCount uint64
|
||||
bigRowsMerged uint64
|
||||
smallRowsMerged uint64
|
||||
bigRowsDeleted uint64
|
||||
smallRowsDeleted uint64
|
||||
|
||||
smallAssistedMerges uint64
|
||||
}
|
||||
|
||||
// partWrapper is a wrapper for the part.
|
||||
type partWrapper struct {
|
||||
// Put atomic counters to the top of struct, so they are aligned to 8 bytes on 32-bit arch.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
// The number of references to the part.
|
||||
refCount uint64
|
||||
|
||||
// The part itself.
|
||||
p *part
|
||||
|
||||
// non-nil if the part is inmemoryPart.
|
||||
mp *inmemoryPart
|
||||
|
||||
// The number of references to the part.
|
||||
refCount uint64
|
||||
|
||||
// Whether the part is in merge now.
|
||||
isInMerge bool
|
||||
}
|
||||
|
|
|
@ -14,34 +14,34 @@ func TestPartitionMaxRowsByPath(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestAppendPartsToMerge(t *testing.T) {
|
||||
testAppendPartsToMerge(t, 2, []int{}, nil)
|
||||
testAppendPartsToMerge(t, 2, []int{123}, nil)
|
||||
testAppendPartsToMerge(t, 2, []int{4, 2}, nil)
|
||||
testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 1}, nil)
|
||||
testAppendPartsToMerge(t, 4, []int{128, 64, 32, 10, 9, 7, 2, 1}, []int{2, 7, 9, 10})
|
||||
testAppendPartsToMerge(t, 2, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2})
|
||||
testAppendPartsToMerge(t, 4, []int{128, 64, 32, 16, 8, 4, 2, 2}, []int{2, 2, 4, 8})
|
||||
testAppendPartsToMerge(t, 2, []int{1, 1}, []int{1, 1})
|
||||
testAppendPartsToMerge(t, 2, []int{2, 2, 2}, []int{2, 2})
|
||||
testAppendPartsToMerge(t, 2, []int{4, 2, 4}, []int{4, 4})
|
||||
testAppendPartsToMerge(t, 2, []int{1, 3, 7, 2}, nil)
|
||||
testAppendPartsToMerge(t, 3, []int{1, 3, 7, 2}, []int{1, 2, 3})
|
||||
testAppendPartsToMerge(t, 4, []int{1, 3, 7, 2}, []int{1, 2, 3})
|
||||
testAppendPartsToMerge(t, 3, []int{11, 1, 10, 100, 10}, []int{10, 10, 11})
|
||||
testAppendPartsToMerge(t, 2, []uint64{}, nil)
|
||||
testAppendPartsToMerge(t, 2, []uint64{123}, nil)
|
||||
testAppendPartsToMerge(t, 2, []uint64{4, 2}, nil)
|
||||
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 1}, nil)
|
||||
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 10, 9, 7, 2, 1}, []uint64{2, 7, 9, 10})
|
||||
testAppendPartsToMerge(t, 2, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2})
|
||||
testAppendPartsToMerge(t, 4, []uint64{128, 64, 32, 16, 8, 4, 2, 2}, []uint64{2, 2, 4, 8})
|
||||
testAppendPartsToMerge(t, 2, []uint64{1, 1}, []uint64{1, 1})
|
||||
testAppendPartsToMerge(t, 2, []uint64{2, 2, 2}, []uint64{2, 2})
|
||||
testAppendPartsToMerge(t, 2, []uint64{4, 2, 4}, []uint64{4, 4})
|
||||
testAppendPartsToMerge(t, 2, []uint64{1, 3, 7, 2}, nil)
|
||||
testAppendPartsToMerge(t, 3, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
|
||||
testAppendPartsToMerge(t, 4, []uint64{1, 3, 7, 2}, []uint64{1, 2, 3})
|
||||
testAppendPartsToMerge(t, 3, []uint64{11, 1, 10, 100, 10}, []uint64{10, 10, 11})
|
||||
}
|
||||
|
||||
func TestAppendPartsToMergeManyParts(t *testing.T) {
|
||||
// Verify that big number of parts are merged into minimal number of parts
|
||||
// using minimum merges.
|
||||
var a []int
|
||||
var a []uint64
|
||||
maxOutPartRows := uint64(0)
|
||||
for i := 0; i < 1024; i++ {
|
||||
n := int(rand.NormFloat64() * 1e9)
|
||||
n := uint64(uint32(rand.NormFloat64() * 1e9))
|
||||
if n < 0 {
|
||||
n = -n
|
||||
}
|
||||
n++
|
||||
maxOutPartRows += uint64(n)
|
||||
maxOutPartRows += n
|
||||
a = append(a, n)
|
||||
}
|
||||
pws := newTestPartWrappersForRowsCount(a)
|
||||
|
@ -67,11 +67,10 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
|
|||
}
|
||||
}
|
||||
pw := &partWrapper{
|
||||
p: &part{
|
||||
ph: partHeader{
|
||||
p: &part{},
|
||||
}
|
||||
pw.p.ph = partHeader{
|
||||
RowsCount: rowsCount,
|
||||
},
|
||||
},
|
||||
}
|
||||
rowsMerged += rowsCount
|
||||
pwsNew = append(pwsNew, pw)
|
||||
|
@ -94,7 +93,7 @@ func TestAppendPartsToMergeManyParts(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []int) {
|
||||
func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount, expectedRowsCount []uint64) {
|
||||
t.Helper()
|
||||
|
||||
pws := newTestPartWrappersForRowsCount(initialRowsCount)
|
||||
|
@ -111,11 +110,13 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
|
|||
prefix := []*partWrapper{
|
||||
{
|
||||
p: &part{
|
||||
partInternals: partInternals{
|
||||
ph: partHeader{
|
||||
RowsCount: 1234,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{},
|
||||
{},
|
||||
}
|
||||
|
@ -132,21 +133,23 @@ func testAppendPartsToMerge(t *testing.T, maxPartsToMerge int, initialRowsCount,
|
|||
}
|
||||
}
|
||||
|
||||
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []int {
|
||||
var rowsCount []int
|
||||
func newTestRowsCountFromPartWrappers(pws []*partWrapper) []uint64 {
|
||||
var rowsCount []uint64
|
||||
for _, pw := range pws {
|
||||
rowsCount = append(rowsCount, int(pw.p.ph.RowsCount))
|
||||
rowsCount = append(rowsCount, pw.p.ph.RowsCount)
|
||||
}
|
||||
return rowsCount
|
||||
}
|
||||
|
||||
func newTestPartWrappersForRowsCount(rowsCount []int) []*partWrapper {
|
||||
func newTestPartWrappersForRowsCount(rowsCount []uint64) []*partWrapper {
|
||||
var pws []*partWrapper
|
||||
for _, rc := range rowsCount {
|
||||
pw := &partWrapper{
|
||||
p: &part{
|
||||
partInternals: partInternals{
|
||||
ph: partHeader{
|
||||
RowsCount: uint64(rc),
|
||||
RowsCount: rc,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
|
|
@ -29,6 +29,15 @@ const maxRetentionMonths = 12 * 100
|
|||
|
||||
// Storage represents TSDB storage.
|
||||
type Storage struct {
|
||||
// Atomic counters must go at the top of the structure in order to properly align by 8 bytes on 32-bit archs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212 .
|
||||
tooSmallTimestampRows uint64
|
||||
tooBigTimestampRows uint64
|
||||
|
||||
addRowsConcurrencyLimitReached uint64
|
||||
addRowsConcurrencyLimitTimeout uint64
|
||||
addRowsConcurrencyDroppedRows uint64
|
||||
|
||||
path string
|
||||
cachePath string
|
||||
retentionMonths int
|
||||
|
@ -66,13 +75,6 @@ type Storage struct {
|
|||
|
||||
currHourMetricIDsUpdaterWG sync.WaitGroup
|
||||
retentionWatcherWG sync.WaitGroup
|
||||
|
||||
tooSmallTimestampRows uint64
|
||||
tooBigTimestampRows uint64
|
||||
|
||||
addRowsConcurrencyLimitReached uint64
|
||||
addRowsConcurrencyLimitTimeout uint64
|
||||
addRowsConcurrencyDroppedRows uint64
|
||||
}
|
||||
|
||||
// OpenStorage opens storage on the given path with the given number of retention months.
|
||||
|
|
|
@ -34,11 +34,15 @@ type table struct {
|
|||
|
||||
// partitionWrapper provides refcounting mechanism for the partition.
|
||||
type partitionWrapper struct {
|
||||
pt *partition
|
||||
// Atomic counters must be at the top of struct for proper 8-byte alignment on 32-bit archs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/212
|
||||
|
||||
refCount uint64
|
||||
|
||||
// The partition must be dropped if mustDrop > 0
|
||||
mustDrop uint64
|
||||
|
||||
pt *partition
|
||||
}
|
||||
|
||||
func (ptw *partitionWrapper) incRef() {
|
||||
|
|
Loading…
Reference in a new issue