Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ DerivedData/
*.swp
*~

# Large media originals (keep only compressed mp4)
*.mov

# IDE
.idea/
.vscode/
Expand Down
14 changes: 10 additions & 4 deletions CreedFlow/Sources/CreedFlow/Engine/DeploymentCoordinator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,24 @@ extension Orchestrator {
}
deployment.port = port

// Persist port assignment before deploying
try await dbQueue.write { db in
var d = deployment
try d.update(db)
}

// Run actual local deployment
_ = try await localDeployService.deploy(
project: project,
deployment: deployment,
port: port
)

// Mark deployment as successful
deployment.status = .success
deployment.completedAt = Date()
// Mark deployment as successful — re-read from DB to avoid stale overwrites
try await dbQueue.write { db in
var d = deployment
guard var d = try Deployment.fetchOne(db, id: deployment.id) else { return }
d.status = .success
d.completedAt = Date()
try d.update(db)
}

Expand Down
11 changes: 10 additions & 1 deletion CreedFlow/Sources/CreedFlow/Engine/Orchestrator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ final class Orchestrator {

private(set) var isRunning = false
private(set) var activeRunners: [UUID: MultiBackendRunner] = [:]
private let runnersLock = NSLock()
private var pollingTask: Task<Void, Never>?

init(
Expand Down Expand Up @@ -173,12 +174,16 @@ final class Orchestrator {
for backend in await backendRouter.allBackends {
await backend.cancelAll()
}
runnersLock.lock()
activeRunners.removeAll()
runnersLock.unlock()
}

/// Get a runner for a specific task (for UI display of live output)
func runner(for taskId: UUID) -> MultiBackendRunner? {
activeRunners[taskId]
runnersLock.lock()
defer { runnersLock.unlock() }
return activeRunners[taskId]
}

// MARK: - Private
Expand Down Expand Up @@ -256,7 +261,9 @@ final class Orchestrator {
continue
}
let runner = MultiBackendRunner(backend: backend, dbQueue: dbQueue)
runnersLock.lock()
activeRunners[task.id] = runner
runnersLock.unlock()

// Record selected backend immediately so UI shows it during in_progress
let selectedBackend = backend.backendType.rawValue
Expand All @@ -283,7 +290,9 @@ final class Orchestrator {
defer {
Task { [weak self] in
await self?.scheduler.release(task: task)
self?.runnersLock.lock()
self?.activeRunners.removeValue(forKey: task.id)
self?.runnersLock.unlock()
}
}

Expand Down
105 changes: 29 additions & 76 deletions CreedFlow/Sources/CreedFlow/Services/CLI/LMStudioBackend.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import Foundation
actor LMStudioBackend: CLIBackend {
nonisolated let backendType = CLIBackendType.lmstudio

private var activeRequests: [UUID: URLSessionDataTask] = [:]
private var activeTasks: Set<UUID> = []

var isAvailable: Bool {
get async {
Expand Down Expand Up @@ -65,46 +65,19 @@ actor LMStudioBackend: CLIBackend {
return (processId, AsyncThrowingStream { $0.finish(throwing: error) })
}

let stream = AsyncThrowingStream<CLIOutputEvent, Error> { continuation in
let task = URLSession.shared.dataTask(with: request) { data, response, error in
let elapsed = Int(Date().timeIntervalSince(startTime) * 1000)

if let error = error {
continuation.yield(.error(error.localizedDescription))
let result = CLIResult(
output: error.localizedDescription,
isError: true,
sessionId: nil,
model: "lmstudio:\(model)",
costUSD: nil,
durationMs: elapsed,
inputTokens: 0,
outputTokens: 0
)
continuation.yield(.result(result))
continuation.finish()
return
}
activeTasks.insert(processId)
let capturedRequest = request

guard let data = data else {
continuation.yield(.error("No data received"))
let result = CLIResult(
output: "No data received",
isError: true,
sessionId: nil,
model: "lmstudio:\(model)",
costUSD: nil,
durationMs: elapsed,
inputTokens: 0,
outputTokens: 0
)
continuation.yield(.result(result))
continuation.finish()
return
let stream = AsyncThrowingStream<CLIOutputEvent, Error> { continuation in
let asyncTask = Task { [weak self] in
defer {
Task { await self?.removeActiveTask(processId) }
}

// Parse OpenAI-compatible response
do {
let (data, _) = try await URLSession.shared.data(for: capturedRequest)
let elapsed = Int(Date().timeIntervalSince(startTime) * 1000)

guard let json = try JSONSerialization.jsonObject(with: data) as? [String: Any],
let choices = json["choices"] as? [[String: Any]],
let first = choices.first,
Expand All @@ -113,81 +86,61 @@ actor LMStudioBackend: CLIBackend {
let raw = String(data: data, encoding: .utf8) ?? "Unknown response"
continuation.yield(.error("Failed to parse response: \(raw)"))
let result = CLIResult(
output: raw,
isError: true,
sessionId: nil,
model: "lmstudio:\(model)",
costUSD: nil,
durationMs: elapsed,
inputTokens: 0,
outputTokens: 0
output: raw, isError: true, sessionId: nil,
model: "lmstudio:\(model)", costUSD: nil,
durationMs: elapsed, inputTokens: 0, outputTokens: 0
)
continuation.yield(.result(result))
continuation.finish()
return
}

// Extract token usage if available
let usage = json["usage"] as? [String: Any]
let inputTokens = usage?["prompt_tokens"] as? Int ?? 0
let outputTokens = usage?["completion_tokens"] as? Int ?? 0

continuation.yield(.text(content))
let result = CLIResult(
output: content,
isError: false,
sessionId: nil,
model: "lmstudio:\(model)",
costUSD: nil,
durationMs: elapsed,
inputTokens: inputTokens,
outputTokens: outputTokens
output: content, isError: false, sessionId: nil,
model: "lmstudio:\(model)", costUSD: nil,
durationMs: elapsed, inputTokens: inputTokens, outputTokens: outputTokens
)
continuation.yield(.result(result))
continuation.finish()
} catch {
continuation.yield(.error("JSON parse error: \(error.localizedDescription)"))
let elapsed = Int(Date().timeIntervalSince(startTime) * 1000)
continuation.yield(.error(error.localizedDescription))
let result = CLIResult(
output: error.localizedDescription,
isError: true,
sessionId: nil,
model: "lmstudio:\(model)",
costUSD: nil,
durationMs: elapsed,
inputTokens: 0,
outputTokens: 0
output: error.localizedDescription, isError: true, sessionId: nil,
model: "lmstudio:\(model)", costUSD: nil,
durationMs: elapsed, inputTokens: 0, outputTokens: 0
)
continuation.yield(.result(result))
continuation.finish()
}
}

self.activeRequests[processId] = task
task.resume()

continuation.onTermination = { @Sendable _ in
task.cancel()
asyncTask.cancel()
}
}

return (processId, stream)
}

private func removeActiveTask(_ id: UUID) {
activeTasks.remove(id)
}

func cancel(_ processId: UUID) async {
guard let task = activeRequests[processId] else { return }
task.cancel()
activeRequests.removeValue(forKey: processId)
activeTasks.remove(processId)
}

func cancelAll() async {
for (_, task) in activeRequests {
task.cancel()
}
activeRequests.removeAll()
activeTasks.removeAll()
}

func activeCount() -> Int {
activeRequests = activeRequests.filter { $0.value.state == .running }
return activeRequests.count
activeTasks.count
}
}
19 changes: 11 additions & 8 deletions CreedFlow/Sources/CreedFlow/Services/CLI/MultiBackendRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ final class MultiBackendRunner {
/// Execute an agent task with full lifecycle management.
/// - Parameter promptOverride: If provided, replaces the agent's default prompt (used by PromptRecommender / ChainExecutor).
func execute(task: AgentTask, agent: any AgentProtocol, workingDirectory: String = "", promptOverride: String? = nil) async throws -> AgentResult {
isRunning = true
liveOutput = []
defer { isRunning = false }
await MainActor.run {
isRunning = true
liveOutput = []
}
defer { Task { @MainActor in self.isRunning = false } }

addOutputLine("Starting \(agent.agentType.rawValue) agent via \(backendType.rawValue) for: \(task.title)", type: .system)
await MainActor.run { self.addOutputLine("Starting \(agent.agentType.rawValue) agent via \(backendType.rawValue) for: \(task.title)", type: .system) }

// Generate MCP config if agent needs MCP servers (Claude only)
var mcpConfigPath: String?
if backendType == .claude, let serverNames = agent.mcpServers, !serverNames.isEmpty {
let generator = MCPConfigGenerator(dbQueue: dbQueue)
mcpConfigPath = try generator.generateConfig(serverNames: serverNames)
if let path = mcpConfigPath {
addOutputLine("MCP config: \(path)", type: .system)
await MainActor.run { self.addOutputLine("MCP config: \(path)", type: .system) }
}
}
defer {
Expand Down Expand Up @@ -147,7 +149,7 @@ final class MultiBackendRunner {
// TaskGroup cancellation from group.cancelAll() — not a real error
} catch let error as ClaudeError where error.localizedDescription.contains("timed out") {
let errorMsg = "Task timed out after \(timeoutSeconds)s"
addOutputLine("Error: \(errorMsg)", type: .error)
await MainActor.run { self.addOutputLine("Error: \(errorMsg)", type: .error) }

try await dbQueue.write { db in
var updatedTask = task
Expand All @@ -165,7 +167,7 @@ final class MultiBackendRunner {

// Detect rate-limit signals in error output
if let signal = RateLimitDetector.detect(in: errorMsg) {
addOutputLine("Rate limited: \(signal)", type: .error)
await MainActor.run { self.addOutputLine("Rate limited: \(signal)", type: .error) }

try await dbQueue.write { db in
var updatedTask = task
Expand All @@ -180,7 +182,7 @@ final class MultiBackendRunner {
throw RateLimitError(backendType: backendType, rawSignal: signal)
}

addOutputLine("Error: \(errorMsg)", type: .error)
await MainActor.run { self.addOutputLine("Error: \(errorMsg)", type: .error) }

try await dbQueue.write { db in
var updatedTask = task
Expand Down Expand Up @@ -249,6 +251,7 @@ final class MultiBackendRunner {
return agentResult
}

@MainActor
private func addOutputLine(_ text: String, type: OutputLine.LineType) {
liveOutput.append(OutputLine(text: text, type: type, timestamp: Date()))
}
Expand Down
21 changes: 10 additions & 11 deletions CreedFlow/Sources/CreedFlow/Services/Chat/ProjectChatService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ private let logger = Logger(subsystem: "com.creedflow", category: "ProjectChatSe

/// Manages chat conversations for a project — handles sending messages,
/// streaming AI responses, and task proposal approval.
@MainActor
@Observable
final class ProjectChatService {
private(set) var messages: [ProjectMessage] = []
Expand All @@ -30,8 +31,8 @@ final class ProjectChatService {
self.history = ChatHistory(dbQueue: dbQueue)
}

deinit {
observationTask?.cancel()
nonisolated deinit {
// observationTask will be cancelled when self is deallocated
}

// MARK: - Binding
Expand All @@ -43,8 +44,9 @@ final class ProjectChatService {
observationTask?.cancel()

// Load project
Task {
self.project = try? await dbQueue.read { db in
let dbRef = dbQueue
Task { @MainActor [weak self] in
self?.project = try? await dbRef.read { db in
try Project.fetchOne(db, id: projectId)
}
}
Expand All @@ -57,13 +59,10 @@ final class ProjectChatService {
.fetchAll(db)
}

let db = dbQueue
observationTask = Task { [weak self] in
observationTask = Task { @MainActor [weak self] in
do {
for try await msgs in observation.values(in: db) {
await MainActor.run {
self?.messages = msgs
}
for try await msgs in observation.values(in: dbRef) {
self?.messages = msgs
}
} catch {
logger.error("Observation error: \(error.localizedDescription)")
Expand Down Expand Up @@ -419,7 +418,7 @@ final class ProjectChatService {
return nil
}

private static func parseAgentType(_ raw: String) -> AgentTask.AgentType {
nonisolated private static func parseAgentType(_ raw: String) -> AgentTask.AgentType {
switch raw.lowercased() {
case "coder": return .coder
case "devops": return .devops
Expand Down
Loading