44 "context"
55 "errors"
66 "net"
7+ "strings"
8+ "sync/atomic"
79 "testing"
810 "time"
911
@@ -136,39 +138,100 @@ func TestSubscribe(t *testing.T) {
136138 pb .RegisterRelayerServer (r .server , r )
137139
138140 lis := bufconn .Listen (1024 * 1024 )
141+
142+ serverDone := make (chan struct {})
143+ var serverErr atomic.Value
144+
145+ mock .ExpectXGroupCreateMkStream (blockStreamName , "member_group:testClient" , "0" ).SetVal ("OK" )
146+
147+ mock .ExpectXReadGroup (& redis.XReadGroupArgs {
148+ Group : "member_group:testClient" ,
149+ Consumer : "member_consumer:testClient" ,
150+ Streams : []string {blockStreamName , "0" },
151+ Count : 1 ,
152+ Block : time .Second ,
153+ }).SetErr (redis .Nil )
154+
155+ mock .ExpectXReadGroup (& redis.XReadGroupArgs {
156+ Group : "member_group:testClient" ,
157+ Consumer : "member_consumer:testClient" ,
158+ Streams : []string {blockStreamName , ">" },
159+ Count : 1 ,
160+ Block : time .Second ,
161+ }).SetVal ([]redis.XStream {
162+ {
163+ Stream : blockStreamName ,
164+ Messages : []redis.XMessage {
165+ {
166+ ID : "123-1" ,
167+ Values : map [string ]interface {}{
168+ "payload_id" : "payload_123" ,
169+ "execution_payload" : "some_encoded_payload" ,
170+ "sender_instance_id" : "instance_abc" ,
171+ },
172+ },
173+ },
174+ },
175+ })
176+
177+ ackCalled := make (chan struct {})
178+
179+ customMatch := func (expected , actual []interface {}) error {
180+ if len (actual ) >= 1 {
181+ cmdName , ok := actual [0 ].(string )
182+ if ok && strings .ToUpper (cmdName ) == "XACK" {
183+ select {
184+ case <- ackCalled :
185+ default :
186+ close (ackCalled )
187+ }
188+ }
189+ }
190+ return nil
191+ }
192+
193+ mock .CustomMatch (customMatch ).ExpectXAck (blockStreamName , "member_group:testClient" , "123-1" ).SetVal (int64 (1 ))
194+
139195 go func () {
140- if err := r .server .Serve (lis ); err != nil {
141- t .Errorf ("Server exited with error: %v" , err )
196+ err := r .server .Serve (lis )
197+ if err != nil && err != grpc .ErrServerStopped {
198+ serverErr .Store (err )
142199 }
200+ close (serverDone )
143201 }()
144202
145- defer r .server .GracefulStop ()
203+ defer func () {
204+ r .server .GracefulStop ()
205+ <- serverDone
206+ if err , ok := serverErr .Load ().(error ); ok {
207+ t .Errorf ("Server error: %v" , err )
208+ }
209+ }()
146210
147- ctx , cancel := context .WithCancel (context .Background ())
211+ // Create a gRPC client
212+ ctx , cancel := context .WithTimeout (context .Background (), 5 * time .Second )
148213 defer cancel ()
149214
150215 conn , err := grpc .NewClient (
151216 "passthrough:///" ,
152217 grpc .WithTransportCredentials (insecure .NewCredentials ()),
153218 grpc .WithContextDialer (func (ctx context.Context , s string ) (net.Conn , error ) {
154219 return lis .Dial ()
155- }))
220+ }),
221+ )
156222 if err != nil {
157223 t .Fatalf ("failed to dial bufconn: %v" , err )
158224 }
159225 defer conn .Close ()
160226
161227 client := pb .NewRelayerClient (conn )
162228
163- // Expect a consumer group to be created
164- mock .ExpectXGroupCreateMkStream (blockStreamName , "member_group:testClient" , "0" ).SetVal ("OK" )
165-
229+ // Call Subscribe
166230 stream , err := client .Subscribe (ctx )
167231 if err != nil {
168232 t .Fatalf ("failed to call Subscribe: %v" , err )
169233 }
170234
171- // Send initial subscribe request
172235 err = stream .Send (& pb.ClientMessage {
173236 Message : & pb.ClientMessage_SubscribeRequest {
174237 SubscribeRequest : & pb.SubscribeRequest {
@@ -180,36 +243,6 @@ func TestSubscribe(t *testing.T) {
180243 t .Fatalf ("failed to send subscribe request: %v" , err )
181244 }
182245
183- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
184- Group : "member_group:testClient" ,
185- Consumer : "member_consumer:testClient" ,
186- Streams : []string {blockStreamName , "0" },
187- Count : 1 ,
188- Block : time .Second ,
189- }).SetErr (redis .Nil )
190-
191- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
192- Group : "member_group:testClient" ,
193- Consumer : "member_consumer:testClient" ,
194- Streams : []string {blockStreamName , ">" },
195- Count : 1 ,
196- Block : time .Second ,
197- }).SetVal ([]redis.XStream {
198- {
199- Stream : blockStreamName ,
200- Messages : []redis.XMessage {
201- {
202- ID : "123-1" ,
203- Values : map [string ]interface {}{
204- "payload_id" : "payload_123" ,
205- "execution_payload" : "some_encoded_payload" ,
206- "sender_instance_id" : "instance_abc" ,
207- },
208- },
209- },
210- },
211- })
212-
213246 recvMsg , err := stream .Recv ()
214247 if err != nil {
215248 t .Fatalf ("failed to receive message from server: %v" , err )
@@ -218,8 +251,6 @@ func TestSubscribe(t *testing.T) {
218251 t .Errorf ("expected payload_123, got %s" , recvMsg .GetPayloadId ())
219252 }
220253
221- mock .ExpectXAck (blockStreamName , "member_group:testClient" , "123-1" ).SetVal (1 )
222-
223254 err = stream .Send (& pb.ClientMessage {
224255 Message : & pb.ClientMessage_AckPayload {
225256 AckPayload : & pb.AckPayloadRequest {
@@ -233,28 +264,13 @@ func TestSubscribe(t *testing.T) {
233264 t .Fatalf ("failed to send ack: %v" , err )
234265 }
235266
236- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
237- Group : "member_group:testClient" ,
238- Consumer : "member_consumer:testClient" ,
239- Streams : []string {blockStreamName , "0" },
240- Count : 1 ,
241- Block : time .Second ,
242- }).SetErr (redis .Nil )
243-
244- mock .ExpectXReadGroup (& redis.XReadGroupArgs {
245- Group : "member_group:testClient" ,
246- Consumer : "member_consumer:testClient" ,
247- Streams : []string {blockStreamName , ">" },
248- Count : 1 ,
249- Block : time .Second ,
250- }).SetErr (redis .Nil )
251-
252- // Give the server some time to process these reads
253- time .Sleep (100 * time .Millisecond )
267+ select {
268+ case <- ackCalled :
269+ case <- time .After (2 * time .Second ):
270+ t .Fatalf ("timeout waiting for XAck to be called" )
271+ }
254272
255273 if err := mock .ExpectationsWereMet (); err != nil {
256274 t .Errorf ("unmet redis expectations: %v" , err )
257275 }
258-
259- cancel ()
260276}
0 commit comments