mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2024-12-01 14:47:38 +00:00
92b9b13df1
Unpack the full columnsHeader block instead of unpacking meta-information for each individual column
when a query that selects all the columns is executed. This improves performance when scanning
logs with a big number of fields.
(cherry picked from commit 2023f017b1
)
579 lines
16 KiB
Go
579 lines
16 KiB
Go
package logstorage
|
|
|
|
import (
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
|
)
|
|
|
|
// blockSearchWorksPerBatch is the number of blocks a single worker searches at once.
//
// Increase this value on systems with many CPU cores in order to amortize
// the overhead of passing blockSearchWork items to worker goroutines.
const blockSearchWorksPerBatch = 64
|
|
|
|
type blockSearchWork struct {
|
|
// p is the part where the block belongs to.
|
|
p *part
|
|
|
|
// so contains search options for the block search.
|
|
so *searchOptions
|
|
|
|
// bh is the header of the block to search.
|
|
bh blockHeader
|
|
}
|
|
|
|
func (bsw *blockSearchWork) reset() {
|
|
bsw.p = nil
|
|
bsw.so = nil
|
|
bsw.bh.reset()
|
|
}
|
|
|
|
type blockSearchWorkBatch struct {
|
|
bsws []blockSearchWork
|
|
}
|
|
|
|
func (bswb *blockSearchWorkBatch) reset() {
|
|
bsws := bswb.bsws
|
|
for i := range bsws {
|
|
bsws[i].reset()
|
|
}
|
|
bswb.bsws = bsws[:0]
|
|
}
|
|
|
|
func getBlockSearchWorkBatch() *blockSearchWorkBatch {
|
|
v := blockSearchWorkBatchPool.Get()
|
|
if v == nil {
|
|
return &blockSearchWorkBatch{
|
|
bsws: make([]blockSearchWork, 0, blockSearchWorksPerBatch),
|
|
}
|
|
}
|
|
return v.(*blockSearchWorkBatch)
|
|
}
|
|
|
|
func putBlockSearchWorkBatch(bswb *blockSearchWorkBatch) {
|
|
bswb.reset()
|
|
blockSearchWorkBatchPool.Put(bswb)
|
|
}
|
|
|
|
// blockSearchWorkBatchPool is the pool of blockSearchWorkBatch objects used by
// getBlockSearchWorkBatch() and putBlockSearchWorkBatch().
var blockSearchWorkBatchPool sync.Pool
|
|
|
|
func (bswb *blockSearchWorkBatch) appendBlockSearchWork(p *part, so *searchOptions, bh *blockHeader) bool {
|
|
bsws := bswb.bsws
|
|
|
|
bsws = append(bsws, blockSearchWork{
|
|
p: p,
|
|
so: so,
|
|
})
|
|
bsw := &bsws[len(bsws)-1]
|
|
bsw.bh.copyFrom(bh)
|
|
|
|
bswb.bsws = bsws
|
|
|
|
return len(bsws) < cap(bsws)
|
|
}
|
|
|
|
func getBlockSearch() *blockSearch {
|
|
v := blockSearchPool.Get()
|
|
if v == nil {
|
|
return &blockSearch{}
|
|
}
|
|
return v.(*blockSearch)
|
|
}
|
|
|
|
func putBlockSearch(bs *blockSearch) {
|
|
bs.reset()
|
|
|
|
// reset seenStreams before returning bs to the pool in order to reduce memory usage.
|
|
bs.seenStreams = nil
|
|
|
|
blockSearchPool.Put(bs)
|
|
}
|
|
|
|
// blockSearchPool is the pool of blockSearch objects used by
// getBlockSearch() and putBlockSearch().
var blockSearchPool sync.Pool
|
|
|
|
type blockSearch struct {
|
|
// bsw is the actual work to perform on the given block pointed by bsw.ph
|
|
bsw *blockSearchWork
|
|
|
|
// br contains result for the search in the block after search() call
|
|
br blockResult
|
|
|
|
// timestampsCache contains cached timestamps for the given block.
|
|
timestampsCache *encoding.Int64s
|
|
|
|
// bloomFilterCache contains cached bloom filters for requested columns in the given block
|
|
bloomFilterCache map[string]*bloomFilter
|
|
|
|
// valuesCache contains cached values for requested columns in the given block
|
|
valuesCache map[string]*stringBucket
|
|
|
|
// sbu is used for unmarshaling local columns
|
|
sbu stringsBlockUnmarshaler
|
|
|
|
// cshIndexBlockCache holds columnsHeaderIndex data for the given block.
|
|
//
|
|
// It is initialized lazily by calling getColumnsHeaderIndex().
|
|
cshIndexBlockCache []byte
|
|
|
|
// cshBlockCache holds columnsHeader data for the given block.
|
|
//
|
|
// It is initialized lazily by calling getColumnsHeaderBlock().
|
|
cshBlockCache []byte
|
|
cshBlockInitialized bool
|
|
|
|
// ccsCache is the cache for accessed const columns
|
|
ccsCache []Field
|
|
|
|
// chsCache is the cache for accessed column headers
|
|
chsCache []columnHeader
|
|
|
|
// cshIndexCache is the columnsHeaderIndex associated with the given block.
|
|
//
|
|
// It is initialized lazily by calling getColumnsHeaderIndex().
|
|
cshIndexCache *columnsHeaderIndex
|
|
|
|
// cshCache is the columnsHeader associated with the given block.
|
|
//
|
|
// It is initialized lazily by calling getColumnsHeader().
|
|
cshCache *columnsHeader
|
|
|
|
// seenStreams contains seen streamIDs for the recent searches.
|
|
//
|
|
// It is used for speeding up fetching _stream column.
|
|
seenStreams map[u128]string
|
|
}
|
|
|
|
func (bs *blockSearch) reset() {
|
|
bs.bsw = nil
|
|
bs.br.reset()
|
|
|
|
if bs.timestampsCache != nil {
|
|
encoding.PutInt64s(bs.timestampsCache)
|
|
bs.timestampsCache = nil
|
|
}
|
|
|
|
bloomFilterCache := bs.bloomFilterCache
|
|
for k, bf := range bloomFilterCache {
|
|
putBloomFilter(bf)
|
|
delete(bloomFilterCache, k)
|
|
}
|
|
|
|
valuesCache := bs.valuesCache
|
|
for k, values := range valuesCache {
|
|
putStringBucket(values)
|
|
delete(valuesCache, k)
|
|
}
|
|
|
|
bs.sbu.reset()
|
|
|
|
bs.cshIndexBlockCache = bs.cshIndexBlockCache[:0]
|
|
|
|
bs.cshBlockCache = bs.cshBlockCache[:0]
|
|
bs.cshBlockInitialized = false
|
|
|
|
ccsCache := bs.ccsCache
|
|
for i := range ccsCache {
|
|
ccsCache[i].Reset()
|
|
}
|
|
bs.ccsCache = ccsCache[:0]
|
|
|
|
chsCache := bs.chsCache
|
|
for i := range chsCache {
|
|
chsCache[i].reset()
|
|
}
|
|
bs.chsCache = chsCache[:0]
|
|
|
|
if bs.cshIndexCache != nil {
|
|
putColumnsHeaderIndex(bs.cshIndexCache)
|
|
bs.cshIndexCache = nil
|
|
}
|
|
|
|
if bs.cshCache != nil {
|
|
putColumnsHeader(bs.cshCache)
|
|
bs.cshCache = nil
|
|
}
|
|
|
|
// Do not reset seenStreams, since its' lifetime is managed by blockResult.addStreamColumn() code.
|
|
}
|
|
|
|
func (bs *blockSearch) partPath() string {
|
|
return bs.bsw.p.path
|
|
}
|
|
|
|
func (bs *blockSearch) search(bsw *blockSearchWork, bm *bitmap) {
|
|
bs.reset()
|
|
|
|
bs.bsw = bsw
|
|
|
|
// search rows matching the given filter
|
|
bm.init(int(bsw.bh.rowsCount))
|
|
bm.setBits()
|
|
bs.bsw.so.filter.applyToBlockSearch(bs, bm)
|
|
|
|
if bm.isZero() {
|
|
// The filter doesn't match any logs in the current block.
|
|
return
|
|
}
|
|
|
|
bs.br.mustInit(bs, bm)
|
|
|
|
// fetch the requested columns to bs.br.
|
|
if bs.bsw.so.needAllColumns {
|
|
bs.br.initAllColumns()
|
|
} else {
|
|
bs.br.initRequestedColumns()
|
|
}
|
|
}
|
|
|
|
func (bs *blockSearch) partFormatVersion() uint {
|
|
return bs.bsw.p.ph.FormatVersion
|
|
}
|
|
|
|
func (bs *blockSearch) getConstColumnValue(name string) string {
|
|
if name == "_msg" {
|
|
name = ""
|
|
}
|
|
|
|
if bs.partFormatVersion() < 1 {
|
|
csh := bs.getColumnsHeader()
|
|
for _, cc := range csh.constColumns {
|
|
if cc.Name == name {
|
|
return cc.Value
|
|
}
|
|
}
|
|
return ""
|
|
}
|
|
|
|
columnNameID, ok := bs.getColumnNameID(name)
|
|
if !ok {
|
|
return ""
|
|
}
|
|
|
|
for i := range bs.ccsCache {
|
|
if bs.ccsCache[i].Name == name {
|
|
return bs.ccsCache[i].Value
|
|
}
|
|
}
|
|
|
|
cshIndex := bs.getColumnsHeaderIndex()
|
|
for _, cr := range cshIndex.constColumnsRefs {
|
|
if cr.columnNameID != columnNameID {
|
|
continue
|
|
}
|
|
|
|
b := bs.getColumnsHeaderBlock()
|
|
if cr.offset > uint64(len(b)) {
|
|
logger.Panicf("FATAL: %s: header offset for const column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
|
|
}
|
|
b = b[cr.offset:]
|
|
bs.ccsCache = slicesutil.SetLength(bs.ccsCache, len(bs.ccsCache)+1)
|
|
cc := &bs.ccsCache[len(bs.ccsCache)-1]
|
|
if _, err := cc.unmarshalNoArena(b, false); err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal header for const column %q: %s", bs.bsw.p.path, name, err)
|
|
}
|
|
cc.Name = strings.Clone(name)
|
|
return cc.Value
|
|
}
|
|
return ""
|
|
}
|
|
|
|
func (bs *blockSearch) getColumnHeader(name string) *columnHeader {
|
|
if name == "_msg" {
|
|
name = ""
|
|
}
|
|
|
|
if bs.partFormatVersion() < 1 {
|
|
csh := bs.getColumnsHeader()
|
|
chs := csh.columnHeaders
|
|
for i := range chs {
|
|
ch := &chs[i]
|
|
if ch.name == name {
|
|
return ch
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
columnNameID, ok := bs.getColumnNameID(name)
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
for i := range bs.chsCache {
|
|
if bs.chsCache[i].name == name {
|
|
return &bs.chsCache[i]
|
|
}
|
|
}
|
|
|
|
cshIndex := bs.getColumnsHeaderIndex()
|
|
for _, cr := range cshIndex.columnHeadersRefs {
|
|
if cr.columnNameID != columnNameID {
|
|
continue
|
|
}
|
|
|
|
b := bs.getColumnsHeaderBlock()
|
|
if cr.offset > uint64(len(b)) {
|
|
logger.Panicf("FATAL: %s: header offset for column %q cannot exceed %d bytes; got %d bytes", bs.bsw.p.path, name, len(b), cr.offset)
|
|
}
|
|
b = b[cr.offset:]
|
|
bs.chsCache = slicesutil.SetLength(bs.chsCache, len(bs.chsCache)+1)
|
|
ch := &bs.chsCache[len(bs.chsCache)-1]
|
|
if _, err := ch.unmarshalNoArena(b, partFormatLatestVersion); err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal header for column %q: %s", bs.bsw.p.path, name, err)
|
|
}
|
|
ch.name = strings.Clone(name)
|
|
return ch
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (bs *blockSearch) getColumnNameID(name string) (uint64, bool) {
|
|
id, ok := bs.bsw.p.columnNameIDs[name]
|
|
return id, ok
|
|
}
|
|
|
|
func (bs *blockSearch) getColumnsHeaderIndex() *columnsHeaderIndex {
|
|
if bs.partFormatVersion() < 1 {
|
|
logger.Panicf("BUG: getColumnsHeaderIndex() can be called only for part encoding v1+, while it has been called for v%d", bs.partFormatVersion())
|
|
}
|
|
|
|
if bs.cshIndexCache == nil {
|
|
bs.cshIndexBlockCache = readColumnsHeaderIndexBlock(bs.cshIndexBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
|
|
|
|
bs.cshIndexCache = getColumnsHeaderIndex()
|
|
if err := bs.cshIndexCache.unmarshalNoArena(bs.cshIndexBlockCache); err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal columns header index: %s", bs.bsw.p.path, err)
|
|
}
|
|
}
|
|
return bs.cshIndexCache
|
|
}
|
|
|
|
func (bs *blockSearch) getColumnsHeader() *columnsHeader {
|
|
if bs.cshCache == nil {
|
|
b := bs.getColumnsHeaderBlock()
|
|
|
|
csh := getColumnsHeader()
|
|
partFormatVersion := bs.partFormatVersion()
|
|
if err := csh.unmarshalNoArena(b, partFormatVersion); err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal columns header: %s", bs.bsw.p.path, err)
|
|
}
|
|
if partFormatVersion >= 1 {
|
|
cshIndex := bs.getColumnsHeaderIndex()
|
|
if err := csh.setColumnNames(cshIndex, bs.bsw.p.columnNames); err != nil {
|
|
logger.Panicf("FATAL: %s: %s", bs.bsw.p.path, err)
|
|
}
|
|
}
|
|
|
|
bs.cshCache = csh
|
|
}
|
|
return bs.cshCache
|
|
}
|
|
|
|
func (bs *blockSearch) getColumnsHeaderBlock() []byte {
|
|
if !bs.cshBlockInitialized {
|
|
bs.cshBlockCache = readColumnsHeaderBlock(bs.cshBlockCache[:0], bs.bsw.p, &bs.bsw.bh)
|
|
bs.cshBlockInitialized = true
|
|
}
|
|
return bs.cshBlockCache
|
|
}
|
|
|
|
func readColumnsHeaderIndexBlock(dst []byte, p *part, bh *blockHeader) []byte {
|
|
n := bh.columnsHeaderIndexSize
|
|
if n > maxColumnsHeaderIndexSize {
|
|
logger.Panicf("FATAL: %s: columns header index size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderIndexSize, n)
|
|
}
|
|
|
|
dstLen := len(dst)
|
|
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
|
|
p.columnsHeaderIndexFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderIndexOffset))
|
|
|
|
return dst
|
|
}
|
|
|
|
func readColumnsHeaderBlock(dst []byte, p *part, bh *blockHeader) []byte {
|
|
n := bh.columnsHeaderSize
|
|
if n > maxColumnsHeaderSize {
|
|
logger.Panicf("FATAL: %s: columns header size cannot exceed %d bytes; got %d bytes", p.path, maxColumnsHeaderSize, n)
|
|
}
|
|
dstLen := len(dst)
|
|
dst = bytesutil.ResizeNoCopyMayOverallocate(dst, int(n)+dstLen)
|
|
p.columnsHeaderFile.MustReadAt(dst[dstLen:], int64(bh.columnsHeaderOffset))
|
|
return dst
|
|
}
|
|
|
|
// getBloomFilterForColumn returns bloom filter for the given ch.
|
|
//
|
|
// The returned bloom filter belongs to bs, so it becomes invalid after bs reset.
|
|
func (bs *blockSearch) getBloomFilterForColumn(ch *columnHeader) *bloomFilter {
|
|
bf := bs.bloomFilterCache[ch.name]
|
|
if bf != nil {
|
|
return bf
|
|
}
|
|
|
|
p := bs.bsw.p
|
|
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
|
|
|
|
bb := longTermBufPool.Get()
|
|
bloomFilterSize := ch.bloomFilterSize
|
|
if bloomFilterSize > maxBloomFilterBlockSize {
|
|
logger.Panicf("FATAL: %s: bloom filter block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxBloomFilterBlockSize, bloomFilterSize)
|
|
}
|
|
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(bloomFilterSize))
|
|
|
|
bloomValuesFile.bloom.MustReadAt(bb.B, int64(ch.bloomFilterOffset))
|
|
bf = getBloomFilter()
|
|
if err := bf.unmarshal(bb.B); err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal bloom filter: %s", bs.partPath(), err)
|
|
}
|
|
longTermBufPool.Put(bb)
|
|
|
|
if bs.bloomFilterCache == nil {
|
|
bs.bloomFilterCache = make(map[string]*bloomFilter)
|
|
}
|
|
bs.bloomFilterCache[ch.name] = bf
|
|
return bf
|
|
}
|
|
|
|
// getValuesForColumn returns block values for the given ch.
|
|
//
|
|
// The returned values belong to bs, so they become invalid after bs reset.
|
|
func (bs *blockSearch) getValuesForColumn(ch *columnHeader) []string {
|
|
values := bs.valuesCache[ch.name]
|
|
if values != nil {
|
|
return values.a
|
|
}
|
|
|
|
p := bs.bsw.p
|
|
bloomValuesFile := p.getBloomValuesFileForColumnName(ch.name)
|
|
|
|
bb := longTermBufPool.Get()
|
|
valuesSize := ch.valuesSize
|
|
if valuesSize > maxValuesBlockSize {
|
|
logger.Panicf("FATAL: %s: values block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxValuesBlockSize, valuesSize)
|
|
}
|
|
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(valuesSize))
|
|
bloomValuesFile.values.MustReadAt(bb.B, int64(ch.valuesOffset))
|
|
|
|
values = getStringBucket()
|
|
var err error
|
|
values.a, err = bs.sbu.unmarshal(values.a[:0], bb.B, bs.bsw.bh.rowsCount)
|
|
longTermBufPool.Put(bb)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal column %q: %s", bs.partPath(), ch.name, err)
|
|
}
|
|
|
|
if bs.valuesCache == nil {
|
|
bs.valuesCache = make(map[string]*stringBucket)
|
|
}
|
|
bs.valuesCache[ch.name] = values
|
|
return values.a
|
|
}
|
|
|
|
// getTimestamps returns timestamps for the given bs.
|
|
//
|
|
// The returned timestamps belong to bs, so they become invalid after bs reset.
|
|
func (bs *blockSearch) getTimestamps() []int64 {
|
|
timestamps := bs.timestampsCache
|
|
if timestamps != nil {
|
|
return timestamps.A
|
|
}
|
|
|
|
p := bs.bsw.p
|
|
|
|
bb := longTermBufPool.Get()
|
|
th := &bs.bsw.bh.timestampsHeader
|
|
blockSize := th.blockSize
|
|
if blockSize > maxTimestampsBlockSize {
|
|
logger.Panicf("FATAL: %s: timestamps block size cannot exceed %d bytes; got %d bytes", bs.partPath(), maxTimestampsBlockSize, blockSize)
|
|
}
|
|
bb.B = bytesutil.ResizeNoCopyMayOverallocate(bb.B, int(blockSize))
|
|
p.timestampsFile.MustReadAt(bb.B, int64(th.blockOffset))
|
|
|
|
rowsCount := int(bs.bsw.bh.rowsCount)
|
|
timestamps = encoding.GetInt64s(rowsCount)
|
|
var err error
|
|
timestamps.A, err = encoding.UnmarshalTimestamps(timestamps.A[:0], bb.B, th.marshalType, th.minTimestamp, rowsCount)
|
|
longTermBufPool.Put(bb)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal timestamps: %s", bs.partPath(), err)
|
|
}
|
|
bs.timestampsCache = timestamps
|
|
return timestamps.A
|
|
}
|
|
|
|
// mustReadBlockHeaders reads ih block headers from p, appends them to dst and returns the result.
|
|
func (ih *indexBlockHeader) mustReadBlockHeaders(dst []blockHeader, p *part) []blockHeader {
|
|
bbCompressed := longTermBufPool.Get()
|
|
indexBlockSize := ih.indexBlockSize
|
|
if indexBlockSize > maxIndexBlockSize {
|
|
logger.Panicf("FATAL: %s: index block size cannot exceed %d bytes; got %d bytes", p.indexFile.Path(), maxIndexBlockSize, indexBlockSize)
|
|
}
|
|
bbCompressed.B = bytesutil.ResizeNoCopyMayOverallocate(bbCompressed.B, int(indexBlockSize))
|
|
p.indexFile.MustReadAt(bbCompressed.B, int64(ih.indexBlockOffset))
|
|
|
|
bb := longTermBufPool.Get()
|
|
var err error
|
|
bb.B, err = encoding.DecompressZSTD(bb.B, bbCompressed.B)
|
|
longTermBufPool.Put(bbCompressed)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: %s: cannot decompress indexBlock read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
|
|
}
|
|
|
|
dst, err = unmarshalBlockHeaders(dst, bb.B, p.ph.FormatVersion)
|
|
longTermBufPool.Put(bb)
|
|
if err != nil {
|
|
logger.Panicf("FATAL: %s: cannot unmarshal block headers read at offset %d with size %d: %s", p.indexFile.Path(), ih.indexBlockOffset, ih.indexBlockSize, err)
|
|
}
|
|
|
|
return dst
|
|
}
|
|
|
|
// getStreamStr returns _stream value for the given block at bs.
|
|
func (bs *blockSearch) getStreamStr() string {
|
|
sid := bs.bsw.bh.streamID.id
|
|
streamStr := bs.seenStreams[sid]
|
|
if streamStr != "" {
|
|
// Fast path - streamStr is found in the seenStreams.
|
|
return streamStr
|
|
}
|
|
|
|
// Slow path - load streamStr from the storage.
|
|
streamStr = bs.getStreamStrSlow()
|
|
if streamStr != "" {
|
|
// Store the found streamStr in seenStreams.
|
|
if len(bs.seenStreams) > 20_000 {
|
|
bs.seenStreams = nil
|
|
}
|
|
if bs.seenStreams == nil {
|
|
bs.seenStreams = make(map[u128]string)
|
|
}
|
|
bs.seenStreams[sid] = streamStr
|
|
}
|
|
return streamStr
|
|
}
|
|
|
|
func (bs *blockSearch) getStreamStrSlow() string {
|
|
bb := bbPool.Get()
|
|
defer bbPool.Put(bb)
|
|
|
|
bb.B = bs.bsw.p.pt.idb.appendStreamTagsByStreamID(bb.B[:0], &bs.bsw.bh.streamID)
|
|
if len(bb.B) == 0 {
|
|
// Couldn't find stream tags by sid. This may be the case when the corresponding log stream
|
|
// was recently registered and its tags aren't visible to search yet.
|
|
// The stream tags must become visible in a few seconds.
|
|
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6042
|
|
return ""
|
|
}
|
|
|
|
st := GetStreamTags()
|
|
mustUnmarshalStreamTags(st, bb.B)
|
|
bb.B = st.marshalString(bb.B[:0])
|
|
PutStreamTags(st)
|
|
|
|
return string(bb.B)
|
|
}
|