-
Notifications
You must be signed in to change notification settings - Fork 1
feat(streaming): add live trading orchestrator pipeline #63
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(streaming): add live trading orchestrator pipeline #63
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| self._sync_position_manager() | ||
| if qty > 0: | ||
| if self.position_manager is not None: | ||
| self.position_manager.open_position(symbol, qty, price) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent double-counting cash when mirroring trades to position manager
When a position manager is configured, _handle_signal copies the portfolio’s post-trade cash into the PositionManager and then immediately calls position_manager.open_position(...). PositionManager.open_position/close_position adjust cash themselves, so this ordering subtracts the notional a second time on buys and adds it twice on sells. The position manager’s portfolio_value and risk guard will therefore drift from the actual portfolio on every trade, leading to incorrect risk halts or position sizing in live trading whenever a position manager config is supplied.
Useful? React with 👍 / 👎.
|
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| symbol = message.get("symbol") | ||
| if not symbol: | ||
| continue | ||
| self._update_history(message) | ||
| price = float( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normalize provider payloads before inferring signals
The consumer loop assumes incoming messages already expose standard keys (symbol, price/close) and immediately skips anything missing them. StreamingGateway currently forwards provider payloads untouched (the adapters don’t rewrite Alpaca/IB fields and MessageProcessor is a pass‑through), so real provider messages that use keys like S/p will be dropped and no history, features, or trades will ever be produced. The live trading pipeline will therefore sit idle when connected to the shipped adapters unless the messages are normalized before they hit this code.
Useful? React with 👍 / 👎.
|
@codex review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codex Review
Here are some automated review suggestions for this pull request.
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| history = self._history.get(symbol) | ||
| history = pd.concat([history, row]) if history is not None else row | ||
| self._history[symbol] = history.tail(self.history_window) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Preserve chronological order when buffering streaming history
Incoming streaming bars are appended to _history without reordering, so any late or replayed messages are treated as the newest rows before tail(self.history_window) trims the buffer. When providers replay buffered ticks after reconnects (or deliver out-of-order messages), an older bar can displace the true latest tick and the feature generator will run on stale prices, producing incorrect signals and trades. Sorting or de-duping by timestamp before trimming would avoid using misordered data.
Useful? React with 👍 / 👎.
…-implementation Harden live trading price handling and risk updates
Summary
Testing
Codex Task