Conversation
Event document represents time series for storing various events
Added list of linked events to Resource model and created log_event() method to record new events into the database
Created to functions to publish and listen to the messages from redis
Changed get_updates function to use ResourceID instead of Resource object, removed event_generator
operation_id is not set inside router module
There is no point in storing events inside of the Resource
There was a problem hiding this comment.
Pull Request Overview
This PR implements an event logging system with real-time notifications for workspace operations. The changes introduce WebSocket connections for real-time communication, MongoDB time-series collections for event storage, and Redis pub/sub for push notifications.
Key changes:
- Added WebSocket support with action parsing and real-time messaging capabilities
- Implemented event logging system using MongoDB time-series collections
- Added Redis integration for real-time push notifications and Server-Sent Events (SSE)
Reviewed Changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| src/unipoll_api/websocket_manager.py | Enhanced WebSocket manager with JSON messaging and action parsing functionality |
| src/unipoll_api/utils/events.py | New event utilities for getting updates, streaming events, and notifying members |
| src/unipoll_api/schemas/websocket.py | New WebSocket message schema for action-based communication |
| src/unipoll_api/routes/websocket.py | Updated WebSocket route with authentication and message parsing |
| src/unipoll_api/routes/streams.py | New streaming endpoints for event logs and Redis notifications |
| src/unipoll_api/routes/init.py | Added streams router to main application |
| src/unipoll_api/redis.py | New Redis client for pub/sub messaging |
| src/unipoll_api/mongo_db.py | Added Event document to database initialization |
| src/unipoll_api/exceptions/websocket.py | New WebSocket-specific exception classes |
| src/unipoll_api/exceptions/init.py | Exported WebSocket exceptions |
| src/unipoll_api/documents.py | Added Event document with time-series configuration |
| src/unipoll_api/dependencies.py | Updated WebSocket authentication dependency |
| src/unipoll_api/config.py | Added Redis configuration settings |
| src/unipoll_api/app.py | Added application start time tracking |
| src/unipoll_api/actions/workspace.py | Added event logging to workspace updates |
| src/unipoll_api/actions/websocket.py | New WebSocket action handlers for various operations |
| src/unipoll_api/actions/init.py | Exported WebSocket actions |
| pyproject.toml | Added sse-starlette dependency |
Comments suppressed due to low confidence (1)
| filtered_args = {} | ||
| for key, required in args.items(): | ||
| value = data.get(key) | ||
| if required and not value: |
There was a problem hiding this comment.
This condition will incorrectly reject valid falsy values like 0, False, or empty strings that might be legitimate arguments. Consider using value is None instead.
| if required and not value: | |
| if required and value is None: |
| response: BaseModel = await action(**args) | ||
| if not response: | ||
| response = {"status": "success"} | ||
| return response.model_dump(exclude_none=True) |
There was a problem hiding this comment.
When response is a dictionary (line 55), calling .model_dump() will fail since dictionaries don't have this method. The function should handle both BaseModel instances and dictionaries consistently.
| return response.model_dump(exclude_none=True) | |
| if isinstance(response, BaseModel): | |
| return response.model_dump(exclude_none=True) | |
| return response |
| try: | ||
| timestamp = datetime.now() | ||
| for member in resource.members: | ||
| # print(member) |
There was a problem hiding this comment.
Commented-out debug print statement should be removed from production code.
| # print(member) |
| time = datetime.fromisoformat(since) | ||
| return await get_updates(resource_id, time) | ||
| except Exception as e: | ||
| print(e) |
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging or raising appropriate exceptions.
| print(e) | ||
|
|
||
|
|
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging or raising appropriate exceptions.
| print(e) | |
| logger.error(f"Error in generate_event: {e}") | |
| raise HTTPException(status_code=500, detail="Internal Server Error") |
| print("Connection error:", e) | ||
| except Exception as e: | ||
| print("An unexpected error occurred:", e) |
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging.
| print("Connection error:", e) | |
| except Exception as e: | |
| print("An unexpected error occurred:", e) | |
| logger.error("Connection error: %s", e) | |
| except Exception as e: | |
| logger.error("An unexpected error occurred: %s", e) |
| print("Connection error:", e) | ||
| except Exception as e: | ||
| print("An unexpected error occurred:", e) |
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging.
| print("Connection error:", e) | |
| except Exception as e: | |
| print("An unexpected error occurred:", e) | |
| logger.error("Connection error: %s", e) | |
| except Exception as e: | |
| logger.error("An unexpected error occurred: %s", e) |
| if user_id == message.get("recipient_id"): | ||
| yield {"data": json.dumps(message)} | ||
| except redis.exceptions.ConnectionError as e: | ||
| print("Connection error:", e) |
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging.
| print("Connection error:", e) | |
| logger.error("Connection error: %s", e) |
| # @after_event(Insert) | ||
| # def |
There was a problem hiding this comment.
Incomplete commented code should be removed or properly implemented.
| # @after_event(Insert) | |
| # def | |
| @after_event(Insert) | |
| async def after_insert(self): | |
| Debug(f"Event inserted with ID: {self.id}") |
| "message": message} | ||
| await publish_message(data) | ||
| except Exception as e: | ||
| print(e) |
There was a problem hiding this comment.
Using print() for error handling is not appropriate for production code. Consider using proper logging or raising appropriate exceptions.
| print(e) | |
| Debug.error(f"An error occurred in notify_members: {e}") | |
| raise |
This pull request introduces significant new features and enhancements to the
unipoll_apiproject, including WebSocket and event streaming functionality, Redis integration for notifications, and improved workspace and group management. Key updates include the addition of new WebSocket actions, Redis-based push notifications, event logging for resources, and new endpoints for event streams.New Features and Integrations:
WebSocket Actions:
websocket.pywith WebSocket-related actions for managing workspaces and groups, including CRUD operations for resources, members, and policies. (src/unipoll_api/actions/websocket.py)WebsocketActionsinto__init__.pyto expose WebSocket functionality. (src/unipoll_api/actions/__init__.py)Redis Integration:
src/unipoll_api/redis.py)redis_host,redis_port) to application settings. (src/unipoll_api/config.py)Event Logging and Streaming:
Eventdocument for time-series event logging. (src/unipoll_api/documents.py)src/unipoll_api/actions/workspace.py)src/unipoll_api/routes/streams.py)Enhancements to Existing Components:
Workspace and Group Management:
src/unipoll_api/actions/workspace.py)src/unipoll_api/dependencies.py)Error Handling:
src/unipoll_api/exceptions/websocket.py)Configuration and Dependencies:
sse-starlettefor server-sent events. (pyproject.toml)src/unipoll_api/app.py)These changes collectively enhance the API's real-time capabilities and improve resource management while introducing robust event logging and notification systems.