diff --git a/lib/backup/azremote/azblob.go b/lib/backup/azremote/azblob.go index 1892f121b..9335e5642 100644 --- a/lib/backup/azremote/azblob.go +++ b/lib/backup/azremote/azblob.go @@ -9,7 +9,13 @@ import ( "strings" "time" + "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common" "github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon" @@ -17,9 +23,10 @@ import ( ) const ( - envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME" - envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY" - envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING" + envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME" + envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY" + envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING" + storageErrorCodeBlobNotFound = "BlobNotFound" ) // FS represents filesystem for backups in Azure Blob Storage. @@ -32,7 +39,7 @@ type FS struct { // Directory in the bucket to write to. Dir string - client *azblob.ContainerClient + client *container.Client } // Init initializes fs. @@ -50,10 +57,10 @@ func (fs *FS) Init() error { fs.Dir += "/" } - var sc *azblob.ServiceClient + var sc *service.Client var err error if cs, ok := os.LookupEnv(envStorageAccCs); ok { - sc, err = azblob.NewServiceClientFromConnectionString(cs, nil) + sc, err = service.NewClientFromConnectionString(cs, nil) if err != nil { return fmt.Errorf("failed to create AZBlob service client from connection string: %w", err) } @@ -68,7 +75,7 @@ func (fs *FS) Init() error { } serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName) - sc, err = azblob.NewServiceClientWithSharedKey(serviceURL, creds, nil) + sc, err = service.NewClientWithSharedKeyCredential(serviceURL, creds, nil) if err != nil { return fmt.Errorf("failed to create AZBlob service client from account name and key: %w", err) } @@ -78,11 +85,7 @@ func (fs *FS) Init() error { return fmt.Errorf(`failed to detect any credentials type for AZBlob. Ensure there is connection string set at %q, or shared key at %q and %q`, envStorageAccCs, envStorageAcctName, envStorageAccKey) } - containerClient, err := sc.NewContainerClient(fs.Container) - if err != nil { - return fmt.Errorf("failed to create AZBlob container client: %w", err) - } - + containerClient := sc.NewContainerClient(fs.Container) fs.client = containerClient return nil @@ -103,14 +106,17 @@ func (fs *FS) ListParts() ([]common.Part, error) { dir := fs.Dir ctx := context.Background() - opts := &azblob.ContainerListBlobsFlatOptions{ + opts := &azblob.ListBlobsFlatOptions{ Prefix: &dir, } - pager := fs.client.ListBlobsFlat(opts) + pager := fs.client.NewListBlobsFlatPager(opts) var parts []common.Part - for pager.NextPage(ctx) { - resp := pager.PageResponse() + for pager.More() { + resp, err := pager.NextPage(ctx) + if err != nil { + return nil, fmt.Errorf("cannot list blobs at %s (remote path %q): %w", fs, fs.Container, err) + } for _, v := range resp.Segment.BlobItems { file := *v.Name @@ -132,21 +138,14 @@ func (fs *FS) ListParts() ([]common.Part, error) { } - if err := pager.Err(); err != nil { - return nil, fmt.Errorf("error when iterating objects at %q: %w", dir, err) - } - return parts, nil } // DeletePart deletes part p from fs. func (fs *FS) DeletePart(p common.Part) error { - bc, err := fs.clientForPart(p) - if err != nil { - return err - } + bc := fs.clientForPart(p) ctx := context.Background() - if _, err := bc.Delete(ctx, &azblob.BlobDeleteOptions{}); err != nil { + if _, err := bc.Delete(ctx, &blob.DeleteOptions{}); err != nil { return fmt.Errorf("cannot delete %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err) } return nil @@ -165,29 +164,24 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error { return fmt.Errorf("cannot perform server-side copying from %s to %s: both of them must be AZBlob", srcFS, fs) } - sbc, err := src.clientForPart(p) - if err != nil { - return fmt.Errorf("failed to initialize server-side copy of src %q: %w", p.Path, err) - } - dbc, err := fs.clientForPart(p) - if err != nil { - return fmt.Errorf("failed to initialize server-side copy of dst %q: %w", p.Path, err) - } + sbc := src.client.NewBlobClient(p.RemotePath(src.Dir)) + dbc := fs.clientForPart(p) - ssCopyPermission := azblob.BlobSASPermissions{ + ssCopyPermission := sas.BlobPermissions{ Read: true, Create: true, Write: true, } - t, err := sbc.GetSASToken(ssCopyPermission, time.Now(), time.Now().Add(30*time.Minute)) + + t, err := sbc.GetSASURL(ssCopyPermission, time.Now().Add(-10*time.Minute), time.Now().Add(30*time.Minute)) if err != nil { return fmt.Errorf("failed to generate SAS token of src %q: %w", p.Path, err) } - srcURL := sbc.URL() + "?" + t.Encode() - + // Hotfix for SDK issue: https://github.com/Azure/azure-sdk-for-go/issues/19245 + t = strings.Replace(t, "/?", "?", -1) ctx := context.Background() - _, err = dbc.CopyFromURL(ctx, srcURL, &azblob.BlockBlobCopyFromURLOptions{}) + _, err = dbc.CopyFromURL(ctx, t, &blob.CopyFromURLOptions{}) if err != nil { return fmt.Errorf("cannot copy %q from %s to %s: %w", p.Path, src, fs, err) } @@ -197,18 +191,15 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error { // DownloadPart downloads part p from fs to w. func (fs *FS) DownloadPart(p common.Part, w io.Writer) error { - bc, err := fs.clientForPart(p) - if err != nil { - return err - } + bc := fs.clientForPart(p) ctx := context.Background() - r, err := bc.Download(ctx, &azblob.BlobDownloadOptions{}) + r, err := bc.DownloadStream(ctx, &blob.DownloadStreamOptions{}) if err != nil { return fmt.Errorf("cannot open reader for %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err) } - body := r.Body(&azblob.RetryReaderOptions{}) + body := r.NewRetryReader(ctx, &azblob.RetryReaderOptions{}) n, err := io.Copy(w, body) if err1 := body.Close(); err1 != nil && err == nil { err = err1 @@ -224,13 +215,10 @@ func (fs *FS) DownloadPart(p common.Part, w io.Writer) error { // UploadPart uploads part p from r to fs. func (fs *FS) UploadPart(p common.Part, r io.Reader) error { - bc, err := fs.clientForPart(p) - if err != nil { - return err - } + bc := fs.clientForPart(p) ctx := context.Background() - _, err = bc.UploadStream(ctx, r, azblob.UploadStreamOptions{}) + _, err := bc.UploadStream(ctx, r, &blockblob.UploadStreamOptions{}) if err != nil { return fmt.Errorf("cannot upload data to %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err) @@ -239,19 +227,15 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error { return nil } -func (fs *FS) clientForPart(p common.Part) (*azblob.BlockBlobClient, error) { +func (fs *FS) clientForPart(p common.Part) *blockblob.Client { path := p.RemotePath(fs.Dir) return fs.clientForPath(path) } -func (fs *FS) clientForPath(path string) (*azblob.BlockBlobClient, error) { - bc, err := fs.client.NewBlockBlobClient(path) - if err != nil { - return nil, fmt.Errorf("unexpected error when creating client for blob %q: %w", path, err) - } - - return bc, nil +func (fs *FS) clientForPath(path string) *blockblob.Client { + bc := fs.client.NewBlockBlobClient(path) + return bc } // DeleteFile deletes filePath at fs if it exists. @@ -267,7 +251,7 @@ func (fs *FS) DeleteFile(filePath string) error { } path := fs.Dir + filePath - bc, err := fs.clientForPath(path) + bc := fs.clientForPath(path) if err != nil { return err } @@ -284,16 +268,12 @@ func (fs *FS) DeleteFile(filePath string) error { // The file is overwritten if it exists. func (fs *FS) CreateFile(filePath string, data []byte) error { path := fs.Dir + filePath - bc, err := fs.clientForPath(path) - if err != nil { - return err - } + bc := fs.clientForPath(path) ctx := context.Background() - r, err := bc.UploadBuffer(ctx, data, azblob.UploadOption{ - Parallelism: 1, + _, err := bc.UploadBuffer(ctx, data, &blockblob.UploadBufferOptions{ + Concurrency: 1, }) - defer func() { _ = r.Body.Close() }() if err != nil { return fmt.Errorf("cannot upload %d bytes to %q at %s (remote path %q): %w", len(data), filePath, fs, bc.URL(), err) @@ -306,19 +286,14 @@ func (fs *FS) CreateFile(filePath string, data []byte) error { func (fs *FS) HasFile(filePath string) (bool, error) { path := fs.Dir + filePath - bc, err := fs.clientForPath(path) - if err != nil { - return false, err - } + bc := fs.clientForPath(path) ctx := context.Background() - _, err = bc.GetProperties(ctx, nil) - - var azerr *azblob.InternalError - var sterr *azblob.StorageError - - if errors.As(err, &azerr) && azerr.As(&sterr) { - if sterr.ErrorCode == azblob.StorageErrorCodeBlobNotFound { + _, err := bc.GetProperties(ctx, nil) + logger.Infof("GetProperties(%q) returned %w", bc.URL(), err) + var azerr *azcore.ResponseError + if errors.As(err, &azerr) { + if azerr.ErrorCode == storageErrorCodeBlobNotFound { return false, nil } return false, fmt.Errorf("unexpected error when obtaining properties for %q at %s (remote path %q): %w", filePath, fs, bc.URL(), err)