From 5f8443b2cc2d8914d48ff409f49051e13231c69d Mon Sep 17 00:00:00 2001 From: IreneXY Date: Wed, 18 Mar 2026 19:45:05 -0700 Subject: [PATCH] Add Ctrl+C to stop generation mid-stream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add StopGenerationRequested event to ChatBloc - Switch from emit.forEach to StreamSubscription for cancellation support - Ctrl+C during generation stops output and keeps partial response - Ctrl+C at prompt still uses double-press exit logic - Print [✋ interrupted] when generation is stopped - Refactor ChatBloc into smaller private methods Co-Authored-By: Claude Code --- chat_adapter/lib/src/bloc/chat_bloc.dart | 96 ++++++++++++++----- .../lib/src/bloc/chat_bloc_event.dart | 7 ++ chat_adapter/test/bloc/chat_bloc_test.dart | 43 +++++++++ skewr_cli/lib/src/cli/chat_repl.dart | 6 ++ 4 files changed, 128 insertions(+), 24 deletions(-) diff --git a/chat_adapter/lib/src/bloc/chat_bloc.dart b/chat_adapter/lib/src/bloc/chat_bloc.dart index 34268ee..9aac54c 100644 --- a/chat_adapter/lib/src/bloc/chat_bloc.dart +++ b/chat_adapter/lib/src/bloc/chat_bloc.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:bloc/bloc.dart'; import 'package:chat_core/chat_core.dart'; @@ -7,9 +9,11 @@ import 'chat_state.dart'; class ChatBloc extends Bloc { ChatBloc(this._chatService) : super(const ChatState.initial()) { on(_onSendMessage); + on(_onStopGeneration); } final ChatService _chatService; + StreamSubscription? _chatSubscription; Future _onSendMessage( SendMessageRequested event, @@ -25,29 +29,73 @@ class ChatBloc extends Bloc { ), ); - await emit.forEach( - _chatService.chat(state.messages), - onData: (ChatEvent chatEvent) { - return switch (chatEvent) { - TextDelta(:final text) => state.copyWith( - currentResponse: state.currentResponse + text, - ), - Done() => state.copyWith( - messages: [ - ...state.messages, - Message.assistant(content: state.currentResponse), - ], - isGenerating: false, - currentResponse: '', - ), - ChatError(:final message) => state.copyWith( - isGenerating: false, - error: message, - ), - // Tool calls not handled yet (Milestone 3) - ToolCallRequest() => state, - }; - }, - ); + final completer = Completer(); + _chatSubscription = _chatService + .chat(state.messages) + .listen( + (chatEvent) => _handleChatEvent(chatEvent, emit), + onDone: () => _completeSubscription(completer), + onError: (Object e) { + emit(state.copyWith(isGenerating: false, error: e.toString())); + _completeSubscription(completer); + }, + ); + await completer.future; + } + + void _handleChatEvent(ChatEvent chatEvent, Emitter emit) { + emit(switch (chatEvent) { + TextDelta(:final text) => state.copyWith( + currentResponse: state.currentResponse + text, + ), + Done() => state.copyWith( + messages: [ + ...state.messages, + Message.assistant(content: state.currentResponse), + ], + isGenerating: false, + currentResponse: '', + ), + ChatError(:final message) => state.copyWith( + isGenerating: false, + error: message, + ), + ToolCallRequest() => state, + }); + } + + void _completeSubscription(Completer completer) { + _chatSubscription = null; + if (!completer.isCompleted) completer.complete(); + } + + void _cancelSubscription() { + _chatSubscription?.cancel(); + _chatSubscription = null; + } + + void _onStopGeneration( + StopGenerationRequested event, + Emitter emit, + ) { + _cancelSubscription(); + if (state.isGenerating) { + emit( + state.copyWith( + messages: [ + ...state.messages, + Message.assistant(content: state.currentResponse), + ], + isGenerating: false, + currentResponse: '', + ), + ); + } + } + + @override + Future close() { + _cancelSubscription(); + return super.close(); } } diff --git a/chat_adapter/lib/src/bloc/chat_bloc_event.dart b/chat_adapter/lib/src/bloc/chat_bloc_event.dart index dacdf45..12d4af7 100644 --- a/chat_adapter/lib/src/bloc/chat_bloc_event.dart +++ b/chat_adapter/lib/src/bloc/chat_bloc_event.dart @@ -12,3 +12,10 @@ class SendMessageRequested extends ChatBlocEvent { @override List get props => [content]; } + +class StopGenerationRequested extends ChatBlocEvent { + const StopGenerationRequested(); + + @override + List get props => []; +} diff --git a/chat_adapter/test/bloc/chat_bloc_test.dart b/chat_adapter/test/bloc/chat_bloc_test.dart index c775e85..10899f0 100644 --- a/chat_adapter/test/bloc/chat_bloc_test.dart +++ b/chat_adapter/test/bloc/chat_bloc_test.dart @@ -1,3 +1,5 @@ +import 'dart:async'; + import 'package:bloc_test/bloc_test.dart'; import 'package:chat_adapter/chat_adapter.dart'; import 'package:chat_core/chat_core.dart'; @@ -79,5 +81,46 @@ void main() { ), ], ); + + blocTest( + 'stops generation and keeps partial response', + build: () { + final controller = StreamController(); + when( + () => mockService.chat(any()), + ).thenAnswer((_) => controller.stream); + // Emit some deltas then don't close (simulates ongoing stream) + Future.microtask(() { + controller.add(const TextDelta('Hello')); + controller.add(const TextDelta(' wor')); + }); + return ChatBloc(mockService); + }, + act: (bloc) async { + bloc.add(const SendMessageRequested('Hi')); + // Wait for deltas to be processed + await Future.delayed(const Duration(milliseconds: 50)); + bloc.add(const StopGenerationRequested()); + }, + expect: () => [ + ChatState(messages: [Message.user('Hi')], isGenerating: true), + ChatState( + messages: [Message.user('Hi')], + isGenerating: true, + currentResponse: 'Hello', + ), + ChatState( + messages: [Message.user('Hi')], + isGenerating: true, + currentResponse: 'Hello wor', + ), + ChatState( + messages: [ + Message.user('Hi'), + Message.assistant(content: 'Hello wor'), + ], + ), + ], + ); }); } diff --git a/skewr_cli/lib/src/cli/chat_repl.dart b/skewr_cli/lib/src/cli/chat_repl.dart index 2ce5bce..334a4f6 100644 --- a/skewr_cli/lib/src/cli/chat_repl.dart +++ b/skewr_cli/lib/src/cli/chat_repl.dart @@ -32,6 +32,12 @@ class ChatRepl { StreamSubscription _listenSigint() { return ProcessSignal.sigint.watch().listen((_) { + if (_bloc.state.isGenerating) { + _bloc.add(const StopGenerationRequested()); + stdout.writeln(); + stdout.writeln('[\u270b interrupted]'); + return; + } if (_pendingExit) { _clearLine(); stdout.writeln('Goodbye!');