Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ linters:
- nilerr
- nilnil

# Forces newlines in some places.
- nlreturn

# Finds sending HTTP request without context.Context.
- noctx
Expand Down Expand Up @@ -226,6 +224,7 @@ linters:
# I'm fine to check the error from json.Marshal
- errchkjson


# All SQL queries MUST BE covered with tests.
- execinquery

Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// TODO: add commands for build and run in dev/produciton mode

ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))
export ROOT=$(realpath $(dir $(lastword $(MAKEFILE_LIST))))

OS := $(shell uname -s)

Expand Down Expand Up @@ -35,6 +35,7 @@ ifneq (,$(findstring NT,$(OS)))
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/internalevent/internalevent.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/project/project.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/source/source.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/destination/destination.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user.proto
protoc --go-grpc_out=contract/go/ --go-grpc_opt=paths=source_relative --go_out=contract/go --go_opt=paths=source_relative --proto_path=./contract/protobuf/ contract/protobuf/user/user_info.proto
else
Expand Down
1 change: 1 addition & 0 deletions adapter/scylladb/scyllainitialize/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func CreateKeySpace(consistency gocql.Consistency, keyspace string, hosts ...str
func RunMigrations(dbConn *ScyllaDBConnection, dir string) error {
logger.L().Debug("running migrations...")
for _, host := range dbConn.hosts {
logger.L().Info(dir)
migration := New(dir, host, dbConn.keyspace)
err := migration.Run()
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions adapter/scylladb/sessionx.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ type SessionxInterface interface {
ExecStmt(stmt string) error
AwaitSchemaAgreement(ctx context.Context) error
Close()
NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch
ExecuteBatch(batch *gocql.Batch) error
}

type Session struct {
Expand All @@ -43,6 +45,16 @@ func (s *Session) Close() {
s.S.Close()
}

func (s *Session) NewBatch(ctx context.Context, batchType gocql.BatchType) *gocql.Batch {
batch := s.S.NewBatch(batchType)

return batch.WithContext(ctx)
}

func (s *Session) ExecuteBatch(batch *gocql.Batch) error {
return s.S.ExecuteBatch(batch)
}

func NewSession(session *gocql.Session) SessionxInterface {
gocqlxSession := gocqlx.NewSession(session)

Expand Down
52 changes: 52 additions & 0 deletions cmd/source/faker/undeliveredevents_publisher/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package main

import (
"sync"

"github.com/ormushq/ormus/adapter/otela"
"github.com/ormushq/ormus/config"
"github.com/ormushq/ormus/contract/go/destination"
"github.com/ormushq/ormus/pkg/channel"
"github.com/ormushq/ormus/pkg/channel/adapter/rabbitmqchannel"
"github.com/ormushq/ormus/pkg/encoder"
)

func main() {
cfg := config.C()
done := make(chan bool)
wg := &sync.WaitGroup{}
bufferSize := cfg.Source.BufferSize
maxRetryPolicy := cfg.Source.MaxRetry
testCount := 1

err := otela.Configure(wg, done, otela.Config{Exporter: otela.ExporterConsole})
if err != nil {
panic(err.Error())
}
inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.InputOnlyMode, bufferSize, maxRetryPolicy)
if err != nil {
panic(err.Error())
}
inputChannel, err := inputAdapter.GetInputChannel(cfg.Source.UndeliveredEventsQueueName)
if err != nil {
panic(err.Error())
}

wg.Add(1)
go func() {
defer wg.Done()
for messageID := 0; messageID < testCount; messageID++ {
msg := encoder.EncodeProcessedEvent(&destination.DeliveredEventsList{
Events: []*destination.DeliveredEvent{
{
MessageId: "d5aacd53-f866-4406-8e2f-d6f1dbc96975",
},
},
})
inputChannel <- []byte(msg)

}
}()
wg.Wait()
}
18 changes: 12 additions & 6 deletions cmd/source/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ func main() {
}

cfg := config.C()
_, Consumer, eventSvc, eventValidator := SetupSourceServices(cfg)
Consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, Consumer.ProcessNewSourceEvent)

_, consumer, eventSvc, eventValidator := SetupSourceServices(cfg)
consumer.Consume(context.Background(), cfg.Source.NewSourceEventName, done, wg, consumer.ProcessNewSourceEvent)
consumer.Consume(context.Background(), cfg.Source.UndeliveredEventsQueueName, done, wg, consumer.EventHasDeliveredToDestination)
//----------------- Setup Tracer -----------------//
otelcfg := otela.Config{
Endpoint: config.C().Source.Otel.Endpoint,
Expand Down Expand Up @@ -108,7 +108,7 @@ func main() {
wg.Wait()
}

func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, eventHandler sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {
func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, consumer sourceevent.Consumer, eventSvc eventsvc.Service, eventValidator eventvalidator.Validator) {
done := make(chan bool)
wg := &sync.WaitGroup{}

Expand All @@ -118,6 +118,11 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event
panic(err)
}

err = outputAdapter.NewChannel(cfg.Source.UndeliveredEventsQueueName, channel.OutputOnly, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
panic(err)
}

inputAdapter := rabbitmqchannel.New(done, wg, cfg.RabbitMq)
err = inputAdapter.NewChannel(cfg.Source.NewEventQueueName, channel.InputOnlyMode, cfg.Source.BufferSize, cfg.Source.MaxRetry)
if err != nil {
Expand All @@ -135,7 +140,6 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

writeKeyRepo := writekeyrepo.New(redisAdapter, *ManagerAdapter)
writeKeySvc = writekey.New(&writeKeyRepo, cfg.Source)
eventHandler = *sourceevent.NewConsumer(outputAdapter, writeKeySvc)

DB, err := scylladb.New(cfg.Source.ScyllaDBConfig)
if err != nil {
Expand All @@ -146,5 +150,7 @@ func SetupSourceServices(cfg config.Config) (writeKeySvc writekey.Service, event

eventValidator = eventvalidator.New(&writeKeyRepo, cfg.Source)

return writeKeySvc, eventHandler, eventSvc, eventValidator
consumer = *sourceevent.NewConsumer(outputAdapter, writeKeySvc, eventSvc, cfg.Source.RetryNumber)

return writeKeySvc, consumer, eventSvc, eventValidator
}
2 changes: 2 additions & 0 deletions config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ source:
port: 8082
network: "tcp"
write_key_validation_address: "127.0.0.1:8081"
undelivered_events_queue_name: "undelivered_events"
new_event_queue_name: "new-event-received"
write_key_expiration: 120
undelivered_event_retransmit_period: 1
retry_number: 1
new_source_event_name: "new-source-event"
buffersize: 100
number_instants: 10
Expand Down
184 changes: 184 additions & 0 deletions contract/go/destination/destination.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions contract/protobuf/destination/destination.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";

package destination;
option go_package = "github.com/ormushq/ormus/contract/go/destination";

import "google/protobuf/timestamp.proto";

message DeliveredEvent {
string message_id = 1;
}

message DeliveredEventsList {
repeated DeliveredEvent events = 1;
}
Loading
Loading