Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 31 additions & 28 deletions cmd/subzero-ion-connect/subzero_ion_connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/nbd-wtf/go-nostr"
"github.com/panjf2000/ants/v2"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
zlog "github.com/rs/zerolog/log"
"github.com/spf13/cobra"

"github.com/ice-blockchain/subzero/appcontext"
Expand All @@ -26,8 +26,10 @@ import (
"github.com/ice-blockchain/subzero/dvm"
followerssender "github.com/ice-blockchain/subzero/followers-sender"
hashtagssender "github.com/ice-blockchain/subzero/hashtags-sender"
"github.com/ice-blockchain/subzero/log"
"github.com/ice-blockchain/subzero/model"
nftcontentsender "github.com/ice-blockchain/subzero/nft-content-sender"
opentelemetry "github.com/ice-blockchain/subzero/open-telemetry"
pushnotifications "github.com/ice-blockchain/subzero/push-notifications"
"github.com/ice-blockchain/subzero/server"
wsserver "github.com/ice-blockchain/subzero/server/ws"
Expand All @@ -38,6 +40,7 @@ import (
type (
Config struct {
LogLevel string `yaml:"log-level" validate:"omitempty,oneof=trace debug info warn error fatal panic"`
Debug bool `yaml:"debug"`
}
)

Expand All @@ -58,12 +61,14 @@ func logInit() {
}

zerolog.SetGlobalLevel(level)
log.Logger = log.Output(zerolog.ConsoleWriter{
Out: os.Stdout,
TimeFormat: time.RFC3339Nano,
TimeLocation: time.UTC,
NoColor: true,
})
if config.Debug {
zlog.Logger = zlog.Output(zerolog.ConsoleWriter{
Out: os.Stdout,
TimeFormat: time.RFC3339Nano,
TimeLocation: time.UTC,
NoColor: true,
})
}
}

var (
Expand All @@ -77,6 +82,7 @@ var (
Run: func(cmd *cobra.Command, _ []string) {
cfg.MustInit(configPath)
logInit()
opentelemetry.MustInit(cmd.Context())
validation.MustInit(cmd.Context())
query.MustInit(cmd.Context())
command.MustInit(cmd.Context())
Expand Down Expand Up @@ -169,10 +175,8 @@ func init() {

antsPool.Submit(func() {
if err := webserver.BroadcastUserEvents(context.WithoutCancel(ctx), events...); err != nil {
log.Error().Str("context", "MAIN").
Err(err).
Str("events", model.Events(events).String()).
Msg("failed to webserver.BroadcastUserEvents")
log.Error(ctx, errors.Wrapf(err, "failed to webserver.BroadcastUserEvents"), "context", "MAIN",
"events", model.Events(events).String())
}
})

Expand All @@ -184,7 +188,7 @@ func init() {
}
})
} else if err != nil {
log.Error().Str("context", "MAIN").Err(err).Str("event_id", events[0].ID).Msg("dvm failed to accept job for event")
log.Error(ctx, errors.Wrapf(err, "dvm failed to accept job for event"), "context", "MAIN", "event_id", events[0].ID)
}

if err := command.AcceptEvents(ctx, events...); err != nil {
Expand All @@ -197,31 +201,31 @@ func init() {

antsPool.Submit(func() {
if err := pushnotifications.AcceptEvents(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to pushnotifications.AcceptEvents")
log.Error(ctx, errors.Wrapf(err, "failed to pushnotifications.AcceptEvents"), "events", model.Events(events).String())
}
})

antsPool.Submit(func() {
if err := storage.ReplicateFileOnPeers(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to storage.ReplicateFileOnPeers")
log.Error(ctx, errors.Wrapf(err, "failed to storage.ReplicateFileOnPeers"), "events", model.Events(events).String())
}
})

antsPool.Submit(func() {
if err := hashtagssender.AcceptEvents(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to hashtagssender.AcceptEvents")
log.Error(ctx, errors.Wrapf(err, "failed to hashtagssender.AcceptEvents"), "events", model.Events(events).String())
}
})

antsPool.Submit(func() {
if err := nftcontentsender.AcceptEvents(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to nftcontentsender.AcceptEvents")
log.Error(ctx, errors.Wrapf(err, "failed to nftcontentsender.AcceptEvents"), "events", model.Events(events).String())
}
})

antsPool.Submit(func() {
if err := followerssender.AcceptEvents(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to followerssender.AcceptEvents")
log.Error(ctx, errors.Wrapf(err, "failed to followerssender.AcceptEvents"), "events", model.Events(events).String())
}
})

Expand All @@ -235,16 +239,15 @@ func init() {
start := time.Now()
n := webserver.BroadcastNewEvents(context.WithoutCancel(ctx), events...)
end := time.Since(start)
log.Trace().
Int("event_count", len(events)).
Strs("event_ids", model.Events(events).IDs()).
Dur("duration", end).
Int("subscription_count", n).
Msg("broadcast events")
log.Trace(ctx, "broadcast events",
"event_count", len(events),
"event_ids", model.Events(events).IDs(),
"duration", end,
"subscription_count", n)
})
antsPool.Submit(func() {
if err := pushnotifications.AcceptEvents(ctx, events...); err != nil {
log.Error().Err(err).Str("events", model.Events(events).String()).Msg("failed to pushnotifications.AcceptEvents")
log.Error(ctx, errors.Wrapf(err, "failed to pushnotifications.AcceptEvents"), "events", model.Events(events).String())
}
})

Expand All @@ -261,10 +264,10 @@ func newContext() appcontext.WaitForShutdown {
force := false
for sig := range c {
if force {
log.Warn().Str("signal", sig.String()).Msg("force shutdown")
log.Warn(ctx, "force shutdown", "signal", sig.String())
os.Exit(2)
} else {
log.Info().Str("signal", sig.String()).Msg("graceful shutdown")
log.Info(ctx, "graceful shutdown", "signal", sig.String())
cancel()
force = true
}
Expand All @@ -279,12 +282,12 @@ func main() {
defer appcontext.GetAppContext(appCtx).Recover()
pool, err := ants.NewPool(10_000 * runtime.NumCPU())
if err != nil {
log.Panic().Err(err).Msg("failed to create ants pool")
log.Panic(errors.Wrapf(err, "failed to create ants pool"))
}
defer pool.Release()

antsPool = pool
if err := subzero.ExecuteContext(appCtx); err != nil {
if err = subzero.ExecuteContext(appCtx); err != nil {
log.Panic().Err(err)
}
appCtx.WaitForShutdown()
Expand Down
Loading
Loading