Skip to content

Conversation

@aim2120
Copy link
Collaborator

@aim2120 aim2120 commented Nov 24, 2025

Bug

Fixes a bug in SingleValueSubject that could cause hangs due to lack of cooperative cancellation.

Most notably, it would prevent timeout from throwing an error if the subject never finished.

Before

// this would hang if subject never finishes (does not throw a TimeoutError)
DeferredTask { try await subject.execute() }.timeout(.seconds(1)).execute()

After

// this throws a TimeoutError as expected
DeferredTask { try await subject.execute() }.timeout(.seconds(1)).execute()

I've verified that without the fix (the cooperative cancellation cancel() implementation on SingleValueSubject) both new tests will hang.

Caveats

With the diff at time of writing, this change is "opt in", due to the fact that a cancelled SingleValueSubject will now throw if send() is called and the subject has been cancelled. This is breaking behavioral change that may cause unexpected failures in consumer code (in fact, some of Afluent's own tests would break with this change).

Another option to consider would be ignoring the already-finished continuation when send() is called if the SingleValueSubject was finished due to cancellation. This may be a more desirable behavior, since it would enable cooperative cancellation to be rolled out without breaking current consumer expectations.

Please let me know what you think and which path forward seems preferable!

Summary by CodeRabbit

  • New Features

    • Added public cancellation capability for single-value operations so they can be cancelled cleanly.
  • Documentation

    • Added a warning about requiring cooperative cancellation on inputs to avoid hangs.
  • Bug Fixes

    • Suppress spurious completion errors when an operation is cancelled, improving cancellation behavior.
  • Tests

    • Added tests for cancellation behavior and timeout error message assertions.

✏️ Tip: You can customize this high-level summary in your review settings.

@trunk-io
Copy link

trunk-io bot commented Nov 24, 2025

😎 Merged successfully - details.

@coderabbitai
Copy link

coderabbitai bot commented Nov 24, 2025

📝 Walkthrough

Walkthrough

Adds a public cancel() method to SingleValueSubject that finishes its continuation with a CancellationError and suppresses SubjectError.alreadyCompleted when the subject is cancelled. Updates Race doc comment with a cancellation warning. Adds tests covering subject cancellation and timeout behavior.

Changes

Cohort / File(s) Summary
Documentation
Sources/Afluent/Race.swift
Added a Warning section to the Race doc comment advising that input tasks must support cooperative cancellation to avoid potential hangs (includes reference link).
SingleValueSubject implementation
Sources/Afluent/Workers/SingleValueSubject.swift
Added public func cancel() that finishes the internal continuation by throwing CancellationError. Adjusted send(_:), send() (Void), and send(error:) to suppress SubjectError.alreadyCompleted when state.isCancelled == true.
Tests
Tests/AfluentTests/WorkerTests/SingleValueSubjectTests.swift, Tests/AfluentTests/WorkerTests/TimeoutTests.swift
Added singleValueSubjectCancellation test. Updated taskTimesOutIfItTakesTooLong assertion and added taskTimesOutIfItTakesTooLong_withSingleValueSubject mirroring timeout checks and error-description assertions.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Caller as Task/Caller
    participant Subject as SingleValueSubject
    participant Cont as Continuation

    Caller->>Subject: cancel()
    alt subject not completed
        Subject->>Cont: finish(throwing: CancellationError)
        Cont-->>Caller: awaiting call resumes with CancellationError
    else subject already completed
        Subject-->>Caller: no-op / alreadyCompleted suppressed
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Areas to review closely:

  • SingleValueSubject.cancel() implementation and correct finishing of the continuation.
  • Suppression logic for SubjectError.alreadyCompleted in send(_:), send() and send(error:) when state.isCancelled.
  • New/updated tests (SingleValueSubjectTests.swift, TimeoutTests.swift) for timing/race conditions.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately captures the main fix: implementing cooperative cancellation in SingleValueSubject to prevent hangs when used with Timeout operations.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch aim2120/single-value-subject-timeout-hang

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@aim2120 aim2120 force-pushed the aim2120/single-value-subject-timeout-hang branch from 1731f46 to 57af766 Compare November 24, 2025 01:37
@aim2120 aim2120 marked this pull request as ready for review November 24, 2025 01:46
@Tyler-Keith-Thompson
Copy link
Owner

Another option to consider would be ignoring the already-finished continuation when send() is called if the SingleValueSubject was finished due to cancellation.

Let's do that, that sounds like it would be reasonably expected.

@aim2120
Copy link
Collaborator Author

aim2120 commented Nov 24, 2025

Updated to not throw when send() called after cancellation

Copy link
Owner

@Tyler-Keith-Thompson Tyler-Keith-Thompson left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this and fixing it! Looks good.

@trunk-io trunk-io bot merged commit f90d0fc into main Nov 24, 2025
4 of 5 checks passed
@trunk-io trunk-io bot deleted the aim2120/single-value-subject-timeout-hang branch November 24, 2025 02:03
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d5e3ae4 and 81d88bc.

📒 Files selected for processing (1)
  • Tests/AfluentTests/WorkerTests/SingleValueSubjectTests.swift (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
Tests/AfluentTests/WorkerTests/SingleValueSubjectTests.swift (1)
Sources/Afluent/Workers/SingleValueSubject.swift (4)
  • cancel (93-97)
  • send (108-121)
  • send (128-141)
  • send (149-162)
🔇 Additional comments (1)
Tests/AfluentTests/WorkerTests/SingleValueSubjectTests.swift (1)

209-228: Test structure looks good and correctly verifies cooperative cancellation.

The test properly verifies the two key behaviors:

  1. Cancellation causes CancellationError to be thrown
  2. Calling send() after cancellation doesn't throw

The use of withMainSerialExecutor and Task.megaYield() appropriately addresses potential race conditions. The comment on line 225 clearly documents the expected behavior.

Minor note: The _ = pattern on line 214 appears to be used to discard the #expect result. While this works, you may want to verify if this is the idiomatic pattern for the Testing framework or if the result can simply be ignored without explicit assignment.

Comment on lines +209 to +228
@Test func singleValueSubjectCancellation() async throws {
try await withMainSerialExecutor {
let subject = SingleValueSubject<Void>()

let task = Task {
_ = await #expect(throws: CancellationError.self) {
try await subject.execute()
}
}

// make sure we don't cancel before the subject operation has begun
await Task.megaYield()
task.cancel()

await task.value

// should not throw, even though the subject was cancelled
try subject.send()
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Consider adding test coverage for non-void send variants after cancellation.

The test correctly verifies that send() doesn't throw after cancellation for a SingleValueSubject<Void>. However, based on the relevant code snippets, the same behavior should apply to send(_:) and send(error:). Consider adding similar test cases to ensure comprehensive coverage of the cooperative cancellation behavior.

Example scenarios to test:

  • Cancelling a SingleValueSubject<Int>, then calling send(42) should not throw
  • Cancelling a subject, then calling send(error: someError) should not throw

Suggested test additions:

@Test func singleValueSubjectCancellation_withValue() async throws {
    try await withMainSerialExecutor {
        let subject = SingleValueSubject<Int>()
        
        let task = Task {
            _ = await #expect(throws: CancellationError.self) {
                try await subject.execute()
            }
        }
        
        await Task.megaYield()
        task.cancel()
        await task.value
        
        // should not throw, even though the subject was cancelled
        try subject.send(42)
    }
}

@Test func singleValueSubjectCancellation_withError() async throws {
    enum TestError: Error { case test }
    try await withMainSerialExecutor {
        let subject = SingleValueSubject<Int>()
        
        let task = Task {
            _ = await #expect(throws: CancellationError.self) {
                try await subject.execute()
            }
        }
        
        await Task.megaYield()
        task.cancel()
        await task.value
        
        // should not throw, even though the subject was cancelled
        try subject.send(error: TestError.test)
    }
}
🤖 Prompt for AI Agents
In Tests/AfluentTests/WorkerTests/SingleValueSubjectTests.swift around lines
209-228, add coverage for non-void send variants after cancellation: create two
new async tests (e.g., singleValueSubjectCancellation_withValue and
singleValueSubjectCancellation_withError) that mirror the existing Void test
pattern (use withMainSerialExecutor, create SingleValueSubject<Int>(), start a
Task that expects CancellationError from subject.execute(), await
Task.megaYield(), cancel the task, await task.value) and then call try
subject.send(42) in the value test and declare a small TestError enum and call
try subject.send(error: TestError.test) in the error test to ensure these send
variants also do not throw after cancellation.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants