diff --git a/app/vmstorage/main.go b/app/vmstorage/main.go index 8c0209a77..7888d99bb 100644 --- a/app/vmstorage/main.go +++ b/app/vmstorage/main.go @@ -30,6 +30,7 @@ var ( vmselectAddr = flag.String("vmselectAddr", ":8401", "TCP address to accept connections from vmselect services") snapshotAuthKey = flag.String("snapshotAuthKey", "", "authKey, which must be passed in query string to /snapshot* pages") forceMergeAuthKey = flag.String("forceMergeAuthKey", "", "authKey, which must be passed in query string to /internal/force_merge pages") + forceFlushAuthKey = flag.String("forceFlushAuthKey", "", "authKey, which must be passed in query string to /internal/force_flush pages") finalMergeDelay = flag.Duration("finalMergeDelay", 30*time.Second, "The delay before starting final merge for per-month partition after no new data is ingested into it. "+ "Query speed and disk space usage is usually reduced after the final merge is complete. Too low delay for final merge may result in increased "+ @@ -146,6 +147,16 @@ func requestHandler(w http.ResponseWriter, r *http.Request, strg *storage.Storag }() return true } + if path == "/internal/force_flush" { + authKey := r.FormValue("authKey") + if authKey != *forceFlushAuthKey { + httpserver.Errorf(w, r, "invalid authKey %q. It must match the value from -forceFlushAuthKey command line flag", authKey) + return true + } + logger.Infof("flushing storage to make pending data available for reading") + strg.DebugFlush() + return true + } if !strings.HasPrefix(path, "/snapshot") { return false } diff --git a/lib/storage/storage.go b/lib/storage/storage.go index 4a548e417..fcc5f9bb2 100644 --- a/lib/storage/storage.go +++ b/lib/storage/storage.go @@ -206,8 +206,8 @@ func (s *Storage) RetentionMsecs() int64 { return s.retentionMsecs } -// debugFlush flushes recently added storage data, so it becomes visible to search. -func (s *Storage) debugFlush() { +// DebugFlush flushes recently added storage data, so it becomes visible to search. +func (s *Storage) DebugFlush() { s.tb.flushRawRows() s.idb().tb.DebugFlush() } diff --git a/lib/storage/storage_test.go b/lib/storage/storage_test.go index 73c744d70..8eb31ea06 100644 --- a/lib/storage/storage_test.go +++ b/lib/storage/storage_test.go @@ -597,7 +597,7 @@ func testStorageDeleteMetrics(s *Storage, workerNum int) error { return fmt.Errorf("unexpected error when adding mrs: %w", err) } } - s.debugFlush() + s.DebugFlush() // Verify tag values exist tvs, err := s.SearchTagValues(accountID, projectID, workerTag, 1e5, noDeadline)