lib/logstorage: remove unnecessary abstraction - RowFormatter

It is better to use the MarshalFieldsToJSON function directly
instead of hiding it behind the RowFormatter abstraction.
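
As a rough illustration of the pattern this commit switches to, here is a minimal, self-contained Go sketch. The Field type and the marshalFieldsToJSON helper below are simplified stand-ins for logstorage.Field and logstorage.MarshalFieldsToJSON, so treat this as an assumption-laden example rather than the actual implementation:

package main

import (
	"encoding/json"
	"fmt"
)

// Field is a simplified stand-in for logstorage.Field: a single name/value pair of a log row.
type Field struct {
	Name  string
	Value string
}

// marshalFieldsToJSON is an assumed, simplified stand-in for logstorage.MarshalFieldsToJSON:
// it appends the given fields as a single JSON object to dst and returns the extended slice.
func marshalFieldsToJSON(dst []byte, fields []Field) []byte {
	dst = append(dst, '{')
	for i, f := range fields {
		if i > 0 {
			dst = append(dst, ',')
		}
		name, _ := json.Marshal(f.Name)
		value, _ := json.Marshal(f.Value)
		dst = append(dst, name...)
		dst = append(dst, ':')
		dst = append(dst, value...)
	}
	return append(dst, '}')
}

func main() {
	fields := []Field{
		{Name: "_msg", Value: "cannot open file"},
		{Name: "level", Value: "error"},
	}

	// Previously the row was wrapped into a RowFormatter solely to obtain a string
	// for logging; now the marshaling function is called directly at the call site.
	line := marshalFieldsToJSON(nil, fields)
	fmt.Printf("log entry: %s\n", line)
}

The net effect is that callers format the row where they log it, instead of going through a fmt.Stringer wrapper type.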
Aliaksandr Valialkin 2025-01-28 18:01:44 +01:00
parent ec64a1fd7c
commit 95f182053b
4 changed files with 16 additions and 24 deletions
app/vlinsert/insertutils
lib/logstorage

@@ -199,8 +199,8 @@ func (lmp *logMessageProcessor) AddRow(timestamp int64, fields, streamFields []l
 	lmp.bytesIngestedTotal.Add(n)
 	if len(fields) > *MaxFieldsPerLine {
-		rf := logstorage.RowFormatter(fields)
-		logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, rf)
+		line := logstorage.MarshalFieldsToJSON(nil, fields)
+		logger.Warnf("dropping log line with %d fields; it exceeds -insert.maxFieldsPerLine=%d; %s", len(fields), *MaxFieldsPerLine, line)
 		rowsDroppedTotalTooManyFields.Inc()
 		return
 	}

@@ -65,15 +65,6 @@ func (sf *sortedFields) Swap(i, j int) {
 	a[i], a[j] = a[j], a[i]
 }
 
-// RowFormatter implementes fmt.Stringer for []Field aka a single log row
-type RowFormatter []Field
-
-// String returns user-readable representation for rf
-func (rf *RowFormatter) String() string {
-	result := MarshalFieldsToJSON(nil, *rf)
-	return string(result)
-}
-
 // Reset resets lr with all its settings.
 //
 // Call ResetKeepSettings() for resetting lr without resetting its settings.
@@ -274,20 +265,21 @@ func (lr *LogRows) addFieldsInternal(fields []Field, ignoreFields map[string]str
 func (lr *LogRows) GetRowString(idx int) string {
 	tf := TimeFormatter(lr.timestamps[idx])
 	streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
-	var rf RowFormatter
-	rf = append(rf[:0], lr.rows[idx]...)
-	rf = append(rf, Field{
+	var fields []Field
+	fields = append(fields[:0], lr.rows[idx]...)
+	fields = append(fields, Field{
 		Name:  "_time",
 		Value: tf.String(),
 	})
-	rf = append(rf, Field{
+	fields = append(fields, Field{
 		Name:  "_stream",
 		Value: streamTags,
 	})
-	sort.Slice(rf, func(i, j int) bool {
-		return rf[i].Name < rf[j].Name
+	sort.Slice(fields, func(i, j int) bool {
+		return fields[i].Name < fields[j].Name
 	})
-	return rf.String()
+	line := MarshalFieldsToJSON(nil, fields)
+	return string(line)
 }
 
 // GetLogRows returns LogRows from the pool for the given streamFields.

@@ -147,8 +147,8 @@ func (pt *partition) mustAddRows(lr *LogRows) {
 func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
 	streamTags := getStreamTagsString(streamTagsCanonical)
-	rf := RowFormatter(fields)
-	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, streamTags, &rf)
+	line := MarshalFieldsToJSON(nil, fields)
+	logger.Infof("partition %s: new stream %s for log entry %s", pt.path, streamTags, line)
 }
 
 func (pt *partition) logIngestedRows(lr *LogRows) {

@@ -544,22 +544,22 @@ func (s *Storage) MustAddRows(lr *LogRows) {
 	for i, ts := range lr.timestamps {
 		day := ts / nsecsPerDay
 		if day < minAllowedDay {
-			rf := RowFormatter(lr.rows[i])
+			line := MarshalFieldsToJSON(nil, lr.rows[i])
 			tsf := TimeFormatter(ts)
 			minAllowedTsf := TimeFormatter(minAllowedDay * nsecsPerDay)
 			tooSmallTimestampLogger.Warnf("skipping log entry with too small timestamp=%s; it must be bigger than %s according "+
 				"to the configured -retentionPeriod=%dd. See https://docs.victoriametrics.com/victorialogs/#retention ; "+
-				"log entry: %s", &tsf, &minAllowedTsf, durationToDays(s.retention), &rf)
+				"log entry: %s", &tsf, &minAllowedTsf, durationToDays(s.retention), line)
 			s.rowsDroppedTooSmallTimestamp.Add(1)
 			continue
 		}
 		if day > maxAllowedDay {
-			rf := RowFormatter(lr.rows[i])
+			line := MarshalFieldsToJSON(nil, lr.rows[i])
 			tsf := TimeFormatter(ts)
 			maxAllowedTsf := TimeFormatter(maxAllowedDay * nsecsPerDay)
 			tooBigTimestampLogger.Warnf("skipping log entry with too big timestamp=%s; it must be smaller than %s according "+
 				"to the configured -futureRetention=%dd; see https://docs.victoriametrics.com/victorialogs/#retention ; "+
-				"log entry: %s", &tsf, &maxAllowedTsf, durationToDays(s.futureRetention), &rf)
+				"log entry: %s", &tsf, &maxAllowedTsf, durationToDays(s.futureRetention), line)
 			s.rowsDroppedTooBigTimestamp.Add(1)
 			continue
 		}