Skip to content

Commit 79abd30

Browse files
committed
refact: memberslist CallbackBroadcast
1 parent 48aef10 commit 79abd30

File tree

5 files changed

+53
-4
lines changed

5 files changed

+53
-4
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,4 +91,4 @@ require (
9191
golang.org/x/tools v0.36.0 // indirect
9292
)
9393

94-
replace github.com/hashicorp/memberlist => github.com/spikeekips/memberlist v0.0.0-20230626195851-39f17fa10d23 // latest fix-data-race branch
94+
replace github.com/hashicorp/memberlist => github.com/HayoungOh5/memberlist v0.0.0-20251120091718-913bc68ce0d2

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
2+
github.com/HayoungOh5/memberlist v0.0.0-20251120091718-913bc68ce0d2 h1:bbPmccnva4F5KQOkUwOsBqYKTDvSCXQu8XV+cKqGEG8=
3+
github.com/HayoungOh5/memberlist v0.0.0-20251120091718-913bc68ce0d2/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
24
github.com/Masterminds/semver/v3 v3.4.0 h1:Zog+i5UMtVoCU8oKka5P7i9q9HgrJeGzI9SA1Xbatp0=
35
github.com/Masterminds/semver/v3 v3.4.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
46
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
@@ -283,8 +285,6 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
283285
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
284286
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
285287
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
286-
github.com/spikeekips/memberlist v0.0.0-20230626195851-39f17fa10d23 h1:nE7dp89DUO4ETbLLwU93ICJPox08LI+yB5UbRx+nHtE=
287-
github.com/spikeekips/memberlist v0.0.0-20230626195851-39f17fa10d23/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
288288
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
289289
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
290290
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=

launch/p_states_network_handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func AttachHandlerSendOperation(pctx context.Context) error {
165165
}
166166
}
167167

168-
return memberlist.CallbackBroadcast(b, id, nil)
168+
return memberlist.CallbackBroadcast2(b, id, nil)
169169
},
170170
params.MISC.MaxMessageSize,
171171
),

network/quicmemberlist/delegate.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ func (d *Delegate) QueueBroadcast(b memberlist.Broadcast) {
6868
d.qu.QueueBroadcast(b)
6969
}
7070

71+
func (d *Delegate) QueueBroadcast2(b memberlist.Broadcast) {
72+
d.qu.QueueBroadcast2(b)
73+
}
74+
7175
func (d *Delegate) GetBroadcasts(overhead, limit int) [][]byte {
7276
d.qu.RetransmitMult = d.reTransmitMult()
7377
return d.qu.GetBroadcasts(overhead, limit)

network/quicmemberlist/memberlist.go

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,22 @@ func (srv *Memberlist) Broadcast(b memberlist.Broadcast) {
323323
srv.delegate.QueueBroadcast(b)
324324
}
325325

326+
func (srv *Memberlist) Broadcast2(b memberlist.Broadcast) {
327+
if !srv.CanBroadcast() {
328+
b.Finished()
329+
330+
return
331+
}
332+
333+
if srv.metricsCollector != nil {
334+
srv.metricsCollector.RecordMemberlistBroadcast()
335+
}
336+
337+
srv.Log().Trace().Interface("broadcast", b).Msg("enqueue broadcast")
338+
339+
srv.delegate.QueueBroadcast2(b)
340+
}
341+
326342
func (srv *Memberlist) CallbackBroadcast(b []byte, id string, notifych chan struct{}) error {
327343
if !srv.CanBroadcast() {
328344
if notifych != nil {
@@ -352,6 +368,35 @@ func (srv *Memberlist) CallbackBroadcast(b []byte, id string, notifych chan stru
352368
return nil
353369
}
354370

371+
func (srv *Memberlist) CallbackBroadcast2(b []byte, id string, notifych chan struct{}) error {
372+
if !srv.CanBroadcast() {
373+
if notifych != nil {
374+
close(notifych)
375+
}
376+
377+
return nil
378+
}
379+
380+
// NOTE save b in cache first
381+
srv.cbcache.Set(id, b, srv.args.CallbackBroadcastMessageExpire)
382+
383+
switch i, err := srv.args.Encoder.Marshal(
384+
NewConnInfoBroadcastMessage(id, srv.local.ConnInfo())); {
385+
case err != nil:
386+
return err
387+
default:
388+
buf := bytes.NewBuffer(nil)
389+
defer buf.Reset()
390+
391+
_, _ = buf.Write(callbackBroadcastMessageHeaderPrefix)
392+
_, _ = buf.Write(i)
393+
394+
srv.Broadcast2(NewBroadcast(buf.Bytes(), id, notifych))
395+
}
396+
397+
return nil
398+
}
399+
355400
func (srv *Memberlist) CallbackBroadcastHandler() quicstreamheader.Handler[CallbackBroadcastMessageHeader] {
356401
var sg singleflight.Group
357402

0 commit comments

Comments
 (0)