package promscrape

import (
	"bytes"
	"context"
	"flag"
	"fmt"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompbmarshal"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/azure"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consul"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/consulagent"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/digitalocean"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dns"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/docker"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/dockerswarm"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/ec2"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/eureka"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/gce"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/http"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kubernetes"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/kuma"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/nomad"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/openstack"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promscrape/discovery/yandexcloud"
	"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutils"
	"github.com/VictoriaMetrics/metrics"
)

var (
	configCheckInterval = flag.Duration("promscrape.configCheckInterval", 0, "Interval for checking for changes in '-promscrape.config' file. "+
		"By default, the checking is disabled. Send SIGHUP signal in order to force config check for changes")
	suppressDuplicateScrapeTargetErrors = flag.Bool("promscrape.suppressDuplicateScrapeTargetErrors", false, "Whether to suppress 'duplicate scrape target' errors; "+
		"see https://docs.victoriametrics.com/vmagent.html#troubleshooting for details")
	promscrapeConfigFile = flag.String("promscrape.config", "", "Optional path to Prometheus config file with 'scrape_configs' section containing targets to scrape. "+
		"The path can point to local file and to http url. "+
		"See https://docs.victoriametrics.com/#how-to-scrape-prometheus-exporters-such-as-node-exporter for details")

	fileSDCheckInterval = flag.Duration("promscrape.fileSDCheckInterval", time.Minute, "Interval for checking for changes in 'file_sd_config'. "+
		"See https://docs.victoriametrics.com/sd_configs.html#file_sd_configs for details")
)

// CheckConfig checks -promscrape.config for errors and unsupported options.
func CheckConfig() error {
	if *promscrapeConfigFile == "" {
		return nil
	}
	_, _, err := loadConfig(*promscrapeConfigFile)
	return err
}

// Init initializes Prometheus scraper with config from the `-promscrape.config`.
//
// Scraped data is passed to pushData.
func Init(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) {
	mustInitClusterMemberID()
	globalStopChan = make(chan struct{})
	scraperWG.Add(1)
	go func() {
		defer scraperWG.Done()
		runScraper(*promscrapeConfigFile, pushData, globalStopChan)
	}()
}

// Stop stops Prometheus scraper.
func Stop() {
	close(globalStopChan)
	scraperWG.Wait()
}

var (
	globalStopChan chan struct{}
	scraperWG      sync.WaitGroup
	// PendingScrapeConfigs - zero value means, that
	// all scrapeConfigs are inited and ready for work.
	PendingScrapeConfigs int32

	// configData contains -promscrape.config data
	configData atomic.Pointer[[]byte]
)

// WriteConfigData writes -promscrape.config contents to w
func WriteConfigData(w io.Writer) {
	p := configData.Load()
	if p == nil {
		// Nothing to write to w
		return
	}
	_, _ = w.Write(*p)
}

func runScraper(configFile string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) {
	if configFile == "" {
		// Nothing to scrape.
		return
	}

	metrics.RegisterSet(configMetricsSet)

	// Register SIGHUP handler for config reload before loadConfig.
	// This guarantees that the config will be re-read if the signal arrives just after loadConfig.
	// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1240
	sighupCh := procutil.NewSighupChan()

	logger.Infof("reading Prometheus configs from %q", configFile)
	cfg, data, err := loadConfig(configFile)
	if err != nil {
		logger.Fatalf("cannot read %q: %s", configFile, err)
	}
	marshaledData := cfg.marshal()
	configData.Store(&marshaledData)
	cfg.mustStart()

	configSuccess.Set(1)
	configTimestamp.Set(fasttime.UnixTimestamp())

	scs := newScrapeConfigs(pushData, globalStopCh)
	scs.add("azure_sd_configs", *azure.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getAzureSDScrapeWork(swsPrev) })
	scs.add("consul_sd_configs", *consul.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulSDScrapeWork(swsPrev) })
	scs.add("consulagent_sd_configs", *consulagent.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getConsulAgentSDScrapeWork(swsPrev) })
	scs.add("digitalocean_sd_configs", *digitalocean.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDigitalOceanDScrapeWork(swsPrev) })
	scs.add("dns_sd_configs", *dns.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDNSSDScrapeWork(swsPrev) })
	scs.add("docker_sd_configs", *docker.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSDScrapeWork(swsPrev) })
	scs.add("dockerswarm_sd_configs", *dockerswarm.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getDockerSwarmSDScrapeWork(swsPrev) })
	scs.add("ec2_sd_configs", *ec2.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEC2SDScrapeWork(swsPrev) })
	scs.add("eureka_sd_configs", *eureka.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getEurekaSDScrapeWork(swsPrev) })
	scs.add("file_sd_configs", *fileSDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getFileSDScrapeWork(swsPrev) })
	scs.add("gce_sd_configs", *gce.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getGCESDScrapeWork(swsPrev) })
	scs.add("http_sd_configs", *http.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getHTTPDScrapeWork(swsPrev) })
	scs.add("kubernetes_sd_configs", *kubernetes.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKubernetesSDScrapeWork(swsPrev) })
	scs.add("kuma_sd_configs", *kuma.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getKumaSDScrapeWork(swsPrev) })
	scs.add("nomad_sd_configs", *nomad.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getNomadSDScrapeWork(swsPrev) })
	scs.add("openstack_sd_configs", *openstack.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getOpenStackSDScrapeWork(swsPrev) })
	scs.add("yandexcloud_sd_configs", *yandexcloud.SDCheckInterval, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getYandexCloudSDScrapeWork(swsPrev) })
	scs.add("static_configs", 0, func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork { return cfg.getStaticScrapeWork() })

	var tickerCh <-chan time.Time
	if *configCheckInterval > 0 {
		ticker := time.NewTicker(*configCheckInterval)
		tickerCh = ticker.C
		defer ticker.Stop()
	}
	for {
		scs.updateConfig(cfg)
	waitForChans:
		select {
		case <-sighupCh:
			logger.Infof("SIGHUP received; reloading Prometheus configs from %q", configFile)
			cfgNew, dataNew, err := loadConfig(configFile)
			if err != nil {
				configReloadErrors.Inc()
				configSuccess.Set(0)
				logger.Errorf("cannot read %q on SIGHUP: %s; continuing with the previous config", configFile, err)
				goto waitForChans
			}
			if bytes.Equal(data, dataNew) {
				configSuccess.Set(1)
				logger.Infof("nothing changed in %q", configFile)
				goto waitForChans
			}
			cfgNew.mustRestart(cfg)
			cfg = cfgNew
			data = dataNew
			marshaledData = cfgNew.marshal()
			configData.Store(&marshaledData)
		case <-tickerCh:
			cfgNew, dataNew, err := loadConfig(configFile)
			if err != nil {
				configReloadErrors.Inc()
				configSuccess.Set(0)
				logger.Errorf("cannot read %q: %s; continuing with the previous config", configFile, err)
				goto waitForChans
			}
			if bytes.Equal(data, dataNew) {
				configSuccess.Set(1)
				// Nothing changed since the previous loadConfig
				goto waitForChans
			}
			cfgNew.mustRestart(cfg)
			cfg = cfgNew
			data = dataNew
			marshaledData = cfgNew.marshal()
			configData.Store(&marshaledData)
		case <-globalStopCh:
			cfg.mustStop()
			logger.Infof("stopping Prometheus scrapers")
			startTime := time.Now()
			scs.stop()
			logger.Infof("stopped Prometheus scrapers in %.3f seconds", time.Since(startTime).Seconds())
			return
		}
		logger.Infof("found changes in %q; applying these changes", configFile)
		configReloads.Inc()
		configSuccess.Set(1)
		configTimestamp.Set(fasttime.UnixTimestamp())
	}
}

var (
	configMetricsSet   = metrics.NewSet()
	configReloads      = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_total`)
	configReloadErrors = configMetricsSet.NewCounter(`vm_promscrape_config_reloads_errors_total`)
	configSuccess      = configMetricsSet.NewCounter(`vm_promscrape_config_last_reload_successful`)
	configTimestamp    = configMetricsSet.NewCounter(`vm_promscrape_config_last_reload_success_timestamp_seconds`)
)

type scrapeConfigs struct {
	pushData     func(at *auth.Token, wr *prompbmarshal.WriteRequest)
	wg           sync.WaitGroup
	stopCh       chan struct{}
	globalStopCh <-chan struct{}
	scfgs        []*scrapeConfig
}

func newScrapeConfigs(pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scrapeConfigs {
	return &scrapeConfigs{
		pushData:     pushData,
		stopCh:       make(chan struct{}),
		globalStopCh: globalStopCh,
	}
}

func (scs *scrapeConfigs) add(name string, checkInterval time.Duration, getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork) {
	atomic.AddInt32(&PendingScrapeConfigs, 1)
	scfg := &scrapeConfig{
		name:          name,
		pushData:      scs.pushData,
		getScrapeWork: getScrapeWork,
		checkInterval: checkInterval,
		cfgCh:         make(chan *Config, 1),
		stopCh:        scs.stopCh,

		discoveryDuration: metrics.GetOrCreateHistogram(fmt.Sprintf("vm_promscrape_service_discovery_duration_seconds{type=%q}", name)),
	}
	scs.wg.Add(1)
	go func() {
		defer scs.wg.Done()
		scfg.run(scs.globalStopCh)
	}()
	scs.scfgs = append(scs.scfgs, scfg)
}

func (scs *scrapeConfigs) updateConfig(cfg *Config) {
	for _, scfg := range scs.scfgs {
		scfg.cfgCh <- cfg
	}
}

func (scs *scrapeConfigs) stop() {
	close(scs.stopCh)
	scs.wg.Wait()
	scs.scfgs = nil
}

type scrapeConfig struct {
	name          string
	pushData      func(at *auth.Token, wr *prompbmarshal.WriteRequest)
	getScrapeWork func(cfg *Config, swsPrev []*ScrapeWork) []*ScrapeWork
	checkInterval time.Duration
	cfgCh         chan *Config
	stopCh        <-chan struct{}

	discoveryDuration *metrics.Histogram
}

func (scfg *scrapeConfig) run(globalStopCh <-chan struct{}) {
	sg := newScraperGroup(scfg.name, scfg.pushData, globalStopCh)
	defer sg.stop()

	var tickerCh <-chan time.Time
	if scfg.checkInterval > 0 {
		ticker := time.NewTicker(scfg.checkInterval)
		defer ticker.Stop()
		tickerCh = ticker.C
	}

	cfg := <-scfg.cfgCh
	var swsPrev []*ScrapeWork
	updateScrapeWork := func(cfg *Config) {
		startTime := time.Now()
		sws := scfg.getScrapeWork(cfg, swsPrev)
		sg.update(sws)
		swsPrev = sws
		if sg.scrapersStarted.Get() > 0 {
			// update duration only if at least one scraper has started
			// otherwise this SD is considered as inactive
			scfg.discoveryDuration.UpdateDuration(startTime)
		}
	}
	updateScrapeWork(cfg)
	atomic.AddInt32(&PendingScrapeConfigs, -1)

	for {

		select {
		case <-scfg.stopCh:
			return
		case cfg = <-scfg.cfgCh:
		case <-tickerCh:
		}
		updateScrapeWork(cfg)
	}
}

type scraperGroup struct {
	name     string
	wg       sync.WaitGroup
	mLock    sync.Mutex
	m        map[string]*scraper
	pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)

	changesCount    *metrics.Counter
	activeScrapers  *metrics.Counter
	scrapersStarted *metrics.Counter
	scrapersStopped *metrics.Counter

	globalStopCh <-chan struct{}
}

func newScraperGroup(name string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest), globalStopCh <-chan struct{}) *scraperGroup {
	sg := &scraperGroup{
		name:     name,
		m:        make(map[string]*scraper),
		pushData: pushData,

		changesCount:    metrics.NewCounter(fmt.Sprintf(`vm_promscrape_config_changes_total{type=%q}`, name)),
		activeScrapers:  metrics.NewCounter(fmt.Sprintf(`vm_promscrape_active_scrapers{type=%q}`, name)),
		scrapersStarted: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_started_total{type=%q}`, name)),
		scrapersStopped: metrics.NewCounter(fmt.Sprintf(`vm_promscrape_scrapers_stopped_total{type=%q}`, name)),

		globalStopCh: globalStopCh,
	}
	metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="up"}`, name), func() float64 {
		return float64(tsmGlobal.StatusByGroup(sg.name, true))
	})
	metrics.NewGauge(fmt.Sprintf(`vm_promscrape_targets{type=%q, status="down"}`, name), func() float64 {
		return float64(tsmGlobal.StatusByGroup(sg.name, false))
	})
	return sg
}

func (sg *scraperGroup) stop() {
	sg.mLock.Lock()
	for _, sc := range sg.m {
		sc.cancel()
	}
	sg.m = nil
	sg.mLock.Unlock()
	sg.wg.Wait()
}

func (sg *scraperGroup) update(sws []*ScrapeWork) {
	sg.mLock.Lock()
	defer sg.mLock.Unlock()

	additionsCount := 0
	deletionsCount := 0
	swsMap := make(map[string]*promutils.Labels, len(sws))
	var swsToStart []*ScrapeWork
	for _, sw := range sws {
		key := sw.key()
		originalLabels, ok := swsMap[key]
		if ok {
			if !*suppressDuplicateScrapeTargetErrors {
				logger.Errorf("skipping duplicate scrape target with identical labels; endpoint=%s, labels=%s; "+
					"make sure service discovery and relabeling is set up properly; "+
					"see also https://docs.victoriametrics.com/vmagent.html#troubleshooting; "+
					"original labels for target1: %s; original labels for target2: %s",
					sw.ScrapeURL, sw.Labels.String(), originalLabels.String(), sw.OriginalLabels.String())
			}
			droppedTargetsMap.Register(sw.OriginalLabels, sw.RelabelConfigs)
			continue
		}
		swsMap[key] = sw.OriginalLabels
		if sg.m[key] != nil {
			// The scraper for the given key already exists.
			continue
		}
		swsToStart = append(swsToStart, sw)
	}

	// Stop deleted scrapers before starting new scrapers in order to prevent
	// series overlap when old scrape target is substituted by new scrape target.
	var stoppedChs []<-chan struct{}
	for key, sc := range sg.m {
		if _, ok := swsMap[key]; !ok {
			sc.cancel()
			stoppedChs = append(stoppedChs, sc.stoppedCh)
			delete(sg.m, key)
			deletionsCount++
		}
	}
	// Wait until all the deleted scrapers are stopped before starting new scrapers.
	for _, ch := range stoppedChs {
		<-ch
	}

	// Start new scrapers only after the deleted scrapers are stopped.
	for _, sw := range swsToStart {
		sc := newScraper(sw, sg.name, sg.pushData)
		sg.activeScrapers.Inc()
		sg.scrapersStarted.Inc()
		sg.wg.Add(1)
		tsmGlobal.Register(&sc.sw)
		go func(sw *ScrapeWork) {
			defer func() {
				sg.wg.Done()
				close(sc.stoppedCh)
			}()
			sc.sw.run(sc.ctx.Done(), sg.globalStopCh)
			tsmGlobal.Unregister(&sc.sw)
			sg.activeScrapers.Dec()
			sg.scrapersStopped.Inc()
		}(sw)
		key := sw.key()
		sg.m[key] = sc
		additionsCount++
	}

	if additionsCount > 0 || deletionsCount > 0 {
		sg.changesCount.Add(additionsCount + deletionsCount)
		logger.Infof("%s: added targets: %d, removed targets: %d; total targets: %d", sg.name, additionsCount, deletionsCount, len(sg.m))
	}
}

type scraper struct {
	sw scrapeWork

	ctx    context.Context
	cancel context.CancelFunc

	// stoppedCh is unblocked when the given scraper is stopped.
	stoppedCh chan struct{}
}

func newScraper(sw *ScrapeWork, group string, pushData func(at *auth.Token, wr *prompbmarshal.WriteRequest)) *scraper {
	ctx, cancel := context.WithCancel(context.Background())
	sc := &scraper{
		ctx:       ctx,
		cancel:    cancel,
		stoppedCh: make(chan struct{}),
	}
	c := newClient(ctx, sw)
	sc.sw.Config = sw
	sc.sw.ScrapeGroup = group
	sc.sw.ReadData = c.ReadData
	sc.sw.GetStreamReader = c.GetStreamReader
	sc.sw.PushData = pushData
	return sc
}