mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2025-01-10 15:14:09 +00:00
296 lines
7 KiB
Go
296 lines
7 KiB
Go
package gozstd
|
|
|
|
/*
|
|
#define ZSTD_STATIC_LINKING_ONLY
|
|
#include "zstd.h"
|
|
|
|
#include <stdlib.h> // for malloc/free
|
|
*/
|
|
import "C"
|
|
|
|
import (
|
|
"fmt"
|
|
"io"
|
|
"runtime"
|
|
"unsafe"
|
|
)
|
|
|
|
var (
|
|
cstreamInBufSize = C.ZSTD_CStreamInSize()
|
|
cstreamOutBufSize = C.ZSTD_CStreamOutSize()
|
|
)
|
|
|
|
type cMemPtr *[1 << 30]byte
|
|
|
|
// Writer implements zstd writer.
|
|
type Writer struct {
|
|
w io.Writer
|
|
compressionLevel int
|
|
cs *C.ZSTD_CStream
|
|
cd *CDict
|
|
|
|
inBuf *C.ZSTD_inBuffer
|
|
outBuf *C.ZSTD_outBuffer
|
|
|
|
inBufGo cMemPtr
|
|
outBufGo cMemPtr
|
|
}
|
|
|
|
// NewWriter returns new zstd writer writing compressed data to w.
|
|
//
|
|
// The returned writer must be closed with Close call in order
|
|
// to finalize the compressed stream.
|
|
//
|
|
// Call Release when the Writer is no longer needed.
|
|
func NewWriter(w io.Writer) *Writer {
|
|
return newWriterDictLevel(w, nil, DefaultCompressionLevel)
|
|
}
|
|
|
|
// NewWriterLevel returns new zstd writer writing compressed data to w
|
|
// at the given compression level.
|
|
//
|
|
// The returned writer must be closed with Close call in order
|
|
// to finalize the compressed stream.
|
|
//
|
|
// Call Release when the Writer is no longer needed.
|
|
func NewWriterLevel(w io.Writer, compressionLevel int) *Writer {
|
|
return newWriterDictLevel(w, nil, compressionLevel)
|
|
}
|
|
|
|
// NewWriterDict returns new zstd writer writing compressed data to w
|
|
// using the given cd.
|
|
//
|
|
// The returned writer must be closed with Close call in order
|
|
// to finalize the compressed stream.
|
|
//
|
|
// Call Release when the Writer is no longer needed.
|
|
func NewWriterDict(w io.Writer, cd *CDict) *Writer {
|
|
return newWriterDictLevel(w, cd, 0)
|
|
}
|
|
|
|
func newWriterDictLevel(w io.Writer, cd *CDict, compressionLevel int) *Writer {
|
|
cs := C.ZSTD_createCStream()
|
|
initCStream(cs, cd, compressionLevel)
|
|
|
|
inBuf := (*C.ZSTD_inBuffer)(C.malloc(C.sizeof_ZSTD_inBuffer))
|
|
inBuf.src = C.malloc(cstreamInBufSize)
|
|
inBuf.size = 0
|
|
inBuf.pos = 0
|
|
|
|
outBuf := (*C.ZSTD_outBuffer)(C.malloc(C.sizeof_ZSTD_outBuffer))
|
|
outBuf.dst = C.malloc(cstreamOutBufSize)
|
|
outBuf.size = cstreamOutBufSize
|
|
outBuf.pos = 0
|
|
|
|
zw := &Writer{
|
|
w: w,
|
|
compressionLevel: compressionLevel,
|
|
cs: cs,
|
|
cd: cd,
|
|
inBuf: inBuf,
|
|
outBuf: outBuf,
|
|
}
|
|
|
|
zw.inBufGo = cMemPtr(zw.inBuf.src)
|
|
zw.outBufGo = cMemPtr(zw.outBuf.dst)
|
|
|
|
runtime.SetFinalizer(zw, freeCStream)
|
|
return zw
|
|
}
|
|
|
|
// Reset resets zw to write to w using the given dictionary cd and the given
|
|
// compressionLevel.
|
|
func (zw *Writer) Reset(w io.Writer, cd *CDict, compressionLevel int) {
|
|
zw.inBuf.size = 0
|
|
zw.inBuf.pos = 0
|
|
zw.outBuf.size = cstreamOutBufSize
|
|
zw.outBuf.pos = 0
|
|
|
|
zw.cd = cd
|
|
initCStream(zw.cs, zw.cd, compressionLevel)
|
|
|
|
zw.w = w
|
|
}
|
|
|
|
func initCStream(cs *C.ZSTD_CStream, cd *CDict, compressionLevel int) {
|
|
if cd != nil {
|
|
result := C.ZSTD_initCStream_usingCDict(cs, cd.p)
|
|
ensureNoError("ZSTD_initCStream_usingCDict", result)
|
|
} else {
|
|
result := C.ZSTD_initCStream(cs, C.int(compressionLevel))
|
|
ensureNoError("ZSTD_initCStream", result)
|
|
}
|
|
}
|
|
|
|
func freeCStream(v interface{}) {
|
|
v.(*Writer).Release()
|
|
}
|
|
|
|
// Release releases all the resources occupied by zw.
|
|
//
|
|
// zw cannot be used after the release.
|
|
func (zw *Writer) Release() {
|
|
if zw.cs == nil {
|
|
return
|
|
}
|
|
|
|
result := C.ZSTD_freeCStream(zw.cs)
|
|
ensureNoError("ZSTD_freeCStream", result)
|
|
zw.cs = nil
|
|
|
|
C.free(unsafe.Pointer(zw.inBuf.src))
|
|
C.free(unsafe.Pointer(zw.inBuf))
|
|
zw.inBuf = nil
|
|
|
|
C.free(unsafe.Pointer(zw.outBuf.dst))
|
|
C.free(unsafe.Pointer(zw.outBuf))
|
|
zw.outBuf = nil
|
|
|
|
zw.w = nil
|
|
zw.cd = nil
|
|
}
|
|
|
|
// ReadFrom reads all the data from r and writes it to zw.
|
|
//
|
|
// Returns the number of bytes read from r.
|
|
//
|
|
// ReadFrom may not flush the compressed data to the underlying writer
|
|
// due to performance reasons.
|
|
// Call Flush or Close when the compressed data must propagate
|
|
// to the underlying writer.
|
|
func (zw *Writer) ReadFrom(r io.Reader) (int64, error) {
|
|
nn := int64(0)
|
|
for {
|
|
// Fill the inBuf.
|
|
for zw.inBuf.size < cstreamInBufSize {
|
|
n, err := r.Read(zw.inBufGo[zw.inBuf.size:cstreamInBufSize])
|
|
|
|
// Sometimes n > 0 even when Read() returns an error.
|
|
// This is true especially if the error is io.EOF.
|
|
zw.inBuf.size += C.size_t(n)
|
|
nn += int64(n)
|
|
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
return nn, nil
|
|
}
|
|
return nn, err
|
|
}
|
|
}
|
|
|
|
// Flush the inBuf.
|
|
if err := zw.flushInBuf(); err != nil {
|
|
return nn, err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Write writes p to zw.
|
|
//
|
|
// Write doesn't flush the compressed data to the underlying writer
|
|
// due to performance reasons.
|
|
// Call Flush or Close when the compressed data must propagate
|
|
// to the underlying writer.
|
|
func (zw *Writer) Write(p []byte) (int, error) {
|
|
pLen := len(p)
|
|
if pLen == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
for {
|
|
n := copy(zw.inBufGo[zw.inBuf.size:cstreamInBufSize], p)
|
|
zw.inBuf.size += C.size_t(n)
|
|
p = p[n:]
|
|
if len(p) == 0 {
|
|
// Fast path - just copy the data to input buffer.
|
|
return pLen, nil
|
|
}
|
|
if err := zw.flushInBuf(); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (zw *Writer) flushInBuf() error {
|
|
prevInBufPos := zw.inBuf.pos
|
|
result := C.ZSTD_compressStream(zw.cs, zw.outBuf, zw.inBuf)
|
|
ensureNoError("ZSTD_compressStream", result)
|
|
|
|
// Move the remaining data to the start of inBuf.
|
|
copy(zw.inBufGo[:cstreamInBufSize], zw.inBufGo[zw.inBuf.pos:zw.inBuf.size])
|
|
zw.inBuf.size -= zw.inBuf.pos
|
|
zw.inBuf.pos = 0
|
|
|
|
if zw.outBuf.size-zw.outBuf.pos > zw.outBuf.pos && prevInBufPos != zw.inBuf.pos {
|
|
// There is enough space in outBuf and the last compression
|
|
// succeeded, so don't flush outBuf yet.
|
|
return nil
|
|
}
|
|
|
|
// Flush outBuf, since there is low space in it or the last compression
|
|
// attempt was unsuccessful.
|
|
return zw.flushOutBuf()
|
|
}
|
|
|
|
func (zw *Writer) flushOutBuf() error {
|
|
if zw.outBuf.pos == 0 {
|
|
// Nothing to flush.
|
|
return nil
|
|
}
|
|
|
|
outBuf := zw.outBufGo[:zw.outBuf.pos]
|
|
n, err := zw.w.Write(outBuf)
|
|
zw.outBuf.pos = 0
|
|
if err != nil {
|
|
return fmt.Errorf("cannot flush internal buffer to the underlying writer: %s", err)
|
|
}
|
|
if n != len(outBuf) {
|
|
panic(fmt.Errorf("BUG: the underlying writer violated io.Writer contract and didn't return error after writing incomplete data; written %d bytes; want %d bytes",
|
|
n, len(outBuf)))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Flush flushes the remaining data from zw to the underlying writer.
|
|
func (zw *Writer) Flush() error {
|
|
// Flush inBuf.
|
|
for zw.inBuf.size > 0 {
|
|
if err := zw.flushInBuf(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Flush the internal buffer to outBuf.
|
|
for {
|
|
result := C.ZSTD_flushStream(zw.cs, zw.outBuf)
|
|
ensureNoError("ZSTD_flushStream", result)
|
|
if err := zw.flushOutBuf(); err != nil {
|
|
return err
|
|
}
|
|
if result == 0 {
|
|
// No more data left in the internal buffer.
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close finalizes the compressed stream and flushes all the compressed data
|
|
// to the underlying writer.
|
|
//
|
|
// It doesn't close the underlying writer passed to New* functions.
|
|
func (zw *Writer) Close() error {
|
|
if err := zw.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
result := C.ZSTD_endStream(zw.cs, zw.outBuf)
|
|
ensureNoError("ZSTD_endStream", result)
|
|
if err := zw.flushOutBuf(); err != nil {
|
|
return err
|
|
}
|
|
if result == 0 {
|
|
return nil
|
|
}
|
|
}
|
|
}
|