diff --git a/libarchive/adapters/archive_read.py b/libarchive/adapters/archive_read.py index a299104..a1da43f 100644 --- a/libarchive/adapters/archive_read.py +++ b/libarchive/adapters/archive_read.py @@ -1,6 +1,7 @@ import contextlib import ctypes import logging +import sys import libarchive.constants.archive import libarchive.exception @@ -49,6 +50,18 @@ def _archive_read_support_format_all(archive): message = c_archive_error_string(archive) raise libarchive.exception.ArchiveError(message) +def _archive_read_add_passphrase(archive, passphrase): + try: + if sys.version_info >= (3, 0): + passphrase = bytes(passphrase, 'utf-8') + else: + passphrase = unicode(passphrase).encode('utf-8') + return libarchive.calls.archive_read.c_archive_read_add_passphrase( + archive, passphrase) + except: + message = c_archive_error_string(archive) + raise libarchive.exception.ArchiveError(message) + def _archive_read_support_format_7zip(archive): try: return libarchive.calls.archive_read.\ @@ -267,7 +280,7 @@ def _set_read_context(archive_res, format_code=None, filter_code=None): archive_read_support_filter_all(archive_res) @contextlib.contextmanager -def _enumerator(opener, entry_cls, format_code=None, filter_code=None): +def _enumerator(opener, entry_cls, passphrases=None, format_code=None, filter_code=None): """Return an archive enumerator from a user-defined source, using a user- defined entry type. """ @@ -275,6 +288,9 @@ def _enumerator(opener, entry_cls, format_code=None, filter_code=None): archive_res = _archive_read_new() try: + if passphrases is not None: + for passphrase in passphrases: + r = _archive_read_add_passphrase(archive_res, passphrase) r = _set_read_context(archive_res, format_code, filter_code) opener(archive_res) diff --git a/libarchive/adapters/archive_write.py b/libarchive/adapters/archive_write.py index 1116926..6d59552 100644 --- a/libarchive/adapters/archive_write.py +++ b/libarchive/adapters/archive_write.py @@ -81,6 +81,30 @@ def _archive_write_data(archive, data): message = c_archive_error_string(archive) raise ValueError("No bytes were written. Error? [%s]" % (message)) +def _archive_write_set_passphrase(archive, passphrase): + try: + if sys.version_info >= (3, 0): + passphrase = bytes(passphrase, 'utf-8') + else: + passphrase = unicode(passphrase).encode('utf-8') + return libarchive.calls.archive_write.c_archive_write_set_passphrase( + archive, passphrase) + except: + message = c_archive_error_string(archive) + raise libarchive.exception.ArchiveError(message) + +def _archive_write_set_options(archive, options): + try: + if sys.version_info >= (3, 0): + options = bytes(options, 'utf-8') + else: + options = unicode(options).encode('utf-8') + return libarchive.calls.archive_write.c_archive_write_set_options( + archive, options) + except: + message = c_archive_error_string(archive) + raise libarchive.exception.ArchiveError(message) + def _archive_write_add_filter_bzip2(archive): try: libarchive.calls.archive_write.c_archive_write_add_filter_bzip2( @@ -190,12 +214,18 @@ def _set_write_context(archive_res, format_code, filter_code=None): def _create(opener, format_code, files, + passphrase=None, + options="zip:encryption=zipcrypt", filter_code=None, block_size=16384): """Create an archive from a collection of files (not recursive).""" a = _archive_write_new() _set_write_context(a, format_code, filter_code) + if passphrase is not None and \ + format_code == libarchive.constants.ARCHIVE_FORMAT_ZIP: + r = _archive_write_set_options(a, options) + r = _archive_write_set_passphrase(a, passphrase) _LOGGER.debug("Opening archive (create).") opener(a) diff --git a/libarchive/calls/archive_read.py b/libarchive/calls/archive_read.py index 986f5a9..e7e30a4 100644 --- a/libarchive/calls/archive_read.py +++ b/libarchive/calls/archive_read.py @@ -84,3 +84,7 @@ def _check_zero_success(value): c_archive_read_data_block = libarchive.archive_read_data_block c_archive_read_data_block.argtypes = [c_void_p, POINTER(c_void_p), POINTER(c_size_t), POINTER(c_longlong)] c_archive_read_data_block.restype = c_int + +c_archive_read_add_passphrase = libarchive.archive_read_add_passphrase +c_archive_read_add_passphrase.argtypes = [c_void_p, c_char_p] +c_archive_read_add_passphrase.restype = c_int diff --git a/libarchive/calls/archive_write.py b/libarchive/calls/archive_write.py index 94e1a52..a3e9e47 100644 --- a/libarchive/calls/archive_write.py +++ b/libarchive/calls/archive_write.py @@ -22,6 +22,14 @@ def _check_zero_success(value): c_archive_write_disk_set_options.argtypes = [c_void_p, c_int] c_archive_write_disk_set_options.restype = _check_zero_success +c_archive_write_set_options = libarchive.archive_write_set_options +c_archive_write_set_options.argtypes = [c_void_p, c_char_p] +c_archive_write_set_options.restype = _check_zero_success + +c_archive_write_set_passphrase = libarchive.archive_write_set_passphrase +c_archive_write_set_passphrase.argtypes = [c_void_p, c_char_p] +c_archive_write_set_passphrase.restype = c_int + c_archive_write_header = libarchive.archive_write_header c_archive_write_header.argtypes = [c_void_p, c_void_p] c_archive_write_header.restype = _check_zero_success diff --git a/libarchive/test_support.py b/libarchive/test_support.py index fb285b3..70ab013 100644 --- a/libarchive/test_support.py +++ b/libarchive/test_support.py @@ -36,11 +36,14 @@ def chdir(path): os.chdir(original_path) @contextlib.contextmanager -def test_archive(): +def test_archive(passphrase=None, encryption="traditional"): with chdir(_APP_PATH): temp_path = tempfile.mkdtemp() - output_filename = 'archive.7z' + if passphrase is not None: + output_filename = 'archive.7z' + else: + output_filename = 'archive.zip' output_filepath = os.path.join(temp_path, output_filename) # Also, write a source file with a unicode name that we can add to @@ -61,10 +64,19 @@ def test_archive(): unicode_test_filepath, ] - libarchive.public.create_file( - output_filepath, - libarchive.constants.ARCHIVE_FORMAT_7ZIP, - files) + if passphrase is not None: + options = "zip:encryption={}".format(encryption) + libarchive.public.create_file( + output_filepath, + libarchive.constants.ARCHIVE_FORMAT_7ZIP, + files, + options=options, + passphrase=passphrase) + else: + libarchive.public.create_file( + output_filepath, + libarchive.constants.ARCHIVE_FORMAT_ZIP, + files) assert \ os.path.exists(output_filepath) is True, \ diff --git a/tests/adapters/test_archive_read.py b/tests/adapters/test_archive_read.py index 38306a2..f7e5093 100644 --- a/tests/adapters/test_archive_read.py +++ b/tests/adapters/test_archive_read.py @@ -14,10 +14,27 @@ class TestArchiveRead(unittest.TestCase): + def _test_enumerate_from_file(self, passphrase=None, encryption="traditional"): + if passphrase is not None: + with libarchive.test_support.test_archive(passphrase, encryption) as filepath: + with libarchive.adapters.archive_read.file_enumerator(filepath, passphrases=[passphrase]) as e: + list(e) + else: + with libarchive.test_support.test_archive() as filepath: + with libarchive.adapters.archive_read.file_enumerator(filepath) as e: + list(e) + def test_enumerate_from_file(self): - with libarchive.test_support.test_archive() as filepath: - with libarchive.adapters.archive_read.file_enumerator(filepath) as e: - list(e) + self._test_enumerate_from_file() + + def test_enumerate_from_file_with_passphrase_traditional(self): + self._test_enumerate_from_file(passphrase="test_passphrase") + + def test_enumerate_from_file_with_passphrase_aes128(self): + self._test_enumerate_from_file(passphrase="test_passphrase", encryption="aes128") + + def test_enumerate_from_file_with_passphrase_aes256(self): + self._test_enumerate_from_file(passphrase="test_passphrase", encryption="aes256") def test_enumerate_from_memory(self): with libarchive.test_support.test_archive() as filepath: diff --git a/tests/adapters/test_archive_write.py b/tests/adapters/test_archive_write.py index 83f037f..6758002 100644 --- a/tests/adapters/test_archive_write.py +++ b/tests/adapters/test_archive_write.py @@ -102,3 +102,54 @@ def test_create_file__unicode(self): ] self.assertEquals(actual, expected) + + def test_create_file_with_passphrase_traditional(self): + self._create_file_with_passphrase("traditional") + + + def test_create_file_with_passphrase_aes128(self): + self._create_file_with_passphrase("aes128") + + + def test_create_file_with_passphrase_aes256(self): + self._create_file_with_passphrase("aes256") + + + def _create_file_with_passphrase(self, encryption): + with libarchive.test_support.chdir(_APP_PATH): + temp_path = tempfile.mkdtemp() + + output_filename = 'archive.zip' + output_filepath = os.path.join(temp_path, output_filename) + try: + files = [ + 'libarchive/resources/README.md', + 'libarchive/resources/requirements.txt', + ] + options = "zip:encryption={}".format(encryption) + libarchive.adapters.archive_write.create_file( + output_filepath, + libarchive.constants.ARCHIVE_FORMAT_ZIP, + files, + options=options, + passphrase="test_passphrase") + + assert \ + os.path.exists(output_filepath) is True, \ + "Test archive was not created correctly." + + with libarchive.adapters.archive_read.file_enumerator(output_filepath, passphrases=["test_passphrase"]) as e: + actual = [entry.pathname for entry in e] + + finally: + try: + shutil.rmtree(output_path) + except: + pass + + expected = [ + 'libarchive/resources/README.md', + 'libarchive/resources/requirements.txt', + ] + + self.assertEquals(actual, expected)