2019-11-07 19:05:39 +00:00
package actions
import (
"flag"
"fmt"
2023-05-03 08:48:53 +00:00
"path/filepath"
2019-11-07 19:05:39 +00:00
"strings"
"sync"
"time"
2022-10-05 21:10:00 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/azremote"
2019-11-07 19:05:39 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fsremote"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/gcsremote"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/s3remote"
)
// Command-line flags that configure access to remote backup storage.
// Their values are read by NewRemoteFS when it builds a remote fs.
var (
	// credsFilePath is the optional path to a credentials file.
	// It is handed to both the GCS and the S3 backends.
	credsFilePath = flag.String("credsFilePath", "", "Path to file with GCS or S3 credentials. Credentials are loaded from default locations if not set.\n"+
		"See https://cloud.google.com/iam/docs/creating-managing-service-account-keys and https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html")

	// configFilePath is the optional path to a file with S3 configs.
	configFilePath = flag.String("configFilePath", "", "Path to file with S3 configs. Configs are loaded from default location if not set.\n"+
		"See https://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html")

	// configProfile is the optional profile name for S3 configs.
	configProfile = flag.String("configProfile", "", "Profile name for S3 configs. If no set, the value of the environment variable will be loaded (AWS_PROFILE or AWS_DEFAULT_PROFILE), "+
		"or if both not set, DefaultSharedConfigProfile is used")

	// customS3Endpoint overrides the S3 endpoint for S3-compatible storages.
	customS3Endpoint = flag.String("customS3Endpoint", "", "Custom S3 endpoint for use with S3-compatible storages (e.g. MinIO). S3 is used if not set")

	// s3ForcePathStyle is passed to the S3 backend as-is; it defaults to true.
	s3ForcePathStyle = flag.Bool("s3ForcePathStyle", true, "Prefixing endpoint with bucket name when set false, true by default.")
)
// runParallel calls f for every part using up to concurrency workers,
// while progress is invoked by runWithProgress with the elapsed time.
//
// It returns the error produced by runParallelInternal.
func runParallel(concurrency int, parts []common.Part, f func(p common.Part) error, progress func(elapsed time.Duration)) error {
	var result error
	work := func() {
		result = runParallelInternal(concurrency, parts, f)
	}
	runWithProgress(progress, work)
	return result
}
// runParallelPerPath calls f for every per-path group of parts using up to
// concurrency workers, while progress is invoked by runWithProgress with
// the elapsed time.
//
// It returns the error produced by runParallelPerPathInternal.
func runParallelPerPath(concurrency int, perPath map[string][]common.Part, f func(parts []common.Part) error, progress func(elapsed time.Duration)) error {
	var result error
	work := func() {
		result = runParallelPerPathInternal(concurrency, perPath, f)
	}
	runWithProgress(progress, work)
	return result
}
// runParallelPerPathInternal calls f once for every value of perPath,
// spreading the calls across concurrency worker goroutines
// (a single worker is used when concurrency is not positive).
//
// As soon as a call to f returns a non-nil error, the workers are told to
// skip the groups they have not started yet and that error is returned
// after all the workers have exited. Nil is returned when every call
// succeeds or when perPath is empty.
func runParallelPerPathInternal(concurrency int, perPath map[string][]common.Part, f func(parts []common.Part) error) error {
	total := len(perPath)
	if total == 0 {
		return nil
	}
	if concurrency < 1 {
		concurrency = 1
	}

	// Both channels can hold an entry for every group, so neither
	// the feeding loop nor the workers ever block on a send.
	results := make(chan error, total)
	jobs := make(chan []common.Part, total)
	abort := make(chan struct{})

	// Launch the workers.
	var workers sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for group := range jobs {
				// Drop the remaining work once an error has been seen.
				select {
				case <-abort:
					return
				default:
				}
				results <- f(group)
			}
		}()
	}

	// Queue all the groups and signal that no more work follows.
	for _, group := range perPath {
		jobs <- group
	}
	close(jobs)

	// Collect the results until the first failure.
	var firstErr error
	for received := 0; received < total; received++ {
		if firstErr = <-results; firstErr != nil {
			close(abort)
			break
		}
	}

	// Do not return before every worker has finished.
	workers.Wait()
	return firstErr
}
// runParallelInternal calls f once for every item of parts, spreading the
// calls across concurrency worker goroutines (a single worker is used when
// concurrency is not positive).
//
// As soon as a call to f returns a non-nil error, the workers are told to
// skip the parts they have not started yet and that error is returned
// after all the workers have exited. Nil is returned when every call
// succeeds or when parts is empty.
func runParallelInternal(concurrency int, parts []common.Part, f func(p common.Part) error) error {
	total := len(parts)
	if total == 0 {
		return nil
	}
	if concurrency < 1 {
		concurrency = 1
	}

	// Both channels can hold an entry for every part, so neither
	// the feeding loop nor the workers ever block on a send.
	results := make(chan error, total)
	jobs := make(chan common.Part, total)
	abort := make(chan struct{})

	// Launch the workers.
	var workers sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for part := range jobs {
				// Drop the remaining work once an error has been seen.
				select {
				case <-abort:
					return
				default:
				}
				results <- f(part)
			}
		}()
	}

	// Queue all the parts and signal that no more work follows.
	for i := range parts {
		jobs <- parts[i]
	}
	close(jobs)

	// Collect the results until the first failure.
	var firstErr error
	for received := 0; received < total; received++ {
		if firstErr = <-results; firstErr != nil {
			close(abort)
			break
		}
	}

	// Do not return before every worker has finished.
	workers.Wait()
	return firstErr
}
// runWithProgress runs f in a separate goroutine and blocks until it returns.
//
// While f is running, progress is called every 10 seconds with the time
// elapsed since the start. It is called one more time after f has finished.
func runWithProgress(progress func(elapsed time.Duration), f func()) {
	begin := time.Now()
	report := func() {
		progress(time.Since(begin))
	}

	finished := make(chan struct{})
	go func() {
		f()
		close(finished)
	}()

	ticker := time.NewTicker(10 * time.Second)
waiting:
	for {
		select {
		case <-ticker.C:
			report()
		case <-finished:
			break waiting
		}
	}
	ticker.Stop()

	// Final report with the total duration.
	report()
}
// getPartsSize returns the sum of the Size fields of the given parts.
// It returns 0 for an empty slice.
func getPartsSize(parts []common.Part) uint64 {
	var total uint64
	for i := range parts {
		total += parts[i].Size
	}
	return total
}
// NewRemoteFS returns new remote fs from the given path.
func NewRemoteFS ( path string ) ( common . RemoteFS , error ) {
if len ( path ) == 0 {
return nil , fmt . Errorf ( "path cannot be empty" )
}
n := strings . Index ( path , "://" )
if n < 0 {
2022-10-05 21:10:00 +00:00
return nil , fmt . Errorf ( "Missing scheme in path %q. Supported schemes: `gs://`, `s3://`, `azblob://`, `fs://`" , path )
2019-11-07 19:05:39 +00:00
}
scheme := path [ : n ]
dir := path [ n + len ( "://" ) : ]
switch scheme {
case "fs" :
2023-05-03 08:48:53 +00:00
if ! filepath . IsAbs ( dir ) {
2019-11-07 19:05:39 +00:00
return nil , fmt . Errorf ( "dir must be absolute; got %q" , dir )
}
fs := & fsremote . FS {
Dir : dir ,
}
return fs , nil
2021-09-29 09:10:26 +00:00
case "gcs" , "gs" :
2019-11-07 19:05:39 +00:00
n := strings . Index ( dir , "/" )
if n < 0 {
return nil , fmt . Errorf ( "missing directory on the gcs bucket %q" , dir )
}
bucket := dir [ : n ]
dir = dir [ n : ]
fs := & gcsremote . FS {
CredsFilePath : * credsFilePath ,
Bucket : bucket ,
Dir : dir ,
}
if err := fs . Init ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot initialize connection to gcs: %w" , err )
2019-11-07 19:05:39 +00:00
}
return fs , nil
2022-10-05 21:10:00 +00:00
case "azblob" :
n := strings . Index ( dir , "/" )
if n < 0 {
return nil , fmt . Errorf ( "missing directory on the AZBlob container %q" , dir )
}
bucket := dir [ : n ]
dir = dir [ n : ]
fs := & azremote . FS {
Container : bucket ,
Dir : dir ,
}
if err := fs . Init ( ) ; err != nil {
return nil , fmt . Errorf ( "cannot initialize connection to AZBlob: %w" , err )
}
return fs , nil
2019-11-07 19:05:39 +00:00
case "s3" :
n := strings . Index ( dir , "/" )
if n < 0 {
return nil , fmt . Errorf ( "missing directory on the s3 bucket %q" , dir )
}
bucket := dir [ : n ]
dir = dir [ n : ]
fs := & s3remote . FS {
2021-11-16 23:03:03 +00:00
CredsFilePath : * credsFilePath ,
ConfigFilePath : * configFilePath ,
CustomEndpoint : * customS3Endpoint ,
S3ForcePathStyle : * s3ForcePathStyle ,
ProfileName : * configProfile ,
Bucket : bucket ,
Dir : dir ,
2019-11-07 19:05:39 +00:00
}
if err := fs . Init ( ) ; err != nil {
2020-06-30 19:58:18 +00:00
return nil , fmt . Errorf ( "cannot initialize connection to s3: %w" , err )
2019-11-07 19:05:39 +00:00
}
return fs , nil
default :
2020-05-15 14:12:55 +00:00
return nil , fmt . Errorf ( "unsupported scheme %q" , scheme )
2019-11-07 19:05:39 +00:00
}
}