From 85d8aa6d1fe538dc07cbcfa4bd8073d1c5fa46fe Mon Sep 17 00:00:00 2001 From: romanchechyotkin Date: Thu, 16 Oct 2025 14:08:21 +0300 Subject: [PATCH] refactoring --- control_plane/cmd/main.go | 10 +- control_plane/internal/http/api.go | 46 +++---- docker-compose.yaml | 2 +- invoicer/cmd/main.go | 127 ++++++++++++------ .../20251012191647_funcion_metrics_kafka.sql | 5 +- .../20251012194535_funcion_metrics_local.sql | 5 +- .../20251012194539_funcion_metrics_mv.sql | 5 +- notifier/cmd/main.go | 17 ++- pkg/clickhouse/client.go | 4 + pkg/types/types.go | 3 + price_service/docs/docs.go | 16 ++- price_service/docs/swagger.json | 11 ++ price_service/docs/swagger.yaml | 8 ++ .../internal/controller/v1/request/request.go | 18 +-- .../internal/controller/v1/tariff.go | 18 +-- price_service/internal/entity/entity.go | 15 ++- price_service/internal/repo/tariff/tariff.go | 54 ++++++-- price_service/internal/service/tariffs.go | 27 ++-- .../migrations/00001_create_table_tariffs.sql | 3 +- setup.sh | 2 +- 20 files changed, 263 insertions(+), 133 deletions(-) diff --git a/control_plane/cmd/main.go b/control_plane/cmd/main.go index 70af281..bf55ac3 100644 --- a/control_plane/cmd/main.go +++ b/control_plane/cmd/main.go @@ -46,19 +46,19 @@ func run() { os.Exit(1) } - actionsTopic := os.Getenv("FUNCTION_ACTIONS_TOPIC") - if actionsTopic == "" { - actionsTopic = "function_actions" + metricsTopic := os.Getenv("FUNCTION_METRICS_TOPIC") + if metricsTopic == "" { + metricsTopic = "function_metrics" } brokers := strings.Split(addresses, ",") - actionsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: actionsTopic, Addrs: brokers}) + metricsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: metricsTopic, Addrs: brokers}) mux := http.NewServeMux() mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) - api := httpapi.New(cfg, actionsProducer, restCfg) + api := httpapi.New(cfg, metricsProducer, restCfg) api.Register(mux) slog.Info("control plane listening", slog.String("addr", cfg.HTTP.Addr)) diff --git a/control_plane/internal/http/api.go b/control_plane/internal/http/api.go index f6ee3d8..e98e757 100644 --- a/control_plane/internal/http/api.go +++ b/control_plane/internal/http/api.go @@ -13,17 +13,18 @@ import ( "github.com/segmentio/kafka-go" "github.com/usamaroman/faas_demo/control_plane/internal/config" "github.com/usamaroman/faas_demo/pkg/knative" + "github.com/usamaroman/faas_demo/pkg/types" "k8s.io/client-go/rest" ) type API struct { - cfg config.Config - producer *kafka.Writer // optional - restCfg *rest.Config + cfg config.Config + metricsProducer *kafka.Writer // optional + restCfg *rest.Config } func New(cfg config.Config, producer *kafka.Writer, restCfg *rest.Config) *API { - return &API{cfg: cfg, producer: producer, restCfg: restCfg} + return &API{cfg: cfg, metricsProducer: producer, restCfg: restCfg} } func (a *API) Register(mux *http.ServeMux) { @@ -118,13 +119,19 @@ func (a *API) handleRun(w http.ResponseWriter, r *http.Request) { return } - if a.producer != nil && a.cfg.Kafka.Topic != "" { - evt := map[string]any{ - "tenant": req.Email, - "pod": name, - "ts": time.Now().UTC().Format(time.RFC3339Nano), + if a.metricsProducer != nil { + now := time.Now().UTC().Unix() + evt := types.Metric{ + Pod: name, + CPUPercent: 0, + MemMB: 0, + Timestamp: now, + Tenant: req.Email, + Type: "start", + StartTime: now, + EndTime: 0, } - _ = publishJSON(a.producer, evt) + _ = publishJSON(a.metricsProducer, evt) } writeJSON(w, http.StatusOK, RunResponse{ @@ -157,22 +164,3 @@ func publishJSON(w *kafka.Writer, payload any) error { msg := kafka.Message{Value: b} return w.WriteMessages(context.Background(), msg) } - -// toStringMap converts map[string]any to map[string]string using fmt.Sprint for values. -func toStringMap(in map[string]any) map[string]string { - if len(in) == 0 { - return nil - } - out := make(map[string]string, len(in)) - for k, v := range in { - switch t := v.(type) { - case nil: - out[k] = "" - case string: - out[k] = t - default: - out[k] = fmt.Sprint(t) - } - } - return out -} diff --git a/docker-compose.yaml b/docker-compose.yaml index 4b4da10..344925f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -151,7 +151,7 @@ services: - ./config:/app/config environment: KAFKA_ADDRS: kafka:29092 - KAFKA_TOPIC: function_actions + FUNCTION_METRICS_TOPIC: function_metrics ports: - "8080:8080" diff --git a/invoicer/cmd/main.go b/invoicer/cmd/main.go index dc79ea5..578ef03 100644 --- a/invoicer/cmd/main.go +++ b/invoicer/cmd/main.go @@ -37,32 +37,36 @@ type BillingData struct { } type Tariff struct { - ID int `json:"ID"` - Name string `json:"Name"` - ExecPrice float64 `json:"ExecPrice"` - MemPrice float64 `json:"MemPrice"` - CpuPrice float64 `json:"CpuPrice"` + ID int `json:"ID"` + Name string `json:"Name"` + ExecPrice float64 `json:"ExecPrice"` + MemPrice float64 `json:"MemPrice"` + CpuPrice float64 `json:"CpuPrice"` + ColdStartPricePerSecond float64 `json:"ColdStartPricePerSecond"` } type BillingResponse struct { - TenantID string `json:"tenant_id"` - PodName string `json:"pod_name"` - DurationSec int64 `json:"duration_sec"` - MemoryMB float64 `json:"memory_mb"` - ExecCost float64 `json:"exec_cost"` - MemoryCost float64 `json:"memory_cost"` - TotalCost float64 `json:"total_cost"` - TariffName string `json:"tariff_name"` - CalculatedAt time.Time `json:"calculated_at"` + TenantID string `json:"tenant_id"` + PodName string `json:"pod_name"` + DurationSec int64 `json:"duration_sec"` + MemoryMB float64 `json:"memory_mb"` + ExecCost float64 `json:"exec_cost"` + MemoryCost float64 `json:"memory_cost"` + ColdStartCost float64 `json:"cold_start_cost"` + TotalCost float64 `json:"total_cost"` + TariffName string `json:"tariff_name"` + CalculatedAt time.Time `json:"calculated_at"` } type NotificationMessage struct { - TenantID string `json:"tenant_id"` - Email string `json:"email"` - MemoryMB float64 `json:"memory_mb"` - TotalCost float64 `json:"total_cost"` - PodName string `json:"pod_name"` - Timestamp int64 `json:"timestamp"` + TenantID string `json:"tenant_id"` + Email string `json:"email"` + MemoryMB float64 `json:"memory_mb"` + TotalCost float64 `json:"total_cost"` + PodName string `json:"pod_name"` + Timestamp int64 `json:"timestamp"` + ColdStartSeconds int64 `json:"cold_start_seconds"` + ColdStartCost float64 `json:"cold_start_cost"` } func main() { @@ -198,12 +202,19 @@ func (i *Invoicer) getBilling(c *gin.Context) { return } - var totalExecCost, totalMemoryCost float64 + var totalExecCost, totalMemoryCost, totalColdStartCost float64 var totalDuration int64 var totalMemoryMB float64 resp := make([]BillingResponse, 0, len(billingData)) + coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(tenantID) + if err != nil { + slog.Error("failed to compute cold start seconds", slog.String("error", err.Error())) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to compute cold start"}) + return + } + for _, data := range billingData { duration := int64(data.EndTime.Sub(data.StartTime).Seconds()) totalDuration += duration @@ -216,17 +227,19 @@ func (i *Invoicer) getBilling(c *gin.Context) { totalExecCost += execCost totalMemoryCost += memoryCost - totalCost := totalExecCost + totalMemoryCost + totalColdStartCost = float64(coldStartSeconds) * tariff.ColdStartPricePerSecond + totalCost := totalExecCost + totalMemoryCost + totalColdStartCost resp = append(resp, BillingResponse{ - TenantID: tenantID, - PodName: billingData[0].PodName, - DurationSec: totalDuration, - MemoryMB: totalMemoryMB, - ExecCost: totalExecCost, - MemoryCost: totalMemoryCost, - TotalCost: totalCost, - TariffName: tariff.Name, - CalculatedAt: time.Now(), + TenantID: tenantID, + PodName: billingData[0].PodName, + DurationSec: totalDuration, + MemoryMB: totalMemoryMB, + ExecCost: totalExecCost, + MemoryCost: totalMemoryCost, + ColdStartCost: totalColdStartCost, + TotalCost: totalCost, + TariffName: tariff.Name, + CalculatedAt: time.Now(), }) } @@ -264,6 +277,35 @@ func (i *Invoicer) getBillingDataFromClickHouse(tenantID string) ([]BillingData, return result, nil } +// getColdStartSecondsFromClickHouse calculates the sum of cold start seconds for a tenant. +func (i *Invoicer) getColdStartSecondsFromClickHouse(tenantID string) (int64, error) { + // For each start event, find the first metric (non-start) timestamp >= start_time per pod + query := ` + WITH starts AS ( + SELECT tenant, pod, start_time + FROM function_metrics_local + WHERE tenant = ? AND type = 'start' AND start_time IS NOT NULL + ), first_metrics AS ( + SELECT tenant, pod, min(timestamp) AS first_metric_ts + FROM function_metrics_local + WHERE (type IS NULL OR type != 'start') + GROUP BY tenant, pod + ) + SELECT coalesce( + sum(greatest(0, dateDiff('second', s.start_time, fm.first_metric_ts))), 0 + ) AS total_cold_start_seconds + FROM starts s + LEFT JOIN first_metrics fm USING (tenant, pod) + ` + + row := i.clickhouseClient.QueryRow(context.Background(), query, tenantID) + var total int64 + if err := row.Scan(&total); err != nil { + return 0, err + } + return total, nil +} + func (i *Invoicer) getTariffFromPriceService(tariffID int) (*Tariff, error) { url := fmt.Sprintf("%s/v1/tariff/%d", i.priceServiceURL, tariffID) @@ -310,25 +352,34 @@ func (i *Invoicer) sendStopNotification(action types.Action) { // Calculate costs var totalMemoryMB, totalCost float64 + // compute cold start seconds for this tenant (could refine to this pod if needed) + coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(action.Tenant) + if err != nil { + slog.Error("failed to compute cold start seconds for notification", slog.String("error", err.Error())) + coldStartSeconds = 0 + } for _, data := range billingData { if data.PodName == action.Pod { duration := int64(data.EndTime.Sub(data.StartTime).Seconds()) execCost := float64(duration) * tariff.ExecPrice memoryCost := data.TotalMemoryConsumedMB * tariff.MemPrice totalMemoryMB = data.TotalMemoryConsumedMB - totalCost = execCost + memoryCost + coldStartCost := float64(coldStartSeconds) * tariff.ColdStartPricePerSecond + totalCost = execCost + memoryCost + coldStartCost break } } // Create notification message notification := NotificationMessage{ - TenantID: action.Tenant, - Email: action.Tenant, - MemoryMB: totalMemoryMB, - TotalCost: totalCost, - PodName: action.Pod, - Timestamp: time.Now().Unix(), + TenantID: action.Tenant, + Email: action.Tenant, + MemoryMB: totalMemoryMB, + TotalCost: totalCost, + PodName: action.Pod, + Timestamp: time.Now().Unix(), + ColdStartSeconds: coldStartSeconds, + ColdStartCost: float64(coldStartSeconds) * tariff.ColdStartPricePerSecond, } // Send notification to Kafka diff --git a/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql b/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql index d8d5849..5e49726 100644 --- a/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql +++ b/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_kafka ( cpu_percent Float32, mem_mb Float32, timestamp Int64, - tenant String + tenant String, + type String, + start_time Int64, + end_time Int64 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:29092', diff --git a/migrations/clickhouse/20251012194535_funcion_metrics_local.sql b/migrations/clickhouse/20251012194535_funcion_metrics_local.sql index e5bab84..539582c 100644 --- a/migrations/clickhouse/20251012194535_funcion_metrics_local.sql +++ b/migrations/clickhouse/20251012194535_funcion_metrics_local.sql @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_local ( cpu_percent Float32, mem_mb Float32, timestamp DateTime, - tenant String + tenant String, + type String, + start_time DateTime, + end_time DateTime ) ENGINE = MergeTree PARTITION BY toYYYYMMDD(timestamp) diff --git a/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql b/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql index e0a3ce2..95e6ff8 100644 --- a/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql +++ b/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql @@ -8,7 +8,10 @@ SELECT cpu_percent, mem_mb, toDateTime(timestamp) AS timestamp, - tenant + tenant, + type, + if(start_time = 0, NULL, toDateTime(start_time)) AS start_time, + if(end_time = 0, NULL, toDateTime(end_time)) AS end_time FROM metrics.function_metrics_kafka; -- +goose StatementEnd diff --git a/notifier/cmd/main.go b/notifier/cmd/main.go index cf4717a..393fd94 100644 --- a/notifier/cmd/main.go +++ b/notifier/cmd/main.go @@ -29,12 +29,14 @@ type Config struct { } type NotificationMessage struct { - TenantID string `json:"tenant_id"` - Email string `json:"email"` - MemoryMB float64 `json:"memory_mb"` - TotalCost float64 `json:"total_cost"` - PodName string `json:"pod_name"` - Timestamp int64 `json:"timestamp"` + TenantID string `json:"tenant_id"` + Email string `json:"email"` + MemoryMB float64 `json:"memory_mb"` + TotalCost float64 `json:"total_cost"` + PodName string `json:"pod_name"` + Timestamp int64 `json:"timestamp"` + ColdStartSeconds int64 `json:"cold_start_seconds"` + ColdStartCost float64 `json:"cold_start_cost"` } func main() { @@ -89,6 +91,7 @@ func main() { @@ -96,7 +99,7 @@ func main() {

Best regards,
FaaS Team

- `, notification.TenantID, notification.PodName, notification.MemoryMB, notification.TotalCost, + `, notification.TenantID, notification.PodName, notification.MemoryMB, notification.ColdStartSeconds, notification.ColdStartCost, notification.TotalCost, fmt.Sprintf("%d", notification.Timestamp))) d := gomail.NewDialer(cfg.SMTP.Host, cfg.SMTP.Port, cfg.SMTP.Username, cfg.SMTP.Password) diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go index ee0b437..bc13554 100644 --- a/pkg/clickhouse/client.go +++ b/pkg/clickhouse/client.go @@ -59,3 +59,7 @@ func (c *Client) Close() error { func (c *Client) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) { return c.conn.Query(ctx, query, args...) } + +func (c *Client) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row { + return c.conn.QueryRow(ctx, query, args...) +} diff --git a/pkg/types/types.go b/pkg/types/types.go index 404702b..63b4bd6 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -8,6 +8,9 @@ type Metric struct { MemMB float64 `json:"mem_mb"` Timestamp int64 `json:"timestamp"` Tenant string `json:"tenant"` + Type string `json:"type,omitempty"` + StartTime int64 `json:"start_time,omitempty"` + EndTime int64 `json:"end_time,omitempty"` } type Action struct { diff --git a/price_service/docs/docs.go b/price_service/docs/docs.go index f4f18cd..450dbc5 100644 --- a/price_service/docs/docs.go +++ b/price_service/docs/docs.go @@ -1,4 +1,5 @@ -// Package docs Code generated by swaggo/swag. DO NOT EDIT +// Package docs GENERATED BY SWAG; DO NOT EDIT +// This file was generated by swaggo/swag package docs import "github.com/swaggo/swag" @@ -176,6 +177,9 @@ const docTemplate = `{ "entity.Tariff": { "type": "object", "properties": { + "coldStartPricePerSecond": { + "type": "number" + }, "cpuPrice": { "type": "number" }, @@ -208,6 +212,10 @@ const docTemplate = `{ "name" ], "properties": { + "cold_start_price_per_second": { + "type": "number", + "minimum": 0 + }, "cpu_price": { "type": "number", "minimum": 0 @@ -228,6 +236,10 @@ const docTemplate = `{ "request.UpdateTariff": { "type": "object", "properties": { + "cold_start_price_per_second": { + "type": "number", + "minimum": 0 + }, "cpu_price": { "type": "number", "minimum": 0 @@ -269,8 +281,6 @@ var SwaggerInfo = &swag.Spec{ Description: "", InfoInstanceName: "swagger", SwaggerTemplate: docTemplate, - LeftDelim: "{{", - RightDelim: "}}", } func init() { diff --git a/price_service/docs/swagger.json b/price_service/docs/swagger.json index f625f41..dd7c0ad 100644 --- a/price_service/docs/swagger.json +++ b/price_service/docs/swagger.json @@ -165,6 +165,9 @@ "entity.Tariff": { "type": "object", "properties": { + "coldStartPricePerSecond": { + "type": "number" + }, "cpuPrice": { "type": "number" }, @@ -197,6 +200,10 @@ "name" ], "properties": { + "cold_start_price_per_second": { + "type": "number", + "minimum": 0 + }, "cpu_price": { "type": "number", "minimum": 0 @@ -217,6 +224,10 @@ "request.UpdateTariff": { "type": "object", "properties": { + "cold_start_price_per_second": { + "type": "number", + "minimum": 0 + }, "cpu_price": { "type": "number", "minimum": 0 diff --git a/price_service/docs/swagger.yaml b/price_service/docs/swagger.yaml index ef135bf..1470b06 100644 --- a/price_service/docs/swagger.yaml +++ b/price_service/docs/swagger.yaml @@ -1,6 +1,8 @@ definitions: entity.Tariff: properties: + coldStartPricePerSecond: + type: number cpuPrice: type: number createdAt: @@ -18,6 +20,9 @@ definitions: type: object request.CreateTariff: properties: + cold_start_price_per_second: + minimum: 0 + type: number cpu_price: minimum: 0 type: number @@ -37,6 +42,9 @@ definitions: type: object request.UpdateTariff: properties: + cold_start_price_per_second: + minimum: 0 + type: number cpu_price: minimum: 0 type: number diff --git a/price_service/internal/controller/v1/request/request.go b/price_service/internal/controller/v1/request/request.go index 52b3c3f..3ec8f8e 100644 --- a/price_service/internal/controller/v1/request/request.go +++ b/price_service/internal/controller/v1/request/request.go @@ -1,15 +1,17 @@ package request type CreateTariff struct { - Name string `json:"name" validate:"required"` - ExecPrice float64 `json:"exec_price" validate:"required,gte=0"` - MemPrice float64 `json:"mem_price" validate:"required,gte=0"` - CpuPrice float64 `json:"cpu_price" validate:"required,gte=0"` + Name string `json:"name" validate:"required"` + ExecPrice float64 `json:"exec_price" validate:"required,gte=0"` + MemPrice float64 `json:"mem_price" validate:"required,gte=0"` + CpuPrice float64 `json:"cpu_price" validate:"required,gte=0"` + ColdStartPricePerSecond float64 `json:"cold_start_price_per_second" validate:"gte=0"` } type UpdateTariff struct { - Name string `json:"name"` - ExecPrice float64 `json:"exec_price" validate:"gte=0"` - MemPrice float64 `json:"mem_price" validate:"gte=0"` - CpuPrice float64 `json:"cpu_price" validate:"gte=0"` + Name string `json:"name"` + ExecPrice float64 `json:"exec_price" validate:"gte=0"` + MemPrice float64 `json:"mem_price" validate:"gte=0"` + CpuPrice float64 `json:"cpu_price" validate:"gte=0"` + ColdStartPricePerSecond float64 `json:"cold_start_price_per_second" validate:"gte=0"` } diff --git a/price_service/internal/controller/v1/tariff.go b/price_service/internal/controller/v1/tariff.go index 7a31084..fe7165d 100644 --- a/price_service/internal/controller/v1/tariff.go +++ b/price_service/internal/controller/v1/tariff.go @@ -67,10 +67,11 @@ func (r *tariffRoutes) createNewTariff(c *gin.Context) { } createdTariff, err := r.tariffService.Create(c, &service.TariffInput{ - Name: tariff.Name, - ExecPrice: tariff.ExecPrice, - MemPrice: tariff.MemPrice, - CpuPrice: tariff.CpuPrice, + Name: tariff.Name, + ExecPrice: tariff.ExecPrice, + MemPrice: tariff.MemPrice, + CpuPrice: tariff.CpuPrice, + ColdStartPricePerSecond: tariff.ColdStartPricePerSecond, }) if err != nil { slog.Error("failed to create tariff", err.Error()) @@ -187,10 +188,11 @@ func (r *tariffRoutes) updateTariffByID(c *gin.Context) { } updatedTariff, err := r.tariffService.UpdateByID(c, id, &service.TariffInput{ - Name: updateData.Name, - ExecPrice: updateData.ExecPrice, - MemPrice: updateData.MemPrice, - CpuPrice: updateData.CpuPrice, + Name: updateData.Name, + ExecPrice: updateData.ExecPrice, + MemPrice: updateData.MemPrice, + CpuPrice: updateData.CpuPrice, + ColdStartPricePerSecond: updateData.ColdStartPricePerSecond, }) if err != nil { if errors.Is(err, service.ErrTariffNotFound) { diff --git a/price_service/internal/entity/entity.go b/price_service/internal/entity/entity.go index f627aee..d655253 100644 --- a/price_service/internal/entity/entity.go +++ b/price_service/internal/entity/entity.go @@ -3,13 +3,14 @@ package entity import "time" type Tariff struct { - ID int `db:"id"` - Name string `db:"name"` - ExecPrice float64 `db:"exec_price"` - MemPrice float64 `db:"mem_price"` - CpuPrice float64 `db:"cpu_price"` - CreatedAt time.Time `db:"created_at"` - UpdatedAt time.Time `db:"updated_at"` + ID int `db:"id"` + Name string `db:"name"` + ExecPrice float64 `db:"exec_price"` + MemPrice float64 `db:"mem_price"` + CpuPrice float64 `db:"cpu_price"` + ColdStartPricePerSecond float64 `db:"cold_start_price_per_second"` + CreatedAt time.Time `db:"created_at"` + UpdatedAt time.Time `db:"updated_at"` } type TariffFilters struct { diff --git a/price_service/internal/repo/tariff/tariff.go b/price_service/internal/repo/tariff/tariff.go index 4915cb8..001bd75 100644 --- a/price_service/internal/repo/tariff/tariff.go +++ b/price_service/internal/repo/tariff/tariff.go @@ -5,9 +5,9 @@ import ( "errors" "log/slog" + "github.com/usamaroman/faas_demo/pkg/postgresql" "github.com/usamaroman/faas_demo/price_service/internal/entity" "github.com/usamaroman/faas_demo/price_service/internal/repo/repoerrors" - "github.com/usamaroman/faas_demo/pkg/postgresql" "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v5" @@ -25,9 +25,9 @@ func NewRepo(pg *postgresql.Postgres) *Repo { func (r *Repo) Create(ctx context.Context, body *entity.Tariff) (*entity.Tariff, error) { q, args, err := r.Builder.Insert("tariffs"). - Columns("name", "exec_price", "mem_price", "cpu_price"). - Values(body.Name, body.ExecPrice, body.MemPrice, body.CpuPrice). - Suffix("RETURNING id, exec_price, mem_price, cpu_price, created_at, updated_at"). + Columns("name", "exec_price", "mem_price", "cpu_price", "cold_start_price_per_second"). + Values(body.Name, body.ExecPrice, body.MemPrice, body.CpuPrice, body.ColdStartPricePerSecond). + Suffix("RETURNING id, exec_price, mem_price, cpu_price, cold_start_price_per_second, created_at, updated_at"). ToSql() if err != nil { slog.Error("failed to make query", err.Error()) @@ -41,6 +41,7 @@ func (r *Repo) Create(ctx context.Context, body *entity.Tariff) (*entity.Tariff, &body.ExecPrice, &body.MemPrice, &body.CpuPrice, + &body.ColdStartPricePerSecond, &body.CreatedAt, &body.UpdatedAt, ); err != nil { @@ -72,6 +73,7 @@ func (r *Repo) GetByID(ctx context.Context, id int) (*entity.Tariff, error) { &tariff.ExecPrice, &tariff.MemPrice, &tariff.CpuPrice, + &tariff.ColdStartPricePerSecond, &tariff.CreatedAt, &tariff.UpdatedAt, ); err != nil { @@ -89,7 +91,16 @@ func (r *Repo) GetByID(ctx context.Context, id int) (*entity.Tariff, error) { func (r *Repo) GetAll(ctx context.Context, filters *entity.TariffFilters) ([]entity.Tariff, error) { qb := r.Builder. - Select("*"). + Select( + "id", + "name", + "exec_price", + "mem_price", + "cpu_price", + "cold_start_price_per_second", + "created_at", + "updated_at", + ). From("tariffs") q, args, err := qb.Limit(filters.Limit). @@ -110,12 +121,30 @@ func (r *Repo) GetAll(ctx context.Context, filters *entity.TariffFilters) ([]ent } defer rows.Close() - tariffs, err := pgx.CollectRows(rows, pgx.RowToStructByName[entity.Tariff]) - if err != nil { - slog.Error("failed to collect rows", err.Error()) + var tariffs []entity.Tariff + for rows.Next() { + var t entity.Tariff + if err := rows.Scan( + &t.ID, + &t.Name, + &t.ExecPrice, + &t.MemPrice, + &t.CpuPrice, + &t.ColdStartPricePerSecond, + &t.CreatedAt, + &t.UpdatedAt, + ); err != nil { + slog.Error("failed to scan tariff row", err.Error()) + return nil, err + } + tariffs = append(tariffs, t) } - return tariffs, err + if rows.Err() != nil { + return nil, rows.Err() + } + + return tariffs, nil } func (r *Repo) UpdateByID(ctx context.Context, id int, updates *entity.Tariff) (*entity.Tariff, error) { @@ -137,8 +166,12 @@ func (r *Repo) UpdateByID(ctx context.Context, id int, updates *entity.Tariff) ( builder = builder.Set("cpu_price", updates.CpuPrice) } + if updates.ColdStartPricePerSecond != 0 { + builder = builder.Set("cold_start_price_per_second", updates.ColdStartPricePerSecond) + } + q, args, err := builder.Where(squirrel.Eq{"id": id}). - Suffix("RETURNING id, exec_price, mem_price, cpu_price, created_at, updated_at"). + Suffix("RETURNING id, exec_price, mem_price, cpu_price, cold_start_price_per_second, created_at, updated_at"). ToSql() if err != nil { slog.Error("failed to build SQL query", slog.Any("id", id), err.Error()) @@ -152,6 +185,7 @@ func (r *Repo) UpdateByID(ctx context.Context, id int, updates *entity.Tariff) ( &updates.ExecPrice, &updates.MemPrice, &updates.CpuPrice, + &updates.ColdStartPricePerSecond, &updates.CreatedAt, &updates.UpdatedAt, ); err != nil { diff --git a/price_service/internal/service/tariffs.go b/price_service/internal/service/tariffs.go index 466cf12..56c6717 100644 --- a/price_service/internal/service/tariffs.go +++ b/price_service/internal/service/tariffs.go @@ -15,10 +15,11 @@ type TariffService struct { } type TariffInput struct { - Name string `json:"name"` - ExecPrice float64 `json:"exec_price"` - MemPrice float64 `json:"mem_price"` - CpuPrice float64 `json:"cpu_price"` + Name string `json:"name"` + ExecPrice float64 `json:"exec_price"` + MemPrice float64 `json:"mem_price"` + CpuPrice float64 `json:"cpu_price"` + ColdStartPricePerSecond float64 `json:"cold_start_price_per_second"` } func NewTariffService(tariffRepo repo.Tariff) *TariffService { @@ -31,10 +32,11 @@ func NewTariffService(tariffRepo repo.Tariff) *TariffService { func (s *TariffService) Create(ctx context.Context, body *TariffInput) (*entity.Tariff, error) { tariff, err := s.tariffRepo.Create(ctx, &entity.Tariff{ - Name: body.Name, - ExecPrice: body.ExecPrice, - MemPrice: body.MemPrice, - CpuPrice: body.CpuPrice, + Name: body.Name, + ExecPrice: body.ExecPrice, + MemPrice: body.MemPrice, + CpuPrice: body.CpuPrice, + ColdStartPricePerSecond: body.ColdStartPricePerSecond, }) if err != nil { slog.Error("failed to create tariff", err.Error()) @@ -63,10 +65,11 @@ func (s *TariffService) GetAll(ctx context.Context, filters *entity.TariffFilter func (s *TariffService) UpdateByID(ctx context.Context, id int, updates *TariffInput) (*entity.Tariff, error) { updatedTariff, err := s.tariffRepo.UpdateByID(ctx, id, &entity.Tariff{ - Name: updates.Name, - ExecPrice: updates.ExecPrice, - MemPrice: updates.MemPrice, - CpuPrice: updates.CpuPrice, + Name: updates.Name, + ExecPrice: updates.ExecPrice, + MemPrice: updates.MemPrice, + CpuPrice: updates.CpuPrice, + ColdStartPricePerSecond: updates.ColdStartPricePerSecond, }) if err != nil { if errors.Is(err, repoerrors.ErrNotFound) { diff --git a/price_service/migrations/00001_create_table_tariffs.sql b/price_service/migrations/00001_create_table_tariffs.sql index c27bab0..4f571de 100644 --- a/price_service/migrations/00001_create_table_tariffs.sql +++ b/price_service/migrations/00001_create_table_tariffs.sql @@ -6,6 +6,7 @@ CREATE TABLE tariffs ( exec_price numeric(10, 2), mem_price numeric(10, 2), cpu_price numeric(10, 2), + cold_start_price_per_second numeric(10, 2) NOT NULL DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); @@ -21,7 +22,7 @@ $$ language 'plpgsql'; CREATE TRIGGER update_tariffs_updated_at BEFORE UPDATE ON tariffs FOR EACH ROW EXECUTE FUNCTION update_updated_at_column(); -INSERT INTO tariffs (name, exec_price, mem_price, cpu_price) VALUES ('basic', 0, 0, 0) +INSERT INTO tariffs (name, exec_price, mem_price, cpu_price, cold_start_price_per_second) VALUES ('basic', 0, 0, 0, 0) -- +goose StatementEnd -- +goose Down diff --git a/setup.sh b/setup.sh index d104060..39e715d 100644 --- a/setup.sh +++ b/setup.sh @@ -30,6 +30,6 @@ GOOSE_DBSTRING="tcp://user:1234@localhost:9000/metrics" ./bin/goose clickhouse u # Step 5: Run application services echo "Step 5: Starting application services..." -docker compose up -d meter price_service notifier invoicer control_plane +docker compose up --build -d meter price_service notifier invoicer control_plane echo "Setup completed successfully."