diff --git a/CHANGES.rst b/CHANGES.rst index c04747f..162fda0 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -7,6 +7,8 @@ Changelog - Removed ``setup.py``. +- Added year parameter to prevent leap year parsing failures. Updated + tests. 0.4 (2021-04-04) ---------------- diff --git a/src/syslogmp/parser.py b/src/syslogmp/parser.py index 07f4880..e2c5ecd 100644 --- a/src/syslogmp/parser.py +++ b/src/syslogmp/parser.py @@ -27,7 +27,7 @@ from __future__ import annotations from dataclasses import dataclass from datetime import datetime -from typing import Tuple +from typing import Optional, Tuple from .facility import Facility from .message import Message @@ -37,10 +37,27 @@ MAX_MESSAGE_LENGTH: int = 1024 +def parse(data: bytes, year: Optional[int] = None) -> Message: + """ + Parse a syslog message. -def parse(data: bytes) -> Message: - """Parse a syslog message.""" - parser = _Parser(data) + Args: + data: Raw syslog message as bytes + year: Year to use for timestamp parsing (defaults to current year) + to work around `datetime.strptime` failing on February 29th + if no year is given ("ValueError: day is out of range for month") + in which case the (non-leap) year 1900 would be used. + + Returns: + A Message object containing parsed syslog message components + + Raises: + MessageFormatError: If the syslog message cannot be parsed correctly + """ + if year is None: + year = datetime.today().year + + parser = _Parser(data, year) priority_value = parser._parse_pri_part() timestamp, hostname = parser._parse_header_part() @@ -58,7 +75,7 @@ def parse(data: bytes) -> Message: class _Parser: """Parse a syslog message.""" - def __init__(self, data: bytes) -> None: + def __init__(self, data: bytes, year: int) -> None: ensure(isinstance(data, bytes), 'Data must be a byte string.') ensure( @@ -67,6 +84,7 @@ def __init__(self, data: bytes) -> None: ) self.stream = Stream(data) + self.year = year def _parse_pri_part(self) -> PriorityValue: """Extract facility and severity from the PRI part.""" @@ -92,12 +110,7 @@ def _parse_timestamp(self) -> datetime: timestamp_bytes = self.stream.read(15) timestamp_ascii = timestamp_bytes.decode('ascii') - # Explicitly specify the current year to work around - # `datetime.strptime` failing on February 29th if no year is - # given ("ValueError: day is out of range for month") in which - # case the (non-leap) year 1900 would be used. - current_year = datetime.today().year - timestamp_ascii_with_year = f'{current_year:d} {timestamp_ascii}' + timestamp_ascii_with_year = f"{self.year:d} {timestamp_ascii}" try: timestamp = datetime.strptime( diff --git a/tests/test_parser.py b/tests/test_parser.py index 4d921f3..1622117 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -132,6 +132,7 @@ def test_parse_erroneous_message(data): (2012), (2016), (2020), + (2024), ], ) def test_parse_leap_day_in_leap_year(current_year): @@ -142,6 +143,24 @@ def test_parse_leap_day_in_leap_year(current_year): with freeze_time(fake_date): actual = parse(data) + assert actual.timestamp == expected_timestamp + + +@pytest.mark.parametrize( + 'current_year', + [ + (2012), + (2016), + (2020), + (2024), + ], +) +def test_parse_leap_day_in_leap_year_with_year_arg(current_year): + data = b'<165>Feb 29 19:56:43 localhost foobar' + expected_timestamp = datetime(current_year, 2, 29, 19, 56, 43) + + actual = parse(data, current_year) + assert actual.timestamp == expected_timestamp @@ -152,6 +171,7 @@ def test_parse_leap_day_in_leap_year(current_year): (2015), (2017), (2018), + (2025), ], ) def test_parse_leap_day_in_non_leap_year(current_year): @@ -161,3 +181,20 @@ def test_parse_leap_day_in_non_leap_year(current_year): with pytest.raises(MessageFormatError): with freeze_time(fake_date): parse(data) + + +@pytest.mark.parametrize( + 'current_year', + [ + (1900), + (2015), + (2017), + (2018), + (2025), + ], +) +def test_parse_leap_day_in_non_leap_year_with_year_arg(current_year): + data = b'<165>Feb 29 19:56:43 localhost foobar' + + with pytest.raises(MessageFormatError): + parse(data, current_year)