Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions control_plane/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,19 @@ func run() {
os.Exit(1)
}

actionsTopic := os.Getenv("FUNCTION_ACTIONS_TOPIC")
if actionsTopic == "" {
actionsTopic = "function_actions"
metricsTopic := os.Getenv("FUNCTION_METRICS_TOPIC")
if metricsTopic == "" {
metricsTopic = "function_metrics"
}

brokers := strings.Split(addresses, ",")

actionsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: actionsTopic, Addrs: brokers})
metricsProducer := kafka.NewProducer(kafka.ProducerConfig{Topic: metricsTopic, Addrs: brokers})

mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })

api := httpapi.New(cfg, actionsProducer, restCfg)
api := httpapi.New(cfg, metricsProducer, restCfg)
api.Register(mux)

slog.Info("control plane listening", slog.String("addr", cfg.HTTP.Addr))
Expand Down
46 changes: 17 additions & 29 deletions control_plane/internal/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@ import (
"github.com/segmentio/kafka-go"
"github.com/usamaroman/faas_demo/control_plane/internal/config"
"github.com/usamaroman/faas_demo/pkg/knative"
"github.com/usamaroman/faas_demo/pkg/types"
"k8s.io/client-go/rest"
)

type API struct {
cfg config.Config
producer *kafka.Writer // optional
restCfg *rest.Config
cfg config.Config
metricsProducer *kafka.Writer // optional
restCfg *rest.Config
}

func New(cfg config.Config, producer *kafka.Writer, restCfg *rest.Config) *API {
return &API{cfg: cfg, producer: producer, restCfg: restCfg}
return &API{cfg: cfg, metricsProducer: producer, restCfg: restCfg}
}

func (a *API) Register(mux *http.ServeMux) {
Expand Down Expand Up @@ -118,13 +119,19 @@ func (a *API) handleRun(w http.ResponseWriter, r *http.Request) {
return
}

if a.producer != nil && a.cfg.Kafka.Topic != "" {
evt := map[string]any{
"tenant": req.Email,
"pod": name,
"ts": time.Now().UTC().Format(time.RFC3339Nano),
if a.metricsProducer != nil {
now := time.Now().UTC().Unix()
evt := types.Metric{
Pod: name,
CPUPercent: 0,
MemMB: 0,
Timestamp: now,
Tenant: req.Email,
Type: "start",
StartTime: now,
EndTime: 0,
}
_ = publishJSON(a.producer, evt)
_ = publishJSON(a.metricsProducer, evt)
}

writeJSON(w, http.StatusOK, RunResponse{
Expand Down Expand Up @@ -157,22 +164,3 @@ func publishJSON(w *kafka.Writer, payload any) error {
msg := kafka.Message{Value: b}
return w.WriteMessages(context.Background(), msg)
}

// toStringMap converts map[string]any to map[string]string using fmt.Sprint for values.
func toStringMap(in map[string]any) map[string]string {
if len(in) == 0 {
return nil
}
out := make(map[string]string, len(in))
for k, v := range in {
switch t := v.(type) {
case nil:
out[k] = ""
case string:
out[k] = t
default:
out[k] = fmt.Sprint(t)
}
}
return out
}
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ services:
- ./config:/app/config
environment:
KAFKA_ADDRS: kafka:29092
KAFKA_TOPIC: function_actions
FUNCTION_METRICS_TOPIC: function_metrics
ports:
- "8080:8080"

Expand Down
127 changes: 89 additions & 38 deletions invoicer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,32 +37,36 @@ type BillingData struct {
}

type Tariff struct {
ID int `json:"ID"`
Name string `json:"Name"`
ExecPrice float64 `json:"ExecPrice"`
MemPrice float64 `json:"MemPrice"`
CpuPrice float64 `json:"CpuPrice"`
ID int `json:"ID"`
Name string `json:"Name"`
ExecPrice float64 `json:"ExecPrice"`
MemPrice float64 `json:"MemPrice"`
CpuPrice float64 `json:"CpuPrice"`
ColdStartPricePerSecond float64 `json:"ColdStartPricePerSecond"`
}

type BillingResponse struct {
TenantID string `json:"tenant_id"`
PodName string `json:"pod_name"`
DurationSec int64 `json:"duration_sec"`
MemoryMB float64 `json:"memory_mb"`
ExecCost float64 `json:"exec_cost"`
MemoryCost float64 `json:"memory_cost"`
TotalCost float64 `json:"total_cost"`
TariffName string `json:"tariff_name"`
CalculatedAt time.Time `json:"calculated_at"`
TenantID string `json:"tenant_id"`
PodName string `json:"pod_name"`
DurationSec int64 `json:"duration_sec"`
MemoryMB float64 `json:"memory_mb"`
ExecCost float64 `json:"exec_cost"`
MemoryCost float64 `json:"memory_cost"`
ColdStartCost float64 `json:"cold_start_cost"`
TotalCost float64 `json:"total_cost"`
TariffName string `json:"tariff_name"`
CalculatedAt time.Time `json:"calculated_at"`
}

type NotificationMessage struct {
TenantID string `json:"tenant_id"`
Email string `json:"email"`
MemoryMB float64 `json:"memory_mb"`
TotalCost float64 `json:"total_cost"`
PodName string `json:"pod_name"`
Timestamp int64 `json:"timestamp"`
TenantID string `json:"tenant_id"`
Email string `json:"email"`
MemoryMB float64 `json:"memory_mb"`
TotalCost float64 `json:"total_cost"`
PodName string `json:"pod_name"`
Timestamp int64 `json:"timestamp"`
ColdStartSeconds int64 `json:"cold_start_seconds"`
ColdStartCost float64 `json:"cold_start_cost"`
}

func main() {
Expand Down Expand Up @@ -198,12 +202,19 @@ func (i *Invoicer) getBilling(c *gin.Context) {
return
}

var totalExecCost, totalMemoryCost float64
var totalExecCost, totalMemoryCost, totalColdStartCost float64
var totalDuration int64
var totalMemoryMB float64

resp := make([]BillingResponse, 0, len(billingData))

coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(tenantID)
if err != nil {
slog.Error("failed to compute cold start seconds", slog.String("error", err.Error()))
c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to compute cold start"})
return
}

for _, data := range billingData {
duration := int64(data.EndTime.Sub(data.StartTime).Seconds())
totalDuration += duration
Expand All @@ -216,17 +227,19 @@ func (i *Invoicer) getBilling(c *gin.Context) {
totalExecCost += execCost
totalMemoryCost += memoryCost

totalCost := totalExecCost + totalMemoryCost
totalColdStartCost = float64(coldStartSeconds) * tariff.ColdStartPricePerSecond
totalCost := totalExecCost + totalMemoryCost + totalColdStartCost
resp = append(resp, BillingResponse{
TenantID: tenantID,
PodName: billingData[0].PodName,
DurationSec: totalDuration,
MemoryMB: totalMemoryMB,
ExecCost: totalExecCost,
MemoryCost: totalMemoryCost,
TotalCost: totalCost,
TariffName: tariff.Name,
CalculatedAt: time.Now(),
TenantID: tenantID,
PodName: billingData[0].PodName,
DurationSec: totalDuration,
MemoryMB: totalMemoryMB,
ExecCost: totalExecCost,
MemoryCost: totalMemoryCost,
ColdStartCost: totalColdStartCost,
TotalCost: totalCost,
TariffName: tariff.Name,
CalculatedAt: time.Now(),
})
}

Expand Down Expand Up @@ -264,6 +277,35 @@ func (i *Invoicer) getBillingDataFromClickHouse(tenantID string) ([]BillingData,
return result, nil
}

// getColdStartSecondsFromClickHouse calculates the sum of cold start seconds for a tenant.
func (i *Invoicer) getColdStartSecondsFromClickHouse(tenantID string) (int64, error) {
// For each start event, find the first metric (non-start) timestamp >= start_time per pod
query := `
WITH starts AS (
SELECT tenant, pod, start_time
FROM function_metrics_local
WHERE tenant = ? AND type = 'start' AND start_time IS NOT NULL
), first_metrics AS (
SELECT tenant, pod, min(timestamp) AS first_metric_ts
FROM function_metrics_local
WHERE (type IS NULL OR type != 'start')
GROUP BY tenant, pod
)
SELECT coalesce(
sum(greatest(0, dateDiff('second', s.start_time, fm.first_metric_ts))), 0
) AS total_cold_start_seconds
FROM starts s
LEFT JOIN first_metrics fm USING (tenant, pod)
`

row := i.clickhouseClient.QueryRow(context.Background(), query, tenantID)
var total int64
if err := row.Scan(&total); err != nil {
return 0, err
}
return total, nil
}

func (i *Invoicer) getTariffFromPriceService(tariffID int) (*Tariff, error) {
url := fmt.Sprintf("%s/v1/tariff/%d", i.priceServiceURL, tariffID)

Expand Down Expand Up @@ -310,25 +352,34 @@ func (i *Invoicer) sendStopNotification(action types.Action) {

// Calculate costs
var totalMemoryMB, totalCost float64
// compute cold start seconds for this tenant (could refine to this pod if needed)
coldStartSeconds, err := i.getColdStartSecondsFromClickHouse(action.Tenant)
if err != nil {
slog.Error("failed to compute cold start seconds for notification", slog.String("error", err.Error()))
coldStartSeconds = 0
}
for _, data := range billingData {
if data.PodName == action.Pod {
duration := int64(data.EndTime.Sub(data.StartTime).Seconds())
execCost := float64(duration) * tariff.ExecPrice
memoryCost := data.TotalMemoryConsumedMB * tariff.MemPrice
totalMemoryMB = data.TotalMemoryConsumedMB
totalCost = execCost + memoryCost
coldStartCost := float64(coldStartSeconds) * tariff.ColdStartPricePerSecond
totalCost = execCost + memoryCost + coldStartCost
break
}
}

// Create notification message
notification := NotificationMessage{
TenantID: action.Tenant,
Email: action.Tenant,
MemoryMB: totalMemoryMB,
TotalCost: totalCost,
PodName: action.Pod,
Timestamp: time.Now().Unix(),
TenantID: action.Tenant,
Email: action.Tenant,
MemoryMB: totalMemoryMB,
TotalCost: totalCost,
PodName: action.Pod,
Timestamp: time.Now().Unix(),
ColdStartSeconds: coldStartSeconds,
ColdStartCost: float64(coldStartSeconds) * tariff.ColdStartPricePerSecond,
}

// Send notification to Kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_kafka (
cpu_percent Float32,
mem_mb Float32,
timestamp Int64,
tenant String
tenant String,
type String,
start_time Int64,
end_time Int64
) ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka:29092',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ CREATE TABLE IF NOT EXISTS metrics.function_metrics_local (
cpu_percent Float32,
mem_mb Float32,
timestamp DateTime,
tenant String
tenant String,
type String,
start_time DateTime,
end_time DateTime
)
ENGINE = MergeTree
PARTITION BY toYYYYMMDD(timestamp)
Expand Down
5 changes: 4 additions & 1 deletion migrations/clickhouse/20251012194539_funcion_metrics_mv.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ SELECT
cpu_percent,
mem_mb,
toDateTime(timestamp) AS timestamp,
tenant
tenant,
type,
if(start_time = 0, NULL, toDateTime(start_time)) AS start_time,
if(end_time = 0, NULL, toDateTime(end_time)) AS end_time
FROM metrics.function_metrics_kafka;
-- +goose StatementEnd

Expand Down
17 changes: 10 additions & 7 deletions notifier/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,14 @@ type Config struct {
}

type NotificationMessage struct {
TenantID string `json:"tenant_id"`
Email string `json:"email"`
MemoryMB float64 `json:"memory_mb"`
TotalCost float64 `json:"total_cost"`
PodName string `json:"pod_name"`
Timestamp int64 `json:"timestamp"`
TenantID string `json:"tenant_id"`
Email string `json:"email"`
MemoryMB float64 `json:"memory_mb"`
TotalCost float64 `json:"total_cost"`
PodName string `json:"pod_name"`
Timestamp int64 `json:"timestamp"`
ColdStartSeconds int64 `json:"cold_start_seconds"`
ColdStartCost float64 `json:"cold_start_cost"`
}

func main() {
Expand Down Expand Up @@ -89,14 +91,15 @@ func main() {
<ul>
<li><strong>Pod Name:</strong> %s</li>
<li><strong>Memory Used:</strong> %.2f MB</li>
<li><strong>Cold Start:</strong> %d seconds ($%.2f)</li>
<li><strong>Total Cost:</strong> $%.2f</li>
<li><strong>Timestamp:</strong> %s</li>
</ul>
<p>Please ensure payment is processed for your FaaS usage.</p>
<p>Best regards,<br>FaaS Team</p>
</body>
</html>
`, notification.TenantID, notification.PodName, notification.MemoryMB, notification.TotalCost,
`, notification.TenantID, notification.PodName, notification.MemoryMB, notification.ColdStartSeconds, notification.ColdStartCost, notification.TotalCost,
fmt.Sprintf("%d", notification.Timestamp)))

d := gomail.NewDialer(cfg.SMTP.Host, cfg.SMTP.Port, cfg.SMTP.Username, cfg.SMTP.Password)
Expand Down
4 changes: 4 additions & 0 deletions pkg/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,7 @@ func (c *Client) Close() error {
func (c *Client) Query(ctx context.Context, query string, args ...interface{}) (driver.Rows, error) {
return c.conn.Query(ctx, query, args...)
}

func (c *Client) QueryRow(ctx context.Context, query string, args ...interface{}) driver.Row {
return c.conn.QueryRow(ctx, query, args...)
}
3 changes: 3 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ type Metric struct {
MemMB float64 `json:"mem_mb"`
Timestamp int64 `json:"timestamp"`
Tenant string `json:"tenant"`
Type string `json:"type,omitempty"`
StartTime int64 `json:"start_time,omitempty"`
EndTime int64 `json:"end_time,omitempty"`
}

type Action struct {
Expand Down
Loading