-
Notifications
You must be signed in to change notification settings - Fork 0
Open
Description
User Story
As a FlowForge system, I want to receive and process webhooks from GitHub/GitLab/Bitbucket, so that jobs can be automatically triggered by repository events like pushes and pull requests.
Acceptance Criteria
- Implement webhook endpoints for GitHub, GitLab, and Bitbucket
- Verify webhook signatures for security (HMAC validation)
- Parse different event types (push, pull_request, merge_request)
- Extract relevant information for job creation
- Handle webhook delivery failures with proper status codes
- Implement idempotency to prevent duplicate job creation
- Support webhook secret rotation without downtime
- Queue webhook processing for reliability
- Track webhook metrics (received, processed, failed)
- Provide webhook debugging interface
Technical Implementation
Webhook Handler Architecture
// pkg/webhook/handler.go
package webhook
import (
"context"
"crypto/hmac"
"encoding/json"
"net/http"
)
type WebhookHandler struct {
providers map[string]Provider
processor *EventProcessor
verifier *SignatureVerifier
queue MessageQueue
metrics *MetricsCollector
config *WebhookConfig
}
type WebhookConfig struct {
MaxPayloadSize int64
ProcessingTimeout time.Duration
EnableDebugMode bool
QueueEnabled bool
}
type Provider interface {
Name() string
ParseRequest(r *http.Request) (*WebhookEvent, error)
VerifySignature(r *http.Request, secret string) error
ExtractJobInfo(event *WebhookEvent) (*JobRequest, error)
}
type WebhookEvent struct {
ID string
Provider string
EventType string
Repository RepositoryInfo
Sender UserInfo
Payload json.RawMessage
ReceivedAt time.Time
}
func (h *WebhookHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
// Extract provider from path
provider := h.extractProvider(r.URL.Path)
if provider == nil {
http.Error(w, "Unknown provider", http.StatusNotFound)
return
}
// Size limit
r.Body = http.MaxBytesReader(w, r.Body, h.config.MaxPayloadSize)
// Parse event
event, err := provider.ParseRequest(r)
if err != nil {
h.metrics.RecordWebhookError(provider.Name(), "parse_error")
http.Error(w, "Invalid webhook payload", http.StatusBadRequest)
return
}
// Get webhook secret
secret, err := h.getWebhookSecret(ctx, event.Repository.ID)
if err != nil {
http.Error(w, "Configuration error", http.StatusInternalServerError)
return
}
// Verify signature
if err := provider.VerifySignature(r, secret); err != nil {
h.metrics.RecordWebhookError(provider.Name(), "signature_invalid")
http.Error(w, "Invalid signature", http.StatusUnauthorized)
return
}
// Check idempotency
if h.isDuplicate(ctx, event) {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status": "already_processed"}`))
return
}
// Process or queue
if h.config.QueueEnabled {
if err := h.queueEvent(ctx, event); err != nil {
http.Error(w, "Failed to queue event", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
w.Write([]byte(`{"status": "queued"}`))
} else {
if err := h.processEvent(ctx, event); err != nil {
http.Error(w, "Processing failed", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status": "processed"}`))
}
h.metrics.RecordWebhookReceived(provider.Name(), event.EventType)
}GitHub Provider Implementation
// pkg/webhook/github.go
type GitHubProvider struct {
supportedEvents []string
}
func (p *GitHubProvider) ParseRequest(r *http.Request) (*WebhookEvent, error) {
eventType := r.Header.Get("X-GitHub-Event")
deliveryID := r.Header.Get("X-GitHub-Delivery")
if eventType == "" {
return nil, fmt.Errorf("missing X-GitHub-Event header")
}
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, err
}
// Parse common fields
var basePayload struct {
Repository struct {
ID int64 `json:"id"`
FullName string `json:"full_name"`
CloneURL string `json:"clone_url"`
SSHURL string `json:"ssh_url"`
} `json:"repository"`
Sender struct {
Login string `json:"login"`
ID int64 `json:"id"`
} `json:"sender"`
}
if err := json.Unmarshal(body, &basePayload); err != nil {
return nil, err
}
return &WebhookEvent{
ID: deliveryID,
Provider: "github",
EventType: eventType,
Repository: RepositoryInfo{
ID: fmt.Sprintf("github:%d", basePayload.Repository.ID),
FullName: basePayload.Repository.FullName,
CloneURL: basePayload.Repository.CloneURL,
SSHURL: basePayload.Repository.SSHURL,
},
Sender: UserInfo{
Username: basePayload.Sender.Login,
ID: fmt.Sprintf("github:%d", basePayload.Sender.ID),
},
Payload: body,
ReceivedAt: time.Now(),
}, nil
}
func (p *GitHubProvider) VerifySignature(r *http.Request, secret string) error {
signature := r.Header.Get("X-Hub-Signature-256")
if signature == "" {
return fmt.Errorf("missing signature header")
}
body, err := io.ReadAll(r.Body)
if err != nil {
return err
}
// Restore body for further processing
r.Body = io.NopCloser(bytes.NewReader(body))
// Calculate expected signature
mac := hmac.New(sha256.New, []byte(secret))
mac.Write(body)
expectedSig := "sha256=" + hex.EncodeToString(mac.Sum(nil))
if !hmac.Equal([]byte(signature), []byte(expectedSig)) {
return fmt.Errorf("signature mismatch")
}
return nil
}
func (p *GitHubProvider) ExtractJobInfo(event *WebhookEvent) (*JobRequest, error) {
switch event.EventType {
case "push":
return p.extractPushJobInfo(event)
case "pull_request":
return p.extractPullRequestJobInfo(event)
default:
return nil, fmt.Errorf("unsupported event type: %s", event.EventType)
}
}
func (p *GitHubProvider) extractPushJobInfo(event *WebhookEvent) (*JobRequest, error) {
var payload struct {
Ref string `json:"ref"`
Before string `json:"before"`
After string `json:"after"`
Commits []struct {
Message string `json:"message"`
Author struct {
Name string `json:"name"`
Email string `json:"email"`
} `json:"author"`
} `json:"commits"`
}
if err := json.Unmarshal(event.Payload, &payload); err != nil {
return nil, err
}
// Extract branch name
branch := strings.TrimPrefix(payload.Ref, "refs/heads/")
return &JobRequest{
Repository: event.Repository,
Branch: branch,
TargetBranch: "main", // Or from configuration
Task: "Automated update from push",
Metadata: map[string]string{
"event_type": "push",
"commit_sha": payload.After,
"commit_msg": getFirstCommitMessage(payload.Commits),
"triggered_by": event.Sender.Username,
},
}, nil
}GitLab Provider Implementation
// pkg/webhook/gitlab.go
type GitLabProvider struct {
supportedEvents []string
}
func (p *GitLabProvider) VerifySignature(r *http.Request, secret string) error {
token := r.Header.Get("X-Gitlab-Token")
if token == "" {
return fmt.Errorf("missing X-Gitlab-Token header")
}
if subtle.ConstantTimeCompare([]byte(token), []byte(secret)) != 1 {
return fmt.Errorf("invalid token")
}
return nil
}
func (p *GitLabProvider) ParseRequest(r *http.Request) (*WebhookEvent, error) {
eventType := r.Header.Get("X-Gitlab-Event")
body, err := io.ReadAll(r.Body)
if err != nil {
return nil, err
}
var basePayload struct {
ObjectKind string `json:"object_kind"`
Project struct {
ID int `json:"id"`
PathWithNamespace string `json:"path_with_namespace"`
GitSSHURL string `json:"git_ssh_url"`
GitHTTPURL string `json:"git_http_url"`
} `json:"project"`
User struct {
Username string `json:"username"`
ID int `json:"id"`
} `json:"user"`
}
if err := json.Unmarshal(body, &basePayload); err != nil {
return nil, err
}
return &WebhookEvent{
ID: r.Header.Get("X-Gitlab-Event-UUID"),
Provider: "gitlab",
EventType: basePayload.ObjectKind,
Repository: RepositoryInfo{
ID: fmt.Sprintf("gitlab:%d", basePayload.Project.ID),
FullName: basePayload.Project.PathWithNamespace,
CloneURL: basePayload.Project.GitHTTPURL,
SSHURL: basePayload.Project.GitSSHURL,
},
Sender: UserInfo{
Username: basePayload.User.Username,
ID: fmt.Sprintf("gitlab:%d", basePayload.User.ID),
},
Payload: body,
ReceivedAt: time.Now(),
}, nil
}Event Processing
// pkg/webhook/processor.go
type EventProcessor struct {
jobManager *JobManager
repoService *RepositoryService
metrics *MetricsCollector
}
func (p *EventProcessor) ProcessEvent(ctx context.Context, event *WebhookEvent) error {
provider := p.getProvider(event.Provider)
if provider == nil {
return fmt.Errorf("unknown provider: %s", event.Provider)
}
// Extract job information
jobReq, err := provider.ExtractJobInfo(event)
if err != nil {
return fmt.Errorf("failed to extract job info: %w", err)
}
// Verify repository exists and is active
repo, err := p.repoService.GetByExternalID(ctx, event.Repository.ID)
if err != nil {
return fmt.Errorf("repository not found: %w", err)
}
if !repo.IsActive {
return fmt.Errorf("repository is not active")
}
// Create job
job := &Job{
RepositoryID: repo.ID,
UserID: repo.UserID,
Branch: jobReq.Branch,
TargetBranch: jobReq.TargetBranch,
Task: jobReq.Task,
Priority: p.calculatePriority(event),
Metadata: jobReq.Metadata,
WebhookEvent: event,
}
createdJob, err := p.jobManager.CreateJob(ctx, job)
if err != nil {
return fmt.Errorf("failed to create job: %w", err)
}
p.metrics.RecordJobCreatedFromWebhook(event.Provider, event.EventType)
return nil
}
func (p *EventProcessor) calculatePriority(event *WebhookEvent) int {
// Higher priority for certain events
switch event.EventType {
case "pull_request":
return 8
case "merge_request":
return 8
case "push":
if strings.Contains(event.Payload, "main") {
return 7
}
return 5
default:
return 5
}
}Idempotency Handler
// pkg/webhook/idempotency.go
type IdempotencyHandler struct {
cache Cache
ttl time.Duration
}
func (h *IdempotencyHandler) CheckAndMark(ctx context.Context, event *WebhookEvent) (bool, error) {
key := h.generateKey(event)
// Try to set with NX (only if not exists)
ok, err := h.cache.SetNX(ctx, key, "1", h.ttl)
if err != nil {
return false, err
}
// If ok is false, key already existed (duplicate)
return !ok, nil
}
func (h *IdempotencyHandler) generateKey(event *WebhookEvent) string {
// Use provider, event ID, and event type for uniqueness
return fmt.Sprintf("webhook:%s:%s:%s", event.Provider, event.ID, event.EventType)
}Webhook Secret Management
// pkg/webhook/secrets.go
type SecretManager struct {
store SecretStore
cache Cache
rotator *SecretRotator
}
type WebhookSecret struct {
RepositoryID string
Provider string
Secret string
Version int
CreatedAt time.Time
ExpiresAt *time.Time
}
func (m *SecretManager) GetSecret(ctx context.Context, repoID, provider string) (string, error) {
// Check cache first
cacheKey := fmt.Sprintf("webhook_secret:%s:%s", repoID, provider)
if secret, err := m.cache.Get(ctx, cacheKey); err == nil {
return secret, nil
}
// Get from store
secret, err := m.store.GetWebhookSecret(ctx, repoID, provider)
if err != nil {
return "", err
}
// Cache for 5 minutes
m.cache.Set(ctx, cacheKey, secret.Secret, 5*time.Minute)
return secret.Secret, nil
}
func (m *SecretManager) RotateSecret(ctx context.Context, repoID, provider string) (*WebhookSecret, error) {
// Generate new secret
newSecret := generateSecureToken(32)
// Store with new version
secret := &WebhookSecret{
RepositoryID: repoID,
Provider: provider,
Secret: newSecret,
Version: m.getNextVersion(ctx, repoID, provider),
CreatedAt: time.Now(),
}
if err := m.store.SaveWebhookSecret(ctx, secret); err != nil {
return nil, err
}
// Invalidate cache
cacheKey := fmt.Sprintf("webhook_secret:%s:%s", repoID, provider)
m.cache.Delete(ctx, cacheKey)
return secret, nil
}Webhook Debugging
// pkg/webhook/debug.go
type WebhookDebugger struct {
storage DebugStorage
config *DebugConfig
}
type WebhookDebugEntry struct {
ID string
Provider string
EventType string
Headers map[string]string
Body []byte
StatusCode int
Error *string
ProcessedAt time.Time
}
func (d *WebhookDebugger) RecordWebhook(r *http.Request, response *WebhookResponse, err error) {
if !d.config.Enabled {
return
}
entry := &WebhookDebugEntry{
ID: generateID(),
Provider: extractProvider(r.URL.Path),
EventType: r.Header.Get(d.getEventHeader(provider)),
Headers: d.sanitizeHeaders(r.Header),
Body: d.readBody(r),
StatusCode: response.StatusCode,
ProcessedAt: time.Now(),
}
if err != nil {
errStr := err.Error()
entry.Error = &errStr
}
// Store asynchronously
go d.storage.Store(entry)
}Architecture References
Webhook Integration Points
Reference: /docs/02-system-components.md:159-162
└── /webhooks
└── POST /github # GitHub webhook
Webhook Handler Implementation
Reference: /docs/02-system-components.md:1041-1060
@router.post("/webhooks/github")
async def github_webhook(
request: Request,
signature: str = Header(None, alias="X-Hub-Signature-256")
):
# Verify webhook signature
if not verify_github_signature(request.body, signature):
raise HTTPException(401, "Invalid signature")
event = request.headers.get("X-GitHub-Event")
payload = await request.json()
# Handle different event types
if event == "push":
await handle_push_event(payload)
elif event == "pull_request":
await handle_pr_event(payload)Event-Driven Architecture
Reference: /docs/03-data-flow.md:227-228
System events include webhook-related events:
enum EventType {
RATE_LIMIT_EXCEEDED = 'rate_limit.exceeded'
}Dependencies
- go-git/v5: Repository operations
- Redis: Idempotency and caching
- PostgreSQL: Webhook configuration
- RabbitMQ/Kafka: Event queuing
Definition of Done
- Unit tests cover all webhook providers with 90%+ coverage
- Integration tests verify webhook processing end-to-end
- Signature verification works for all providers
- Idempotency prevents duplicate job creation
- Secret rotation works without downtime
- Debugging interface shows recent webhooks
- Documentation includes webhook setup guide
Effort Estimate
13 Story Points - Complex with multiple provider implementations
Labels
- backend
- integration
- webhooks
- epic-5