From 3ef814559a33cba147c8ff2ab416db15d010e8fa Mon Sep 17 00:00:00 2001 From: EinsteinXue Date: Thu, 25 Dec 2025 15:00:29 +0800 Subject: [PATCH] SynaXG plugin dev scheme review --- api/v1/dataprocessingunitconfig_types.go | 107 ++- cmd/main.go | 14 +- dpu-api/api.proto | 17 + dpu-api/gen/api.pb.go | 195 ++++- dpu-api/gen/api_grpc.pb.go | 163 +++- .../dataprocessingunitconfig_controller.go | 185 ++++- internal/daemon/hostsidemanager.go | 11 + internal/daemon/plugin/vendorplugin.go | 22 + .../vendor-specific-plugins/synaxg/synaxg.go | 700 ++++++++++++++++++ internal/platform/synaxg-dpu.go | 81 ++ internal/platform/vendordetector.go | 1 + 11 files changed, 1437 insertions(+), 59 deletions(-) create mode 100644 internal/daemon/vendor-specific-plugins/synaxg/synaxg.go create mode 100644 internal/platform/synaxg-dpu.go diff --git a/api/v1/dataprocessingunitconfig_types.go b/api/v1/dataprocessingunitconfig_types.go index 727d5738e..add5b1827 100644 --- a/api/v1/dataprocessingunitconfig_types.go +++ b/api/v1/dataprocessingunitconfig_types.go @@ -22,7 +22,70 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. 
+// ========== Add: Define supported DPU operation types ========== +type DpuOperationType string + +const ( + // DpuOpNone No operation (default) + DpuOpNone DpuOperationType = "None" + // DpuOpFirmwareUpgrade Firmware upgrade operation + DpuOpFirmwareUpgrade DpuOperationType = "FirmwareUpgrade" + // DpuOpRestart DPU restart operation (mandatory after firmware upgrade) + DpuOpRestart DpuOperationType = "Reboot" +) + +// ========== Add: Define firmware types ========== +type DpuFirmwareType string + +const ( + // DpuFirmwareTypeOAM OAM type firmware + DpuFirmwareTypeOAM DpuFirmwareType = "OAM" + // DpuFirmwareTypeSDK SDK type firmware + DpuFirmwareTypeSDK DpuFirmwareType = "SDK" +) + +type DpuOperationStatusPhase string + +const ( + // DpuPhasePending Operation pending execution (default) + DpuPhasePending DpuOperationStatusPhase = "Pending" + // DpuPhaseRunning Operation is in progress + DpuPhaseRunning DpuOperationStatusPhase = "Running" + // DpuPhaseSucceeded Operation completed successfully + DpuPhaseSucceeded DpuOperationStatusPhase = "Succeeded" + // DpuPhaseFailed Operation execution failed + DpuPhaseFailed DpuOperationStatusPhase = "Failed" + // DpuPhaseCancelled Operation was cancelled + DpuPhaseCancelled DpuOperationStatusPhase = "Cancelled" +) + +// ========== Add: Define OAM firmware configuration ========== +type DpuFirmwareSpec struct { + // Firmware type (OAM/SDK) + // +kubebuilder:validation:Required + // +kubebuilder:validation:Enum=OAM;SDK + Type DpuFirmwareType `json:"type"` + // Target firmware version number, required + // +kubebuilder:validation:Required + TargetVersion string `json:"targetVersion"` + + // Firmware image path/package path (e.g. 
/quay.io/openshift/firmware/dpu:v1.0.8) + // +kubebuilder:validation:Required + FirmwarePath string `json:"firmwarePath"` + +} + +type DataProcessingUnitManagement struct { + // DPU operation type to execute: None/FirmwareUpgrade/Restart + // Modify this field to trigger the corresponding operation!!! + Operation DpuOperationType `json:"operation,omitempty"` + + // Detailed configuration for firmware upgrade, required when Operation is upgrade type + // NOTE: kubebuilder has no RequiredWhen marker; validate in the controller/webhook that Firmware is set when Operation is FirmwareUpgrade + Firmware DpuFirmwareSpec `json:"firmware,omitempty"` + +} // DataProcessingUnitConfigSpec defines the desired state of DataProcessingUnitConfig. type DataProcessingUnitConfigSpec struct { // INSERT ADDITIONAL SPEC FIELDS - desired state of cluster @@ -30,19 +93,53 @@ type DataProcessingUnitConfigSpec struct { // DpuSelector specifies which DPUs this DpuConfig CR should target. // If empty, the DpuConfig will target all DPUs. - // +optional + //matchLabels: nodeName, pci-address + //pci-address is required. 1 node might have multiple DPUs of the same vendor. + //so to specify the target DPU, pci-address is necessary DpuSelector *metav1.LabelSelector `json:"dpuSelector,omitempty"` - // Foo is an example field of DataProcessingUnitConfig. Edit dataprocessingunitconfig_types.go to remove/update - Foo string `json:"foo,omitempty"` + //each DPU has 1 specific CR + DpuManagement DataProcessingUnitManagement `json:"dpuManagement,omitempty"` } -// DataProcessingUnitConfigStatus defines the observed state of DataProcessingUnitConfig. -type DataProcessingUnitConfigStatus struct { +// DpuNodeOperationStatus defines the observed state of DataProcessingUnitConfig. +type DpuNodeOperationStatus struct { // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster // Important: Run "make" to regenerate code after modifying this file + NodeName string `json:"nodeName"` + + // PciAddress of the target DPU device on this node. 
+ // +optional + PciAddress string `json:"pciAddress,omitempty"` + + // Sub-operation type: distinguish between FirmwareUpgrade and Restart + SubOperation DpuOperationType `json:"subOperation"` + + // Firmware type (valid only when SubOperation is FirmwareUpgrade): OAM/SDK + FirmwareType DpuFirmwareType `json:"firmwareType,omitempty"` + + // Operation execution status: Pending/Running/Succeeded/Failed/Cancelled + Phase DpuOperationStatusPhase `json:"phase"` + + // Operation start time + StartTime *metav1.Time `json:"startTime,omitempty"` + + // Operation completion time + CompletionTime *metav1.Time `json:"completionTime,omitempty"` + + // Upgrade-related versions (valid only when SubOperation is FirmwareUpgrade) + OriginalVersion string `json:"originalVersion,omitempty"` // Version before upgrade + TargetVersion string `json:"targetVersion,omitempty"` // Target version for upgrade + + // Error message (required when operation fails) + ErrorMessage string `json:"errorMessage,omitempty"` } +type DataProcessingUnitConfigStatus struct { + // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster + // Important: Run "make" to regenerate code after modifying this file + NodeStatus DpuNodeOperationStatus `json:"nodeStatus,omitempty"` +} // +kubebuilder:object:root=true // +kubebuilder:subresource:status // +kubebuilder:resource:shortName=dpuconfig diff --git a/cmd/main.go b/cmd/main.go index 8211d16b6..6bd091ab9 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -139,13 +139,13 @@ func main() { os.Exit(1) } } - if err := (&controller.DataProcessingUnitConfigReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }).SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "DataProcessingUnitConfig") - os.Exit(1) - } + // if err := (&controller.DataProcessingUnitConfigReconciler{ + // Client: mgr.GetClient(), + // Scheme: mgr.GetScheme(), + // }).SetupWithManager(mgr); err != nil { + // setupLog.Error(err, "unable to create controller", 
"controller", "DataProcessingUnitConfig") + // os.Exit(1) + // } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/dpu-api/api.proto b/dpu-api/api.proto index df0c8dbc2..c2b722a1a 100644 --- a/dpu-api/api.proto +++ b/dpu-api/api.proto @@ -13,6 +13,11 @@ service NetworkFunctionService { rpc DeleteNetworkFunction(NFRequest) returns (Empty); } +service DataProcessingUnitManagementService { + rpc DpuRebootFunction(DPUManagementRequest) returns (DPUManagementResponse); + rpc DpuFirmwareUpgradeFunction(DPUManagementRequest) returns (DPUManagementResponse); +} + message InitRequest { bool dpu_mode = 1; string dpu_identifier = 2; @@ -67,3 +72,15 @@ message PingResponse { string responder_id = 2; bool healthy = 3; } + +message DPUManagementRequest{ + string nodeName = 1; + string pciAddress = 2; + string firmwareType = 3; + string firmwareImagePath = 4; +} + +message DPUManagementResponse{ + string status = 1; + string message = 2; +} \ No newline at end of file diff --git a/dpu-api/gen/api.pb.go b/dpu-api/gen/api.pb.go index e9f87bfb8..dfeb343c4 100644 --- a/dpu-api/gen/api.pb.go +++ b/dpu-api/gen/api.pb.go @@ -517,6 +517,126 @@ func (x *PingResponse) GetHealthy() bool { return false } +type DPUManagementRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + NodeName string `protobuf:"bytes,1,opt,name=nodeName,proto3" json:"nodeName,omitempty"` + PciAddress string `protobuf:"bytes,2,opt,name=pciAddress,proto3" json:"pciAddress,omitempty"` + FirmwareType string `protobuf:"bytes,3,opt,name=firmwareType,proto3" json:"firmwareType,omitempty"` + FirmwareImagePath string `protobuf:"bytes,4,opt,name=firmwareImagePath,proto3" json:"firmwareImagePath,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DPUManagementRequest) Reset() { + *x = DPUManagementRequest{} + mi := &file_api_proto_msgTypes[10] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + 
ms.StoreMessageInfo(mi) +} + +func (x *DPUManagementRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DPUManagementRequest) ProtoMessage() {} + +func (x *DPUManagementRequest) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[10] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DPUManagementRequest.ProtoReflect.Descriptor instead. +func (*DPUManagementRequest) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{10} +} + +func (x *DPUManagementRequest) GetNodeName() string { + if x != nil { + return x.NodeName + } + return "" +} + +func (x *DPUManagementRequest) GetPciAddress() string { + if x != nil { + return x.PciAddress + } + return "" +} + +func (x *DPUManagementRequest) GetFirmwareType() string { + if x != nil { + return x.FirmwareType + } + return "" +} + +func (x *DPUManagementRequest) GetFirmwareImagePath() string { + if x != nil { + return x.FirmwareImagePath + } + return "" +} + +type DPUManagementResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Status string `protobuf:"bytes,1,opt,name=status,proto3" json:"status,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *DPUManagementResponse) Reset() { + *x = DPUManagementResponse{} + mi := &file_api_proto_msgTypes[11] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *DPUManagementResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*DPUManagementResponse) ProtoMessage() {} + +func (x *DPUManagementResponse) ProtoReflect() protoreflect.Message { + mi := &file_api_proto_msgTypes[11] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + 
if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use DPUManagementResponse.ProtoReflect.Descriptor instead. +func (*DPUManagementResponse) Descriptor() ([]byte, []int) { + return file_api_proto_rawDescGZIP(), []int{11} +} + +func (x *DPUManagementResponse) GetStatus() string { + if x != nil { + return x.Status + } + return "" +} + +func (x *DPUManagementResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + var File_api_proto protoreflect.FileDescriptor const file_api_proto_rawDesc = "" + @@ -551,12 +671,25 @@ const file_api_proto_rawDesc = "" + "\fPingResponse\x12\x1c\n" + "\ttimestamp\x18\x01 \x01(\x03R\ttimestamp\x12!\n" + "\fresponder_id\x18\x02 \x01(\tR\vresponderId\x12\x18\n" + - "\ahealthy\x18\x03 \x01(\bR\ahealthy2?\n" + + "\ahealthy\x18\x03 \x01(\bR\ahealthy\"\xa4\x01\n" + + "\x14DPUManagementRequest\x12\x1a\n" + + "\bnodeName\x18\x01 \x01(\tR\bnodeName\x12\x1e\n" + + "\n" + + "pciAddress\x18\x02 \x01(\tR\n" + + "pciAddress\x12\"\n" + + "\ffirmwareType\x18\x03 \x01(\tR\ffirmwareType\x12,\n" + + "\x11firmwareImagePath\x18\x04 \x01(\tR\x11firmwareImagePath\"I\n" + + "\x15DPUManagementResponse\x12\x16\n" + + "\x06status\x18\x01 \x01(\tR\x06status\x12\x18\n" + + "\amessage\x18\x02 \x01(\tR\amessage2?\n" + "\x10LifeCycleService\x12+\n" + "\x04Init\x12\x13.Vendor.InitRequest\x1a\x0e.Vendor.IpPort2\x8e\x01\n" + "\x16NetworkFunctionService\x129\n" + "\x15CreateNetworkFunction\x12\x11.Vendor.NFRequest\x1a\r.Vendor.Empty\x129\n" + - "\x15DeleteNetworkFunction\x12\x11.Vendor.NFRequest\x1a\r.Vendor.Empty2w\n" + + "\x15DeleteNetworkFunction\x12\x11.Vendor.NFRequest\x1a\r.Vendor.Empty2\xd2\x01\n" + + "#DataProcessingUnitManagementService\x12P\n" + + "\x11DpuRebootFunction\x12\x1c.Vendor.DPUManagementRequest\x1a\x1d.Vendor.DPUManagementResponse\x12Y\n" + + "\x1aDpuFirmwareUpgradeFunction\x12\x1c.Vendor.DPUManagementRequest\x1a\x1d.Vendor.DPUManagementResponse2w\n" 
+ "\rDeviceService\x127\n" + "\n" + "GetDevices\x12\r.Vendor.Empty\x1a\x1a.Vendor.DeviceListResponse\x12-\n" + @@ -576,38 +709,44 @@ func file_api_proto_rawDescGZIP() []byte { return file_api_proto_rawDescData } -var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 11) +var file_api_proto_msgTypes = make([]protoimpl.MessageInfo, 13) var file_api_proto_goTypes = []any{ - (*InitRequest)(nil), // 0: Vendor.InitRequest - (*IpPort)(nil), // 1: Vendor.IpPort - (*NFRequest)(nil), // 2: Vendor.NFRequest - (*Empty)(nil), // 3: Vendor.Empty - (*VfCount)(nil), // 4: Vendor.VfCount - (*TopologyInfo)(nil), // 5: Vendor.TopologyInfo - (*Device)(nil), // 6: Vendor.Device - (*DeviceListResponse)(nil), // 7: Vendor.DeviceListResponse - (*PingRequest)(nil), // 8: Vendor.PingRequest - (*PingResponse)(nil), // 9: Vendor.PingResponse - nil, // 10: Vendor.DeviceListResponse.DevicesEntry + (*InitRequest)(nil), // 0: Vendor.InitRequest + (*IpPort)(nil), // 1: Vendor.IpPort + (*NFRequest)(nil), // 2: Vendor.NFRequest + (*Empty)(nil), // 3: Vendor.Empty + (*VfCount)(nil), // 4: Vendor.VfCount + (*TopologyInfo)(nil), // 5: Vendor.TopologyInfo + (*Device)(nil), // 6: Vendor.Device + (*DeviceListResponse)(nil), // 7: Vendor.DeviceListResponse + (*PingRequest)(nil), // 8: Vendor.PingRequest + (*PingResponse)(nil), // 9: Vendor.PingResponse + (*DPUManagementRequest)(nil), // 10: Vendor.DPUManagementRequest + (*DPUManagementResponse)(nil), // 11: Vendor.DPUManagementResponse + nil, // 12: Vendor.DeviceListResponse.DevicesEntry } var file_api_proto_depIdxs = []int32{ 5, // 0: Vendor.Device.topology:type_name -> Vendor.TopologyInfo - 10, // 1: Vendor.DeviceListResponse.devices:type_name -> Vendor.DeviceListResponse.DevicesEntry + 12, // 1: Vendor.DeviceListResponse.devices:type_name -> Vendor.DeviceListResponse.DevicesEntry 6, // 2: Vendor.DeviceListResponse.DevicesEntry.value:type_name -> Vendor.Device 0, // 3: Vendor.LifeCycleService.Init:input_type -> Vendor.InitRequest 2, // 4: 
Vendor.NetworkFunctionService.CreateNetworkFunction:input_type -> Vendor.NFRequest 2, // 5: Vendor.NetworkFunctionService.DeleteNetworkFunction:input_type -> Vendor.NFRequest - 3, // 6: Vendor.DeviceService.GetDevices:input_type -> Vendor.Empty - 4, // 7: Vendor.DeviceService.SetNumVfs:input_type -> Vendor.VfCount - 8, // 8: Vendor.HeartbeatService.Ping:input_type -> Vendor.PingRequest - 1, // 9: Vendor.LifeCycleService.Init:output_type -> Vendor.IpPort - 3, // 10: Vendor.NetworkFunctionService.CreateNetworkFunction:output_type -> Vendor.Empty - 3, // 11: Vendor.NetworkFunctionService.DeleteNetworkFunction:output_type -> Vendor.Empty - 7, // 12: Vendor.DeviceService.GetDevices:output_type -> Vendor.DeviceListResponse - 4, // 13: Vendor.DeviceService.SetNumVfs:output_type -> Vendor.VfCount - 9, // 14: Vendor.HeartbeatService.Ping:output_type -> Vendor.PingResponse - 9, // [9:15] is the sub-list for method output_type - 3, // [3:9] is the sub-list for method input_type + 10, // 6: Vendor.DataProcessingUnitManagementService.DpuRebootFunction:input_type -> Vendor.DPUManagementRequest + 10, // 7: Vendor.DataProcessingUnitManagementService.DpuFirmwareUpgradeFunction:input_type -> Vendor.DPUManagementRequest + 3, // 8: Vendor.DeviceService.GetDevices:input_type -> Vendor.Empty + 4, // 9: Vendor.DeviceService.SetNumVfs:input_type -> Vendor.VfCount + 8, // 10: Vendor.HeartbeatService.Ping:input_type -> Vendor.PingRequest + 1, // 11: Vendor.LifeCycleService.Init:output_type -> Vendor.IpPort + 3, // 12: Vendor.NetworkFunctionService.CreateNetworkFunction:output_type -> Vendor.Empty + 3, // 13: Vendor.NetworkFunctionService.DeleteNetworkFunction:output_type -> Vendor.Empty + 11, // 14: Vendor.DataProcessingUnitManagementService.DpuRebootFunction:output_type -> Vendor.DPUManagementResponse + 11, // 15: Vendor.DataProcessingUnitManagementService.DpuFirmwareUpgradeFunction:output_type -> Vendor.DPUManagementResponse + 7, // 16: Vendor.DeviceService.GetDevices:output_type -> 
Vendor.DeviceListResponse + 4, // 17: Vendor.DeviceService.SetNumVfs:output_type -> Vendor.VfCount + 9, // 18: Vendor.HeartbeatService.Ping:output_type -> Vendor.PingResponse + 11, // [11:19] is the sub-list for method output_type + 3, // [3:11] is the sub-list for method input_type 3, // [3:3] is the sub-list for extension type_name 3, // [3:3] is the sub-list for extension extendee 0, // [0:3] is the sub-list for field type_name @@ -624,9 +763,9 @@ func file_api_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_api_proto_rawDesc), len(file_api_proto_rawDesc)), NumEnums: 0, - NumMessages: 11, + NumMessages: 13, NumExtensions: 0, - NumServices: 4, + NumServices: 5, }, GoTypes: file_api_proto_goTypes, DependencyIndexes: file_api_proto_depIdxs, diff --git a/dpu-api/gen/api_grpc.pb.go b/dpu-api/gen/api_grpc.pb.go index 96ce737b7..c910b0501 100644 --- a/dpu-api/gen/api_grpc.pb.go +++ b/dpu-api/gen/api_grpc.pb.go @@ -1,6 +1,6 @@ // Code generated by protoc-gen-go-grpc. DO NOT EDIT. 
// versions: -// - protoc-gen-go-grpc v1.5.1 +// - protoc-gen-go-grpc v1.6.0 // - protoc v3.19.6 // source: api.proto @@ -63,7 +63,7 @@ type LifeCycleServiceServer interface { type UnimplementedLifeCycleServiceServer struct{} func (UnimplementedLifeCycleServiceServer) Init(context.Context, *InitRequest) (*IpPort, error) { - return nil, status.Errorf(codes.Unimplemented, "method Init not implemented") + return nil, status.Error(codes.Unimplemented, "method Init not implemented") } func (UnimplementedLifeCycleServiceServer) mustEmbedUnimplementedLifeCycleServiceServer() {} func (UnimplementedLifeCycleServiceServer) testEmbeddedByValue() {} @@ -76,7 +76,7 @@ type UnsafeLifeCycleServiceServer interface { } func RegisterLifeCycleServiceServer(s grpc.ServiceRegistrar, srv LifeCycleServiceServer) { - // If the following call pancis, it indicates UnimplementedLifeCycleServiceServer was + // If the following call panics, it indicates UnimplementedLifeCycleServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. 
@@ -178,10 +178,10 @@ type NetworkFunctionServiceServer interface { type UnimplementedNetworkFunctionServiceServer struct{} func (UnimplementedNetworkFunctionServiceServer) CreateNetworkFunction(context.Context, *NFRequest) (*Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method CreateNetworkFunction not implemented") + return nil, status.Error(codes.Unimplemented, "method CreateNetworkFunction not implemented") } func (UnimplementedNetworkFunctionServiceServer) DeleteNetworkFunction(context.Context, *NFRequest) (*Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method DeleteNetworkFunction not implemented") + return nil, status.Error(codes.Unimplemented, "method DeleteNetworkFunction not implemented") } func (UnimplementedNetworkFunctionServiceServer) mustEmbedUnimplementedNetworkFunctionServiceServer() { } @@ -195,7 +195,7 @@ type UnsafeNetworkFunctionServiceServer interface { } func RegisterNetworkFunctionServiceServer(s grpc.ServiceRegistrar, srv NetworkFunctionServiceServer) { - // If the following call pancis, it indicates UnimplementedNetworkFunctionServiceServer was + // If the following call panics, it indicates UnimplementedNetworkFunctionServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. @@ -261,6 +261,147 @@ var NetworkFunctionService_ServiceDesc = grpc.ServiceDesc{ Metadata: "api.proto", } +const ( + DataProcessingUnitManagementService_DpuRebootFunction_FullMethodName = "/Vendor.DataProcessingUnitManagementService/DpuRebootFunction" + DataProcessingUnitManagementService_DpuFirmwareUpgradeFunction_FullMethodName = "/Vendor.DataProcessingUnitManagementService/DpuFirmwareUpgradeFunction" +) + +// DataProcessingUnitManagementServiceClient is the client API for DataProcessingUnitManagementService service. 
+// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DataProcessingUnitManagementServiceClient interface { + DpuRebootFunction(ctx context.Context, in *DPUManagementRequest, opts ...grpc.CallOption) (*DPUManagementResponse, error) + DpuFirmwareUpgradeFunction(ctx context.Context, in *DPUManagementRequest, opts ...grpc.CallOption) (*DPUManagementResponse, error) +} + +type dataProcessingUnitManagementServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewDataProcessingUnitManagementServiceClient(cc grpc.ClientConnInterface) DataProcessingUnitManagementServiceClient { + return &dataProcessingUnitManagementServiceClient{cc} +} + +func (c *dataProcessingUnitManagementServiceClient) DpuRebootFunction(ctx context.Context, in *DPUManagementRequest, opts ...grpc.CallOption) (*DPUManagementResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(DPUManagementResponse) + err := c.cc.Invoke(ctx, DataProcessingUnitManagementService_DpuRebootFunction_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil + } + +func (c *dataProcessingUnitManagementServiceClient) DpuFirmwareUpgradeFunction(ctx context.Context, in *DPUManagementRequest, opts ...grpc.CallOption) (*DPUManagementResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(DPUManagementResponse) + err := c.cc.Invoke(ctx, DataProcessingUnitManagementService_DpuFirmwareUpgradeFunction_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DataProcessingUnitManagementServiceServer is the server API for DataProcessingUnitManagementService service. +// All implementations must embed UnimplementedDataProcessingUnitManagementServiceServer +// for forward compatibility. 
+type DataProcessingUnitManagementServiceServer interface { + DpuRebootFunction(context.Context, *DPUManagementRequest) (*DPUManagementResponse, error) + DpuFirmwareUpgradeFunction(context.Context, *DPUManagementRequest) (*DPUManagementResponse, error) + mustEmbedUnimplementedDataProcessingUnitManagementServiceServer() +} + +// UnimplementedDataProcessingUnitManagementServiceServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDataProcessingUnitManagementServiceServer struct{} + +func (UnimplementedDataProcessingUnitManagementServiceServer) DpuRebootFunction(context.Context, *DPUManagementRequest) (*DPUManagementResponse, error) { + return nil, status.Error(codes.Unimplemented, "method DpuRebootFunction not implemented") +} +func (UnimplementedDataProcessingUnitManagementServiceServer) DpuFirmwareUpgradeFunction(context.Context, *DPUManagementRequest) (*DPUManagementResponse, error) { + return nil, status.Error(codes.Unimplemented, "method DpuFirmwareUpgradeFunction not implemented") +} +func (UnimplementedDataProcessingUnitManagementServiceServer) mustEmbedUnimplementedDataProcessingUnitManagementServiceServer() { +} +func (UnimplementedDataProcessingUnitManagementServiceServer) testEmbeddedByValue() {} + +// UnsafeDataProcessingUnitManagementServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DataProcessingUnitManagementServiceServer will +// result in compilation errors. 
+type UnsafeDataProcessingUnitManagementServiceServer interface { + mustEmbedUnimplementedDataProcessingUnitManagementServiceServer() +} + +func RegisterDataProcessingUnitManagementServiceServer(s grpc.ServiceRegistrar, srv DataProcessingUnitManagementServiceServer) { + // If the following call panics, it indicates UnimplementedDataProcessingUnitManagementServiceServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&DataProcessingUnitManagementService_ServiceDesc, srv) +} + +func _DataProcessingUnitManagementService_DpuRebootFunction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DPUManagementRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataProcessingUnitManagementServiceServer).DpuRebootFunction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DataProcessingUnitManagementService_DpuRebootFunction_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataProcessingUnitManagementServiceServer).DpuRebootFunction(ctx, req.(*DPUManagementRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DataProcessingUnitManagementService_DpuFirmwareUpgradeFunction_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(DPUManagementRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DataProcessingUnitManagementServiceServer).DpuFirmwareUpgradeFunction(ctx, in) + } + info := &grpc.UnaryServerInfo{ + 
Server: srv, + FullMethod: DataProcessingUnitManagementService_DpuFirmwareUpgradeFunction_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DataProcessingUnitManagementServiceServer).DpuFirmwareUpgradeFunction(ctx, req.(*DPUManagementRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// DataProcessingUnitManagementService_ServiceDesc is the grpc.ServiceDesc for DataProcessingUnitManagementService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DataProcessingUnitManagementService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Vendor.DataProcessingUnitManagementService", + HandlerType: (*DataProcessingUnitManagementServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "DpuRebootFunction", + Handler: _DataProcessingUnitManagementService_DpuRebootFunction_Handler, + }, + { + MethodName: "DpuFirmwareUpgradeFunction", + Handler: _DataProcessingUnitManagementService_DpuFirmwareUpgradeFunction_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "api.proto", +} + const ( DeviceService_GetDevices_FullMethodName = "/Vendor.DeviceService/GetDevices" DeviceService_SetNumVfs_FullMethodName = "/Vendor.DeviceService/SetNumVfs" @@ -319,10 +460,10 @@ type DeviceServiceServer interface { type UnimplementedDeviceServiceServer struct{} func (UnimplementedDeviceServiceServer) GetDevices(context.Context, *Empty) (*DeviceListResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetDevices not implemented") + return nil, status.Error(codes.Unimplemented, "method GetDevices not implemented") } func (UnimplementedDeviceServiceServer) SetNumVfs(context.Context, *VfCount) (*VfCount, error) { - return nil, status.Errorf(codes.Unimplemented, "method SetNumVfs not implemented") + return nil, status.Error(codes.Unimplemented, "method SetNumVfs not implemented") } func 
(UnimplementedDeviceServiceServer) mustEmbedUnimplementedDeviceServiceServer() {} func (UnimplementedDeviceServiceServer) testEmbeddedByValue() {} @@ -335,7 +476,7 @@ type UnsafeDeviceServiceServer interface { } func RegisterDeviceServiceServer(s grpc.ServiceRegistrar, srv DeviceServiceServer) { - // If the following call pancis, it indicates UnimplementedDeviceServiceServer was + // If the following call panics, it indicates UnimplementedDeviceServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. @@ -446,7 +587,7 @@ type HeartbeatServiceServer interface { type UnimplementedHeartbeatServiceServer struct{} func (UnimplementedHeartbeatServiceServer) Ping(context.Context, *PingRequest) (*PingResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented") + return nil, status.Error(codes.Unimplemented, "method Ping not implemented") } func (UnimplementedHeartbeatServiceServer) mustEmbedUnimplementedHeartbeatServiceServer() {} func (UnimplementedHeartbeatServiceServer) testEmbeddedByValue() {} @@ -459,7 +600,7 @@ type UnsafeHeartbeatServiceServer interface { } func RegisterHeartbeatServiceServer(s grpc.ServiceRegistrar, srv HeartbeatServiceServer) { - // If the following call pancis, it indicates UnimplementedHeartbeatServiceServer was + // If the following call panics, it indicates UnimplementedHeartbeatServiceServer was // embedded by pointer and is nil. This will cause panics if an // unimplemented method is ever invoked, so we test this at initialization // time to prevent it from happening at runtime later due to I/O. 
diff --git a/internal/controller/dataprocessingunitconfig_controller.go b/internal/controller/dataprocessingunitconfig_controller.go index e1dea6fe1..68d449496 100644 --- a/internal/controller/dataprocessingunitconfig_controller.go +++ b/internal/controller/dataprocessingunitconfig_controller.go @@ -18,19 +18,42 @@ package controller import ( "context" + "embed" + "fmt" + "time" + "github.com/go-logr/logr" + configv1 "github.com/openshift/dpu-operator/api/v1" + "github.com/openshift/dpu-operator/internal/daemon/plugin" + "github.com/openshift/dpu-operator/internal/images" + "github.com/openshift/dpu-operator/internal/platform" + "github.com/openshift/dpu-operator/pkgs/render" + "github.com/openshift/dpu-operator/pkgs/vars" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - logf "sigs.k8s.io/controller-runtime/pkg/log" - - configv1 "github.com/openshift/dpu-operator/api/v1" + "sigs.k8s.io/controller-runtime/pkg/log" ) +//go:embed bindata +var dpuBinData embed.FS + // DataProcessingUnitConfigReconciler reconciles a DataProcessingUnitConfig object type DataProcessingUnitConfigReconciler struct { client.Client Scheme *runtime.Scheme + vsp *plugin.VendorPlugin + PciAddr string +} + +func NewDataProcessingUnitConfigReconciler(client client.Client, scheme *runtime.Scheme, vsp *plugin.VendorPlugin) *DataProcessingUnitConfigReconciler { + return &DataProcessingUnitConfigReconciler{ + Client: client, + Scheme: scheme, + vsp: vsp, + } } // +kubebuilder:rbac:groups=config.openshift.io,resources=dataprocessingunitconfigs,verbs=get;list;watch;create;update;patch;delete @@ -47,17 +70,163 @@ type DataProcessingUnitConfigReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.21.0/pkg/reconcile func (r *DataProcessingUnitConfigReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, 
error) { - _ = logf.FromContext(ctx) + logger := log.FromContext(ctx) + + logger.Info("DataProcessingUnitConfigReconciler") + + dpuConfig := &configv1.DataProcessingUnitConfig{} + err := r.Get(ctx, req.NamespacedName, dpuConfig) + if err != nil { + if errors.IsNotFound(err) { + logger.Info("DataProcessingUnitConfig not found. Ignoring.") + return ctrl.Result{}, nil + } + logger.Error(err, "Failed to get DataProcessingUnitConfig") + return ctrl.Result{}, err + } + + //DataprocessinUnitConfigReconciler is called by hostsidemanager, which means all the worker nodes within the cluster have a replica hostsidemanager locally + // DpuSelector contains nodeName and pciAddr, here is to judge is the CR is targeting to this dpu on the node + labelMatched, err := IsLabelsMatched(dpuConfig) + if labelMatched { + log.Info("Labels are matched, continue processing") + } else { + //if label not matched, skip processing + log.Info("Labels are not matched, skip processing") + return ctrl.Result{}, nil + } + + //If the CR is targeted at DPU firmware upgrading + if dpuConfig.Spec.DpuManagement.Operation == "FirmwareUpgrade" { + targetVersion := dpuConfig.Spec.DpuManagement.TargetVersion + firmwareImagePath := dpuConfig.Spec.DpuManagement.FirmwarePath + firmwareType := dpuConfig.Spec.DpuManagement.Type + + //Invoke the firmware upgrade operation via the gRPC method provided by the VSP gRPC server + r.vsp.UpgradeFirmware(r.pciAddr, firmwareType, targetVersion, firmwareImagePath) + //The return here is intended to prevent the DPU from being upgraded and rebooted simultaneously. 
+ return ctrl.Result{}, nil + + } + + //f the CR is targeted at DPU rebooting + if dpuConfig.Spec.DpuManagement.Operation == "Reboot" { + r.vsp.RebootDpu(r.pciAddr) + return ctrl.Result{}, nil + } + + return ctrl.Result{}, nil +} + +func (r *DataProcessingUnitConfigReconciler) IsLabelsMatched(dpuConfig *configv1.DataProcessingUnitConfig) (bool, error) { + // get current nodeName + currentNodeName := getCurrentNodeName() + if currentNodeName == "" { + klog.Error("Failed to get current node name") + return false, fmt.Errorf("current node name is empty") + } + klog.Info("Current node info", "currentNodeName", currentNodeName) + + var selector labels.Selector + var nodeName, pciAddr string + var err error + + // parse DpuSelector + if dpuConfig.Spec.DpuSelector != nil { + selector, err = metav1.LabelSelectorAsSelector(dpuConfig.Spec.DpuSelector) + if err != nil { + klog.Error(err, "Invalid DpuSelector") + return false, fmt.Errorf("invalid DpuSelector: %v", err) + } + } + + // Extract noodeName and pci-address from selector + if dpuConfig.Spec.DpuSelector != nil { + nodeName, pciAddr, err = GetNodenameAndPCIAddressFromSelector(dpuConfig.Spec.DpuSelector) + if err != nil { + klog.Errorf("Failed to get PCI address from selector: %v", err) + return false, err + } + klog.Infof("Extracted nodeName: %s, PCI address: %s", nodeName, pciAddr) + } - // TODO(user): your logic here + //check the exsitence of pci-address + currentPciAddr, err = CheckPCIAddressExists(pciAddr) + r.PciAddr = currentPciAddr + // construct current label + currentLabels := labels.Set{ + "nodename": currentNodeName, + "pci-address": currentPciAddr, + } - return ctrl.Result{}, nil + // match labels + matched := selector.Matches(currentLabels) + klog.Infof("Label match result: %t (selector: %s, current labels: %v)", matched, selector.String(), currentLabels) + return matched, nil +} + +func (r *DataProcessingUnitConfigReconciler) CheckPCIAddressExists(pciAddr string) (string, error) { + // check 
/sys/bus/pci/devices/pciAddr + cmd := exec.Command("ls", fmt.Sprintf("/sys/bus/pci/devices/%s", pciAddr)) + // exec command and obtain err + err := cmd.Run() + if err == nil { + //pciAddr exsits + return pciAddr, nil + } + else { + return "", nil + } +} + +func (r *DataProcessingUnitConfigReconciler) GetNodenameAndPCIAddressFromSelector(selector *metav1.LabelSelector) (string, string, error) { + if selector == nil { + return "", "", fmt.Errorf("DpuSelector is nil") + } + + var nodeName, pciAddress string + var nodeNameFound, pciAddressFound bool + + if selector.MatchLabels != nil { + if pciAddr, ok := selector.MatchLabels["pci-address"]; ok { + pciAddress = pciAddr + pciAddressFound = true + } + + if nn, ok := selector.MatchLabels["nodename"]; ok { + nodeName = nn + nodeNameFound = true + } + } + + if !nodeNameFound && !pciAddressFound { + return "", "", fmt.Errorf("both nodename and pci-address labels not found in DpuSelector") + } else if !nodenameFound { + return "", pciAddress, fmt.Errorf("nodename label not found in DpuSelector (pci-address: %s)", pciAddress) + } else if !pciAddressFound { + return nodename, "", fmt.Errorf("pci-address label not found in DpuSelector (nodename: %s)", nodename) + } + + return nodename, pciAddress, nil +} + +func (r *DataProcessingUnitConfigReconciler) getCurrentNodeName() string { + return os.Getenv("MY_NODE_NAME") } -// SetupWithManager sets up the controller with the Manager. func (r *DataProcessingUnitConfigReconciler) SetupWithManager(mgr ctrl.Manager) error { + //init log + r.log = mgr.GetLogger().WithName("DataProcessingUnitConfigReconciler") + return ctrl.NewControllerManagedBy(mgr). For(&configv1.DataProcessingUnitConfig{}). - Named("dataprocessingunitconfig"). + Owns(&corev1.Pod{}). + Owns(&corev1.ServiceAccount{}). + Owns(&rbacv1.Role{}). + Owns(&rbacv1.RoleBinding{}). + Owns(&rbacv1.ClusterRole{}). + Owns(&rbacv1.ClusterRoleBinding{}). 
Complete(r) } + + \ No newline at end of file diff --git a/internal/daemon/hostsidemanager.go b/internal/daemon/hostsidemanager.go index 4b1a0e1c9..0f49eb618 100644 --- a/internal/daemon/hostsidemanager.go +++ b/internal/daemon/hostsidemanager.go @@ -26,6 +26,7 @@ import ( "k8s.io/client-go/rest" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" + "github.com/openshift/dpu-operator/internal/controller" ) type HostSideManager struct { @@ -49,6 +50,7 @@ type HostSideManager struct { pathManager utils.PathManager stopRequested bool dpListener net.Listener + dpuMgrClient pb2.DataProcessingUnitManagementServiceClient } func (d *HostSideManager) CreateBridgePort(pf int, vf int, vlan int, mac string) (*pb.BridgePort, error) { @@ -434,6 +436,15 @@ func (d *HostSideManager) setupReconcilers() { if err = sfcReconciler.SetupWithManager(mgr); err != nil { d.log.Error(err, "unable to create controller", "controller", "ServiceFunctionChain") } + + //set dataprocessingUnitConfig reconciler + //inject vsp in order to implement different scheme of rebooting and upgrading + dpuConfigReconciler := controller.NewDataProcessingUnitConfigReconciler(mgr.GetClient(), mgr.GetScheme(), d.vsp) + + if err = dpuConfigReconciler.SetupWithManager(mgr); err != nil { + d.log.Error(err, "unable to create controller", "controller", "DataProcessingUnitConfig_controller") + } + d.manager = mgr } } diff --git a/internal/daemon/plugin/vendorplugin.go b/internal/daemon/plugin/vendorplugin.go index 5360975ba..ec66b3f01 100644 --- a/internal/daemon/plugin/vendorplugin.go +++ b/internal/daemon/plugin/vendorplugin.go @@ -31,6 +31,8 @@ type VendorPlugin interface { DeleteNetworkFunction(input string, output string) error GetDevices() (*pb.DeviceListResponse, error) SetNumVfs(vfCount int32) (*pb.VfCount, error) + RebootDpu(dmr pb.DPUManagementRequest) (*pb.DPUManagementResponse, error) + UpgradeFirmware(dmr pb.DPUManagementRequest) (*pb.DPUManagementResponse, error) } type 
GrpcPlugin struct { @@ -40,6 +42,7 @@ type GrpcPlugin struct { opiClient opi.BridgePortServiceClient nfclient pb.NetworkFunctionServiceClient dsClient pb.DeviceServiceClient + dmsClient pb.DataProcessingUnitManagementServiceClient dpuMode bool dpuIdentifier DpuIdentifier conn *grpc.ClientConn @@ -149,6 +152,7 @@ func (g *GrpcPlugin) ensureConnected() error { g.nfclient = pb.NewNetworkFunctionServiceClient(conn) g.opiClient = opi.NewBridgePortServiceClient(conn) g.dsClient = pb.NewDeviceServiceClient(conn) + g.dmsClient = pb.NewDataProcessingUnitManagementServiceClient(conn) return nil } @@ -210,6 +214,24 @@ func (g *GrpcPlugin) SetNumVfs(count int32) (*pb.VfCount, error) { return g.dsClient.SetNumVfs(context.Background(), c) } +func (g *GrpcPlugin) RebootDpu(dmr pb.DPUManagementRequest) (*pb.DPUManagementResponse, error) { + err := g.ensureConnected() + if err != nil { + return nil, fmt.Errorf("RebootDpu failed to ensure GRPC connection: %v", err) + } + dpuMgrInfo := &dmr + return g.dmsClient.DpuRebootFunction(ctx, dpuMgrInfo) +} + +func (g *GrpcPlugin) UpgradeFirmware(dmr pb.DPUManagementRequest) (*pb.DPUManagementResponse, error) { + err := g.ensureConnected() + if err != nil { + return nil, fmt.Errorf("UpgradeFirmware failed to ensure GRPC connection: %v", err) + } + dpuMgrInfo := &dmr + return g.dmsClient.DpuUpgradeFirmwareFunction(ctx, dpuMgrInfo) +} + // IsInitialized returns true if the VSP has been successfully initialized func (g *GrpcPlugin) IsInitialized() bool { g.initMutex.RLock() diff --git a/internal/daemon/vendor-specific-plugins/synaxg/synaxg.go b/internal/daemon/vendor-specific-plugins/synaxg/synaxg.go new file mode 100644 index 000000000..9afc018bc --- /dev/null +++ b/internal/daemon/vendor-specific-plugins/synaxg/synaxg.go @@ -0,0 +1,700 @@ +package main + +import ( + "context" + "flag" + "fmt" + "google.golang.org/grpc/credentials/insecure" + "io" + "log" + "net" + "os" + "os/exec" + "sync" + "time" + "archive/tar" + "strings" + + 
"github.com/go-logr/logr" + pb "github.com/openshift/dpu-operator/dpu-api/gen" + mrvlutils "github.com/openshift/dpu-operator/internal/daemon/vendor-specific-plugins/marvell/mrvl-utils" + dhcp "github.com/openshift/dpu-operator/internal/daemon/vendor-specific-plugins/synaxg/dhcp" + sgpb "github.com/openshift/dpu-operator/internal/daemon/vendor-specific-plugins/synaxg/protos/gen" + "github.com/openshift/dpu-operator/internal/utils" + opi "github.com/opiproject/opi-api/network/evpn-gw/v1alpha1/gen/go" + "github.com/vishvananda/netlink" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "k8s.io/klog/v2" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "github.com/google/go-containerregistry/pkg/authn" + "github.com/google/go-containerregistry/pkg/name" + "github.com/google/go-containerregistry/pkg/v1/remote" +) + +const ( + SysBusPci string = "/sys/bus/pci/devices" + VendorID string = "177d" + DPUdeviceID string = "a0f7" + HostDeviceID string = "ba00" + DefaultPort int32 = 8085 + Version string = "0.0.1" + PortType string = "veth" + NoOfPortPairs int = 2 + IPv6AddrDpu string = "fe80::1" + IPv6AddrHost string = "fe80::2" + DataPlaneType string = "debug" + NumPFs int = 1 + PFID int = 0 + isDPDK bool = false + HostVFDeviceID string = "ba03" + filePathUnbind string = "/sys/bus/pci/drivers/octeon_ep/unbind" + filePathBind string = "/sys/bus/pci/drivers/octeon_ep/bind" +) + +const ( + vduPort int = 50051 + oprPort int = 50601 + loadTime time.Duration = 15 + upgradeTime time.Duration = 1200 + chunkSize int = 1024 * 1024 +) + +const ( + FAIL_LOAD_STATUS int = 0xFF + FAIL_STATUS int = 1 + SUCC_STATUS int = 0 + NOT_NEED_STATUS int = 2 +) + +// multiple dataplane can be added using mrvldp interface functions +type sgDeviceInfo struct { + secInterfaceName string + dpInterfaceName string + dpMAC string + portType string + health string + pciAddress string +} +type sgVspServer struct { + pb.UnimplementedLifeCycleServiceServer + 
pb.UnimplementedManualOperationServiceServer + pb.UnimplementedNetworkFunctionServiceServer + pb.UnimplementedDeviceServiceServer + opi.UnimplementedBridgePortServiceServer + log logr.Logger + grpcServer *grpc.Server + wg sync.WaitGroup + done chan error + startedWg sync.WaitGroup + pathManager utils.PathManager + version string + isDPUMode bool + deviceStore map[string]sgDeviceInfo + portType string + bridgeName string +} +type CardInfo struct { + SerialNum string + CardIp string +} + +type HelloServiceImpl struct { + sgpb.UnimplementedHelloServiceServer +} + +type CardRequest struct { + PciAddr string + StatusCode int +} + +type PcieDriverInfo struct { + pfName string + deviceID string + deviceIp string +} + +var ( + cardInfo CardInfo + cardReqList CardRequest + driverInfo PcieDriverInfo + hbInterval int32 = 60 + l1HbInterval int32 = 10 +) +// createVethPair function to create a veth pair with the given index and InterfaceInfo + +// Init function to initialize the Marvell VSP Server with the given context and InitRequest +// It will return the IpPort and error +func (vsp *sgVspServer) Init(ctx context.Context, in *pb.InitRequest) (*pb.IpPort, error) { + klog.Infof("Received Init() request: DpuMode: %v", in.DpuMode) + vsp.isDPUMode = in.DpuMode + ipPort, err := vsp.configureIP(in.DpuMode) + if vsp.deviceStore == nil { + vsp.deviceStore = make(map[string]sgDeviceInfo) + } + + vsp.portType = "sriov" + VfsPCI, err := mrvlutils.GetAllVfsByDeviceID(HostVFDeviceID) + if err != nil { + return nil, err + } + for _, vfpci := range VfsPCI { + health := vsp.GetDeviceHealth(vfpci) + vsp.deviceStore[vfpci] = sgDeviceInfo{ + pciAddress: vfpci, + health: health, + portType: "sriov", + } + } + return &pb.IpPort{ + Ip: ipPort.Ip, + Port: ipPort.Port, + }, err +} + +func (vsp *sgVspServer) configureIP(dpuMode bool) (pb.IpPort, error) { + var addr string + var deviceID string + if dpuMode { + addr = IPv6AddrDpu + deviceID = DPUdeviceID + } else { + addr = IPv6AddrHost + deviceID = 
HostDeviceID + } + ifName, err := mrvlutils.GetNameByDeviceID(deviceID) + if err != nil { + klog.Errorf("Error occurred in getting Interface Name: %v", err) + return pb.IpPort{}, err + } + klog.Infof("Interface Name: %s", ifName) + err = enableIPV6LinkLocal(ifName, addr) + addr = IPv6AddrDpu + if err != nil { + klog.Errorf("Error occurred in enabling IPv6 Link local Address: %v", err) + return pb.IpPort{}, err + } + var connStr string + if dpuMode { + connStr = "[" + addr + "%" + ifName + "]" + } else { + connStr = "[" + addr + "%25" + ifName + "]" + } + klog.Infof("IPv6 Link Local Address Enabled IfName: %v, Connection String: %s", ifName, connStr) + return pb.IpPort{ + Ip: connStr, + Port: DefaultPort, + }, nil + +} + +// enableIPV6LinkLocal function to enable the IPv6 Link Local Address on the given Interface Name +// It will return the error +func enableIPV6LinkLocal(interfaceName string, ipv6Addr string) error { + // Tell NetworkManager to not manage our interface. + err1 := exec.Command("nsenter", "-t", "1", "-m", "-u", "-n", "-i", "--", "nmcli", "device", "set", interfaceName, "managed", "no").Run() + if err1 != nil { + // This error may be fine. Maybe our host doesn't even run + // NetworkManager. Ignore. + klog.Infof("nmcli device set %s managed no failed with error %v", interfaceName, err1) + } + + optimistic_dad_file := "/proc/sys/net/ipv6/conf/" + interfaceName + "/optimistic_dad" + err1 = os.WriteFile(optimistic_dad_file, []byte("1"), os.ModeAppend) + if err1 != nil { + klog.Errorf("Error setting %s: %v", optimistic_dad_file, err1) + } + + // Ensure to set addrgenmode and toggle link state (which can result in creating + // the IPv6 link local address. Ignore errors here. 
+ exec.Command("ip", "link", "set", interfaceName, "addrgenmode", "eui64").Run() + exec.Command("ip", "link", "set", interfaceName, "down").Run() + + err := exec.Command("ip", "link", "set", interfaceName, "up").Run() + if err != nil { + return fmt.Errorf("Error setting link %s up: %v", interfaceName, err) + } + + err = exec.Command("ip", "addr", "replace", ipv6Addr+"/64", "dev", interfaceName, "optimistic").Run() + if err != nil { + return fmt.Errorf("Error configuring IPv6 address %s/64 on link %s: %v", ipv6Addr, interfaceName, err) + } + return nil +} + +// GetDeviceHealth function to get the health of the device based on the given secInterfaceName +func (vsp *sgVspServer) GetDeviceHealth(secInterfaceName string) string { + switch vsp.portType { + case "veth", "sriov": + nfLink, err := netlink.LinkByName(secInterfaceName) + if err != nil { + return "Unhealthy" + } + //check if the interface is up =0 means interface is down + if nfLink.Attrs().Flags&net.FlagUp == 0 { + return "Unhealthy" + } + return "Healthy" + case "hwlbk": + return "Healthy" //TODO: Implement HW Loopback + default: + return "Unhealthy" + } +} + +func execCommand(command string) ([]byte, error) { + var ( + err error + out []byte + ) + cmd := exec.Command("sh", "-c", command) + out, err = cmd.CombinedOutput() + if err != nil { + log.Printf("Error: %v\n", err) + fmt.Printf("Output: %s\n", out) + return nil, err + } + return out, nil +} + +func Reboot(deviceID string) error { + var err error + + commd := fmt.Sprintf(`echo "%s" > %s`, deviceID, filePathUnbind) + _, err = execCommand(commd) + if err != nil { + return err + } + + log.Println("command unbind executed successfully.") + + log.Println("sleep 120 seconds...") + time.Sleep(120 * time.Second) + + commd = fmt.Sprintf(`echo "%s" > %s`, deviceID, filePathBind) + _, err = execCommand(commd) + if err != nil { + return err + } + log.Println("command bind executed successfully.") + + return nil +} + +func (vsp *sgVspServer) DpuRebootFunction(ctx 
context.Context, in *pb.ManualOperationRequest) (*pb.ManualOperationResponse, error){ + err := Reboot(in.PciAddress) + if err != nil{ + return &pb.ManualOperationResponse{Status: "fail", Message: err.Error()}, err + } + + return &pb.ManualOperationResponse{Status: "Success", Message: "reboot success"}, nil + +} + +// Pulls a file from a Docker image in a registry +func pullFileFromImage(image, filePath string) (outputPath string, error) { + // Parse the image reference + ref, err := name.ParseReference(image) + if err != nil { + return nil, fmt.Errorf("parsing reference: %w", err) + } + + // Get the image from the registry + img, err := remote.Image(ref, remote.WithAuthFromKeychain(authn.DefaultKeychain)) + if err != nil { + return nil, fmt.Errorf("getting image: %w", err) + } + + // Get the image layers + layers, err := img.Layers() + if err != nil { + return nil, fmt.Errorf("getting layers: %w", err) + } + + // Iterate through layers to find the file + for _, layer := range layers { + r, err := layer.Uncompressed() + if err != nil { + return nil, fmt.Errorf("reading layer: %w", err) + } + defer r.Close() + + tr := tar.NewReader(r) + + // Search for the file that we want + for { + hdr, err := tr.Next() + if err == io.EOF { + break + } + if err != nil { + return nil, fmt.Errorf("reading tar: %w", err) + } + fmt.Println("Extracting file from tar:", hdr.Name) + + if strings.HasSuffix(hdr.Name, filePath) { + outputPath := hdr.Name + outFile, err := os.Create(outputPath) + if err != nil { + return outputPath, fmt.Errorf("creating output file: %w", err) + } + // Ensure the output file is closed properly + defer outFile.Close() + + written, err := io.Copy(outFile, tr) + if err != nil { + return outputPath, fmt.Errorf("writing output file: %w", err) + } + + fmt.Println("File successfully extracted:", outputPath) + //fmt.Println("File contents:", fileContent.String()) + fmt.Println("File bytes written:", written) + return outputPath, nil + } + } + } + + return outputPath, 
fmt.Errorf("file not found in image: %s", filePath) +} + +func GetSdkFileFromRemote(sdkImagePath string) (string, error) { + fileSuffix := ".tar.gz" + + outputPath, err := pullFileFromImage(sdkImagePath, fileSuffix) + if err != nil { + fmt.Println("Error:", err) + } else { + fmt.Println("File saved to:", outputPath) + } + + return outputPath, nil +} + +func sendHeartbeatLoop(addr *string) { + log.Printf("send loop Heartbeat to card oam entry...") + conn, err := grpc.NewClient(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Printf("did not connect: %v", err) + return + } + defer conn.Close() + client := sgpb.NewHeartbeatServiceClient(conn) + + for { + // Contact the server and print out its response. + ctx, cancel := context.WithCancel(context.Background()) + // ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + r, err := client.Heartbeat(ctx, &sgpb.HeartbeatRequest{NotifyInterval: 60}) + if err != nil { + log.Printf("send heartbeat error: %v", err) + return + } + log.Printf("heartbeat result: %d", r.GetResult()) + time.Sleep(60 * time.Second) + } +} +func DpuUpgradeFirmwareFunction(ctx context.Context, client sgpb.SoftwareManagementServiceClient, filePath string) (*sgpb.SoftwareUpgradeResponse, error) { + log.Println("open file.") + f, err := os.Open(filePath) + if err != nil { + log.Printf("open failed: %v", err) + return nil, err + } + defer f.Close() + + stream, err := client.SoftwareUpgradeStream(ctx) + if err != nil { + return nil, err + } + for { + buf := make([]byte, chunkSize) + n, err := f.Read(buf) + if err != nil && err != io.EOF { + log.Printf("read failed: %v", err) + return nil, err + } + if err == io.EOF { + log.Printf("file upload done") + break + } + + req := &sgpb.SoftwareUpgradeStreamRequest{ + RemoteFile: filePath, + ChunkData: buf[:n], + } + + if err := stream.Send(req); err != nil { + log.Printf("send failed: %v", err) + if err == io.EOF { + break + } + return 
nil, err + } + } + + log.Printf("software upgrade begin...") + res, err := stream.CloseAndRecv() + if err != nil && err != io.EOF { + return nil, err + } + log.Println("software upgrade done.") + fmt.Println(res.ErrorMessage) + return res, nil +} + +func WorkerUpgrade(addr string, filePath string, statusCode *int, wg *sync.WaitGroup) { + defer wg.Done() + // Set up a connection to the bbu server. + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + + if err != nil { + log.Printf("did not connect: %v", err) + return + } + defer conn.Close() + client := sgpb.NewSoftwareManagementServiceClient(conn) + + // Contact the server and print out its response. + ctxUpload, cancel_upload := context.WithTimeout(context.Background(), upgradeTime*time.Second) + defer cancel_upload() + + res, err := SoftwareUpgrade(ctxUpload, client, filePath) + if err != nil { + log.Printf("could not upload file: %v", err) + return + } + if res.Result == sgpb.UpgradeResultStatus_UPG_NOT_RUNNING { + *statusCode = NOT_NEED_STATUS + } else if res.Result == sgpb.UpgradeResultStatus_UPG_SUCCESS { + *statusCode = SUCC_STATUS + } else { + *statusCode = FAIL_STATUS + } + + log.Printf("response %d %s", res.Result, res.ErrorMessage) +} + +func (s *HelloServiceImpl) SayHello(ctx context.Context, in *sgpb.HelloRequest) (*sgpb.HelloResponse, error) { + log.Printf("received request Serial Number %s", in.GetSerialNumber()) + log.Printf("received request Card IP %s", in.GetCardIp()) + + cardInfo = CardInfo{in.GetSerialNumber(), in.GetCardIp()} + + return &sgpb.HelloResponse{Result: sgpb.ResultStatus_SUCCESS, ErrorMessage: "", HbInterval: &hbInterval, L1HbInterval: &l1HbInterval}, nil +} + +func WorkerHello(ctx context.Context) { + + var ( + err error + lis net.Listener + s *grpc.Server + ) + + // Set up a connection to the server. 
+ lis, err = net.Listen("tcp", fmt.Sprintf(":%d", oprPort)) + if err != nil { + log.Printf("failed to listen: %v", err) + } + s = grpc.NewServer() + func(ctx context.Context, s *grpc.Server) { + <-ctx.Done() + s.GracefulStop() + }(ctx, s) + sgpb.RegisterHelloServiceServer(s, &HelloServiceImpl{}) + + if err := s.Serve(lis); err != nil { + log.Printf("failed to serve: %v", err) + } +} + +func CheckSdkVersion(sdkVersionNew, addr string)(upgradeFlag bool, error) { + var ( + targetVersion string + currentVersion string + ) + // Set up a connection to the bbu server. + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + + if err != nil { + log.Printf("did not connect: %v", err) + return false, err + } + defer conn.Close() + client := sgpb.NewSystemManagementServiceClient(conn) + + // Contact the server and print out its response. + ctx, cancel_upload := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel_upload() + + res, err := client.GetSystemBasicInfo(ctx, &pb.GetSystemBasicInfoRequest{}) + if err != nil { + log.Printf("could not sync file: %v", err) + return false, err + } + currentVersion = res.SystemInfo.FirmwareVersion + log.Printf("CARD: FirmwareVersion: %s", currentVersion) + + parts := strings.Split(filename, "-") + if len(parts) > 1 { + // get the second part + targetVersion = parts[1] + log.Printf("TARGET: FirmwareVersion: %s", targetVersion) + } + else{ + log.Printf("SDK package name error") + return false, errors.New("SDK package name error") + } + + if currentVersion == targetVersion { + log.Printf("SDK version not changed, no need to upgrade") + return false, errors.New("SDK version not changed, no need to upgrade") + } + + reture true, nil +} + +func (vsp *sgVspServer) ManualUpgradeSdkFunction(ctx context.Context, in *pb.ManualOperationRequest) (*pb.ManualOperationResponse, error){ + var cardReqElem CardRequest + var wg_update sync.WaitGroup + + isOk, err := dhcp.StartDhcpServer() + if err != 
nil || isOk !=0{ + log.Printf("Utility update-oam's dhcp server init failed") + return &pb.ManualOperationResponse{Status: "fail", Message: err.Error()}, err + } + + //Step1: listen the Hello Message from SG3 OAM + ctx1, cancelHello := context.WithTimeout(context.Background(), loadTime*time.Second) + defer cancelHello() + wg_update.Add(1) + log.Printf("In loading serial number and card ip ...") + go WorkerHello(ctx1) + wg_update.Wait() + + //send heartbeat to card oam + cardAddr := fmt.Sprintf("%s:%d", cardInfo.CardIp, vduPort) + go sendHeartbeatLoop(&cardAddr) + log.Printf("In updating card ...") + filePath, err := GetSdkFileFromRemote() + if err != nil { + log.Printf("error in getting upload file ...") + return &pb.ManualOperationResponse{Status: "fail", Message: err.Error()}, err + } + //there is only one pci Addr in update + cardReqElem.PciAddr = in.PciAddress + cardReqElem.StatusCode = FAIL_STATUS + + upgradeFlag, err :=CheckSdkVersion(filePath, cardAddr) + if err != nil || upgradeFlag == false{ + log.Printf("error in upgrade") + return &pb.ManualOperationResponse{Status: "fail", Message: err.Error()}, err + } + + wg_update.Add(1) + go WorkerUpgrade(cardAddr, filePath, &cardReqElem.StatusCode, &wg_update) + + wg_update.Wait() + + log.Printf("status code is %d", cardReqElem.StatusCode) + exitCode = exitCode + cardReqElem.StatusCode + + return &pb.ManualOperationResponse{Status: "success", Message: "upgrade success"}, err + +} +// Listen function to listen on the UNIX domain socket +// It will return the Listener and error +func (vsp *sgVspServer) Listen() (net.Listener, error) { + err := vsp.pathManager.EnsureSocketDirExists(vsp.pathManager.VendorPluginSocket()) + if err != nil { + return nil, fmt.Errorf("failed to create run directory for vendor plugin socket: %v", err) + } + listener, err := net.Listen("unix", vsp.pathManager.VendorPluginSocket()) + if err != nil { + return nil, fmt.Errorf("failed to listen on the vendor plugin socket: %v", err) + } + 
vsp.grpcServer = grpc.NewServer() + pb.RegisterManualOperationServiceServer(vsp.grpcServer, vsp) + pb.RegisterLifeCycleServiceServer(vsp.grpcServer, vsp) + klog.Infof("gRPC server is listening on %v", listener.Addr()) + + return listener, nil +} + +// Serve function to serve the gRPC server on the given listener +// It will return the error +func (vsp *sgVspServer) Serve(listener net.Listener) error { + vsp.wg.Add(1) + go func() { + vsp.version = Version + klog.Infof("Starting Marvell VSP Server: Version: %s", vsp.version) + if err := vsp.grpcServer.Serve(listener); err != nil { + vsp.done <- err + } else { + vsp.done <- nil + } + klog.Info("Stopping Marvell VSP Server") + vsp.wg.Done() + }() + + // Block on any go routines writing to the done channel when an error occurs or they + // are forced to exit. + err := <-vsp.done + + vsp.grpcServer.Stop() + vsp.wg.Wait() + vsp.startedWg.Done() + return err +} + +func (vsp *sgVspServer) Stop() { + vsp.grpcServer.Stop() + vsp.done <- nil + vsp.startedWg.Wait() +} +func WithPathManager(pathManager utils.PathManager) func(*sgVspServer) { + return func(vsp *sgVspServer) { + vsp.pathManager = pathManager + } +} + +func NewSynaxgVspServer(opts ...func(*sgVspServer)) *sgVspServer { + var mode string + flag.StringVar(&mode, "mode", "", "Mode for the daemon, can be either host or dpu") + options := zap.Options{ + Development: true, + Level: zapcore.DebugLevel, + } + options.BindFlags(flag.CommandLine) + flag.Parse() + ctrl.SetLogger(zap.New(zap.UseFlagOptions(&options))) + vsp := &sgVspServer{ + log: ctrl.Log.WithName("MarvellVsp"), + pathManager: *utils.NewPathManager("/"), + deviceStore: make(map[string]sgDeviceInfo), + done: make(chan error), + } + + for _, opt := range opts { + opt(vsp) + } + + return vsp +} + +func main() { + sgVspServer := NewSynaxgVspServer() + listener, err := sgVspServer.Listen() + + if err != nil { + sgVspServer.log.Error(err, "Failed to Listen Marvell VSP server") + return + } + err = 
sgVspServer.Serve(listener) + if err != nil { + sgVspServer.log.Error(err, "Failed to serve Marvell VSP server") + return + } +} diff --git a/internal/platform/synaxg-dpu.go b/internal/platform/synaxg-dpu.go new file mode 100644 index 000000000..61399bf4e --- /dev/null +++ b/internal/platform/synaxg-dpu.go @@ -0,0 +1,81 @@ +package platform + +import ( + "fmt" + + "github.com/jaypipes/ghw" + "github.com/openshift/dpu-operator/internal/daemon/plugin" + "github.com/openshift/dpu-operator/internal/images" + "github.com/openshift/dpu-operator/internal/utils" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/kind/pkg/errors" +) + +const ( + SgVendorID string = "177d" + SgDPUdeviceID string = "ba00" + SgHostDeviceID string = "xxxx" +) + +type SynaXGDetector struct { + name string +} + +func NewSynaXGDetector() *SynaXGDetector { + return &SynaXGDetector{ + name: "SynaXG DPU", + } +} + +func (d *SynaXGDetector) Name() string { + return d.name +} + +func (pi *SynaXGDetector) IsDPU(platform Platform, pci ghw.PCIDevice, dpuDevices []plugin.DpuIdentifier) (bool, error) { + if pci.Vendor.ID == SgVendorID && + pci.Product.ID == SgHostDeviceID { + return true, nil + } + + return false, nil +} + +// IsDpuPlatform checks if the platform is a SynaXG DPU +func (pi *SynaXGDetector) IsDpuPlatform(platform Platform) (bool, error) { + devices, err := platform.PciDevices() + if err != nil { + return false, errors.Errorf("Error getting devices: %v", err) + } + + for _, pci := range devices { + if pci.Vendor.ID == SgVendorID && + pci.Product.ID == SgDPUdeviceID { + return true, nil + } + } + + return false, nil +} + +func (pi *SynaXGDetector) GetDpuIdentifier(platform Platform, pci *ghw.PCIDevice) (plugin.DpuIdentifier, error) { + identifier := fmt.Sprintf("SynaXG-dpu-%s", SanitizePCIAddress(pci.Address)) + return plugin.DpuIdentifier(identifier), nil +} + +func (pi *SynaXGDetector) VspPlugin(dpuMode bool, imageManager images.ImageManager, client client.Client, pm 
utils.PathManager, dpuIdentifier plugin.DpuIdentifier) (*plugin.GrpcPlugin, error) { + return plugin.NewGrpcPlugin(dpuMode, dpuIdentifier, client, plugin.WithPathManager(pm)) +} + +// GetVendorName returns the name of the vendor +func (d *SynaXGDetector) GetVendorName() string { + return "SynaXG" +} + +func (d *SynaXGDetector) DpuPlatformName() string { + return "SynaXG-dpu" +} + +// FIXME: Must be a unique value on the DPU that is non changing. +func (d *SynaXGDetector) DpuPlatformIdentifier(platform Platform) (plugin.DpuIdentifier, error) { + return plugin.DpuIdentifier("SynaXG-dpu"), nil +} diff --git a/internal/platform/vendordetector.go b/internal/platform/vendordetector.go index a62cb2447..c5fa87107 100644 --- a/internal/platform/vendordetector.go +++ b/internal/platform/vendordetector.go @@ -66,6 +66,7 @@ func NewDpuDetectorManager(platform Platform) *DpuDetectorManager { NewIntelDetector(), NewMarvellDetector(), NewNetsecAcceleratorDetector(), + NewSynaXGDetector(), // add more detectors here }, }