app/victoria-logs: add ability to debug data ingestion by passing `debug` query arg to data ingestion API

parent e1e48077cc, commit 00c3dbd15d

6 changed files with 167 additions and 80 deletions
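As a quick illustration of what this change enables, the sketch below sends a single Elasticsearch-style bulk request with `debug=1`, so VictoriaLogs logs and drops the entry instead of storing it. The endpoint path and query arg names come from the docs updated in this commit; the host, field names and sample payload are illustrative assumptions.

```go
package main

import (
	"bytes"
	"fmt"
	"log"
	"net/http"
)

func main() {
	// Elasticsearch bulk format: an action line followed by the log entry, newline-terminated.
	// The field names below are made up for the example; VictoriaLogs maps them via the
	// _msg_field / _time_field / _stream_fields query args.
	body := bytes.NewBufferString(
		`{"create":{}}` + "\n" +
			`{"@timestamp":"2023-06-21T04:24:24Z","message":"cannot open file","host.hostname":"web-1"}` + "\n")

	// debug=1 makes VictoriaLogs log the parsed rows and drop them instead of storing them,
	// so the field mapping can be verified before enabling real ingestion.
	reqURL := "http://localhost:9428/insert/elasticsearch/_bulk" +
		"?_msg_field=message&_time_field=@timestamp&_stream_fields=host.hostname&debug=1"

	resp, err := http.Post(reqURL, "application/json", body)
	if err != nil {
		log.Fatalf("cannot send bulk request: %v", err)
	}
	defer resp.Body.Close()
	fmt.Println("response status:", resp.Status)
}
```

With `debug=1` the row should show up in VictoriaLogs' own logs and increment the new `vl_rows_dropped_total{reason="debug"}` counter introduced below, rather than being persisted.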
@@ -17,6 +17,7 @@ import (
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
+	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
 	"github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common"
@@ -107,24 +108,30 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 		msgField = msgf
 	}
 
-	// Extract stream field names from _stream_fields query arg
-	var streamFields []string
-	if sfs := r.FormValue("_stream_fields"); sfs != "" {
-		streamFields = strings.Split(sfs, ",")
-	}
-
-	// Extract field names, which must be ignored
-	var ignoreFields []string
-	if ifs := r.FormValue("ignore_fields"); ifs != "" {
-		ignoreFields = strings.Split(ifs, ",")
-	}
+	streamFields := httputils.GetArray(r, "_stream_fields")
+	ignoreFields := httputils.GetArray(r, "ignore_fields")
+	isDebug := httputils.GetBool(r, "debug")
+	debugRequestURI := ""
+	debugRemoteAddr := ""
+	if isDebug {
+		debugRequestURI = httpserver.GetRequestURI(r)
+		debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r)
+	}
 
 	lr := logstorage.GetLogRows(streamFields, ignoreFields)
 	processLogMessage := func(timestamp int64, fields []logstorage.Field) {
 		lr.MustAdd(tenantID, timestamp, fields)
+		if isDebug {
+			s := lr.GetRowString(0)
+			lr.ResetKeepSettings()
+			logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s)
+			rowsDroppedTotal.Inc()
+			return
+		}
 		if lr.NeedFlush() {
 			vlstorage.MustAddRows(lr)
-			lr.Reset()
+			lr.ResetKeepSettings()
 		}
 	}
 
@@ -148,7 +155,10 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool {
 	}
 }
 
-var bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
+var (
+	bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`)
+	rowsDroppedTotal  = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/elasticsearch/_bulk",reason="debug"}`)
+)
 
 func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string,
 	processLogMessage func(timestamp int64, fields []logstorage.Field),
@@ -138,10 +138,10 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set {
 		return float64(m().UncompressedFileSize)
 	})
 
-	ms.NewGauge(`vlinsert_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 {
+	ms.NewGauge(`vl_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 {
 		return float64(m().RowsDroppedTooBigTimestamp)
 	})
-	ms.NewGauge(`vlinsert_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 {
+	ms.NewGauge(`vl_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 {
 		return float64(m().RowsDroppedTooSmallTimestamp)
 	})
 
@@ -82,32 +82,14 @@ It is recommended setting up monitoring of VictoriaLogs according to [these docs
 
 ### Data ingestion
 
-VictoriaLogs supports the following data ingestion techniques:
+VictoriaLogs supports the following data ingestion approaches:
 
 - Via [Filebeat](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html). See [these docs](#filebeat-setup).
 - Via [Logstash](https://www.elastic.co/guide/en/logstash/current/introduction.html). See [these docs](#logstash-setup).
 
-The ingested log entries can be queried according to [these docs](#querying).
+The ingested logs can be queried according to [these docs](#querying).
 
-#### Data ingestion troubleshooting
+See also [data ingestion troubleshooting](#data-ingestion-troubleshooting) docs.
 
-VictoriaLogs provides the following command-line flags, which can help debugging data ingestion issues:
-
-- `-logNewStreams` - if this flag is passed to VictoriaLogs, then it logs all the newly
-  registered [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
-  This may help debugging [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
-- `-logIngestedRows` - if this flag is passed to VictoriaLogs, then it logs all the ingested
-  [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
-
-VictoriaLogs exposes various [metrics](#monitoring), which may help debugging data ingestion issues:
-
-- `vl_rows_ingested_total` - the number of ingested [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
-  since the last VictoriaLogs restart. If this number increases over time, then logs are successfully ingested into VictoriaLogs.
-  The ingested logs can be inspected in logs by passing `-logIngestedRows` command-line flag to VictoriaLogs.
-- `vl_streams_created_total` - the number of created [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields)
-  since the last VictoriaLogs restart. If this metric grows rapidly during extended periods of time, then this may lead
-  to [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
-  The newly created log streams can be inspected in logs by passing `-logNewStreams` command-line flag to VictoriaLogs.
-
 #### Filebeat setup
 
@@ -125,17 +107,24 @@ output.elasticsearch:
 
 Substitute the `localhost:9428` address inside `hosts` section with the real TCP address of VictoriaLogs.
 
-The `_msg_field` parameter must contain the field name with the log message generated by Filebeat. This is usually `message` field.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) for details.
+See [these docs](#data-ingestion-parameters) for details on the `parameters` section.
 
-The `_time_field` parameter must contain the field name with the log timestamp generated by Filebeat. This is usually `@timestamp` field.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) for details.
+It is recommended to verify whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
+This can be done by specifying `debug` [parameter](#data-ingestion-parameters):
 
-It is recommended specifying comma-separated list of field names, which uniquely identify every log stream collected by Filebeat, in the `_stream_fields` parameter.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) for details.
+```yml
+output.elasticsearch:
+  hosts: ["http://localhost:9428/insert/elasticsearch/"]
+  parameters:
+    _msg_field: "message"
+    _time_field: "@timestamp"
+    _stream_fields: "host.hostname,log.file.path"
+    debug: "1"
+```
 
-If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) aren't needed,
-then VictoriaLogs can be instructed to ignore them during data ingestion - just pass `ignore_fields` parameter with comma-separated list of fields to ignore.
+If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped
+during data ingestion, then they can be put into `ignore_fields` [parameter](#data-ingestion-parameters).
 For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs:
 
 ```yml
@@ -215,17 +204,28 @@ output {
 
 Substitute `localhost:9428` address inside `hosts` with the real TCP address of VictoriaLogs.
 
-The `_msg_field` parameter must contain the field name with the log message generated by Logstash. This is usually `message` field.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) for details.
+See [these docs](#data-ingestion-parameters) for details on the `parameters` section.
 
-The `_time_field` parameter must contain the field name with the log timestamp generated by Logstash. This is usually `@timestamp` field.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) for details.
+It is recommended to verify whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
+This can be done by specifying `debug` [parameter](#data-ingestion-parameters):
 
-It is recommended specifying comma-separated list of field names, which uniquely identify every log stream collected by Logstash, in the `_stream_fields` parameter.
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) for details.
+```conf
+output {
+  elasticsearch {
+    hosts => ["http://localhost:9428/insert/elasticsearch/"]
+    parameters => {
+        "_msg_field" => "message"
+        "_time_field" => "@timestamp"
+        "_stream_fields" => "host.name,process.name"
+        "debug" => "1"
+    }
+  }
+}
+```
 
-If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) aren't needed,
-then VictoriaLogs can be instructed to ignore them during data ingestion - just pass `ignore_fields` parameter with comma-separated list of fields to ignore.
+If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped
+during data ingestion, then they can be put into `ignore_fields` [parameter](#data-ingestion-parameters).
 For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs:
 
 ```conf
@@ -284,6 +284,55 @@ The ingested log entries can be queried according to [these docs](#querying).
 
 See also [data ingestion troubleshooting](#data-ingestion-troubleshooting) docs.
 
+#### Data ingestion parameters
+
+VictoriaLogs accepts the following parameters at [data ingestion](#data-ingestion) HTTP APIs:
+
+- `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  with the [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) generated by the log shipper.
+  This is usually the `message` field for Filebeat and Logstash.
+  If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field.
+
+- `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  with the [log timestamp](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) generated by the log shipper.
+  This is usually the `@timestamp` field for Filebeat and Logstash.
+  If the `_time_field` parameter isn't set, then VictoriaLogs reads the timestamp from the `_time` field.
+  If this field doesn't exist, then the current timestamp is used.
+
+- `_stream_fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) names,
+  which uniquely identify every [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) collected by the log shipper.
+  If the `_stream_fields` parameter isn't set, then all the ingested logs are written to the default log stream - `{}`.
+
+- `ignore_fields` - this parameter may contain the list of [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) names,
+  which must be ignored during [data ingestion](#data-ingestion).
+
+- `debug` - if this parameter is set to `1`, then the [ingested](#data-ingestion) logs aren't stored in VictoriaLogs. Instead,
+  the ingested data is logged by VictoriaLogs, so it can be investigated later.
+
+#### Data ingestion troubleshooting
+
+VictoriaLogs provides the following command-line flags, which can help debugging data ingestion issues:
+
+- `-logNewStreams` - if this flag is passed to VictoriaLogs, then it logs all the newly
+  registered [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
+  This may help debugging [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
+- `-logIngestedRows` - if this flag is passed to VictoriaLogs, then it logs all the ingested
+  [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
+  See also `debug` [parameter](#data-ingestion-parameters).
+
+VictoriaLogs exposes various [metrics](#monitoring), which may help debugging data ingestion issues:
+
+- `vl_rows_ingested_total` - the number of ingested [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  since the last VictoriaLogs restart. If this number increases over time, then logs are successfully ingested into VictoriaLogs.
+  The ingested logs can be inspected in the following ways:
+  - By passing `debug=1` parameter to every request to [data ingestion endpoints](#data-ingestion). The ingested rows aren't stored in VictoriaLogs
+    in this case. Instead, they are logged, so they can be investigated later. The `vl_rows_dropped_total` [metric](#monitoring) is incremented for each logged row.
+  - By passing `-logIngestedRows` command-line flag to VictoriaLogs. In this case it logs all the ingested data, so it can be investigated later.
+- `vl_streams_created_total` - the number of created [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields)
+  since the last VictoriaLogs restart. If this metric grows rapidly during extended periods of time, then this may lead
+  to [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
+  The newly created log streams can be inspected in logs by passing `-logNewStreams` command-line flag to VictoriaLogs.
+
 ### Querying
 
 VictoriaLogs can be queried at the `/select/logsql/query` endpoint. The [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html)
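The parameters documented in the added section map one-to-one onto query args of the ingestion URL. As a hedged illustration (the endpoint path and parameter names come from this diff; the host and field values are made-up examples), a client could assemble the URL like this:

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Query args documented in the "Data ingestion parameters" section above.
	params := url.Values{}
	params.Set("_msg_field", "message")           // field holding the log message
	params.Set("_time_field", "@timestamp")       // field holding the log timestamp
	params.Set("_stream_fields", "host.hostname") // fields identifying the log stream
	params.Set("ignore_fields", "log.offset")     // fields to drop during ingestion
	params.Set("debug", "1")                      // log and drop rows instead of storing them

	// localhost:9428 is the example address used throughout these docs;
	// the _bulk path is the Elasticsearch-compatible endpoint instrumented in this commit.
	fmt.Println("http://localhost:9428/insert/elasticsearch/_bulk?" + params.Encode())
}
```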
@@ -468,12 +517,12 @@ outside the configured retention.
 
 VictoriaLogs automatically drops logs at [data ingestion](#data-ingestion) stage if they have timestamps outside the configured retention.
 A sample of dropped logs is logged with `WARN` message in order to simplify troubleshooting.
-The `vlinsert_rows_dropped_total` [metric](#monitoring) is incremented each time an ingested log entry is dropped because of timestamp outside the retention.
+The `vl_rows_dropped_total` [metric](#monitoring) is incremented each time an ingested log entry is dropped because of timestamp outside the retention.
 It is recommended setting up the following alerting rule at [vmalert](https://docs.victoriametrics.com/vmalert.html) in order to be notified
 when logs with wrong timestamps are ingested into VictoriaLogs:
 
 ```metricsql
-rate(vlinsert_rows_dropped_total[5m]) > 0
+rate(vl_rows_dropped_total[5m]) > 0
 ```
 
 By default VictoriaLogs doesn't accept log entries with timestamps bigger than `now+2d`, e.g. 2 days in the future.
lib/httputils/array.go (new file, 15 lines)
@@ -0,0 +1,15 @@
+package httputils
+
+import (
+	"net/http"
+	"strings"
+)
+
+// GetArray returns an array of comma-separated values from r arg with the argKey name.
+func GetArray(r *http.Request, argKey string) []string {
+	v := r.FormValue(argKey)
+	if v == "" {
+		return nil
+	}
+	return strings.Split(v, ",")
+}
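A hypothetical test for the new helper (not part of this commit) showing the intended behaviour: comma-separated values are split, and a missing arg yields nil, so the result can be passed straight to `logstorage.GetLogRows`:

```go
package httputils_test

import (
	"net/http/httptest"
	"reflect"
	"testing"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils"
)

func TestGetArray(t *testing.T) {
	r := httptest.NewRequest("GET", "/insert/elasticsearch/_bulk?ignore_fields=log.offset,event.original", nil)

	got := httputils.GetArray(r, "ignore_fields")
	want := []string{"log.offset", "event.original"}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("unexpected ignore_fields; got %q; want %q", got, want)
	}

	// An absent (or empty) arg yields nil rather than a one-element slice.
	if fields := httputils.GetArray(r, "_stream_fields"); fields != nil {
		t.Fatalf("expected nil for missing _stream_fields; got %q", fields)
	}
}
```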
|
@@ -76,8 +76,25 @@ func (rf *RowFormatter) String() string {
 	return string(b)
 }
 
-// Reset resets lr
+// Reset resets lr with all its settings.
+//
+// Call ResetKeepSettings() for resetting lr without resetting its settings.
 func (lr *LogRows) Reset() {
+	lr.ResetKeepSettings()
+
+	sfs := lr.streamFields
+	for k := range sfs {
+		delete(sfs, k)
+	}
+
+	ifs := lr.ignoreFields
+	for k := range ifs {
+		delete(ifs, k)
+	}
+}
+
+// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
+func (lr *LogRows) ResetKeepSettings() {
 	lr.buf = lr.buf[:0]
 
 	fb := lr.fieldsBuf
@@ -107,16 +124,6 @@ func (lr *LogRows) Reset() {
 	lr.rows = rows[:0]
 
 	lr.sf = nil
-
-	sfs := lr.streamFields
-	for k := range sfs {
-		delete(sfs, k)
-	}
-
-	ifs := lr.ignoreFields
-	for k := range ifs {
-		delete(ifs, k)
-	}
 }
 
 // NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
@@ -200,6 +207,26 @@ func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field
 	lr.buf = buf
 }
 
+// GetRowString returns string representation of the row with the given idx.
+func (lr *LogRows) GetRowString(idx int) string {
+	tf := TimeFormatter(lr.timestamps[idx])
+	streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
+	var rf RowFormatter
+	rf = append(rf[:0], lr.rows[idx]...)
+	rf = append(rf, Field{
+		Name:  "_time",
+		Value: tf.String(),
+	})
+	rf = append(rf, Field{
+		Name:  "_stream",
+		Value: streamTags,
+	})
+	sort.Slice(rf, func(i, j int) bool {
+		return rf[i].Name < rf[j].Name
+	})
+	return rf.String()
+}
+
 // GetLogRows returns LogRows from the pool for the given streamFields.
 //
 // streamFields is a set of field names, which must be associated with the stream.
@@ -153,23 +153,9 @@ func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
 }
 
 func (pt *partition) logIngestedRows(lr *LogRows) {
-	var rf RowFormatter
-	for i, fields := range lr.rows {
-		tf := TimeFormatter(lr.timestamps[i])
-		streamTags := getStreamTagsString(lr.streamTagsCanonicals[i])
-		rf = append(rf[:0], fields...)
-		rf = append(rf, Field{
-			Name:  "_time",
-			Value: tf.String(),
-		})
-		rf = append(rf, Field{
-			Name:  "_stream",
-			Value: streamTags,
-		})
-		sort.Slice(rf, func(i, j int) bool {
-			return rf[i].Name < rf[j].Name
-		})
-		logger.Infof("partition %s: new log entry %s", pt.path, &rf)
-	}
+	for i := range lr.rows {
+		s := lr.GetRowString(i)
+		logger.Infof("partition %s: new log entry %s", pt.path, s)
+	}
 }
 