Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ Dockerfile
*.yml
var
hermer-cluster*
hermes.secret
*.secret
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ hermes.json
var
hermes.db*
hermes-cluster*
hermes.secret
*.secret
video.md
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ ADD etc/litefs.yml /etc/litefs.yml
# Ensure our mount & data directories exists before mounting with LiteFS.
RUN mkdir -p /var/lib/litefs /mnt/litefs

ENTRYPOINT ["litefs", "mount"]
ENTRYPOINT ["hermes", "run"]
2 changes: 1 addition & 1 deletion api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (s Server) Handler() http.Handler {
r.Use(mw.Tracing(s.provider))
r.Use(mw.WithRequestID)
r.Use(mw.Idempotency)
r.Use(mw.Caching(s.cache))
// r.Use(mw.Caching(s.cache))
r.Use(middleware.Recoverer)

r.Get("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
6 changes: 6 additions & 0 deletions cmd/hermes/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func run(ctx context.Context, args []string) (err error) {
return runServe(ctx, args)
case "migrate":
return runMigrate(ctx, args)
case "run":
if err := runMigrate(ctx, args); err != nil {
return err
}
return runServe(ctx, args)
case "version":
fmt.Printf("version: %s, date: %s\n", build.Info().Version, build.Info().Date)
return
Expand All @@ -55,6 +60,7 @@ Usage:
Available Commands:
server start the hermes server
migrate run database migrations
run makes migrations and starts the server
version print the version of hermes
`[1:])
}
2 changes: 1 addition & 1 deletion cmd/hermes/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"go.opentelemetry.io/otel"
)

func runMigrate(ctx context.Context, args []string) (err error) {
func runMigrate(ctx context.Context, _ []string) (err error) {
config := newConfig()

provider, err := tracing.Provider(
Expand Down
70 changes: 34 additions & 36 deletions cmd/hermes/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,14 @@ import (
"github.com/go-chi/chi/v5"
"github.com/rugwirobaker/hermes"
"github.com/rugwirobaker/hermes/api"
"github.com/rugwirobaker/hermes/fly"
"github.com/rugwirobaker/hermes/observ"
"github.com/rugwirobaker/hermes/sqlite"
"github.com/rugwirobaker/hermes/tracing"
"go.opentelemetry.io/otel"
)

var (
cleanupInterval = 2 * time.Minute
retention = 2 * time.Hour
// cleanupInterval = 2 * time.Minute
// retention = 2 * time.Hour
)

type Server struct {
Expand All @@ -48,7 +46,7 @@ func (s *Server) Stop(ctx context.Context) error {
return s.Shutdown(ctx)
}

func runServe(ctx context.Context, args []string) (err error) {
func runServe(ctx context.Context, _ []string) (err error) {
signalCh := make(chan os.Signal, 2)
signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM)

Expand Down Expand Up @@ -100,7 +98,7 @@ func runServe(ctx context.Context, args []string) (err error) {

cache := hermes.NewIdempotencyKeyStore(db)

environment := fly.NewEnvironment()
// environment := fly.NewEnvironment()

log.Println("initialized hermes api")
api := api.New(service, events, apps, messages, cache, provider)
Expand All @@ -120,16 +118,16 @@ func runServe(ctx context.Context, args []string) (err error) {
}
}()

role, err := environment.GetNodeRole(ctx)
if err != nil {
log.Fatalf("could not get node role: %v", err)
}
// role, err := environment.GetNodeRole(ctx)
// if err != nil {
// log.Fatalf("could not get node role: %v", err)
// }

if role == "primary" {
log.Println("this is the primary node, starting cleanup routine")
// if role == "primary" {
// log.Println("this is the primary node, starting cleanup routine")

go startCleanupRoutine(ctx, db, cleanupInterval, retention)
}
// go startCleanupRoutine(ctx, db, cleanupInterval, retention)
// }

<-signalCh
log.Println("received signal, shutting down")
Expand All @@ -150,25 +148,25 @@ func runServe(ctx context.Context, args []string) (err error) {

}

func startCleanupRoutine(ctx context.Context, db *sqlite.DB, interval, ret time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()

ctx, span := observ.StartSpan(ctx, "cleanup")
defer span.End()

for {
select {
case <-ticker.C:
numDeleted, err := sqlite.DeleteOldRecords(ctx, db, ret)
if err != nil {
log.Printf("Error cleaning up old records: %v", err)
} else {
log.Printf("Deleted %d old records", numDeleted)
}
case <-ctx.Done():
log.Printf("Cleanup routine stopped")
return
}
}
}
// func startCleanupRoutine(ctx context.Context, db *sqlite.DB, interval, ret time.Duration) {
// ticker := time.NewTicker(interval)
// defer ticker.Stop()

// ctx, span := observ.StartSpan(ctx, "cleanup")
// defer span.End()

// for {
// select {
// case <-ticker.C:
// numDeleted, err := sqlite.DeleteOldRecords(ctx, db, ret)
// if err != nil {
// log.Printf("Error cleaning up old records: %v", err)
// } else {
// log.Printf("Deleted %d old records", numDeleted)
// }
// case <-ctx.Done():
// log.Printf("Cleanup routine stopped")
// return
// }
// }
// }
30 changes: 30 additions & 0 deletions config.hcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
app_name = "hermes"

app {

env = {
PRIMARY_REGION = "lhr"
DATABASE_URL = "hermes.db"
ENVIRONMENT = "staging"
PORT = "8080"
HERMES_SENDER_IDENTITY="hermes"
HONEYCOMB_DSN="api.honeycomb.io"
}

port = 8080

compute {
cpu = 1
memory = 256
cpu_kind = "shared"
}

process {
name = "hermes"
}

storage {
name = "data"
destination = "/data"
}
}
14 changes: 7 additions & 7 deletions etc/litefs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ exec:
# These environment variables will be available in your Fly.io application.
# You must specify "experiement.enable_consul" for FLY_CONSUL_URL to be available.
lease:
type: "consul"
type: "static"
hostname: "${HOSTNAME}"
advertise-url: "http://${HOSTNAME}.vm.${FLY_APP_NAME}.internal:20202"
candidate: ${FLY_REGION == PRIMARY_REGION}
promote: true
consul:
key: "litefs/${FLY_APP_NAME}"
url: "${FLY_CONSUL_URL}"
advertise-url: "http://{ANDASY_APP}.internal:20202"
candidate: true
#promote: true
# consul:
# key: "litefs/${FLY_APP_NAME}"
# url: "${FLY_CONSUL_URL}"

backup:
type: "litefs-cloud"
Expand Down
48 changes: 24 additions & 24 deletions fly.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,38 @@ kill_timeout = 5
primary_region = "lhr"

[deploy]
strategy = "rolling"
strategy = "rolling"

[experimental]
auto_rollback = true
enable_consul = true
auto_rollback = true
enable_consul = true

[env]
PRIMARY_REGION = "lhr"
DATABASE_URL = "/var/lib/hermes/state.db"
ENVIRONMENT = "test"
PORT = "8081"
PRIMARY_REGION = "lhr"
DATABASE_URL = "/var/lib/hermes/state.db"
ENVIRONMENT = "test"
PORT = "8081"

[[mounts]]
source = "data_machines"
destination = "/data"
source = "data_machines"
destination = "/data"


[http_service]
internal_port = 8080
force_https = true
auto_stop_machines = false
auto_start_machines = false
[http_service.concurrency]
type = "requests"
soft_limit = 200
hard_limit = 250
internal_port = 8080
force_https = true
auto_stop_machines = "off"
auto_start_machines = false
[http_service.concurrency]
type = "requests"
soft_limit = 200
hard_limit = 250

[checks]
[checks.healthz]
type = "tcp"
grace_period = "4s"
interval = "8s"
port = 8080
timeout = "10s"
restart_limit = 3
[checks.healthz]
type = "tcp"
grace_period = "4s"
interval = "8s"
port = 8080
timeout = "10s"
restart_limit = 3
4 changes: 4 additions & 0 deletions sqlite/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"errors"
"fmt"
"log"
"os"
"path/filepath"

Expand Down Expand Up @@ -38,6 +39,9 @@ func NewDB(dsn, server string, provider trace.TracerProvider) (*DB, error) {
if err != nil {
return nil, err
}

log.Println("dsn:", dsn)

return &DB{db, dsn}, nil
}

Expand Down
8 changes: 8 additions & 0 deletions sqlite/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ func (db *DB) Migrate(dir Direction) (int, error) {
createTableIdempotencyKeys,
},
},
{
Id: "7",
Up: []string{
insertHermesApp,
},
},
},
}

Expand Down Expand Up @@ -159,3 +165,5 @@ var createTableIdempotencyKeys = `CREATE TABLE IF NOT EXISTS idempotency_keys (
path TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);`

var insertHermesApp = `INSERT INTO apps (name, token, sender) VALUES ('hermes', 'hermes_token', 'Hermes');`
4 changes: 2 additions & 2 deletions tracing/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
)

// DBTraceDriver driver will register a new tracing sql driver and return the driver name.
func DBTraceDriver(tp trace.TracerProvider, driver, dns, service string) (string, error) {
func DBTraceDriver(tp trace.TracerProvider, driver, dsn, service string) (string, error) {

if service == "" {
service = driver + "-db-service"
Expand All @@ -20,7 +20,7 @@ func DBTraceDriver(tp trace.TracerProvider, driver, dns, service string) (string
// Register the otelsql wrapper for the provided database driver.
driverName, err := otelsql.RegisterWithSource(
driver,
dns,
dsn,
otelsql.WithDefaultAttributes(
semconv.ServiceNameKey.String(service),
),
Expand Down
Loading