From 15587dc63ee6d10e5b100bce31b257f0dbfa2656 Mon Sep 17 00:00:00 2001 From: Aliaksandr Valialkin Date: Thu, 9 May 2024 17:10:24 +0200 Subject: [PATCH] wip --- app/logsgenerator/Makefile | 7 - app/logsgenerator/README.md | 1 - app/vlogsgenerator/Makefile | 7 + app/vlogsgenerator/README.md | 126 +++++++++++++++ app/{logsgenerator => vlogsgenerator}/main.go | 150 ++++++++++++++---- lib/logstorage/block_header.go | 12 +- 6 files changed, 261 insertions(+), 42 deletions(-) delete mode 100644 app/logsgenerator/Makefile delete mode 100644 app/logsgenerator/README.md create mode 100644 app/vlogsgenerator/Makefile create mode 100644 app/vlogsgenerator/README.md rename app/{logsgenerator => vlogsgenerator}/main.go (52%) diff --git a/app/logsgenerator/Makefile b/app/logsgenerator/Makefile deleted file mode 100644 index 73ef05cdf..000000000 --- a/app/logsgenerator/Makefile +++ /dev/null @@ -1,7 +0,0 @@ -# All these commands must run from repository root. - -logsgenerator: - APP_NAME=logsgenerator $(MAKE) app-local - -logsgenerator-race: - APP_NAME=logsgenerator RACE=-race $(MAKE) app-local diff --git a/app/logsgenerator/README.md b/app/logsgenerator/README.md deleted file mode 100644 index 899b40661..000000000 --- a/app/logsgenerator/README.md +++ /dev/null @@ -1 +0,0 @@ -Logs generator diff --git a/app/vlogsgenerator/Makefile b/app/vlogsgenerator/Makefile new file mode 100644 index 000000000..731244250 --- /dev/null +++ b/app/vlogsgenerator/Makefile @@ -0,0 +1,7 @@ +# All these commands must run from repository root. + +vlogsgenerator: + APP_NAME=vlogsgenerator $(MAKE) app-local + +vlogsgenerator-race: + APP_NAME=vlogsgenerator RACE=-race $(MAKE) app-local diff --git a/app/vlogsgenerator/README.md b/app/vlogsgenerator/README.md new file mode 100644 index 000000000..bb70bf0d7 --- /dev/null +++ b/app/vlogsgenerator/README.md @@ -0,0 +1,126 @@ +# vlogsgenerator + +Logs generator for [VictoriaLogs](https://docs.victoriametrics.com/victorialogs/). + +## How to build vlogsgenerator? + +Run `make vlogsgenerator` from the repository root. This builds `bin/vlogsgenerator` binary. + +## How to generate logs? + +`vlogsgenerator` generates logs in [JSON line format](https://jsonlines.org/) suitable for the ingestion +via [`/insert/jsonline` endpoint at VictoriaLogs](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api). + +By default it writes the generated logs into `stdout`. For example, the following command writes generated logs to `stdout`: + +``` +bin/vlogsgenerator +``` + +It is possible to redirect the generated logs to file. For example, the following command writes the generated logs to `logs.json` file: + +``` +bin/vlogsgenerator > logs.json +``` + +The generated logs at `logs.json` file can be inspected with the following command: + +``` +head logs.json | jq . 
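+# optional extra check (a sketch; assumes the standard `wc` utility is available):
+# count the generated entries, since the JSON line format stores one log entry per line
+wc -l logs.json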
+``` + +Below is an example output: + +```json +{ + "_time": "2024-05-08T14:34:00.854Z", + "_msg": "message for the stream 8 and worker 0; ip=185.69.136.129; uuid=b4fe8f1a-c93c-dea3-ba11-5b9f0509291e; u64=8996587920687045253", + "host": "host_8", + "worker_id": "0", + "const_0": "some value 0 8", + "const_1": "some value 1 8", + "const_2": "some value 2 8", + "var_0": "some value 0 12752539384823438260", + "dict_0": "warn", + "dict_1": "info", + "u8_0": "6", + "u16_0": "35202", + "u32_0": "1964973739", + "u64_0": "4810489083243239145", + "float_0": "1.868", + "ip_0": "250.34.75.125", + "timestamp_0": "1799-03-16T01:34:18.311Z" +} +{ + "_time": "2024-05-08T14:34:00.854Z", + "_msg": "message for the stream 9 and worker 0; ip=164.244.254.194; uuid=7e8373b1-ce0d-1ce7-8e96-4bcab8955598; u64=13949903463741076522", + "host": "host_9", + "worker_id": "0", + "const_0": "some value 0 9", + "const_1": "some value 1 9", + "const_2": "some value 2 9", + "var_0": "some value 0 5371555382075206134", + "dict_0": "INFO", + "dict_1": "FATAL", + "u8_0": "219", + "u16_0": "31459", + "u32_0": "3918836777", + "u64_0": "6593354256620219850", + "float_0": "1.085", + "ip_0": "253.151.88.158", + "timestamp_0": "2042-10-05T16:42:57.082Z" +} +``` + +The generated logs can be written directly to VictoriaLogs by passing the address of [`/insert/jsonline` endpoint](https://docs.victoriametrics.com/victorialogs/data-ingestion/#json-stream-api) +to `-addr` command-line flag. For example, the following command writes the generated logs to VictoriaLogs running at `localhost`: + +``` +bin/vlogsgenerator -addr=http://localhost:9428/insert/jsonline +``` + +`vlogsgenerator` accepts various command-line flags, which can be used for configuring the number and the shape of the generated logs. +These flags can be inspected by running `vlogsgenerator -help`. Below are the most interesting flags: + +* `-start` - starting timestamp for generating logs. Logs are evenly generated on the [`-start` ... `-end`] interval. +* `-end` - ending timestamp for generating logs. Logs are evenly generated on the [`-start` ... `-end`] interval. +* `-activeStreams` - the number of active [log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields) to generate. +* `-logsPerStream` - the number of log entries to generate per each log stream. Log entries are evenly distributed on the [`-start` ... `-end`] interval. + +The total number of generated logs can be calculated as `-activeStreams` * `-logsPerStream`. + +For example, the following command generates `1_000_000` log entries on the time range `[2024-01-01 - 2024-02-01]` across `100` +[log streams](https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields), where every logs stream contains `10_000` log entries, +and writes them to `http://localhost:9428/insert/jsonline`: + +``` +bin/vlogsgenerator \ + -start=2024-01-01 -end=2024-02-01 \ + -activeStreams=100 \ + -logsPerStream=10_000 \ + -addr=http://localhost:9428/insert/jsonline +``` + +By default `vlogsgenerator` generates and writes logs by a single worker. This may limit the maximum data ingestion rate during benchmarks. +The number of workers can be changed via `-workers` command-line flag. 
For example, the following command generates and writes logs with `16` workers: + +``` +bin/vlogsgenerator \ + -start=2024-01-01 -end=2024-02-01 \ + -activeStreams=100 \ + -logsPerStream=10_000 \ + -addr=http://localhost:9428/insert/jsonline \ + -workers=16 +``` + +Every 10 seconds `vlogsgenerator` writes statistics about the generated logs into `stderr`. The frequency of the generated statistics can be adjusted via `-statInterval` command-line flag. +For example, the following command writes statistics every 2 seconds: + +``` +bin/vlogsgenerator \ + -start=2024-01-01 -end=2024-02-01 \ + -activeStreams=100 \ + -logsPerStream=10_000 \ + -addr=http://localhost:9428/insert/jsonline \ + -statInterval=2s +``` diff --git a/app/logsgenerator/main.go b/app/vlogsgenerator/main.go similarity index 52% rename from app/logsgenerator/main.go rename to app/vlogsgenerator/main.go index 3d57c42ba..225e59e19 100644 --- a/app/logsgenerator/main.go +++ b/app/vlogsgenerator/main.go @@ -5,9 +5,12 @@ import ( "flag" "fmt" "io" + "math" + "math/rand" "net/http" "net/url" "os" + "strconv" "sync" "sync/atomic" "time" @@ -19,18 +22,34 @@ import ( ) var ( - addr = flag.String("addr", "http://localhost:9428/insert/jsonline", "HTTP address to push the generated logs to") + addr = flag.String("addr", "stdout", "HTTP address to push the generated logs to; if it is set to stdout, then logs are generated to stdout") workers = flag.Int("workers", 1, "The number of workers to use to push logs to -addr") start = newTimeFlag("start", "-1d", "Generated logs start from this time; see https://docs.victoriametrics.com/#timestamp-formats") end = newTimeFlag("end", "0s", "Generated logs end at this time; see https://docs.victoriametrics.com/#timestamp-formats") - activeStreams = flag.Int("activeStreams", 1_000, "The number of active log streams to generate; see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields") - totalStreams = flag.Int("totalStreams", 2_000, "The number of total log streams; if -totalStreams > -activeStreams, then some active streams are substituted with new streams "+ + activeStreams = flag.Int("activeStreams", 100, "The number of active log streams to generate; see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#stream-fields") + totalStreams = flag.Int("totalStreams", 0, "The number of total log streams; if -totalStreams > -activeStreams, then some active streams are substituted with new streams "+ "during data generation") - logsPerStream = flag.Int64("logsPerStream", 1_000, "The number of log entries to generate per each log stream. Log entries are evenly distributed between -start and -end") - varFieldsPerLog = flag.Int("varFieldsPerLog", 3, "The number of additional fields with variable values to generate per each log entry; "+ + logsPerStream = flag.Int64("logsPerStream", 1_000, "The number of log entries to generate per each log stream. 
Log entries are evenly distributed between -start and -end")
+	constFieldsPerLog = flag.Int("constFieldsPerLog", 3, "The number of fields with constant values to generate per each log entry; "+
 		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
-	constFieldsPerLog = flag.Int("constFieldsPerLog", 3, "The number of additional fields with constaint values to generate per each log entry; "+
+	varFieldsPerLog = flag.Int("varFieldsPerLog", 1, "The number of fields with variable values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	dictFieldsPerLog = flag.Int("dictFieldsPerLog", 2, "The number of fields with up to 8 different values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	u8FieldsPerLog = flag.Int("u8FieldsPerLog", 1, "The number of fields with uint8 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	u16FieldsPerLog = flag.Int("u16FieldsPerLog", 1, "The number of fields with uint16 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	u32FieldsPerLog = flag.Int("u32FieldsPerLog", 1, "The number of fields with uint32 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	u64FieldsPerLog = flag.Int("u64FieldsPerLog", 1, "The number of fields with uint64 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	floatFieldsPerLog = flag.Int("floatFieldsPerLog", 1, "The number of fields with float64 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	ipFieldsPerLog = flag.Int("ipFieldsPerLog", 1, "The number of fields with IPv4 values to generate per each log entry; "+
+		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
+	timestampFieldsPerLog = flag.Int("timestampFieldsPerLog", 1, "The number of fields with ISO8601 timestamps per each log entry; "+
 		"see https://docs.victoriametrics.com/VictoriaLogs/keyConcepts.html#data-model")
 
 	statInterval = flag.Duration("statInterval", 10*time.Second, "The interval between publishing the stats")
@@ -43,16 +62,20 @@ func main() {
 	buildinfo.Init()
 	logger.Init()
 
-	remoteWriteURL, err := url.Parse(*addr)
-	if err != nil {
-		logger.Fatalf("cannot parse -addr=%q: %s", *addr, err)
+	var remoteWriteURL *url.URL
+	if *addr != "stdout" {
+		urlParsed, err := url.Parse(*addr)
+		if err != nil {
+			logger.Fatalf("cannot parse -addr=%q: %s", *addr, err)
+		}
+		qs, err := url.ParseQuery(urlParsed.RawQuery)
+		if err != nil {
+			logger.Fatalf("cannot parse query string in -addr=%q: %s", *addr, err)
+		}
+		qs.Set("_stream_fields", "host,worker_id")
+		urlParsed.RawQuery = qs.Encode()
+		remoteWriteURL = urlParsed
 	}
-	qs, err := url.ParseQuery(remoteWriteURL.RawQuery)
-	if err != nil {
-		logger.Fatalf("cannot parse query string in -addr=%q: %w", *addr, err)
-	}
-	qs.Set("_stream_fields", "host,worker_id")
-	remoteWriteURL.RawQuery = qs.Encode()
 
 	if start.nsec >= end.nsec {
 		logger.Fatalf("-start=%s must be smaller than -end=%s", start, end)
@@ -63,12 +86,6 @@ func main() {
 	if *logsPerStream <= 0 {
 		logger.Fatalf("-logsPerStream must be bigger than 0; got %d", *logsPerStream)
 	}
-	if *varFieldsPerLog <= 0 {
-	
logger.Fatalf("-varFieldsPerLog must be bigger than 0; got %d", *varFieldsPerLog) - } - if *constFieldsPerLog <= 0 { - logger.Fatalf("-constFieldsPerLog must be bigger than 0; got %d", *constFieldsPerLog) - } if *totalStreams < *activeStreams { *totalStreams = *activeStreams } @@ -165,6 +182,14 @@ func generateAndPushLogs(cfg *workerConfig, workerID int) { close(doneCh) }() + if cfg.url == nil { + _, err := io.Copy(os.Stdout, pr) + if err != nil { + logger.Fatalf("unexpected error when writing logs to stdout: %s", err) + } + return + } + req, err := http.NewRequest("POST", cfg.url.String(), pr) if err != nil { logger.Fatalf("cannot create request to %q: %s", cfg.url, err) @@ -190,29 +215,69 @@ func generateLogs(bw *bufio.Writer, workerID, activeStreams, totalStreams int) { currNsec := start.nsec for currNsec < end.nsec { firstStreamID := int((currNsec - start.nsec) / streamStep) - generateLogsAtTimestamp(bw, workerID, currNsec, firstStreamID, activeStreams, *varFieldsPerLog, *constFieldsPerLog) + generateLogsAtTimestamp(bw, workerID, currNsec, firstStreamID, activeStreams) currNsec += step } } -func generateLogsAtTimestamp(bw *bufio.Writer, workerID int, ts int64, firstStreamID, activeStreams, varFieldsPerEntry, constFieldsPerEntry int) { +func generateLogsAtTimestamp(bw *bufio.Writer, workerID int, ts int64, firstStreamID, activeStreams int) { streamID := firstStreamID timeStr := toRFC3339(ts) for i := 0; i < activeStreams; i++ { - fmt.Fprintf(bw, `{"_time":%q,"_msg":"message #%d (%d) for the stream %d and worker %d; some foo bar baz error warn 1.2.3.4","host":"host_%d","worker_id":"%d"`, - timeStr, ts, i, streamID, workerID, streamID, workerID) - for j := 0; j < varFieldsPerEntry; j++ { - fmt.Fprintf(bw, `,"var_field_%d":"value_%d_%d_%d"`, j, i, j, streamID) + ip := toIPv4(rand.Uint32()) + uuid := toUUID(rand.Uint64(), rand.Uint64()) + fmt.Fprintf(bw, `{"_time":%q,"_msg":"message for the stream %d and worker %d; ip=%s; uuid=%s; u64=%d","host":"host_%d","worker_id":"%d"`, + timeStr, streamID, workerID, ip, uuid, rand.Uint64(), streamID, workerID) + for j := 0; j < *constFieldsPerLog; j++ { + fmt.Fprintf(bw, `,"const_%d":"some value %d %d"`, j, j, streamID) } - for j := 0; j < constFieldsPerEntry; j++ { - fmt.Fprintf(bw, `,"const_field_%d":"value_%d_%d"`, j, j, streamID) + for j := 0; j < *varFieldsPerLog; j++ { + fmt.Fprintf(bw, `,"var_%d":"some value %d %d"`, j, j, rand.Uint64()) + } + for j := 0; j < *dictFieldsPerLog; j++ { + fmt.Fprintf(bw, `,"dict_%d":"%s"`, j, dictValues[rand.Intn(len(dictValues))]) + } + for j := 0; j < *u8FieldsPerLog; j++ { + fmt.Fprintf(bw, `,"u8_%d":"%d"`, j, uint8(rand.Uint32())) + } + for j := 0; j < *u16FieldsPerLog; j++ { + fmt.Fprintf(bw, `,"u16_%d":"%d"`, j, uint16(rand.Uint32())) + } + for j := 0; j < *u32FieldsPerLog; j++ { + fmt.Fprintf(bw, `,"u32_%d":"%d"`, j, rand.Uint32()) + } + for j := 0; j < *u64FieldsPerLog; j++ { + fmt.Fprintf(bw, `,"u64_%d":"%d"`, j, rand.Uint64()) + } + for j := 0; j < *floatFieldsPerLog; j++ { + fmt.Fprintf(bw, `,"float_%d":"%v"`, j, math.Round(10_000*rand.Float64())/1000) + } + for j := 0; j < *ipFieldsPerLog; j++ { + ip := toIPv4(rand.Uint32()) + fmt.Fprintf(bw, `,"ip_%d":"%s"`, j, ip) + } + for j := 0; j < *timestampFieldsPerLog; j++ { + timestamp := toISO8601(int64(rand.Uint64())) + fmt.Fprintf(bw, `,"timestamp_%d":"%s"`, j, timestamp) } fmt.Fprintf(bw, "}\n") logEntriesCount.Add(1) + streamID++ } } +var dictValues = []string{ + "debug", + "info", + "warn", + "error", + "fatal", + "ERROR", + "FATAL", + "INFO", +} + func 
newTimeFlag(name, defaultValue, description string) *timeFlag {
 	var tf timeFlag
 	if err := tf.Set(defaultValue); err != nil {
@@ -242,5 +307,30 @@ func (tf *timeFlag) String() string {
 }
 
 func toRFC3339(nsec int64) string {
-	return time.Unix(nsec/1e9, nsec%1e9).UTC().Format(time.RFC3339Nano)
+	return time.Unix(0, nsec).UTC().Format(time.RFC3339Nano)
+}
+
+func toISO8601(nsec int64) string {
+	return time.Unix(0, nsec).UTC().Format("2006-01-02T15:04:05.000Z")
+}
+
+func toIPv4(n uint32) string {
+	dst := make([]byte, 0, len("255.255.255.255"))
+	dst = marshalUint64(dst, uint64(n>>24))
+	dst = append(dst, '.')
+	dst = marshalUint64(dst, uint64((n>>16)&0xff))
+	dst = append(dst, '.')
+	dst = marshalUint64(dst, uint64((n>>8)&0xff))
+	dst = append(dst, '.')
+	dst = marshalUint64(dst, uint64(n&0xff))
+	return string(dst)
+}
+
+func toUUID(a, b uint64) string {
+	return fmt.Sprintf("%08x-%04x-%04x-%04x-%012x", a&(1<<32-1), (a>>32)&(1<<16-1), (a>>48), b&(1<<16-1), b>>16)
+}
+
+// marshalUint64 appends the string representation of n to dst and returns the result.
+func marshalUint64(dst []byte, n uint64) []byte {
+	return strconv.AppendUint(dst, n, 10)
 }
diff --git a/lib/logstorage/block_header.go b/lib/logstorage/block_header.go
index 72b463cd7..454e27eaf 100644
--- a/lib/logstorage/block_header.go
+++ b/lib/logstorage/block_header.go
@@ -423,12 +423,16 @@ func (ch *columnHeader) marshal(dst []byte) []byte {
 		minValue := math.Float64frombits(ch.minValue)
 		maxValue := math.Float64frombits(ch.maxValue)
 		if minValue > maxValue {
-			logger.Panicf("BUG: minValue=%g must be smaller than maxValue=%g", minValue, maxValue)
+			logger.Panicf("BUG: minValue=%g must be smaller than maxValue=%g for valueTypeFloat64", minValue, maxValue)
 		}
-	} else {
-		if ch.minValue > ch.maxValue {
-			logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d", ch.minValue, ch.maxValue)
+	} else if ch.valueType == valueTypeTimestampISO8601 {
+		minValue := int64(ch.minValue)
+		maxValue := int64(ch.maxValue)
+		if minValue > maxValue {
+			logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueTypeTimestampISO8601", minValue, maxValue)
 		}
+	} else if ch.minValue > ch.maxValue {
+		logger.Panicf("BUG: minValue=%d must be smaller than maxValue=%d for valueType=%d", ch.minValue, ch.maxValue, ch.valueType)
 	}
 
 	// Encode common fields - ch.name and ch.valueType
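
A previously generated `logs.json` file can also be replayed into VictoriaLogs, which helps when repeated benchmark runs should use exactly the same dataset. The snippet below is a sketch rather than part of the generator: it assumes the standard `curl` utility and a VictoriaLogs instance at `localhost:9428` as in the README examples, and it passes the same `_stream_fields=host,worker_id` value that `vlogsgenerator` sets on the `-addr` URL when pushing logs directly.

```
# replay a previously generated logs.json file into VictoriaLogs;
# _stream_fields mirrors the stream fields used by vlogsgenerator
curl --data-binary @logs.json 'http://localhost:9428/insert/jsonline?_stream_fields=host,worker_id'
```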