
goroutine leak #158

@dnmyGo

Description


I am collecting GPU metrics concurrently, but the goroutines never seem to terminate: new goroutines keep being created and eventually leak. Could you please help check whether something in the code below is causing them to wait forever?

package collection

import (
	"encoding/binary"
	"fmt"
	"github.com/NVIDIA/go-nvml/pkg/nvml"
	"rdma-manager/rdma-agent/src/log"
	"runtime/debug"
	"sync"
	"time"
)

type NvLinkStats struct {
	NvLinkTx    uint64
	NvLinkRx    uint64
	TxTimestamp int64
	RxTimestamp int64
}

func GpuInit() {
	ret := nvml.Init()
	if ret != nvml.SUCCESS {
		log.Logger.Error("Unable to initialize NVML: %s", nvml.ErrorString(ret))
	}
}

func GpuShutDown() {
	ret := nvml.Shutdown()
	if ret != nvml.SUCCESS {
		log.Logger.Error("Unable to shutdown NVML:", nvml.ErrorString(ret))
	}
}
func GetGpuCount() int {
	count, ret := nvml.DeviceGetCount()
	if ret != nvml.SUCCESS {
		log.Logger.Errorf("GetGpuCount failed: %v", ret)
		return 0
	}
	return count
}

// 4. NVLink bandwidth
func getNvLinkStats(idx int) (NvLinkStats, nvml.Return) {
	device, ret := nvml.DeviceGetHandleByIndex(idx)
	var nvLinkInfo NvLinkStats
	if ret != nvml.SUCCESS {
		return nvLinkInfo, ret
	}
	nvLinkCountInfo := []nvml.FieldValue{
		{FieldId: nvml.FI_DEV_NVLINK_LINK_COUNT},
	}
	nvCountRet := device.GetFieldValues(nvLinkCountInfo)
	if nvCountRet != nvml.SUCCESS {
		return nvLinkInfo, nvCountRet
	}
	nvLinkCountRaw := nvLinkCountInfo[0].Value // [8]byte
	nvLinkCount := binary.LittleEndian.Uint32(nvLinkCountRaw[:4])
	if nvLinkCount == 0 {
		return nvLinkInfo, nvCountRet
	}
	rxIds := make([]nvml.FieldValue, nvLinkCount)
	txIds := make([]nvml.FieldValue, nvLinkCount)
	for j := 0; j < int(nvLinkCount); j++ {
		rxIds[j].FieldId = nvml.FI_DEV_NVLINK_THROUGHPUT_RAW_RX
		rxIds[j].ScopeId = uint32(j)
		txIds[j].FieldId = nvml.FI_DEV_NVLINK_THROUGHPUT_RAW_TX
		txIds[j].ScopeId = uint32(j)
	}
	var rxTotal, txTotal uint64
	if rxRet := device.GetFieldValues(rxIds); rxRet == nvml.SUCCESS {
		for i := 0; i < int(nvLinkCount); i++ {
			if rxIds[i].NvmlReturn == uint32(nvml.SUCCESS) {
				rxTotal += binary.LittleEndian.Uint64(rxIds[i].Value[:])
			}
		}
	}
	if txRet := device.GetFieldValues(txIds); txRet == nvml.SUCCESS {
		for i := 0; i < int(nvLinkCount); i++ {
			if txIds[i].NvmlReturn == uint32(nvml.SUCCESS) {
				txTotal += binary.LittleEndian.Uint64(txIds[i].Value[:])
			}
		}
	}
	if rxTotal == 0 || txTotal == 0 {
		return nvLinkInfo, nvCountRet
	}
	now := time.Now().UnixNano()
	nvLinkInfo.RxTimestamp = now
	nvLinkInfo.TxTimestamp = now
	nvLinkInfo.NvLinkRx = rxTotal
	nvLinkInfo.NvLinkTx = txTotal
	return nvLinkInfo, nvml.SUCCESS
}
func CollectGpuStats() map[string]string {
	result := make(map[string]string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	gpuCount, ret := nvml.DeviceGetCount()
	if ret != nvml.SUCCESS {
		log.Logger.Errorf("Unable to get device count: %v", nvml.ErrorString(ret))
		return result
	}
	for i := 0; i < gpuCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					log.Logger.Errorf("GPU-CGO采集程序%d panic: %v,Stack:\n%s", i, r, string(debug.Stack()))
				}
			}()
			device, ret := nvml.DeviceGetHandleByIndex(i)
			if ret != nvml.SUCCESS {
				return
			}
			// 1. GPU utilization
			util, GpuRet := device.GetUtilizationRates()
			// 2. Memory utilization
			var memUtil float64
			mem, memRet := device.GetMemoryInfo()
			if memRet == nvml.SUCCESS {
				memUtil = float64(mem.Used) / float64(mem.Total) * 100
			}
			// 3. PCIe bandwidth
			pcieTx, _ := device.GetPcieThroughput(nvml.PCIE_UTIL_TX_BYTES)
			pcieRx, _ := device.GetPcieThroughput(nvml.PCIE_UTIL_RX_BYTES)

			mu.Lock()
			if GpuRet == nvml.SUCCESS {
				result[fmt.Sprintf("GPU%dUtilization", i)] = fmt.Sprintf("%d", util.Gpu)
			}
			result[fmt.Sprintf("GPU%dMemoryUtilization", i)] = fmt.Sprintf("%.2f", memUtil)
			// KB/s -> B/s
			result[fmt.Sprintf("GPU%dPCIeTxBps", i)] = fmt.Sprintf("%d", pcieTx*1000)
			result[fmt.Sprintf("GPU%dPCIeRxBps", i)] = fmt.Sprintf("%d", pcieRx*1000)
			mu.Unlock()

			// NVLink
			nv0, ret0 := getNvLinkStats(i)
			time.Sleep(100 * time.Millisecond)
			nv1, ret1 := getNvLinkStats(i)
			if ret0 == nvml.SUCCESS && ret1 == nvml.SUCCESS {
				if nv1.NvLinkRx >= nv0.NvLinkRx && nv1.NvLinkTx >= nv0.NvLinkTx {
					var txBps, rxBps int
					interval := float64(nv1.TxTimestamp-nv0.TxTimestamp) / 1e9
					if interval > 0 {
						rxBps = int(float64(nv1.NvLinkRx-nv0.NvLinkRx)/interval) * 1024
						txBps = int(float64(nv1.NvLinkTx-nv0.NvLinkTx)/interval) * 1024
					}
					mu.Lock()
					result[fmt.Sprintf("GPU%dNVLinkTxBps", i)] = fmt.Sprintf("%d", txBps)
					result[fmt.Sprintf("GPU%dNVLinkRxBps", i)] = fmt.Sprintf("%d", rxBps)
					mu.Unlock()
				}
			}
		}()
	}
	wg.Wait()
	return result
}
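
Below is a minimal sketch of how the leak shows up when CollectGpuStats is driven on a ticker. The ticker interval, the standard-library log calls, and the import path of the collection package are placeholders I made up for illustration, not the real agent code:

package main

import (
	"log"
	"runtime"
	"time"

	"rdma-manager/rdma-agent/src/collection" // assumed path, mirroring the log package path used above
)

func main() {
	collection.GpuInit()
	defer collection.GpuShutDown()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		// If a previous CollectGpuStats call is stuck inside an NVML cgo call,
		// its worker goroutines and the goroutine blocked in wg.Wait() are still
		// alive here, so this number keeps climbing tick after tick.
		log.Printf("goroutines before collect: %d", runtime.NumGoroutine())

		// One collection cycle per tick, fired without waiting for the previous one.
		go func() {
			stats := collection.CollectGpuStats()
			log.Printf("collected %d metrics", len(stats))
		}()
	}
}

The goroutine count alone is only a coarse signal; a pprof goroutine profile (or a SIGQUIT stack dump) would show exactly which call the stuck workers are parked in.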
