Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 34 additions & 34 deletions src/cantools/logreader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
import datetime
import enum
import re
import typing
from typing import Literal, Optional, Union, TYPE_CHECKING, overload

if typing.TYPE_CHECKING:
if TYPE_CHECKING:
import io
from collections.abc import Iterator


TimestampType = typing.Union[datetime.datetime, datetime.timedelta, None]
TimestampType = Optional[Union[datetime.datetime, datetime.timedelta]]

class TimestampFormat(enum.Enum):
"""Describes a type of timestamp. ABSOLUTE is referring to UNIX time
Expand All @@ -30,7 +30,7 @@ def __init__(self, channel: str,
is_extended_frame: bool,
data: bytes,
is_remote_frame: bool,
timestamp: 'TimestampType',
timestamp: TimestampType,
timestamp_format: TimestampFormat):
"""Constructor for DataFrame

Expand All @@ -55,23 +55,23 @@ def __repr__(self) -> str:

class BasePattern:

pattern: 're.Pattern[str]'
pattern: re.Pattern[str]

def match(self, line: str) -> 'DataFrame|None':
def match(self, line: str) -> Optional[DataFrame]:
mo = self.pattern.match(line)
if mo:
return self.unpack(mo)

return None

@abc.abstractmethod
def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None':
def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]:
raise NotImplementedError()


class CandumpBasePattern(BasePattern):

def unpack(self, match_object: 're.Match[str]') -> DataFrame:
def unpack(self, match_object: re.Match[str]) -> DataFrame:
channel = match_object.group('channel')
frame_id = int(match_object.group('can_id'), 16)
is_extended_frame = len(match_object.group('can_id')) > 3
Expand All @@ -88,7 +88,7 @@ def unpack(self, match_object: 're.Match[str]') -> DataFrame:
return DataFrame(channel=channel, frame_id=frame_id, is_extended_frame=is_extended_frame, data=data, is_remote_frame=is_remote_frame, timestamp=timestamp, timestamp_format=timestamp_format)

@abc.abstractmethod
def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]':
def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]:
raise NotImplementedError()


Expand All @@ -101,7 +101,7 @@ class CandumpDefaultPattern(CandumpBasePattern):
pattern = re.compile(
r'^\s*?(?P<channel>[a-zA-Z0-9]+)\s+(?P<can_id>[0-9A-F]+)\s+\[\d+\]\s*(?P<can_data>remote request|[0-9A-F ]*).*?$')

def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]':
def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]:
timestamp = None
timestamp_format = TimestampFormat.MISSING
return timestamp, timestamp_format
Expand All @@ -116,12 +116,12 @@ class CandumpTimestampedPattern(CandumpBasePattern):
pattern = re.compile(
r'^\s*?\((?P<timestamp>[\d.]+)\)\s+(?P<channel>[a-zA-Z0-9]+)\s+(?P<can_id>[0-9A-F]+)\s+\[\d+\]\s*(?P<can_data>remote request|[0-9A-F ]*).*?$')

def __init__(self, tz: 'datetime.tzinfo|None') -> None:
def __init__(self, tz: Optional[datetime.tzinfo]) -> None:
self.tz = tz

def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]':
def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]:
seconds = float(match_object.group('timestamp'))
timestamp: typing.Union[datetime.timedelta, datetime.datetime]
timestamp: Union[datetime.timedelta, datetime.datetime]
if seconds < 662688000: # 1991-01-01 00:00:00, "Released in 1991, the Mercedes-Benz W140 was the first production vehicle to feature a CAN-based multiplex wiring system."
timestamp = datetime.timedelta(seconds=seconds)
timestamp_format = TimestampFormat.RELATIVE
Expand All @@ -138,10 +138,10 @@ class CandumpDefaultLogPattern(CandumpBasePattern):
pattern = re.compile(
r'^\s*?\((?P<timestamp>[\d.]+?)\)\s+?(?P<channel>[a-zA-Z0-9]+)\s+?(?P<can_id>[0-9A-F]+?)#(#[0-9A-F])?(?P<can_data>R|([0-9A-Fa-f]{2})*)(\s+[RT])?$')

def __init__(self, tz: 'datetime.tzinfo|None') -> None:
def __init__(self, tz: Optional[datetime.tzinfo]) -> None:
self.tz = tz

def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]':
def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]:
timestamp = datetime.datetime.fromtimestamp(float(match_object.group('timestamp')), self.tz)
timestamp_format = TimestampFormat.ABSOLUTE
return timestamp, timestamp_format
Expand All @@ -156,7 +156,7 @@ class CandumpAbsoluteLogPattern(CandumpBasePattern):
pattern = re.compile(
r'^\s*?\((?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+)\)\s+(?P<channel>[a-zA-Z0-9]+)\s+(?P<can_id>[0-9A-F]+)\s+\[\d+\]\s*(?P<can_data>remote request|[0-9A-F ]*).*?$')

def parse_timestamp(self, match_object: 're.Match[str]') -> 'tuple[TimestampType, TimestampFormat]':
def parse_timestamp(self, match_object: re.Match[str]) -> tuple[TimestampType, TimestampFormat]:
timestamp = datetime.datetime.strptime(match_object.group('timestamp'), "%Y-%m-%d %H:%M:%S.%f")
timestamp_format = TimestampFormat.ABSOLUTE
return timestamp, timestamp_format
Expand All @@ -173,7 +173,7 @@ class PCANTracePatternV10(BasePattern):
pattern = re.compile(
r'^\s*?\d+\)\s*?(?P<timestamp>\d+)\s+(?P<can_id>[0-9A-F]+)\s+(?P<dlc>[0-9])\s+(?P<can_data>RTR|[0-9A-F ]*)$')

def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None':
def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]:
channel = self.parse_channel(match_object)
frame_id = int(match_object.group('can_id'), 16)
is_extended_frame = len(match_object.group('can_id')) > 4
Expand All @@ -185,10 +185,10 @@ def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None':

return DataFrame(channel=channel, frame_id=frame_id, is_extended_frame=is_extended_frame, data=data, is_remote_frame=is_remote_frame, timestamp=timestamp, timestamp_format=timestamp_format)

def parse_channel(self, match_object: 're.Match[str]') -> str:
def parse_channel(self, match_object: re.Match[str]) -> str:
return 'pcanx'

def parse_data(self, match_object: 're.Match[str]') -> 'tuple[bytes, bool]':
def parse_data(self, match_object: re.Match[str]) -> tuple[bytes, bool]:
data = match_object.group('can_data')
if data == 'RTR':
is_remote_frame = True
Expand All @@ -211,7 +211,7 @@ class PCANTracePatternV11(PCANTracePatternV10):
pattern = re.compile(
r'^\s*?\d+\)\s*?(?P<timestamp>\d+.\d+)\s+(?P<type>\w+)\s+(?P<can_id>[0-9A-F]+)\s+(?P<dlc>[0-9])\s+(?P<can_data>RTR|[0-9A-F ]*)$')

def unpack(self, match_object: 're.Match[str]') -> 'DataFrame|None':
def unpack(self, match_object: re.Match[str]) -> Optional[DataFrame]:
if match_object.group('type') in ('Error', 'Warng'): # yes, they really spell Warning without the 'in'
return None

Expand All @@ -228,7 +228,7 @@ class PCANTracePatternV12(PCANTracePatternV11):
pattern = re.compile(
r'^\s*?\d+\)\s*?(?P<timestamp>\d+.\d+)\s+(?P<channel>[0-9])\s+(?P<type>\w+)\s+(?P<can_id>[0-9A-F]+)\s+(?P<dlc>[0-9])\s+(?P<can_data>RTR|[0-9A-F ]*)$')

def parse_channel(self, match_object: 're.Match[str]') -> str:
def parse_channel(self, match_object: re.Match[str]) -> str:
return 'pcan' + match_object.group('channel')


Expand All @@ -254,10 +254,10 @@ class PCANTracePatternV20(PCANTracePatternV13):
pattern = re.compile(
r'^\s*?\d+?\s*?(?P<timestamp>\d+.\d+)\s+(?P<type>\w+)\s+(?P<can_id>[0-9A-F]+)\s+(?P<rxtx>\w+)\s+(?P<dlc>[0-9]+)(\s+(?P<can_data>[0-9A-F ]*))?$')

def parse_channel(self, match_object: 're.Match[str]') -> str:
def parse_channel(self, match_object: re.Match[str]) -> str:
return 'pcanx'

def parse_data(self, match_object: 're.Match[str]') -> 'tuple[bytes, bool]':
def parse_data(self, match_object: re.Match[str]) -> tuple[bytes, bool]:
if match_object.group('type') == 'RR':
is_remote_frame = True
data = bytes(0)
Expand All @@ -277,7 +277,7 @@ class PCANTracePatternV21(PCANTracePatternV20):
pattern = re.compile(
r'^\s*?\d+?\s*?(?P<timestamp>\d+.\d+)\s+(?P<type>\w+)\s+(?P<channel>[0-9])\s+(?P<can_id>[0-9A-F]+)\s+(?P<rxtx>.+)\s+-\s+(?P<dlc>[0-9]+)(\s+(?P<can_data>[0-9A-F ]*))?$')

def parse_channel(self, match_object: 're.Match[str]') -> str:
def parse_channel(self, match_object: re.Match[str]) -> str:
return 'pcan' + match_object.group('channel')


Expand All @@ -292,35 +292,35 @@ class Parser:
print(f'{frame.timestamp}: {frame.frame_id}')
"""

def __init__(self, stream: 'io.TextIOBase|None' = None, *, tz: 'datetime.tzinfo|None' = None) -> None:
def __init__(self, stream: Optional[io.TextIOBase] = None, *, tz: Optional[datetime.tzinfo] = None) -> None:
self.stream = stream
self.pattern: typing.Optional[BasePattern] = None
self.pattern: Optional[BasePattern] = None
self.tz = tz

def detect_pattern(self, line: str) -> 'BasePattern|None':
def detect_pattern(self, line: str) -> Optional[BasePattern]:
for p in [CandumpDefaultPattern(), CandumpTimestampedPattern(self.tz), CandumpDefaultLogPattern(self.tz), CandumpAbsoluteLogPattern(), PCANTracePatternV21(), PCANTracePatternV20(), PCANTracePatternV13(), PCANTracePatternV12(), PCANTracePatternV11(), PCANTracePatternV10()]:
mo = p.pattern.match(line)
if mo:
return p

return None

def parse(self, line: str) -> 'DataFrame|None':
def parse(self, line: str) -> Optional[DataFrame]:
if self.pattern is None:
self.pattern = self.detect_pattern(line)
if self.pattern is None:
return None
return self.pattern.match(line)

@typing.overload
def iterlines(self) -> 'Iterator[tuple[str, DataFrame]]':
@overload
def iterlines(self) -> Iterator[tuple[str, DataFrame]]:
pass

@typing.overload
def iterlines(self, keep_unknowns: 'typing.Literal[True]') -> 'Iterator[tuple[str, DataFrame|None]]':
@overload
def iterlines(self, keep_unknowns: Literal[True]) -> Iterator[tuple[str, Optional[DataFrame]]]:
pass

def iterlines(self, keep_unknowns: bool = False) -> 'Iterator[tuple[str, DataFrame|None]]':
def iterlines(self, keep_unknowns: bool = False) -> Iterator[tuple[str, Optional[DataFrame]]]:
"""Returns an generator that yields (str, DataFrame) tuples with the
raw log entry and a parsed log entry. If keep_unknowns=True, (str,
None) tuples will be returned for log entries that couldn't be decoded.
Expand All @@ -341,7 +341,7 @@ def iterlines(self, keep_unknowns: bool = False) -> 'Iterator[tuple[str, DataFra
else:
continue

def __iter__(self) -> 'Iterator[DataFrame]':
def __iter__(self) -> Iterator[DataFrame]:
"""Returns DataFrame log entries. Non-parseable log entries is
discarded."""
for _, frame in self.iterlines():
Expand Down
Loading