diff --git a/compressor.py b/compressor.py new file mode 100644 index 0000000..b31389d --- /dev/null +++ b/compressor.py @@ -0,0 +1,107 @@ +from collections import abc +from typing import Union + + +class ObjectCompressor: + """ + Note: Tested with Python v 3.7.3 + This is a class containing compress and decompress functions as described on https://github.com/Ontraport/Backend-Test + """ + def compress(self, object_to_compress, path_so_far: str = "", dictionary: dict = None) -> dict: + """ + This function compresses a multi-dimensional container of any size (tested with nested Python dictionaries and nested custom classes) + and returns a compressed version of the original container as a Python dictionary + (note that a top level empty Array becomes an empty Array) + """ + if(dictionary is None): + dictionary = {} + # If this is a Collection, it will always be Sized, but we want to handle Mappings slightly different, since they can contain nested objects + if(isinstance(object_to_compress, abc.Collection)): + # If this is a Mapping (e.g. a dictionary), iterate through keys and compress recursively + if(isinstance(object_to_compress, abc.Mapping)): + if(len(object_to_compress) > 0): + for attribute in object_to_compress.keys(): + value = object_to_compress.get(attribute) + self.save_or_recurse(path_so_far + "/" + str(attribute), dictionary, value) + else: + if(path_so_far): # only add this object if it wasn't top level + self.save_or_recurse(path_so_far, dictionary, {}, True) + # If this is a Sized Collection (e.g. a List), iterate through elements and compress recursively + elif(isinstance(object_to_compress, abc.Sized)): + if(len(object_to_compress) > 0): + for i in range(len(object_to_compress)): + value = object_to_compress[i] + self.save_or_recurse(path_so_far + "/" + str(i), dictionary, value) + else: + if(path_so_far): # only add this object if it wasn't top level + self.save_or_recurse(path_so_far, dictionary, [], True) + else: + return [] + # If this is a custom object, iterate through non-private attributes and compress + else: + filtered_attributes = list(filter(lambda x: not x.startswith('__'), dir(object_to_compress))) + if(len(filtered_attributes) > 0): + for attribute in filtered_attributes: + value = getattr(object_to_compress, str(attribute)) + self.save_or_recurse(path_so_far + "/" + str(attribute), dictionary, value) + else: + if(path_so_far): # only add this object if it wasn't top level + self.save_or_recurse(path_so_far, dictionary, {}, True) + return dictionary + + def save_or_recurse(self, path_so_far: str, dictionary: dict, value, force_insert: bool = False) -> dict: + """ + This helper function was introduced to minimize duplicate code in the compress(...) method + """ + if(path_so_far.startswith("/")): + path_so_far = path_so_far[1:] + if (not isinstance(value, (float, int, str, bool, type(None))) and force_insert is False): + return self.compress(value, path_so_far, dictionary) + else: + dictionary[path_so_far] = value + return dictionary + + def decompress(self, compressed_object: dict) -> Union[dict, list]: + """ + This function expands a compressed container into a dictionary representation of its original form + NOTE: If a custom object was flattened, decompress will still return a dictionary representation + """ + if(isinstance(compressed_object, abc.Collection)): + if(isinstance(compressed_object, abc.Mapping)): + resultObject = {} + keys = compressed_object.keys() + # For each key in the compressed object + for attribute in compressed_object.keys(): + pathList = attribute.split("/") + if(pathList[0].isnumeric() and isinstance(resultObject, abc.Mapping)): + resultObject = [] + pointer = resultObject + # For each token in the path + for i in range(len(pathList)): + token = pathList[i] + if(isinstance(pointer, abc.MutableSequence)): # We are in a list + if(token.isnumeric()): + if(int(token) < len(pointer)): # Already exists in this list + pointer = pointer[int(token)] + else: + if(i < len(pathList) - 1 and pathList[i+1].isnumeric()): + pointer.append([]) + elif(i == len(pathList) - 1): + pointer.append(compressed_object[attribute]) + else: + pointer.append({}) + pointer = pointer[-1] + else: # We are in a Map + if(i < len(pathList) - 1): + if(pathList[i+1].isnumeric()): # There is an intermediate array + if(token not in pointer): + pointer[token] = [] + else: + if(token not in pointer): + pointer[token] = {} + pointer = pointer[token] + else: + pointer[token] = compressed_object[attribute] + return resultObject + elif(isinstance(compressed_object, abc.Sized) and len(compressed_object) == 0): + return compressed_object diff --git a/tests.py b/tests.py new file mode 100644 index 0000000..80d87e2 --- /dev/null +++ b/tests.py @@ -0,0 +1,146 @@ +import unittest +from compressor import ObjectCompressor + + +class CompressionTests(unittest.TestCase): + """ + This is a class unit tests for compress/decompress functions + They can be run with ' tests.py' + """ + def setUp(self): + self.object_compressor = ObjectCompressor() + self.example_dictionary = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {'ten': 11}}} + self.example_dictionary_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine/ten': 11} + + self.custom_object = MyCustomObject() + self.custom_object.one = MyCustomObject() + self.custom_object.one.two = 3 + self.custom_object.one.four = [5, 6, 7] + self.custom_object.eight = MyCustomObject() + self.custom_object.eight.nine = MyCustomObject() + self.custom_object.eight.nine.ten = 11 + + def test_example_case_compression(self): + compressed_version = self.object_compressor.compress(self.example_dictionary) + self.assertEqual(self.example_dictionary_compressed, compressed_version) + + def test_example_caseDecompression(self): + decompressed_version = self.object_compressor.decompress(self.example_dictionary_compressed) + self.assertEqual(self.example_dictionary, decompressed_version) + + def test_example_case_compression_and_decompression(self): + compressed_version = self.object_compressor.compress(self.example_dictionary) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(self.example_dictionary, decompressed_version) + + def test_middle_array_compression(self): + dictionary_with_middle_array = {'one': {'two': 3, 'four': [{'a': 2.3, 'b': None}, 'c']}, 'eight': {'nine': {'ten': 11}}} + middle_array_compressed = {'one/two': 3, 'one/four/0/a': 2.3, 'one/four/0/b': None, 'one/four/1': 'c', 'eight/nine/ten': 11} + compressed_version = self.object_compressor.compress(dictionary_with_middle_array) + self.assertEqual(middle_array_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(dictionary_with_middle_array, decompressed_version) + + def test_top_array_compression(self): + array_object = [{'two': 3}, {'four': [{'a': 2.3, 'b': None}, 'c']}, {'eight': {'nine': {'ten': 11}}}] + top_array_compressed = {'0/two': 3, '1/four/0/a': 2.3, '1/four/0/b': None, '1/four/1': 'c', '2/eight/nine/ten': 11} + compressed_version = self.object_compressor.compress(array_object) + self.assertEqual(top_array_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(array_object, decompressed_version) + + def test_empty_array_compression(self): + dictionary_with_empty_array = {'one': {'two': 3, 'four': []}, 'eight': {'nine': {'ten': 11}}} + empty_dictionary_compressed = {'one/two': 3, 'one/four': [], 'eight/nine/ten': 11} + compressed_version = self.object_compressor.compress(dictionary_with_empty_array) + self.assertEqual(empty_dictionary_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(dictionary_with_empty_array, decompressed_version) + + def test_empty_object_compression(self): + dictionary_with_empty_container = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {}}} + empty_dictionary_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine': {}} + compressed_version = self.object_compressor.compress(dictionary_with_empty_container) + self.assertEqual(empty_dictionary_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(dictionary_with_empty_container, decompressed_version) + + def test_string_and_bool_and_float_compression(self): + object_to_test = {'a': {'d': True}, 'b': 'myString', 'c': 21.89} + decompressed_object_to_test = {'a/d': True, 'b': 'myString', 'c': 21.89} + compressed_version = self.object_compressor.compress(object_to_test) + self.assertEqual(decompressed_object_to_test, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(object_to_test, decompressed_version) + + def test_empty_top_level_object_compression(self): + compressed_version = self.object_compressor.compress({}) + self.assertEqual({}, compressed_version) + + def test_single_value_compression(self): + target_object = {'a': 1} + compressed_version = self.object_compressor.compress(target_object) + self.assertEqual({'a': 1}, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(target_object, decompressed_version) + + def test_single_value_empty_object_compression(self): + target_object = {'a': {}} + compressed_version = self.object_compressor.compress(target_object) + self.assertEqual({'a': {}}, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(target_object, decompressed_version) + + def test_single_value_empty_array_compression(self): + target_object = {'a': []} + compressed_version = self.object_compressor.compress(target_object) + self.assertEqual({'a': []}, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(target_object, decompressed_version) + + def test_empty_top_level_array_compression(self): + target_object = [] + compressed_version = self.object_compressor.compress([]) + self.assertEqual([], compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(target_object, decompressed_version) + + def test_single_array_compression(self): + compressed_version = self.object_compressor.compress({'a': [0, 1, 2]}) + self.assertEqual({'a/0': 0, 'a/1': 1, 'a/2': 2}, compressed_version) + + # These are the example case tests using a custom Python object instead of dictionary/List as a container + def test_custom_object_compression(self): + compressed_version = self.object_compressor.compress(self.custom_object) + self.assertEqual(self.example_dictionary_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(self.example_dictionary, decompressed_version) + + def test_empty_array_custom_object_compression(self): + dictionary_with_empty_array = {'one': {'two': 3, 'four': []}, 'eight': {'nine': {'ten': 11}}} + empty_dictionary_compressed = {'one/two': 3, 'one/four': [], 'eight/nine/ten': 11} + self.custom_object.one.four = [] + compressed_version = self.object_compressor.compress(self.custom_object) + self.assertEqual(empty_dictionary_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(dictionary_with_empty_array, decompressed_version) + self.custom_object.one.four = [5, 6, 7] + + def test_empty_object_custom_object_compression(self): + self.custom_object.eight.nine = MyCustomObject() + dictionary_with_empty_container = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {}}} + empty_object_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine': {}} + compressed_version = self.object_compressor.compress(self.custom_object) + self.assertEqual(empty_object_compressed, compressed_version) + decompressed_version = self.object_compressor.decompress(compressed_version) + self.assertEqual(dictionary_with_empty_container, decompressed_version) + self.custom_object.eight.nine.ten = 11 + + +class MyCustomObject: + def __init__(self): + return + + +if __name__ == '__main__': + unittest.main()