Conversation
Summary of Changes
Hello @Yunnglin, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces substantial improvements to the server's operational stability and resource efficiency. It refines how tasks are queued and processed, implements more granular control over adapter lifecycles, and updates core configurations to support these changes. The modifications aim to provide a more robust and predictable environment for training and inference, while also streamlining client-side interactions and documentation.
Code Review
This pull request introduces a significant refactoring of the server-side adapter management and task queuing systems, bringing more robust lifecycle handling, fairer scheduling, and improved state management. The changes also include updates to client examples and documentation to align with the new server capabilities.
My review has identified a critical concurrency issue in the AdapterManagerMixin due to missing thread synchronization, a bug involving a duplicated method call, and some areas for improvement in documentation clarity and code style in the client library. Addressing these points will greatly improve the stability and maintainability of the system.
| # 'inactivity_counter': int} | ||
| # Dict mapping adapter_name -> | ||
| # {'token': str, 'session_id': str, 'last_activity': float, 'created_at': float, 'inactivity_counter': int} | ||
| self._adapter_records: dict[str, dict[str, Any]] = {} |
The _adapter_records dictionary is accessed and modified from multiple threads without any locking. The _adapter_countdown_loop runs in a separate threading.Thread, while methods like register_adapter, unregister_adapter, and touch_adapter are called from the asyncio event loop that handles requests. This concurrent access to a shared dictionary is not thread-safe and will lead to race conditions and data corruption. Please re-introduce a threading.Lock to protect all accesses to _adapter_records.
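The locking suggested above might look roughly like the following. This is a minimal sketch, not the project's implementation: the class `AdapterRecords` and the method `tick` are hypothetical stand-ins for the record-keeping part of `AdapterManagerMixin` and one step of `_adapter_countdown_loop`, and the record fields are taken from the comment in the diff. The point is only that every read and write of the shared dictionary happens while holding the same `threading.Lock`.

```python
import threading
import time
from typing import Any


class AdapterRecords:
    """Hypothetical stand-in for the record keeping in AdapterManagerMixin."""

    def __init__(self) -> None:
        # One lock guards every access to _adapter_records.
        self._lock = threading.Lock()
        self._adapter_records: dict[str, dict[str, Any]] = {}

    def register_adapter(self, adapter_name: str, token: str) -> None:
        now = time.time()
        with self._lock:
            self._adapter_records[adapter_name] = {
                'token': token,
                'last_activity': now,
                'created_at': now,
                'inactivity_counter': 0,
            }

    def touch_adapter(self, adapter_name: str) -> bool:
        with self._lock:
            record = self._adapter_records.get(adapter_name)
            if record is None:
                return False
            record['last_activity'] = time.time()
            record['inactivity_counter'] = 0
            return True

    def unregister_adapter(self, adapter_name: str) -> bool:
        with self._lock:
            return self._adapter_records.pop(adapter_name, None) is not None

    def tick(self, max_inactivity: int) -> list[str]:
        """One countdown step (assumed shape); returns the expired adapter names."""
        with self._lock:
            expired = []
            for name, record in self._adapter_records.items():
                record['inactivity_counter'] += 1
                if record['inactivity_counter'] > max_inactivity:
                    expired.append(name)
            # Delete after iterating so the dict is not mutated mid-loop.
            for name in expired:
                del self._adapter_records[name]
        return expired
```

Expiry callbacks are best invoked after the lock is released (here, by the caller of `tick`), so that a slow callback does not block request handlers waiting on the same lock.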
|
|
||
|
|
||
| class DataLoader: | ||
| class DataLoader(object): |
Inheriting from object is redundant in Python 3. This issue is also present in src/twinkle_client/dataset/base.py and src/twinkle_client/processor/base.py. These style issues are likely because src/twinkle_client/ is excluded from pre-commit checks. Please remove the redundant inheritance for cleaner code and consider re-enabling pre-commit for this directory to maintain code quality.
| class DataLoader(object): | |
| class DataLoader: |
Pull request overview
This pull request titled "Fix moe" contains extensive changes that go well beyond MoE (Mixture of Experts) fixes. The changes include a major refactoring of the task queue system, adapter lifecycle management improvements, API signature changes, and numerous formatting updates to client-generated files.
Changes:
- Major task queue refactoring: Changed from single queue to per-model/per-token queues with coroutine factories instead of coroutines
- Adapter lifecycle management: Enhanced session-based expiration, TTL enforcement, and gradient state tracking
- API signature changes: Breaking changes to schedule_task (now requires coro_factory), _on_adapter_expired (removed token parameter), and parameter renames
Reviewed changes
Copilot reviewed 31 out of 32 changed files in this pull request and generated 22 comments.
| File | Description |
|---|---|
| src/twinkle_client/__init__.py | Changed API key validation from None check to falsy check |
| src/twinkle_client/sampler/vllm_sampler.py | Import reordering and formatting changes |
| src/twinkle_client/processor/base.py | Formatting changes, added trailing whitespace, quote style changes |
| src/twinkle_client/model/multi_lora_transformers.py | Import reordering |
| src/twinkle_client/dataset/*.py | Formatting changes, quote style changes, trailing whitespace |
| src/twinkle_client/dataloader/dataloader.py | Formatting changes, added object inheritance |
| src/twinkle/server/utils/task_queue.py | Major refactoring: per-queue architecture, coro_factory pattern, preflight checks |
| src/twinkle/server/utils/state.py | Added session heartbeat tracking, kwargs parameter |
| src/twinkle/server/utils/adapter_manager.py | Session-based expiration, TTL enforcement, adapter state management, removed token from _on_adapter_expired |
| src/twinkle/server/twinkle/sampler.py | Adapter expiration handling (signature mismatch bug) |
| src/twinkle/server/twinkle/model.py | Duplicate remove_adapter call bug |
| src/twinkle/server/tinker/server.py | Server config handling, normalize_models with incorrect parameter name |
| src/twinkle/server/tinker/sampler.py | Adapter URI validation logic bug (fails all non-adapter requests) |
| src/twinkle/server/tinker/model.py | Gradient state tracking, cleanup refactoring, batch validation moved |
| src/twinkle/server/tinker/common/compat_base.py | Enhanced metrics cleaning with string parsing bug |
| src/twinkle/model/megatron/megatron.py | Parameter rename from resume to load_optimizer |
| cookbook/client/tinker/sample.py | Duplicate imports |
| cookbook/client/tinker/short_math_grpo.py | Updated to use Template instead of tokenizer |
| cookbook/client/tinker/self_congnition.py | Updated to use Template instead of tokenizer |
| cookbook/client/twinkle/transformer/server_config.yaml | Config updates: dp_size, environment variables |
| cookbook/client/tinker/megatron/server_config*.yaml | Adapter timeout, max_lifetime, rate limit changes |
| cookbook/client/tinker/grpo.py | File deleted |
| .pre-commit-config.yaml | Expanded exclusion patterns for examples/cookbook |
| .gitignore | Added swanlog/ directory |
Comments suppressed due to low confidence (1)
src/twinkle/server/twinkle/sampler.py:180
- This method requires 3 positional arguments, whereas overridden AdapterManagerMixin._on_adapter_expired requires 2. This call correctly calls the base method, but does not match the signature of the overriding method.
def _on_adapter_expired(self, adapter_name: str, token: str) -> None:
|
|
||
| def __call__(self, inputs: Union[InputFeature, List[InputFeature]], **kwargs): | ||
| response = http_post( | ||
| url=f'{self.server_url}/processors/call', | ||
| json_data={ | ||
| 'processor_id': self.processor_id, | ||
| 'function': '__call__', | ||
| **{ | ||
| 'inputs': inputs | ||
| }, | ||
| **{'inputs': inputs}, | ||
| **kwargs | ||
| }) | ||
| } | ||
| ) | ||
| response.raise_for_status() | ||
| return response.json()['result'] | ||
| return response.json()["result"] | ||
|
No newline at end of file |
Trailing whitespace has been added at lines 42 and 55. While this doesn't affect functionality, it's inconsistent with the file's style and may trigger linting warnings. The pre-commit configuration has been updated to exclude client files, but adding trailing whitespace is still not ideal. Consider removing these trailing spaces.
| async def schedule_task( | ||
| self, | ||
| coro: Coroutine, | ||
| coro_factory: Callable[[], Coroutine], |
The signature change from coro: Coroutine to coro_factory: Callable[[], Coroutine] is a breaking API change. All callers of schedule_task must now pass a factory function instead of an already-created coroutine. While all the visible callers in this PR have been updated (e.g., _do_sample() changed to _do_sample), this could break external code or plugins that call schedule_task. Consider whether this breaking change is intended and if it should be documented or versioned appropriately.
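To illustrate what the factory form permits that a pre-built coroutine does not: a coroutine object can be awaited only once, whereas a zero-argument callable can be invoked later, and more than once. The sketch below is hypothetical (`run_with_retry` is not part of the PR, and whether the real `schedule_task` retries is an assumption); it only shows the calling convention.

```python
import asyncio
from typing import Any, Callable, Coroutine


async def run_with_retry(
    coro_factory: Callable[[], Coroutine[Any, Any, Any]],
    attempts: int = 2,
) -> Any:
    """Hypothetical scheduler step: build a fresh coroutine for each attempt."""
    if attempts < 1:
        raise ValueError('attempts must be at least 1')
    last_error: Exception | None = None
    for _ in range(attempts):
        try:
            # Calling the factory creates a new coroutine object every time.
            return await coro_factory()
        except RuntimeError as exc:
            last_error = exc
    assert last_error is not None
    raise last_error


calls = []


async def _do_sample() -> str:
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError('transient failure')
    return 'ok'


# Pass the function itself (_do_sample), not the result of calling it (_do_sample()).
result = asyncio.run(run_with_retry(_do_sample))
```

An external caller that still passes `_do_sample()` would hand over a coroutine object, which is not callable, so the mismatch surfaces as a `TypeError` when the scheduler tries to invoke it.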
| # Remove adapter from model | ||
| self.model.remove_adapter(adapter_name) | ||
|
|
The _on_adapter_expired method calls self.model.remove_adapter(adapter_name) twice: once at line 204 and again at line 211. This looks like duplicated logic. If the first call succeeds, the second is at best redundant and will likely fail because the adapter has already been removed. One of these calls should be removed.
| # Remove adapter from model | |
| self.model.remove_adapter(adapter_name) |
| # Validate adapter URI existence if provided | ||
| if not adapter_uri or not os.path.exists(adapter_uri): | ||
| return types.RequestFailedResponse( | ||
| error=f'Adapter URI {model_path} does not exist. Please check the model_path.', | ||
| category=types.RequestErrorCategory.User, | ||
| ) |
The validation at lines 165-169 checks if adapter_uri exists, but this check happens even when model_path is None or when no adapter is needed. The condition should be if adapter_uri and not os.path.exists(adapter_uri): to avoid failing when no adapter URI is intentionally provided. Currently, if adapter_uri is None or empty string, the check not adapter_uri or not os.path.exists(adapter_uri) will always be True, causing all requests without an adapter to fail.
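The corrected condition can be checked in isolation. In this sketch, `adapter_uri_error` is a hypothetical helper (the real code returns a `types.RequestFailedResponse`); it returns an error message only when a URI was supplied and the path is missing.

```python
import os
from typing import Optional


def adapter_uri_error(adapter_uri: Optional[str]) -> Optional[str]:
    """Return an error message only when a URI was provided but does not exist."""
    # `adapter_uri and ...` skips validation for None and for the empty string.
    if adapter_uri and not os.path.exists(adapter_uri):
        return f'Adapter URI {adapter_uri} does not exist.'
    return None


# Requests without an adapter pass validation.
no_adapter = adapter_uri_error(None)
# A provided but missing path is still rejected.
missing = adapter_uri_error('/definitely/not/a/real/path/xyz')
```

With the original `not adapter_uri or not os.path.exists(adapter_uri)`, the first call would have produced an error as well.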
| s = value.strip() | ||
| if s: | ||
| try: | ||
| head, unit = s.split() # ignore unit/tail |
At line 101, s.split() without an argument will split on any whitespace and could raise ValueError if the split result doesn't have exactly 2 elements. For example, "123 seconds remaining" would split into 3 elements. The unpacking head, unit = s.split() will fail with ValueError: too many values to unpack. Consider using s.split(maxsplit=1) or handling the ValueError more gracefully.
| head, unit = s.split() # ignore unit/tail | |
| parts = s.split(maxsplit=1) # split into value and unit/tail | |
| if len(parts) != 2: | |
| raise ValueError("Expected a value and a unit in metric string") | |
| head, unit = parts |
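The behaviour of the suggested change can be seen on its own. `split_metric` below is a hypothetical helper wrapping the suggestion; it is not the function in compat_base.py.

```python
def split_metric(value: str) -> tuple[str, str]:
    """Split a metric string into its leading value and the remaining unit/tail."""
    # maxsplit=1 splits on the first run of whitespace only.
    parts = value.strip().split(maxsplit=1)
    if len(parts) != 2:
        raise ValueError('Expected a value and a unit in metric string')
    return parts[0], parts[1]


head, tail = split_metric('123 seconds remaining')
# head == '123', tail == 'seconds remaining'
```

A bare value such as `'123'` still raises `ValueError`, so the surrounding `try` block needs to handle that case as before.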
|
|
||
| from twinkle.dataset import Dataset, DatasetMeta | ||
| from twinkle_client.http import http_post, heartbeat_manager | ||
| from twinkle.dataset import Dataset |
Import of 'Dataset' is not used.
| from twinkle.dataset import Dataset |
| # 1. Modify the source files in src/twinkle/ | ||
| # 2. Run: python client_tools/client_generator.py | ||
| # ============================================================================ | ||
| from typing import Any, Optional, Union, Type, Dict, Literal, List |
Import of 'List' is not used.
Import of 'Type' is not used.
Import of 'Union' is not used.
Import of 'Literal' is not used.
| from typing import Any, Optional, Union, Type, Dict, Literal, List | |
| from typing import Any, Optional, Dict |
| from pydantic import BaseModel, Field | ||
| from ray import serve | ||
| from typing import Any, Dict, List, Optional | ||
| from typing import Any, Dict, List, Optional, Union |
Import of 'Union' is not used.
| from typing import Any, Dict, List, Optional, Union | |
| from typing import Any, Dict, List, Optional |
| from dataclasses import dataclass | ||
| from enum import Enum | ||
| from typing import TYPE_CHECKING, Any, Callable, Coroutine, Dict, List, Optional, Tuple | ||
| from typing import TYPE_CHECKING, Any, Callable, Coroutine, Deque, Dict, Optional |
Import of 'Dict' is not used.
Import of 'Optional' is not used.
| from typing import TYPE_CHECKING, Any, Callable, Coroutine, Deque, Dict, Optional | |
| from typing import TYPE_CHECKING, Any, Callable, Coroutine, Deque |
| try: | ||
| while queue_key in self._queue_order: | ||
| self._queue_order.remove(queue_key) | ||
| except ValueError: |
'except' clause does nothing but pass and there is no explanatory comment.
| except ValueError: | |
| except ValueError: | |
| # If the queue_key is already absent from _queue_order, we can safely ignore this. |
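As a side note on the loop itself: when nothing else modifies the container between the membership test and the `remove` call (for example, no `await` in between on a single event loop), the `while ... in` guard already prevents `remove` from raising `ValueError`. The sketch below assumes `_queue_order` is a `deque` of string keys, which the diff does not confirm, and `drop_queue_key` is a hypothetical name.

```python
from collections import deque


def drop_queue_key(queue_order: deque, queue_key: str) -> int:
    """Remove every occurrence of queue_key; return how many were removed."""
    removed = 0
    # The membership check guarantees remove() finds the key.
    while queue_key in queue_order:
        queue_order.remove(queue_key)
        removed += 1
    return removed


order = deque(['a', 'b', 'a'])
removed_count = drop_queue_key(order, 'a')
```

If the container can be mutated from another thread, the `try`/`except ValueError` is still a reasonable safeguard, and the explanatory comment suggested above covers that case.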
No description provided.