73 changes: 61 additions & 12 deletions kubernetes/device-plugin/server.go


Disclaimer: I'm familiar with C# and JavaScript and have some ancient experience with Java and C/C++; this is my first time seeing Go. I am too excited to see this in production to leave it untouched.

I love this change and look forward to using it! If someone has already tested this on a single-GPU machine, then I vote for an approval.

Minor Nit-picks:

  • variableNameNew could be more descriptive, like variableNameWithId or variableNameId
  • Overall, the code is well formatted and easy to read, but there are a few things an automatic code formatter might improve.

@@ -25,6 +25,7 @@ import (
"log"
"net"
"os"
"strings"

"golang.org/x/net/context"
"google.golang.org/grpc"
@@ -36,6 +37,8 @@ const (
serverSock = pluginapi.DevicePluginPath + "nvshare-device-plugin.sock"
)



Minor nit-pick: it might not hurt to run this file through a code formatter (e.g. gofmt).


type NvshareDevicePlugin struct {
devs []*pluginapi.Device
socket string
@@ -47,9 +50,15 @@ type NvshareDevicePlugin struct {
}

func NewNvshareDevicePlugin() *NvshareDevicePlugin {
socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}

serverSockNew := strings.Split(serverSock, ".sock")[0] + socketId + ".sock"
return &NvshareDevicePlugin{
devs: getDevices(),
socket: serverSock,
socket: serverSockNew,


I see why you did this. Could the name be something more meaningful? The next person might add another "New" on the end.

I suggest ServerSockWithId or ServerSockId
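
On the same theme: this NVSHARE_SOCK_ID lookup block is repeated in NewNvshareDevicePlugin, Start, Stop, Serve, Register, and Allocate. A small helper could settle the naming and remove the duplication in one go. A rough, untested sketch, reusing the existing resourceName constant (the helper names are only suggestions, not part of this PR):

/* Hypothetical helpers, not part of this PR: centralize the
   NVSHARE_SOCK_ID lookup and the resource name derived from it. */
func sockID() string {
    id := os.Getenv("NVSHARE_SOCK_ID")
    if id == "" {
        id = "0" // same default as the current fallback
    }
    return id
}

func resourceNameWithID() string {
    return resourceName + sockID()
}

Each call site would then shrink to resourceNameNew := resourceNameWithID(), or could use the helper directly.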


stop: make(chan interface{}),
health: make(chan *pluginapi.Device),
@@ -75,31 +84,44 @@ func (m *NvshareDevicePlugin) cleanup() {
func (m *NvshareDevicePlugin) Start() error {
m.initialize()

socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}
resourceNameNew := resourceName + socketId


resourceNameWithId?


err := m.Serve()
if err != nil {
log.Printf("Could not start device plugin for '%s': %s", resourceName, err)
log.Printf("Could not start device plugin for '%s': %s", resourceNameNew, err)
m.cleanup()
return err
}
log.Printf("Starting to serve '%s' on %s", resourceName, m.socket)
log.Printf("Starting to serve '%s' on %s", resourceNameNew, m.socket)

err = m.Register()
if err != nil {
log.Printf("Could not register device plugin: %s", err)
m.Stop()
return err
}
log.Printf("Registered device plugin for '%s' with Kubelet", resourceName)
log.Printf("Registered device plugin for '%s' with Kubelet", resourceNameNew)

return nil
}

/* Stop the gRPC server and clean up the UNIX socket file */
func (m *NvshareDevicePlugin) Stop() error {

socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}
resourceNameNew := resourceName + socketId

if (m == nil) || (m.server == nil) {
return nil
}
log.Printf("Stopping to serve '%s' on %s\n", resourceName, m.socket)
log.Printf("Stopping to serve '%s' on %s\n", resourceNameNew, m.socket)
m.server.Stop()
err := os.Remove(m.socket)
if (err != nil) && (!os.IsNotExist(err)) {
@@ -111,6 +133,13 @@ func (m *NvshareDevicePlugin) Stop() error {

/* Starts the gRPC server which serves incoming requests from kubelet */
func (m *NvshareDevicePlugin) Serve() error {

socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}
resourceNameNew := resourceName + socketId

os.Remove(m.socket)
sock, err := net.Listen("unix", m.socket)
if err != nil {
@@ -123,17 +152,17 @@
lastCrashTime := time.Now()
restartCount := 0
for {
log.Printf("Starting gRPC server for '%s'", resourceName)
log.Printf("Starting gRPC server for '%s'", resourceNameNew)
err := m.server.Serve(sock)
if err == nil {
break
}

log.Printf("GRPC server for '%s' crashed with error: %v",
resourceName, err)
resourceNameNew, err)

if restartCount > 5 {
log.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", resourceName)
log.Fatalf("GRPC server for '%s' has repeatedly crashed recently. Quitting", resourceNameNew)
}
timeSinceLastCrash := time.Since(lastCrashTime).Seconds()
lastCrashTime = time.Now()
@@ -156,6 +185,14 @@ func (m *NvshareDevicePlugin) Serve() error {

/* Registers the device plugin for resourceName with kubelet */
func (m *NvshareDevicePlugin) Register() error {

socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}
resourceNameNew := resourceName + socketId


conn, err := m.dial(pluginapi.KubeletSocket, 5*time.Second)
if err != nil {
return err
@@ -166,7 +203,7 @@
reqt := &pluginapi.RegisterRequest{
Version: pluginapi.Version,
Endpoint: path.Base(m.socket),
ResourceName: resourceName,
ResourceName: resourceNameNew,
Options: &pluginapi.DevicePluginOptions{
GetPreferredAllocationAvailable: false,
},
@@ -218,12 +255,20 @@ func (m *NvshareDevicePlugin) ListAndWatch(e *pluginapi.Empty, s pluginapi.Devic
*/
func (m *NvshareDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
log.SetOutput(os.Stderr)

socketId := os.Getenv("NVSHARE_SOCK_ID")
if len(socketId) == 0 {
socketId = "0"
}

resourceNameNew := resourceName + socketId

responses := pluginapi.AllocateResponse{}
for _, req := range reqs.ContainerRequests {
for _, id := range req.DevicesIDs {
log.Printf("Received Allocate request for %s", id)
if !m.deviceExists(id) {
return nil, fmt.Errorf("invalid allocation request for '%s' - unknown device: %s", resourceName, id)
return nil, fmt.Errorf("invalid allocation request for '%s' - unknown device: %s", resourceNameNew, id)
}
}

@@ -249,10 +294,14 @@ func (m *NvshareDevicePlugin) Allocate(ctx context.Context, reqs *pluginapi.Allo
ReadOnly: true,
}
mounts = append(mounts, mount)

SocketHostPathNew := strings.Split(SocketHostPath, ".sock")[0] + socketId + ".sock"
SocketContainerPathNew := strings.Split(SocketContainerPath, ".sock")[0] + socketId + ".sock"

/* Mount scheduler socket */
mount = &pluginapi.Mount{
HostPath: SocketHostPath,
ContainerPath: SocketContainerPath,
HostPath: SocketHostPathNew,


I understand you want this to stand out especially on your test machine. Do we want this on the main branch?

ContainerPath: SocketContainerPathNew,
ReadOnly: true,
}
mounts = append(mounts, mount)
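
Related nit: this is now the third place the split-and-rejoin on ".sock" appears (serverSockNew, SocketHostPathNew, SocketContainerPathNew). A sketch of a small helper, assuming every such path ends in ".sock" (the name is hypothetical, not in this PR):

/* Hypothetical helper: insert the socket ID before the ".sock" suffix,
   e.g. "/var/run/nvshare/scheduler.sock" with ID "1" becomes
   "/var/run/nvshare/scheduler1.sock". */
func withSockID(path, id string) string {
    return strings.TrimSuffix(path, ".sock") + id + ".sock"
}

Note that strings.TrimSuffix only strips a trailing ".sock", while the Split version cuts at the first occurrence; for these paths the two agree.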
38 changes: 33 additions & 5 deletions kubernetes/manifests/device-plugin.yaml
@@ -29,7 +29,7 @@ spec:
priorityClassName: system-node-critical
containers:
- name: nvshare-lib
image: docker.io/grgalex/nvshare:libnvshare-v0.1-f654c296
image: nvshare:libnvshare-a4301f8e


Is this local dev code?

command:
- sleep
- infinity
@@ -46,6 +46,11 @@ spec:
- "/bin/sh"
- "-c"
- "umount -v /host-var-run-nvshare/libnvshare.so && rm /host-var-run-nvshare/libnvshare.so"
env:
- name: NVIDIA_VISIBLE_DEVICES
value: "1"
- name: NVSHARE_SOCK_ID
value: "1"
securityContext:
# Necessary for mounts to work.
privileged: true
@@ -58,21 +63,44 @@ spec:
# mount for /var/run/nvshare/libnvshare.so
mountPropagation: Bidirectional
- name: nvshare-device-plugin
image: docker.io/grgalex/nvshare:nvshare-device-plugin-v0.1-f654c296
image: nvshare:nvshare-device-plugin-a4301f8e
imagePullPolicy: IfNotPresent
env:
- name: NVSHARE_VIRTUAL_DEVICES
value: "10"
@jake-brewer-isa Mar 7, 2024

Will this work for a 12-GPU machine? It looks like existing code that moved. Even if this were changed to 12, would it add value on a 12-GPU machine? Anyway, I entered an issue, since this looks like a separate concern from the multi-GPU support.

From reviewing the code, not live experience: hard-coded to 10 virtual GPUs?

#17
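
In case the plugin doesn't already parse NVSHARE_VIRTUAL_DEVICES (I may have missed it in getDevices), a guess at how it could be read with a fallback. This assumes os and strconv are imported; the helper name and default are mine, not the PR's:

/* Hypothetical sketch: size the virtual device list from the
   NVSHARE_VIRTUAL_DEVICES env var, falling back to 10 when unset
   or invalid. */
func virtualDeviceCount() int {
    n, err := strconv.Atoi(os.Getenv("NVSHARE_VIRTUAL_DEVICES"))
    if err != nil || n <= 0 {
        return 10 // matches the manifest's current value
    }
    return n
}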

- name: NVIDIA_VISIBLE_DEVICES
value: "0"
- name: NVSHARE_SOCK_ID
value: "0"
- name: CUDA_VISIBLE_DEVICES
value: "0"
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: device-plugin-socket
mountPath: /var/lib/kubelet/device-plugins

- name: nvshare-device-plugin-1
image: nvshare:nvshare-device-plugin-a4301f8e
imagePullPolicy: IfNotPresent
env:
- name: NVSHARE_VIRTUAL_DEVICES
value: "10"
- name: NVIDIA_VISIBLE_DEVICES
value: "1"
- name: NVSHARE_SOCK_ID
value: "1"
- name: CUDA_VISIBLE_DEVICES
value: "1"
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: device-plugin-socket
mountPath: /var/lib/kubelet/device-plugins
resources:


Did I miss resources moving somewhere else, or was this just not used?

limits:
nvidia.com/gpu: 1
volumes:
- name: host-var-run-nvshare
hostPath:
28 changes: 27 additions & 1 deletion kubernetes/manifests/scheduler.yaml
@@ -29,7 +29,7 @@ spec:
priorityClassName: system-node-critical
containers:
- name: nvshare-scheduler
image: docker.io/grgalex/nvshare:nvshare-scheduler-v0.1-f654c296
image: nvshare:nvshare-scheduler-a4301f8e


Is this local dev code?

imagePullPolicy: IfNotPresent
securityContext:
allowPrivilegeEscalation: false
@@ -38,6 +38,32 @@ spec:
volumeMounts:
- name: nvshare-socket-directory
mountPath: /var/run/nvshare
env:
- name: NVIDIA_VISIBLE_DEVICES
value: "0"
- name: NVSHARE_SOCK_ID
value: "0"
- name: CUDA_VISIBLE_DEVICES
value: "0"

- name: nvshare-scheduler-1
image: nvshare:nvshare-scheduler-a4301f8e
imagePullPolicy: IfNotPresent
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop: ["ALL"]
volumeMounts:
- name: nvshare-socket-directory
mountPath: /var/run/nvshare
env:
- name: NVIDIA_VISIBLE_DEVICES
value: "1"
- name: NVSHARE_SOCK_ID
value: "1"
- name: CUDA_VISIBLE_DEVICES
value: "1"

volumes:
- name: nvshare-socket-directory
hostPath:
7 changes: 6 additions & 1 deletion src/comm.c
@@ -80,9 +80,14 @@ int nvshare_get_scheduler_path(char *sock_path)

offset = ret; /* Start from the trailing NULL byte */

/* Get the scheduler socket ID */
char *NVSHARE_SOCK_ID = getenv("NVSHARE_SOCK_ID");
if (NVSHARE_SOCK_ID == NULL) {
NVSHARE_SOCK_ID = "0";
}
/* TODO: Ensure it fits in sock_path, check return value */
ret = snprintf(sock_path + offset, NVSHARE_SOCK_PATH_MAX - offset,
"%s", "scheduler.sock");
"%s%s%s", "scheduler",NVSHARE_SOCK_ID,".sock");
return 0;
}
