2022-10-05 21:10:00 +00:00
package azremote
import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"
2022-10-06 21:55:36 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
2022-10-05 21:10:00 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
2022-10-06 21:55:36 +00:00
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
2022-10-05 21:10:00 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
2022-10-26 11:49:20 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/envtemplate"
2022-10-05 21:10:00 +00:00
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
)
const (
2022-10-06 21:55:36 +00:00
envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME"
envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY"
envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING"
storageErrorCodeBlobNotFound = "BlobNotFound"
2022-10-05 21:10:00 +00:00
)
// FS represents filesystem for backups in Azure Blob Storage.
//
// Init must be called before calling other FS methods.
type FS struct {
// Azure Blob Storage bucket to use.
Container string
// Directory in the bucket to write to.
Dir string
2022-10-06 21:55:36 +00:00
client * container . Client
2022-10-05 21:10:00 +00:00
}
// Init initializes fs.
//
// The returned fs must be stopped when no long needed with MustStop call.
func ( fs * FS ) Init ( ) error {
if fs . client != nil {
logger . Panicf ( "BUG: fs.Init has been already called" )
}
for strings . HasPrefix ( fs . Dir , "/" ) {
fs . Dir = fs . Dir [ 1 : ]
}
if ! strings . HasSuffix ( fs . Dir , "/" ) {
fs . Dir += "/"
}
2022-10-06 21:55:36 +00:00
var sc * service . Client
2022-10-05 21:10:00 +00:00
var err error
2022-10-26 11:49:20 +00:00
if cs , ok := envtemplate . LookupEnv ( envStorageAccCs ) ; ok {
2022-10-06 21:55:36 +00:00
sc , err = service . NewClientFromConnectionString ( cs , nil )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "failed to create AZBlob service client from connection string: %w" , err )
}
}
2022-10-26 11:49:20 +00:00
accountName , ok1 := envtemplate . LookupEnv ( envStorageAcctName )
accountKey , ok2 := envtemplate . LookupEnv ( envStorageAccKey )
2022-10-05 21:10:00 +00:00
if ok1 && ok2 {
creds , err := azblob . NewSharedKeyCredential ( accountName , accountKey )
if err != nil {
return fmt . Errorf ( "failed to create AZBlob credentials from account name and key: %w" , err )
}
serviceURL := fmt . Sprintf ( "https://%s.blob.core.windows.net/" , accountName )
2022-10-06 21:55:36 +00:00
sc , err = service . NewClientWithSharedKeyCredential ( serviceURL , creds , nil )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "failed to create AZBlob service client from account name and key: %w" , err )
}
}
if sc == nil {
return fmt . Errorf ( ` failed to detect any credentials type for AZBlob. Ensure there is connection string set at %q, or shared key at %q and %q ` , envStorageAccCs , envStorageAcctName , envStorageAccKey )
}
2022-10-06 21:55:36 +00:00
containerClient := sc . NewContainerClient ( fs . Container )
2022-10-05 21:10:00 +00:00
fs . client = containerClient
return nil
}
// MustStop stops fs.
func ( fs * FS ) MustStop ( ) {
fs . client = nil
}
// String returns human-readable description for fs.
func ( fs * FS ) String ( ) string {
return fmt . Sprintf ( "AZBlob{container: %q, dir: %q}" , fs . Container , fs . Dir )
}
// ListParts returns all the parts for fs.
func ( fs * FS ) ListParts ( ) ( [ ] common . Part , error ) {
dir := fs . Dir
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
opts := & azblob . ListBlobsFlatOptions {
2022-10-05 21:10:00 +00:00
Prefix : & dir ,
}
2022-10-06 21:55:36 +00:00
pager := fs . client . NewListBlobsFlatPager ( opts )
2022-10-05 21:10:00 +00:00
var parts [ ] common . Part
2022-10-06 21:55:36 +00:00
for pager . More ( ) {
resp , err := pager . NextPage ( ctx )
if err != nil {
return nil , fmt . Errorf ( "cannot list blobs at %s (remote path %q): %w" , fs , fs . Container , err )
}
2022-10-05 21:10:00 +00:00
for _ , v := range resp . Segment . BlobItems {
file := * v . Name
if ! strings . HasPrefix ( file , dir ) {
return nil , fmt . Errorf ( "unexpected prefix for AZBlob key %q; want %q" , file , dir )
}
if fscommon . IgnorePath ( file ) {
continue
}
var p common . Part
if ! p . ParseFromRemotePath ( file [ len ( dir ) : ] ) {
2022-10-06 21:57:31 +00:00
logger . Errorf ( "skipping unknown object %q" , file )
2022-10-05 21:10:00 +00:00
continue
}
p . ActualSize = uint64 ( * v . Properties . ContentLength )
parts = append ( parts , p )
}
}
return parts , nil
}
// DeletePart deletes part p from fs.
func ( fs * FS ) DeletePart ( p common . Part ) error {
2022-10-06 21:55:36 +00:00
bc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
if _ , err := bc . Delete ( ctx , & blob . DeleteOptions { } ) ; err != nil {
2022-10-05 21:10:00 +00:00
return fmt . Errorf ( "cannot delete %q at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
return nil
}
// RemoveEmptyDirs recursively removes empty dirs in fs.
func ( fs * FS ) RemoveEmptyDirs ( ) error {
// Blob storage has no directories, so nothing to remove.
return nil
}
// CopyPart copies p from srcFS to fs.
func ( fs * FS ) CopyPart ( srcFS common . OriginFS , p common . Part ) error {
src , ok := srcFS . ( * FS )
if ! ok {
return fmt . Errorf ( "cannot perform server-side copying from %s to %s: both of them must be AZBlob" , srcFS , fs )
}
2022-10-06 21:55:36 +00:00
sbc := src . client . NewBlobClient ( p . RemotePath ( src . Dir ) )
dbc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
2022-10-06 21:55:36 +00:00
ssCopyPermission := sas . BlobPermissions {
2022-10-05 21:10:00 +00:00
Read : true ,
Create : true ,
Write : true ,
}
2022-10-06 21:55:36 +00:00
2023-02-08 17:13:55 +00:00
startTime := time . Now ( ) . Add ( - 10 * time . Minute )
o := & blob . GetSASURLOptions {
StartTime : & startTime ,
}
t , err := sbc . GetSASURL ( ssCopyPermission , time . Now ( ) . Add ( 30 * time . Minute ) , o )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "failed to generate SAS token of src %q: %w" , p . Path , err )
}
ctx := context . Background ( )
2022-12-13 17:32:57 +00:00
// In order to support copy of files larger than 256MB, we need to use the async copy
// Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/copy-blob-from-url
_ , err = dbc . StartCopyFromURL ( ctx , t , & blob . StartCopyFromURLOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
2022-12-13 17:32:57 +00:00
return fmt . Errorf ( "cannot start async copy %q from %s to %s: %w" , p . Path , src , fs , err )
}
var copyStatus * blob . CopyStatusType
var copyStatusDescription * string
for {
r , err := dbc . GetProperties ( ctx , nil )
if err != nil {
return fmt . Errorf ( "failed to check copy status, cannot get properties of %q at %s: %w" , p . Path , fs , err )
}
// After the copy will be finished status will be changed to success/failed/aborted
// Ref: https://learn.microsoft.com/en-us/rest/api/storageservices/get-blob-properties#response-headers - x-ms-copy-status
if * r . CopyStatus != blob . CopyStatusTypePending {
copyStatus = r . CopyStatus
copyStatusDescription = r . CopyStatusDescription
break
}
time . Sleep ( 5 * time . Second )
}
if * copyStatus != blob . CopyStatusTypeSuccess {
return fmt . Errorf ( "copy of %q from %s to %s failed: expected status %q, received %q (description: %q)" , p . Path , src , fs , blob . CopyStatusTypeSuccess , * copyStatus , * copyStatusDescription )
2022-10-05 21:10:00 +00:00
}
return nil
}
// DownloadPart downloads part p from fs to w.
func ( fs * FS ) DownloadPart ( p common . Part , w io . Writer ) error {
2022-10-06 21:55:36 +00:00
bc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
r , err := bc . DownloadStream ( ctx , & blob . DownloadStreamOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "cannot open reader for %q at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
2022-10-06 21:55:36 +00:00
body := r . NewRetryReader ( ctx , & azblob . RetryReaderOptions { } )
2022-10-05 21:10:00 +00:00
n , err := io . Copy ( w , body )
if err1 := body . Close ( ) ; err1 != nil && err == nil {
err = err1
}
if err != nil {
return fmt . Errorf ( "cannot download %q from at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
if uint64 ( n ) != p . Size {
return fmt . Errorf ( "wrong data size downloaded from %q at %s; got %d bytes; want %d bytes" , p . Path , fs , n , p . Size )
}
return nil
}
// UploadPart uploads part p from r to fs.
func ( fs * FS ) UploadPart ( p common . Part , r io . Reader ) error {
2022-10-06 21:55:36 +00:00
bc := fs . clientForPart ( p )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . UploadStream ( ctx , r , & blockblob . UploadStreamOptions { } )
2022-10-05 21:10:00 +00:00
if err != nil {
return fmt . Errorf ( "cannot upload data to %q at %s (remote path %q): %w" , p . Path , fs , bc . URL ( ) , err )
}
return nil
}
2022-10-06 21:55:36 +00:00
func ( fs * FS ) clientForPart ( p common . Part ) * blockblob . Client {
2022-10-05 21:10:00 +00:00
path := p . RemotePath ( fs . Dir )
return fs . clientForPath ( path )
}
2022-10-06 21:55:36 +00:00
func ( fs * FS ) clientForPath ( path string ) * blockblob . Client {
bc := fs . client . NewBlockBlobClient ( path )
return bc
2022-10-05 21:10:00 +00:00
}
// DeleteFile deletes filePath at fs if it exists.
//
// The function does nothing if the filePath doesn't exists.
func ( fs * FS ) DeleteFile ( filePath string ) error {
v , err := fs . HasFile ( filePath )
if err != nil {
return err
}
if ! v {
return nil
}
path := fs . Dir + filePath
2022-10-06 21:55:36 +00:00
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
if err != nil {
return err
}
ctx := context . Background ( )
if _ , err := bc . Delete ( ctx , nil ) ; err != nil {
return fmt . Errorf ( "cannot delete %q at %s (remote path %q): %w" , filePath , fs , bc . URL ( ) , err )
}
return nil
}
// CreateFile creates filePath at fs and puts data into it.
//
// The file is overwritten if it exists.
func ( fs * FS ) CreateFile ( filePath string , data [ ] byte ) error {
path := fs . Dir + filePath
2022-10-06 21:55:36 +00:00
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . UploadBuffer ( ctx , data , & blockblob . UploadBufferOptions {
Concurrency : 1 ,
2022-10-05 21:10:00 +00:00
} )
if err != nil {
return fmt . Errorf ( "cannot upload %d bytes to %q at %s (remote path %q): %w" , len ( data ) , filePath , fs , bc . URL ( ) , err )
}
return nil
}
// HasFile returns ture if filePath exists at fs.
func ( fs * FS ) HasFile ( filePath string ) ( bool , error ) {
path := fs . Dir + filePath
2022-10-06 21:55:36 +00:00
bc := fs . clientForPath ( path )
2022-10-05 21:10:00 +00:00
ctx := context . Background ( )
2022-10-06 21:55:36 +00:00
_ , err := bc . GetProperties ( ctx , nil )
var azerr * azcore . ResponseError
if errors . As ( err , & azerr ) {
if azerr . ErrorCode == storageErrorCodeBlobNotFound {
2022-10-05 21:10:00 +00:00
return false , nil
}
2022-12-13 17:32:57 +00:00
logger . Errorf ( "GetProperties(%q) returned %s" , bc . URL ( ) , err )
2022-10-05 21:10:00 +00:00
return false , fmt . Errorf ( "unexpected error when obtaining properties for %q at %s (remote path %q): %w" , filePath , fs , bc . URL ( ) , err )
}
return true , nil
}