Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/shortener/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ func main() {

go func() {
if err := application.Run(); err != nil {
log.Fatalf("Error running application.\n%s", err.Error())
if err.Error() != "http: Server closed" {
log.Printf("Error running application: %s", err.Error())
}
}
}()

Expand All @@ -40,7 +42,6 @@ func main() {
case <-shut:
log.Print("Shutdown process is completed!")
case <-ctx.Done():
log.Print("Shutdown process is failed! Force shutdown")
os.Exit(1)
log.Print("Shutdown process timeout - exiting anyway")
}
}
70 changes: 63 additions & 7 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/anon-d/urlshortener/internal/repository/local"
"github.com/anon-d/urlshortener/internal/service"
serviceCache "github.com/anon-d/urlshortener/internal/service/cache"
"github.com/anon-d/urlshortener/internal/worker"
)

type App struct {
server *http.Server
router *gin.Engine
urlHandler *handler.URLHandler
server *http.Server
router *gin.Engine
urlHandler *handler.URLHandler
deleteWorker *worker.DeleteWorker
}

func New() (*App, error) {
Expand Down Expand Up @@ -72,7 +74,27 @@ func New() (*App, error) {
}

svc := service.New(cacheService, storage, log)
urlHandler := handler.NewURLHandler(svc, cfg.AddrURL, log)

// worker
deleteWorkerAdapter := &deleteStorageAdapter{storage: storage}
deleteWorker := worker.NewDeleteWorker(
deleteWorkerAdapter,
100, // buffer size
500*time.Millisecond, // flush interval
log,
)

// Создаем динамическое количество каналов на основе конфига
deleteChannels := make([]chan handler.DeleteRequest, cfg.DeleteWorkerCount)
for i := 0; i < cfg.DeleteWorkerCount; i++ {
deleteChannels[i] = make(chan handler.DeleteRequest, cfg.DeleteChannelSize)
workerChan := convertDeleteChannel(deleteChannels[i])
deleteWorker.AddChannel(workerChan)
}
deleteWorker.Start()

// канал для handler
urlHandler := handler.NewURLHandler(svc, cfg.AddrURL, log, deleteChannels[0])

// init Gin and http
if cfg.Env == "release" {
Expand Down Expand Up @@ -102,6 +124,7 @@ func New() (*App, error) {

// middleware
router.Use(middleware.GlobalMiddleware(log)...)
router.Use(middleware.AuthMiddleware(cfg.SecretKey))

router.HandleMethodNotAllowed = true

Expand All @@ -111,12 +134,37 @@ func New() (*App, error) {
}

return &App{
server: httpServer,
router: router,
urlHandler: urlHandler,
server: httpServer,
router: router,
urlHandler: urlHandler,
deleteWorker: deleteWorker,
}, nil
}

// deleteStorageAdapter адаптер для работы с Storage через интерфейс DeleteStorage
type deleteStorageAdapter struct {
storage repository.Storage
}

func (d *deleteStorageAdapter) BatchMarkAsDeleted(ctx context.Context, requests []worker.DeleteRequest) error {
return d.storage.BatchMarkAsDeleted(ctx, requests)
}

// handler.DeleteRequest -> worker.DeleteRequest
func convertDeleteChannel(in <-chan handler.DeleteRequest) <-chan worker.DeleteRequest {
out := make(chan worker.DeleteRequest)
go func() {
for req := range in {
out <- worker.DeleteRequest{
UserID: req.UserID,
ShortURL: req.ShortURL,
}
}
close(out)
}()
return out
}

func (a *App) SetupRoutes() {
maintenance := a.router.Group("/maintenance")
{
Expand All @@ -128,6 +176,8 @@ func (a *App) SetupRoutes() {
a.router.POST("/api/shorten", a.urlHandler.Shorten)
a.router.GET("/ping", a.urlHandler.PingDB)
a.router.POST("/api/shorten/batch", a.urlHandler.BatchShorten)
a.router.GET("/api/user/urls", a.urlHandler.GetUserURLs)
a.router.DELETE("/api/user/urls", a.urlHandler.DeleteURLs)
a.router.NoMethod(a.urlHandler.NotAllowed)
a.router.NoRoute(a.urlHandler.NotFound)

Expand All @@ -138,4 +188,10 @@ func (a *App) Run() error {
}

func (a *App) Shutdown(ctx context.Context) {
if a.deleteWorker != nil {
a.deleteWorker.Stop()
}
if err := a.server.Shutdown(ctx); err != nil {
fmt.Println("Failed shutdown. error: ", err)
}
}
62 changes: 51 additions & 11 deletions internal/config/flag/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,31 @@ package config
import (
"flag"
"os"
"strconv"
"sync"
)

type ServerConfig struct {
AddrServer string `env:"SERVER_ADDRESS"`
AddrURL string `env:"BASE_URL"`
Env string `env:"ENV"`
File string `env:"FILE_STORAGE_PATH"`
DSN string `env:"DATABASE_DSN"`
AddrServer string `env:"SERVER_ADDRESS"`
AddrURL string `env:"BASE_URL"`
Env string `env:"ENV"`
File string `env:"FILE_STORAGE_PATH"`
DSN string `env:"DATABASE_DSN"`
DeleteWorkerCount int `env:"DELETE_WORKER_COUNT"`
DeleteChannelSize int `env:"DELETE_CHANNEL_SIZE"`
SecretKey string `env:"SECRET_KEY"`
}

var (
addrServer *string
addrURL *string
envValue *string
fileValue *string
dsnValue *string
flagsOnce sync.Once
addrServer *string
addrURL *string
envValue *string
fileValue *string
dsnValue *string
deleteWorkerCount *int
deleteChannelSize *int
secretKey *string
flagsOnce sync.Once
)

func initFlags() {
Expand All @@ -29,6 +36,9 @@ func initFlags() {
envValue = flag.String("e", "dev", "environment")
fileValue = flag.String("f", "data.json", "file to store data")
dsnValue = flag.String("d", "", "database DSN")
deleteWorkerCount = flag.Int("w", 2, "number of delete worker channels")
deleteChannelSize = flag.Int("c", 1000, "size of each delete channel buffer")
secretKey = flag.String("s", "my-super-secret-key-change-in-production", "secret key for signing cookies")
}

func NewServerConfig() *ServerConfig {
Expand Down Expand Up @@ -70,5 +80,35 @@ func NewServerConfig() *ServerConfig {
cfg.DSN = *dsnValue
}

if envWorkerCount, ok := os.LookupEnv("DELETE_WORKER_COUNT"); ok {
if count, err := parseIntFromEnv(envWorkerCount); err == nil {
cfg.DeleteWorkerCount = count
} else {
cfg.DeleteWorkerCount = *deleteWorkerCount
}
} else {
cfg.DeleteWorkerCount = *deleteWorkerCount
}

if envChannelSize, ok := os.LookupEnv("DELETE_CHANNEL_SIZE"); ok {
if size, err := parseIntFromEnv(envChannelSize); err == nil {
cfg.DeleteChannelSize = size
} else {
cfg.DeleteChannelSize = *deleteChannelSize
}
} else {
cfg.DeleteChannelSize = *deleteChannelSize
}

if envSecretKey, ok := os.LookupEnv("SECRET_KEY"); ok {
cfg.SecretKey = envSecretKey
} else {
cfg.SecretKey = *secretKey
}

return cfg
}

func parseIntFromEnv(s string) (int, error) {
return strconv.Atoi(s)
}
Loading