diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..c297841 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,17 @@ +name: lint +on: + - pull_request +jobs: + lint: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: ">=1.18.0" + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: latest diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..29749fa --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,15 @@ +name: test +on: + - pull_request +jobs: + test: + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + - name: Set up Go + uses: actions/setup-go@v3 + with: + go-version: ">=1.18.0" + - name: Run tests + run: make test diff --git a/.golangci.yml b/.golangci.yml new file mode 100644 index 0000000..e2a88ea --- /dev/null +++ b/.golangci.yml @@ -0,0 +1,62 @@ +run: + timeout: 2m + +linters: + enable: + - asciicheck + - bodyclose + - contextcheck + - deadcode + - dogsled + - durationcheck + - dupl + - errcheck + - errchkjson + - errname + - errorlint + - exhaustive + - exportloopref + - gocognit + - gocritic + - gofumpt + - goimports + - gomnd + - gomoddirectives + - gosec + - gosimple + - govet + - ifshort + - ineffassign + - importas + - misspell + - nilerr + - noctx + - prealloc + - predeclared + - revive + - staticcheck + - thelper + - tparallel + - unconvert + - unparam + - unused + - varcheck + - whitespace + - wrapcheck + +linters-settings: + unparam: + check-exported: false + unused: + check-exported: false + gocognit: + min-complexity: 20 + wrapcheck: + ignoreSigs: + - .Errorf( + - errors.New( + - status.Error( + - retry.Recoverable( + - retry.Unrecoverable( + +modules-download-mode: readonly diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..95e41e0 --- /dev/null +++ b/Makefile @@ -0,0 +1,40 @@ +.PHONY: proto +proto: + cd api/proto && \ + protoc \ + --go_opt=Mmux.proto=github.com/bbralion/CTFloodBot/internal/genproto \ + --go-grpc_opt=Mmux.proto=github.com/bbralion/CTFloodBot/internal/genproto \ + --go_opt=paths=source_relative \ + --go-grpc_opt=paths=source_relative \ + --go_out=../../internal/genproto \ + --go-grpc_out=../../internal/genproto \ + mux.proto + +.PHONY: mocks +mocks: + mockgen \ + -package=mocks \ + -destination=internal/mocks/mux_mock.go \ + github.com/bbralion/CTFloodBot/internal/genproto MultiplexerServiceClient,MultiplexerService_RegisterHandlerClient + mockgen \ + -package mocks \ + -destination=internal/mocks/tgbotapi_mock.go \ + --mock_names HttpClient=MockTGBotAPIHTTPClient \ + github.com/go-telegram-bot-api/telegram-bot-api HttpClient + +.PHONY: test +test: + go test -v -race -count=1 ./... + +.PHONY: cover +cover: + go test -race -count=1 -covermode=atomic -coverprofile cover.tmp.out -coverpkg=./... -v ./... && \ + grep -v 'genproto\|mocks' cover.tmp.out > cover.out + go tool cover -func cover.out && \ + go tool cover -html cover.out -o cover.html && \ + open cover.html && sleep 1 && \ + rm -f cover.tmp.out cover.out cover.html + +.PHONY: lint +lint: + golangci-lint run -v \ No newline at end of file diff --git a/api/proto/mux.proto b/api/proto/mux.proto new file mode 100644 index 0000000..7a45297 --- /dev/null +++ b/api/proto/mux.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +package mux; + +// Config specifies the information clients require to connect to the proxy. +message Config { + string proxy_endpoint = 1; +} + +// Update is a single update received by the proxy, passed as the actual stringified update object. +message Update { + string json = 1; +} + +message ConfigRequest {} + +message ConfigResponse { + Config config = 1; +} + +message RegisterRequest { + repeated string matchers = 1; +} + +service MultiplexerService { + rpc GetConfig(ConfigRequest) returns (ConfigResponse); + rpc RegisterHandler(RegisterRequest) returns (stream Update); +} \ No newline at end of file diff --git a/cmd/aboba-handler/main.go b/cmd/aboba-handler/main.go deleted file mode 100644 index cbc3ad0..0000000 --- a/cmd/aboba-handler/main.go +++ /dev/null @@ -1,43 +0,0 @@ -package main - -import ( - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "github.com/jinzhu/configor" - "github.com/kbats183/CTFloodBot/pkg/core" - "github.com/kbats183/CTFloodBot/pkg/handlers" - "github.com/kbats183/CTFloodBot/pkg/utils" - "go.uber.org/zap" -) - -var config handlers.HandlerConfig - -const ( - TelegramTextCommand = "aboba" - TelegramTextAnswer = "abobus" -) - -func main() { - logger := utils.GetLogger() - - err := configor.Load(&config, "config_handler.yaml") - if err != nil { - logger.Fatal("Failed to parse app config", zap.Error(err)) - } - - handler := handlers.SimpleHandler{ - Handler: func(logger *zap.Logger, update *telegramapi.Update, answerChan handlers.AnswerChan) { - message := update.Message - if message == nil { - return - } - if message.Text == TelegramTextCommand { - msg := telegramapi.NewMessage(message.Chat.ID, TelegramTextAnswer) - msg.ReplyToMessageID = message.MessageID - answerChan <- core.HandlerAnswer{Type: "message:message_config", MessageConfig: &msg} - } - }, - Logger: logger, - Config: config, - } - handler.Run() -} diff --git a/cmd/core/main.go b/cmd/core/main.go deleted file mode 100644 index d79abce..0000000 --- a/cmd/core/main.go +++ /dev/null @@ -1,76 +0,0 @@ -package main - -import ( - "context" - "encoding/json" - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "github.com/jinzhu/configor" - "github.com/kbats183/CTFloodBot/pkg/core" - "github.com/kbats183/CTFloodBot/pkg/utils" - "go.uber.org/zap" -) - -var config core.BotCoreConfig - -func main() { - logger := utils.GetLogger() - ctx := context.Background() - - err := configor.Load(&config, "config_core.yaml") - if err != nil { - logger.Fatal("Failed to parse app config", zap.Error(err)) - } - - botAPI, err := telegramapi.NewBotAPI(config.TelegramToken) - if err != nil { - logger.Fatal("Failed to create telegram api", zap.Error(err)) - } - - redisClient := core.GetRedisClientByConfig(config.Redis) - - tgUpdatesChan, err := botAPI.GetUpdatesChan(telegramapi.NewUpdate(0)) - if err != nil { - logger.Fatal("Failed to get telegram updates chanel", zap.Error(err)) - } - - answerSubscriber := redisClient.Subscribe(ctx, core.RedisAnswersChanel) - answerChan := answerSubscriber.Channel() - for { - select { - case update := <-tgUpdatesChan: - logger.Info("New update", zap.String("update_type", utils.GetTelegramUpdateType(&update))) - byteMessage, err := json.Marshal(update) - if err != nil { - logger.Error("Failed to marshal telegram update", zap.Error(err)) - continue - } - - command := redisClient.Publish(ctx, core.RedisUpdateChanel, byteMessage) - if command.Err() != nil { - logger.Error("Failed to send telegram update in redis", zap.Error(err)) - } - - case answerSubscriber := <-answerChan: - message := answerSubscriber.Payload - var answer core.HandlerAnswer - err := json.Unmarshal([]byte(message), &answer) - if err != nil { - logger.Error("Failed to unmarshal handler answer", zap.Error(err), zap.String("answer", message)) - continue - } - var telegramSend telegramapi.Chattable - switch answer.Type { - case "message:message_config": - telegramSend = answer.MessageConfig - case "message:sticker_config": - telegramSend = answer.StickerConfig - } - if telegramSend != nil { - _, err = botAPI.Send(telegramSend) - if err != nil { - logger.Error("Failed to send in telegram", zap.String("type", answer.Type), zap.Error(err)) - } - } - } - } -} diff --git a/cmd/debug-handler/main.go b/cmd/debug-handler/main.go deleted file mode 100644 index 975d12f..0000000 --- a/cmd/debug-handler/main.go +++ /dev/null @@ -1,29 +0,0 @@ -package main - -import ( - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "github.com/jinzhu/configor" - "github.com/kbats183/CTFloodBot/pkg/handlers" - "github.com/kbats183/CTFloodBot/pkg/utils" - "go.uber.org/zap" -) - -var config handlers.HandlerConfig - -func main() { - logger := utils.GetLogger() - - err := configor.Load(&config, "config_handler.yaml") - if err != nil { - logger.Fatal("Failed to parse app config", zap.Error(err)) - } - - handler := handlers.SimpleHandler{ - Handler: func(logger *zap.Logger, update *telegramapi.Update, _ handlers.AnswerChan) { - logger.Info("Received update", zap.Any("update", update)) - }, - Logger: logger, - Config: config, - } - handler.Run() -} diff --git a/cmd/shake-cat-handler/main.go b/cmd/shake-cat-handler/main.go deleted file mode 100644 index 74b0468..0000000 --- a/cmd/shake-cat-handler/main.go +++ /dev/null @@ -1,43 +0,0 @@ -package main - -import ( - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "github.com/jinzhu/configor" - "github.com/kbats183/CTFloodBot/pkg/core" - "github.com/kbats183/CTFloodBot/pkg/handlers" - "github.com/kbats183/CTFloodBot/pkg/utils" - "go.uber.org/zap" - "strings" -) - -var config handlers.HandlerConfig - -const ( - TelegramTextCommand = "/shake_cat_stick" - StickerID = "CAACAgIAAxkBAAIBiGLfzvi09zcCIPcPc6pu4_GsC3nwAAJVHQACm9J4Sf-ATjduPn5eKQQ" -) - -func main() { - logger := utils.GetLogger() - - err := configor.Load(&config, "config_handler.yaml") - if err != nil { - logger.Fatal("Failed to parse app config", zap.Error(err)) - } - - handler := handlers.SimpleHandler{ - Handler: func(logger *zap.Logger, update *telegramapi.Update, answerChan handlers.AnswerChan) { - message := update.Message - if message == nil { - return - } - if strings.HasPrefix(message.Text, TelegramTextCommand) { - sticker := telegramapi.NewStickerShare(message.Chat.ID, StickerID) - answerChan <- core.HandlerAnswer{Type: "message:sticker_config", StickerConfig: &sticker} - } - }, - Logger: logger, - Config: config, - } - handler.Run() -} diff --git a/config_core.yaml b/config_core.yaml deleted file mode 100644 index 7427275..0000000 --- a/config_core.yaml +++ /dev/null @@ -1,4 +0,0 @@ -redis: - host: localhost:6379 - username: - password: aboba \ No newline at end of file diff --git a/config_handler.yaml b/config_handler.yaml deleted file mode 100644 index 7427275..0000000 --- a/config_handler.yaml +++ /dev/null @@ -1,4 +0,0 @@ -redis: - host: localhost:6379 - username: - password: aboba \ No newline at end of file diff --git a/docker/docker-compose-redis-only.yml b/docker/docker-compose-redis-only.yml deleted file mode 100644 index 02e4ada..0000000 --- a/docker/docker-compose-redis-only.yml +++ /dev/null @@ -1,13 +0,0 @@ -version: '3.8' -services: - cache: - image: redis:6.2-alpine - restart: always - ports: - - '6379:6379' - command: redis-server --save 20 1 --loglevel warning --requirepass aboba - volumes: - - cache:/data -volumes: - cache: - driver: local \ No newline at end of file diff --git a/go.mod b/go.mod index 5674edf..90e7cb8 100644 --- a/go.mod +++ b/go.mod @@ -1,20 +1,28 @@ -module github.com/kbats183/CTFloodBot +module github.com/bbralion/CTFloodBot go 1.18 require ( - github.com/go-redis/redis/v8 v8.11.5 + github.com/go-logr/logr v1.2.3 github.com/go-telegram-bot-api/telegram-bot-api v1.0.1-0.20201107014523-54104a08f947 - github.com/jinzhu/configor v1.2.1 - go.uber.org/zap v1.21.0 + github.com/golang/mock v1.6.0 + github.com/justinas/alice v1.2.0 + github.com/stretchr/testify v1.8.0 + go.uber.org/goleak v1.1.12 + google.golang.org/grpc v1.48.0 + google.golang.org/protobuf v1.28.1 ) require ( - github.com/BurntSushi/toml v0.3.1 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect github.com/technoweenie/multipartstreamer v1.0.1 // indirect - go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect - gopkg.in/yaml.v2 v2.4.0 // indirect + golang.org/x/net v0.0.0-20220728181054-f92ba40d432d // indirect + golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 // indirect + golang.org/x/text v0.3.7 // indirect + golang.org/x/tools v0.1.12 // indirect + golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f // indirect + google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 566607e..6c27ec6 100644 --- a/go.sum +++ b/go.sum @@ -1,84 +1,180 @@ -github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= -github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= -github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= -github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= +github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-telegram-bot-api/telegram-bot-api v1.0.1-0.20201107014523-54104a08f947 h1:CguiLTREMSU5GMaHMlAUAVb2cT8M+IpZVhgRK1te6Ds= github.com/go-telegram-bot-api/telegram-bot-api v1.0.1-0.20201107014523-54104a08f947/go.mod h1:lDm2E64X4OjFdBUA4hlN4mEvbSitvhJdKw7rsA8KHgI= -github.com/jinzhu/configor v1.2.1 h1:OKk9dsR8i6HPOCZR8BcMtcEImAFjIhbJFZNyn5GCZko= -github.com/jinzhu/configor v1.2.1/go.mod h1:nX89/MOmDba7ZX7GCyU/VIaQ2Ar2aizBl2d3JLF/rDc= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/justinas/alice v1.2.0 h1:+MHSA/vccVCF4Uq37S42jwlkvI2Xzl7zTPCN5BnZNVo= +github.com/justinas/alice v1.2.0/go.mod h1:fN5HRH/reO/zrUflLfTN43t3vXvKzvZIENsNEe7i7qA= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= -github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/technoweenie/multipartstreamer v1.0.1 h1:XRztA5MXiR1TIRHxH2uNxXxaIkKQDeX7m2XsSOlQEnM= github.com/technoweenie/multipartstreamer v1.0.1/go.mod h1:jNVxdtShOxzAsukZwTSw6MDx5eUJoiEBsSvzDU9uzog= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= -go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de h1:5hukYrvBGR8/eNkX5mdUezrA6JiaEZDtJb9Ei+1LlBs= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20220728181054-f92ba40d432d h1:3iMzhioG3w6/URLOo7X7eZRkWoLdz9iWE/UsnXHNTfY= +golang.org/x/net v0.0.0-20220728181054-f92ba40d432d/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10 h1:WIoqL4EROvwiPdUtaip4VcDdpZ4kha7wBWZrbVKCIZg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= +golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f h1:uF6paiQQebLeSXkrTqHqz0MXhXXS1KgF41eUdBNvxK0= +golang.org/x/xerrors v0.0.0-20220609144429-65e65417b02f/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b h1:SfSkJugek6xm7lWywqth4r2iTrYLpD8lOj1nMIIhMNM= +google.golang.org/genproto v0.0.0-20220725144611-272f38e5d71b/go.mod h1:iHe1svFLAZg9VWz891+QbRMwUv9O/1Ww+/mngYeThbc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= +google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= -gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..414d864 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,28 @@ +package config + +type TelegramAPI struct { + Token string + Endpoint string +} + +type HTTPProxy struct { + AdvertisedEndpoint string `mapstructure:"advertised_endpoint"` + Listen string + Allow []string +} + +type GRPCProxy struct { + Listen string +} + +type Client struct { + Name string + Token string +} + +type Config struct { + TelegramAPI TelegramAPI `mapstructure:"telegram"` + HTTPProxy HTTPProxy `mapstructure:"http"` + GRPCProxy GRPCProxy `mapstructure:"grpc"` + Clients []Client +} diff --git a/internal/genproto/mux.pb.go b/internal/genproto/mux.pb.go new file mode 100644 index 0000000..deb2ecd --- /dev/null +++ b/internal/genproto/mux.pb.go @@ -0,0 +1,397 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.21.3 +// source: mux.proto + +package genproto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Config specifies the information clients require to connect to the proxy. +type Config struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + ProxyEndpoint string `protobuf:"bytes,1,opt,name=proxy_endpoint,json=proxyEndpoint,proto3" json:"proxy_endpoint,omitempty"` +} + +func (x *Config) Reset() { + *x = Config{} + if protoimpl.UnsafeEnabled { + mi := &file_mux_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Config) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Config) ProtoMessage() {} + +func (x *Config) ProtoReflect() protoreflect.Message { + mi := &file_mux_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Config.ProtoReflect.Descriptor instead. +func (*Config) Descriptor() ([]byte, []int) { + return file_mux_proto_rawDescGZIP(), []int{0} +} + +func (x *Config) GetProxyEndpoint() string { + if x != nil { + return x.ProxyEndpoint + } + return "" +} + +// Update is a single update received by the proxy, passed as the actual stringified update object. +type Update struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Json string `protobuf:"bytes,1,opt,name=json,proto3" json:"json,omitempty"` +} + +func (x *Update) Reset() { + *x = Update{} + if protoimpl.UnsafeEnabled { + mi := &file_mux_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Update) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Update) ProtoMessage() {} + +func (x *Update) ProtoReflect() protoreflect.Message { + mi := &file_mux_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Update.ProtoReflect.Descriptor instead. +func (*Update) Descriptor() ([]byte, []int) { + return file_mux_proto_rawDescGZIP(), []int{1} +} + +func (x *Update) GetJson() string { + if x != nil { + return x.Json + } + return "" +} + +type ConfigRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ConfigRequest) Reset() { + *x = ConfigRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mux_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConfigRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConfigRequest) ProtoMessage() {} + +func (x *ConfigRequest) ProtoReflect() protoreflect.Message { + mi := &file_mux_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConfigRequest.ProtoReflect.Descriptor instead. +func (*ConfigRequest) Descriptor() ([]byte, []int) { + return file_mux_proto_rawDescGZIP(), []int{2} +} + +type ConfigResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Config *Config `protobuf:"bytes,1,opt,name=config,proto3" json:"config,omitempty"` +} + +func (x *ConfigResponse) Reset() { + *x = ConfigResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_mux_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConfigResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConfigResponse) ProtoMessage() {} + +func (x *ConfigResponse) ProtoReflect() protoreflect.Message { + mi := &file_mux_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConfigResponse.ProtoReflect.Descriptor instead. +func (*ConfigResponse) Descriptor() ([]byte, []int) { + return file_mux_proto_rawDescGZIP(), []int{3} +} + +func (x *ConfigResponse) GetConfig() *Config { + if x != nil { + return x.Config + } + return nil +} + +type RegisterRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Matchers []string `protobuf:"bytes,1,rep,name=matchers,proto3" json:"matchers,omitempty"` +} + +func (x *RegisterRequest) Reset() { + *x = RegisterRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mux_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RegisterRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RegisterRequest) ProtoMessage() {} + +func (x *RegisterRequest) ProtoReflect() protoreflect.Message { + mi := &file_mux_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RegisterRequest.ProtoReflect.Descriptor instead. +func (*RegisterRequest) Descriptor() ([]byte, []int) { + return file_mux_proto_rawDescGZIP(), []int{4} +} + +func (x *RegisterRequest) GetMatchers() []string { + if x != nil { + return x.Matchers + } + return nil +} + +var File_mux_proto protoreflect.FileDescriptor + +var file_mux_proto_rawDesc = []byte{ + 0x0a, 0x09, 0x6d, 0x75, 0x78, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x03, 0x6d, 0x75, 0x78, + 0x22, 0x2f, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, + 0x6f, 0x78, 0x79, 0x5f, 0x65, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0d, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x45, 0x6e, 0x64, 0x70, 0x6f, 0x69, 0x6e, + 0x74, 0x22, 0x1c, 0x0a, 0x06, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x12, 0x12, 0x0a, 0x04, 0x6a, + 0x73, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6a, 0x73, 0x6f, 0x6e, 0x22, + 0x0f, 0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x22, 0x35, 0x0a, 0x0e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x23, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x6d, 0x75, 0x78, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, + 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x22, 0x2d, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, + 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x6d, 0x61, + 0x74, 0x63, 0x68, 0x65, 0x72, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x6d, 0x61, + 0x74, 0x63, 0x68, 0x65, 0x72, 0x73, 0x32, 0x82, 0x01, 0x0a, 0x12, 0x4d, 0x75, 0x6c, 0x74, 0x69, + 0x70, 0x6c, 0x65, 0x78, 0x65, 0x72, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x34, 0x0a, + 0x09, 0x47, 0x65, 0x74, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x12, 0x2e, 0x6d, 0x75, 0x78, + 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, + 0x2e, 0x6d, 0x75, 0x78, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x36, 0x0a, 0x0f, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x48, + 0x61, 0x6e, 0x64, 0x6c, 0x65, 0x72, 0x12, 0x14, 0x2e, 0x6d, 0x75, 0x78, 0x2e, 0x52, 0x65, 0x67, + 0x69, 0x73, 0x74, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0b, 0x2e, 0x6d, + 0x75, 0x78, 0x2e, 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x30, 0x01, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_mux_proto_rawDescOnce sync.Once + file_mux_proto_rawDescData = file_mux_proto_rawDesc +) + +func file_mux_proto_rawDescGZIP() []byte { + file_mux_proto_rawDescOnce.Do(func() { + file_mux_proto_rawDescData = protoimpl.X.CompressGZIP(file_mux_proto_rawDescData) + }) + return file_mux_proto_rawDescData +} + +var file_mux_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_mux_proto_goTypes = []interface{}{ + (*Config)(nil), // 0: mux.Config + (*Update)(nil), // 1: mux.Update + (*ConfigRequest)(nil), // 2: mux.ConfigRequest + (*ConfigResponse)(nil), // 3: mux.ConfigResponse + (*RegisterRequest)(nil), // 4: mux.RegisterRequest +} +var file_mux_proto_depIdxs = []int32{ + 0, // 0: mux.ConfigResponse.config:type_name -> mux.Config + 2, // 1: mux.MultiplexerService.GetConfig:input_type -> mux.ConfigRequest + 4, // 2: mux.MultiplexerService.RegisterHandler:input_type -> mux.RegisterRequest + 3, // 3: mux.MultiplexerService.GetConfig:output_type -> mux.ConfigResponse + 1, // 4: mux.MultiplexerService.RegisterHandler:output_type -> mux.Update + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_mux_proto_init() } +func file_mux_proto_init() { + if File_mux_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_mux_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Config); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mux_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Update); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mux_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConfigRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mux_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConfigResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mux_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RegisterRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_mux_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_mux_proto_goTypes, + DependencyIndexes: file_mux_proto_depIdxs, + MessageInfos: file_mux_proto_msgTypes, + }.Build() + File_mux_proto = out.File + file_mux_proto_rawDesc = nil + file_mux_proto_goTypes = nil + file_mux_proto_depIdxs = nil +} diff --git a/internal/genproto/mux_grpc.pb.go b/internal/genproto/mux_grpc.pb.go new file mode 100644 index 0000000..cfdc02a --- /dev/null +++ b/internal/genproto/mux_grpc.pb.go @@ -0,0 +1,169 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.3 +// source: mux.proto + +package genproto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// MultiplexerServiceClient is the client API for MultiplexerService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type MultiplexerServiceClient interface { + GetConfig(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) + RegisterHandler(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (MultiplexerService_RegisterHandlerClient, error) +} + +type multiplexerServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewMultiplexerServiceClient(cc grpc.ClientConnInterface) MultiplexerServiceClient { + return &multiplexerServiceClient{cc} +} + +func (c *multiplexerServiceClient) GetConfig(ctx context.Context, in *ConfigRequest, opts ...grpc.CallOption) (*ConfigResponse, error) { + out := new(ConfigResponse) + err := c.cc.Invoke(ctx, "/mux.MultiplexerService/GetConfig", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *multiplexerServiceClient) RegisterHandler(ctx context.Context, in *RegisterRequest, opts ...grpc.CallOption) (MultiplexerService_RegisterHandlerClient, error) { + stream, err := c.cc.NewStream(ctx, &MultiplexerService_ServiceDesc.Streams[0], "/mux.MultiplexerService/RegisterHandler", opts...) + if err != nil { + return nil, err + } + x := &multiplexerServiceRegisterHandlerClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type MultiplexerService_RegisterHandlerClient interface { + Recv() (*Update, error) + grpc.ClientStream +} + +type multiplexerServiceRegisterHandlerClient struct { + grpc.ClientStream +} + +func (x *multiplexerServiceRegisterHandlerClient) Recv() (*Update, error) { + m := new(Update) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// MultiplexerServiceServer is the server API for MultiplexerService service. +// All implementations must embed UnimplementedMultiplexerServiceServer +// for forward compatibility +type MultiplexerServiceServer interface { + GetConfig(context.Context, *ConfigRequest) (*ConfigResponse, error) + RegisterHandler(*RegisterRequest, MultiplexerService_RegisterHandlerServer) error + mustEmbedUnimplementedMultiplexerServiceServer() +} + +// UnimplementedMultiplexerServiceServer must be embedded to have forward compatible implementations. +type UnimplementedMultiplexerServiceServer struct { +} + +func (UnimplementedMultiplexerServiceServer) GetConfig(context.Context, *ConfigRequest) (*ConfigResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetConfig not implemented") +} +func (UnimplementedMultiplexerServiceServer) RegisterHandler(*RegisterRequest, MultiplexerService_RegisterHandlerServer) error { + return status.Errorf(codes.Unimplemented, "method RegisterHandler not implemented") +} +func (UnimplementedMultiplexerServiceServer) mustEmbedUnimplementedMultiplexerServiceServer() {} + +// UnsafeMultiplexerServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to MultiplexerServiceServer will +// result in compilation errors. +type UnsafeMultiplexerServiceServer interface { + mustEmbedUnimplementedMultiplexerServiceServer() +} + +func RegisterMultiplexerServiceServer(s grpc.ServiceRegistrar, srv MultiplexerServiceServer) { + s.RegisterService(&MultiplexerService_ServiceDesc, srv) +} + +func _MultiplexerService_GetConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConfigRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(MultiplexerServiceServer).GetConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/mux.MultiplexerService/GetConfig", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(MultiplexerServiceServer).GetConfig(ctx, req.(*ConfigRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _MultiplexerService_RegisterHandler_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(RegisterRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(MultiplexerServiceServer).RegisterHandler(m, &multiplexerServiceRegisterHandlerServer{stream}) +} + +type MultiplexerService_RegisterHandlerServer interface { + Send(*Update) error + grpc.ServerStream +} + +type multiplexerServiceRegisterHandlerServer struct { + grpc.ServerStream +} + +func (x *multiplexerServiceRegisterHandlerServer) Send(m *Update) error { + return x.ServerStream.SendMsg(m) +} + +// MultiplexerService_ServiceDesc is the grpc.ServiceDesc for MultiplexerService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var MultiplexerService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "mux.MultiplexerService", + HandlerType: (*MultiplexerServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetConfig", + Handler: _MultiplexerService_GetConfig_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "RegisterHandler", + Handler: _MultiplexerService_RegisterHandler_Handler, + ServerStreams: true, + }, + }, + Metadata: "mux.proto", +} diff --git a/internal/mocks/mux_mock.go b/internal/mocks/mux_mock.go new file mode 100644 index 0000000..0b1f7f1 --- /dev/null +++ b/internal/mocks/mux_mock.go @@ -0,0 +1,201 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/bbralion/CTFloodBot/internal/genproto (interfaces: MultiplexerServiceClient,MultiplexerService_RegisterHandlerClient) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + genproto "github.com/bbralion/CTFloodBot/internal/genproto" + gomock "github.com/golang/mock/gomock" + grpc "google.golang.org/grpc" + metadata "google.golang.org/grpc/metadata" +) + +// MockMultiplexerServiceClient is a mock of MultiplexerServiceClient interface. +type MockMultiplexerServiceClient struct { + ctrl *gomock.Controller + recorder *MockMultiplexerServiceClientMockRecorder +} + +// MockMultiplexerServiceClientMockRecorder is the mock recorder for MockMultiplexerServiceClient. +type MockMultiplexerServiceClientMockRecorder struct { + mock *MockMultiplexerServiceClient +} + +// NewMockMultiplexerServiceClient creates a new mock instance. +func NewMockMultiplexerServiceClient(ctrl *gomock.Controller) *MockMultiplexerServiceClient { + mock := &MockMultiplexerServiceClient{ctrl: ctrl} + mock.recorder = &MockMultiplexerServiceClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMultiplexerServiceClient) EXPECT() *MockMultiplexerServiceClientMockRecorder { + return m.recorder +} + +// GetConfig mocks base method. +func (m *MockMultiplexerServiceClient) GetConfig(arg0 context.Context, arg1 *genproto.ConfigRequest, arg2 ...grpc.CallOption) (*genproto.ConfigResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "GetConfig", varargs...) + ret0, _ := ret[0].(*genproto.ConfigResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetConfig indicates an expected call of GetConfig. +func (mr *MockMultiplexerServiceClientMockRecorder) GetConfig(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfig", reflect.TypeOf((*MockMultiplexerServiceClient)(nil).GetConfig), varargs...) +} + +// RegisterHandler mocks base method. +func (m *MockMultiplexerServiceClient) RegisterHandler(arg0 context.Context, arg1 *genproto.RegisterRequest, arg2 ...grpc.CallOption) (genproto.MultiplexerService_RegisterHandlerClient, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RegisterHandler", varargs...) + ret0, _ := ret[0].(genproto.MultiplexerService_RegisterHandlerClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// RegisterHandler indicates an expected call of RegisterHandler. +func (mr *MockMultiplexerServiceClientMockRecorder) RegisterHandler(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterHandler", reflect.TypeOf((*MockMultiplexerServiceClient)(nil).RegisterHandler), varargs...) +} + +// MockMultiplexerService_RegisterHandlerClient is a mock of MultiplexerService_RegisterHandlerClient interface. +type MockMultiplexerService_RegisterHandlerClient struct { + ctrl *gomock.Controller + recorder *MockMultiplexerService_RegisterHandlerClientMockRecorder +} + +// MockMultiplexerService_RegisterHandlerClientMockRecorder is the mock recorder for MockMultiplexerService_RegisterHandlerClient. +type MockMultiplexerService_RegisterHandlerClientMockRecorder struct { + mock *MockMultiplexerService_RegisterHandlerClient +} + +// NewMockMultiplexerService_RegisterHandlerClient creates a new mock instance. +func NewMockMultiplexerService_RegisterHandlerClient(ctrl *gomock.Controller) *MockMultiplexerService_RegisterHandlerClient { + mock := &MockMultiplexerService_RegisterHandlerClient{ctrl: ctrl} + mock.recorder = &MockMultiplexerService_RegisterHandlerClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMultiplexerService_RegisterHandlerClient) EXPECT() *MockMultiplexerService_RegisterHandlerClientMockRecorder { + return m.recorder +} + +// CloseSend mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) CloseSend() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CloseSend") + ret0, _ := ret[0].(error) + return ret0 +} + +// CloseSend indicates an expected call of CloseSend. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) CloseSend() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseSend", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).CloseSend)) +} + +// Context mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).Context)) +} + +// Header mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) Header() (metadata.MD, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Header") + ret0, _ := ret[0].(metadata.MD) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Header indicates an expected call of Header. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) Header() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Header", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).Header)) +} + +// Recv mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) Recv() (*genproto.Update, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Recv") + ret0, _ := ret[0].(*genproto.Update) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Recv indicates an expected call of Recv. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) Recv() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Recv", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).Recv)) +} + +// RecvMsg mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) RecvMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RecvMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RecvMsg indicates an expected call of RecvMsg. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) RecvMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RecvMsg", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).RecvMsg), arg0) +} + +// SendMsg mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) SendMsg(arg0 interface{}) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendMsg", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMsg indicates an expected call of SendMsg. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) SendMsg(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMsg", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).SendMsg), arg0) +} + +// Trailer mocks base method. +func (m *MockMultiplexerService_RegisterHandlerClient) Trailer() metadata.MD { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Trailer") + ret0, _ := ret[0].(metadata.MD) + return ret0 +} + +// Trailer indicates an expected call of Trailer. +func (mr *MockMultiplexerService_RegisterHandlerClientMockRecorder) Trailer() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Trailer", reflect.TypeOf((*MockMultiplexerService_RegisterHandlerClient)(nil).Trailer)) +} diff --git a/internal/mocks/tgbotapi_mock.go b/internal/mocks/tgbotapi_mock.go new file mode 100644 index 0000000..8bce3e1 --- /dev/null +++ b/internal/mocks/tgbotapi_mock.go @@ -0,0 +1,50 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/go-telegram-bot-api/telegram-bot-api (interfaces: HttpClient) + +// Package mocks is a generated GoMock package. +package mocks + +import ( + http "net/http" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockTGBotAPIHTTPClient is a mock of HttpClient interface. +type MockTGBotAPIHTTPClient struct { + ctrl *gomock.Controller + recorder *MockTGBotAPIHTTPClientMockRecorder +} + +// MockTGBotAPIHTTPClientMockRecorder is the mock recorder for MockTGBotAPIHTTPClient. +type MockTGBotAPIHTTPClientMockRecorder struct { + mock *MockTGBotAPIHTTPClient +} + +// NewMockTGBotAPIHTTPClient creates a new mock instance. +func NewMockTGBotAPIHTTPClient(ctrl *gomock.Controller) *MockTGBotAPIHTTPClient { + mock := &MockTGBotAPIHTTPClient{ctrl: ctrl} + mock.recorder = &MockTGBotAPIHTTPClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTGBotAPIHTTPClient) EXPECT() *MockTGBotAPIHTTPClientMockRecorder { + return m.recorder +} + +// Do mocks base method. +func (m *MockTGBotAPIHTTPClient) Do(arg0 *http.Request) (*http.Response, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Do", arg0) + ret0, _ := ret[0].(*http.Response) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Do indicates an expected call of Do. +func (mr *MockTGBotAPIHTTPClientMockRecorder) Do(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockTGBotAPIHTTPClient)(nil).Do), arg0) +} diff --git a/internal/models/models.go b/internal/models/models.go new file mode 100644 index 0000000..bd54388 --- /dev/null +++ b/internal/models/models.go @@ -0,0 +1,25 @@ +package models + +import ( + "regexp" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +type PossibleUpdate struct { + Update tgbotapi.Update + Error error +} + +type UpdateChan <-chan PossibleUpdate + +type MatcherGroup []*regexp.Regexp + +func (g MatcherGroup) MatchString(s string) bool { + for _, m := range g { + if m.MatchString(s) { + return true + } + } + return false +} diff --git a/internal/proxy/grpc.go b/internal/proxy/grpc.go new file mode 100644 index 0000000..0ed5950 --- /dev/null +++ b/internal/proxy/grpc.go @@ -0,0 +1,51 @@ +package proxy + +import ( + "context" + "errors" + + "github.com/bbralion/CTFloodBot/internal/genproto" + "github.com/bbralion/CTFloodBot/pkg/auth" + "github.com/bbralion/CTFloodBot/pkg/services" + "github.com/go-logr/logr" + "google.golang.org/grpc" +) + +type GRPC struct { + genproto.UnimplementedMultiplexerServiceServer + AdvertisedHTTPEndpoint string + Addr string + Logger logr.Logger + AuthProvider services.Authenticator +} + +func (p *GRPC) ListenAndServe() error { + if p.AuthProvider == nil || p.AdvertisedHTTPEndpoint == "" { + return errors.New("logger, auth provider and the advertised http endpoint must be set") + } + + return nil +} + +func (p *GRPC) GetConfig(context.Context, *genproto.ConfigRequest) (*genproto.ConfigResponse, error) { + return &genproto.ConfigResponse{ + Config: &genproto.Config{ + ProxyEndpoint: p.AdvertisedHTTPEndpoint, + }, + }, nil +} + +func (p *GRPC) RegisterHandler(*genproto.RegisterRequest, genproto.MultiplexerService_RegisterHandlerServer) error { + return nil +} + +func (p *GRPC) setupGRPC() *grpc.Server { + interceptor := auth.NewGRPCServerInterceptor(p.Logger, p.AuthProvider) + server := grpc.NewServer( + grpc.UnaryInterceptor(interceptor.Unary()), + grpc.StreamInterceptor(interceptor.Stream()), + ) + + genproto.RegisterMultiplexerServiceServer(server, p) + return server +} diff --git a/internal/proxy/http.go b/internal/proxy/http.go new file mode 100644 index 0000000..c80fe02 --- /dev/null +++ b/internal/proxy/http.go @@ -0,0 +1,173 @@ +package proxy + +import ( + "context" + "fmt" + "net/http" + "net/http/httputil" + "net/url" + "regexp" + "sync/atomic" + "time" + + internal "github.com/bbralion/CTFloodBot/internal/services" + "github.com/bbralion/CTFloodBot/pkg/services" + "github.com/go-logr/logr" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" + "github.com/justinas/alice" +) + +// DefaultRequestTimeout is the default timeout to be used for making requests to the telegram API +const DefaultRequestTimeout = time.Second * 30 + +// 16 megabytes should be enough for most usecases +const DefaultMaxBodyBytes = 16_000_000 + +// HTTP is the telegram API HTTP proxy +type HTTP struct { + http.Server + Logger logr.Logger + // If set will be used to authenticate clients via a token in the Authorization header + AuthProvider services.Authenticator + // If set only paths in the allowlist will be allowed + Allowlist internal.Allowlist + // Transport is the transport to use for making requests to the telegram API. + // http.DefaultTransport will be used by default + Transport *http.Transport + // Telegram API token + Token string + // Telegram API endpoint to use, may be another proxy + Endpoint string + requestCounter int64 +} + +var pathRe = regexp.MustCompile(`^/proxy(\w+)(/.+)$`) + +// Path returns the HTTP proxy path format string suitable for use with tgbotapi +func (p *HTTP) Path() string { + return "/proxy%s/%s" +} + +func (p *HTTP) ListenAndServe() error { + endpointURL, err := url.Parse(p.Endpoint) + if err != nil { + return fmt.Errorf("invalid endpoint url specified: %w", err) + } + + p.Logger = p.Logger.WithName("http") + p.setDefaults() + + // TODO: implement proper handling of special commands such as setMyCommands + handler := httputil.ReverseProxy{ + Director: func(r *http.Request) { + // Route requests using the telegram API token and with a limited body + reqURL := *endpointURL + reqURL.User = nil + reqURL.Path = fmt.Sprintf(p.Endpoint, p.Token, r.URL.Path[1:]) + r.URL = &reqURL + r.Body = http.MaxBytesReader(nil, r.Body, DefaultMaxBodyBytes) + }, + ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) { + p.Logger.Error(err, "request to telegram API failed", "request_id", requestID(r)) + w.WriteHeader(http.StatusBadGateway) + }, + Transport: p.Transport, + } + + p.Handler = alice.New(p.PanicMiddleware, p.RequestIDMiddleware, p.LoggingMiddleware, p.AuthMiddleware, p.AllowPathMiddleware).Then(&handler) + return p.ListenAndServe() +} + +func (p *HTTP) setDefaults() { + if p.Transport == nil { + p.Transport = &http.Transport{} + } + if p.Transport.ResponseHeaderTimeout == 0 { + p.Transport.ResponseHeaderTimeout = DefaultRequestTimeout + } + + if p.Endpoint == "" { + p.Endpoint = tgbotapi.APIEndpoint + } +} + +type requestIDCtxKey struct{} + +func requestID(r *http.Request) int64 { + return r.Context().Value(requestIDCtxKey{}).(int64) +} + +func (p *HTTP) RequestIDMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + id := atomic.AddInt64(&p.requestCounter, 1) + next.ServeHTTP(w, r.WithContext(context.WithValue(r.Context(), requestIDCtxKey{}, id))) + }) +} + +func (p *HTTP) PanicMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer func() { + if v := recover(); v != nil { + p.Logger.Info("recovered from panic", "recover", v, "request_id", requestID(r)) + } + }() + + next.ServeHTTP(w, r) + }) +} + +type clientCtxKey struct{} + +func (p *HTTP) AuthMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + groups := pathRe.FindStringSubmatch(r.URL.Path) + if groups == nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if p.AuthProvider != nil { + client, err := p.AuthProvider.Authenticate(groups[1]) + if err != nil { + w.WriteHeader(http.StatusUnauthorized) + return + } + authenticatedReq := r.WithContext(context.WithValue(r.Context(), clientCtxKey{}, client)) + authenticatedReq.URL.Path = groups[2] + next.ServeHTTP(w, authenticatedReq) + } else { + next.ServeHTTP(w, r) + } + }) +} + +func (p *HTTP) LoggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + rcopy := r.Clone(context.Background()) + + // Always log, even on panic + defer func() { + latency := time.Since(start) + p.Logger.Info("handled request", + "uri", rcopy.RequestURI, + "method", rcopy.Method, + "latency", latency, + "remote_addr", rcopy.RemoteAddr, + "request_id", requestID(r), + "client", r.Context().Value(clientCtxKey{})) + }() + + next.ServeHTTP(w, r) + }) +} + +func (p *HTTP) AllowPathMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if p.Allowlist != nil && !p.Allowlist.Allowed(r.URL.Path) { + w.WriteHeader(http.StatusForbidden) + return + } + next.ServeHTTP(w, r) + }) +} diff --git a/internal/services/allowlist.go b/internal/services/allowlist.go new file mode 100644 index 0000000..232da8a --- /dev/null +++ b/internal/services/allowlist.go @@ -0,0 +1,24 @@ +package services + +// Allowlist specifies values that are explicitly allowed to be used +type Allowlist interface { + Allowed(key string) bool +} + +type staticAllowList struct { + allowed map[string]struct{} +} + +func (l *staticAllowList) Allowed(key string) bool { + _, ok := l.allowed[key] + return ok +} + +// NewStaticAllowlist returns an allowlist that allows only the values specified +func NewStaticAllowlist(allowed []string) Allowlist { + m := make(map[string]struct{}, len(allowed)) + for _, s := range allowed { + m[s] = struct{}{} + } + return &staticAllowList{m} +} diff --git a/internal/services/allowlist_test.go b/internal/services/allowlist_test.go new file mode 100644 index 0000000..d92d5ec --- /dev/null +++ b/internal/services/allowlist_test.go @@ -0,0 +1,63 @@ +package services + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_staticAllowList_Allowed(t *testing.T) { + type args struct { + key string + } + tests := []struct { + name string + allowed []string + args args + want bool + }{ + { + name: "nil static allowlist", + allowed: nil, + args: args{ + key: "test-key", + }, + want: false, + }, + { + name: "empty static allowlist", + allowed: []string{}, + args: args{ + key: "test-key", + }, + want: false, + }, + { + name: "allowed value in allowlist", + allowed: []string{"allowed"}, + args: args{ + key: "allowed", + }, + want: true, + }, + { + name: "unallowed value in allowlist", + allowed: []string{"allowed"}, + args: args{ + key: "unallowed", + }, + want: false, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + req := require.New(t) + + l := NewStaticAllowlist(tt.allowed) + req.Equal(tt.want, l.Allowed(tt.args.key)) + }) + } +} diff --git a/internal/services/multiplexer.go b/internal/services/multiplexer.go new file mode 100644 index 0000000..d5ac1d4 --- /dev/null +++ b/internal/services/multiplexer.go @@ -0,0 +1,89 @@ +package services + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + "github.com/bbralion/CTFloodBot/internal/models" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +// Multiplexer allows multiplexing various update handlers based on matchers +type Multiplexer interface { + // Register registers a new handler which will receive updates until the context is canceled. + // Safe for concurrent use, so matchers can be registered from anywhere. + Register(ctx context.Context, matchers models.MatcherGroup) (tgbotapi.UpdatesChannel, error) + // Serve multiplexes the update across the registered handlers. + // Isn't safe for concurrent use, so all calls to Serve must be from a single goroutine. + Serve(update tgbotapi.Update) +} + +type ( + muxKey uint64 + muxHandler struct { + ctx context.Context + matchers models.MatcherGroup + channel chan tgbotapi.Update + } +) + +// mapMux is a default implementation of Multiplexer +type mapMux struct { + curKey muxKey + store sync.Map + bufferLen int +} + +func (m *mapMux) Register(ctx context.Context, matchers models.MatcherGroup) (tgbotapi.UpdatesChannel, error) { + if len(matchers) < 1 { + return nil, errors.New("cannot register with zero matchers") + } + + key := muxKey(atomic.AddUint64((*uint64)(&m.curKey), 1)) + h := &muxHandler{ctx, matchers, make(chan tgbotapi.Update, m.bufferLen)} + + m.store.Store(key, h) + return h.channel, nil +} + +func (m *mapMux) delete(key muxKey, h *muxHandler) { + m.store.Delete(key) + close(h.channel) +} + +func (m *mapMux) Serve(update tgbotapi.Update) { + // Currently only messages are supported + if update.Message == nil { + return + } + + m.store.Range(func(key, value any) bool { + mkey, mvalue := key.(muxKey), value.(*muxHandler) + + // Fail-fast if the handler is already dead + select { + case <-mvalue.ctx.Done(): + m.delete(mkey, mvalue) + return true + default: + } + + // Match and try to send if needed + if mvalue.matchers.MatchString(update.Message.Text) { + select { + case <-mvalue.ctx.Done(): + m.delete(mkey, mvalue) + case mvalue.channel <- update: + } + } + return true + }) +} + +// NewMultiplexer creates a new multiplexer with the +// specified buffer size of created update channels +func NewMultiplexer(bufferLen int) Multiplexer { + return &mapMux{bufferLen: bufferLen} +} diff --git a/internal/services/multiplexer_test.go b/internal/services/multiplexer_test.go new file mode 100644 index 0000000..029e557 --- /dev/null +++ b/internal/services/multiplexer_test.go @@ -0,0 +1,74 @@ +package services + +import ( + "context" + "regexp" + "sync" + "testing" + "time" + + "github.com/bbralion/CTFloodBot/internal/models" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" + "github.com/stretchr/testify/require" +) + +func startExpectingMuxClient(wg *sync.WaitGroup, req *require.Assertions, mux Multiplexer, updates []tgbotapi.Update, matchers models.MatcherGroup) { + ctx, cancel := context.WithCancel(context.Background()) + ch, err := mux.Register(ctx, matchers) + req.NoError(err, "should register without error") + + wg.Add(1) + go func() { + defer wg.Done() + for i := range updates { + update, ok := <-ch + req.True(ok, "wanted %d updates but got %d", len(updates), i) + + // Currently only test message matches, since no others are supported + req.True(matchers.MatchString(update.Message.Text), "got non-matching update") + req.Equal(updates[i], update, "invalid order of updates") + } + cancel() + // Channel should close on next send + req.Eventually(func() bool { + _, ok := <-ch + return !ok + }, time.Second*10, time.Millisecond*50) + }() +} + +func TestMultiplexer(t *testing.T) { + req := require.New(t) + + updates := make([]tgbotapi.Update, 0, 13) + for _, text := range []string{"/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h", "/j", "/aboba", "/sus", "/0", "/aboba"} { + updates = append(updates, tgbotapi.Update{Message: &tgbotapi.Message{Text: text}}) + } + + var wg sync.WaitGroup + mux := NewMultiplexer(1) + + _, err := mux.Register(context.Background(), nil) + req.Error(err, "registration without matchers") + + startExpectingMuxClient(&wg, req, mux, updates[:2], models.MatcherGroup{regexp.MustCompile("^/[ab]$")}) + startExpectingMuxClient(&wg, req, mux, updates[:6], models.MatcherGroup{regexp.MustCompile("^/[a-f]$")}) + startExpectingMuxClient(&wg, req, mux, updates[:9], models.MatcherGroup{regexp.MustCompile("^/[a-j]$")}) + startExpectingMuxClient(&wg, req, mux, updates[:9], models.MatcherGroup{regexp.MustCompile(".*")}) + startExpectingMuxClient(&wg, req, mux, []tgbotapi.Update{ + {Message: &tgbotapi.Message{Text: "/aboba"}}, + {Message: &tgbotapi.Message{Text: "/sus"}}, + {Message: &tgbotapi.Message{Text: "/aboba"}}, + }, models.MatcherGroup{regexp.MustCompile("^/(aboba|sus)$")}) + + for _, update := range updates { + mux.Serve(update) + } + + // Wait for all clients to finish + time.Sleep(time.Second * 2) + + // serve one last fake update to close the channels + mux.Serve(updates[0]) + wg.Wait() +} diff --git a/pkg/auth/auth.go b/pkg/auth/auth.go new file mode 100644 index 0000000..5bf5883 --- /dev/null +++ b/pkg/auth/auth.go @@ -0,0 +1,110 @@ +package auth + +import ( + "context" + + "github.com/bbralion/CTFloodBot/pkg/services" + "github.com/go-logr/logr" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +const tokenKey = "authorization" + +type GRPCClientInterceptor string + +// NewGRPCClientInterceptor creates a new gRPC client interceptor which uses the given token +func NewGRPCClientInterceptor(token string) GRPCClientInterceptor { + return GRPCClientInterceptor(token) +} + +func (t GRPCClientInterceptor) attach(ctx context.Context) context.Context { + return metadata.AppendToOutgoingContext(ctx, tokenKey, string(t)) +} + +// Unary returns a unary gRPC client interceptor with authentication using the setup token +func (t GRPCClientInterceptor) Unary() grpc.UnaryClientInterceptor { + return func( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, + ) error { + return invoker(t.attach(ctx), method, req, reply, cc, opts...) + } +} + +// Stream returns a stream gRPC client interceptor with authentication using the setup token +func (t GRPCClientInterceptor) Stream() grpc.StreamClientInterceptor { + return func( + ctx context.Context, + desc *grpc.StreamDesc, + cc *grpc.ClientConn, + method string, + streamer grpc.Streamer, opts ...grpc.CallOption, + ) (grpc.ClientStream, error) { + return streamer(t.attach(ctx), desc, cc, method, opts...) + } +} + +// GRPCServerInterceptor is a Unary and Stream interceptor provider which +// uses an underlying AuthProvider for authentication of clients +type GRPCServerInterceptor struct { + logger logr.Logger + provider services.Authenticator +} + +// NewGRPCServerInterceptor returns a new gRPC server interceptor +// which authenticates clients using the specified provider. +func NewGRPCServerInterceptor(logger logr.Logger, provider services.Authenticator) *GRPCServerInterceptor { + return &GRPCServerInterceptor{logger, provider} +} + +func (i *GRPCServerInterceptor) authorize(ctx context.Context, method string) error { + md, ok := metadata.FromIncomingContext(ctx) + if !ok || len(md[tokenKey]) != 1 { + return status.Error(codes.Unauthenticated, "must contain metadata with single auth token") + } + + client, err := i.provider.Authenticate(md[tokenKey][0]) + if err != nil { + return status.Error(codes.Unauthenticated, err.Error()) + } + + i.logger.Info("gRPC request from authenticated client", "client", client, "method", method) + return nil +} + +// Unary returns a unary gRPC server interceptor for authentication +func (i *GRPCServerInterceptor) Unary() grpc.UnaryServerInterceptor { + return func( + ctx context.Context, + req interface{}, + info *grpc.UnaryServerInfo, + handler grpc.UnaryHandler, + ) (interface{}, error) { + if err := i.authorize(ctx, info.FullMethod); err != nil { + return nil, err + } + return handler(ctx, req) + } +} + +// Stream returns a stream gRPC server interceptor for authentication +func (i *GRPCServerInterceptor) Stream() grpc.StreamServerInterceptor { + return func( + srv interface{}, + stream grpc.ServerStream, + info *grpc.StreamServerInfo, + handler grpc.StreamHandler, + ) error { + if err := i.authorize(stream.Context(), info.FullMethod); err != nil { + return err + } + return handler(srv, stream) + } +} diff --git a/pkg/core/answer.go b/pkg/core/answer.go deleted file mode 100644 index 3dc739e..0000000 --- a/pkg/core/answer.go +++ /dev/null @@ -1,11 +0,0 @@ -package core - -import ( - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" -) - -type HandlerAnswer struct { - Type string `json:"type"` // one of message:message_config, message:sticker_config - MessageConfig *telegramapi.MessageConfig `json:"message_config"` - StickerConfig *telegramapi.StickerConfig `json:"sticker_config"` -} diff --git a/pkg/core/config.go b/pkg/core/config.go deleted file mode 100644 index dab465e..0000000 --- a/pkg/core/config.go +++ /dev/null @@ -1,24 +0,0 @@ -package core - -import ( - "github.com/go-redis/redis/v8" -) - -type BotRedisConfig struct { - Host string `yaml:"host" env:"DB_HOST"` - Username string `yaml:"user" env:"DB_USER"` - Password string `yaml:"password" env:"DB_PASSWORD"` -} - -type BotCoreConfig struct { - TelegramToken string `yaml:"auth_token" env:"BOT_AUTH_TOKEN"` - Redis BotRedisConfig `yaml:"redis"` -} - -func GetRedisClientByConfig(config BotRedisConfig) *redis.Client { - return redis.NewClient(&redis.Options{ - Addr: config.Host, - Username: config.Username, - Password: config.Password, - }) -} diff --git a/pkg/core/constant.go b/pkg/core/constant.go deleted file mode 100644 index bc7c397..0000000 --- a/pkg/core/constant.go +++ /dev/null @@ -1,6 +0,0 @@ -package core - -const ( - RedisUpdateChanel = "TelegramUpdates" - RedisAnswersChanel = "TelegramAnswers" -) diff --git a/pkg/handlers/config.go b/pkg/handlers/config.go deleted file mode 100644 index ac641a5..0000000 --- a/pkg/handlers/config.go +++ /dev/null @@ -1,9 +0,0 @@ -package handlers - -import ( - "github.com/kbats183/CTFloodBot/pkg/core" -) - -type HandlerConfig struct { - Redis core.BotRedisConfig `yaml:"redis"` -} diff --git a/pkg/handlers/simple.go b/pkg/handlers/simple.go deleted file mode 100644 index a86ed05..0000000 --- a/pkg/handlers/simple.go +++ /dev/null @@ -1,63 +0,0 @@ -package handlers - -import ( - "context" - "encoding/json" - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "github.com/kbats183/CTFloodBot/pkg/core" - "go.uber.org/zap" -) - -type AnswerChan chan<- core.HandlerAnswer - -type SimpleHandler struct { - Handler func(logger *zap.Logger, update *telegramapi.Update, answerChan AnswerChan) - Logger *zap.Logger - Config HandlerConfig -} - -func (h *SimpleHandler) Run() { - ctx := context.Background() - - redisClient := core.GetRedisClientByConfig(h.Config.Redis) - - publish := func(message interface{}) { - command := redisClient.Publish(ctx, core.RedisAnswersChanel, message) - if command.Err() != nil { - h.Logger.Error("Failed to send answer", zap.Error(command.Err())) - } - } - - h.Logger.Info("Handler is ready to start") - - subscriber := redisClient.Subscribe(ctx, core.RedisUpdateChanel) - for message := range subscriber.Channel() { - var update telegramapi.Update - err := json.Unmarshal([]byte(message.Payload), &update) - if err != nil { - h.Logger.Fatal("Failed to unmarshal received update", zap.Error(err), zap.String("message", message.Payload)) - } - - go h.processUpdate(&update, publish) - } -} - -func (h *SimpleHandler) createAnswerChan(publish func(message interface{})) AnswerChan { - ch := make(chan core.HandlerAnswer) - go func() { - for v := range ch { - marshal, err := json.Marshal(v) - if err != nil { - h.Logger.Error("Failed to marshal answer", zap.Error(err)) - } - publish(marshal) - } - }() - return ch -} - -func (h *SimpleHandler) processUpdate(update *telegramapi.Update, publish func(message interface{})) { - ch := h.createAnswerChan(publish) - h.Handler(h.Logger, update, ch) - close(ch) -} diff --git a/pkg/proxy/client.go b/pkg/proxy/client.go new file mode 100644 index 0000000..f5b1e41 --- /dev/null +++ b/pkg/proxy/client.go @@ -0,0 +1,95 @@ +package proxy + +import ( + "context" + "errors" + "fmt" + "regexp" + "time" + + "github.com/bbralion/CTFloodBot/internal/models" + "github.com/bbralion/CTFloodBot/pkg/services" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +const defaultTimeout = time.Second * 5 + +// Client is the proxy client implementation +type Client struct { + // Handler is the telegram update handler to use + Handler Handler + // Matchers specify the matchers used to filter the requests which should be handled by this client + Matchers []string +} + +// RegisterAndRun registers the client using the given registrar, +// then handles the received updates by responding to them using the api +func (c *Client) RegisterAndRun(ctx context.Context, registrar services.Registrar, api *tgbotapi.BotAPI) error { + if c.Handler == nil || len(c.Matchers) < 1 { + return errors.New("handler and matchers must be specified for client") + } + + matchers := make(models.MatcherGroup, len(c.Matchers)) + for i, m := range c.Matchers { + var err error + if matchers[i], err = regexp.Compile(m); err != nil { + return fmt.Errorf("invalid matcher specified: %w", err) + } + } + + updateCh, err := registrar.Register(ctx, matchers) + if err != nil { + return fmt.Errorf("registering client: %w", err) + } + + for { + select { + case update := <-updateCh: + if update.Error != nil { + return fmt.Errorf("receiving updates: %w", err) + } + c.Handler.Serve(api, update.Update) + case <-ctx.Done(): + return nil + } + } +} + +// func (c *Client) connectGRPC() (*grpc.ClientConn, error) { +// if c.GRPCEndpoint == "" || c.Token == "" { +// return nil, errors.New("endpoint and token must not be empty") +// } + +// interceptor := auth.NewGRPCClientInterceptor(c.Token) +// conn, err := grpc.Dial(c.GRPCEndpoint, +// grpc.WithTransportCredentials(insecure.NewCredentials()), +// grpc.WithUnaryInterceptor(interceptor.Unary()), +// grpc.WithStreamInterceptor(interceptor.Stream())) +// if err != nil { +// return nil, fmt.Errorf("failed to dial proxy gRPC endpoint: %w", err) +// } +// return conn, nil +// } + +// func (c *Client) getConfig(ctx context.Context, gc genproto.MultiplexerServiceClient) (*genproto.Config, error) { +// ctx, cancel := context.WithTimeout(ctx, defaultTimeout) +// defer cancel() + +// config, err := gc.GetConfig(ctx, &genproto.ConfigRequest{}) +// if err != nil { +// return nil, fmt.Errorf("requesting config from gRPC server: %w", err) +// } +// return config.Config, nil +// } + +// func (c *Client) connectHTTP(endpoint string) (*tgbotapi.BotAPI, error) { +// api, err := tgbotapi.NewBotAPIWithClient(c.Token, endpoint, &http.Client{ +// Transport: &http.Transport{ +// ResponseHeaderTimeout: defaultTimeout, +// }, +// }) +// if err != nil { +// return nil, fmt.Errorf("failed to setup telegram bot api: %w", err) +// } +// return api, nil +// } diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go new file mode 100644 index 0000000..be6ab81 --- /dev/null +++ b/pkg/retry/retry.go @@ -0,0 +1,100 @@ +package retry + +import ( + "errors" + "fmt" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type ( + DelayScheduler func() time.Duration + ErrTransformer func(error) error +) + +type recoverError struct { + wrapped error +} + +func (e recoverError) Error() string { + if e.wrapped == nil { + return "temporary recoverable error" + } + return fmt.Sprintf("unrecoverable error: %s", e.wrapped.Error()) +} + +func (e recoverError) Unwrap() error { + return e.wrapped +} + +// Recoverable is used to explicitly mark a recovery +func Recoverable() error { + return recoverError{nil} +} + +// Unrecoverable wraps an error to indicate that it is not recoverable from, +// after which retries will be stopped and it will be returned +func Unrecoverable(err error) error { + return recoverError{err} +} + +// Recover runs the function using a custom delay scheduler +func Recover[T any](f func() (T, error), s DelayScheduler, et ...ErrTransformer) (T, error) { + for { + ret, err := f() + for _, t := range et { + err = t(err) + } + + var re recoverError + if err == nil { + return ret, nil + } else if errors.As(err, &re) && re.wrapped != nil { + return ret, re.wrapped + } + + time.Sleep(s()) + } +} + +const ( + DefaultBackoffMinDelay = time.Millisecond * 50 + DefaultBackoffMaxDelay = time.Minute * 10 + DefaultBackoffFactor = 2 +) + +// Backoff runs the function using the backoff retry algorithm +func Backoff[T any](f func() (T, error), et ...ErrTransformer) (T, error) { + delay, next := time.Duration(0), DefaultBackoffMinDelay + return Recover(f, func() time.Duration { + delay, next = next, next*DefaultBackoffFactor + if next > DefaultBackoffMaxDelay { + next = DefaultBackoffMaxDelay + } + return delay + }, et...) +} + +const DefaultStaticDelay = time.Second + +// Static runs the function using a static retry delay +func Static[T any](f func() (T, error), et ...ErrTransformer) (T, error) { + return Recover(f, func() time.Duration { + return DefaultStaticDelay + }, et...) +} + +// IsGRPCUnavailable is a helper for testing whether the error resembles a gRPC Unavailable status +func IsGRPCUnavailable(err error) bool { + s, ok := status.FromError(err) + return ok && s.Code() == codes.Unavailable +} + +// IsGRPCUnavailable is a helper for testing whether the error +// resembles a gRPC Canceled or DeadlineExceeded status +func IsGRPCCanceled(err error) bool { + s, ok := status.FromError(err) + return ok && (s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded) +} diff --git a/pkg/retry/retry_test.go b/pkg/retry/retry_test.go new file mode 100644 index 0000000..509a04d --- /dev/null +++ b/pkg/retry/retry_test.go @@ -0,0 +1,37 @@ +package retry + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/require" +) + +func assertNumCallsFunc(req *require.Assertions, n int, tmpErr, finalErr error) func() (any, error) { + ctr := 0 + return func() (any, error) { + req.Less(ctr, n, "should be called %d times at most", n) + ctr++ + if ctr == n { + return nil, finalErr + } + return nil, tmpErr + } +} + +func testStrategy(req *require.Assertions, n int, strategy func(func() (any, error), ...ErrTransformer) (any, error)) { + _, err := strategy(assertNumCallsFunc(req, n, errors.New("fake recoverable error"), nil)) + req.NoError(err) + e := errors.New("fake unrecoverable error") + _, err = strategy(assertNumCallsFunc(req, n, errors.New("fake recoverable error"), Unrecoverable(e))) + req.ErrorIs(e, err) +} + +func TestRetry(t *testing.T) { + req := require.New(t) + + for i := 1; i < 4; i++ { + testStrategy(req, i, Backoff[any]) + testStrategy(req, i, Static[any]) + } +} diff --git a/pkg/services/authenticator.go b/pkg/services/authenticator.go new file mode 100644 index 0000000..8204504 --- /dev/null +++ b/pkg/services/authenticator.go @@ -0,0 +1,36 @@ +package services + +import "errors" + +// Client is an identification of a single client of a service +type Client struct { + Name string +} + +var ErrInvalidToken = errors.New("invalid authentication token provided") + +// Authenticator represents a token-based authentication provider +type Authenticator interface { + Authenticate(token string) (Client, error) +} + +type staticAuthenticator struct { + clients map[string]Client +} + +func (p *staticAuthenticator) Authenticate(token string) (Client, error) { + if c, ok := p.clients[token]; ok { + return c, nil + } + return Client{}, ErrInvalidToken +} + +// NewStaticAuthenticator returns an authenticator which authenticates clients +// using a static token->Client map specified at creation time +func NewStaticAuthenticator(clients map[string]Client) Authenticator { + a := &staticAuthenticator{make(map[string]Client, len(clients))} + for k, v := range clients { + a.clients[k] = v + } + return a +} diff --git a/pkg/services/authenticator_test.go b/pkg/services/authenticator_test.go new file mode 100644 index 0000000..9d1a705 --- /dev/null +++ b/pkg/services/authenticator_test.go @@ -0,0 +1,64 @@ +package services + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func Test_staticAuthenticator_Authenticate(t *testing.T) { + type args struct { + token string + } + tests := []struct { + name string + clients map[string]Client + args args + want Client + wantErr bool + }{ + { + name: "nil client map", + clients: nil, + args: args{"faketoken"}, + wantErr: true, + }, + { + name: "empty client map", + clients: map[string]Client{}, + args: args{"faketoken"}, + wantErr: true, + }, + { + name: "valid token", + clients: map[string]Client{ + "goodtoken": {Name: "client1"}, + }, + args: args{"goodtoken"}, + want: Client{Name: "client1"}, + wantErr: false, + }, + { + name: "invalid token", + clients: map[string]Client{ + "goodtoken": {Name: "client1"}, + }, + args: args{"badtoken"}, + wantErr: true, + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + req := require.New(t) + + p := NewStaticAuthenticator(tt.clients) + got, err := p.Authenticate(tt.args.token) + if tt.wantErr { + req.Error(err) + } + req.Equal(tt.want, got) + }) + } +} diff --git a/pkg/services/longpoll.go b/pkg/services/longpoll.go new file mode 100644 index 0000000..2a86f70 --- /dev/null +++ b/pkg/services/longpoll.go @@ -0,0 +1,169 @@ +package services + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strconv" + "time" +) + +// DefaultLongPollTimeout is the default timeout used for long polling +const DefaultLongPollTimeout = time.Second * 60 + +// LongPollOptions specifies various options to use inside the long poll streamer. +// Offset, Limit and Timeout specify the values to send to Telegram's getUpdates method. +// By default a Timeout of DefaultLongPollTimeout is used, and DefaultCapacity will be used as the default Limit. +type LongPollOptions struct { + Offset int + Limit int + Timeout time.Duration + Client *http.Client +} + +type longPollStreamer struct { + opts LongPollOptions + endpointURL *url.URL + params url.Values +} + +func (s *longPollStreamer) poll(ctx context.Context) (*http.Response, error) { + s.endpointURL.RawQuery = s.params.Encode() + ctx, cancel := context.WithTimeout(ctx, s.opts.Timeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, "GET", s.endpointURL.String(), nil) + if err != nil { + return nil, fmt.Errorf("preparing request: %w", err) + } + + resp, err := s.opts.Client.Do(req) + if err != nil { + // Unwrap url.Error returned from do to avoid leaking url with bot token + return nil, fmt.Errorf("doing poll request: %w", errors.Unwrap(err)) + } else if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("bad response code while polling: %s", resp.Status) + } + return resp, nil +} + +// readRespones tries to read the response in the fastest way possible. That is, if ContentLength +// is set, then we can use it in order to allocate a buffer wit the correct size from the get-go. +func (s *longPollStreamer) readResponse(resp *http.Response) (buf []byte, err error) { + defer resp.Body.Close() + + if resp.ContentLength != -1 { + buf = make([]byte, resp.ContentLength) + _, err = io.ReadFull(resp.Body, buf) + } else { + buf, err = io.ReadAll(resp.Body) + } + return +} + +func (s *longPollStreamer) parseUpdates(ctx context.Context, stream chan<- Maybe[RawUpdate], resp *http.Response) error { + buf, err := s.readResponse(resp) + if err != nil { + return fmt.Errorf("reading getUpdates response: %w", err) + } + + // Parse API response wrapper. The updates here are parsed as RawUpdate's, which simply set + // their value to the correct portion of the prepared buffer, bypassing an extra copy. + var apiResp struct { + Ok bool + Description string + Result []RawUpdate + } + if err := json.Unmarshal(buf, &apiResp); err != nil { + return fmt.Errorf("parsing getUpdates response: %w", err) + } + if !apiResp.Ok { + return fmt.Errorf("getUpdates response.Ok is false: %s", apiResp.Description) + } + + for _, u := range apiResp.Result { + select { + case <-ctx.Done(): + return nil + default: + } + stream <- Maybe[RawUpdate]{Value: u} + } + if len(apiResp.Result) > 0 { + var updateWrapper struct { + UpdateID int `json:"update_id"` + } + if err := json.Unmarshal(apiResp.Result[len(apiResp.Result)-1], &updateWrapper); err != nil { + return fmt.Errorf("retrieving update_id: %w", err) + } + s.params.Set("offset", strconv.Itoa(updateWrapper.UpdateID+1)) + } + return nil +} + +func (s *longPollStreamer) Stream(ctx context.Context) RawStream { + stream := make(chan Maybe[RawUpdate], s.opts.Limit) + go func() { + defer close(stream) + + for { + resp, err := s.poll(ctx) + if err != nil { + // If the context is finished, then the error we received is *probably* a timeout/canceled + select { + case <-ctx.Done(): + return + default: + } + + // Global context isn't finished, which means that this is a temporary timeout + if errors.Is(err, context.DeadlineExceeded) { + stream <- Maybe[RawUpdate]{Error: fmt.Errorf("temporary timeout while polling: %w", err)} + continue + } + stream <- Maybe[RawUpdate]{Error: err} + return + } + + if err := s.parseUpdates(ctx, stream, resp); err != nil { + stream <- Maybe[RawUpdate]{Error: err} + return + } + } + }() + return stream +} + +// NewLongPollStreamer starts a long polling streamer on the given endpoint in the form +// "https://api.telegram.org" using the specified token for authorization. +// Long poll requests will be made to "{endpoint}/bot{token}/getUpdates". +func NewLongPollStreamer(endpoint, token string, opts LongPollOptions) (RawStreamer, error) { + endpointURL, err := url.Parse(endpoint) + if err != nil { + return nil, fmt.Errorf("invalid long poll endpoint: %w", err) + } + + // Set proper defaults in options and client + if opts.Timeout == 0 { + opts.Timeout = DefaultLongPollTimeout + } + if opts.Limit == 0 { + opts.Limit = DefaultCapacity + } + if opts.Client == nil { + opts.Client = http.DefaultClient + } + + // Timeout is always set to avoid short polling, other opts are used only if set + endpointURL.Path = path.Join(endpointURL.Path, "bot"+token, "getUpdates") + params := make(url.Values) + params.Set("timeout", strconv.Itoa(int(opts.Timeout.Seconds()))) + params.Set("limit", strconv.Itoa(opts.Limit)) + params.Set("offset", strconv.Itoa(opts.Offset)) + return &longPollStreamer{opts, endpointURL, params}, nil +} diff --git a/pkg/services/registrar.go b/pkg/services/registrar.go new file mode 100644 index 0000000..56c3466 --- /dev/null +++ b/pkg/services/registrar.go @@ -0,0 +1,103 @@ +package services + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/bbralion/CTFloodBot/internal/genproto" + "github.com/bbralion/CTFloodBot/internal/models" + "github.com/bbralion/CTFloodBot/pkg/retry" + "github.com/go-logr/logr" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +// Registrar allows registration of command handlers for subsequent receival of updates +type Registrar interface { + // Register registers a new command handler with the given matchers. + // The context should span the lifetime of the registered handler and canceled when it dies. + Register(ctx context.Context, matchers models.MatcherGroup) (models.UpdateChan, error) +} + +// gRPCRegistrar is an implementation of Registrar using grpc with retries +type gRPCRegistrar struct { + logger logr.Logger + client genproto.MultiplexerServiceClient +} + +func (r *gRPCRegistrar) tryRegister(ctx context.Context, request *genproto.RegisterRequest, updateCh chan models.PossibleUpdate) error { + stream, err := retry.Backoff(func() (genproto.MultiplexerService_RegisterHandlerClient, error) { + stream, err := r.client.RegisterHandler(ctx, request) + if err == nil { + return stream, nil + } + if retry.IsGRPCUnavailable(err) { + r.logger.Error(err, "gRPC registrar retrying connection to server") + return nil, retry.Recoverable() + } + return nil, retry.Unrecoverable(err) + }) + if err != nil { + return fmt.Errorf("registering handler: %w", err) + } + + for { + updatePB, err := stream.Recv() + if err != nil { + return fmt.Errorf("receiving update: %w", err) + } + + var update tgbotapi.Update + if err := json.Unmarshal([]byte(updatePB.Json), &update); err != nil { + return fmt.Errorf("unmarshaling update json: %w", err) + } + + select { + case updateCh <- models.PossibleUpdate{Update: update}: + case <-ctx.Done(): + return nil + } + } +} + +func (r *gRPCRegistrar) Register(ctx context.Context, matchers models.MatcherGroup) (models.UpdateChan, error) { + if len(matchers) < 1 { + return nil, errors.New("cannot register with zero matchers") + } + + request := &genproto.RegisterRequest{ + Matchers: make([]string, len(matchers)), + } + for i, m := range matchers { + request.Matchers[i] = m.String() + } + + updateCh := make(chan models.PossibleUpdate) + go func() { + defer close(updateCh) + + _, err := retry.Static(func() (any, error) { + err := r.tryRegister(ctx, request, updateCh) + if uw := errors.Unwrap(err); uw == nil || retry.IsGRPCCanceled(uw) { + return nil, nil + } else if retry.IsGRPCUnavailable(uw) { + r.logger.Error(err, "gRPC registrar reconnecting stream") + return nil, retry.Recoverable() + } + return nil, retry.Unrecoverable(err) + }) + if err != nil { + updateCh <- models.PossibleUpdate{Error: err} + } + }() + return updateCh, nil +} + +// NewGRPCRegistrar creates a Registrar based on the gRPC API client with preconfigured retries +func NewGRPCRegistrar(logger logr.Logger, client genproto.MultiplexerServiceClient) Registrar { + if logger == (logr.Logger{}) { + logger = logr.Discard() + } + return &gRPCRegistrar{logger.WithName("registrar"), client} +} diff --git a/pkg/services/registrar_test.go b/pkg/services/registrar_test.go new file mode 100644 index 0000000..741b384 --- /dev/null +++ b/pkg/services/registrar_test.go @@ -0,0 +1,165 @@ +package services + +import ( + "context" + "regexp" + "testing" + "time" + + "github.com/bbralion/CTFloodBot/internal/genproto" + "github.com/bbralion/CTFloodBot/internal/mocks" + "github.com/bbralion/CTFloodBot/internal/models" + "github.com/go-logr/logr" + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func Test_gRPCRegistrar_Register(t *testing.T) { + type args struct { + matchers models.MatcherGroup + } + type streamUpdate struct { + update *genproto.Update + err error + } + type registerResponse struct { + stream []streamUpdate + err error + } + type possibleUpdate struct { + update tgbotapi.Update + err bool + } + + tests := []struct { + name string + args args + registerResponses []registerResponse + want []possibleUpdate + wantErr bool + }{ + { + name: "registration with nil matchers", + args: args{matchers: nil}, + want: nil, + wantErr: true, + }, + { + name: "registration with no matchers", + args: args{matchers: models.MatcherGroup{}}, + want: nil, + wantErr: true, + }, + { + name: "unrecoverable registration error", + args: args{matchers: models.MatcherGroup{regexp.MustCompile("/command")}}, + registerResponses: []registerResponse{{err: status.Error(codes.Unauthenticated, "unauthenticated")}}, + want: []possibleUpdate{{err: true}}, + }, + { + name: "retries during registration", + args: args{matchers: models.MatcherGroup{regexp.MustCompile("^/command"), regexp.MustCompile("^.*$")}}, + registerResponses: []registerResponse{ + {err: status.Error(codes.Unavailable, "temporarily unavailable")}, + {err: status.Error(codes.Unavailable, "starting")}, + {err: nil, stream: []streamUpdate{ + {update: &genproto.Update{Json: `{"update_id":1}`}}, + {update: &genproto.Update{Json: `{"update_id":2}`}}, + {err: status.FromContextError(context.Canceled).Err()}, + }}, + }, + want: []possibleUpdate{ + {update: tgbotapi.Update{UpdateID: 1}}, + {update: tgbotapi.Update{UpdateID: 2}}, + }, + }, + { + name: "invalid json in update", + args: args{matchers: models.MatcherGroup{regexp.MustCompile("^/command"), regexp.MustCompile("^.*$")}}, + registerResponses: []registerResponse{ + {err: nil, stream: []streamUpdate{ + {update: &genproto.Update{Json: `{bad}`}}, + }}, + }, + want: []possibleUpdate{ + {err: true}, + }, + }, + { + name: "reconnect after stream fail", + args: args{matchers: models.MatcherGroup{regexp.MustCompile("^/aboba$")}}, + registerResponses: []registerResponse{ + {err: nil, stream: []streamUpdate{ + {update: &genproto.Update{Json: `{"update_id":1}`}}, + {err: status.Error(codes.Unavailable, "stream broken")}, + }}, + {err: nil, stream: []streamUpdate{ + {update: &genproto.Update{Json: `{"update_id":2}`}}, + {err: status.FromContextError(context.DeadlineExceeded).Err()}, + }}, + }, + want: []possibleUpdate{ + {update: tgbotapi.Update{UpdateID: 1}}, + {update: tgbotapi.Update{UpdateID: 2}}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + ctrl, req := gomock.NewController(t), require.New(t) + defer ctrl.Finish() + + mockMuxClient := mocks.NewMockMultiplexerServiceClient(ctrl) + r := NewGRPCRegistrar(logr.Logger{}, mockMuxClient) + + reqMatchers := make([]string, len(tt.args.matchers)) + for i, m := range tt.args.matchers { + reqMatchers[i] = m.String() + } + + ctx := context.Background() + for i := range tt.registerResponses { + resp := tt.registerResponses[i] + stream := mocks.NewMockMultiplexerService_RegisterHandlerClient(ctrl) + for _, u := range resp.stream { + stream.EXPECT().Recv().Return(u.update, u.err) + } + mockMuxClient.EXPECT().RegisterHandler(ctx, &genproto.RegisterRequest{Matchers: reqMatchers}).Return(stream, resp.err) + } + + updateCh, err := r.Register(ctx, tt.args.matchers) + req.Equal(tt.wantErr, err != nil) + + left := len(tt.want) + req.Eventually(func() bool { + select { + case update, ok := <-updateCh: + if !ok { + req.Zero(left, "less updates on channel than wanted") + return true + } + + req.NotZero(left, "more updates on channel than wanted") + want := &tt.want[len(tt.want)-left] + req.Equal(want.err, update.Error != nil) + req.Equal(want.update, update.Update) + left-- + default: + } + return updateCh == nil + }, time.Second*5, time.Millisecond*50) + }) + } +} + +func TestMain(m *testing.M) { + goleak.VerifyTestMain(m) +} diff --git a/pkg/services/stream.go b/pkg/services/stream.go new file mode 100644 index 0000000..3c67f20 --- /dev/null +++ b/pkg/services/stream.go @@ -0,0 +1,92 @@ +package services + +import ( + "context" + "encoding/json" + "runtime" + "sync" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" +) + +// DefaultCapacity is the default capacity which should be passed into Map* and As* functions. +// 100 is selected because Telegram's long polling API limits number of updates received to 100. +const DefaultCapacity = 100 + +// DefaultDecodeBufferSize specifies the default buffer size used for decoding responses +// in various streamers. Can be changed to, well, change the default buffer size. +var DefaultDecodeBufferSize = 256 << 10 + +// Maybe defines a type that either contains a value or an error. +type Maybe[T any] struct { + Value T + Error error +} + +// Stream is a readonly channel of some type. +type Stream[T any] <-chan T + +// MappedStream maps a stream to a stream in parallel using the given mapper. runtime.NumCPU() goroutines +// are used for mapping, and the returned stream will have the same capacity as the input stream. +// The output order after processing is not synchronized or defined. +func MappedStream[T, K any](in Stream[Maybe[T]], mapper func(T) (K, error)) Stream[Maybe[K]] { + var wg sync.WaitGroup + n := runtime.NumCPU() + out := make(chan Maybe[K], cap(in)) + + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + for { + job, ok := <-in + if !ok { + return + } + + result := Maybe[K]{ + Error: job.Error, + } + if result.Error == nil { + result.Value, result.Error = mapper(job.Value) + } + out <- result + } + }() + } + + go func() { + wg.Wait() + close(out) + }() + return out +} + +// RawUpdate is the raw JSON representation of an Update message. +type RawUpdate []byte + +// UnmarshalJSON is like json.RawMessage's UnmarshalJSON, however instead of +// copying the data it simply assigns it. Specifically, this means that +// the data used during decoding should not be reused elsewhere afterwards (i.e. no sync.Pool) +func (r *RawUpdate) UnmarshalJSON(m []byte) error { + *r = m + return nil +} + +// RawStream is a stream of raw updates. +type RawStream Stream[Maybe[RawUpdate]] + +// AsTgBotAPI converts a RawStream into a stream of tgbotapi-style updates. +func (s RawStream) AsTgBotAPI() Stream[Maybe[tgbotapi.Update]] { + return MappedStream(Stream[Maybe[RawUpdate]](s), func(u RawUpdate) (tu tgbotapi.Update, err error) { + err = json.Unmarshal([]byte(u), &tu) + return + }) +} + +// RawStreamer is a provider of RawUpdate's updates via an unbuffered stream. +type RawStreamer interface { + // Stream launches a single instance of the streamer. In general, it isn't safe to use this concurrently. + // On context cancelation/deadline the streamer must stop streaming and close the stream. + Stream(ctx context.Context) RawStream +} diff --git a/pkg/services/stream_test.go b/pkg/services/stream_test.go new file mode 100644 index 0000000..ebf949f --- /dev/null +++ b/pkg/services/stream_test.go @@ -0,0 +1,214 @@ +package services + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "reflect" + "strings" + "sync" + "testing" + "time" + + tgbotapi "github.com/go-telegram-bot-api/telegram-bot-api" + "github.com/stretchr/testify/require" +) + +// streamContains validates that a stream contains the wanted values in any order +func streamContains[T any](req *require.Assertions, s Stream[T], want []T) { + got := make([]bool, len(want)) + req.Eventually(func() bool { + select { + case val, ok := <-s: + if !ok { + for _, v := range got { + req.True(v, "not all wanted values found on stream") + } + return true + } + + var foundUnused bool + for i, v := range got { + if !v && reflect.DeepEqual(want[i], val) { + got[i], foundUnused = true, true + break + } + } + req.True(foundUnused, "redundant value found on stream: %q", val) + default: + } + return false + }, time.Hour*5, time.Millisecond*50) +} + +func TestRawStream_AsTgBotApi(t *testing.T) { + tests := []struct { + name string + updates []Maybe[RawUpdate] + want []Maybe[tgbotapi.Update] + }{ + { + name: "no values", + updates: nil, + want: nil, + }, + { + name: "values and error", + updates: []Maybe[RawUpdate]{ + {Value: RawUpdate(`{"update_id":1,"inline_query":{"query":"inline-query-test"}}`)}, + {Value: RawUpdate(`{"update_id":2,"message":{"text":"message-test"}}`)}, + {Error: errors.New("connection error")}, + }, + want: []Maybe[tgbotapi.Update]{ + {Value: tgbotapi.Update{UpdateID: 1, InlineQuery: &tgbotapi.InlineQuery{Query: "inline-query-test"}}}, + {Value: tgbotapi.Update{UpdateID: 2, Message: &tgbotapi.Message{Text: "message-test"}}}, + {Error: errors.New("connection error")}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + req := require.New(t) + + stream := make(chan Maybe[RawUpdate], DefaultCapacity) + go func() { + defer close(stream) + for _, u := range tt.updates { + stream <- u + } + }() + + streamContains(req, RawStream(stream).AsTgBotAPI(), tt.want) + }) + } +} + +const ( + longPollTestMessagesTotal = 10000 + longPollTestMessagesPerResponse = 100 +) + +func encodeAPIResponse(data any) ([]byte, error) { + response := struct { + Ok bool + Result any + }{ + Ok: true, + Result: data, + } + + b, err := json.Marshal(response) + if err != nil { + return nil, fmt.Errorf("marshaling api response: %w", err) + } + return b, nil +} + +type longPollTestRoundTripper struct { + data [][]byte + mu sync.Mutex + i int +} + +func (rt *longPollTestRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + select { + case <-req.Context().Done(): + return nil, fmt.Errorf("context done: %w", req.Context().Err()) + default: + } + response := func(b []byte) *http.Response { + return &http.Response{ + StatusCode: http.StatusOK, + ContentLength: int64(len(b)), + Body: io.NopCloser(bytes.NewBuffer(b)), + } + } + + if strings.HasSuffix(req.URL.Path, "getMe") { + b, err := encodeAPIResponse(tgbotapi.User{ID: 1}) + if err != nil { + return nil, err + } + return response(b), nil + } + + var cur int + rt.mu.Lock() + cur, rt.i = rt.i, (rt.i+1)%len(rt.data) + rt.mu.Unlock() + return response(rt.data[cur]), nil +} + +func newLongPollTestClient(req *require.Assertions, n int) *http.Client { + data := make([][]byte, n*longPollTestMessagesTotal/longPollTestMessagesPerResponse) + for i := range data { + updates := make([]tgbotapi.Update, longPollTestMessagesPerResponse) + for j := range updates { + updates[j] = tgbotapi.Update{UpdateID: i*(longPollTestMessagesPerResponse) + j} + } + + raw, err := encodeAPIResponse(updates) + req.NoError(err) + data[i] = raw + } + return &http.Client{Transport: &longPollTestRoundTripper{data: data}} +} + +func longPollTestValidate(b *testing.B, s Stream[Maybe[tgbotapi.Update]]) { + b.Helper() + cnt, end := 0, longPollTestMessagesTotal*b.N + for range s { + cnt++ + if cnt == end { + break + } + } +} + +// Benchmark of simple long polling using a single goroutine receiving messages and decoding them +func BenchmarkNaiveLongPoll(b *testing.B) { + req := require.New(b) + b.StopTimer() + b.Logf("Decoding %d messages %d times", longPollTestMessagesTotal, b.N) + api, err := tgbotapi.NewBotAPIWithClient("aboba", tgbotapi.APIEndpoint, newLongPollTestClient(req, b.N)) + req.NoError(err) + + b.StartTimer() + updateCh, _ := api.GetUpdatesChan(tgbotapi.UpdateConfig{}) + stream := make(chan Maybe[tgbotapi.Update]) + go func() { + defer close(stream) + for { + v, ok := <-updateCh + if !ok { + return + } + + stream <- Maybe[tgbotapi.Update]{Value: v} + } + }() + longPollTestValidate(b, stream) + api.StopReceivingUpdates() +} + +// Benchmark of long poll using parallelized json decoding +func BenchmarkOptimizedLongPoll(b *testing.B) { + req := require.New(b) + b.StopTimer() + b.Logf("Decoding %d messages %d times", longPollTestMessagesTotal, b.N) + streamer, err := NewLongPollStreamer("https://api.telegram.org", "aboba", LongPollOptions{Client: newLongPollTestClient(req, b.N)}) + req.NoError(err) + ctx, cancel := context.WithCancel(context.Background()) + + b.StartTimer() + stream := streamer.Stream(ctx).AsTgBotAPI() + longPollTestValidate(b, stream) + cancel() +} diff --git a/pkg/utils/logger.go b/pkg/utils/logger.go deleted file mode 100644 index 3f21554..0000000 --- a/pkg/utils/logger.go +++ /dev/null @@ -1,25 +0,0 @@ -package utils - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -func GetLogger() *zap.Logger { - loggerConfig := zap.Config{ - Level: zap.NewAtomicLevelAt(zap.DebugLevel), - Encoding: "json", - OutputPaths: []string{"stdout"}, - ErrorOutputPaths: []string{"stderr"}, - EncoderConfig: zapcore.EncoderConfig{ - MessageKey: "message", - LevelKey: "level", - EncodeLevel: zapcore.LowercaseLevelEncoder, - }, - } - logger, err := loggerConfig.Build() - if err != nil { - panic(err) - } - return logger -} diff --git a/pkg/utils/telegram.go b/pkg/utils/telegram.go deleted file mode 100644 index 8d77c22..0000000 --- a/pkg/utils/telegram.go +++ /dev/null @@ -1,27 +0,0 @@ -package utils - -import ( - telegramapi "github.com/go-telegram-bot-api/telegram-bot-api" - "strings" -) - -func addIfNotNil(slice []string, name string, objectIsNil bool) []string { - if !objectIsNil { - return append(slice, name) - } - return slice -} - -func GetTelegramUpdateType(update *telegramapi.Update) string { - var contents []string - contents = addIfNotNil(contents, "message", update.Message == nil) - contents = addIfNotNil(contents, "edited_message", update.EditedMessage == nil) - contents = addIfNotNil(contents, "channel_post", update.ChannelPost == nil) - contents = addIfNotNil(contents, "edited_channel_post", update.EditedChannelPost == nil) - contents = addIfNotNil(contents, "inline_query", update.InlineQuery == nil) - contents = addIfNotNil(contents, "chosen_inline_result", update.ChosenInlineResult == nil) - contents = addIfNotNil(contents, "callback_query", update.CallbackQuery == nil) - contents = addIfNotNil(contents, "shipping_query", update.ShippingQuery == nil) - contents = addIfNotNil(contents, "pre_checkout_query", update.PreCheckoutQuery == nil) - return strings.Join(contents, ",") -}