Conversation
This commit improves the pythonicity of the library while maintaining 100% backwards compatibility. All existing code continues to work without modification. Changes: - Add custom exception hierarchy (LoggerError, InvalidTypeError, InvalidSeverityError) to replace assertions for production safety - Add comprehensive Google-style docstrings to all modules, classes, and methods - Add proper type hints using typing module throughout - Fix private method naming (double to single underscore per PEP 8) - Optimize type name access (obj.__class__.__name__) - Fix defaults merging to properly handle None parameters - Add comprehensive test suite (102 tests, all passing) - Update README with new sections on available fields, type hints, and exception handling - Add pytest as dev dependency in setup.py Benefits: - Exceptions work with python -O flag (assertions don't) - Better IDE autocomplete and static analysis support - Comprehensive documentation for all methods - Test coverage ensures stability - Pythonic conventions throughout Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Ensures the exact examples from README.md continue to work as documented. - Test configuration loaded example - Test request received example - Test configuration settings example All 3 README example tests pass.
|
Added tests that mirror the exact README examples to ensure documentation stays accurate. All 105 tests now passing:
The README examples work exactly as documented. |
There was a problem hiding this comment.
Pull request overview
This PR modernizes the kubi_ecs_logger library by replacing assertions with proper exceptions, adding comprehensive Google-style docstrings, improving type hints, and providing a complete test suite with 102 tests. The changes maintain 100% backwards compatibility with existing code.
Changes:
- Introduced custom exception hierarchy (
LoggerError,InvalidTypeError,InvalidSeverityError) replacing all assertions - Added comprehensive Google-style docstrings to all modules, classes, and methods (27+ field methods documented)
- Enhanced type hints throughout with
Optional,List,Dict,Any,Uniontypes - Fixed PEP 8 naming violations (private methods changed from
__methodto_method) - Optimized type name access pattern (
obj.__class__.__name__instead ofstr(type(obj).__name__)) - Refactored defaults merging logic in field methods for consistent behavior where explicit parameters override defaults
- Added comprehensive test suite (102 tests) covering all functionality and backwards compatibility
Reviewed changes
Copilot reviewed 18 out of 19 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| kubi_ecs_logger/exceptions.py | New custom exception hierarchy with proper inheritance from standard Python exceptions |
| kubi_ecs_logger/init.py | Updated module docstring and exports to include new exception classes |
| kubi_ecs_logger/wrapper/logger.py | Major refactoring with docstrings, improved defaults merging pattern, and proper exception handling |
| kubi_ecs_logger/models/severity.py | Replaced assertion with InvalidSeverityError, added comprehensive docstrings |
| kubi_ecs_logger/models/root_schema.py | Optimized type name access, added docstrings |
| kubi_ecs_logger/models/base.py | Replaced assertion with InvalidTypeError, optimized type name access, added docstrings |
| kubi_ecs_logger/models/fields/field_set.py | Added docstrings and proper init type hints |
| kubi_ecs_logger/models/fields/logline.py | Fixed private attribute naming (_level instead of __level), added docstrings |
| tests/*.py | Comprehensive test suite with 102 tests covering all functionality |
| tests/conftest.py | Well-designed fixtures including autouse reset_logger for singleton management |
| setup.py | Added dev dependencies for pytest |
| README.md | Enhanced documentation with available fields section, type hints examples, and exception handling guide |
| .gitignore | Added Claude Code specific files |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
kubi_ecs_logger/wrapper/logger.py
Outdated
| defaults = self._get_defaults_for(Client) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
|
|
||
| self._base.add_object(ECS(version=version)) |
There was a problem hiding this comment.
The ecs() method has three issues:
- It uses Client instead of ECS when calling _get_defaults_for(), which means defaults configured for the 'ecs' field won't be retrieved at all
- The kwargs merged with defaults (line 401) are never passed to the ECS constructor (line 403), so even if defaults were retrieved correctly, they wouldn't be applied
- Unlike other field methods in this PR, it doesn't follow the new pattern of collecting explicit parameters and properly merging defaults. The version parameter should be collected in a params dict first, then defaults should be merged without overriding explicit params, and finally kwargs should be merged.
The correct implementation should be similar to other field methods like error() or event().
| defaults = self._get_defaults_for(Client) | |
| if defaults: | |
| kwargs.update(defaults) | |
| self._base.add_object(ECS(version=version)) | |
| # Collect explicit parameters (only non-None values) | |
| params = {} | |
| if version is not None: | |
| params['version'] = version | |
| # Merge with defaults (defaults don't override explicit params) | |
| defaults = self._get_defaults_for(ECS) | |
| if defaults: | |
| for key, value in defaults.items(): | |
| if key not in params: | |
| params[key] = value | |
| # Merge with kwargs (kwargs override everything) | |
| params.update(kwargs) | |
| self._base.add_object(ECS(**params)) |
| specific ECS fields as __init__ parameters. | ||
|
|
||
| Example: | ||
| >>> class Custom Field(FieldSet): |
There was a problem hiding this comment.
The example class name has a space in it: "Custom Field" should be "CustomField". Class names in Python should not contain spaces.
| >>> class Custom Field(FieldSet): | |
| >>> class CustomField(FieldSet): |
tests/test_severity.py
Outdated
|
|
||
| def test_severity_less_than_or_equal(self): | ||
| """Severity <= comparison works correctly.""" | ||
| assert Severity.DEBUG <= Severity.DEBUG |
There was a problem hiding this comment.
Comparison of identical values; use cmath.isnan() if testing for not-a-number.
tests/test_severity.py
Outdated
|
|
||
| def test_severity_greater_than_or_equal(self): | ||
| """Severity >= comparison works correctly.""" | ||
| assert Severity.CRITICAL >= Severity.CRITICAL |
There was a problem hiding this comment.
Comparison of identical values; use cmath.isnan() if testing for not-a-number.
tests/test_severity.py
Outdated
|
|
||
| def test_severity_equality(self): | ||
| """Severity == comparison works correctly.""" | ||
| assert Severity.INFO == Severity.INFO |
There was a problem hiding this comment.
Comparison of identical values; use cmath.isnan() if testing for not-a-number.
| assert Severity.INFO == Severity.INFO | |
| severity = Severity.INFO | |
| assert severity == Severity.INFO |
| import pytest | ||
| from datetime import datetime | ||
| from kubi_ecs_logger.models import Base, BaseSchema | ||
| from kubi_ecs_logger.models.fields import Event, User, FieldSet |
There was a problem hiding this comment.
Import of 'FieldSet' is not used.
| from kubi_ecs_logger.models.fields import Event, User, FieldSet | |
| from kubi_ecs_logger.models.fields import Event, User |
tests/test_exceptions.py
Outdated
| @@ -0,0 +1,79 @@ | |||
| """Tests for custom exceptions.""" | |||
|
|
|||
| import pytest | |||
There was a problem hiding this comment.
Import of 'pytest' is not used.
| import pytest |
tests/test_fields.py
Outdated
| @@ -0,0 +1,198 @@ | |||
| """Tests for field objects.""" | |||
|
|
|||
| import pytest | |||
There was a problem hiding this comment.
Import of 'pytest' is not used.
| import pytest |
tests/test_fields.py
Outdated
| """Tests for field objects.""" | ||
|
|
||
| import pytest | ||
| from datetime import datetime |
There was a problem hiding this comment.
Import of 'datetime' is not used.
| from datetime import datetime |
tests/test_readme_examples.py
Outdated
| import json | ||
| import sys | ||
| from io import StringIO | ||
| import pytest |
There was a problem hiding this comment.
Import of 'pytest' is not used.
| import pytest |
Addresses all 10 inline code review comments from Copilot: 1. Fixed critical bug in ecs() method (logger.py): - Changed _get_defaults_for(Client) to _get_defaults_for(ECS) - Implemented proper parameter collection and merging pattern - Now correctly passes all params to ECS constructor - Follows same pattern as error() and other field methods 2. Fixed invalid class name in docstring (field_set.py): - Changed "class Custom Field" to "class CustomField" 3. Removed unused imports: - Removed pytest from test_exceptions.py - Removed pytest from test_fields.py - Removed pytest from test_readme_examples.py - Removed datetime from test_fields.py 4. Fixed identical value comparisons in test_severity.py: - Updated <= and >= tests to compare different values - Fixed equality test to avoid comparing identical constants 5. Added regression tests (test_regressions.py): - 5 new tests verifying ecs() bug fix - Tests cover defaults, explicit params, kwargs, and precedence - Prevents reintroduction of the bug All 110 tests passing (105 original + 5 new regression tests). Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 20 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
kubi_ecs_logger/wrapper/logger.py:1368
- Inefficient logic in _append_log_level method. When a LogLine already exists with level=None, the code sets the level on line 1364 but then falls through to line 1368 which calls add_object again. While add_object will safely reject the duplicate (first wins policy), this is wasteful and confusing.
The method should return after setting the level:
if hasattr(self._base, "logline"):
if self._base.logline.level is None:
self._base.logline.level = severity_level
return # Return for both branches
else:
self._base.add_object(LogLine(level=severity_level))Or more simply:
if hasattr(self._base, "logline"):
if self._base.logline.level is None:
self._base.logline.level = severity_level
else:
self._base.add_object(LogLine(level=severity_level)) if hasattr(self._base, "logline"):
if self._base.logline.level is None:
self._base.logline.level = severity_level
else:
return
self._base.add_object(LogLine(level=severity_level))
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
kubi_ecs_logger/wrapper/logger.py
Outdated
| def agent(self, ephemeral_id: str = None, id: str = None, name: str = None, | ||
| type: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Agent) | ||
| """Add ECS agent fields. | ||
|
|
||
| Information about the agent/client reporting the event. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-agent.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ephemeral_id is not None: | ||
| params['ephemeral_id'] = ephemeral_id | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
| if type is not None: | ||
| params['type'] = type | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Agent) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Agent(ephemeral_id=ephemeral_id, id=id, name=name, type=type, version=version, **kwargs)) | ||
| self._base.add_object(Agent(**params)) | ||
| return self | ||
|
|
||
| def client(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, | ||
| mac: str = None, packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Client) | ||
| """Add ECS client fields. | ||
|
|
||
| Fields about the client (initiator) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-client.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Client) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Client(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Client(**params)) | ||
| return self | ||
|
|
||
| def cloud(self, account_id: str = None, availability_zone: str = None, instance_id: str = None, | ||
| instance_name: str = None, machine_type: str = None, provider: str = None, | ||
| region: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Cloud) | ||
| """Add ECS cloud fields. | ||
|
|
||
| Fields related to cloud or infrastructure provider information. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-cloud.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if account_id is not None: | ||
| params['account_id'] = account_id | ||
| if availability_zone is not None: | ||
| params['availability_zone'] = availability_zone | ||
| if instance_id is not None: | ||
| params['instance_id'] = instance_id | ||
| if instance_name is not None: | ||
| params['instance_name'] = instance_name | ||
| if machine_type is not None: | ||
| params['machine_type'] = machine_type | ||
| if provider is not None: | ||
| params['provider'] = provider | ||
| if region is not None: | ||
| params['region'] = region | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Cloud) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Cloud(account_id=account_id, availability_zone=availability_zone, instance_id=instance_id, | ||
| instance_name=instance_name, machine_type=machine_type, provider=provider, | ||
| region=region, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Cloud(**params)) | ||
| return self | ||
|
|
||
| def container(self, id: str = None, image_name: str = None, image_tag: str = None, | ||
| labels: dict = None, name: str = None, runtime: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Container) | ||
| """Add ECS container fields. | ||
|
|
||
| Runtime environment information for containerized applications. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-container.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if image_name is not None: | ||
| params['image_name'] = image_name | ||
| if image_tag is not None: | ||
| params['image_tag'] = image_tag | ||
| if labels is not None: | ||
| params['labels'] = labels | ||
| if name is not None: | ||
| params['name'] = name | ||
| if runtime is not None: | ||
| params['runtime'] = runtime | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Container) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Container(id=id, image_name=image_name, image_tag=image_tag, labels=labels, | ||
| name=name, runtime=runtime, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Container(**params)) | ||
| return self | ||
|
|
||
| def destination(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, | ||
| mac: str = None, packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Destination) | ||
| """Add ECS destination fields. | ||
|
|
||
| Fields about the destination (responder) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-destination.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Destination) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| self._base.add_object(Destination(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Destination(**params)) | ||
| return self | ||
|
|
||
| def ecs(self, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Client) | ||
| """Add ECS version information. | ||
|
|
||
| Meta-information about the ECS version used. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-ecs.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(ECS) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(ECS(version=version)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(ECS(**params)) | ||
| return self | ||
|
|
||
| def error(self, code: str = None, id: str = None, message: str = None, **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(Error) | ||
| def error(self, code: Optional[str] = None, id: Optional[str] = None, | ||
| message: Optional[str] = None, **kwargs) -> 'Logger': | ||
| """Add ECS error fields to the log entry. | ||
|
|
||
| The error fields capture details about errors that occurred during event processing. | ||
|
|
||
| Args: | ||
| code: Error code describing the error | ||
| id: Unique identifier for the error | ||
| message: Error message text | ||
| **kwargs: Additional error fields (e.g., stack_trace, type) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().error(code="ERR_AUTH_FAILED", message="Invalid credentials") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-error.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if code is not None: | ||
| params['code'] = code | ||
| if id is not None: | ||
| params['id'] = id | ||
| if message is not None: | ||
| params['message'] = message | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Error) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Error(code=code, id=id, message=message, **kwargs)) | ||
| self._base.add_object(Error(**params)) | ||
| return self | ||
|
|
||
| def event(self, action: str = None, category: str = None, created: datetime = None, | ||
| dataset: str = None, risk_score: float = None, severity: int = None, | ||
| def event(self, action: Optional[str] = None, category: Optional[str] = None, | ||
| created: Optional[datetime] = None, dataset: Optional[str] = None, | ||
| risk_score: Optional[float] = None, severity: Optional[int] = None, | ||
| **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(Event) | ||
| """Add ECS event fields to the log entry. | ||
|
|
||
| The event fields describe the circumstances of an observed event, | ||
| such as actions taken, their outcomes, and contextual information. | ||
|
|
||
| Args: | ||
| action: The action captured by the event (e.g., "user-login", "file-delete") | ||
| category: Event category (e.g., "authentication", "file") | ||
| created: When the event was created | ||
| dataset: Name of the dataset for event correlation | ||
| risk_score: Risk score calculated for the event (0-100) | ||
| severity: Numeric severity of the event | ||
| **kwargs: Additional event fields (e.g., duration, outcome, type) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().event(action="user-login", outcome="success", category="authentication") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-event.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if action is not None: | ||
| params['action'] = action | ||
| if category is not None: | ||
| params['category'] = category | ||
| if created is not None: | ||
| params['created'] = created | ||
| if dataset is not None: | ||
| params['dataset'] = dataset | ||
| if risk_score is not None: | ||
| params['risk_score'] = risk_score | ||
| if severity is not None: | ||
| params['severity'] = severity | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Event) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Event(action=action, category=category, created=created, dataset=dataset, | ||
| risk_score=risk_score, severity=severity, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Event(**params)) | ||
| return self | ||
|
|
||
| def file(self, ctime: datetime = None, device: str = None, extension: str = None, gid: str = None, | ||
| group: str = None, inode: str = None, mode: str = None, mtime: datetime = None, owner: str = None, | ||
| path: str = None, size: int = None, target_path: str = None, type: str = None, uid: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(File) | ||
| """Add ECS file fields. | ||
|
|
||
| Information about files involved in the event. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-file.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ctime is not None: | ||
| params['ctime'] = ctime | ||
| if device is not None: | ||
| params['device'] = device | ||
| if extension is not None: | ||
| params['extension'] = extension | ||
| if gid is not None: | ||
| params['gid'] = gid | ||
| if group is not None: | ||
| params['group'] = group | ||
| if inode is not None: | ||
| params['inode'] = inode | ||
| if mode is not None: | ||
| params['mode'] = mode | ||
| if mtime is not None: | ||
| params['mtime'] = mtime | ||
| if owner is not None: | ||
| params['owner'] = owner | ||
| if path is not None: | ||
| params['path'] = path | ||
| if size is not None: | ||
| params['size'] = size | ||
| if target_path is not None: | ||
| params['target_path'] = target_path | ||
| if type is not None: | ||
| params['type'] = type | ||
| if uid is not None: | ||
| params['uid'] = uid | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(File) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(File(ctime=ctime, device=device, extension=extension, gid=gid, group=group, | ||
| inode=inode, mode=mode, mtime=mtime, owner=owner, path=path, size=size, | ||
| target_path=target_path, type=type, uid=uid, **kwargs)) | ||
| self._base.add_object(File(**params)) | ||
| return self | ||
|
|
||
| def geo(self, city_name: str = None, continent_name: str = None, country_iso_code: str = None, | ||
| country_name: str = None, location: dict = None, name: str = None, region_iso_code: str = None, | ||
| region_name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Geo) | ||
| """Add ECS geo fields. | ||
|
|
||
| Geolocation information for IP addresses. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-geo.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if city_name is not None: | ||
| params['city_name'] = city_name | ||
| if continent_name is not None: | ||
| params['continent_name'] = continent_name | ||
| if country_iso_code is not None: | ||
| params['country_iso_code'] = country_iso_code | ||
| if country_name is not None: | ||
| params['country_name'] = country_name | ||
| if location is not None: | ||
| params['location'] = location | ||
| if name is not None: | ||
| params['name'] = name | ||
| if region_iso_code is not None: | ||
| params['region_iso_code'] = region_iso_code | ||
| if region_name is not None: | ||
| params['region_name'] = region_name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Geo) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Geo(city_name=city_name, continent_name=continent_name, country_iso_code=country_iso_code, | ||
| country_name=country_name, location=location, name=name, | ||
| region_iso_code=region_iso_code, | ||
| region_name=region_name, **kwargs)) | ||
| self._base.add_object(Geo(**params)) | ||
| return self | ||
|
|
||
| def group(self, id: str = None, name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Group) | ||
| """Add ECS group fields. | ||
|
|
||
| Information about user groups. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-group.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Group) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Group(id=id, name=name, **kwargs)) | ||
| self._base.add_object(Group(**params)) | ||
| return self | ||
|
|
||
| def host(self, architecture: str = None, hostname: str = None, id: str = None, ip: str = None, | ||
| mac: str = None, name: str = None, type: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Host) | ||
| """Add ECS host fields. | ||
|
|
||
| Information about the host machine. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-host.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if architecture is not None: | ||
| params['architecture'] = architecture | ||
| if hostname is not None: | ||
| params['hostname'] = hostname | ||
| if id is not None: | ||
| params['id'] = id | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if name is not None: | ||
| params['name'] = name | ||
| if type is not None: | ||
| params['type'] = type | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Host) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Host(architecture=architecture, hostname=hostname, id=id, ip=ip, mac=mac, | ||
| name=name, type=type, **kwargs)) | ||
| self._base.add_object(Host(**params)) | ||
| return self | ||
|
|
||
| def http_request(self, body_bytes: int = None, body_content: str = None, bytes: int = None, method: str = None, | ||
| referrer: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(HttpRequest) | ||
| """Add ECS HTTP request fields. | ||
|
|
||
| Details about HTTP requests. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-http.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if body_bytes is not None: | ||
| params['body_bytes'] = body_bytes | ||
| if body_content is not None: | ||
| params['body_content'] = body_content | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if method is not None: | ||
| params['method'] = method | ||
| if referrer is not None: | ||
| params['referrer'] = referrer | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(HttpRequest) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(HttpRequest(body_bytes=body_bytes, body_content=body_content, bytes=bytes, method=method, | ||
| referrer=referrer, version=version, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(HttpRequest(**params)) | ||
| return self | ||
|
|
||
| def http_response(self, body_bytes: int = None, body_content: str = None, bytes: int = None, | ||
| status_code: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(HttpResponse) | ||
| """Add ECS HTTP response fields. | ||
|
|
||
| Details about HTTP responses. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-http.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if body_bytes is not None: | ||
| params['body_bytes'] = body_bytes | ||
| if body_content is not None: | ||
| params['body_content'] = body_content | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if status_code is not None: | ||
| params['status_code'] = status_code | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(HttpResponse) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(HttpResponse(body_bytes=body_bytes, body_content=body_content, bytes=bytes, | ||
| status_code=status_code, version=version, **kwargs)) | ||
| self._base.add_object(HttpResponse(**params)) | ||
| return self | ||
|
|
||
| def log(self, level: Union[str, Severity] = None, original: str = None, **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(LogLine) | ||
| """Add ECS log fields. | ||
|
|
||
| Details about the log file or logging subsystem. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-log.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if level is not None: | ||
| params['level'] = level | ||
| if original is not None: | ||
| params['original'] = original | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(LogLine) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(LogLine(level=level, original=original, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(LogLine(**params)) | ||
| return self | ||
|
|
||
| def network(self, application: str = None, bytes: int = None, community_id: str = None, direction: str = None, | ||
| forwarded_ip: str = None, iana_number: str = None, name: str = None, packets: int = None, | ||
| protocol: str = None, transport: str = None, type: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Network) | ||
| """Add ECS network fields. | ||
|
|
||
| Information about network communication. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-network.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if application is not None: | ||
| params['application'] = application | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if community_id is not None: | ||
| params['community_id'] = community_id | ||
| if direction is not None: | ||
| params['direction'] = direction | ||
| if forwarded_ip is not None: | ||
| params['forwarded_ip'] = forwarded_ip | ||
| if iana_number is not None: | ||
| params['iana_number'] = iana_number | ||
| if name is not None: | ||
| params['name'] = name | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if protocol is not None: | ||
| params['protocol'] = protocol | ||
| if transport is not None: | ||
| params['transport'] = transport | ||
| if type is not None: | ||
| params['type'] = type | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Network) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Network(application=application, bytes=bytes, community_id=community_id, | ||
| direction=direction, forwarded_ip=forwarded_ip, iana_number=iana_number, | ||
| name=name, packets=packets, protocol=protocol, transport=transport, type=type, | ||
| **kwargs)) | ||
| self._base.add_object(Network(**params)) | ||
| return self | ||
|
|
||
| def observer(self, hostname: str = None, ip: str = None, mac: str = None, serial_number: str = None, | ||
| type: str = None, vendor: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Observer) | ||
| """Add ECS observer fields. | ||
|
|
||
| Information about the observing entity (e.g., firewall, proxy). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-observer.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if hostname is not None: | ||
| params['hostname'] = hostname | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if serial_number is not None: | ||
| params['serial_number'] = serial_number | ||
| if type is not None: | ||
| params['type'] = type | ||
| if vendor is not None: | ||
| params['vendor'] = vendor | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Observer) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Observer(hostname=hostname, ip=ip, mac=mac, serial_number=serial_number, type=type, | ||
| vendor=vendor, version=version, **kwargs)) | ||
| self._base.add_object(Observer(**params)) | ||
| return self | ||
|
|
||
| def organization(self, id: str = None, name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Organization) | ||
| """Add ECS organization fields. | ||
|
|
||
| Information about the organization. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-organization.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Organization) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Organization(id=id, name=name, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Organization(**params)) | ||
| return self | ||
|
|
||
| def os(self, family: str = None, full: str = None, kernel: str = None, name: str = None, platform: str = None, | ||
| version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(OS) | ||
| """Add ECS operating system fields. | ||
|
|
||
| Information about the operating system. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-os.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if family is not None: | ||
| params['family'] = family | ||
| if full is not None: | ||
| params['full'] = full | ||
| if kernel is not None: | ||
| params['kernel'] = kernel | ||
| if name is not None: | ||
| params['name'] = name | ||
| if platform is not None: | ||
| params['platform'] = platform | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(OS) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(OS(family=family, full=full, kernel=kernel, name=name, platform=platform, version=version, | ||
| **kwargs)) | ||
| self._base.add_object(OS(**params)) | ||
| return self | ||
|
|
||
| def process(self, args: List[str] = None, executable: str = None, name: str = None, pid: int = None, | ||
| ppid: int = None, start: datetime = None, thread_id: int = None, title: str = None, | ||
| working_directory: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Process) | ||
| """Add ECS process fields. | ||
|
|
||
| Information about running processes. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-process.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if args is not None: | ||
| params['args'] = args | ||
| if executable is not None: | ||
| params['executable'] = executable | ||
| if name is not None: | ||
| params['name'] = name | ||
| if pid is not None: | ||
| params['pid'] = pid | ||
| if ppid is not None: | ||
| params['ppid'] = ppid | ||
| if start is not None: | ||
| params['start'] = start | ||
| if thread_id is not None: | ||
| params['thread_id'] = thread_id | ||
| if title is not None: | ||
| params['title'] = title | ||
| if working_directory is not None: | ||
| params['working_directory'] = working_directory | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Process) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Process(args=args, executable=executable, name=name, pid=pid, ppid=ppid, start=start, | ||
| thread_id=thread_id, title=title, working_directory=working_directory, **kwargs)) | ||
| self._base.add_object(Process(**params)) | ||
| return self | ||
|
|
||
| def related(self, ip: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Related) | ||
| """Add ECS related fields. | ||
|
|
||
| Fields for relating entities (IPs, users, hosts). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-related.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Related) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Related(ip=ip, **kwargs)) | ||
| self._base.add_object(Related(**params)) | ||
| return self | ||
|
|
||
| def server(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, mac: str = None, | ||
| packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Server) | ||
| """Add ECS server fields. | ||
|
|
||
| Fields about the server (responder) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-server.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Server) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Server(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Server(**params)) | ||
| return self | ||
|
|
||
| def service(self, ephemeral_id: str = None, id: str = None, name: str = None, state: str = None, type: str = None, | ||
| version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Service) | ||
| """Add ECS service fields. | ||
|
|
||
| Information about the service generating events. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-service.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ephemeral_id is not None: | ||
| params['ephemeral_id'] = ephemeral_id | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
| if state is not None: | ||
| params['state'] = state | ||
| if type is not None: | ||
| params['type'] = type | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Service) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Service(ephemeral_id=ephemeral_id, id=id, name=name, state=state, type=type, | ||
| version=version, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Service(**params)) | ||
| return self | ||
|
|
||
| def source(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, mac: str = None, | ||
| packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Source) | ||
| """Add ECS source fields. | ||
|
|
||
| Fields about the source (initiator) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-source.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Source) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Source(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Source(**params)) | ||
| return self | ||
|
|
||
| def url(self, domain: str = None, fragment: str = None, full: str = None, original: str = None, | ||
| password: str = None, path: str = None, port: int = None, query: str = None, scheme: str = None, | ||
| username: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Url) | ||
| """Add ECS URL fields. | ||
|
|
||
| Information about parsed URLs. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-url.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if fragment is not None: | ||
| params['fragment'] = fragment | ||
| if full is not None: | ||
| params['full'] = full | ||
| if original is not None: | ||
| params['original'] = original | ||
| if password is not None: | ||
| params['password'] = password | ||
| if path is not None: | ||
| params['path'] = path | ||
| if port is not None: | ||
| params['port'] = port | ||
| if query is not None: | ||
| params['query'] = query | ||
| if scheme is not None: | ||
| params['scheme'] = scheme | ||
| if username is not None: | ||
| params['username'] = username | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Url) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Url(domain=domain, fragment=fragment, full=full, original=original, password=password, | ||
| path=path, port=port, query=query, scheme=scheme, username=username, **kwargs)) | ||
| self._base.add_object(Url(**params)) | ||
| return self | ||
|
|
||
| def user(self, email: str = None, full_name: str = None, hash: str = None, id: str = None, name: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(User) | ||
| def user(self, email: Optional[str] = None, full_name: Optional[str] = None, | ||
| hash: Optional[str] = None, id: Optional[str] = None, name: Optional[str] = None, | ||
| **kwargs) -> 'Logger': | ||
| """Add ECS user fields to the log entry. | ||
|
|
||
| The user fields describe information about the user relevant to the event. | ||
|
|
||
| Args: | ||
| email: User email address | ||
| full_name: User's full name | ||
| hash: Unique user hash for anonymization | ||
| id: Unique user identifier | ||
| name: Short username | ||
| **kwargs: Additional user fields (e.g., domain, roles) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().user(name="alice", email="alice@example.com", id="1234") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-user.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if email is not None: | ||
| params['email'] = email | ||
| if full_name is not None: | ||
| params['full_name'] = full_name | ||
| if hash is not None: | ||
| params['hash'] = hash | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(User) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(User(email=email, full_name=full_name, hash=hash, id=id, name=name, **kwargs)) | ||
| self._base.add_object(User(**params)) | ||
| return self | ||
|
|
||
| def user_agent(self, device_name: str = None, name: str = None, original: str = None, version: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(UserAgent) | ||
| """Add ECS user agent fields. | ||
|
|
||
| Information about the user agent (browser, app). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-user_agent.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if device_name is not None: | ||
| params['device_name'] = device_name | ||
| if name is not None: | ||
| params['name'] = name | ||
| if original is not None: | ||
| params['original'] = original | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(UserAgent) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(UserAgent(device_name=device_name, name=name, original=original, version=version, | ||
| **kwargs)) | ||
| self._base.add_object(UserAgent(**params)) | ||
| return self |
There was a problem hiding this comment.
Most field methods are missing return type annotations. Only base(), error(), event(), log(), and user() have -> 'Logger' annotations, but methods like agent(), client(), cloud(), container(), destination(), ecs(), file(), geo(), group(), host(), http_request(), http_response(), network(), observer(), organization(), os(), process(), related(), server(), service(), source(), url(), and user_agent() are missing them.
For consistency and better IDE support, all these methods should have -> 'Logger': return type annotations added after their parameter lists.
kubi_ecs_logger/wrapper/logger.py
Outdated
| def agent(self, ephemeral_id: str = None, id: str = None, name: str = None, | ||
| type: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Agent) | ||
| """Add ECS agent fields. | ||
|
|
||
| Information about the agent/client reporting the event. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-agent.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ephemeral_id is not None: | ||
| params['ephemeral_id'] = ephemeral_id | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
| if type is not None: | ||
| params['type'] = type | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Agent) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Agent(ephemeral_id=ephemeral_id, id=id, name=name, type=type, version=version, **kwargs)) | ||
| self._base.add_object(Agent(**params)) | ||
| return self | ||
|
|
||
| def client(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, | ||
| mac: str = None, packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Client) | ||
| """Add ECS client fields. | ||
|
|
||
| Fields about the client (initiator) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-client.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Client) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Client(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Client(**params)) | ||
| return self | ||
|
|
||
| def cloud(self, account_id: str = None, availability_zone: str = None, instance_id: str = None, | ||
| instance_name: str = None, machine_type: str = None, provider: str = None, | ||
| region: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Cloud) | ||
| """Add ECS cloud fields. | ||
|
|
||
| Fields related to cloud or infrastructure provider information. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-cloud.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if account_id is not None: | ||
| params['account_id'] = account_id | ||
| if availability_zone is not None: | ||
| params['availability_zone'] = availability_zone | ||
| if instance_id is not None: | ||
| params['instance_id'] = instance_id | ||
| if instance_name is not None: | ||
| params['instance_name'] = instance_name | ||
| if machine_type is not None: | ||
| params['machine_type'] = machine_type | ||
| if provider is not None: | ||
| params['provider'] = provider | ||
| if region is not None: | ||
| params['region'] = region | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Cloud) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Cloud(account_id=account_id, availability_zone=availability_zone, instance_id=instance_id, | ||
| instance_name=instance_name, machine_type=machine_type, provider=provider, | ||
| region=region, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Cloud(**params)) | ||
| return self | ||
|
|
||
| def container(self, id: str = None, image_name: str = None, image_tag: str = None, | ||
| labels: dict = None, name: str = None, runtime: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Container) | ||
| """Add ECS container fields. | ||
|
|
||
| Runtime environment information for containerized applications. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-container.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if image_name is not None: | ||
| params['image_name'] = image_name | ||
| if image_tag is not None: | ||
| params['image_tag'] = image_tag | ||
| if labels is not None: | ||
| params['labels'] = labels | ||
| if name is not None: | ||
| params['name'] = name | ||
| if runtime is not None: | ||
| params['runtime'] = runtime | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Container) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Container(id=id, image_name=image_name, image_tag=image_tag, labels=labels, | ||
| name=name, runtime=runtime, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Container(**params)) | ||
| return self | ||
|
|
||
| def destination(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, | ||
| mac: str = None, packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Destination) | ||
| """Add ECS destination fields. | ||
|
|
||
| Fields about the destination (responder) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-destination.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Destination) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| self._base.add_object(Destination(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Destination(**params)) | ||
| return self | ||
|
|
||
| def ecs(self, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Client) | ||
| """Add ECS version information. | ||
|
|
||
| Meta-information about the ECS version used. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-ecs.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(ECS) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(ECS(version=version)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(ECS(**params)) | ||
| return self | ||
|
|
||
| def error(self, code: str = None, id: str = None, message: str = None, **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(Error) | ||
| def error(self, code: Optional[str] = None, id: Optional[str] = None, | ||
| message: Optional[str] = None, **kwargs) -> 'Logger': | ||
| """Add ECS error fields to the log entry. | ||
|
|
||
| The error fields capture details about errors that occurred during event processing. | ||
|
|
||
| Args: | ||
| code: Error code describing the error | ||
| id: Unique identifier for the error | ||
| message: Error message text | ||
| **kwargs: Additional error fields (e.g., stack_trace, type) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().error(code="ERR_AUTH_FAILED", message="Invalid credentials") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-error.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if code is not None: | ||
| params['code'] = code | ||
| if id is not None: | ||
| params['id'] = id | ||
| if message is not None: | ||
| params['message'] = message | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Error) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Error(code=code, id=id, message=message, **kwargs)) | ||
| self._base.add_object(Error(**params)) | ||
| return self | ||
|
|
||
| def event(self, action: str = None, category: str = None, created: datetime = None, | ||
| dataset: str = None, risk_score: float = None, severity: int = None, | ||
| def event(self, action: Optional[str] = None, category: Optional[str] = None, | ||
| created: Optional[datetime] = None, dataset: Optional[str] = None, | ||
| risk_score: Optional[float] = None, severity: Optional[int] = None, | ||
| **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(Event) | ||
| """Add ECS event fields to the log entry. | ||
|
|
||
| The event fields describe the circumstances of an observed event, | ||
| such as actions taken, their outcomes, and contextual information. | ||
|
|
||
| Args: | ||
| action: The action captured by the event (e.g., "user-login", "file-delete") | ||
| category: Event category (e.g., "authentication", "file") | ||
| created: When the event was created | ||
| dataset: Name of the dataset for event correlation | ||
| risk_score: Risk score calculated for the event (0-100) | ||
| severity: Numeric severity of the event | ||
| **kwargs: Additional event fields (e.g., duration, outcome, type) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().event(action="user-login", outcome="success", category="authentication") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-event.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if action is not None: | ||
| params['action'] = action | ||
| if category is not None: | ||
| params['category'] = category | ||
| if created is not None: | ||
| params['created'] = created | ||
| if dataset is not None: | ||
| params['dataset'] = dataset | ||
| if risk_score is not None: | ||
| params['risk_score'] = risk_score | ||
| if severity is not None: | ||
| params['severity'] = severity | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Event) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Event(action=action, category=category, created=created, dataset=dataset, | ||
| risk_score=risk_score, severity=severity, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Event(**params)) | ||
| return self | ||
|
|
||
| def file(self, ctime: datetime = None, device: str = None, extension: str = None, gid: str = None, | ||
| group: str = None, inode: str = None, mode: str = None, mtime: datetime = None, owner: str = None, | ||
| path: str = None, size: int = None, target_path: str = None, type: str = None, uid: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(File) | ||
| """Add ECS file fields. | ||
|
|
||
| Information about files involved in the event. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-file.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ctime is not None: | ||
| params['ctime'] = ctime | ||
| if device is not None: | ||
| params['device'] = device | ||
| if extension is not None: | ||
| params['extension'] = extension | ||
| if gid is not None: | ||
| params['gid'] = gid | ||
| if group is not None: | ||
| params['group'] = group | ||
| if inode is not None: | ||
| params['inode'] = inode | ||
| if mode is not None: | ||
| params['mode'] = mode | ||
| if mtime is not None: | ||
| params['mtime'] = mtime | ||
| if owner is not None: | ||
| params['owner'] = owner | ||
| if path is not None: | ||
| params['path'] = path | ||
| if size is not None: | ||
| params['size'] = size | ||
| if target_path is not None: | ||
| params['target_path'] = target_path | ||
| if type is not None: | ||
| params['type'] = type | ||
| if uid is not None: | ||
| params['uid'] = uid | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(File) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(File(ctime=ctime, device=device, extension=extension, gid=gid, group=group, | ||
| inode=inode, mode=mode, mtime=mtime, owner=owner, path=path, size=size, | ||
| target_path=target_path, type=type, uid=uid, **kwargs)) | ||
| self._base.add_object(File(**params)) | ||
| return self | ||
|
|
||
| def geo(self, city_name: str = None, continent_name: str = None, country_iso_code: str = None, | ||
| country_name: str = None, location: dict = None, name: str = None, region_iso_code: str = None, | ||
| region_name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Geo) | ||
| """Add ECS geo fields. | ||
|
|
||
| Geolocation information for IP addresses. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-geo.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if city_name is not None: | ||
| params['city_name'] = city_name | ||
| if continent_name is not None: | ||
| params['continent_name'] = continent_name | ||
| if country_iso_code is not None: | ||
| params['country_iso_code'] = country_iso_code | ||
| if country_name is not None: | ||
| params['country_name'] = country_name | ||
| if location is not None: | ||
| params['location'] = location | ||
| if name is not None: | ||
| params['name'] = name | ||
| if region_iso_code is not None: | ||
| params['region_iso_code'] = region_iso_code | ||
| if region_name is not None: | ||
| params['region_name'] = region_name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Geo) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Geo(city_name=city_name, continent_name=continent_name, country_iso_code=country_iso_code, | ||
| country_name=country_name, location=location, name=name, | ||
| region_iso_code=region_iso_code, | ||
| region_name=region_name, **kwargs)) | ||
| self._base.add_object(Geo(**params)) | ||
| return self | ||
|
|
||
| def group(self, id: str = None, name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Group) | ||
| """Add ECS group fields. | ||
|
|
||
| Information about user groups. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-group.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Group) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Group(id=id, name=name, **kwargs)) | ||
| self._base.add_object(Group(**params)) | ||
| return self | ||
|
|
||
| def host(self, architecture: str = None, hostname: str = None, id: str = None, ip: str = None, | ||
| mac: str = None, name: str = None, type: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Host) | ||
| """Add ECS host fields. | ||
|
|
||
| Information about the host machine. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-host.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if architecture is not None: | ||
| params['architecture'] = architecture | ||
| if hostname is not None: | ||
| params['hostname'] = hostname | ||
| if id is not None: | ||
| params['id'] = id | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if name is not None: | ||
| params['name'] = name | ||
| if type is not None: | ||
| params['type'] = type | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Host) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Host(architecture=architecture, hostname=hostname, id=id, ip=ip, mac=mac, | ||
| name=name, type=type, **kwargs)) | ||
| self._base.add_object(Host(**params)) | ||
| return self | ||
|
|
||
| def http_request(self, body_bytes: int = None, body_content: str = None, bytes: int = None, method: str = None, | ||
| referrer: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(HttpRequest) | ||
| """Add ECS HTTP request fields. | ||
|
|
||
| Details about HTTP requests. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-http.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if body_bytes is not None: | ||
| params['body_bytes'] = body_bytes | ||
| if body_content is not None: | ||
| params['body_content'] = body_content | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if method is not None: | ||
| params['method'] = method | ||
| if referrer is not None: | ||
| params['referrer'] = referrer | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(HttpRequest) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(HttpRequest(body_bytes=body_bytes, body_content=body_content, bytes=bytes, method=method, | ||
| referrer=referrer, version=version, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(HttpRequest(**params)) | ||
| return self | ||
|
|
||
| def http_response(self, body_bytes: int = None, body_content: str = None, bytes: int = None, | ||
| status_code: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(HttpResponse) | ||
| """Add ECS HTTP response fields. | ||
|
|
||
| Details about HTTP responses. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-http.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if body_bytes is not None: | ||
| params['body_bytes'] = body_bytes | ||
| if body_content is not None: | ||
| params['body_content'] = body_content | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if status_code is not None: | ||
| params['status_code'] = status_code | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(HttpResponse) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(HttpResponse(body_bytes=body_bytes, body_content=body_content, bytes=bytes, | ||
| status_code=status_code, version=version, **kwargs)) | ||
| self._base.add_object(HttpResponse(**params)) | ||
| return self | ||
|
|
||
| def log(self, level: Union[str, Severity] = None, original: str = None, **kwargs) -> 'Logger': | ||
| defaults = self.__get_defaults_for(LogLine) | ||
| """Add ECS log fields. | ||
|
|
||
| Details about the log file or logging subsystem. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-log.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if level is not None: | ||
| params['level'] = level | ||
| if original is not None: | ||
| params['original'] = original | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(LogLine) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(LogLine(level=level, original=original, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(LogLine(**params)) | ||
| return self | ||
|
|
||
| def network(self, application: str = None, bytes: int = None, community_id: str = None, direction: str = None, | ||
| forwarded_ip: str = None, iana_number: str = None, name: str = None, packets: int = None, | ||
| protocol: str = None, transport: str = None, type: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Network) | ||
| """Add ECS network fields. | ||
|
|
||
| Information about network communication. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-network.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if application is not None: | ||
| params['application'] = application | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if community_id is not None: | ||
| params['community_id'] = community_id | ||
| if direction is not None: | ||
| params['direction'] = direction | ||
| if forwarded_ip is not None: | ||
| params['forwarded_ip'] = forwarded_ip | ||
| if iana_number is not None: | ||
| params['iana_number'] = iana_number | ||
| if name is not None: | ||
| params['name'] = name | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if protocol is not None: | ||
| params['protocol'] = protocol | ||
| if transport is not None: | ||
| params['transport'] = transport | ||
| if type is not None: | ||
| params['type'] = type | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Network) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Network(application=application, bytes=bytes, community_id=community_id, | ||
| direction=direction, forwarded_ip=forwarded_ip, iana_number=iana_number, | ||
| name=name, packets=packets, protocol=protocol, transport=transport, type=type, | ||
| **kwargs)) | ||
| self._base.add_object(Network(**params)) | ||
| return self | ||
|
|
||
| def observer(self, hostname: str = None, ip: str = None, mac: str = None, serial_number: str = None, | ||
| type: str = None, vendor: str = None, version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Observer) | ||
| """Add ECS observer fields. | ||
|
|
||
| Information about the observing entity (e.g., firewall, proxy). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-observer.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if hostname is not None: | ||
| params['hostname'] = hostname | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if serial_number is not None: | ||
| params['serial_number'] = serial_number | ||
| if type is not None: | ||
| params['type'] = type | ||
| if vendor is not None: | ||
| params['vendor'] = vendor | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Observer) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Observer(hostname=hostname, ip=ip, mac=mac, serial_number=serial_number, type=type, | ||
| vendor=vendor, version=version, **kwargs)) | ||
| self._base.add_object(Observer(**params)) | ||
| return self | ||
|
|
||
| def organization(self, id: str = None, name: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Organization) | ||
| """Add ECS organization fields. | ||
|
|
||
| Information about the organization. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-organization.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Organization) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Organization(id=id, name=name, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Organization(**params)) | ||
| return self | ||
|
|
||
| def os(self, family: str = None, full: str = None, kernel: str = None, name: str = None, platform: str = None, | ||
| version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(OS) | ||
| """Add ECS operating system fields. | ||
|
|
||
| Information about the operating system. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-os.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if family is not None: | ||
| params['family'] = family | ||
| if full is not None: | ||
| params['full'] = full | ||
| if kernel is not None: | ||
| params['kernel'] = kernel | ||
| if name is not None: | ||
| params['name'] = name | ||
| if platform is not None: | ||
| params['platform'] = platform | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(OS) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(OS(family=family, full=full, kernel=kernel, name=name, platform=platform, version=version, | ||
| **kwargs)) | ||
| self._base.add_object(OS(**params)) | ||
| return self | ||
|
|
||
| def process(self, args: List[str] = None, executable: str = None, name: str = None, pid: int = None, | ||
| ppid: int = None, start: datetime = None, thread_id: int = None, title: str = None, | ||
| working_directory: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Process) | ||
| """Add ECS process fields. | ||
|
|
||
| Information about running processes. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-process.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if args is not None: | ||
| params['args'] = args | ||
| if executable is not None: | ||
| params['executable'] = executable | ||
| if name is not None: | ||
| params['name'] = name | ||
| if pid is not None: | ||
| params['pid'] = pid | ||
| if ppid is not None: | ||
| params['ppid'] = ppid | ||
| if start is not None: | ||
| params['start'] = start | ||
| if thread_id is not None: | ||
| params['thread_id'] = thread_id | ||
| if title is not None: | ||
| params['title'] = title | ||
| if working_directory is not None: | ||
| params['working_directory'] = working_directory | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Process) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Process(args=args, executable=executable, name=name, pid=pid, ppid=ppid, start=start, | ||
| thread_id=thread_id, title=title, working_directory=working_directory, **kwargs)) | ||
| self._base.add_object(Process(**params)) | ||
| return self | ||
|
|
||
| def related(self, ip: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Related) | ||
| """Add ECS related fields. | ||
|
|
||
| Fields for relating entities (IPs, users, hosts). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-related.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Related) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Related(ip=ip, **kwargs)) | ||
| self._base.add_object(Related(**params)) | ||
| return self | ||
|
|
||
| def server(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, mac: str = None, | ||
| packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Server) | ||
| """Add ECS server fields. | ||
|
|
||
| Fields about the server (responder) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-server.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Server) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Server(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Server(**params)) | ||
| return self | ||
|
|
||
| def service(self, ephemeral_id: str = None, id: str = None, name: str = None, state: str = None, type: str = None, | ||
| version: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Service) | ||
| """Add ECS service fields. | ||
|
|
||
| Information about the service generating events. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-service.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if ephemeral_id is not None: | ||
| params['ephemeral_id'] = ephemeral_id | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
| if state is not None: | ||
| params['state'] = state | ||
| if type is not None: | ||
| params['type'] = type | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Service) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| self._base.add_object(Service(ephemeral_id=ephemeral_id, id=id, name=name, state=state, type=type, | ||
| version=version, **kwargs)) | ||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Service(**params)) | ||
| return self | ||
|
|
||
| def source(self, address: str = None, bytes: int = None, domain: str = None, ip: str = None, mac: str = None, | ||
| packets: int = None, port: int = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Source) | ||
| """Add ECS source fields. | ||
|
|
||
| Fields about the source (initiator) side of a network connection. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-source.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if address is not None: | ||
| params['address'] = address | ||
| if bytes is not None: | ||
| params['bytes'] = bytes | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if ip is not None: | ||
| params['ip'] = ip | ||
| if mac is not None: | ||
| params['mac'] = mac | ||
| if packets is not None: | ||
| params['packets'] = packets | ||
| if port is not None: | ||
| params['port'] = port | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Source) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Source(address=address, bytes=bytes, domain=domain, ip=ip, mac=mac, packets=packets, | ||
| port=port, **kwargs)) | ||
| self._base.add_object(Source(**params)) | ||
| return self | ||
|
|
||
| def url(self, domain: str = None, fragment: str = None, full: str = None, original: str = None, | ||
| password: str = None, path: str = None, port: int = None, query: str = None, scheme: str = None, | ||
| username: str = None, **kwargs): | ||
| defaults = self.__get_defaults_for(Url) | ||
| """Add ECS URL fields. | ||
|
|
||
| Information about parsed URLs. | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-url.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if domain is not None: | ||
| params['domain'] = domain | ||
| if fragment is not None: | ||
| params['fragment'] = fragment | ||
| if full is not None: | ||
| params['full'] = full | ||
| if original is not None: | ||
| params['original'] = original | ||
| if password is not None: | ||
| params['password'] = password | ||
| if path is not None: | ||
| params['path'] = path | ||
| if port is not None: | ||
| params['port'] = port | ||
| if query is not None: | ||
| params['query'] = query | ||
| if scheme is not None: | ||
| params['scheme'] = scheme | ||
| if username is not None: | ||
| params['username'] = username | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(Url) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(Url(domain=domain, fragment=fragment, full=full, original=original, password=password, | ||
| path=path, port=port, query=query, scheme=scheme, username=username, **kwargs)) | ||
| self._base.add_object(Url(**params)) | ||
| return self | ||
|
|
||
| def user(self, email: str = None, full_name: str = None, hash: str = None, id: str = None, name: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(User) | ||
| def user(self, email: Optional[str] = None, full_name: Optional[str] = None, | ||
| hash: Optional[str] = None, id: Optional[str] = None, name: Optional[str] = None, | ||
| **kwargs) -> 'Logger': | ||
| """Add ECS user fields to the log entry. | ||
|
|
||
| The user fields describe information about the user relevant to the event. | ||
|
|
||
| Args: | ||
| email: User email address | ||
| full_name: User's full name | ||
| hash: Unique user hash for anonymization | ||
| id: Unique user identifier | ||
| name: Short username | ||
| **kwargs: Additional user fields (e.g., domain, roles) | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| Example: | ||
| >>> Logger().user(name="alice", email="alice@example.com", id="1234") | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-user.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if email is not None: | ||
| params['email'] = email | ||
| if full_name is not None: | ||
| params['full_name'] = full_name | ||
| if hash is not None: | ||
| params['hash'] = hash | ||
| if id is not None: | ||
| params['id'] = id | ||
| if name is not None: | ||
| params['name'] = name | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(User) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(User(email=email, full_name=full_name, hash=hash, id=id, name=name, **kwargs)) | ||
| self._base.add_object(User(**params)) | ||
| return self | ||
|
|
||
| def user_agent(self, device_name: str = None, name: str = None, original: str = None, version: str = None, | ||
| **kwargs): | ||
| defaults = self.__get_defaults_for(UserAgent) | ||
| """Add ECS user agent fields. | ||
|
|
||
| Information about the user agent (browser, app). | ||
|
|
||
| Returns: | ||
| Logger instance for method chaining. | ||
|
|
||
| See: | ||
| https://www.elastic.co/guide/en/ecs/current/ecs-user_agent.html | ||
| """ | ||
| # Collect explicit parameters (only non-None values) | ||
| params = {} | ||
| if device_name is not None: | ||
| params['device_name'] = device_name | ||
| if name is not None: | ||
| params['name'] = name | ||
| if original is not None: | ||
| params['original'] = original | ||
| if version is not None: | ||
| params['version'] = version | ||
|
|
||
| # Merge with defaults (defaults don't override explicit params) | ||
| defaults = self._get_defaults_for(UserAgent) | ||
| if defaults: | ||
| kwargs.update(defaults) | ||
| for key, value in defaults.items(): | ||
| if key not in params: | ||
| params[key] = value | ||
|
|
||
| # Merge with kwargs (kwargs override everything) | ||
| params.update(kwargs) | ||
|
|
||
| self._base.add_object(UserAgent(device_name=device_name, name=name, original=original, version=version, | ||
| **kwargs)) | ||
| self._base.add_object(UserAgent(**params)) | ||
| return self |
There was a problem hiding this comment.
Type hint inconsistency: Most field methods use bare types with = None defaults (e.g., str = None), but some methods like base(), error(), event(), log(), and user() properly use Optional[str] = None.
For proper static type checking and consistency, parameters with None defaults should be annotated as Optional[type] rather than just type. This affects methods: agent, client, cloud, container, destination, ecs, file, geo, group, host, http_request, http_response, network, observer, organization, os, process, related, server, service, source, url, and user_agent.
kubi_ecs_logger/wrapper/logger.py
Outdated
| """ | ||
| defaults = self._get_defaults_for(Base) | ||
| if defaults: | ||
| kwargs.update(defaults) |
There was a problem hiding this comment.
The base() method's defaults merging pattern is inconsistent with other field methods. In base(), defaults override kwargs (line 178: kwargs.update(defaults)), but in all other field methods (agent, client, error, event, etc.), defaults are overridden by kwargs. This inconsistency could lead to unexpected behavior.
Consider changing line 178 from:
kwargs.update(defaults)to:
if defaults:
for key, value in defaults.items():
if key not in kwargs:
kwargs[key] = valueThis would make the precedence consistent: explicit params > kwargs > defaults
| kwargs.update(defaults) | |
| for key, value in defaults.items(): | |
| if key not in kwargs: | |
| kwargs[key] = value |
Addresses 3 new upstream PR comments:
1. Added return type annotations to all field methods:
- All 23 field methods now have -> 'Logger' return type
- Improves IDE support and type checking consistency
- Methods: agent, client, cloud, container, destination, ecs, file,
geo, group, host, http_request, http_response, network, observer,
organization, os, process, related, server, service, source, url,
user_agent
2. Fixed parameter type hints to use Optional:
- Changed bare types (str = None) to Optional[str] = None
- Affects all parameters with None defaults across all field methods
- Proper static type checking support
3. Fixed base() method defaults precedence bug:
- Changed from defaults override kwargs to kwargs override defaults
- Now consistent with all other field methods
- Precedence: explicit params > kwargs > defaults
- Added 2 regression tests to prevent reintroduction
All 112 tests passing (110 original + 2 new base() tests).
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Summary
This PR improves the pythonicity of kubi_ecs_logger while maintaining 100% backwards compatibility. All existing code continues to work without modification.
Changes
Custom Exception Hierarchy
LoggerError,InvalidTypeError,InvalidSeverityError-Oflag (assertions are removed with optimization)Comprehensive Documentation
Improved Type Hints
Optional,List,Dict,Any,Uniontypes throughoutCode Quality Improvements
obj.__class__.__name__)Comprehensive Test Suite
Updated Documentation
Testing
All 102 tests pass:
pytest tests/ -v # 102 passed in 0.10sBackwards Compatibility
Zero breaking changes. All existing code patterns continue to work:
🤖 Generated with Claude Code