diff --git a/cmd/subzero-ion-connect/subzero_ion_connect.go b/cmd/subzero-ion-connect/subzero_ion_connect.go index 89311cf2..fd1b45de 100644 --- a/cmd/subzero-ion-connect/subzero_ion_connect.go +++ b/cmd/subzero-ion-connect/subzero_ion_connect.go @@ -134,7 +134,6 @@ func init() { if err := query.CommitEvents(ctx, events...); err != nil { return errors.Wrapf(err, "failed to delete outdated replaced events") } - if err := storage.AcceptEvents(ctx, events...); err != nil { return errors.Wrapf(err, "storage.AcceptEvents failed: %s", model.Events(events).String()) } diff --git a/database/command/command.go b/database/command/command.go index fc3724d6..d0e608a2 100644 --- a/database/command/command.go +++ b/database/command/command.go @@ -19,13 +19,13 @@ import ( "github.com/ice-blockchain/cometbft/config" cmtlog "github.com/ice-blockchain/cometbft/libs/log" "github.com/ice-blockchain/cometbft/multiplex/client" - "github.com/ice-blockchain/cometbft/multiplex/server" + "github.com/ice-blockchain/cometbft/multiplex/types" "github.com/ice-blockchain/subzero/model" ) type ( consensus struct { - Server server.Server + Server types.Backend Client client.Client Logger cmtlog.Logger ServerConfig *config.Config @@ -56,6 +56,7 @@ func (c *consensus) waitForStop(ctx context.Context) { if c.Server != nil { c.Logger.Debug("stopping consensus server") err = c.Server.Stop() + c.Server.Wait() c.Logger.Debug("stopped consensus server") } if ch != nil { diff --git a/database/command/command_fixture.go b/database/command/command_fixture.go index 864d7c22..a47a7d42 100644 --- a/database/command/command_fixture.go +++ b/database/command/command_fixture.go @@ -18,26 +18,44 @@ import ( "github.com/ice-blockchain/cometbft/config" "github.com/ice-blockchain/cometbft/crypto/ed25519" "github.com/ice-blockchain/cometbft/multiplex" + "github.com/ice-blockchain/cometbft/multiplex/runtime" "github.com/ice-blockchain/cometbft/p2p" ) type TestConsensus interface { Consensus DiscoveryPort() uint16 + NodeID() string Stop(ctx context.Context, timeout time.Duration) error Start(ctx context.Context) + Wait() // Waits for quit channel. } func (c *consensus) DiscoveryPort() uint16 { return c.Config.DiscoveryPort } +func (c *consensus) NodeID() string { + nodeKeyFile := c.ServerConfig.NodeKeyFile() + nodeKey, err := p2p.LoadNodeKey(nodeKeyFile) + if err != nil { + panic(fmt.Errorf("failed to load node key file: %w", err)) + } + return string(nodeKey.ID()) +} + func (c *consensus) Start(ctx context.Context) { if c.Server != nil { panic("the server is already running") } c.ServerConfig.Instrumentation.Namespace = uuid.NewString() - server, err := multiplex.NewServer(ctx, c, c.ServerConfig, c.Logger) + server, err := multiplex.NewServer(ctx, c, c.ServerConfig, c.Logger, + multiplex.WithRuntimeManagerOptions( + runtime.RegistryWithConsensusOptions( + runtime.ConsensusPoolWithAcceptor(c), + ), + ), + ) if err != nil { panic(errors.Wrapf(err, "failed to start consensus server")) } @@ -48,6 +66,14 @@ func (c *consensus) Start(ctx context.Context) { go c.waitForStop(ctx) } +func (c *consensus) Wait() { + if c.Server == nil { + return + } + + c.Server.Wait() +} + func (c *consensus) Stop(ctx context.Context, timeout time.Duration) error { ch := make(chan error, 1) @@ -110,6 +136,7 @@ func newConsensusNode(ctx context.Context, nodeCfg *config.Config, port uint16, DiscoveryPort: port, AbsoluteRootPath: storage, AbsoluteNodePrivateKeyPath: keyPath.Name(), + Enabled: true, }), } if len(opts) > 0 { diff --git a/database/command/global.go b/database/command/global.go index b66c23cc..002c8a71 100644 --- a/database/command/global.go +++ b/database/command/global.go @@ -7,7 +7,6 @@ import ( "fmt" "os" "path/filepath" - "strings" "sync" "github.com/cockroachdb/errors" @@ -16,6 +15,7 @@ import ( cmtlog "github.com/ice-blockchain/cometbft/libs/log" "github.com/ice-blockchain/cometbft/multiplex" "github.com/ice-blockchain/cometbft/multiplex/client" + "github.com/ice-blockchain/cometbft/multiplex/runtime" "github.com/ice-blockchain/cometbft/p2p" "github.com/ice-blockchain/subzero/cfg" "github.com/ice-blockchain/subzero/database/query" @@ -62,6 +62,7 @@ type Config struct { RelayUrl string `yaml:"relay-url"` DiscoveryPort uint16 `yaml:"discovery-port"` Debug bool `yaml:"debug"` + Enabled bool `yaml:"enabled"` } type Option func(*consensus) @@ -100,16 +101,21 @@ func WithClient(client client.Client) Option { } } +func Enabled() bool { + conf := cfg.MustGet[Config]() + disabled = !conf.Enabled + return !disabled +} + func MustInit(ctx context.Context, opts ...Option) { conf := cfg.MustGet[Config]() - if disabled || strings.Contains(conf.RelayUrl, ".testnet.") || (conf.RelayUrl == "" && conf.AbsoluteRootPath == "" && conf.DiscoveryPort == 0) { - disabled = true + disabled = !conf.Enabled + if disabled { return } globalConsensus.Once.Do(func() { globalConsensus.Consensus = mustInit(ctx, config.DefaultConfig(), opts...) }) - } func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *consensus { @@ -129,12 +135,6 @@ func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *co serverCfg.SetRoot(c.Config.AbsoluteRootPath) serverCfg.NodeKey = c.Config.AbsoluteNodePrivateKeyPath - serverCfg.MultiplexConfig = config.MultiplexBaseConfig( - map[string]string{}, - map[string][]string{}, - ).MultiplexConfig - serverCfg.P2P.MaxPacketMsgPayloadSize = 1 * 1024 * 1024 - serverCfg.DBBackend = "goleveldb" serverCfg.DiscoveryPort = c.Config.DiscoveryPort if c.Config.ExternalAddress != "" { serverCfg.P2P.ExternalAddress = c.Config.ExternalAddress @@ -159,7 +159,13 @@ func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *co if err != nil { panic(errors.Wrapf(err, "failed to generate consensus node key")) } - cometbftServer, err := multiplex.NewServer(ctx, c, serverCfg, c.Logger) + cometbftServer, err := multiplex.NewServer(ctx, c, serverCfg, c.Logger, + multiplex.WithRuntimeManagerOptions( + runtime.RegistryWithConsensusOptions( + runtime.ConsensusPoolWithAcceptor(c), + ), + ), + ) if err != nil { panic(errors.Wrapf(err, "failed to start consensus server")) } diff --git a/database/query/ddl/00020_enable_cometbft_trigger.sql b/database/query/ddl/00020_enable_cometbft_trigger.sql new file mode 100644 index 00000000..c4b58a1f --- /dev/null +++ b/database/query/ddl/00020_enable_cometbft_trigger.sql @@ -0,0 +1,87 @@ +-- SPDX-License-Identifier: ice License 1.0 + +CREATE OR REPLACE TRIGGER trigger_events_store_replaceable_data_before_update + AFTER UPDATE ON events + FOR EACH ROW + WHEN (((10000 <= old.kind AND old.kind < 20000 ) OR old.kind = 0 OR old.kind = 3 OR (30000 <= old.kind AND old.kind < 40000)) AND old.id != new.id) + EXECUTE FUNCTION events_store_replaceable_data_before_update(); +-------- +CREATE OR REPLACE FUNCTION events_store_replaceable_data_before_update() + RETURNS TRIGGER AS $$ +BEGIN + IF NEW.reference_id = NEW.id THEN + NEW.reference_id = NULL; + RETURN NEW; -- dont insert into replaceable_events_before_update, its replay + END IF; + insert into replaceable_events_before_update ( + created_at, + lookup_created_at, + expiration, + kind, + lookup, + key_alg, + content, + d_tag, + h_tag, + address, + id, + system_id, + pubkey, + master_pubkey, + sig, + sig_alg, + reference_id, + tags, + t_tags, + gift_receiver_pubkey, + has_images, + has_videos, + deleted, + is_reply, + is_root_reply, + is_quote, + has_ephemeral_attestation, + has_references, + hidden, + verified, + lang, + replaced_by_id + ) + values ( + old.created_at, + old.lookup_created_at, + old.expiration, + old.kind, + old.lookup, + old.key_alg, + old.content, + old.d_tag, + old.h_tag, + old.address, + old.id, + old.system_id, + old.pubkey, + old.master_pubkey, + old.sig, + old.sig_alg, + old.reference_id, + old.tags, + old.t_tags, + old.gift_receiver_pubkey, + old.has_images, + old.has_videos, + old.deleted, + old.is_reply, + old.is_root_reply, + old.is_quote, + old.has_ephemeral_attestation, + old.has_references, + old.hidden, + old.verified, + old.lang, + new.id + ) + ON CONFLICT DO NOTHING; + RETURN NEW; +END; +$$ LANGUAGE plpgsql; diff --git a/database/query/query_test.go b/database/query/query_test.go index 7d7e3ac2..4f2f63a2 100644 --- a/database/query/query_test.go +++ b/database/query/query_test.go @@ -194,7 +194,6 @@ func TestReplaceableEvents(t *testing.T) { require.Equal(t, ev3.Event, stored[0].Event, "event 3") require.Equal(t, ev2.Event, stored[1].Event, "event 2") t.Run("rollback with command", func(t *testing.T) { - t.Skip("re-enable trigger_events_store_replaceable_data_before_update to work") // Rollback require.NoError(t, db.RollbackEvents(t.Context(), ev2, ev3)) stored = helperSelectEvents(t, db, model.Filter{ diff --git a/go.mod b/go.mod index 82078d70..73d27fe6 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.3 github.com/hashicorp/go-multierror v1.1.1 - github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83 + github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404 github.com/ice-blockchain/go/src v0.0.0-20250625091122-356c0c7d1830 github.com/imroc/req/v3 v3.57.0 github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6 @@ -87,7 +87,7 @@ require ( go.uber.org/goleak v1.3.0 golang.org/x/net v0.48.0 golang.org/x/sync v0.19.0 - google.golang.org/api v0.257.0 + google.golang.org/api v0.258.0 google.golang.org/grpc v1.77.0 ) @@ -195,7 +195,7 @@ require ( github.com/google/s2a-go v0.1.9 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect - github.com/googleapis/gax-go/v2 v2.15.0 // indirect + github.com/googleapis/gax-go/v2 v2.16.0 // indirect github.com/gookit/color v1.6.0 // indirect github.com/gorilla/css v1.0.1 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect @@ -203,7 +203,7 @@ require ( github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect github.com/huandu/xstrings v1.5.0 // indirect - github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83 // indirect + github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404 // indirect github.com/icholy/digest v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect diff --git a/go.sum b/go.sum index fd4c7715..54c00ccb 100644 --- a/go.sum +++ b/go.sum @@ -386,8 +386,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/enterprise-certificate-proxy v0.3.7 h1:zrn2Ee/nWmHulBx5sAVrGgAa0f2/R35S4DJwfFaUPFQ= github.com/googleapis/enterprise-certificate-proxy v0.3.7/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA= -github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo= -github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc= +github.com/googleapis/gax-go/v2 v2.16.0 h1:iHbQmKLLZrexmb0OSsNGTeSTS0HO4YvFOG8g5E4Zd0Y= +github.com/googleapis/gax-go/v2 v2.16.0/go.mod h1:o1vfQjjNZn4+dPnRdl/4ZD7S9414Y4xA+a/6Icj6l14= github.com/gookit/assert v0.1.1 h1:lh3GcawXe/p+cU7ESTZ5Ui3Sm/x8JWpIis4/1aF0mY0= github.com/gookit/assert v0.1.1/go.mod h1:jS5bmIVQZTIwk42uXl4lyj4iaaxx32tqH16CFj0VX2E= github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= @@ -415,10 +415,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI= github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83 h1:Z5y2thskt5DyCxnPx+Lmed7N/r3qEebBqkFhQuSrF38= -github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83/go.mod h1:j0GCjFgUZ4JtaNHQs5FFOaDReK0+Mss/YfwmYMEI6V4= -github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83 h1:hyiw8EykSG2nFcRVAZMCGDgS0jxcfIPgtGcSPJE8Poo= -github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83/go.mod h1:2CYWOH6Lxdb1DeMpvCXsArRzLV1CwTj781meNNiAu6I= +github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404 h1:JszqKwX/dEaEdNqQp53Cjz9Lu/CluUh16QOf5VgweFQ= +github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404/go.mod h1:j0GCjFgUZ4JtaNHQs5FFOaDReK0+Mss/YfwmYMEI6V4= +github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404 h1:erZwmyNagwLX2GksrWLq/kme4FAYLQfvQ8bvURHITjE= +github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404/go.mod h1:2CYWOH6Lxdb1DeMpvCXsArRzLV1CwTj781meNNiAu6I= github.com/ice-blockchain/go-nostr v0.42.3-ion.0.20251001102109-e2a999a12f4d h1:B/9bWYRiQjy84xAf8GKv/Y12PAd2Mieh7ikUKF8Hj68= github.com/ice-blockchain/go-nostr v0.42.3-ion.0.20251001102109-e2a999a12f4d/go.mod h1:c7xUEi3rkXfSVPveupRW3AP71t9JJwVzrepJjfw1wZQ= github.com/ice-blockchain/go/src v0.0.0-20250625091122-356c0c7d1830 h1:FymrVSpOD6rKyZ97MJr5Xu2ZP29EqTKpdMqTf2DrMgU= @@ -1015,8 +1015,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/api v0.257.0 h1:8Y0lzvHlZps53PEaw+G29SsQIkuKrumGWs9puiexNAA= -google.golang.org/api v0.257.0/go.mod h1:4eJrr+vbVaZSqs7vovFd1Jb/A6ml6iw2e6FBYf3GAO4= +google.golang.org/api v0.258.0 h1:IKo1j5FBlN74fe5isA2PVozN3Y5pwNKriEgAXPOkDAc= +google.golang.org/api v0.258.0/go.mod h1:qhOMTQEZ6lUps63ZNq9jhODswwjkjYYguA7fA3TBFww= google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw= google.golang.org/appengine/v2 v2.0.6/go.mod h1:WoEXGoXNfa0mLvaH5sV3ZSGXwVmy8yf7Z1JKf3J3wLI= google.golang.org/genproto v0.0.0-20251213004720-97cd9d5aeac2 h1:stRtB2UVzFOWnorVuwF0BVVEjQ3AN6SjHWdg811UIQM= diff --git a/server/server.go b/server/server.go index 96cb5280..8fcd2ca0 100644 --- a/server/server.go +++ b/server/server.go @@ -166,6 +166,10 @@ func (r *router) MustListenAndServe(ctx context.Context) { } func (r *router) BroadcastUserEvents(ctx context.Context, events ...*model.Event) error { + if command.Enabled() { + log.Debug().Msg("skipping broadcasting user events because consensus enabled") + return nil + } return r.Broadcaster.Broadcast(ctx, events...) } diff --git a/server/ws/consensus_test.go b/server/ws/consensus_test.go index a4307f54..356e3fe3 100644 --- a/server/ws/consensus_test.go +++ b/server/ws/consensus_test.go @@ -69,9 +69,11 @@ func TestConsensusEvents(t *testing.T) { return nil }) command.RegisterRollbackListener(query.RollbackEvents) - consensusDone := map[string]chan bool{} + consensusDone := map[string]chan bool{} // relays accepted event + finalizedDone := map[string]chan bool{} // block commit includes event for _, s := range pubsubServers { consensusDone[s.Endpoint()] = make(chan bool, 1000) + finalizedDone[s.Endpoint()] = make(chan bool, 1000) } accepted := xsync.NewMap[string, bool]() normalAccept := func(ctx context.Context, events ...*model.Event) error { @@ -87,10 +89,28 @@ func TestConsensusEvents(t *testing.T) { return nil } command.RegisterAcceptListener(normalAccept) + + committed := xsync.NewMap[string, bool]() + normalCommit := func(ctx context.Context, events ...*model.Event) error { + if _, ok := committed.Load(mapPort(ctx).Endpoint() + helperHashEvents(t, events...)); ok { + return nil + } + finalizedDone[mapPort(ctx).Endpoint()] <- true + committed.Store(mapPort(ctx).Endpoint()+helperHashEvents(t, events...), true) + t.Log("COMMIT", mapPort(ctx).Endpoint(), events[0].Kind, events[0].Content) + return nil + } + command.RegisterCommitListener(normalCommit) + RegisterWSSubscriptionListener(func(ctx context.Context, filters ...model.Filter) EventIterator { return mapPort(ctx).DB.SelectEvents(ctx, filters...) }) - relay := helperMustNewRelay(t, pubsubServers[0]) + + relay := helperMustNewRelay(t, pubsubServers[0]) // :9988 + secondRelay := helperMustNewRelay(t, pubsubServers[1]) // :9977 + thirdRelay := helperMustNewRelay(t, pubsubServers[2]) // :9966 + fourthRelay := helperMustNewRelay(t, pubsubServers[3]) // :9955 + var ev, ev2 *model.Event var attestationEvent *model.Event masterPrivKey, masterPubkey := model.GenerateKeyPair() @@ -118,6 +138,8 @@ func TestConsensusEvents(t *testing.T) { helperSignWithMinLeadingZeroBits(t, relaysList, privkey) require.NoError(t, relay.PublishMany(t.Context(), &attestationEvent.Event, &relaysList.Event)) require.NoError(t, helperAwaitConsensus(t, consensusDone, relay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone)) + ev = &model.Event{Event: nostr.Event{ CreatedAt: nostr.Now(), Kind: nostr.KindTextNote, @@ -130,8 +152,9 @@ func TestConsensusEvents(t *testing.T) { require.NoError(t, relay.Publish(t.Context(), ev.Event)) require.NoError(t, helperAwaitConsensus(t, consensusDone, relay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone)) }) - secondRelay := helperMustNewRelay(t, pubsubServers[1]) + t.Run("query events", func(t *testing.T) { receivedEventsFromFirstRelay := helperQueryEvents(t, t.Context(), relay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) require.Len(t, receivedEventsFromFirstRelay, 1) @@ -151,6 +174,7 @@ func TestConsensusEvents(t *testing.T) { helperSignWithMinLeadingZeroBits(t, ev2, privkey) require.NoError(t, secondRelay.Publish(t.Context(), ev2.Event)) require.NoError(t, helperAwaitConsensus(t, consensusDone, secondRelay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone)) receivedEventsFromFirstRelay = helperQueryEvents(t, t.Context(), relay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) require.Len(t, receivedEventsFromFirstRelay, 2) require.Contains(t, receivedEventsFromFirstRelay, ev) @@ -182,7 +206,6 @@ func TestConsensusEvents(t *testing.T) { }) var notAcceptedEvent *model.Event - thirdRelay := helperMustNewRelay(t, pubsubServers[2]) t.Run("failed consensus rolled back", func(t *testing.T) { rolledBack := map[string]chan bool{} for _, s := range pubsubServers { @@ -226,24 +249,60 @@ func TestConsensusEvents(t *testing.T) { require.Contains(t, receivedEventsFromThirdRelay, ev2) require.NotContains(t, receivedEventsFromThirdRelay, notAcceptedEvent) }) + command.RegisterAcceptListener(normalAccept) command.RegisterRollbackListener(query.RollbackEvents) - var eventMissedByRelay3DuringBroadcastTime, eventAfterNodeComesUp *model.Event + var eventMissedByRelay3DuringBroadcastTime, + eventMissedByRelay3DuringBroadcastTime2, + eventMissedByRelay3DuringBroadcastTime3, + eventAfterNodeComesUp *model.Event t.Run("relay fetches missed data after downtime, broadcast still works as 2/3 reached", func(t *testing.T) { - err := pubsubServers[2].Consensus.Stop(t.Context(), 10*time.Second) - t.Logf("stopping consensus on relay %v: %v", pubsubServers[2].Endpoint(), err) - eventMissedByRelay3DuringBroadcastTime = &model.Event{Event: nostr.Event{ - CreatedAt: nostr.Now(), - Kind: nostr.KindTextNote, - Tags: model.Tags{ - {model.CustomIONTagOnBehalfOf, masterPubkey}, - }, - Content: "eventMissedByRelay3DuringBroadcastTime", - }} + stopCtx, stopFn := context.WithCancel(context.Background()) + defer stopFn() + + t.Logf("stopping consensus on relay %v %v", pubsubServers[2].Endpoint(), pubsubServers[2].Consensus.NodeID()) + err := pubsubServers[2].Consensus.Stop(stopCtx, 5*time.Second) + time.Sleep(10 * time.Second) // wait for shutdown.. + t.Logf("stopped consensus on relay %v %v: %v", pubsubServers[2].Endpoint(), pubsubServers[2].Consensus.NodeID(), err) + + // 3 events each broadcast in its own block, we make sure of this with + // the helperAwaitFinalized call before sending next events. + eventsMissedByRelay3DuringBroadcastTime := []*model.Event{ + &model.Event{Event: nostr.Event{ + CreatedAt: nostr.Now(), + Kind: nostr.KindTextNote, + Tags: model.Tags{ + {model.CustomIONTagOnBehalfOf, masterPubkey}, + }, + Content: "eventMissedByRelay3DuringBroadcastTime", + }}, + &model.Event{Event: nostr.Event{ + CreatedAt: nostr.Now(), + Kind: nostr.KindTextNote, + Tags: model.Tags{ + {model.CustomIONTagOnBehalfOf, masterPubkey}, + }, + Content: "eventMissedByRelay3DuringBroadcastTime2", + }}, + &model.Event{Event: nostr.Event{ + CreatedAt: nostr.Now(), + Kind: nostr.KindTextNote, + Tags: model.Tags{ + {model.CustomIONTagOnBehalfOf, masterPubkey}, + }, + Content: "eventMissedByRelay3DuringBroadcastTime3", + }}, + } + + // broadcast+commit missedEvent #1 + eventMissedByRelay3DuringBroadcastTime = eventsMissedByRelay3DuringBroadcastTime[0] helperSignWithMinLeadingZeroBits(t, eventMissedByRelay3DuringBroadcastTime, privkey) require.NoError(t, relay.Publish(t.Context(), eventMissedByRelay3DuringBroadcastTime.Event)) waitAcceptErr := helperAwaitConsensus(t, consensusDone, relay, thirdRelay) require.NoError(t, waitAcceptErr) + require.NoError(t, helperAwaitFinalized(t, finalizedDone, thirdRelay)) + + // missedEvent #1 sanity checks. receivedEventsFromFirstRelay := helperQueryEvents(t, t.Context(), relay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) require.Contains(t, receivedEventsFromFirstRelay, eventMissedByRelay3DuringBroadcastTime) require.NotContains(t, receivedEventsFromFirstRelay, notAcceptedEvent) @@ -252,7 +311,33 @@ func TestConsensusEvents(t *testing.T) { require.NotContains(t, receivedEventsFromSecondRelay, notAcceptedEvent) receivedEventsFromThirdRelay := helperQueryEvents(t, t.Context(), thirdRelay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) require.NotContains(t, receivedEventsFromThirdRelay, eventMissedByRelay3DuringBroadcastTime) - pubsubServers[2].Consensus.Start(t.Context()) + + // broadcast+commit missedEvent #2 + eventMissedByRelay3DuringBroadcastTime2 = eventsMissedByRelay3DuringBroadcastTime[1] + helperSignWithMinLeadingZeroBits(t, eventMissedByRelay3DuringBroadcastTime2, privkey) + require.NoError(t, relay.Publish(t.Context(), eventMissedByRelay3DuringBroadcastTime2.Event)) + require.NoError(t, helperAwaitConsensus(t, consensusDone, relay, thirdRelay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone, thirdRelay)) + + // broadcast+commit missedEvent #3 + eventMissedByRelay3DuringBroadcastTime3 = eventsMissedByRelay3DuringBroadcastTime[2] + helperSignWithMinLeadingZeroBits(t, eventMissedByRelay3DuringBroadcastTime3, privkey) + require.NoError(t, relay.Publish(t.Context(), eventMissedByRelay3DuringBroadcastTime3.Event)) + require.NoError(t, helperAwaitConsensus(t, consensusDone, relay, thirdRelay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone, thirdRelay)) + + backendCtx, backendCancelFn := context.WithCancel(context.Background()) + defer backendCancelFn() + + t.Logf("starting consensus on relay %v %v", pubsubServers[2].Endpoint(), pubsubServers[2].Consensus.NodeID()) + pubsubServers[2].Consensus.Start(backendCtx) + time.Sleep(10 * time.Second) // wait for "Start" + t.Logf("started consensus on relay %v %v", pubsubServers[2].Endpoint(), pubsubServers[2].Consensus.NodeID()) + + broadcastCtx, broadcastCancelFn := context.WithTimeout(backendCtx, 30*time.Second) + defer broadcastCancelFn() + + // thirdRelay is back and will blocksync #1,#2,#3 before working on afterEvent #4. eventAfterNodeComesUp = &model.Event{Event: nostr.Event{ CreatedAt: nostr.Now(), Kind: nostr.KindTextNote, @@ -262,12 +347,34 @@ func TestConsensusEvents(t *testing.T) { Content: "eventAfterNodeComesUp", }} helperSignWithMinLeadingZeroBits(t, eventAfterNodeComesUp, privkey) - require.NoError(t, relay.Publish(t.Context(), eventAfterNodeComesUp.Event)) + require.NoError(t, relay.Publish(broadcastCtx, eventAfterNodeComesUp.Event)) require.NoError(t, helperAwaitConsensus(t, consensusDone, relay)) - time.Sleep(10 * time.Second) - receivedEventsFromThirdRelay = helperQueryEvents(t, t.Context(), thirdRelay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) + + // require +3 blocks from thirdRelay only (others have already finalized those). + require.NoError(t, helperAwaitFinalized(t, finalizedDone, relay, secondRelay, fourthRelay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone, relay, secondRelay, fourthRelay)) + require.NoError(t, helperAwaitFinalized(t, finalizedDone, relay, secondRelay, fourthRelay)) + // require block with eventAfterNodeComesUp from all + require.NoError(t, helperAwaitFinalized(t, finalizedDone)) + + // all events verification, sanity checks for queries. + receivedEventsFromFirstRelay = helperQueryEvents(t, t.Context(), relay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) + require.Contains(t, receivedEventsFromFirstRelay, eventMissedByRelay3DuringBroadcastTime) + require.Contains(t, receivedEventsFromFirstRelay, eventMissedByRelay3DuringBroadcastTime2) + require.Contains(t, receivedEventsFromFirstRelay, eventMissedByRelay3DuringBroadcastTime3) + require.Contains(t, receivedEventsFromFirstRelay, eventAfterNodeComesUp) + require.NotContains(t, receivedEventsFromFirstRelay, notAcceptedEvent) + receivedEventsFromSecondRelay = helperQueryEvents(t, t.Context(), secondRelay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) + require.Contains(t, receivedEventsFromSecondRelay, eventMissedByRelay3DuringBroadcastTime) + require.Contains(t, receivedEventsFromSecondRelay, eventMissedByRelay3DuringBroadcastTime2) + require.Contains(t, receivedEventsFromSecondRelay, eventMissedByRelay3DuringBroadcastTime3) + require.Contains(t, receivedEventsFromSecondRelay, eventAfterNodeComesUp) + require.NotContains(t, receivedEventsFromSecondRelay, notAcceptedEvent) + receivedEventsFromThirdRelay = helperQueryEvents(t, broadcastCtx, thirdRelay, nostr.Filter{Kinds: []int{nostr.KindTextNote}}) require.Contains(t, receivedEventsFromThirdRelay, eventAfterNodeComesUp) require.Contains(t, receivedEventsFromThirdRelay, eventMissedByRelay3DuringBroadcastTime) + require.Contains(t, receivedEventsFromThirdRelay, eventMissedByRelay3DuringBroadcastTime2) + require.Contains(t, receivedEventsFromThirdRelay, eventMissedByRelay3DuringBroadcastTime3) require.NotContains(t, receivedEventsFromThirdRelay, notAcceptedEvent) }) @@ -317,6 +424,7 @@ func TestConsensusEvents(t *testing.T) { helperMustCloseRelay(t, relay) helperMustCloseRelay(t, secondRelay) helperMustCloseRelay(t, thirdRelay) + helperMustCloseRelay(t, fourthRelay) } func BenchmarkConcurrentConsensusEvents(b *testing.B) { @@ -452,24 +560,80 @@ func helperPickRandomRelay(tb testing.TB) *nostrRelay { return relay } +// Wait (concurrently) for all relays except broadcastFrom to accept. +// consensusDone is written on by the consensusEventListener (AcceptBroadcastTx). func helperAwaitConsensus(t testing.TB, consensusDone map[string]chan bool, broadcastFrom ...*nostrRelay) error { t.Helper() skipUrls := make([]string, 0, len(broadcastFrom)) for _, skipRelay := range broadcastFrom { skipUrls = append(skipUrls, skipRelay.URL) } + + var wg sync.WaitGroup + errChan := make(chan error, 1) + for endpoint, done := range consensusDone { if slices.Contains(skipUrls, endpoint) { continue } - select { - case <-done: + + wg.Add(1) + go func(ep string, ch chan bool) { + defer wg.Done() + select { + case <-ch: + return + case <-time.After(30 * time.Second): + select { + case errChan <- errors.Errorf("timeout awaiting consensus from %v", ep): + default: + } + } + }(endpoint, done) + } + + wg.Wait() + close(errChan) + + return <-errChan +} + +// Wait (concurrently) for all relays except skipRelays to commit. +// consensusDone is written on by the commitEventListener (CommitBroadcastTx). +func helperAwaitFinalized(t testing.TB, finalizedDone map[string]chan bool, skipRelays ...*nostrRelay) error { + t.Helper() + skipUrls := make([]string, 0, len(skipRelays)) + for _, skipRelay := range skipRelays { + skipUrls = append(skipUrls, skipRelay.URL) + } + + var wg sync.WaitGroup + errChan := make(chan error, 1) + + for endpoint, done := range finalizedDone { + if slices.Contains(skipUrls, endpoint) { continue - case <-time.After(30 * time.Second): - return errors.Errorf("timeout awaiting consensus from %v", endpoint) } + + wg.Add(1) + go func(ep string, ch chan bool) { + defer wg.Done() + select { + case <-ch: + return + case <-time.After(60 * time.Second): + select { + case errChan <- errors.Errorf("timeout awaiting block commit from %v", ep): + default: + } + } + }(endpoint, done) } - return nil + + wg.Wait() + close(errChan) + + return <-errChan } func helperBenchReportMetrics(