From ed642162959423c78afc65fd5feb2ffb7016f1e3 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Thu, 23 May 2019 14:13:51 +0430 Subject: [PATCH 1/5] rename github username --- .gitignore | 2 +- README.md | 2 +- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 +- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 +- internal/server/grpc.go | 2 +- internal/server/grpc_test.go | 4 +- internal/server/http.go | 2 +- internal/server/http_test.go | 4 +- internal/server/moc_broker_test.go | 2 +- pkg/ubroker/ubroker.pb.go | 506 +++++++++++++++++++++++++++++ 12 files changed, 522 insertions(+), 16 deletions(-) create mode 100644 pkg/ubroker/ubroker.pb.go diff --git a/.gitignore b/.gitignore index 4b66428..eb6cd8e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ /ubroker -*.pb.go \ No newline at end of file +# *.pb.go diff --git a/README.md b/README.md index 331a550..5eec89a 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/arcana261/ubroker.svg?branch=master)](https://travis-ci.org/arcana261/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![Build Status](https://travis-ci.org/maedeazad/ubroker.svg?branch=master)](https://travis-ci.org/maedeazad/ubroker) [![Join the chat at https://gitter.im/maedeazad-ubroker/community](https://badges.gitter.im/maedeazad-ubroker/community.svg)](https://gitter.im/maedeazad-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) # ubroker diff --git a/api/ubroker.proto b/api/ubroker.proto index 7b2c23c..4453f7a 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; +option go_package = "github.com/maedeazad/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 95e9562..6d6aae1 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/internal/server" + "github.com/maedeazad/ubroker/internal/broker" + "github.com/maedeazad/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index c040e9b..3e7b571 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 0c3780b..86c47bf 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/arcana261/ubroker/internal/broker" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/internal/broker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index 2b393ed..c53a9b2 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -3,7 +3,7 @@ package server import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index ec28f00..b00770d 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/internal/server" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index 1badf2c..5a50e03 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index aff3746..5bcfec0 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/arcana261/ubroker/internal/server" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/internal/server" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index d3a0fa4..3e0599b 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/arcana261/ubroker/pkg/ubroker" + "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" ) diff --git a/pkg/ubroker/ubroker.pb.go b/pkg/ubroker/ubroker.pb.go new file mode 100644 index 0000000..5e333d2 --- /dev/null +++ b/pkg/ubroker/ubroker.pb.go @@ -0,0 +1,506 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: api/ubroker.proto + +package ubroker + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + empty "github.com/golang/protobuf/ptypes/empty" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type Message struct { + Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Message) Reset() { *m = Message{} } +func (m *Message) String() string { return proto.CompactTextString(m) } +func (*Message) ProtoMessage() {} +func (*Message) Descriptor() ([]byte, []int) { + return fileDescriptor_c9a5bc08b618fc5f, []int{0} +} + +func (m *Message) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Message.Unmarshal(m, b) +} +func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Message.Marshal(b, m, deterministic) +} +func (m *Message) XXX_Merge(src proto.Message) { + xxx_messageInfo_Message.Merge(m, src) +} +func (m *Message) XXX_Size() int { + return xxx_messageInfo_Message.Size(m) +} +func (m *Message) XXX_DiscardUnknown() { + xxx_messageInfo_Message.DiscardUnknown(m) +} + +var xxx_messageInfo_Message proto.InternalMessageInfo + +func (m *Message) GetBody() []byte { + if m != nil { + return m.Body + } + return nil +} + +type Delivery struct { + Message *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` + Id int32 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Delivery) Reset() { *m = Delivery{} } +func (m *Delivery) String() string { return proto.CompactTextString(m) } +func (*Delivery) ProtoMessage() {} +func (*Delivery) Descriptor() ([]byte, []int) { + return fileDescriptor_c9a5bc08b618fc5f, []int{1} +} + +func (m *Delivery) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_Delivery.Unmarshal(m, b) +} +func (m *Delivery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_Delivery.Marshal(b, m, deterministic) +} +func (m *Delivery) XXX_Merge(src proto.Message) { + xxx_messageInfo_Delivery.Merge(m, src) +} +func (m *Delivery) XXX_Size() int { + return xxx_messageInfo_Delivery.Size(m) +} +func (m *Delivery) XXX_DiscardUnknown() { + xxx_messageInfo_Delivery.DiscardUnknown(m) +} + +var xxx_messageInfo_Delivery proto.InternalMessageInfo + +func (m *Delivery) GetMessage() *Message { + if m != nil { + return m.Message + } + return nil +} + +func (m *Delivery) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + +type FetchRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FetchRequest) Reset() { *m = FetchRequest{} } +func (m *FetchRequest) String() string { return proto.CompactTextString(m) } +func (*FetchRequest) ProtoMessage() {} +func (*FetchRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c9a5bc08b618fc5f, []int{2} +} + +func (m *FetchRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FetchRequest.Unmarshal(m, b) +} +func (m *FetchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FetchRequest.Marshal(b, m, deterministic) +} +func (m *FetchRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FetchRequest.Merge(m, src) +} +func (m *FetchRequest) XXX_Size() int { + return xxx_messageInfo_FetchRequest.Size(m) +} +func (m *FetchRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FetchRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FetchRequest proto.InternalMessageInfo + +type AcknowledgeRequest struct { + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AcknowledgeRequest) Reset() { *m = AcknowledgeRequest{} } +func (m *AcknowledgeRequest) String() string { return proto.CompactTextString(m) } +func (*AcknowledgeRequest) ProtoMessage() {} +func (*AcknowledgeRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c9a5bc08b618fc5f, []int{3} +} + +func (m *AcknowledgeRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_AcknowledgeRequest.Unmarshal(m, b) +} +func (m *AcknowledgeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_AcknowledgeRequest.Marshal(b, m, deterministic) +} +func (m *AcknowledgeRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AcknowledgeRequest.Merge(m, src) +} +func (m *AcknowledgeRequest) XXX_Size() int { + return xxx_messageInfo_AcknowledgeRequest.Size(m) +} +func (m *AcknowledgeRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AcknowledgeRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AcknowledgeRequest proto.InternalMessageInfo + +func (m *AcknowledgeRequest) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + +type ReQueueRequest struct { + Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *ReQueueRequest) Reset() { *m = ReQueueRequest{} } +func (m *ReQueueRequest) String() string { return proto.CompactTextString(m) } +func (*ReQueueRequest) ProtoMessage() {} +func (*ReQueueRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_c9a5bc08b618fc5f, []int{4} +} + +func (m *ReQueueRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_ReQueueRequest.Unmarshal(m, b) +} +func (m *ReQueueRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_ReQueueRequest.Marshal(b, m, deterministic) +} +func (m *ReQueueRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReQueueRequest.Merge(m, src) +} +func (m *ReQueueRequest) XXX_Size() int { + return xxx_messageInfo_ReQueueRequest.Size(m) +} +func (m *ReQueueRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReQueueRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReQueueRequest proto.InternalMessageInfo + +func (m *ReQueueRequest) GetId() int32 { + if m != nil { + return m.Id + } + return 0 +} + +func init() { + proto.RegisterType((*Message)(nil), "ubroker.Message") + proto.RegisterType((*Delivery)(nil), "ubroker.Delivery") + proto.RegisterType((*FetchRequest)(nil), "ubroker.FetchRequest") + proto.RegisterType((*AcknowledgeRequest)(nil), "ubroker.AcknowledgeRequest") + proto.RegisterType((*ReQueueRequest)(nil), "ubroker.ReQueueRequest") +} + +func init() { proto.RegisterFile("api/ubroker.proto", fileDescriptor_c9a5bc08b618fc5f) } + +var fileDescriptor_c9a5bc08b618fc5f = []byte{ + // 306 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xc1, 0x4b, 0xc3, 0x30, + 0x14, 0xc6, 0xc9, 0x70, 0xab, 0xbc, 0x8d, 0xe1, 0x1e, 0xa8, 0x63, 0x43, 0x18, 0xc5, 0x43, 0xd9, + 0xa1, 0x95, 0x0d, 0x2f, 0xde, 0x1c, 0xba, 0x9b, 0xa0, 0x3d, 0x7a, 0x6b, 0x96, 0x67, 0x16, 0xd6, + 0x9a, 0xda, 0x36, 0xca, 0xfc, 0xbb, 0xfd, 0x03, 0xc4, 0xac, 0x29, 0x93, 0xb1, 0xdb, 0xcb, 0xe3, + 0xfb, 0xf2, 0xfd, 0xf2, 0x11, 0x18, 0x24, 0xb9, 0x8a, 0x0c, 0x2f, 0xf4, 0x86, 0x8a, 0x30, 0x2f, + 0x74, 0xa5, 0xd1, 0xab, 0x8f, 0xa3, 0xb1, 0xd4, 0x5a, 0xa6, 0x14, 0xd9, 0x35, 0x37, 0x6f, 0x11, + 0x65, 0x79, 0xb5, 0xdd, 0xa9, 0xfc, 0x2b, 0xf0, 0x9e, 0xa8, 0x2c, 0x13, 0x49, 0x88, 0x70, 0xc2, + 0xb5, 0xd8, 0x0e, 0xd9, 0x84, 0x05, 0xbd, 0xd8, 0xce, 0xfe, 0x12, 0x4e, 0x1f, 0x28, 0x55, 0x9f, + 0x54, 0x6c, 0x71, 0x0a, 0x5e, 0xb6, 0x93, 0x5a, 0x49, 0x77, 0x76, 0x16, 0xba, 0xc4, 0xfa, 0x8a, + 0xd8, 0x09, 0xb0, 0x0f, 0x2d, 0x25, 0x86, 0xad, 0x09, 0x0b, 0xda, 0x71, 0x4b, 0x09, 0xbf, 0x0f, + 0xbd, 0x25, 0x55, 0xab, 0x75, 0x4c, 0x1f, 0x86, 0xca, 0xca, 0xbf, 0x06, 0xbc, 0x5f, 0x6d, 0xde, + 0xf5, 0x57, 0x4a, 0x42, 0x52, 0xbd, 0xad, 0x5d, 0xac, 0x71, 0x4d, 0xa0, 0x1f, 0xd3, 0x8b, 0x21, + 0x73, 0x4c, 0x31, 0xfb, 0x61, 0xd0, 0x59, 0x58, 0x06, 0xbc, 0x85, 0xb6, 0x8d, 0xc0, 0xf3, 0x06, + 0x6b, 0x3f, 0x72, 0x34, 0x68, 0xd6, 0xee, 0x45, 0x01, 0xbb, 0x61, 0xb8, 0x80, 0xee, 0x1e, 0x09, + 0x8e, 0x1b, 0xd5, 0x21, 0xdf, 0xe8, 0x22, 0xdc, 0x55, 0x19, 0xba, 0x2a, 0xc3, 0xc7, 0xbf, 0x2a, + 0xf1, 0x0e, 0xbc, 0x9a, 0x13, 0x2f, 0x1b, 0xff, 0x7f, 0xf2, 0xa3, 0xde, 0x39, 0x78, 0xcf, 0x86, + 0xa7, 0xaa, 0x5c, 0xe3, 0x41, 0x9f, 0xc7, 0x4c, 0x8b, 0xe9, 0x6b, 0x20, 0x55, 0xb5, 0x36, 0x3c, + 0x5c, 0xe9, 0x2c, 0xca, 0x12, 0x12, 0x94, 0x7c, 0x27, 0xc2, 0xfd, 0x80, 0x28, 0xdf, 0x48, 0x37, + 0xf3, 0x8e, 0xf5, 0xce, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x73, 0xdb, 0x1b, 0x91, 0x23, 0x02, + 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// BrokerClient is the client API for Broker service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type BrokerClient interface { + // Fetch should return a single Delivery per FetchRequest. + // Should return: + // Unavailable: If broker has been closed + Fetch(ctx context.Context, opts ...grpc.CallOption) (Broker_FetchClient, error) + // Acknowledge a message + // Should return: + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + Acknowledge(ctx context.Context, in *AcknowledgeRequest, opts ...grpc.CallOption) (*empty.Empty, error) + // ReQueue a message + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + ReQueue(ctx context.Context, in *ReQueueRequest, opts ...grpc.CallOption) (*empty.Empty, error) + // Publish message to Queue + // OK: on success + // Unavailable: If broker has been closed + Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*empty.Empty, error) +} + +type brokerClient struct { + cc *grpc.ClientConn +} + +func NewBrokerClient(cc *grpc.ClientConn) BrokerClient { + return &brokerClient{cc} +} + +func (c *brokerClient) Fetch(ctx context.Context, opts ...grpc.CallOption) (Broker_FetchClient, error) { + stream, err := c.cc.NewStream(ctx, &_Broker_serviceDesc.Streams[0], "/ubroker.Broker/Fetch", opts...) + if err != nil { + return nil, err + } + x := &brokerFetchClient{stream} + return x, nil +} + +type Broker_FetchClient interface { + Send(*FetchRequest) error + Recv() (*Delivery, error) + grpc.ClientStream +} + +type brokerFetchClient struct { + grpc.ClientStream +} + +func (x *brokerFetchClient) Send(m *FetchRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *brokerFetchClient) Recv() (*Delivery, error) { + m := new(Delivery) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *brokerClient) Acknowledge(ctx context.Context, in *AcknowledgeRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/ubroker.Broker/Acknowledge", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerClient) ReQueue(ctx context.Context, in *ReQueueRequest, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/ubroker.Broker/ReQueue", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *brokerClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*empty.Empty, error) { + out := new(empty.Empty) + err := c.cc.Invoke(ctx, "/ubroker.Broker/Publish", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// BrokerServer is the server API for Broker service. +type BrokerServer interface { + // Fetch should return a single Delivery per FetchRequest. + // Should return: + // Unavailable: If broker has been closed + Fetch(Broker_FetchServer) error + // Acknowledge a message + // Should return: + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + Acknowledge(context.Context, *AcknowledgeRequest) (*empty.Empty, error) + // ReQueue a message + // OK: on success + // Unavailable: If broker has been closed + // InvalidArgument: If requested ID is invalid + ReQueue(context.Context, *ReQueueRequest) (*empty.Empty, error) + // Publish message to Queue + // OK: on success + // Unavailable: If broker has been closed + Publish(context.Context, *Message) (*empty.Empty, error) +} + +// UnimplementedBrokerServer can be embedded to have forward compatible implementations. +type UnimplementedBrokerServer struct { +} + +func (*UnimplementedBrokerServer) Fetch(srv Broker_FetchServer) error { + return status.Errorf(codes.Unimplemented, "method Fetch not implemented") +} +func (*UnimplementedBrokerServer) Acknowledge(ctx context.Context, req *AcknowledgeRequest) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Acknowledge not implemented") +} +func (*UnimplementedBrokerServer) ReQueue(ctx context.Context, req *ReQueueRequest) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method ReQueue not implemented") +} +func (*UnimplementedBrokerServer) Publish(ctx context.Context, req *Message) (*empty.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") +} + +func RegisterBrokerServer(s *grpc.Server, srv BrokerServer) { + s.RegisterService(&_Broker_serviceDesc, srv) +} + +func _Broker_Fetch_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(BrokerServer).Fetch(&brokerFetchServer{stream}) +} + +type Broker_FetchServer interface { + Send(*Delivery) error + Recv() (*FetchRequest, error) + grpc.ServerStream +} + +type brokerFetchServer struct { + grpc.ServerStream +} + +func (x *brokerFetchServer) Send(m *Delivery) error { + return x.ServerStream.SendMsg(m) +} + +func (x *brokerFetchServer) Recv() (*FetchRequest, error) { + m := new(FetchRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Broker_Acknowledge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AcknowledgeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BrokerServer).Acknowledge(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ubroker.Broker/Acknowledge", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BrokerServer).Acknowledge(ctx, req.(*AcknowledgeRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Broker_ReQueue_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ReQueueRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BrokerServer).ReQueue(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ubroker.Broker/ReQueue", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BrokerServer).ReQueue(ctx, req.(*ReQueueRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Broker_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Message) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(BrokerServer).Publish(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/ubroker.Broker/Publish", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(BrokerServer).Publish(ctx, req.(*Message)) + } + return interceptor(ctx, in, info, handler) +} + +var _Broker_serviceDesc = grpc.ServiceDesc{ + ServiceName: "ubroker.Broker", + HandlerType: (*BrokerServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Acknowledge", + Handler: _Broker_Acknowledge_Handler, + }, + { + MethodName: "ReQueue", + Handler: _Broker_ReQueue_Handler, + }, + { + MethodName: "Publish", + Handler: _Broker_Publish_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Fetch", + Handler: _Broker_Fetch_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "api/ubroker.proto", +} From edf20b7b0ccc4af5950715ac5d646b0828915450 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Thu, 23 May 2019 16:14:58 +0430 Subject: [PATCH 2/5] add Publish + Acknowledge + Fetch --- internal/server/grpc.go | 85 +++++++++- internal/server/grpc_test.go | 310 +++++++++++++++++------------------ 2 files changed, 234 insertions(+), 161 deletions(-) diff --git a/internal/server/grpc.go b/internal/server/grpc.go index c53a9b2..acb2506 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -2,6 +2,7 @@ package server import ( "context" + // "fmt" "github.com/maedeazad/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" @@ -19,18 +20,90 @@ func NewGRPC(broker ubroker.Broker) ubroker.BrokerServer { } } -func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { - return status.Error(codes.Unimplemented, "not implemented") +// Unavailable: status.Error(codes.Unavailable, "Unavailable") +// InvalidArgument: status.Error(codes.InvalidArgument, "InvalidArgument") + +// Publish message to Queue +// OK: on success +// Unavailable: If broker has been closed +func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { + err := s.broker.Publish(ctx, request) + + if err != nil { + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + } + + return &empty.Empty{}, nil } +// Acknowledge a message +// Should return: +// OK: on success +// Unavailable: If broker has been closed +// InvalidArgument: If requested ID is invalid func (s *grpcServicer) Acknowledge(ctx context.Context, request *ubroker.AcknowledgeRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.Acknowledge(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed{ + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + }else{ + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidArgument") + } + } + + return &empty.Empty{}, nil } +// ReQueue a message +// OK: on success +// Unavailable: If broker has been closed +// InvalidArgument: If requested ID is invalid func (s *grpcServicer) ReQueue(ctx context.Context, request *ubroker.ReQueueRequest) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") + err := s.broker.ReQueue(ctx, request.Id) + + if err != nil { + if err == ubroker.ErrClosed{ + return &empty.Empty{}, status.Error(codes.Unavailable, "Unavailable") + }else{ + return &empty.Empty{}, status.Error(codes.InvalidArgument, "InvalidArgument") + } + } + + return &empty.Empty{}, nil } -func (s *grpcServicer) Publish(ctx context.Context, request *ubroker.Message) (*empty.Empty, error) { - return &empty.Empty{}, status.Error(codes.Unimplemented, "not implemented") +// Fetch should return a single Delivery per FetchRequest. +// Should return: +// Unavailable: If broker has been closed +func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { + ctx := stream.Context() + + deliveryChannel, err := s.broker.Delivery(ctx) + if err != nil { + return err + } + + for { + _, err = stream.Recv() + if err != nil{ + return status.Error(codes.Unavailable, "Unavailable") + } + + select { + case delivery := <-deliveryChannel: + if delivery != nil{ + return status.Error(codes.Unavailable, "Unavailable") + }else{ + stream.Send(delivery) + } + + case <-ctx.Done(): + return status.Error(codes.Unavailable, "Unavailable") + } + } } + +// func print(s string){ +// fmt.Println(s) +// } diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index b00770d..b4b9cc4 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -27,161 +27,161 @@ func TestGRPCServerTestSuite(t *testing.T) { suite.Run(t, new(GRPCServerTestSuite)) } -func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - - _, err = stream.Recv() - s.assertStatusCode(codes.Unavailable, err) - - broker.AssertExpectations(s.T()) - }, broker) -} - -func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - - _, err = stream.Recv() - s.assertStatusCode(codes.Unavailable, err) - - broker.AssertExpectations(s.T()) - }, broker) -} - -func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - - result, err := stream.Recv() - s.Nil(err) - - s.Equal("hello", string(result.Message.Body)) - - s.Nil(stream.CloseSend()) - - broker.AssertExpectations(s.T()) - }, broker) -} - -func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - result, err := stream.Recv() - s.Nil(err) - s.Equal("hello", string(result.Message.Body)) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - result, err = stream.Recv() - s.Nil(err) - s.Equal("salam", string(result.Message.Body)) - - s.Nil(stream.CloseSend()) - - broker.AssertExpectations(s.T()) - }, broker) -} - -func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - dataReceived := make(chan struct{}) - go func() { - defer close(dataReceived) - - result, err := stream.Recv() - s.Nil(err) - s.Equal("salam", string(result.Message.Body)) - }() - - time.Sleep(100 * time.Millisecond) - select { - case <-dataReceived: - s.FailNow("Fetch should not delivery if not requested") - return - - default: - } - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - - <-dataReceived - - s.Nil(stream.CloseSend()) - - broker.AssertExpectations(s.T()) - }, broker) -} - -func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { - broker := &mockBroker{} - broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) - - s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { - stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) - s.Nil(err) - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - result, err := stream.Recv() - s.Nil(err) - s.Equal("hello", string(result.Message.Body)) - - dataReceived := make(chan struct{}) - go func() { - defer close(dataReceived) - - result, err = stream.Recv() - s.Nil(err) - s.Equal("salam", string(result.Message.Body)) - }() - - time.Sleep(100 * time.Millisecond) - select { - case <-dataReceived: - s.FailNow("Fetch should not delivery if not requested") - return - - default: - } - - s.Nil(stream.Send(&ubroker.FetchRequest{})) - - <-dataReceived - - s.Nil(stream.CloseSend()) - - broker.AssertExpectations(s.T()) - }, broker) -} +// func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// +// _, err = stream.Recv() +// s.assertStatusCode(codes.Unavailable, err) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } +// +// func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// +// _, err = stream.Recv() +// s.assertStatusCode(codes.Unavailable, err) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } +// +// func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// +// result, err := stream.Recv() +// s.Nil(err) +// +// s.Equal("hello", string(result.Message.Body)) +// +// s.Nil(stream.CloseSend()) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } +// +// func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// result, err := stream.Recv() +// s.Nil(err) +// s.Equal("hello", string(result.Message.Body)) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// result, err = stream.Recv() +// s.Nil(err) +// s.Equal("salam", string(result.Message.Body)) +// +// s.Nil(stream.CloseSend()) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } +// +// func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// dataReceived := make(chan struct{}) +// go func() { +// defer close(dataReceived) +// +// result, err := stream.Recv() +// s.Nil(err) +// s.Equal("salam", string(result.Message.Body)) +// }() +// +// time.Sleep(100 * time.Millisecond) +// select { +// case <-dataReceived: +// s.FailNow("Fetch should not delivery if not requested") +// return +// +// default: +// } +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// +// <-dataReceived +// +// s.Nil(stream.CloseSend()) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } +// +// func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { +// broker := &mockBroker{} +// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) +// +// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { +// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) +// s.Nil(err) +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// result, err := stream.Recv() +// s.Nil(err) +// s.Equal("hello", string(result.Message.Body)) +// +// dataReceived := make(chan struct{}) +// go func() { +// defer close(dataReceived) +// +// result, err = stream.Recv() +// s.Nil(err) +// s.Equal("salam", string(result.Message.Body)) +// }() +// +// time.Sleep(100 * time.Millisecond) +// select { +// case <-dataReceived: +// s.FailNow("Fetch should not delivery if not requested") +// return +// +// default: +// } +// +// s.Nil(stream.Send(&ubroker.FetchRequest{})) +// +// <-dataReceived +// +// s.Nil(stream.CloseSend()) +// +// broker.AssertExpectations(s.T()) +// }, broker) +// } func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfClosed() { broker := &mockBroker{} From e57814c25e6b71b6c868f56eca08e2ab35d35e23 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Thu, 23 May 2019 16:23:09 +0430 Subject: [PATCH 3/5] all tests passed. --- internal/server/grpc.go | 6 +- internal/server/grpc_test.go | 310 +++++++++++++++++------------------ 2 files changed, 158 insertions(+), 158 deletions(-) diff --git a/internal/server/grpc.go b/internal/server/grpc.go index acb2506..eef9c6f 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -81,7 +81,7 @@ func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { deliveryChannel, err := s.broker.Delivery(ctx) if err != nil { - return err + return status.Error(codes.Unavailable, "Unavailable") } for { @@ -93,9 +93,9 @@ func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { select { case delivery := <-deliveryChannel: if delivery != nil{ - return status.Error(codes.Unavailable, "Unavailable") - }else{ stream.Send(delivery) + }else{ + return status.Error(codes.Unavailable, "Unavailable") } case <-ctx.Done(): diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index b4b9cc4..b00770d 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -27,161 +27,161 @@ func TestGRPCServerTestSuite(t *testing.T) { suite.Run(t, new(GRPCServerTestSuite)) } -// func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// -// _, err = stream.Recv() -// s.assertStatusCode(codes.Unavailable, err) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } -// -// func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// -// _, err = stream.Recv() -// s.assertStatusCode(codes.Unavailable, err) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } -// -// func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// -// result, err := stream.Recv() -// s.Nil(err) -// -// s.Equal("hello", string(result.Message.Body)) -// -// s.Nil(stream.CloseSend()) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } -// -// func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// result, err := stream.Recv() -// s.Nil(err) -// s.Equal("hello", string(result.Message.Body)) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// result, err = stream.Recv() -// s.Nil(err) -// s.Equal("salam", string(result.Message.Body)) -// -// s.Nil(stream.CloseSend()) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } -// -// func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// dataReceived := make(chan struct{}) -// go func() { -// defer close(dataReceived) -// -// result, err := stream.Recv() -// s.Nil(err) -// s.Equal("salam", string(result.Message.Body)) -// }() -// -// time.Sleep(100 * time.Millisecond) -// select { -// case <-dataReceived: -// s.FailNow("Fetch should not delivery if not requested") -// return -// -// default: -// } -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// -// <-dataReceived -// -// s.Nil(stream.CloseSend()) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } -// -// func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { -// broker := &mockBroker{} -// broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) -// -// s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { -// stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) -// s.Nil(err) -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// result, err := stream.Recv() -// s.Nil(err) -// s.Equal("hello", string(result.Message.Body)) -// -// dataReceived := make(chan struct{}) -// go func() { -// defer close(dataReceived) -// -// result, err = stream.Recv() -// s.Nil(err) -// s.Equal("salam", string(result.Message.Body)) -// }() -// -// time.Sleep(100 * time.Millisecond) -// select { -// case <-dataReceived: -// s.FailNow("Fetch should not delivery if not requested") -// return -// -// default: -// } -// -// s.Nil(stream.Send(&ubroker.FetchRequest{})) -// -// <-dataReceived -// -// s.Nil(stream.CloseSend()) -// -// broker.AssertExpectations(s.T()) -// }, broker) -// } +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(nil, ubroker.ErrClosed) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnUnavailableIfDeliveryClosed() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel(), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + _, err = stream.Recv() + s.assertStatusCode(codes.Unavailable, err) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnOneItem() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + result, err := stream.Recv() + s.Nil(err) + + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldReturnTwoItems() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForFirstData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err := stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} + +func (s *GRPCServerTestSuite) TestFetchShouldNotStreamIfNotRequestedForMoreData() { + broker := &mockBroker{} + broker.On("Delivery", mock.Anything).Once().Return(s.makeChannel("hello", "salam"), nil) + + s.runTest(func(ctx context.Context, client ubroker.BrokerClient) { + stream, err := client.Fetch(ctx, grpc.WaitForReady(true)) + s.Nil(err) + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + result, err := stream.Recv() + s.Nil(err) + s.Equal("hello", string(result.Message.Body)) + + dataReceived := make(chan struct{}) + go func() { + defer close(dataReceived) + + result, err = stream.Recv() + s.Nil(err) + s.Equal("salam", string(result.Message.Body)) + }() + + time.Sleep(100 * time.Millisecond) + select { + case <-dataReceived: + s.FailNow("Fetch should not delivery if not requested") + return + + default: + } + + s.Nil(stream.Send(&ubroker.FetchRequest{})) + + <-dataReceived + + s.Nil(stream.CloseSend()) + + broker.AssertExpectations(s.T()) + }, broker) +} func (s *GRPCServerTestSuite) TestAcknowledgeShouldReturnUnavailableIfClosed() { broker := &mockBroker{} From fdcbc1a8421ba54e7b6123da9e0635c891156214 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Thu, 23 May 2019 16:45:44 +0430 Subject: [PATCH 4/5] change username for pull_request --- README.md | 2 +- api/ubroker.proto | 2 +- cmd/ubroker/main.go | 6 +++--- internal/broker/core.go | 2 +- internal/broker/core_test.go | 4 ++-- internal/server/grpc.go | 7 +------ internal/server/grpc_test.go | 4 ++-- internal/server/http.go | 2 +- internal/server/http_test.go | 4 ++-- internal/server/moc_broker_test.go | 2 +- 10 files changed, 15 insertions(+), 20 deletions(-) diff --git a/README.md b/README.md index 5eec89a..331a550 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -[![Build Status](https://travis-ci.org/maedeazad/ubroker.svg?branch=master)](https://travis-ci.org/maedeazad/ubroker) [![Join the chat at https://gitter.im/maedeazad-ubroker/community](https://badges.gitter.im/maedeazad-ubroker/community.svg)](https://gitter.im/maedeazad-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![Build Status](https://travis-ci.org/arcana261/ubroker.svg?branch=master)](https://travis-ci.org/arcana261/ubroker) [![Join the chat at https://gitter.im/arcana261-ubroker/community](https://badges.gitter.im/arcana261-ubroker/community.svg)](https://gitter.im/arcana261-ubroker/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) # ubroker diff --git a/api/ubroker.proto b/api/ubroker.proto index 4453f7a..7b2c23c 100644 --- a/api/ubroker.proto +++ b/api/ubroker.proto @@ -1,7 +1,7 @@ syntax = "proto3"; package ubroker; -option go_package = "github.com/maedeazad/ubroker/pkg/ubroker"; +option go_package = "github.com/arcana261/ubroker/pkg/ubroker"; import "google/protobuf/empty.proto"; diff --git a/cmd/ubroker/main.go b/cmd/ubroker/main.go index 6d6aae1..95e9562 100644 --- a/cmd/ubroker/main.go +++ b/cmd/ubroker/main.go @@ -9,11 +9,11 @@ import ( "os/signal" "time" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "google.golang.org/grpc" - "github.com/maedeazad/ubroker/internal/broker" - "github.com/maedeazad/ubroker/internal/server" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/internal/server" ) func main() { diff --git a/internal/broker/core.go b/internal/broker/core.go index 3e7b571..c040e9b 100644 --- a/internal/broker/core.go +++ b/internal/broker/core.go @@ -7,7 +7,7 @@ import ( "github.com/pkg/errors" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" ) // New creates a new instance of ubroker.Broker diff --git a/internal/broker/core_test.go b/internal/broker/core_test.go index 86c47bf..0c3780b 100644 --- a/internal/broker/core_test.go +++ b/internal/broker/core_test.go @@ -9,8 +9,8 @@ import ( "testing" "time" - "github.com/maedeazad/ubroker/internal/broker" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/broker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" "github.com/stretchr/testify/assert" diff --git a/internal/server/grpc.go b/internal/server/grpc.go index eef9c6f..73bb255 100644 --- a/internal/server/grpc.go +++ b/internal/server/grpc.go @@ -2,9 +2,8 @@ package server import ( "context" - // "fmt" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/golang/protobuf/ptypes/empty" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -103,7 +102,3 @@ func (s *grpcServicer) Fetch(stream ubroker.Broker_FetchServer) error { } } } - -// func print(s string){ -// fmt.Println(s) -// } diff --git a/internal/server/grpc_test.go b/internal/server/grpc_test.go index b00770d..ec28f00 100644 --- a/internal/server/grpc_test.go +++ b/internal/server/grpc_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/mock" - "github.com/maedeazad/ubroker/internal/server" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/phayes/freeport" "google.golang.org/grpc" "google.golang.org/grpc/codes" diff --git a/internal/server/http.go b/internal/server/http.go index 5a50e03..1badf2c 100644 --- a/internal/server/http.go +++ b/internal/server/http.go @@ -17,7 +17,7 @@ import ( "github.com/sirupsen/logrus" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/pkg/errors" ) diff --git a/internal/server/http_test.go b/internal/server/http_test.go index 5bcfec0..aff3746 100644 --- a/internal/server/http_test.go +++ b/internal/server/http_test.go @@ -8,8 +8,8 @@ import ( "strings" "testing" - "github.com/maedeazad/ubroker/internal/server" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/internal/server" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" ) diff --git a/internal/server/moc_broker_test.go b/internal/server/moc_broker_test.go index 3e0599b..d3a0fa4 100644 --- a/internal/server/moc_broker_test.go +++ b/internal/server/moc_broker_test.go @@ -3,7 +3,7 @@ package server_test import ( "context" - "github.com/maedeazad/ubroker/pkg/ubroker" + "github.com/arcana261/ubroker/pkg/ubroker" "github.com/stretchr/testify/mock" ) From d91405e345b2cefa909db2351245c8255c40c515 Mon Sep 17 00:00:00 2001 From: maedeazad Date: Thu, 23 May 2019 16:50:37 +0430 Subject: [PATCH 5/5] remove ubroker.pb.go as doesn't exist in main repo --- .gitignore | 2 +- pkg/ubroker/ubroker.pb.go | 506 -------------------------------------- 2 files changed, 1 insertion(+), 507 deletions(-) delete mode 100644 pkg/ubroker/ubroker.pb.go diff --git a/.gitignore b/.gitignore index eb6cd8e..b151abc 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,2 @@ /ubroker -# *.pb.go +*.pb.go diff --git a/pkg/ubroker/ubroker.pb.go b/pkg/ubroker/ubroker.pb.go deleted file mode 100644 index 5e333d2..0000000 --- a/pkg/ubroker/ubroker.pb.go +++ /dev/null @@ -1,506 +0,0 @@ -// Code generated by protoc-gen-go. DO NOT EDIT. -// source: api/ubroker.proto - -package ubroker - -import ( - context "context" - fmt "fmt" - proto "github.com/golang/protobuf/proto" - empty "github.com/golang/protobuf/ptypes/empty" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" - math "math" -) - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package - -type Message struct { - Body []byte `protobuf:"bytes,1,opt,name=body,proto3" json:"body,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Message) Reset() { *m = Message{} } -func (m *Message) String() string { return proto.CompactTextString(m) } -func (*Message) ProtoMessage() {} -func (*Message) Descriptor() ([]byte, []int) { - return fileDescriptor_c9a5bc08b618fc5f, []int{0} -} - -func (m *Message) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Message.Unmarshal(m, b) -} -func (m *Message) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Message.Marshal(b, m, deterministic) -} -func (m *Message) XXX_Merge(src proto.Message) { - xxx_messageInfo_Message.Merge(m, src) -} -func (m *Message) XXX_Size() int { - return xxx_messageInfo_Message.Size(m) -} -func (m *Message) XXX_DiscardUnknown() { - xxx_messageInfo_Message.DiscardUnknown(m) -} - -var xxx_messageInfo_Message proto.InternalMessageInfo - -func (m *Message) GetBody() []byte { - if m != nil { - return m.Body - } - return nil -} - -type Delivery struct { - Message *Message `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` - Id int32 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *Delivery) Reset() { *m = Delivery{} } -func (m *Delivery) String() string { return proto.CompactTextString(m) } -func (*Delivery) ProtoMessage() {} -func (*Delivery) Descriptor() ([]byte, []int) { - return fileDescriptor_c9a5bc08b618fc5f, []int{1} -} - -func (m *Delivery) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_Delivery.Unmarshal(m, b) -} -func (m *Delivery) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_Delivery.Marshal(b, m, deterministic) -} -func (m *Delivery) XXX_Merge(src proto.Message) { - xxx_messageInfo_Delivery.Merge(m, src) -} -func (m *Delivery) XXX_Size() int { - return xxx_messageInfo_Delivery.Size(m) -} -func (m *Delivery) XXX_DiscardUnknown() { - xxx_messageInfo_Delivery.DiscardUnknown(m) -} - -var xxx_messageInfo_Delivery proto.InternalMessageInfo - -func (m *Delivery) GetMessage() *Message { - if m != nil { - return m.Message - } - return nil -} - -func (m *Delivery) GetId() int32 { - if m != nil { - return m.Id - } - return 0 -} - -type FetchRequest struct { - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *FetchRequest) Reset() { *m = FetchRequest{} } -func (m *FetchRequest) String() string { return proto.CompactTextString(m) } -func (*FetchRequest) ProtoMessage() {} -func (*FetchRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_c9a5bc08b618fc5f, []int{2} -} - -func (m *FetchRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_FetchRequest.Unmarshal(m, b) -} -func (m *FetchRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_FetchRequest.Marshal(b, m, deterministic) -} -func (m *FetchRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_FetchRequest.Merge(m, src) -} -func (m *FetchRequest) XXX_Size() int { - return xxx_messageInfo_FetchRequest.Size(m) -} -func (m *FetchRequest) XXX_DiscardUnknown() { - xxx_messageInfo_FetchRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_FetchRequest proto.InternalMessageInfo - -type AcknowledgeRequest struct { - Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *AcknowledgeRequest) Reset() { *m = AcknowledgeRequest{} } -func (m *AcknowledgeRequest) String() string { return proto.CompactTextString(m) } -func (*AcknowledgeRequest) ProtoMessage() {} -func (*AcknowledgeRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_c9a5bc08b618fc5f, []int{3} -} - -func (m *AcknowledgeRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_AcknowledgeRequest.Unmarshal(m, b) -} -func (m *AcknowledgeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_AcknowledgeRequest.Marshal(b, m, deterministic) -} -func (m *AcknowledgeRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_AcknowledgeRequest.Merge(m, src) -} -func (m *AcknowledgeRequest) XXX_Size() int { - return xxx_messageInfo_AcknowledgeRequest.Size(m) -} -func (m *AcknowledgeRequest) XXX_DiscardUnknown() { - xxx_messageInfo_AcknowledgeRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_AcknowledgeRequest proto.InternalMessageInfo - -func (m *AcknowledgeRequest) GetId() int32 { - if m != nil { - return m.Id - } - return 0 -} - -type ReQueueRequest struct { - Id int32 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` -} - -func (m *ReQueueRequest) Reset() { *m = ReQueueRequest{} } -func (m *ReQueueRequest) String() string { return proto.CompactTextString(m) } -func (*ReQueueRequest) ProtoMessage() {} -func (*ReQueueRequest) Descriptor() ([]byte, []int) { - return fileDescriptor_c9a5bc08b618fc5f, []int{4} -} - -func (m *ReQueueRequest) XXX_Unmarshal(b []byte) error { - return xxx_messageInfo_ReQueueRequest.Unmarshal(m, b) -} -func (m *ReQueueRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { - return xxx_messageInfo_ReQueueRequest.Marshal(b, m, deterministic) -} -func (m *ReQueueRequest) XXX_Merge(src proto.Message) { - xxx_messageInfo_ReQueueRequest.Merge(m, src) -} -func (m *ReQueueRequest) XXX_Size() int { - return xxx_messageInfo_ReQueueRequest.Size(m) -} -func (m *ReQueueRequest) XXX_DiscardUnknown() { - xxx_messageInfo_ReQueueRequest.DiscardUnknown(m) -} - -var xxx_messageInfo_ReQueueRequest proto.InternalMessageInfo - -func (m *ReQueueRequest) GetId() int32 { - if m != nil { - return m.Id - } - return 0 -} - -func init() { - proto.RegisterType((*Message)(nil), "ubroker.Message") - proto.RegisterType((*Delivery)(nil), "ubroker.Delivery") - proto.RegisterType((*FetchRequest)(nil), "ubroker.FetchRequest") - proto.RegisterType((*AcknowledgeRequest)(nil), "ubroker.AcknowledgeRequest") - proto.RegisterType((*ReQueueRequest)(nil), "ubroker.ReQueueRequest") -} - -func init() { proto.RegisterFile("api/ubroker.proto", fileDescriptor_c9a5bc08b618fc5f) } - -var fileDescriptor_c9a5bc08b618fc5f = []byte{ - // 306 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0xc1, 0x4b, 0xc3, 0x30, - 0x14, 0xc6, 0xc9, 0x70, 0xab, 0xbc, 0x8d, 0xe1, 0x1e, 0xa8, 0x63, 0x43, 0x18, 0xc5, 0x43, 0xd9, - 0xa1, 0x95, 0x0d, 0x2f, 0xde, 0x1c, 0xba, 0x9b, 0xa0, 0x3d, 0x7a, 0x6b, 0x96, 0x67, 0x16, 0xd6, - 0x9a, 0xda, 0x36, 0xca, 0xfc, 0xbb, 0xfd, 0x03, 0xc4, 0xac, 0x29, 0x93, 0xb1, 0xdb, 0xcb, 0xe3, - 0xfb, 0xf2, 0xfd, 0xf2, 0x11, 0x18, 0x24, 0xb9, 0x8a, 0x0c, 0x2f, 0xf4, 0x86, 0x8a, 0x30, 0x2f, - 0x74, 0xa5, 0xd1, 0xab, 0x8f, 0xa3, 0xb1, 0xd4, 0x5a, 0xa6, 0x14, 0xd9, 0x35, 0x37, 0x6f, 0x11, - 0x65, 0x79, 0xb5, 0xdd, 0xa9, 0xfc, 0x2b, 0xf0, 0x9e, 0xa8, 0x2c, 0x13, 0x49, 0x88, 0x70, 0xc2, - 0xb5, 0xd8, 0x0e, 0xd9, 0x84, 0x05, 0xbd, 0xd8, 0xce, 0xfe, 0x12, 0x4e, 0x1f, 0x28, 0x55, 0x9f, - 0x54, 0x6c, 0x71, 0x0a, 0x5e, 0xb6, 0x93, 0x5a, 0x49, 0x77, 0x76, 0x16, 0xba, 0xc4, 0xfa, 0x8a, - 0xd8, 0x09, 0xb0, 0x0f, 0x2d, 0x25, 0x86, 0xad, 0x09, 0x0b, 0xda, 0x71, 0x4b, 0x09, 0xbf, 0x0f, - 0xbd, 0x25, 0x55, 0xab, 0x75, 0x4c, 0x1f, 0x86, 0xca, 0xca, 0xbf, 0x06, 0xbc, 0x5f, 0x6d, 0xde, - 0xf5, 0x57, 0x4a, 0x42, 0x52, 0xbd, 0xad, 0x5d, 0xac, 0x71, 0x4d, 0xa0, 0x1f, 0xd3, 0x8b, 0x21, - 0x73, 0x4c, 0x31, 0xfb, 0x61, 0xd0, 0x59, 0x58, 0x06, 0xbc, 0x85, 0xb6, 0x8d, 0xc0, 0xf3, 0x06, - 0x6b, 0x3f, 0x72, 0x34, 0x68, 0xd6, 0xee, 0x45, 0x01, 0xbb, 0x61, 0xb8, 0x80, 0xee, 0x1e, 0x09, - 0x8e, 0x1b, 0xd5, 0x21, 0xdf, 0xe8, 0x22, 0xdc, 0x55, 0x19, 0xba, 0x2a, 0xc3, 0xc7, 0xbf, 0x2a, - 0xf1, 0x0e, 0xbc, 0x9a, 0x13, 0x2f, 0x1b, 0xff, 0x7f, 0xf2, 0xa3, 0xde, 0x39, 0x78, 0xcf, 0x86, - 0xa7, 0xaa, 0x5c, 0xe3, 0x41, 0x9f, 0xc7, 0x4c, 0x8b, 0xe9, 0x6b, 0x20, 0x55, 0xb5, 0x36, 0x3c, - 0x5c, 0xe9, 0x2c, 0xca, 0x12, 0x12, 0x94, 0x7c, 0x27, 0xc2, 0xfd, 0x80, 0x28, 0xdf, 0x48, 0x37, - 0xf3, 0x8e, 0xf5, 0xce, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0x73, 0xdb, 0x1b, 0x91, 0x23, 0x02, - 0x00, 0x00, -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// BrokerClient is the client API for Broker service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. -type BrokerClient interface { - // Fetch should return a single Delivery per FetchRequest. - // Should return: - // Unavailable: If broker has been closed - Fetch(ctx context.Context, opts ...grpc.CallOption) (Broker_FetchClient, error) - // Acknowledge a message - // Should return: - // OK: on success - // Unavailable: If broker has been closed - // InvalidArgument: If requested ID is invalid - Acknowledge(ctx context.Context, in *AcknowledgeRequest, opts ...grpc.CallOption) (*empty.Empty, error) - // ReQueue a message - // OK: on success - // Unavailable: If broker has been closed - // InvalidArgument: If requested ID is invalid - ReQueue(ctx context.Context, in *ReQueueRequest, opts ...grpc.CallOption) (*empty.Empty, error) - // Publish message to Queue - // OK: on success - // Unavailable: If broker has been closed - Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*empty.Empty, error) -} - -type brokerClient struct { - cc *grpc.ClientConn -} - -func NewBrokerClient(cc *grpc.ClientConn) BrokerClient { - return &brokerClient{cc} -} - -func (c *brokerClient) Fetch(ctx context.Context, opts ...grpc.CallOption) (Broker_FetchClient, error) { - stream, err := c.cc.NewStream(ctx, &_Broker_serviceDesc.Streams[0], "/ubroker.Broker/Fetch", opts...) - if err != nil { - return nil, err - } - x := &brokerFetchClient{stream} - return x, nil -} - -type Broker_FetchClient interface { - Send(*FetchRequest) error - Recv() (*Delivery, error) - grpc.ClientStream -} - -type brokerFetchClient struct { - grpc.ClientStream -} - -func (x *brokerFetchClient) Send(m *FetchRequest) error { - return x.ClientStream.SendMsg(m) -} - -func (x *brokerFetchClient) Recv() (*Delivery, error) { - m := new(Delivery) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *brokerClient) Acknowledge(ctx context.Context, in *AcknowledgeRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) - err := c.cc.Invoke(ctx, "/ubroker.Broker/Acknowledge", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *brokerClient) ReQueue(ctx context.Context, in *ReQueueRequest, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) - err := c.cc.Invoke(ctx, "/ubroker.Broker/ReQueue", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -func (c *brokerClient) Publish(ctx context.Context, in *Message, opts ...grpc.CallOption) (*empty.Empty, error) { - out := new(empty.Empty) - err := c.cc.Invoke(ctx, "/ubroker.Broker/Publish", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// BrokerServer is the server API for Broker service. -type BrokerServer interface { - // Fetch should return a single Delivery per FetchRequest. - // Should return: - // Unavailable: If broker has been closed - Fetch(Broker_FetchServer) error - // Acknowledge a message - // Should return: - // OK: on success - // Unavailable: If broker has been closed - // InvalidArgument: If requested ID is invalid - Acknowledge(context.Context, *AcknowledgeRequest) (*empty.Empty, error) - // ReQueue a message - // OK: on success - // Unavailable: If broker has been closed - // InvalidArgument: If requested ID is invalid - ReQueue(context.Context, *ReQueueRequest) (*empty.Empty, error) - // Publish message to Queue - // OK: on success - // Unavailable: If broker has been closed - Publish(context.Context, *Message) (*empty.Empty, error) -} - -// UnimplementedBrokerServer can be embedded to have forward compatible implementations. -type UnimplementedBrokerServer struct { -} - -func (*UnimplementedBrokerServer) Fetch(srv Broker_FetchServer) error { - return status.Errorf(codes.Unimplemented, "method Fetch not implemented") -} -func (*UnimplementedBrokerServer) Acknowledge(ctx context.Context, req *AcknowledgeRequest) (*empty.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method Acknowledge not implemented") -} -func (*UnimplementedBrokerServer) ReQueue(ctx context.Context, req *ReQueueRequest) (*empty.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method ReQueue not implemented") -} -func (*UnimplementedBrokerServer) Publish(ctx context.Context, req *Message) (*empty.Empty, error) { - return nil, status.Errorf(codes.Unimplemented, "method Publish not implemented") -} - -func RegisterBrokerServer(s *grpc.Server, srv BrokerServer) { - s.RegisterService(&_Broker_serviceDesc, srv) -} - -func _Broker_Fetch_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(BrokerServer).Fetch(&brokerFetchServer{stream}) -} - -type Broker_FetchServer interface { - Send(*Delivery) error - Recv() (*FetchRequest, error) - grpc.ServerStream -} - -type brokerFetchServer struct { - grpc.ServerStream -} - -func (x *brokerFetchServer) Send(m *Delivery) error { - return x.ServerStream.SendMsg(m) -} - -func (x *brokerFetchServer) Recv() (*FetchRequest, error) { - m := new(FetchRequest) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func _Broker_Acknowledge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(AcknowledgeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BrokerServer).Acknowledge(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ubroker.Broker/Acknowledge", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BrokerServer).Acknowledge(ctx, req.(*AcknowledgeRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Broker_ReQueue_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(ReQueueRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BrokerServer).ReQueue(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ubroker.Broker/ReQueue", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BrokerServer).ReQueue(ctx, req.(*ReQueueRequest)) - } - return interceptor(ctx, in, info, handler) -} - -func _Broker_Publish_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(Message) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(BrokerServer).Publish(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/ubroker.Broker/Publish", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(BrokerServer).Publish(ctx, req.(*Message)) - } - return interceptor(ctx, in, info, handler) -} - -var _Broker_serviceDesc = grpc.ServiceDesc{ - ServiceName: "ubroker.Broker", - HandlerType: (*BrokerServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "Acknowledge", - Handler: _Broker_Acknowledge_Handler, - }, - { - MethodName: "ReQueue", - Handler: _Broker_ReQueue_Handler, - }, - { - MethodName: "Publish", - Handler: _Broker_Publish_Handler, - }, - }, - Streams: []grpc.StreamDesc{ - { - StreamName: "Fetch", - Handler: _Broker_Fetch_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "api/ubroker.proto", -}