Skip to content
1 change: 0 additions & 1 deletion cmd/subzero-ion-connect/subzero_ion_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ func init() {
if err := query.CommitEvents(ctx, events...); err != nil {
return errors.Wrapf(err, "failed to delete outdated replaced events")
}

if err := storage.AcceptEvents(ctx, events...); err != nil {
return errors.Wrapf(err, "storage.AcceptEvents failed: %s", model.Events(events).String())
}
Expand Down
5 changes: 3 additions & 2 deletions database/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"github.com/ice-blockchain/cometbft/config"
cmtlog "github.com/ice-blockchain/cometbft/libs/log"
"github.com/ice-blockchain/cometbft/multiplex/client"
"github.com/ice-blockchain/cometbft/multiplex/server"
"github.com/ice-blockchain/cometbft/multiplex/types"
"github.com/ice-blockchain/subzero/model"
)

type (
consensus struct {
Server server.Server
Server types.Backend
Client client.Client
Logger cmtlog.Logger
ServerConfig *config.Config
Expand Down Expand Up @@ -56,6 +56,7 @@ func (c *consensus) waitForStop(ctx context.Context) {
if c.Server != nil {
c.Logger.Debug("stopping consensus server")
err = c.Server.Stop()
c.Server.Wait()
c.Logger.Debug("stopped consensus server")
}
if ch != nil {
Expand Down
29 changes: 28 additions & 1 deletion database/command/command_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,44 @@ import (
"github.com/ice-blockchain/cometbft/config"
"github.com/ice-blockchain/cometbft/crypto/ed25519"
"github.com/ice-blockchain/cometbft/multiplex"
"github.com/ice-blockchain/cometbft/multiplex/runtime"
"github.com/ice-blockchain/cometbft/p2p"
)

type TestConsensus interface {
Consensus
DiscoveryPort() uint16
NodeID() string
Stop(ctx context.Context, timeout time.Duration) error
Start(ctx context.Context)
Wait() // Waits for quit channel.
}

func (c *consensus) DiscoveryPort() uint16 {
return c.Config.DiscoveryPort
}

func (c *consensus) NodeID() string {
nodeKeyFile := c.ServerConfig.NodeKeyFile()
nodeKey, err := p2p.LoadNodeKey(nodeKeyFile)
if err != nil {
panic(fmt.Errorf("failed to load node key file: %w", err))
}
return string(nodeKey.ID())
}

func (c *consensus) Start(ctx context.Context) {
if c.Server != nil {
panic("the server is already running")
}
c.ServerConfig.Instrumentation.Namespace = uuid.NewString()
server, err := multiplex.NewServer(ctx, c, c.ServerConfig, c.Logger)
server, err := multiplex.NewServer(ctx, c, c.ServerConfig, c.Logger,
multiplex.WithRuntimeManagerOptions(
runtime.RegistryWithConsensusOptions(
runtime.ConsensusPoolWithAcceptor(c),
),
),
)
if err != nil {
panic(errors.Wrapf(err, "failed to start consensus server"))
}
Expand All @@ -48,6 +66,14 @@ func (c *consensus) Start(ctx context.Context) {
go c.waitForStop(ctx)
}

func (c *consensus) Wait() {
if c.Server == nil {
return
}

c.Server.Wait()
}

func (c *consensus) Stop(ctx context.Context, timeout time.Duration) error {
ch := make(chan error, 1)

Expand Down Expand Up @@ -110,6 +136,7 @@ func newConsensusNode(ctx context.Context, nodeCfg *config.Config, port uint16,
DiscoveryPort: port,
AbsoluteRootPath: storage,
AbsoluteNodePrivateKeyPath: keyPath.Name(),
Enabled: true,
}),
}
if len(opts) > 0 {
Expand Down
28 changes: 17 additions & 11 deletions database/command/global.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"os"
"path/filepath"
"strings"
"sync"

"github.com/cockroachdb/errors"
Expand All @@ -16,6 +15,7 @@ import (
cmtlog "github.com/ice-blockchain/cometbft/libs/log"
"github.com/ice-blockchain/cometbft/multiplex"
"github.com/ice-blockchain/cometbft/multiplex/client"
"github.com/ice-blockchain/cometbft/multiplex/runtime"
"github.com/ice-blockchain/cometbft/p2p"
"github.com/ice-blockchain/subzero/cfg"
"github.com/ice-blockchain/subzero/database/query"
Expand Down Expand Up @@ -62,6 +62,7 @@ type Config struct {
RelayUrl string `yaml:"relay-url"`
DiscoveryPort uint16 `yaml:"discovery-port"`
Debug bool `yaml:"debug"`
Enabled bool `yaml:"enabled"`
}

type Option func(*consensus)
Expand Down Expand Up @@ -100,16 +101,21 @@ func WithClient(client client.Client) Option {
}
}

func Enabled() bool {
conf := cfg.MustGet[Config]()
disabled = !conf.Enabled
return !disabled
}

func MustInit(ctx context.Context, opts ...Option) {
conf := cfg.MustGet[Config]()
if disabled || strings.Contains(conf.RelayUrl, ".testnet.") || (conf.RelayUrl == "" && conf.AbsoluteRootPath == "" && conf.DiscoveryPort == 0) {
disabled = true
disabled = !conf.Enabled
if disabled {
return
}
globalConsensus.Once.Do(func() {
globalConsensus.Consensus = mustInit(ctx, config.DefaultConfig(), opts...)
})

}

func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *consensus {
Expand All @@ -129,12 +135,6 @@ func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *co

serverCfg.SetRoot(c.Config.AbsoluteRootPath)
serverCfg.NodeKey = c.Config.AbsoluteNodePrivateKeyPath
serverCfg.MultiplexConfig = config.MultiplexBaseConfig(
map[string]string{},
map[string][]string{},
).MultiplexConfig
serverCfg.P2P.MaxPacketMsgPayloadSize = 1 * 1024 * 1024
serverCfg.DBBackend = "goleveldb"
serverCfg.DiscoveryPort = c.Config.DiscoveryPort
if c.Config.ExternalAddress != "" {
serverCfg.P2P.ExternalAddress = c.Config.ExternalAddress
Expand All @@ -159,7 +159,13 @@ func mustInit(ctx context.Context, serverCfg *config.Config, opts ...Option) *co
if err != nil {
panic(errors.Wrapf(err, "failed to generate consensus node key"))
}
cometbftServer, err := multiplex.NewServer(ctx, c, serverCfg, c.Logger)
cometbftServer, err := multiplex.NewServer(ctx, c, serverCfg, c.Logger,
multiplex.WithRuntimeManagerOptions(
runtime.RegistryWithConsensusOptions(
runtime.ConsensusPoolWithAcceptor(c),
),
),
)
if err != nil {
panic(errors.Wrapf(err, "failed to start consensus server"))
}
Expand Down
87 changes: 87 additions & 0 deletions database/query/ddl/00020_enable_cometbft_trigger.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
-- SPDX-License-Identifier: ice License 1.0

CREATE OR REPLACE TRIGGER trigger_events_store_replaceable_data_before_update
AFTER UPDATE ON events
FOR EACH ROW
WHEN (((10000 <= old.kind AND old.kind < 20000 ) OR old.kind = 0 OR old.kind = 3 OR (30000 <= old.kind AND old.kind < 40000)) AND old.id != new.id)
EXECUTE FUNCTION events_store_replaceable_data_before_update();
--------
CREATE OR REPLACE FUNCTION events_store_replaceable_data_before_update()
RETURNS TRIGGER AS $$
BEGIN
IF NEW.reference_id = NEW.id THEN
NEW.reference_id = NULL;
RETURN NEW; -- dont insert into replaceable_events_before_update, its replay
END IF;
insert into replaceable_events_before_update (
created_at,
lookup_created_at,
expiration,
kind,
lookup,
key_alg,
content,
d_tag,
h_tag,
address,
id,
system_id,
pubkey,
master_pubkey,
sig,
sig_alg,
reference_id,
tags,
t_tags,
gift_receiver_pubkey,
has_images,
has_videos,
deleted,
is_reply,
is_root_reply,
is_quote,
has_ephemeral_attestation,
has_references,
hidden,
verified,
lang,
replaced_by_id
)
values (
old.created_at,
old.lookup_created_at,
old.expiration,
old.kind,
old.lookup,
old.key_alg,
old.content,
old.d_tag,
old.h_tag,
old.address,
old.id,
old.system_id,
old.pubkey,
old.master_pubkey,
old.sig,
old.sig_alg,
old.reference_id,
old.tags,
old.t_tags,
old.gift_receiver_pubkey,
old.has_images,
old.has_videos,
old.deleted,
old.is_reply,
old.is_root_reply,
old.is_quote,
old.has_ephemeral_attestation,
old.has_references,
old.hidden,
old.verified,
old.lang,
new.id
)
ON CONFLICT DO NOTHING;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
1 change: 0 additions & 1 deletion database/query/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ func TestReplaceableEvents(t *testing.T) {
require.Equal(t, ev3.Event, stored[0].Event, "event 3")
require.Equal(t, ev2.Event, stored[1].Event, "event 2")
t.Run("rollback with command", func(t *testing.T) {
t.Skip("re-enable trigger_events_store_replaceable_data_before_update to work")
// Rollback
require.NoError(t, db.RollbackEvents(t.Context(), ev2, ev3))
stored = helperSelectEvents(t, db, model.Filter{
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ require (
github.com/google/uuid v1.6.0
github.com/gorilla/websocket v1.5.3
github.com/hashicorp/go-multierror v1.1.1
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404
github.com/ice-blockchain/go/src v0.0.0-20250625091122-356c0c7d1830
github.com/imroc/req/v3 v3.57.0
github.com/jackc/pgerrcode v0.0.0-20250907135507-afb5586c32a6
Expand Down Expand Up @@ -87,7 +87,7 @@ require (
go.uber.org/goleak v1.3.0
golang.org/x/net v0.48.0
golang.org/x/sync v0.19.0
google.golang.org/api v0.257.0
google.golang.org/api v0.258.0
google.golang.org/grpc v1.77.0
)

Expand Down Expand Up @@ -195,15 +195,15 @@ require (
github.com/google/s2a-go v0.1.9 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
github.com/googleapis/gax-go/v2 v2.16.0 // indirect
github.com/gookit/color v1.6.0 // indirect
github.com/gorilla/css v1.0.1 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
github.com/huandu/xstrings v1.5.0 // indirect
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83 // indirect
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404 // indirect
github.com/icholy/digest v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -386,8 +386,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/enterprise-certificate-proxy v0.3.7 h1:zrn2Ee/nWmHulBx5sAVrGgAa0f2/R35S4DJwfFaUPFQ=
github.com/googleapis/enterprise-certificate-proxy v0.3.7/go.mod h1:MkHOF77EYAE7qfSuSS9PU6g4Nt4e11cnsDUowfwewLA=
github.com/googleapis/gax-go/v2 v2.15.0 h1:SyjDc1mGgZU5LncH8gimWo9lW1DtIfPibOG81vgd/bo=
github.com/googleapis/gax-go/v2 v2.15.0/go.mod h1:zVVkkxAQHa1RQpg9z2AUCMnKhi0Qld9rcmyfL1OZhoc=
github.com/googleapis/gax-go/v2 v2.16.0 h1:iHbQmKLLZrexmb0OSsNGTeSTS0HO4YvFOG8g5E4Zd0Y=
github.com/googleapis/gax-go/v2 v2.16.0/go.mod h1:o1vfQjjNZn4+dPnRdl/4ZD7S9414Y4xA+a/6Icj6l14=
github.com/gookit/assert v0.1.1 h1:lh3GcawXe/p+cU7ESTZ5Ui3Sm/x8JWpIis4/1aF0mY0=
github.com/gookit/assert v0.1.1/go.mod h1:jS5bmIVQZTIwk42uXl4lyj4iaaxx32tqH16CFj0VX2E=
github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ=
Expand Down Expand Up @@ -415,10 +415,10 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO
github.com/huandu/xstrings v1.5.0 h1:2ag3IFq9ZDANvthTwTiqSSZLjDc+BedvHPAp5tJy2TI=
github.com/huandu/xstrings v1.5.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83 h1:Z5y2thskt5DyCxnPx+Lmed7N/r3qEebBqkFhQuSrF38=
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20250714150930-6fb404dc7d83/go.mod h1:j0GCjFgUZ4JtaNHQs5FFOaDReK0+Mss/YfwmYMEI6V4=
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83 h1:hyiw8EykSG2nFcRVAZMCGDgS0jxcfIPgtGcSPJE8Poo=
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20250714150930-6fb404dc7d83/go.mod h1:2CYWOH6Lxdb1DeMpvCXsArRzLV1CwTj781meNNiAu6I=
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404 h1:JszqKwX/dEaEdNqQp53Cjz9Lu/CluUh16QOf5VgweFQ=
github.com/ice-blockchain/cometbft v1.0.0-rc1.0.20251211081312-072c958a5404/go.mod h1:j0GCjFgUZ4JtaNHQs5FFOaDReK0+Mss/YfwmYMEI6V4=
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404 h1:erZwmyNagwLX2GksrWLq/kme4FAYLQfvQ8bvURHITjE=
github.com/ice-blockchain/cometbft/api v1.0.0-rc.1.0.20251211081312-072c958a5404/go.mod h1:2CYWOH6Lxdb1DeMpvCXsArRzLV1CwTj781meNNiAu6I=
github.com/ice-blockchain/go-nostr v0.42.3-ion.0.20251001102109-e2a999a12f4d h1:B/9bWYRiQjy84xAf8GKv/Y12PAd2Mieh7ikUKF8Hj68=
github.com/ice-blockchain/go-nostr v0.42.3-ion.0.20251001102109-e2a999a12f4d/go.mod h1:c7xUEi3rkXfSVPveupRW3AP71t9JJwVzrepJjfw1wZQ=
github.com/ice-blockchain/go/src v0.0.0-20250625091122-356c0c7d1830 h1:FymrVSpOD6rKyZ97MJr5Xu2ZP29EqTKpdMqTf2DrMgU=
Expand Down Expand Up @@ -1015,8 +1015,8 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk=
gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E=
google.golang.org/api v0.257.0 h1:8Y0lzvHlZps53PEaw+G29SsQIkuKrumGWs9puiexNAA=
google.golang.org/api v0.257.0/go.mod h1:4eJrr+vbVaZSqs7vovFd1Jb/A6ml6iw2e6FBYf3GAO4=
google.golang.org/api v0.258.0 h1:IKo1j5FBlN74fe5isA2PVozN3Y5pwNKriEgAXPOkDAc=
google.golang.org/api v0.258.0/go.mod h1:qhOMTQEZ6lUps63ZNq9jhODswwjkjYYguA7fA3TBFww=
google.golang.org/appengine/v2 v2.0.6 h1:LvPZLGuchSBslPBp+LAhihBeGSiRh1myRoYK4NtuBIw=
google.golang.org/appengine/v2 v2.0.6/go.mod h1:WoEXGoXNfa0mLvaH5sV3ZSGXwVmy8yf7Z1JKf3J3wLI=
google.golang.org/genproto v0.0.0-20251213004720-97cd9d5aeac2 h1:stRtB2UVzFOWnorVuwF0BVVEjQ3AN6SjHWdg811UIQM=
Expand Down
4 changes: 4 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (r *router) MustListenAndServe(ctx context.Context) {
}

func (r *router) BroadcastUserEvents(ctx context.Context, events ...*model.Event) error {
if command.Enabled() {
log.Debug().Msg("skipping broadcasting user events because consensus enabled")
return nil
}
return r.Broadcaster.Broadcast(ctx, events...)
}

Expand Down
Loading
Loading