diff --git a/control_plane/cmd/main.go b/control_plane/cmd/main.go index 70af281..bf55ac3 100644 --- a/control_plane/cmd/main.go +++ b/control_plane/cmd/main.go @@ -46,19 +46,19 @@ func run() { os.Exit(1) } - actionsTopic := os.Getenv("FUNCTION_ACTIONS_TOPIC") - if actionsTopic == "" { - actionsTopic = "function_actions" + metricsTopic := os.Getenv("FUNCTION_METRICS_TOPIC") + if metricsTopic == "" { + metricsTopic = "function_metrics" } brokers := strings.Split(addresses, ",") - actionsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: actionsTopic, Addrs: brokers}) + metricsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: metricsTopic, Addrs: brokers}) mux := http.NewServeMux() mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) }) - api := httpapi.New(cfg, actionsProducer, restCfg) + api := httpapi.New(cfg, metricsProducer, restCfg) api.Register(mux) slog.Info("control plane listening", slog.String("addr", cfg.HTTP.Addr)) diff --git a/control_plane/internal/http/api.go b/control_plane/internal/http/api.go index f6ee3d8..e98e757 100644 --- a/control_plane/internal/http/api.go +++ b/control_plane/internal/http/api.go @@ -13,17 +13,18 @@ import ( "github.com/segmentio/kafka-go" "github.com/usamaroman/faas_demo/control_plane/internal/config" "github.com/usamaroman/faas_demo/pkg/knative" + "github.com/usamaroman/faas_demo/pkg/types" "k8s.io/client-go/rest" ) type API struct { - cfg config.Config - producer *kafka.Writer // optional - restCfg *rest.Config + cfg config.Config + metricsProducer *kafka.Writer // optional + restCfg *rest.Config } func New(cfg config.Config, producer *kafka.Writer, restCfg *rest.Config) *API { - return &API{cfg: cfg, producer: producer, restCfg: restCfg} + return &API{cfg: cfg, metricsProducer: producer, restCfg: restCfg} } func (a *API) Register(mux *http.ServeMux) { @@ -118,13 +119,19 @@ func (a *API) handleRun(w http.ResponseWriter, r *http.Request) { return } - if a.producer != nil && a.cfg.Kafka.Topic != "" { - evt := map[string]any{ - "tenant": req.Email, - "pod": name, - "ts": time.Now().UTC().Format(time.RFC3339Nano), + if a.metricsProducer != nil { + now := time.Now().UTC().Unix() + evt := types.Metric{ + Pod: name, + CPUPercent: 0, + MemMB: 0, + Timestamp: now, + Tenant: req.Email, + Type: "start", + StartTime: now, + EndTime: 0, } - _ = publishJSON(a.producer, evt) + _ = publishJSON(a.metricsProducer, evt) } writeJSON(w, http.StatusOK, RunResponse{ @@ -157,22 +164,3 @@ func publishJSON(w *kafka.Writer, payload any) error { msg := kafka.Message{Value: b} return w.WriteMessages(context.Background(), msg) } - -// toStringMap converts map[string]any to map[string]string using fmt.Sprint for values. -func toStringMap(in map[string]any) map[string]string { - if len(in) == 0 { - return nil - } - out := make(map[string]string, len(in)) - for k, v := range in { - switch t := v.(type) { - case nil: - out[k] = "" - case string: - out[k] = t - default: - out[k] = fmt.Sprint(t) - } - } - return out -} diff --git a/docker-compose.yaml b/docker-compose.yaml index 4b4da10..344925f 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -151,7 +151,7 @@ services: - ./config:/app/config environment: KAFKA_ADDRS: kafka:29092 - KAFKA_TOPIC: function_actions + FUNCTION_METRICS_TOPIC: function_metrics ports: - "8080:8080" diff --git a/invoicer/cmd/main.go b/invoicer/cmd/main.go index dc79ea5..578ef03 100644 --- a/invoicer/cmd/main.go +++ b/invoicer/cmd/main.go @@ -37,32 +37,36 @@ type BillingData struct { } type Tariff struct { - ID int `json:"ID"` - Name string `json:"Name"` - ExecPrice float64 `json:"ExecPrice"` - MemPrice float64 `json:"MemPrice"` - CpuPrice float64 `json:"CpuPrice"` + ID int `json:"ID"` + Name string `json:"Name"` + ExecPrice float64 `json:"ExecPrice"` + MemPrice float64 `json:"MemPrice"` + CpuPrice float64 `json:"CpuPrice"` + ColdStartPricePerSecond float64 `json:"ColdStartPricePerSecond"` } type BillingResponse struct { - TenantID string `json:"tenant_id"` - PodName string `json:"pod_name"` - DurationSec int64 `json:"duration_sec"` - MemoryMB float64 `json:"memory_mb"` - ExecCost float64 `json:"exec_cost"` - MemoryCost float64 `json:"memory_cost"` - TotalCost float64 `json:"total_cost"` - TariffName string `json:"tariff_name"` - CalculatedAt time.Time `json:"calculated_at"` + TenantID string `json:"tenant_id"` + PodName string `json:"pod_name"` + DurationSec int64 `json:"duration_sec"` + MemoryMB float64 `json:"memory_mb"` + ExecCost float64 `json:"exec_cost"` + MemoryCost float64 `json:"memory_cost"` + ColdStartCost float64 `json:"cold_start_cost"` + TotalCost float64 `json:"total_cost"` + TariffName string `json:"tariff_name"` + CalculatedAt time.Time `json:"calculated_at"` } type NotificationMessage struct { - TenantID string `json:"tenant_id"` - Email string `json:"email"` - MemoryMB float64 `json:"memory_mb"` - TotalCost float64 `json:"total_cost"` - PodName string `json:"pod_name"` - Timestamp int64 `json:"timestamp"` + TenantID string `json:"tenant_id"` + Email string `json:"email"` + MemoryMB float64 `json:"memory_mb"` + TotalCost float64 `json:"total_cost"` + PodName string `json:"pod_name"` + Timestamp int64 `json:"timestamp"` + ColdStartSeconds int64 `json:"cold_start_seconds"` + ColdStartCost float64 `json:"cold_start_cost"` } func main() { @@ -198,12 +202,19 @@ func (i *Invoicer) getBilling(c *gin.Context) { return } - var totalExecCost, totalMemoryCost float64 + var totalExecCost, totalMemoryCost, totalColdStartCost float64 var totalDuration int64 var totalMemoryMB float64 resp := make([]BillingResponse, 0, len(billingData)) + coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(tenantID) + if err != nil { + slog.Error("failed to compute cold start seconds", slog.String("error", err.Error())) + c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to compute cold start"}) + return + } + for _, data := range billingData { duration := int64(data.EndTime.Sub(data.StartTime).Seconds()) totalDuration += duration @@ -216,17 +227,19 @@ func (i *Invoicer) getBilling(c *gin.Context) { totalExecCost += execCost totalMemoryCost += memoryCost - totalCost := totalExecCost + totalMemoryCost + totalColdStartCost = float64(coldStartSeconds) * tariff.ColdStartPricePerSecond + totalCost := totalExecCost + totalMemoryCost + totalColdStartCost resp = append(resp, BillingResponse{ - TenantID: tenantID, - PodName: billingData[0].PodName, - DurationSec: totalDuration, - MemoryMB: totalMemoryMB, - ExecCost: totalExecCost, - MemoryCost: totalMemoryCost, - TotalCost: totalCost, - TariffName: tariff.Name, - CalculatedAt: time.Now(), + TenantID: tenantID, + PodName: billingData[0].PodName, + DurationSec: totalDuration, + MemoryMB: totalMemoryMB, + ExecCost: totalExecCost, + MemoryCost: totalMemoryCost, + ColdStartCost: totalColdStartCost, + TotalCost: totalCost, + TariffName: tariff.Name, + CalculatedAt: time.Now(), }) } @@ -264,6 +277,35 @@ func (i *Invoicer) getBillingDataFromClickHouse(tenantID string) ([]BillingData, return result, nil } +// getColdStartSecondsFromClickHouse calculates the sum of cold start seconds for a tenant. +func (i *Invoicer) getColdStartSecondsFromClickHouse(tenantID string) (int64, error) { + // For each start event, find the first metric (non-start) timestamp >= start_time per pod + query := ` + WITH starts AS ( + SELECT tenant, pod, start_time + FROM function_metrics_local + WHERE tenant = ? AND type = 'start' AND start_time IS NOT NULL + ), first_metrics AS ( + SELECT tenant, pod, min(timestamp) AS first_metric_ts + FROM function_metrics_local + WHERE (type IS NULL OR type != 'start') + GROUP BY tenant, pod + ) + SELECT coalesce( + sum(greatest(0, dateDiff('second', s.start_time, fm.first_metric_ts))), 0 + ) AS total_cold_start_seconds + FROM starts s + LEFT JOIN first_metrics fm USING (tenant, pod) + ` + + row := i.clickhouseClient.QueryRow(context.Background(), query, tenantID) + var total int64 + if err := row.Scan(&total); err != nil { + return 0, err + } + return total, nil +} + func (i *Invoicer) getTariffFromPriceService(tariffID int) (*Tariff, error) { url := fmt.Sprintf("%s/v1/tariff/%d", i.priceServiceURL, tariffID) @@ -310,25 +352,34 @@ func (i *Invoicer) sendStopNotification(action types.Action) { // Calculate costs var totalMemoryMB, totalCost float64 + // compute cold start seconds for this tenant (could refine to this pod if needed) + coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(action.Tenant) + if err != nil { + slog.Error("failed to compute cold start seconds for notification", slog.String("error", err.Error())) + coldStartSeconds = 0 + } for _, data := range billingData { if data.PodName == action.Pod { duration := int64(data.EndTime.Sub(data.StartTime).Seconds()) execCost := float64(duration) * tariff.ExecPrice memoryCost := data.TotalMemoryConsumedMB * tariff.MemPrice totalMemoryMB = data.TotalMemoryConsumedMB - totalCost = execCost + memoryCost + coldStartCost := float64(coldStartSeconds) * tariff.ColdStartPricePerSecond + totalCost = execCost + memoryCost + coldStartCost break } } // Create notification message notification := NotificationMessage{ - TenantID: action.Tenant, - Email: action.Tenant, - MemoryMB: totalMemoryMB, - TotalCost: totalCost, - PodName: action.Pod, - Timestamp: time.Now().Unix(), + TenantID: action.Tenant, + Email: action.Tenant, + MemoryMB: totalMemoryMB, + TotalCost: totalCost, + PodName: action.Pod, + Timestamp: time.Now().Unix(), + ColdStartSeconds: coldStartSeconds, + ColdStartCost: float64(coldStartSeconds) * tariff.ColdStartPricePerSecond, } // Send notification to Kafka diff --git a/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql b/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql index d8d5849..5e49726 100644 --- a/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql +++ b/migrations/clickhouse/20251012191647_funcion_metrics_kafka.sql @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_kafka ( cpu_percent Float32, mem_mb Float32, timestamp Int64, - tenant String + tenant String, + type String, + start_time Int64, + end_time Int64 ) ENGINE = Kafka SETTINGS kafka_broker_list = 'kafka:29092', diff --git a/migrations/clickhouse/20251012194535_funcion_metrics_local.sql b/migrations/clickhouse/20251012194535_funcion_metrics_local.sql index e5bab84..539582c 100644 --- a/migrations/clickhouse/20251012194535_funcion_metrics_local.sql +++ b/migrations/clickhouse/20251012194535_funcion_metrics_local.sql @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_local ( cpu_percent Float32, mem_mb Float32, timestamp DateTime, - tenant String + tenant String, + type String, + start_time DateTime, + end_time DateTime ) ENGINE = MergeTree PARTITION BY toYYYYMMDD(timestamp) diff --git a/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql b/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql index e0a3ce2..95e6ff8 100644 --- a/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql +++ b/migrations/clickhouse/20251012194539_funcion_metrics_mv.sql @@ -8,7 +8,10 @@ SELECT cpu_percent, mem_mb, toDateTime(timestamp) AS timestamp, - tenant + tenant, + type, + if(start_time = 0, NULL, toDateTime(start_time)) AS start_time, + if(end_time = 0, NULL, toDateTime(end_time)) AS end_time FROM metrics.function_metrics_kafka; -- +goose StatementEnd diff --git a/notifier/cmd/main.go b/notifier/cmd/main.go index cf4717a..393fd94 100644 --- a/notifier/cmd/main.go +++ b/notifier/cmd/main.go @@ -29,12 +29,14 @@ type Config struct { } type NotificationMessage struct { - TenantID string `json:"tenant_id"` - Email string `json:"email"` - MemoryMB float64 `json:"memory_mb"` - TotalCost float64 `json:"total_cost"` - PodName string `json:"pod_name"` - Timestamp int64 `json:"timestamp"` + TenantID string `json:"tenant_id"` + Email string `json:"email"` + MemoryMB float64 `json:"memory_mb"` + TotalCost float64 `json:"total_cost"` + PodName string `json:"pod_name"` + Timestamp int64 `json:"timestamp"` + ColdStartSeconds int64 `json:"cold_start_seconds"` + ColdStartCost float64 `json:"cold_start_cost"` } func main() { @@ -89,6 +91,7 @@ func main() {
Best regards,
FaaS Team