From 00c3dbd15d8cb25fe2f4131f10ae98aa364c6058 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Tue, 20 Jun 2023 20:02:46 -0700 Subject: [PATCH] app/victoria-logs: add ability to debug data ingestion by passing `debug` query arg to data ingestion API --- app/vlinsert/elasticsearch/elasticsearch.go | 32 +++-- app/vlstorage/main.go | 4 +- docs/VictoriaLogs/README.md | 127 ++++++++++++++------ lib/httputils/array.go | 15 +++ lib/logstorage/log_rows.go | 49 ++++++-- lib/logstorage/partition.go | 20 +-- 6 files changed, 167 insertions(+), 80 deletions(-) create mode 100644 lib/httputils/array.go diff --git a/app/vlinsert/elasticsearch/elasticsearch.go b/app/vlinsert/elasticsearch/elasticsearch.go index de4cf945c..8a9d6d977 100644 --- a/app/vlinsert/elasticsearch/elasticsearch.go +++ b/app/vlinsert/elasticsearch/elasticsearch.go @@ -17,6 +17,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/httputils" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/protoparser/common" @@ -107,24 +108,30 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { msgField = msgf } - // Extract stream field names from _stream_fields query arg - var streamFields []string - if sfs := r.FormValue("_stream_fields"); sfs != "" { - streamFields = strings.Split(sfs, ",") - } + streamFields := httputils.GetArray(r, "_stream_fields") + ignoreFields := httputils.GetArray(r, "ignore_fields") - // Extract field names, which must be ignored - var ignoreFields []string - if ifs := r.FormValue("ignore_fields"); ifs != "" { - ignoreFields = strings.Split(ifs, ",") + isDebug := httputils.GetBool(r, "debug") + debugRequestURI := "" + debugRemoteAddr := "" + if isDebug { + debugRequestURI = httpserver.GetRequestURI(r) + debugRemoteAddr = httpserver.GetQuotedRemoteAddr(r) } lr := logstorage.GetLogRows(streamFields, ignoreFields) processLogMessage := func(timestamp int64, fields []logstorage.Field) { lr.MustAdd(tenantID, timestamp, fields) + if isDebug { + s := lr.GetRowString(0) + lr.ResetKeepSettings() + logger.Infof("remoteAddr=%s; requestURI=%s; ignoring log entry because of `debug` query arg: %s", debugRemoteAddr, debugRequestURI, s) + rowsDroppedTotal.Inc() + return + } if lr.NeedFlush() { vlstorage.MustAddRows(lr) - lr.Reset() + lr.ResetKeepSettings() } } @@ -148,7 +155,10 @@ func RequestHandler(path string, w http.ResponseWriter, r *http.Request) bool { } } -var bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`) +var ( + bulkRequestsTotal = metrics.NewCounter(`vl_http_requests_total{path="/insert/elasticsearch/_bulk"}`) + rowsDroppedTotal = metrics.NewCounter(`vl_rows_dropped_total{path="/insert/elasticsearch/_bulk",reason="debug"}`) +) func readBulkRequest(r io.Reader, isGzip bool, timeField, msgField string, processLogMessage func(timestamp int64, fields []logstorage.Field), diff --git a/app/vlstorage/main.go b/app/vlstorage/main.go index afaa78f1d..06afa3d14 100644 --- a/app/vlstorage/main.go +++ b/app/vlstorage/main.go @@ -138,10 +138,10 @@ func initStorageMetrics(strg *logstorage.Storage) *metrics.Set { return float64(m().UncompressedFileSize) }) - ms.NewGauge(`vlinsert_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 { + 
ms.NewGauge(`vl_rows_dropped_total{reason="too_big_timestamp"}`, func() float64 { return float64(m().RowsDroppedTooBigTimestamp) }) - ms.NewGauge(`vlinsert_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 { + ms.NewGauge(`vl_rows_dropped_total{reason="too_small_timestamp"}`, func() float64 { return float64(m().RowsDroppedTooSmallTimestamp) }) diff --git a/docs/VictoriaLogs/README.md b/docs/VictoriaLogs/README.md index 4beebe287..2c1c8b488 100644 --- a/docs/VictoriaLogs/README.md +++ b/docs/VictoriaLogs/README.md @@ -82,32 +82,14 @@ It is recommended setting up monitoring of VictoriaLogs according to [these docs ### Data ingestion -VictoriaLogs supports the following data ingestion techniques: +VictoriaLogs supports the following data ingestion approaches: - Via [Filebeat](https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html). See [these docs](#filebeat-setup). - Via [Logstash](https://www.elastic.co/guide/en/logstash/current/introduction.html). See [these docs](#logstash-setup). -The ingested log entries can be queried according to [these docs](#querying). +The ingested logs can be queried according to [these docs](#querying). -#### Data ingestion troubleshooting - -VictoriaLogs provides the following command-line flags, which can help debugging data ingestion issues: - -- `-logNewStreams` - if this flag is passed to VictoriaLogs, then it logs all the newly - registered [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). - This may help debugging [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality). -- `-logIngestedRows` - if this flag is passed to VictoriaLogs, then it logs all the ingested - [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model). - -VictoriaLogs exposes various [metrics](#monitoring), which may help debugging data ingestion issues: - -- `vl_rows_ingested_total` - the number of ingested [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) - since the last VictoriaLogs restart. If this number icreases over time, then logs are successfully ingested into VictoriaLogs. - The ingested logs can be inspected in logs by passing `-logIngestedRows` command-line flag to VictoriaLogs. -- `vl_streams_created_total` - the number of created [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) - since the last VictoriaLogs restart. If this metric grows rapidly during extended periods of time, then this may lead - to [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality). - The newly created log streams can be inspected in logs by passing `-logNewStreams` command-line flag to VictoriaLogs. +See also [data ingestion troubleshooting](#data-ingestion-troubleshooting) docs. #### Filebeat setup @@ -125,17 +107,24 @@ output.elasticsearch: Substitute the `localhost:9428` address inside `hosts` section with the real TCP address of VictoriaLogs. -The `_msg_field` parameter must contain the field name with the log message generated by Filebeat. This is usually `message` field. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) for details. +See [these docs](#data-ingestion-parameters) for details on the `parameters` section. -The `_time_field` parameter must contain the field name with the log timestamp generated by Filebeat. This is usually `@timestamp` field. 
-See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) for details. +It is recommended to verify whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) +and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). +This can be done by specifying `debug` [parameter](#data-ingestion-parameters): -It is recommended specifying comma-separated list of field names, which uniquely identify every log stream collected by Filebeat, in the `_stream_fields` parameter. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) for details. +```yml +output.elasticsearch: + hosts: ["http://localhost:9428/insert/elasticsearch/"] + parameters: + _msg_field: "message" + _time_field: "@timestamp" + _stream_fields: "host.hostname,log.file.path" + debug: "1" +``` -If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) aren't needed, -then VictoriaLogs can be instructed to ignore them during data ingestion - just pass `ignore_fields` parameter with comma-separated list of fields to ignore. +If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped +during data ingestion, then they can be put into `ignore_fields` [parameter](#data-ingestion-parameters). For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs: ```yml @@ -215,17 +204,28 @@ output { Substitute `localhost:9428` address inside `hosts` with the real TCP address of VictoriaLogs. -The `_msg_field` parameter must contain the field name with the log message generated by Logstash. This is usually `message` field. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) for details. +See [these docs](#data-ingestion-parameters) for details on the `parameters` section. -The `_time_field` parameter must contain the field name with the log timestamp generated by Logstash. This is usually `@timestamp` field. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) for details. +It is recommended to verify whether the initial setup generates the needed [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) +and uses the correct [stream fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields). +This can be done by specifying `debug` [parameter](#data-ingestion-parameters): -It is recommended specifying comma-separated list of field names, which uniquely identify every log stream collected by Logstash, in the `_stream_fields` parameter. -See [these docs](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) for details. +```conf +output { + elasticsearch { + hosts => ["http://localhost:9428/insert/elasticsearch/"] + parameters => { + "_msg_field" => "message" + "_time_field" => "@timestamp" + "_stream_fields" => "host.name,process.name" + "debug" => "1" + } + } +} +``` -If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) aren't needed, -then VictoriaLogs can be instructed to ignore them during data ingestion - just pass `ignore_fields` parameter with comma-separated list of fields to ignore. 
+If some [log fields](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) must be skipped
+during data ingestion, then they can be put into `ignore_fields` [parameter](#data-ingestion-parameters).
 For example, the following config instructs VictoriaLogs to ignore `log.offset` and `event.original` fields in the ingested logs:
 
 ```conf
@@ -284,6 +284,55 @@ The ingested log entries can be queried according to [these docs](#querying).
 
 See also [data ingestion troubleshooting](#data-ingestion-troubleshooting) docs.
 
+#### Data ingestion parameters
+
+VictoriaLogs accepts the following parameters at [data ingestion](#data-ingestion) HTTP APIs:
+
+- `_msg_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  with the [log message](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#message-field) generated by the log shipper.
+  This is usually the `message` field for Filebeat and Logstash.
+  If the `_msg_field` parameter isn't set, then VictoriaLogs reads the log message from the `_msg` field.
+
+- `_time_field` - it must contain the name of the [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  with the [log timestamp](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#time-field) generated by the log shipper.
+  This is usually the `@timestamp` field for Filebeat and Logstash.
+  If the `_time_field` parameter isn't set, then VictoriaLogs reads the timestamp from the `_time` field.
+  If this field doesn't exist, then the current timestamp is used.
+
+- `_stream_fields` - it should contain comma-separated list of [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) names,
+  which uniquely identify every [log stream](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) collected by the log shipper.
+  If the `_stream_fields` parameter isn't set, then all the ingested logs are written to the default log stream - `{}`.
+
+- `ignore_fields` - this parameter may contain comma-separated list of [log field](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model) names,
+  which must be ignored during [data ingestion](#data-ingestion).
+
+- `debug` - if this parameter is set to `1`, then the [ingested](#data-ingestion) logs aren't stored in VictoriaLogs. Instead,
+  the ingested data is logged by VictoriaLogs, so it can be investigated later. See the example request after this list.
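+
+For a quick check of these parameters, the following self-contained Go program sends a single log entry
+to the Elasticsearch-compatible [data ingestion](#data-ingestion) endpoint with `debug` enabled.
+This is a minimal sketch - the address, the field names and the log line are illustrative, not defaults:
+
+```go
+package main
+
+import (
+	"fmt"
+	"net/http"
+	"strings"
+)
+
+func main() {
+	// Elasticsearch-style bulk payload: an action line followed by the log entry itself.
+	body := `{"create":{}}
+{"@timestamp":"2023-06-20T15:04:05Z","message":"test log line","host.name":"host123"}
+`
+	// debug=1 instructs VictoriaLogs to log and drop the entry instead of storing it.
+	u := "http://localhost:9428/insert/elasticsearch/_bulk" +
+		"?_msg_field=message&_time_field=@timestamp&_stream_fields=host.name&debug=1"
+	resp, err := http.Post(u, "application/json", strings.NewReader(body))
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+	fmt.Println("status:", resp.Status)
+}
+```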
+
+#### Data ingestion troubleshooting
+
+VictoriaLogs provides the following command-line flags, which can help debugging data ingestion issues:
+
+- `-logNewStreams` - if this flag is passed to VictoriaLogs, then it logs all the newly
+  registered [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields).
+  This may help debugging [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
+- `-logIngestedRows` - if this flag is passed to VictoriaLogs, then it logs all the ingested
+  [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model).
+  See also `debug` [parameter](#data-ingestion-parameters).
+
+VictoriaLogs exposes various [metrics](#monitoring), which may help debugging data ingestion issues:
+
+- `vl_rows_ingested_total` - the number of ingested [log entries](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model)
+  since the last VictoriaLogs restart. If this number increases over time, then logs are successfully ingested into VictoriaLogs.
+  The ingested logs can be inspected in the following ways:
+  - By passing `debug=1` parameter to every request to [data ingestion endpoints](#data-ingestion). The ingested rows aren't stored in VictoriaLogs
+    in this case. Instead, they are logged, so they can be investigated later. The `vl_rows_dropped_total` [metric](#monitoring) is incremented for each logged row.
+  - By passing `-logIngestedRows` command-line flag to VictoriaLogs. In this case it logs all the ingested data, so it can be investigated later.
+- `vl_streams_created_total` - the number of created [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields)
+  since the last VictoriaLogs restart. If this metric grows rapidly during extended periods of time, then this may lead
+  to [high cardinality issues](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#high-cardinality).
+  The newly created log streams can be inspected in logs by passing `-logNewStreams` command-line flag to VictoriaLogs.
+
 ### Querying
 
 VictoriaLogs can be queried at the `/select/logsql/query` endpoint. The [LogsQL](https://docs.victoriametrics.com/VictoriaLogs/LogsQL.html)
@@ -468,12 +517,12 @@ outside the configured retention.
 VictoriaLogs automatically drops logs at [data ingestion](#data-ingestion) stage if they have timestamps outside the configured retention.
 A sample of dropped logs is logged with `WARN` message in order to simplify troubleshooting.
-The `vlinsert_rows_dropped_total` [metric](#monitoring) is incremented each time an ingested log entry is dropped because of timestamp outside the retention.
+The `vl_rows_dropped_total` [metric](#monitoring) is incremented each time an ingested log entry is dropped because its timestamp is outside the configured retention.
 It is recommended setting up the following alerting rule at [vmalert](https://docs.victoriametrics.com/vmalert.html) in order to be notified
 when logs with wrong timestamps are ingested into VictoriaLogs:
 
 ```metricsql
-rate(vlinsert_rows_dropped_total[5m]) > 0
+rate(vl_rows_dropped_total[5m]) > 0
 ```
 
 By default VictoriaLogs doesn't accept log entries with timestamps bigger than `now+2d`, e.g. 2 days in the future.
diff --git a/lib/httputils/array.go b/lib/httputils/array.go
new file mode 100644
index 000000000..5d61f3f7e
--- /dev/null
+++ b/lib/httputils/array.go
@@ -0,0 +1,15 @@
+package httputils
+
+import (
+	"net/http"
+	"strings"
+)
+
+// GetArray returns the array of comma-separated values from the argKey query arg in r.
+func GetArray(r *http.Request, argKey string) []string {
+	v := r.FormValue(argKey)
+	if v == "" {
+		return nil
+	}
+	return strings.Split(v, ",")
+}
diff --git a/lib/logstorage/log_rows.go b/lib/logstorage/log_rows.go
index 2192fa0d2..6efbc8a3d 100644
--- a/lib/logstorage/log_rows.go
+++ b/lib/logstorage/log_rows.go
@@ -76,8 +76,25 @@ func (rf *RowFormatter) String() string {
 	return string(b)
 }
 
-// Reset resets lr
+// Reset resets lr with all its settings.
+//
+// Call ResetKeepSettings() for resetting lr without resetting its settings.
 func (lr *LogRows) Reset() {
+	lr.ResetKeepSettings()
+
+	sfs := lr.streamFields
+	for k := range sfs {
+		delete(sfs, k)
+	}
+
+	ifs := lr.ignoreFields
+	for k := range ifs {
+		delete(ifs, k)
+	}
+}
+
+// ResetKeepSettings resets rows stored in lr, while keeping its settings passed to GetLogRows().
+func (lr *LogRows) ResetKeepSettings() {
 	lr.buf = lr.buf[:0]
 
 	fb := lr.fieldsBuf
@@ -107,16 +124,6 @@ func (lr *LogRows) Reset() {
 	lr.rows = rows[:0]
 
 	lr.sf = nil
-
-	sfs := lr.streamFields
-	for k := range sfs {
-		delete(sfs, k)
-	}
-
-	ifs := lr.ignoreFields
-	for k := range ifs {
-		delete(ifs, k)
-	}
 }
 
 // NeedFlush returns true if lr contains too much data, so it must be flushed to the storage.
@@ -200,6 +207,26 @@ func (lr *LogRows) mustAddInternal(sid streamID, timestamp int64, fields []Field
 	lr.buf = buf
 }
 
+// GetRowString returns the string representation of the row with the given idx.
+func (lr *LogRows) GetRowString(idx int) string {
+	tf := TimeFormatter(lr.timestamps[idx])
+	streamTags := getStreamTagsString(lr.streamTagsCanonicals[idx])
+	var rf RowFormatter
+	rf = append(rf[:0], lr.rows[idx]...)
+	// Expose the row timestamp and its stream labels as regular fields,
+	// so they show up in the formatted output.
+	rf = append(rf, Field{
+		Name:  "_time",
+		Value: tf.String(),
+	})
+	rf = append(rf, Field{
+		Name:  "_stream",
+		Value: streamTags,
+	})
+	// Sort fields by name in order to make the output deterministic.
+	sort.Slice(rf, func(i, j int) bool {
+		return rf[i].Name < rf[j].Name
+	})
+	return rf.String()
+}
+
 // GetLogRows returns LogRows from the pool for the given streamFields.
 //
 // streamFields is a set of field names, which must be associated with the stream.
diff --git a/lib/logstorage/partition.go b/lib/logstorage/partition.go
index 08c119d1d..64465de20 100644
--- a/lib/logstorage/partition.go
+++ b/lib/logstorage/partition.go
@@ -153,23 +153,9 @@ func (pt *partition) logNewStream(streamTagsCanonical []byte, fields []Field) {
 }
 
 func (pt *partition) logIngestedRows(lr *LogRows) {
-	var rf RowFormatter
-	for i, fields := range lr.rows {
-		tf := TimeFormatter(lr.timestamps[i])
-		streamTags := getStreamTagsString(lr.streamTagsCanonicals[i])
-		rf = append(rf[:0], fields...)
-		rf = append(rf, Field{
-			Name:  "_time",
-			Value: tf.String(),
-		})
-		rf = append(rf, Field{
-			Name:  "_stream",
-			Value: streamTags,
-		})
-		sort.Slice(rf, func(i, j int) bool {
-			return rf[i].Name < rf[j].Name
-		})
-		logger.Infof("partition %s: new log entry %s", pt.path, &rf)
+	for i := range lr.rows {
+		s := lr.GetRowString(i)
+		logger.Infof("partition %s: new log entry %s", pt.path, s)
 	}
 }
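As a quick illustration of the semantics of the new `httputils.GetArray` helper introduced above - a self-contained
sketch with the helper inlined so it runs standalone (the request target is hypothetical):

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// getArray mirrors the new lib/httputils.GetArray helper from this patch:
// it returns nil when the arg is missing or empty, and splits on commas otherwise.
func getArray(r *http.Request, argKey string) []string {
	v := r.FormValue(argKey)
	if v == "" {
		return nil
	}
	return strings.Split(v, ",")
}

func main() {
	r := httptest.NewRequest("GET",
		"/insert/elasticsearch/_bulk?_stream_fields=host.hostname,log.file.path", nil)
	fmt.Println(getArray(r, "_stream_fields")) // [host.hostname log.file.path]
	fmt.Println(getArray(r, "ignore_fields"))  // [] - nil for a missing or empty arg
}
```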