app/vlinsert: support unix timestamps in seconds and milliseconds in JSON stream data ingestion API

This commit is contained in:
Aliaksandr Valialkin 2024-09-28 21:56:50 +02:00
parent 7d7d7c03bc
commit 806bc2ac58
No known key found for this signature in database
GPG key ID: 52C003EE2BCDB9EB
6 changed files with 79 additions and 34 deletions

View file

@ -6,9 +6,7 @@ import (
"flag"
"fmt"
"io"
"math"
"net/http"
"strconv"
"strings"
"time"
@ -252,22 +250,7 @@ func parseElasticsearchTimestamp(s string) (int64, error) {
}
if len(s) < len("YYYY-MM-DD") || s[len("YYYY")] != '-' {
// Try parsing timestamp in seconds or milliseconds
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp in milliseconds from %q: %w", s, err)
}
if n < (1<<31) && n >= (-1<<31) {
// The timestamp is in seconds. Convert it to milliseconds
n *= 1e3
}
if n > int64(math.MaxInt64)/1e6 {
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
}
if n < int64(math.MinInt64)/1e6 {
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
}
n *= 1e6
return n, nil
return insertutils.ParseUnixTimestamp(s)
}
if len(s) == len("YYYY-MM-DD") {
t, err := time.Parse("2006-01-02", s)

View file

@ -2,6 +2,8 @@ package insertutils
import (
"fmt"
"math"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
@ -19,15 +21,49 @@ func ExtractTimestampRFC3339NanoFromFields(timeField string, fields []logstorage
if f.Name != timeField {
continue
}
if f.Value == "" || f.Value == "0" {
return time.Now().UnixNano(), nil
}
nsecs, ok := logstorage.TryParseTimestampRFC3339Nano(f.Value)
if !ok {
return 0, fmt.Errorf("cannot unmarshal rfc3339 timestamp from %s=%q", timeField, f.Value)
nsecs, err := parseTimestamp(f.Value)
if err != nil {
return 0, fmt.Errorf("cannot parse timestamp from field %q: %s", timeField, err)
}
f.Value = ""
if nsecs == 0 {
nsecs = time.Now().UnixNano()
}
return nsecs, nil
}
return time.Now().UnixNano(), nil
}
func parseTimestamp(s string) (int64, error) {
if s == "" || s == "0" {
return time.Now().UnixNano(), nil
}
if len(s) <= len("YYYY") || s[len("YYYY")] != '-' {
return ParseUnixTimestamp(s)
}
nsecs, ok := logstorage.TryParseTimestampRFC3339Nano(s)
if !ok {
return 0, fmt.Errorf("cannot unmarshal rfc3339 timestamp %q", s)
}
return nsecs, nil
}
// ParseUnixTimestamp parses s as unix timestamp in either seconds or milliseconds and returns the parsed timestamp in nanoseconds.
func ParseUnixTimestamp(s string) (int64, error) {
n, err := strconv.ParseInt(s, 10, 64)
if err != nil {
return 0, fmt.Errorf("cannot parse unix timestamp from %q: %w", s, err)
}
if n < (1<<31) && n >= (-1<<31) {
// The timestamp is in seconds. Convert it to milliseconds
n *= 1e3
}
if n > int64(math.MaxInt64)/1e6 {
return 0, fmt.Errorf("too big timestamp in milliseconds: %d; mustn't exceed %d", n, int64(math.MaxInt64)/1e6)
}
if n < int64(math.MinInt64)/1e6 {
return 0, fmt.Errorf("too small timestamp in milliseconds: %d; must be bigger than %d", n, int64(math.MinInt64)/1e6)
}
n *= 1e6
return n, nil
}

View file

@ -27,25 +27,41 @@ func TestExtractTimestampRFC3339NanoFromFields_Success(t *testing.T) {
}
}
// UTC time
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "2024-06-18T23:37:20Z"},
}, 1718753840000000000)
// Time with timezone
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "2024-06-18T23:37:20+08:00"},
}, 1718725040000000000)
// SQL datetime format
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "2024-06-18T23:37:20.123-05:30"},
{Name: "time", Value: "2024-06-18 23:37:20.123-05:30"},
}, 1718773640123000000)
// Time with nanosecond precision
f("time", []logstorage.Field{
{Name: "time", Value: "2024-06-18T23:37:20.123456789-05:30"},
{Name: "foo", Value: "bar"},
}, 1718773640123456789)
// Unix timestamp in milliseconds
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "1718773640123"},
}, 1718773640123000000)
// Unix timestamp in seconds
f("time", []logstorage.Field{
{Name: "foo", Value: "bar"},
{Name: "time", Value: "1718773640"},
}, 1718773640000000000)
}
func TestExtractTimestampRFC3339NanoFromFields_Error(t *testing.T) {

View file

@ -15,7 +15,7 @@ according to [these docs](https://docs.victoriametrics.com/victorialogs/quicksta
## tip
* FEATURE: [ElasticSearch bulk API](https://docs.victoriametrics.com/victorialogs/data-ingestion/#elasticsearch-bulk-api): accept timestamps in seconds in the ingested logs.
* FEATURE: [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/): accept Unix timestamps in seconds in the ingested logs.
## [v0.31.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v0.31.0-victorialogs)

View file

@ -55,9 +55,6 @@ Otherwise the timestamp field must be in one of the following formats:
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
For example, `2023-06-20T15:32:10Z` or `2023-06-20 15:32:10.123456789+02:00`.
If timezone information is missing (for example, `2023-06-20 15:32:10`), then the time is parsed in the local timezone of the host where VictoriaLogs runs.
See [these docs](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for details on fields,
which must be present in the ingested log messages.
@ -104,9 +101,14 @@ It is possible to push unlimited number of log lines in a single request to this
If the [timestamp field](https://docs.victoriametrics.com/victorialogs/keyconcepts/#time-field) is set to `"0"`,
then the current timestamp at VictoriaLogs side is used per each ingested log line.
Otherwise the timestamp field must be in the [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) or [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) format.
Otherwise the timestamp field must be in one of the following formats:
- [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) or [RFC3339](https://www.rfc-editor.org/rfc/rfc3339).
For example, `2023-06-20T15:32:10Z` or `2023-06-20 15:32:10.123456789+02:00`.
If timezone information is missing (for example, `2023-06-20 15:32:10`), then the time is parsed in the local timezone of the host where VictoriaLogs runs.
If timezone information is missing (for example, `2023-06-20 15:32:10`),
then the time is parsed in the local timezone of the host where VictoriaLogs runs.
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
See [these docs](https://docs.victoriametrics.com/victorialogs/keyconcepts/#data-model) for details on fields,
which must be present in the ingested log messages.

View file

@ -135,7 +135,15 @@ during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-inges
### Time field
The ingested [log entries](#data-model) may contain `_time` field with the timestamp of the ingested log entry.
The timestamp must be in [RFC3339](https://www.rfc-editor.org/rfc/rfc3339) or [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) format.
The timestamp field must be in one of the following formats:
- [ISO8601](https://en.wikipedia.org/wiki/ISO_8601) or [RFC3339](https://www.rfc-editor.org/rfc/rfc3339).
For example, `2023-06-20T15:32:10Z` or `2023-06-20 15:32:10.123456789+02:00`.
If timezone information is missing (for example, `2023-06-20 15:32:10`),
then the time is parsed in the local timezone of the host where VictoriaLogs runs.
- Unix timestamp in seconds or in milliseconds. For example, `1686026893` (seconds) or `1686026893735` (milliseconds).
For example, the following [log entry](#data-model) contains valid timestamp with millisecond precision in the `_time` field:
```json
@ -152,7 +160,7 @@ field via `_time_field` query arg during [data ingestion](https://docs.victoriam
For example, if timestamp is located in the `event.created` field, then specify `_time_field=event.created` query arg
during [data ingestion](https://docs.victoriametrics.com/victorialogs/data-ingestion/).
If `_time` field is missing, then the data ingestion time is used as log entry timestamp.
If `_time` field is missing or if it equals `0`, then the data ingestion time is used as log entry timestamp.
The `_time` field is used in [time filter](https://docs.victoriametrics.com/victorialogs/logsql/#time-filter) for quickly narrowing down
the search to a particular time range.