Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 72 additions & 24 deletions chat_adapter/lib/src/bloc/chat_bloc.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:bloc/bloc.dart';
import 'package:chat_core/chat_core.dart';

Expand All @@ -7,9 +9,11 @@ import 'chat_state.dart';
class ChatBloc extends Bloc<ChatBlocEvent, ChatState> {
ChatBloc(this._chatService) : super(const ChatState.initial()) {
on<SendMessageRequested>(_onSendMessage);
on<StopGenerationRequested>(_onStopGeneration);
}

final ChatService _chatService;
StreamSubscription<ChatEvent>? _chatSubscription;

Future<void> _onSendMessage(
SendMessageRequested event,
Expand All @@ -25,29 +29,73 @@ class ChatBloc extends Bloc<ChatBlocEvent, ChatState> {
),
);

await emit.forEach(
_chatService.chat(state.messages),
onData: (ChatEvent chatEvent) {
return switch (chatEvent) {
TextDelta(:final text) => state.copyWith(
currentResponse: state.currentResponse + text,
),
Done() => state.copyWith(
messages: [
...state.messages,
Message.assistant(content: state.currentResponse),
],
isGenerating: false,
currentResponse: '',
),
ChatError(:final message) => state.copyWith(
isGenerating: false,
error: message,
),
// Tool calls not handled yet (Milestone 3)
ToolCallRequest() => state,
};
},
);
final completer = Completer<void>();
_chatSubscription = _chatService
.chat(state.messages)
.listen(
(chatEvent) => _handleChatEvent(chatEvent, emit),
onDone: () => _completeSubscription(completer),
onError: (Object e) {
emit(state.copyWith(isGenerating: false, error: e.toString()));
_completeSubscription(completer);
},
);
await completer.future;
}

void _handleChatEvent(ChatEvent chatEvent, Emitter<ChatState> emit) {
emit(switch (chatEvent) {
TextDelta(:final text) => state.copyWith(
currentResponse: state.currentResponse + text,
),
Done() => state.copyWith(
messages: [
...state.messages,
Message.assistant(content: state.currentResponse),
],
isGenerating: false,
currentResponse: '',
),
ChatError(:final message) => state.copyWith(
isGenerating: false,
error: message,
),
ToolCallRequest() => state,
});
}

void _completeSubscription(Completer<void> completer) {
_chatSubscription = null;
if (!completer.isCompleted) completer.complete();
}

void _cancelSubscription() {
_chatSubscription?.cancel();
_chatSubscription = null;
}

void _onStopGeneration(
StopGenerationRequested event,
Emitter<ChatState> emit,
) {
_cancelSubscription();
if (state.isGenerating) {
emit(
state.copyWith(
messages: [
...state.messages,
Message.assistant(content: state.currentResponse),
],
isGenerating: false,
currentResponse: '',
),
);
}
}

@override
Future<void> close() {
_cancelSubscription();
return super.close();
}
}
7 changes: 7 additions & 0 deletions chat_adapter/lib/src/bloc/chat_bloc_event.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,10 @@ class SendMessageRequested extends ChatBlocEvent {
@override
List<Object?> get props => [content];
}

class StopGenerationRequested extends ChatBlocEvent {
const StopGenerationRequested();

@override
List<Object?> get props => [];
}
43 changes: 43 additions & 0 deletions chat_adapter/test/bloc/chat_bloc_test.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'dart:async';

import 'package:bloc_test/bloc_test.dart';
import 'package:chat_adapter/chat_adapter.dart';
import 'package:chat_core/chat_core.dart';
Expand Down Expand Up @@ -79,5 +81,46 @@ void main() {
),
],
);

blocTest<ChatBloc, ChatState>(
'stops generation and keeps partial response',
build: () {
final controller = StreamController<ChatEvent>();
when(
() => mockService.chat(any()),
).thenAnswer((_) => controller.stream);
// Emit some deltas then don't close (simulates ongoing stream)
Future.microtask(() {
controller.add(const TextDelta('Hello'));
controller.add(const TextDelta(' wor'));
});
return ChatBloc(mockService);
},
act: (bloc) async {
bloc.add(const SendMessageRequested('Hi'));
// Wait for deltas to be processed
await Future<void>.delayed(const Duration(milliseconds: 50));
bloc.add(const StopGenerationRequested());
},
expect: () => [
ChatState(messages: [Message.user('Hi')], isGenerating: true),
ChatState(
messages: [Message.user('Hi')],
isGenerating: true,
currentResponse: 'Hello',
),
ChatState(
messages: [Message.user('Hi')],
isGenerating: true,
currentResponse: 'Hello wor',
),
ChatState(
messages: [
Message.user('Hi'),
Message.assistant(content: 'Hello wor'),
],
),
],
);
});
}
6 changes: 6 additions & 0 deletions skewr_cli/lib/src/cli/chat_repl.dart
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class ChatRepl {

StreamSubscription<ProcessSignal> _listenSigint() {
return ProcessSignal.sigint.watch().listen((_) {
if (_bloc.state.isGenerating) {
_bloc.add(const StopGenerationRequested());
stdout.writeln();
stdout.writeln('[\u270b interrupted]');
return;
}
if (_pendingExit) {
_clearLine();
stdout.writeln('Goodbye!');
Expand Down
Loading