mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-11-21 14:44:00 +00:00
app/vmbackup: fix compatibility with latest azure sdk (#461)
This commit is contained in:
parent
ac09a85a8b
commit
a5861407cc
1 changed files with 51 additions and 76 deletions
|
@ -9,7 +9,13 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/common"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/backup/fscommon"
|
||||
|
@ -17,9 +23,10 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME"
|
||||
envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY"
|
||||
envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING"
|
||||
envStorageAcctName = "AZURE_STORAGE_ACCOUNT_NAME"
|
||||
envStorageAccKey = "AZURE_STORAGE_ACCOUNT_KEY"
|
||||
envStorageAccCs = "AZURE_STORAGE_ACCOUNT_CONNECTION_STRING"
|
||||
storageErrorCodeBlobNotFound = "BlobNotFound"
|
||||
)
|
||||
|
||||
// FS represents filesystem for backups in Azure Blob Storage.
|
||||
|
@ -32,7 +39,7 @@ type FS struct {
|
|||
// Directory in the bucket to write to.
|
||||
Dir string
|
||||
|
||||
client *azblob.ContainerClient
|
||||
client *container.Client
|
||||
}
|
||||
|
||||
// Init initializes fs.
|
||||
|
@ -50,10 +57,10 @@ func (fs *FS) Init() error {
|
|||
fs.Dir += "/"
|
||||
}
|
||||
|
||||
var sc *azblob.ServiceClient
|
||||
var sc *service.Client
|
||||
var err error
|
||||
if cs, ok := os.LookupEnv(envStorageAccCs); ok {
|
||||
sc, err = azblob.NewServiceClientFromConnectionString(cs, nil)
|
||||
sc, err = service.NewClientFromConnectionString(cs, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create AZBlob service client from connection string: %w", err)
|
||||
}
|
||||
|
@ -68,7 +75,7 @@ func (fs *FS) Init() error {
|
|||
}
|
||||
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
|
||||
|
||||
sc, err = azblob.NewServiceClientWithSharedKey(serviceURL, creds, nil)
|
||||
sc, err = service.NewClientWithSharedKeyCredential(serviceURL, creds, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create AZBlob service client from account name and key: %w", err)
|
||||
}
|
||||
|
@ -78,11 +85,7 @@ func (fs *FS) Init() error {
|
|||
return fmt.Errorf(`failed to detect any credentials type for AZBlob. Ensure there is connection string set at %q, or shared key at %q and %q`, envStorageAccCs, envStorageAcctName, envStorageAccKey)
|
||||
}
|
||||
|
||||
containerClient, err := sc.NewContainerClient(fs.Container)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create AZBlob container client: %w", err)
|
||||
}
|
||||
|
||||
containerClient := sc.NewContainerClient(fs.Container)
|
||||
fs.client = containerClient
|
||||
|
||||
return nil
|
||||
|
@ -103,14 +106,17 @@ func (fs *FS) ListParts() ([]common.Part, error) {
|
|||
dir := fs.Dir
|
||||
ctx := context.Background()
|
||||
|
||||
opts := &azblob.ContainerListBlobsFlatOptions{
|
||||
opts := &azblob.ListBlobsFlatOptions{
|
||||
Prefix: &dir,
|
||||
}
|
||||
|
||||
pager := fs.client.ListBlobsFlat(opts)
|
||||
pager := fs.client.NewListBlobsFlatPager(opts)
|
||||
var parts []common.Part
|
||||
for pager.NextPage(ctx) {
|
||||
resp := pager.PageResponse()
|
||||
for pager.More() {
|
||||
resp, err := pager.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot list blobs at %s (remote path %q): %w", fs, fs.Container, err)
|
||||
}
|
||||
|
||||
for _, v := range resp.Segment.BlobItems {
|
||||
file := *v.Name
|
||||
|
@ -132,21 +138,14 @@ func (fs *FS) ListParts() ([]common.Part, error) {
|
|||
|
||||
}
|
||||
|
||||
if err := pager.Err(); err != nil {
|
||||
return nil, fmt.Errorf("error when iterating objects at %q: %w", dir, err)
|
||||
}
|
||||
|
||||
return parts, nil
|
||||
}
|
||||
|
||||
// DeletePart deletes part p from fs.
|
||||
func (fs *FS) DeletePart(p common.Part) error {
|
||||
bc, err := fs.clientForPart(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bc := fs.clientForPart(p)
|
||||
ctx := context.Background()
|
||||
if _, err := bc.Delete(ctx, &azblob.BlobDeleteOptions{}); err != nil {
|
||||
if _, err := bc.Delete(ctx, &blob.DeleteOptions{}); err != nil {
|
||||
return fmt.Errorf("cannot delete %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err)
|
||||
}
|
||||
return nil
|
||||
|
@ -165,29 +164,24 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error {
|
|||
return fmt.Errorf("cannot perform server-side copying from %s to %s: both of them must be AZBlob", srcFS, fs)
|
||||
}
|
||||
|
||||
sbc, err := src.clientForPart(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize server-side copy of src %q: %w", p.Path, err)
|
||||
}
|
||||
dbc, err := fs.clientForPart(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to initialize server-side copy of dst %q: %w", p.Path, err)
|
||||
}
|
||||
sbc := src.client.NewBlobClient(p.RemotePath(src.Dir))
|
||||
dbc := fs.clientForPart(p)
|
||||
|
||||
ssCopyPermission := azblob.BlobSASPermissions{
|
||||
ssCopyPermission := sas.BlobPermissions{
|
||||
Read: true,
|
||||
Create: true,
|
||||
Write: true,
|
||||
}
|
||||
t, err := sbc.GetSASToken(ssCopyPermission, time.Now(), time.Now().Add(30*time.Minute))
|
||||
|
||||
t, err := sbc.GetSASURL(ssCopyPermission, time.Now().Add(-10*time.Minute), time.Now().Add(30*time.Minute))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate SAS token of src %q: %w", p.Path, err)
|
||||
}
|
||||
|
||||
srcURL := sbc.URL() + "?" + t.Encode()
|
||||
|
||||
// Hotfix for SDK issue: https://github.com/Azure/azure-sdk-for-go/issues/19245
|
||||
t = strings.Replace(t, "/?", "?", -1)
|
||||
ctx := context.Background()
|
||||
_, err = dbc.CopyFromURL(ctx, srcURL, &azblob.BlockBlobCopyFromURLOptions{})
|
||||
_, err = dbc.CopyFromURL(ctx, t, &blob.CopyFromURLOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot copy %q from %s to %s: %w", p.Path, src, fs, err)
|
||||
}
|
||||
|
@ -197,18 +191,15 @@ func (fs *FS) CopyPart(srcFS common.OriginFS, p common.Part) error {
|
|||
|
||||
// DownloadPart downloads part p from fs to w.
|
||||
func (fs *FS) DownloadPart(p common.Part, w io.Writer) error {
|
||||
bc, err := fs.clientForPart(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bc := fs.clientForPart(p)
|
||||
|
||||
ctx := context.Background()
|
||||
r, err := bc.Download(ctx, &azblob.BlobDownloadOptions{})
|
||||
r, err := bc.DownloadStream(ctx, &blob.DownloadStreamOptions{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot open reader for %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err)
|
||||
}
|
||||
|
||||
body := r.Body(&azblob.RetryReaderOptions{})
|
||||
body := r.NewRetryReader(ctx, &azblob.RetryReaderOptions{})
|
||||
n, err := io.Copy(w, body)
|
||||
if err1 := body.Close(); err1 != nil && err == nil {
|
||||
err = err1
|
||||
|
@ -224,13 +215,10 @@ func (fs *FS) DownloadPart(p common.Part, w io.Writer) error {
|
|||
|
||||
// UploadPart uploads part p from r to fs.
|
||||
func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
|
||||
bc, err := fs.clientForPart(p)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bc := fs.clientForPart(p)
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = bc.UploadStream(ctx, r, azblob.UploadStreamOptions{})
|
||||
_, err := bc.UploadStream(ctx, r, &blockblob.UploadStreamOptions{})
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot upload data to %q at %s (remote path %q): %w", p.Path, fs, bc.URL(), err)
|
||||
|
@ -239,19 +227,15 @@ func (fs *FS) UploadPart(p common.Part, r io.Reader) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (fs *FS) clientForPart(p common.Part) (*azblob.BlockBlobClient, error) {
|
||||
func (fs *FS) clientForPart(p common.Part) *blockblob.Client {
|
||||
path := p.RemotePath(fs.Dir)
|
||||
|
||||
return fs.clientForPath(path)
|
||||
}
|
||||
|
||||
func (fs *FS) clientForPath(path string) (*azblob.BlockBlobClient, error) {
|
||||
bc, err := fs.client.NewBlockBlobClient(path)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unexpected error when creating client for blob %q: %w", path, err)
|
||||
}
|
||||
|
||||
return bc, nil
|
||||
func (fs *FS) clientForPath(path string) *blockblob.Client {
|
||||
bc := fs.client.NewBlockBlobClient(path)
|
||||
return bc
|
||||
}
|
||||
|
||||
// DeleteFile deletes filePath at fs if it exists.
|
||||
|
@ -267,7 +251,7 @@ func (fs *FS) DeleteFile(filePath string) error {
|
|||
}
|
||||
|
||||
path := fs.Dir + filePath
|
||||
bc, err := fs.clientForPath(path)
|
||||
bc := fs.clientForPath(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -284,16 +268,12 @@ func (fs *FS) DeleteFile(filePath string) error {
|
|||
// The file is overwritten if it exists.
|
||||
func (fs *FS) CreateFile(filePath string, data []byte) error {
|
||||
path := fs.Dir + filePath
|
||||
bc, err := fs.clientForPath(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bc := fs.clientForPath(path)
|
||||
|
||||
ctx := context.Background()
|
||||
r, err := bc.UploadBuffer(ctx, data, azblob.UploadOption{
|
||||
Parallelism: 1,
|
||||
_, err := bc.UploadBuffer(ctx, data, &blockblob.UploadBufferOptions{
|
||||
Concurrency: 1,
|
||||
})
|
||||
defer func() { _ = r.Body.Close() }()
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot upload %d bytes to %q at %s (remote path %q): %w", len(data), filePath, fs, bc.URL(), err)
|
||||
|
@ -306,19 +286,14 @@ func (fs *FS) CreateFile(filePath string, data []byte) error {
|
|||
func (fs *FS) HasFile(filePath string) (bool, error) {
|
||||
path := fs.Dir + filePath
|
||||
|
||||
bc, err := fs.clientForPath(path)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
bc := fs.clientForPath(path)
|
||||
|
||||
ctx := context.Background()
|
||||
_, err = bc.GetProperties(ctx, nil)
|
||||
|
||||
var azerr *azblob.InternalError
|
||||
var sterr *azblob.StorageError
|
||||
|
||||
if errors.As(err, &azerr) && azerr.As(&sterr) {
|
||||
if sterr.ErrorCode == azblob.StorageErrorCodeBlobNotFound {
|
||||
_, err := bc.GetProperties(ctx, nil)
|
||||
logger.Infof("GetProperties(%q) returned %w", bc.URL(), err)
|
||||
var azerr *azcore.ResponseError
|
||||
if errors.As(err, &azerr) {
|
||||
if azerr.ErrorCode == storageErrorCodeBlobNotFound {
|
||||
return false, nil
|
||||
}
|
||||
return false, fmt.Errorf("unexpected error when obtaining properties for %q at %s (remote path %q): %w", filePath, fs, bc.URL(), err)
|
||||
|
|
Loading…
Reference in a new issue