diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..eb7e8f4 --- /dev/null +++ b/.flake8 @@ -0,0 +1,3 @@ +[flake8] +exclude = unidiff/__init__.py + diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml new file mode 100644 index 0000000..e3b8606 --- /dev/null +++ b/.github/workflows/lint.yml @@ -0,0 +1,25 @@ +name: Lint and type check + +on: + pull_request: + branches: + - main + +jobs: + lint-and-type-check: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v5 + - uses: actions/setup-python@v6 + with: + python-version: '3.11' + + - name: Install flake8 and mypy + run: pip install flake8 mypy + + - name: Run flake8 + run: flake8 unidiff + + - name: Run mypy + run: mypy unidiff diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..f1ec576 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,18 @@ +name: Run tests + +on: + pull_request: + branches: + - main + +jobs: + lint-and-type-check: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v5 + - uses: actions/setup-python@v6 + with: + python-version: '3.11' + - run: + ./run_tests.sh diff --git a/HISTORY b/HISTORY index 93bb138..6b0954f 100644 --- a/HISTORY +++ b/HISTORY @@ -1,6 +1,12 @@ History ------- +0.7.7 - 2025-03-09 +------------------ + +* Drop Python2 support +* Fixed an ImportError of version in init file after changes in the project structure + 0.7.6 - 2025-03-09 ------------------ diff --git a/pyproject.toml b/pyproject.toml index 278e71b..562e39b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "unidiff2" -version = "0.7.6" +version = "0.7.7" description = "Unified diff parsing/metadata extraction library." 
readme = {file = "README.rst", content-type = "text/x-rst"} keywords = ["unified", "diff", "parse", "metadata"] diff --git a/tests/test_parser.py b/tests/test_parser.py index 74afa43..1ca3388 100644 --- a/tests/test_parser.py +++ b/tests/test_parser.py @@ -31,11 +31,10 @@ import unittest from unidiff import PatchSet -from unidiff.patch import PY2 from unidiff.errors import UnidiffParseError -if not PY2: - unicode = str +unicode = str + class TestUnidiffParser(unittest.TestCase): """Tests for Unified Diff Parser.""" @@ -52,11 +51,7 @@ def test_missing_encoding(self): utf8_file = os.path.join(self.samples_dir, 'samples/sample3.diff') # read bytes with open(utf8_file, 'rb') as diff_file: - if PY2: - self.assertRaises(UnicodeDecodeError, PatchSet, diff_file) - else: - # unicode expected - self.assertRaises(TypeError, PatchSet, diff_file) + self.assertRaises(TypeError, PatchSet, diff_file) def test_encoding_param(self): utf8_file = os.path.join(self.samples_dir, 'samples/sample3.diff') diff --git a/unidiff/__init__.py b/unidiff/__init__.py index 5f50cc2..327f9c9 100644 --- a/unidiff/__init__.py +++ b/unidiff/__init__.py @@ -26,7 +26,6 @@ from __future__ import unicode_literals -from unidiff import __version__ from unidiff.patch import ( DEFAULT_ENCODING, LINE_TYPE_ADDED, @@ -38,4 +37,3 @@ UnidiffParseError, ) -VERSION = __version__.__version__ diff --git a/unidiff/constants.py b/unidiff/constants.py index be63883..2166e4b 100644 --- a/unidiff/constants.py +++ b/unidiff/constants.py @@ -69,7 +69,10 @@ RE_BINARY_DIFF = re.compile( r'^Binary files? ' r'(?P[^\t]+?)(?:\t(?P[\s0-9:\+-]+))?' - r'(?: and (?P[^\t]+?)(?:\t(?P[\s0-9:\+-]+))?)? (differ|has changed)') + r'(?: and (?P[^\t]+?)' + r'(?:\t(?P[\s0-9:\+-]+))?)' + r'? 
(differ|has changed)' +) RE_PATCH_FILE_PREFIX = re.compile(r"^[abciow12]/.*$") diff --git a/unidiff/patch.py b/unidiff/patch.py index 86db378..48cb93a 100644 --- a/unidiff/patch.py +++ b/unidiff/patch.py @@ -25,9 +25,8 @@ """Classes used by the unified diff parser to keep the diff data.""" from __future__ import unicode_literals - -import codecs -import sys +from io import StringIO +from typing import Iterable, Iterator, List, Optional, Union, Self from unidiff.constants import ( DEFAULT_ENCODING, @@ -55,34 +54,21 @@ from unidiff.errors import UnidiffParseError -PY2 = sys.version_info[0] == 2 -if PY2: - import io - from StringIO import StringIO - open_file = io.open - make_str = lambda x: x.encode(DEFAULT_ENCODING) - - def implements_to_string(cls): - cls.__unicode__ = cls.__str__ - cls.__str__ = lambda x: x.__unicode__().encode(DEFAULT_ENCODING) - return cls -else: - from io import StringIO - from typing import Iterable, Optional, Union - open_file = open - make_str = str - implements_to_string = lambda x: x - unicode = str - basestring = str - - -@implements_to_string +def to_int_or_none(value): + return int(value) if value is not None else None + + class Line(object): """A diff line.""" - def __init__(self, value, line_type, - source_line_no=None, target_line_no=None, diff_line_no=None): - # type: (str, str, Optional[int], Optional[int], Optional[int]) -> None + def __init__( + self, + value: str, + line_type: str, + source_line_no: Optional[int] = None, + target_line_no: Optional[int] = None, + diff_line_no: Optional[int] = None, + ) -> None: super(Line, self).__init__() self.source_line_no = source_line_no self.target_line_no = target_line_no @@ -90,16 +76,13 @@ def __init__(self, value, line_type, self.line_type = line_type self.value = value - def __repr__(self): - # type: () -> str - return make_str("") % (self.line_type, self.value) + def __repr__(self) -> str: + return str("") % (self.line_type, self.value) - def __str__(self): - # type: () -> str + def 
__str__(self) -> str: return "%s%s" % (self.line_type, self.value) - def __eq__(self, other): - # type: (Line) -> bool + def __eq__(self, other) -> bool: return (self.source_line_no == other.source_line_no and self.target_line_no == other.target_line_no and self.diff_line_no == other.diff_line_no and @@ -107,22 +90,18 @@ def __eq__(self, other): self.value == other.value) @property - def is_added(self): - # type: () -> bool + def is_added(self) -> bool: return self.line_type == LINE_TYPE_ADDED @property - def is_removed(self): - # type: () -> bool + def is_removed(self) -> bool: return self.line_type == LINE_TYPE_REMOVED @property - def is_context(self): - # type: () -> bool + def is_context(self) -> bool: return self.line_type == LINE_TYPE_CONTEXT -@implements_to_string class PatchInfo(list): """Lines with extended patch info. @@ -131,23 +110,25 @@ class PatchInfo(list): """ - def __repr__(self): - # type: () -> str + def __repr__(self) -> str: value = "" % self[0].strip() - return make_str(value) + return str(value) - def __str__(self): - # type: () -> str - return ''.join(unicode(line) for line in self) + def __str__(self) -> str: + return ''.join(str(line) for line in self) -@implements_to_string class Hunk(list): """Each of the modified blocks of a file.""" - def __init__(self, src_start=0, src_len=0, tgt_start=0, tgt_len=0, - section_header=''): - # type: (int, int, int, int, str) -> None + def __init__( + self, + src_start: int = 0, + src_len: int = 0, + tgt_start: int = 0, + tgt_len: int = 0, + section_header: str = '' + ) -> None: super(Hunk, self).__init__() if src_len is None: src_len = 1 @@ -158,30 +139,27 @@ def __init__(self, src_start=0, src_len=0, tgt_start=0, tgt_len=0, self.target_start = int(tgt_start) self.target_length = int(tgt_len) self.section_header = section_header - self._added = None # Optional[int] - self._removed = None # Optional[int] + self._added: Optional[int] = None + self._removed: Optional[int] = None - def __repr__(self): - # 
type: () -> str + def __repr__(self) -> str: value = "" % (self.source_start, self.source_length, self.target_start, self.target_length, self.section_header) - return make_str(value) + return str(value) - def __str__(self): - # type: () -> str + def __str__(self) -> str: # section header is optional and thus we output it only if it's present head = "@@ -%d,%d +%d,%d @@%s\n" % ( self.source_start, self.source_length, self.target_start, self.target_length, ' ' + self.section_header if self.section_header else '') - content = ''.join(unicode(line) for line in self) + content = ''.join(str(line) for line in self) return head + content - def append(self, line): - # type: (Line) -> None + def append(self, line: Line) -> None: """Append the line to hunk, and keep track of source/target lines.""" # Make sure the line is encoded correctly. This is a no-op except for # potentially raising a UnicodeDecodeError. @@ -189,8 +167,7 @@ def append(self, line): super(Hunk, self).append(line) @property - def added(self): - # type: () -> Optional[int] + def added(self) -> Optional[int]: if self._added is not None: return self._added # re-calculate each time to allow for hunk modifications @@ -198,48 +175,47 @@ def added(self): return sum(1 for line in self if line.is_added) @property - def removed(self): - # type: () -> Optional[int] + def removed(self) -> Optional[int]: if self._removed is not None: return self._removed # re-calculate each time to allow for hunk modifications # (which should mean metadata_only switch wasn't used) return sum(1 for line in self if line.is_removed) - def is_valid(self): - # type: () -> bool + def is_valid(self) -> bool: """Check hunk header data matches entered lines info.""" return (len(self.source) == self.source_length and len(self.target) == self.target_length) - def source_lines(self): - # type: () -> Iterable[Line] + def source_lines(self) -> Iterable[Line]: """Hunk lines from source file (generator).""" - return (l for l in self if l.is_context 
or l.is_removed) + return (line for line in self if line.is_context or line.is_removed) @property - def source(self): - # type: () -> Iterable[str] - return [str(l) for l in self.source_lines()] + def source(self) -> List[str]: + return [str(line) for line in self.source_lines()] - def target_lines(self): - # type: () -> Iterable[Line] + def target_lines(self) -> Iterable[Line]: """Hunk lines from target file (generator).""" - return (l for l in self if l.is_context or l.is_added) + return (line for line in self if line.is_context or line.is_added) @property - def target(self): - # type: () -> Iterable[str] - return [str(l) for l in self.target_lines()] + def target(self) -> List[str]: + return [str(line) for line in self.target_lines()] class PatchedFile(list): """Patch updated file, it is a list of Hunks.""" - def __init__(self, patch_info=None, source='', target='', - source_timestamp=None, target_timestamp=None, - is_binary_file=False): - # type: (Optional[PatchInfo], str, str, Optional[str], Optional[str], bool, bool) -> None + def __init__( + self, + patch_info: Optional[PatchInfo] = None, + source: str = '', + target: str = '', + source_timestamp: Optional[str] = None, + target_timestamp: Optional[str] = None, + is_binary_file: bool = False, + ) -> None: super(PatchedFile, self).__init__() self.patch_info = patch_info self.source_file = source @@ -248,12 +224,10 @@ def __init__(self, patch_info=None, source='', target='', self.target_timestamp = target_timestamp self.is_binary_file = is_binary_file - def __repr__(self): - # type: () -> str - return make_str("") % make_str(self.path) + def __repr__(self) -> str: + return str("") % str(self.path) - def __str__(self): - # type: () -> str + def __str__(self) -> str: source = '' target = '' # patch info is optional @@ -265,16 +239,29 @@ def __str__(self): target = "+++ %s%s\n" % ( self.target_file, '\t' + self.target_timestamp if self.target_timestamp else '') - hunks = ''.join(unicode(hunk) for hunk in self) + 
hunks = ''.join(str(hunk) for hunk in self) return info + source + target + hunks - def _parse_hunk(self, header, diff, encoding, metadata_only): - # type: (str, enumerate[str], Optional[str], bool) -> None + def _parse_hunk( + self, + header: str, + diff: Union[enumerate[str], enumerate[bytes], enumerate[StringIO]], + encoding: Optional[str], + metadata_only: bool, + ) -> None: """Parse hunk details.""" header_info = RE_HUNK_HEADER.match(header) - hunk_info = header_info.groups() - hunk = Hunk(*hunk_info) - + if not header_info: + return None + src_start, src_len, tgt_start, tgt_len, header = header_info.groups() + + hunk = Hunk( + src_start=to_int_or_none(src_start), + src_len=to_int_or_none(src_len), + tgt_start=to_int_or_none(tgt_start), + tgt_len=to_int_or_none(tgt_len), + section_header=header, + ) source_line_no = hunk.source_start target_line_no = hunk.target_start expected_source_end = source_line_no + hunk.source_length @@ -283,8 +270,9 @@ def _parse_hunk(self, header, diff, encoding, metadata_only): removed = 0 for diff_line_no, line in diff: - if encoding is not None: + if encoding is not None and isinstance(line, bytes): line = line.decode(encoding) + line = str(line) if metadata_only: # quick line type detection, no regex required @@ -293,8 +281,7 @@ def _parse_hunk(self, header, diff, encoding, metadata_only): LINE_TYPE_REMOVED, LINE_TYPE_CONTEXT, LINE_TYPE_NO_NEWLINE): - raise UnidiffParseError( - 'Hunk diff line expected: %s' % line) + raise UnidiffParseError(f'Hunk diff line expected: {line}') if line_type == LINE_TYPE_ADDED: target_line_no += 1 @@ -368,8 +355,7 @@ def _parse_hunk(self, header, diff, encoding, metadata_only): self.append(hunk) - def _add_no_newline_marker_to_last_hunk(self): - # type: () -> None + def _add_no_newline_marker_to_last_hunk(self) -> None: if not self: raise UnidiffParseError( 'Unexpected marker:' + LINE_VALUE_NO_NEWLINE) @@ -377,16 +363,14 @@ def _add_no_newline_marker_to_last_hunk(self): last_hunk.append( 
Line(LINE_VALUE_NO_NEWLINE + '\n', line_type=LINE_TYPE_NO_NEWLINE)) - def _append_trailing_empty_line(self): - # type: () -> None + def _append_trailing_empty_line(self) -> None: if not self: raise UnidiffParseError('Unexpected trailing newline character') last_hunk = self[-1] last_hunk.append(Line('\n', line_type=LINE_TYPE_EMPTY)) @property - def path(self): - # type: () -> str + def path(self) -> str: """Return the file path abstracted from VCS.""" filepath = self.source_file if filepath in (None, DEV_NULL) or ( @@ -407,26 +391,23 @@ def path(self): return filepath @property - def added(self): - # type: () -> int + def added(self) -> int: """Return the file total added lines.""" return sum([hunk.added for hunk in self]) @property - def removed(self): - # type: () -> int + def removed(self) -> int: """Return the file total removed lines.""" return sum([hunk.removed for hunk in self]) @property - def is_rename(self): + def is_rename(self) -> bool: return (self.source_file != DEV_NULL - and self.target_file != DEV_NULL - and self.source_file[2:] != self.target_file[2:]) + and self.target_file != DEV_NULL + and self.source_file[2:] != self.target_file[2:]) @property - def is_added_file(self): - # type: () -> bool + def is_added_file(self) -> bool: """Return True if this patch adds the file.""" if self.source_file == DEV_NULL: return True @@ -434,8 +415,7 @@ def is_added_file(self): self[0].source_length == 0) @property - def is_removed_file(self): - # type: () -> bool + def is_removed_file(self) -> bool: """Return True if this patch removes the file.""" if self.target_file == DEV_NULL: return True @@ -443,23 +423,25 @@ def is_removed_file(self): self[0].target_length == 0) @property - def is_modified_file(self): - # type: () -> bool + def is_modified_file(self) -> bool: """Return True if this patch modifies the file.""" return not (self.is_added_file or self.is_removed_file) -@implements_to_string class PatchSet(list): """A list of PatchedFiles.""" - def 
__init__(self, f, encoding=None, metadata_only=False): - # type: (Union[StringIO, str], Optional[str], bool) -> None + def __init__( + self, + f: Union[StringIO, str], + encoding: Optional[str] = None, + metadata_only: bool = False + ) -> None: super(PatchSet, self).__init__() # convert string inputs to StringIO objects - if isinstance(f, basestring): - f = self._convert_string(f, encoding) # type: StringIO + if isinstance(f, str): + f = self._convert_string(f, encoding) # make sure we pass an iterator object to parse data = iter(f) @@ -467,24 +449,30 @@ def __init__(self, f, encoding=None, metadata_only=False): # when metadata_only is True, only perform a minimal metadata parsing # (ie. hunks without content) which is around 2.5-6 times faster; # it will still validate the diff metadata consistency and get counts - self._parse(data, encoding=encoding, metadata_only=metadata_only) - - def __repr__(self): - # type: () -> str - return make_str('') % super(PatchSet, self).__repr__() - - def __str__(self): - # type: () -> str - return ''.join(unicode(patched_file) for patched_file in self) - - def _parse(self, diff, encoding, metadata_only): - # type: (StringIO, Optional[str], bool) -> None + self._parse( + data, # type: ignore + encoding=encoding, + metadata_only=metadata_only, + ) + + def __repr__(self) -> str: + return str('') % super(PatchSet, self).__repr__() + + def __str__(self) -> str: + return ''.join(str(patched_file) for patched_file in self) + + def _parse( + self, + diff: Iterator[str], + encoding: Optional[str], + metadata_only: bool, + ) -> None: current_file = None patch_info = None + enumerated_diff = enumerate(diff, 1) - diff = enumerate(diff, 1) - for unused_diff_line_no, line in diff: - if encoding is not None: + for _, line in enumerated_diff: + if encoding is not None and isinstance(line, bytes): line = line.decode(encoding) # check for a git file rename @@ -505,7 +493,9 @@ def _parse(self, diff, encoding, metadata_only): is_diff_git_new_file = 
RE_DIFF_GIT_NEW_FILE.match(line) if is_diff_git_new_file: if current_file is None or patch_info is None: - raise UnidiffParseError('Unexpected new file found: %s' % line) + raise UnidiffParseError( + f'Unexpected new file found: {line}' + ) current_file.source_file = DEV_NULL patch_info.append(line) continue @@ -514,7 +504,9 @@ def _parse(self, diff, encoding, metadata_only): is_diff_git_deleted_file = RE_DIFF_GIT_DELETED_FILE.match(line) if is_diff_git_deleted_file: if current_file is None or patch_info is None: - raise UnidiffParseError('Unexpected deleted file found: %s' % line) + raise UnidiffParseError( + f'Unexpected deleted file found: {line}' + ) current_file.target_file = DEV_NULL patch_info.append(line) continue @@ -538,7 +530,8 @@ def _parse(self, diff, encoding, metadata_only): if is_target_filename: target_file = is_target_filename.group('filename') target_timestamp = is_target_filename.group('timestamp') - if current_file is not None and not (current_file.target_file == target_file): + if (current_file is not None and + not (current_file.target_file == target_file)): raise UnidiffParseError('Target without source: %s' % line) if current_file is None: # add current file to PatchSet @@ -556,15 +549,20 @@ def _parse(self, diff, encoding, metadata_only): if is_hunk_header: patch_info = None if current_file is None: - raise UnidiffParseError('Unexpected hunk found: %s' % line) - current_file._parse_hunk(line, diff, encoding, metadata_only) + raise UnidiffParseError(f'Unexpected hunk found: {line}') + current_file._parse_hunk( + line, + enumerated_diff, + encoding, + metadata_only, + ) continue # check for no newline marker is_no_newline = RE_NO_NEWLINE_MARKER.match(line) if is_no_newline: if current_file is None: - raise UnidiffParseError('Unexpected marker: %s' % line) + raise UnidiffParseError(f'Unexpected marker: {line}') current_file._add_no_newline_marker_to_last_hunk() continue @@ -587,14 +585,19 @@ def _parse(self, diff, encoding, metadata_only): 
current_file.is_binary_file = True else: current_file = PatchedFile( - patch_info, source_file, target_file, is_binary_file=True) + patch_info, + source_file, + target_file, + is_binary_file=True + ) self.append(current_file) patch_info = None current_file = None continue if line == 'GIT binary patch\n': - current_file.is_binary_file = True + if current_file is not None: + current_file.is_binary_file = True patch_info = None current_file = None continue @@ -602,53 +605,69 @@ def _parse(self, diff, encoding, metadata_only): patch_info.append(line) @classmethod - def from_filename(cls, filename, encoding=DEFAULT_ENCODING, errors=None, newline=None): - # type: (str, str, Optional[str]) -> PatchSet + def from_filename( + cls, + filename: str, + encoding: str = DEFAULT_ENCODING, + errors: Optional[str] = None, + newline: Optional[str] = None, + ) -> Self: """Return a PatchSet instance given a diff filename.""" - with open_file(filename, 'r', encoding=encoding, errors=errors, newline=newline) as f: - instance = cls(f) + with open( + filename, + 'r', + encoding=encoding, + errors=errors, + newline=newline + ) as f: + instance = cls(f.read()) return instance @staticmethod - def _convert_string(data, encoding=None, errors='strict'): - # type: (Union[str, bytes], str, str) -> StringIO - if encoding is not None: - # if encoding is given, assume bytes and decode - data = unicode(data, encoding=encoding, errors=errors) + def _convert_string( + data: Union[str, bytes], + encoding: Optional[str] = None, + errors: str = 'strict' + ) -> StringIO: + if not isinstance(data, str): + if encoding: + # if encoding is given, assume bytes and decode + data = str(data, encoding=encoding, errors=errors) + else: + raise Exception('convert error') return StringIO(data) @classmethod - def from_string(cls, data, encoding=None, errors='strict'): - # type: (str, str, Optional[str]) -> PatchSet + def from_string( + cls, + data: str, + encoding: Optional[str] = None, + errors: str = 'strict', + ) -> 
Self: """Return a PatchSet instance given a diff string.""" return cls(cls._convert_string(data, encoding, errors)) @property - def added_files(self): - # type: () -> list[PatchedFile] + def added_files(self) -> List[PatchedFile]: """Return patch added files as a list.""" return [f for f in self if f.is_added_file] @property - def removed_files(self): - # type: () -> list[PatchedFile] + def removed_files(self) -> List[PatchedFile]: """Return patch removed files as a list.""" return [f for f in self if f.is_removed_file] @property - def modified_files(self): - # type: () -> list[PatchedFile] + def modified_files(self) -> List[PatchedFile]: """Return patch modified files as a list.""" return [f for f in self if f.is_modified_file] @property - def added(self): - # type: () -> int + def added(self) -> int: """Return the patch total added lines.""" return sum([f.added for f in self]) @property - def removed(self): - # type: () -> int + def removed(self) -> int: """Return the patch total removed lines.""" return sum([f.removed for f in self])