Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ Changelog

- Removed ``setup.py``.

- Added year parameter to prevent leap year parsing failures. Updated
tests.

0.4 (2021-04-04)
----------------
Expand Down
35 changes: 24 additions & 11 deletions src/syslogmp/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
from typing import Tuple
from typing import Optional, Tuple

from .facility import Facility
from .message import Message
Expand All @@ -37,10 +37,27 @@

MAX_MESSAGE_LENGTH: int = 1024

def parse(data: bytes, year: Optional[int] = None) -> Message:
"""
Parse a syslog message.

def parse(data: bytes) -> Message:
"""Parse a syslog message."""
parser = _Parser(data)
Args:
data: Raw syslog message as bytes
year: Year to use for timestamp parsing (defaults to current year)
to work around `datetime.strptime` failing on February 29th
if no year is given ("ValueError: day is out of range for month")
in which case the (non-leap) year 1900 would be used.

Returns:
A Message object containing parsed syslog message components

Raises:
MessageFormatError: If the syslog message cannot be parsed correctly
"""
if year is None:
year = datetime.today().year

parser = _Parser(data, year)

priority_value = parser._parse_pri_part()
timestamp, hostname = parser._parse_header_part()
Expand All @@ -58,7 +75,7 @@ def parse(data: bytes) -> Message:
class _Parser:
"""Parse a syslog message."""

def __init__(self, data: bytes) -> None:
def __init__(self, data: bytes, year: int) -> None:
ensure(isinstance(data, bytes), 'Data must be a byte string.')

ensure(
Expand All @@ -67,6 +84,7 @@ def __init__(self, data: bytes) -> None:
)

self.stream = Stream(data)
self.year = year

def _parse_pri_part(self) -> PriorityValue:
"""Extract facility and severity from the PRI part."""
Expand All @@ -92,12 +110,7 @@ def _parse_timestamp(self) -> datetime:
timestamp_bytes = self.stream.read(15)
timestamp_ascii = timestamp_bytes.decode('ascii')

# Explicitly specify the current year to work around
# `datetime.strptime` failing on February 29th if no year is
# given ("ValueError: day is out of range for month") in which
# case the (non-leap) year 1900 would be used.
current_year = datetime.today().year
timestamp_ascii_with_year = f'{current_year:d} {timestamp_ascii}'
timestamp_ascii_with_year = f"{self.year:d} {timestamp_ascii}"

try:
timestamp = datetime.strptime(
Expand Down
37 changes: 37 additions & 0 deletions tests/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ def test_parse_erroneous_message(data):
(2012),
(2016),
(2020),
(2024),
],
)
def test_parse_leap_day_in_leap_year(current_year):
Expand All @@ -142,6 +143,24 @@ def test_parse_leap_day_in_leap_year(current_year):
with freeze_time(fake_date):
actual = parse(data)

assert actual.timestamp == expected_timestamp


@pytest.mark.parametrize(
'current_year',
[
(2012),
(2016),
(2020),
(2024),
],
)
def test_parse_leap_day_in_leap_year_with_year_arg(current_year):
data = b'<165>Feb 29 19:56:43 localhost foobar'
expected_timestamp = datetime(current_year, 2, 29, 19, 56, 43)

actual = parse(data, current_year)

assert actual.timestamp == expected_timestamp


Expand All @@ -152,6 +171,7 @@ def test_parse_leap_day_in_leap_year(current_year):
(2015),
(2017),
(2018),
(2025),
],
)
def test_parse_leap_day_in_non_leap_year(current_year):
Expand All @@ -161,3 +181,20 @@ def test_parse_leap_day_in_non_leap_year(current_year):
with pytest.raises(MessageFormatError):
with freeze_time(fake_date):
parse(data)


@pytest.mark.parametrize(
'current_year',
[
(1900),
(2015),
(2017),
(2018),
(2025),
],
)
def test_parse_leap_day_in_non_leap_year_with_year_arg(current_year):
data = b'<165>Feb 29 19:56:43 localhost foobar'

with pytest.raises(MessageFormatError):
parse(data, current_year)