diff --git a/src/cantools/logreader.py b/src/cantools/logreader.py index fd373417..464343bb 100644 --- a/src/cantools/logreader.py +++ b/src/cantools/logreader.py @@ -3,14 +3,14 @@ import datetime import enum import re -import typing +from typing import Literal, Optional, Union, TYPE_CHECKING, overload -if typing.TYPE_CHECKING: +if TYPE_CHECKING: import io from collections.abc import Iterator -TimestampType = typing.Union[datetime.datetime, datetime.timedelta, None] +TimestampType = Optional[Union[datetime.datetime, datetime.timedelta]] class TimestampFormat(enum.Enum): """Describes a type of timestamp. ABSOLUTE is referring to UNIX time @@ -30,7 +30,7 @@ def __init__(self, channel: str, is_extended_frame: bool, data: bytes, is_remote_frame: bool, - timestamp: 'TimestampType', + timestamp: TimestampType, timestamp_format: TimestampFormat): """Constructor for DataFrame @@ -55,9 +55,9 @@ def __repr__(self) -> str: class BasePattern: - pattern: 're.Pattern[str]' + pattern: re.Pattern[str] - def match(self, line: str) -> 'DataFrame|None': + def match(self, line: str) -> Optional[DataFrame]: mo = self.pattern.match(line) if mo: return self.unpack(mo) @@ -65,13 +65,13 @@ def match(self, line: str) -> 'DataFrame|None': return None @abc.abstractmethod - def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None': + def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]: raise NotImplementedError() class CandumpBasePattern(BasePattern): - def unpack(self, match_object: 're.Match[str]') -> DataFrame: + def unpack(self, match_object: re.Match[str]) -> DataFrame: channel = match_object.group('channel') frame_id = int(match_object.group('can_id'), 16) is_extended_frame = len(match_object.group('can_id')) > 3 @@ -88,7 +88,7 @@ def unpack(self, match_object: 're.Match[str]') -> DataFrame: return DataFrame(channel=channel, frame_id=frame_id, is_extended_frame=is_extended_frame, data=data, is_remote_frame=is_remote_frame, timestamp=timestamp, timestamp_format=timestamp_format) @abc.abstractmethod - def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]': + def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]: raise NotImplementedError() @@ -101,7 +101,7 @@ class CandumpDefaultPattern(CandumpBasePattern): pattern = re.compile( r'^\s*?(?P[a-zA-Z0-9]+)\s+(?P[0-9A-F]+)\s+\[\d+\]\s*(?Premote request|[0-9A-F ]*).*?$') - def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]': + def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]: timestamp = None timestamp_format = TimestampFormat.MISSING return timestamp, timestamp_format @@ -116,12 +116,12 @@ class CandumpTimestampedPattern(CandumpBasePattern): pattern = re.compile( r'^\s*?\((?P[\d.]+)\)\s+(?P[a-zA-Z0-9]+)\s+(?P[0-9A-F]+)\s+\[\d+\]\s*(?Premote request|[0-9A-F ]*).*?$') - def __init__(self, tz: 'datetime.tzinfo|None') -> None: + def __init__(self, tz: Optional[datetime.tzinfo]) -> None: self.tz = tz - def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]': + def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]: seconds = float(match_object.group('timestamp')) - timestamp: typing.Union[datetime.timedelta, datetime.datetime] + timestamp: Union[datetime.timedelta, datetime.datetime] if seconds < 662688000: # 1991-01-01 00:00:00, "Released in 1991, the Mercedes-Benz W140 was the first production vehicle to feature a CAN-based multiplex wiring system." timestamp = datetime.timedelta(seconds=seconds) timestamp_format = TimestampFormat.RELATIVE @@ -138,10 +138,10 @@ class CandumpDefaultLogPattern(CandumpBasePattern): pattern = re.compile( r'^\s*?\((?P[\d.]+?)\)\s+?(?P[a-zA-Z0-9]+)\s+?(?P[0-9A-F]+?)#(#[0-9A-F])?(?PR|([0-9A-Fa-f]{2})*)(\s+[RT])?$') - def __init__(self, tz: 'datetime.tzinfo|None') -> None: + def __init__(self, tz: Optional[datetime.tzinfo]) -> None: self.tz = tz - def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]': + def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]: timestamp = datetime.datetime.fromtimestamp(float(match_object.group('timestamp')), self.tz) timestamp_format = TimestampFormat.ABSOLUTE return timestamp, timestamp_format @@ -156,7 +156,7 @@ class CandumpAbsoluteLogPattern(CandumpBasePattern): pattern = re.compile( r'^\s*?\((?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)\)\s+(?P[a-zA-Z0-9]+)\s+(?P[0-9A-F]+)\s+\[\d+\]\s*(?Premote request|[0-9A-F ]*).*?$') - def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]': + def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]: timestamp = datetime.datetime.strptime(match_object.group('timestamp'), "%Y-%m-%d %H:%M:%S.%f") timestamp_format = TimestampFormat.ABSOLUTE return timestamp, timestamp_format @@ -173,7 +173,7 @@ class PCANTracePatternV10(BasePattern): pattern = re.compile( r'^\s*?\d+\)\s*?(?P\d+)\s+(?P[0-9A-F]+)\s+(?P[0-9])\s+(?PRTR|[0-9A-F ]*)$') - def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None': + def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]: channel = self.parse_channel(match_object) frame_id = int(match_object.group('can_id'), 16) is_extended_frame = len(match_object.group('can_id')) > 4 @@ -185,10 +185,10 @@ def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None': return DataFrame(channel=channel, frame_id=frame_id, is_extended_frame=is_extended_frame, data=data, is_remote_frame=is_remote_frame, timestamp=timestamp, timestamp_format=timestamp_format) - def parse_channel(self, match_object: 're.Match[str]') -> str: + def parse_channel(self, match_object: re.Match[str]) -> str: return 'pcanx' - def parse_data(self, match_object: 're.Match[str]') -> 'tuple[bytes, bool]': + def parse_data(self, match_object: re.Match[str]) -> tuple[bytes, bool]: data = match_object.group('can_data') if data == 'RTR': is_remote_frame = True @@ -211,7 +211,7 @@ class PCANTracePatternV11(PCANTracePatternV10): pattern = re.compile( r'^\s*?\d+\)\s*?(?P\d+.\d+)\s+(?P\w+)\s+(?P[0-9A-F]+)\s+(?P[0-9])\s+(?PRTR|[0-9A-F ]*)$') - def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None': + def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]: if match_object.group('type') in ('Error', 'Warng'): # yes, they really spell Warning without the 'in' return None @@ -228,7 +228,7 @@ class PCANTracePatternV12(PCANTracePatternV11): pattern = re.compile( r'^\s*?\d+\)\s*?(?P\d+.\d+)\s+(?P[0-9])\s+(?P\w+)\s+(?P[0-9A-F]+)\s+(?P[0-9])\s+(?PRTR|[0-9A-F ]*)$') - def parse_channel(self, match_object: 're.Match[str]') -> str: + def parse_channel(self, match_object: re.Match[str]) -> str: return 'pcan' + match_object.group('channel') @@ -254,10 +254,10 @@ class PCANTracePatternV20(PCANTracePatternV13): pattern = re.compile( r'^\s*?\d+?\s*?(?P\d+.\d+)\s+(?P\w+)\s+(?P[0-9A-F]+)\s+(?P\w+)\s+(?P[0-9]+)(\s+(?P[0-9A-F ]*))?$') - def parse_channel(self, match_object: 're.Match[str]') -> str: + def parse_channel(self, match_object: re.Match[str]) -> str: return 'pcanx' - def parse_data(self, match_object: 're.Match[str]') -> 'tuple[bytes, bool]': + def parse_data(self, match_object: re.Match[str]) -> tuple[bytes, bool]: if match_object.group('type') == 'RR': is_remote_frame = True data = bytes(0) @@ -277,7 +277,7 @@ class PCANTracePatternV21(PCANTracePatternV20): pattern = re.compile( r'^\s*?\d+?\s*?(?P\d+.\d+)\s+(?P\w+)\s+(?P[0-9])\s+(?P[0-9A-F]+)\s+(?P.+)\s+-\s+(?P[0-9]+)(\s+(?P[0-9A-F ]*))?$') - def parse_channel(self, match_object: 're.Match[str]') -> str: + def parse_channel(self, match_object: re.Match[str]) -> str: return 'pcan' + match_object.group('channel') @@ -292,12 +292,12 @@ class Parser: print(f'{frame.timestamp}: {frame.frame_id}') """ - def __init__(self, stream: 'io.TextIOBase|None' = None, *, tz: 'datetime.tzinfo|None' = None) -> None: + def __init__(self, stream: Optional[io.TextIOBase] = None, *, tz: Optional[datetime.tzinfo] = None) -> None: self.stream = stream - self.pattern: typing.Optional[BasePattern] = None + self.pattern: Optional[BasePattern] = None self.tz = tz - def detect_pattern(self, line: str) -> 'BasePattern|None': + def detect_pattern(self, line: str) -> Optional[BasePattern]: for p in [CandumpDefaultPattern(), CandumpTimestampedPattern(self.tz), CandumpDefaultLogPattern(self.tz), CandumpAbsoluteLogPattern(), PCANTracePatternV21(), PCANTracePatternV20(), PCANTracePatternV13(), PCANTracePatternV12(), PCANTracePatternV11(), PCANTracePatternV10()]: mo = p.pattern.match(line) if mo: @@ -305,22 +305,22 @@ def detect_pattern(self, line: str) -> 'BasePattern|None': return None - def parse(self, line: str) -> 'DataFrame|None': + def parse(self, line: str) -> Optional[DataFrame]: if self.pattern is None: self.pattern = self.detect_pattern(line) if self.pattern is None: return None return self.pattern.match(line) - @typing.overload - def iterlines(self) -> 'Iterator[tuple[str, DataFrame]]': + @overload + def iterlines(self) -> Iterator[tuple[str, DataFrame]]: pass - @typing.overload - def iterlines(self, keep_unknowns: 'typing.Literal[True]') -> 'Iterator[tuple[str, DataFrame|None]]': + @overload + def iterlines(self, keep_unknowns: Literal[True]) -> Iterator[tuple[str, Optional[DataFrame]]]: pass - def iterlines(self, keep_unknowns: bool = False) -> 'Iterator[tuple[str, DataFrame|None]]': + def iterlines(self, keep_unknowns: bool = False) -> Iterator[tuple[str, Optional[DataFrame]]]: """Returns an generator that yields (str, DataFrame) tuples with the raw log entry and a parsed log entry. If keep_unknowns=True, (str, None) tuples will be returned for log entries that couldn't be decoded. @@ -341,7 +341,7 @@ def iterlines(self, keep_unknowns: bool = False) -> 'Iterator[tuple[str, DataFra else: continue - def __iter__(self) -> 'Iterator[DataFrame]': + def __iter__(self) -> Iterator[DataFrame]: """Returns DataFrame log entries. Non-parseable log entries is discarded.""" for _, frame in self.iterlines():