From 14caf424a40541c51d70a5ad223d0f77bc06a0e3 Mon Sep 17 00:00:00 2001 From: Nick Schuch Date: Wed, 10 Sep 2025 22:42:16 +1000 Subject: [PATCH 1/2] Update Compass mock --- internal/server/mock/compass/stream_traces.go | 182 +++++++++--------- pb/compass.pb.go | 6 +- pb/compass_grpc.pb.go | 81 +++++--- 3 files changed, 153 insertions(+), 116 deletions(-) diff --git a/internal/server/mock/compass/stream_traces.go b/internal/server/mock/compass/stream_traces.go index a283e75..0551afa 100644 --- a/internal/server/mock/compass/stream_traces.go +++ b/internal/server/mock/compass/stream_traces.go @@ -1,7 +1,7 @@ package compass import ( - "context" + "fmt" "time" "github.com/brianvoe/gofakeit/v7" @@ -10,107 +10,117 @@ import ( ) // StreamTraces streams Compass traces from a specific environment. -func (s *Server) StreamTraces(ctx context.Context, req *pb.StreamTracesRequest) (*pb.StreamTracesResponse, error) { +func (s *Server) StreamTraces(_ *pb.StreamTracesRequest, server pb.Compass_StreamTracesServer) error { for { // Simulate some processing delay. time.Sleep(time.Second) - resp := &pb.StreamTracesResponse{ - Traces: []*pb.CompassTrace{ - { - Metadata: &pb.CompassTraceMetadata{ - RequestId: gofakeit.UUID(), - Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", - StartTime: 11479712402527, - EndTime: 11480550685871, - }, - FunctionCalls: []*pb.CompassTraceFunctionCall{ - { - Name: "PDOStatement::execute", - StartTime: 11479719656578, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", - StartTime: 11479719664878, - ElapsedTime: 5999966, + for { + resp := &pb.StreamTracesResponse{ + Traces: []*pb.CompassTrace{ + { + Metadata: &pb.CompassTraceMetadata{ + RequestId: gofakeit.UUID(), + Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", + StartTime: 11479712402527, + EndTime: 11480550685871, }, - { - Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", - StartTime: 11479719666498, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", - StartTime: 11479719668488, - ElapsedTime: 5999966, + FunctionCalls: []*pb.CompassTraceFunctionCall{ + { + Name: "PDOStatement::execute", + StartTime: 11479719656578, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", + StartTime: 11479719664878, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", + StartTime: 11479719666498, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", + StartTime: 11479719668488, + ElapsedTime: 5999966, + }, }, }, - }, - { - Metadata: &pb.CompassTraceMetadata{ - RequestId: gofakeit.UUID(), - Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", - StartTime: 11479712402527, - EndTime: 11480550685871, - }, - FunctionCalls: []*pb.CompassTraceFunctionCall{ - { - Name: "PDOStatement::execute", - StartTime: 11479719656578, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", - StartTime: 11479719664878, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", - StartTime: 11479719666498, - ElapsedTime: 5999966, + { + Metadata: &pb.CompassTraceMetadata{ + RequestId: gofakeit.UUID(), + Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", + StartTime: 11479712402527, + EndTime: 11480550685871, }, - { - Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", - StartTime: 11479719668488, - ElapsedTime: 5999966, + FunctionCalls: []*pb.CompassTraceFunctionCall{ + { + Name: "PDOStatement::execute", + StartTime: 11479719656578, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", + StartTime: 11479719664878, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", + StartTime: 11479719666498, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", + StartTime: 11479719668488, + ElapsedTime: 5999966, + }, }, }, - }, - { - Metadata: &pb.CompassTraceMetadata{ - RequestId: gofakeit.UUID(), - Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", - StartTime: 11479712402527, - EndTime: 11480550685871, - }, - FunctionCalls: []*pb.CompassTraceFunctionCall{ - { - Name: "PDOStatement::execute", - StartTime: 11479719656578, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", - StartTime: 11479719664878, - ElapsedTime: 5999966, + { + Metadata: &pb.CompassTraceMetadata{ + RequestId: gofakeit.UUID(), + Uri: "/sites/default/files/styles/scale_crop_7_3_wide/public/veggie-pasta-bake-hero-umami.jpg.webp?itok=CYsHBUlX", + StartTime: 11479712402527, + EndTime: 11480550685871, }, - { - Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", - StartTime: 11479719666498, - ElapsedTime: 5999966, - }, - { - Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", - StartTime: 11479719668488, - ElapsedTime: 5999966, + FunctionCalls: []*pb.CompassTraceFunctionCall{ + { + Name: "PDOStatement::execute", + StartTime: 11479719656578, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\StatementPrefetchIterator::execute", + StartTime: 11479719664878, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\sqlite\\Driver\\Database\\sqlite\\Statement::execute", + StartTime: 11479719666498, + ElapsedTime: 5999966, + }, + { + Name: "Drupal\\Core\\Database\\Query\\Upsert::execute", + StartTime: 11479719668488, + ElapsedTime: 5999966, + }, }, }, }, - }, + } + + err := server.Send(resp) + if err != nil { + fmt.Println("Stopping log stream for:", err.Error()) + break + } + + time.Sleep(500 * time.Millisecond) } // Simulate sending the response back to the client. - return resp, nil + return nil } } diff --git a/pb/compass.pb.go b/pb/compass.pb.go index cb71f2a..e76d2b7 100644 --- a/pb/compass.pb.go +++ b/pb/compass.pb.go @@ -362,13 +362,13 @@ var file_compass_proto_rawDesc = []byte{ 0x28, 0x03, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x65, 0x6c, 0x61, 0x70, 0x73, 0x65, 0x64, 0x54, 0x69, 0x6d, 0x65, - 0x32, 0x5a, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x73, 0x73, 0x12, 0x4f, 0x0a, 0x0c, 0x53, + 0x32, 0x5c, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x73, 0x73, 0x12, 0x51, 0x0a, 0x0c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x12, 0x1d, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x63, 0x65, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x54, 0x72, 0x61, 0x63, - 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x06, 0x5a, 0x04, - 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x30, 0x01, 0x42, 0x06, + 0x5a, 0x04, 0x2e, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pb/compass_grpc.pb.go b/pb/compass_grpc.pb.go index 3c514f3..64ba18c 100644 --- a/pb/compass_grpc.pb.go +++ b/pb/compass_grpc.pb.go @@ -23,7 +23,7 @@ const _ = grpc.SupportPackageIsVersion7 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CompassClient interface { // Stream Compass traces from a specific environment. - StreamTraces(ctx context.Context, in *StreamTracesRequest, opts ...grpc.CallOption) (*StreamTracesResponse, error) + StreamTraces(ctx context.Context, in *StreamTracesRequest, opts ...grpc.CallOption) (Compass_StreamTracesClient, error) } type compassClient struct { @@ -34,13 +34,36 @@ func NewCompassClient(cc grpc.ClientConnInterface) CompassClient { return &compassClient{cc} } -func (c *compassClient) StreamTraces(ctx context.Context, in *StreamTracesRequest, opts ...grpc.CallOption) (*StreamTracesResponse, error) { - out := new(StreamTracesResponse) - err := c.cc.Invoke(ctx, "/workflow.compass/StreamTraces", in, out, opts...) +func (c *compassClient) StreamTraces(ctx context.Context, in *StreamTracesRequest, opts ...grpc.CallOption) (Compass_StreamTracesClient, error) { + stream, err := c.cc.NewStream(ctx, &Compass_ServiceDesc.Streams[0], "/workflow.compass/StreamTraces", opts...) if err != nil { return nil, err } - return out, nil + x := &compassStreamTracesClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Compass_StreamTracesClient interface { + Recv() (*StreamTracesResponse, error) + grpc.ClientStream +} + +type compassStreamTracesClient struct { + grpc.ClientStream +} + +func (x *compassStreamTracesClient) Recv() (*StreamTracesResponse, error) { + m := new(StreamTracesResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil } // CompassServer is the server API for Compass service. @@ -48,7 +71,7 @@ func (c *compassClient) StreamTraces(ctx context.Context, in *StreamTracesReques // for forward compatibility type CompassServer interface { // Stream Compass traces from a specific environment. - StreamTraces(context.Context, *StreamTracesRequest) (*StreamTracesResponse, error) + StreamTraces(*StreamTracesRequest, Compass_StreamTracesServer) error mustEmbedUnimplementedCompassServer() } @@ -56,8 +79,8 @@ type CompassServer interface { type UnimplementedCompassServer struct { } -func (UnimplementedCompassServer) StreamTraces(context.Context, *StreamTracesRequest) (*StreamTracesResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method StreamTraces not implemented") +func (UnimplementedCompassServer) StreamTraces(*StreamTracesRequest, Compass_StreamTracesServer) error { + return status.Errorf(codes.Unimplemented, "method StreamTraces not implemented") } func (UnimplementedCompassServer) mustEmbedUnimplementedCompassServer() {} @@ -72,22 +95,25 @@ func RegisterCompassServer(s grpc.ServiceRegistrar, srv CompassServer) { s.RegisterService(&Compass_ServiceDesc, srv) } -func _Compass_StreamTraces_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(StreamTracesRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(CompassServer).StreamTraces(ctx, in) +func _Compass_StreamTraces_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StreamTracesRequest) + if err := stream.RecvMsg(m); err != nil { + return err } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/workflow.compass/StreamTraces", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(CompassServer).StreamTraces(ctx, req.(*StreamTracesRequest)) - } - return interceptor(ctx, in, info, handler) + return srv.(CompassServer).StreamTraces(m, &compassStreamTracesServer{stream}) +} + +type Compass_StreamTracesServer interface { + Send(*StreamTracesResponse) error + grpc.ServerStream +} + +type compassStreamTracesServer struct { + grpc.ServerStream +} + +func (x *compassStreamTracesServer) Send(m *StreamTracesResponse) error { + return x.ServerStream.SendMsg(m) } // Compass_ServiceDesc is the grpc.ServiceDesc for Compass service. @@ -96,12 +122,13 @@ func _Compass_StreamTraces_Handler(srv interface{}, ctx context.Context, dec fun var Compass_ServiceDesc = grpc.ServiceDesc{ ServiceName: "workflow.compass", HandlerType: (*CompassServer)(nil), - Methods: []grpc.MethodDesc{ + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ { - MethodName: "StreamTraces", - Handler: _Compass_StreamTraces_Handler, + StreamName: "StreamTraces", + Handler: _Compass_StreamTraces_Handler, + ServerStreams: true, }, }, - Streams: []grpc.StreamDesc{}, Metadata: "compass.proto", } From 3f4f735d6512f6c68996791f93fb4672016382e8 Mon Sep 17 00:00:00 2001 From: Nick Schuch Date: Wed, 10 Sep 2025 22:44:45 +1000 Subject: [PATCH 2/2] Fix staticcheck --- internal/server/mock/compass/stream_traces.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/server/mock/compass/stream_traces.go b/internal/server/mock/compass/stream_traces.go index 0551afa..e881f3f 100644 --- a/internal/server/mock/compass/stream_traces.go +++ b/internal/server/mock/compass/stream_traces.go @@ -113,14 +113,10 @@ func (s *Server) StreamTraces(_ *pb.StreamTracesRequest, server pb.Compass_Strea err := server.Send(resp) if err != nil { - fmt.Println("Stopping log stream for:", err.Error()) - break + return fmt.Errorf("stopping log stream for: %w", err) } time.Sleep(500 * time.Millisecond) } - - // Simulate sending the response back to the client. - return nil } }