app/vmselect/netstorage: improve code readability a bit after 6c84b61893

Updates https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4364
This commit is contained in:
Aliaksandr Valialkin 2023-07-05 20:35:44 -07:00
parent 11ac551d52
commit 643e99a157
No known key found for this signature in database
GPG key ID: A72BEC6CD3D0DED1

View file

@ -2718,44 +2718,14 @@ func initStorageNodes(addrs []string) *storageNodesBucket {
sns := make([]*storageNode, 0, len(addrs))
var wg sync.WaitGroup
ms := metrics.NewSet()
// initialize connections to storage nodes in parallel in order speed up the initialization
// for big number of storage nodes.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4364
for _, addr := range addrs {
wg.Add(1)
go func(addr string) {
defer wg.Done()
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
addr += ":8401"
}
sn := &storageNode{
// There is no need in requests compression, since they are usually very small.
connPool: netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout),
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
registerMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tenantsRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tenantsErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: ms.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: ms.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
}
sn := newStorageNode(ms, addr)
snsLock.Lock()
sns = append(sns, sn)
snsLock.Unlock()
@ -2769,6 +2739,46 @@ func initStorageNodes(addrs []string) *storageNodesBucket {
}
}
func newStorageNode(ms *metrics.Set, addr string) *storageNode {
if _, _, err := net.SplitHostPort(addr); err != nil {
// Automatically add missing port.
addr += ":8401"
}
// There is no need in requests compression, since vmselect requests are usually very small.
connPool := netutil.NewConnPool(ms, "vmselect", addr, handshake.VMSelectClient, 0, *vmstorageDialTimeout)
sn := &storageNode{
connPool: connPool,
concurrentQueries: ms.NewCounter(fmt.Sprintf(`vm_concurrent_queries{name="vmselect", addr=%q}`, addr)),
registerMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
registerMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="registerMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
deleteSeriesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="deleteSeries", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
labelValuesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="labelValues", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tagValueSuffixesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tagValueSuffixes", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tsdbStatusErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tsdbStatus", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
seriesCountErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="seriesCount", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchMetricNamesErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="searchMetricNames", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
searchErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="search", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tenantsRequests: ms.NewCounter(fmt.Sprintf(`vm_requests_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
tenantsErrors: ms.NewCounter(fmt.Sprintf(`vm_request_errors_total{action="tenants", type="rpcClient", name="vmselect", addr=%q}`, addr)),
metricBlocksRead: ms.NewCounter(fmt.Sprintf(`vm_metric_blocks_read_total{name="vmselect", addr=%q}`, addr)),
metricRowsRead: ms.NewCounter(fmt.Sprintf(`vm_metric_rows_read_total{name="vmselect", addr=%q}`, addr)),
}
return sn
}
func mustStopStorageNodes(snb *storageNodesBucket) {
for _, sn := range snb.sns {
sn.connPool.MustStop()