2022-10-05 21:10:00 +00:00
package azremote
import (
"context"
"errors"
"fmt"
"io"
2023-12-04 08:21:29 +00:00
"path"
2022-10-05 21:10:00 +00:00
"strings"
"time"
2022-10-06 21:55:36 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
2024-07-10 09:52:05 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
2022-10-05 21:10:00 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
2022-10-06 21:55:36 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
2022-10-05 21:10:00 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
2022-10-26 11:49:20 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
2022-10-05 21:10:00 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const (
2022-10-06 21:55:36 +00:00
envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME"
envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY"
envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING"
2024-07-10 09:52:05 +00:00
envStorageDomain = "AZURE_STORAGE_DOMAIN"
envStorageDefault = "AZURE_USE_DEFAULT_CREDENTIAL"
2022-10-06 21:55:36 +00:00
storageErrorCodeBlobNotFound = "BlobNotFound"
2022-10-05 21:10:00 +00:00
)
2024-07-10 09:52:05 +00:00
var (
errNoCredentials = fmt . Errorf (
` failed to detect credentials for AZBlob .
Ensure that one of the options is set : connection string at % q ; shared key at % q and % q ; account name at % q and set % q to "true" ` ,
envStorageAccCs ,
envStorageAcctName ,
envStorageAccKey ,
envStorageAcctName ,
envStorageDefault ,
)
errInvalidCredentials = fmt . Errorf ( "failed to process credentials: only one of %s, %s and %s, or %s and %s can be specified" ,
envStorageAccCs ,
envStorageAcctName ,
envStorageAccKey ,
envStorageAcctName ,
envStorageDefault ,
)
)
2022-10-05 21:10:00 +00:00
// FS represents filesystem for backups in Azure Blob Storage.
//
// Init must be called before calling other FS methods.
type FS struct {
// Azure Blob Storage bucket to use.
Container string
// Directory in the bucket to write to.
Dir string
2022-10-06 21:55:36 +00:00
client * container . Client
2024-07-10 09:52:05 +00:00
env envLookuper
2022-10-05 21:10:00 +00:00
}
// Init initializes fs.
//
// The returned fs must be stopped when no long needed with MustStop call.
func ( fs * FS ) Init ( ) error {
2024-07-10 09:52:05 +00:00
switch {
case fs . client != nil :
2022-10-05 21:10:00 +00:00
logger . Panicf ( "BUG: fs.Init has been already called" )
2024-07-10 09:52:05 +00:00
case fs . env == nil :
fs . env = envtemplate . LookupEnv
2022-10-05 21:10:00 +00:00
}
2024-07-10 09:52:05 +00:00
fs . Dir = cleanDirectory ( fs . Dir )
sc , err := fs . newClient ( )
if err != nil {
return fmt . Errorf ( "failed to create AZBlob service client: %w" , err )
2022-10-05 21:10:00 +00:00
}
2024-07-10 09:52:05 +00:00
containerClient := sc . NewContainerClient ( fs . Container )
fs . client = containerClient
return nil
}
func ( fs * FS ) newClient ( ) ( * service . Client , error ) {
connString , hasConnString := fs . env ( envStorageAccCs )
accountName , hasAccountName := fs . env ( envStorageAcctName )
accountKey , hasAccountKey := fs . env ( envStorageAccKey )
useDefault , _ := fs . env ( envStorageDefault )
domain := "blob.core.windows.net"
if storageDomain , ok := fs . env ( envStorageDomain ) ; ok {
logger . Infof ( "Overriding default Azure blob domain with %q" , storageDomain )
domain = storageDomain
2022-10-05 21:10:00 +00:00
}
2024-07-10 09:52:05 +00:00
// not used if connection string is set
serviceURL := fmt . Sprintf ( "https://%s.%s/" , accountName , domain )
switch {
// can't specify any combination of more than one credential
case moreThanOne ( hasConnString , ( hasAccountName && hasAccountKey ) , ( useDefault == "true" && hasAccountName ) ) :
return nil , errInvalidCredentials
case hasConnString :
logger . Infof ( "Creating AZBlob service client from connection string" )
return service . NewClientFromConnectionString ( connString , nil )
case hasAccountName && hasAccountKey :
logger . Infof ( "Creating AZBlob service client from account name and key" )
2022-10-05 21:10:00 +00:00
creds , err := azblob . NewSharedKeyCredential ( accountName , accountKey )
if err != nil {
2024-07-10 09:52:05 +00:00
return nil , fmt . Errorf ( "failed to create Shared Key credentials: %w" , err )
2022-10-05 21:10:00 +00:00
}
2024-07-10 09:52:05 +00:00
return service . NewClientWithSharedKeyCredential ( serviceURL , creds , nil )
case useDefault == "true" && hasAccountName :
logger . Infof ( "Creating AZBlob service client from default credential" )
creds , err := azidentity . NewDefaultAzureCredential ( nil )
2022-10-05 21:10:00 +00:00
if err != nil {
2024-07-10 09:52:05 +00:00
return nil , fmt . Errorf ( "failed to create default Azure credentials: %w" , err )
2022-10-05 21:10:00 +00:00
}
2024-07-10 09:52:05 +00:00
return service . NewClient ( serviceURL , creds , nil )
default :
return nil , errNoCredentials
2022-10-05 21:10:00 +00:00
}
}
// MustStop stops fs.
func ( fs * FS ) MustStop ( ) {
fs . client = nil
}
// String returns human-readable description for fs.
func ( fs * FS ) String ( ) string {
return fmt . Sprintf ( "AZBlob{container: %q, dir: %q}" , fs . Container , fs . Dir )
}
// ListParts returns all the parts for fs.
func ( fs * FS ) ListParts ( ) ( [ ] common . Part , error ) {
dir := fs . Dir
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
opts := & azblob . ListBlobsFlatOptions {
2022-10-05 21:10:00 +00:00
Prefix : & dir ,
}
2022-10-06 21:55:36 +00:00
pager := fs . client . NewListBlobsFlatPager ( opts )
2022-10-05 21:10:00 +00:00
var parts [ ] common . Part
2022-10-06 21:55:36 +00:00
for pager . More ( ) {
resp , err := pager . NextPage ( ctx )
if err != nil {
return nil , fmt . Errorf ( "cannot list blobs at %s (remote path %q): %w" , fs , fs . Container , err )
}
2022-10-05 21:10:00 +00:00
for _ , v := range resp . Segment . BlobItems {
file := * v . Name
if ! strings . HasPrefix ( file , dir ) {
return nil , fmt . Errorf ( "unexpected prefix for AZBlob key %q; want %q" , file , dir )
}
if fscommon . IgnorePath ( file ) {
continue
}
var p common . Part
if ! p . ParseFromRemotePath ( file [ len ( dir ) : ] ) {
2022-10-06 21:57:31 +00:00
logger . Errorf ( "skipping unknown object %q" , file )
2022-10-05 21:10:00 +00:00
continue
}
p . ActualSize = uint64 ( * v . Properties . ContentLength )
parts = append ( parts , p )
}
}
return parts , nil
}
// DeletePart deletes part p from fs.
func ( fs * FS ) DeletePart ( p common . Part ) error {
2023-10-10 12:13:23 +00:00
return fs . delete ( p . RemotePath ( fs . Dir ) )
2022-10-05 21:10:00 +00:00
}
// RemoveEmptyDirs recursively removes empty dirs in fs.
func ( fs * FS ) RemoveEmptyDirs ( ) error {
// Blob storage has no directories, so nothing to remove.
return nil
}
// CopyPart copies p from srcFS to fs.
func ( fs * FS ) CopyPart ( srcFS common . OriginFS , p common . Part ) error {
src , ok := srcFS . ( * FS )
if ! ok {
return fmt . Errorf ( "cannot perform server-side copying from %s to %s: both of them must be AZBlob" , srcFS , fs )
}
2022-10-06 21:55:36 +00:00
sbc := src . client . NewBlobClient ( p . RemotePath ( src . Dir ) )
dbc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
2022-10-06 21:55:36 +00:00
ssCopyPermission := sas . BlobPermissions {
2022-10-05 21:10:00 +00:00
Read : true ,
Create : true ,
Write : true ,
}
2022-10-06 21:55:36 +00:00
2023-02-08 17:13:55 +00:00
startTime := time . Now ( ) . Add ( - 10 * time . Minute )
o := & blob . GetSASURLOptions {
StartTime : & startTime ,
}
t , err := sbc . GetSASURL ( ssCopyPermission , time . Now ( ) . Add ( 30 * time . Minute ) , o )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "failed to generate SAS token of src %q: %w" , p . Path , err )
}
ctx := context . Background ( )
2022-12-13 17:32:57 +00:00
// In order to support copy of files larger than 256MB, we need to use the async copy
// Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob-from-url
_ , err = dbc . StartCopyFromURL ( ctx , t , & blob . StartCopyFromURLOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
2022-12-13 17:32:57 +00:00
return fmt . Errorf ( "cannot start async copy %q from %s to %s: %w" , p . Path , src , fs , err )
}
var copyStatus * blob . CopyStatusType
var copyStatusDescription * string
for {
r , err := dbc . GetProperties ( ctx , nil )
if err != nil {
return fmt . Errorf ( "failed to check copy status, cannot get properties of %q at %s: %w" , p . Path , fs , err )
}
// After the copy will be finished status will be changed to success/failed/aborted
// Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-properties#response-headers - x-ms-copy-status
if * r . CopyStatus != blob . CopyStatusTypePending {
copyStatus = r . CopyStatus
copyStatusDescription = r . CopyStatusDescription
break
}
time . Sleep ( 5 * time . Second )
}
if * copyStatus != blob . CopyStatusTypeSuccess {
return fmt . Errorf ( "copy of %q from %s to %s failed: expected status %q, received %q (description: %q)" , p . Path , src , fs , blob . CopyStatusTypeSuccess , * copyStatus , * copyStatusDescription )
2022-10-05 21:10:00 +00:00
}
return nil
}
// DownloadPart downloads part p from fs to w.
func ( fs * FS ) DownloadPart ( p common . Part , w io . Writer ) error {
2022-10-06 21:55:36 +00:00
bc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
r , err := bc . DownloadStream ( ctx , & blob . DownloadStreamOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "cannot open reader for %q at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
2022-10-06 21:55:36 +00:00
body := r . NewRetryReader ( ctx , & azblob . RetryReaderOptions { } )
2022-10-05 21:10:00 +00:00
n , err := io . Copy ( w , body )
if err1 := body . Close ( ) ; err1 != nil && err == nil {
err = err1
}
if err != nil {
return fmt . Errorf ( "cannot download %q from at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
if uint64 ( n ) != p . Size {
return fmt . Errorf ( "wrong data size downloaded from %q at %s; got %d bytes; want %d bytes" , p . Path , fs , n , p . Size )
}
return nil
}
// UploadPart uploads part p from r to fs.
func ( fs * FS ) UploadPart ( p common . Part , r io . Reader ) error {
2022-10-06 21:55:36 +00:00
bc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . UploadStream ( ctx , r , & blockblob . UploadStreamOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "cannot upload data to %q at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
return nil
}
2022-10-06 21:55:36 +00:00
func ( fs * FS ) clientForPart ( p common . Part ) * blockblob . Client {
2022-10-05 21:10:00 +00:00
path := p . RemotePath ( fs . Dir )
return fs . clientForPath ( path )
}
2022-10-06 21:55:36 +00:00
func ( fs * FS ) clientForPath ( path string ) * blockblob . Client {
bc := fs . client . NewBlockBlobClient ( path )
return bc
2022-10-05 21:10:00 +00:00
}
// DeleteFile deletes filePath at fs if it exists.
//
// The function does nothing if the filePath doesn't exists.
func ( fs * FS ) DeleteFile ( filePath string ) error {
v , err := fs . HasFile ( filePath )
if err != nil {
return err
}
if ! v {
return nil
}
2023-12-04 08:21:29 +00:00
path := path . Join ( fs . Dir , filePath )
2023-10-10 12:13:23 +00:00
return fs . delete ( path )
}
func ( fs * FS ) delete ( path string ) error {
if * common . DeleteAllObjectVersions {
return fs . deleteObjectWithGenerations ( path )
}
return fs . deleteObject ( path )
}
func ( fs * FS ) deleteObjectWithGenerations ( path string ) error {
pager := fs . client . NewListBlobsFlatPager ( & azblob . ListBlobsFlatOptions {
Prefix : & path ,
Include : azblob . ListBlobsInclude {
Versions : true ,
} ,
} )
ctx := context . Background ( )
for pager . More ( ) {
resp , err := pager . NextPage ( ctx )
if err != nil {
return fmt . Errorf ( "cannot list blobs at %s (remote path %q): %w" , path , fs . Container , err )
}
for _ , v := range resp . Segment . BlobItems {
var c * blob . Client
// Either versioning is disabled or we are deleting the current version
if v . VersionID == nil || ( v . VersionID != nil && v . IsCurrentVersion != nil && * v . IsCurrentVersion ) {
c = fs . client . NewBlobClient ( * v . Name )
} else {
c , err = fs . client . NewBlobClient ( * v . Name ) . WithVersionID ( * v . VersionID )
if err != nil {
return fmt . Errorf ( "cannot read blob at %q at %s: %w" , path , fs . Container , err )
}
}
if _ , err := c . Delete ( ctx , nil ) ; err != nil {
return fmt . Errorf ( "cannot delete %q at %s: %w" , path , fs . Container , err )
}
}
2022-10-05 21:10:00 +00:00
}
2023-10-10 12:13:23 +00:00
return nil
}
func ( fs * FS ) deleteObject ( path string ) error {
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
if _ , err := bc . Delete ( ctx , nil ) ; err != nil {
2023-10-10 12:13:23 +00:00
return fmt . Errorf ( "cannot delete %q at %s: %w" , bc . URL ( ) , fs , err )
2022-10-05 21:10:00 +00:00
}
return nil
}
// CreateFile creates filePath at fs and puts data into it.
//
// The file is overwritten if it exists.
func ( fs * FS ) CreateFile ( filePath string , data [ ] byte ) error {
2023-12-04 08:21:29 +00:00
path := path . Join ( fs . Dir , filePath )
2022-10-06 21:55:36 +00:00
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . UploadBuffer ( ctx , data , & blockblob . UploadBufferOptions {
Concurrency : 1 ,
2022-10-05 21:10:00 +00:00
} )
if err != nil {
return fmt . Errorf ( "cannot upload %d bytes to %q at %s (remote path %q): %w" , len ( data ) , filePath , fs , bc . URL ( ) , err )
}
return nil
}
2023-02-13 12:27:13 +00:00
// HasFile returns true if filePath exists at fs.
2022-10-05 21:10:00 +00:00
func ( fs * FS ) HasFile ( filePath string ) ( bool , error ) {
2023-12-04 08:21:29 +00:00
path := path . Join ( fs . Dir , filePath )
2022-10-06 21:55:36 +00:00
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . GetProperties ( ctx , nil )
var azerr * azcore . ResponseError
if errors . As ( err , & azerr ) {
if azerr . ErrorCode == storageErrorCodeBlobNotFound {
2022-10-05 21:10:00 +00:00
return false , nil
}
2022-12-13 17:32:57 +00:00
logger . Errorf ( "GetProperties(%q) returned %s" , bc . URL ( ) , err )
2022-10-05 21:10:00 +00:00
return false , fmt . Errorf ( "unexpected error when obtaining properties for %q at %s (remote path %q): %w" , filePath , fs , bc . URL ( ) , err )
}
return true , nil
}
2023-08-14 00:17:12 +00:00
// ReadFile returns the content of filePath at fs.
func ( fs * FS ) ReadFile ( filePath string ) ( [ ] byte , error ) {
resp , err := fs . clientForPath ( fs . Dir + filePath ) . DownloadStream ( context . Background ( ) , & blob . DownloadStreamOptions { } )
if err != nil {
return nil , fmt . Errorf ( "cannot download %q at %s (remote dir %q): %w" , filePath , fs , fs . Dir , err )
}
defer resp . Body . Close ( )
b , err := io . ReadAll ( resp . Body )
if err != nil {
return nil , fmt . Errorf ( "cannot read %q at %s (remote dir %q): %w" , filePath , fs , fs . Dir , err )
}
return b , nil
}
2024-07-10 09:52:05 +00:00
// envLookuper is for looking up environment variables. It is
// needed to allow unit tests to provide alternate values since the envtemplate
// package uses a singleton to read all environment variables into memory at
// init time.
type envLookuper func ( name string ) ( string , bool )
func moreThanOne ( vals ... bool ) bool {
var n int
for _ , v := range vals {
if v {
n ++
}
}
return n > 1
}
// cleanDirectory ensures that the directory is properly formatted for Azure
// Blob Storage. It removes any leading slashes and ensures that the directory
// ends with a trailing slash.
func cleanDirectory ( dir string ) string {
for strings . HasPrefix ( dir , "/" ) {
dir = dir [ 1 : ]
}
if ! strings . HasSuffix ( dir , "/" ) {
dir += "/"
}
return dir
}