Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 107 additions & 0 deletions compressor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
from collections import abc
from typing import Union


class ObjectCompressor:
"""
Note: Tested with Python v 3.7.3
This is a class containing compress and decompress functions as described on https://github.com/Ontraport/Backend-Test
"""
def compress(self, object_to_compress, path_so_far: str = "", dictionary: dict = None) -> dict:
"""
This function compresses a multi-dimensional container of any size (tested with nested Python dictionaries and nested custom classes)
and returns a compressed version of the original container as a Python dictionary
(note that a top level empty Array becomes an empty Array)
"""
if(dictionary is None):
dictionary = {}
# If this is a Collection, it will always be Sized, but we want to handle Mappings slightly different, since they can contain nested objects
if(isinstance(object_to_compress, abc.Collection)):
# If this is a Mapping (e.g. a dictionary), iterate through keys and compress recursively
if(isinstance(object_to_compress, abc.Mapping)):
if(len(object_to_compress) > 0):
for attribute in object_to_compress.keys():
value = object_to_compress.get(attribute)
self.save_or_recurse(path_so_far + "/" + str(attribute), dictionary, value)
else:
if(path_so_far): # only add this object if it wasn't top level
self.save_or_recurse(path_so_far, dictionary, {}, True)
# If this is a Sized Collection (e.g. a List), iterate through elements and compress recursively
elif(isinstance(object_to_compress, abc.Sized)):
if(len(object_to_compress) > 0):
for i in range(len(object_to_compress)):
value = object_to_compress[i]
self.save_or_recurse(path_so_far + "/" + str(i), dictionary, value)
else:
if(path_so_far): # only add this object if it wasn't top level
self.save_or_recurse(path_so_far, dictionary, [], True)
else:
return []
# If this is a custom object, iterate through non-private attributes and compress
else:
filtered_attributes = list(filter(lambda x: not x.startswith('__'), dir(object_to_compress)))
if(len(filtered_attributes) > 0):
for attribute in filtered_attributes:
value = getattr(object_to_compress, str(attribute))
self.save_or_recurse(path_so_far + "/" + str(attribute), dictionary, value)
else:
if(path_so_far): # only add this object if it wasn't top level
self.save_or_recurse(path_so_far, dictionary, {}, True)
return dictionary

def save_or_recurse(self, path_so_far: str, dictionary: dict, value, force_insert: bool = False) -> dict:
"""
This helper function was introduced to minimize duplicate code in the compress(...) method
"""
if(path_so_far.startswith("/")):
path_so_far = path_so_far[1:]
if (not isinstance(value, (float, int, str, bool, type(None))) and force_insert is False):
return self.compress(value, path_so_far, dictionary)
else:
dictionary[path_so_far] = value
return dictionary

def decompress(self, compressed_object: dict) -> Union[dict, list]:
"""
This function expands a compressed container into a dictionary representation of its original form
NOTE: If a custom object was flattened, decompress will still return a dictionary representation
"""
if(isinstance(compressed_object, abc.Collection)):
if(isinstance(compressed_object, abc.Mapping)):
resultObject = {}
keys = compressed_object.keys()
# For each key in the compressed object
for attribute in compressed_object.keys():
pathList = attribute.split("/")
if(pathList[0].isnumeric() and isinstance(resultObject, abc.Mapping)):
resultObject = []
pointer = resultObject
# For each token in the path
for i in range(len(pathList)):
token = pathList[i]
if(isinstance(pointer, abc.MutableSequence)): # We are in a list
if(token.isnumeric()):
if(int(token) < len(pointer)): # Already exists in this list
pointer = pointer[int(token)]
else:
if(i < len(pathList) - 1 and pathList[i+1].isnumeric()):
pointer.append([])
elif(i == len(pathList) - 1):
pointer.append(compressed_object[attribute])
else:
pointer.append({})
pointer = pointer[-1]
else: # We are in a Map
if(i < len(pathList) - 1):
if(pathList[i+1].isnumeric()): # There is an intermediate array
if(token not in pointer):
pointer[token] = []
else:
if(token not in pointer):
pointer[token] = {}
pointer = pointer[token]
else:
pointer[token] = compressed_object[attribute]
return resultObject
elif(isinstance(compressed_object, abc.Sized) and len(compressed_object) == 0):
return compressed_object
146 changes: 146 additions & 0 deletions tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
import unittest
from compressor import ObjectCompressor


class CompressionTests(unittest.TestCase):
"""
This is a class unit tests for compress/decompress functions
They can be run with '<Python3 executable> tests.py'
"""
def setUp(self):
self.object_compressor = ObjectCompressor()
self.example_dictionary = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {'ten': 11}}}
self.example_dictionary_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine/ten': 11}

self.custom_object = MyCustomObject()
self.custom_object.one = MyCustomObject()
self.custom_object.one.two = 3
self.custom_object.one.four = [5, 6, 7]
self.custom_object.eight = MyCustomObject()
self.custom_object.eight.nine = MyCustomObject()
self.custom_object.eight.nine.ten = 11

def test_example_case_compression(self):
compressed_version = self.object_compressor.compress(self.example_dictionary)
self.assertEqual(self.example_dictionary_compressed, compressed_version)

def test_example_caseDecompression(self):
decompressed_version = self.object_compressor.decompress(self.example_dictionary_compressed)
self.assertEqual(self.example_dictionary, decompressed_version)

def test_example_case_compression_and_decompression(self):
compressed_version = self.object_compressor.compress(self.example_dictionary)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(self.example_dictionary, decompressed_version)

def test_middle_array_compression(self):
dictionary_with_middle_array = {'one': {'two': 3, 'four': [{'a': 2.3, 'b': None}, 'c']}, 'eight': {'nine': {'ten': 11}}}
middle_array_compressed = {'one/two': 3, 'one/four/0/a': 2.3, 'one/four/0/b': None, 'one/four/1': 'c', 'eight/nine/ten': 11}
compressed_version = self.object_compressor.compress(dictionary_with_middle_array)
self.assertEqual(middle_array_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(dictionary_with_middle_array, decompressed_version)

def test_top_array_compression(self):
array_object = [{'two': 3}, {'four': [{'a': 2.3, 'b': None}, 'c']}, {'eight': {'nine': {'ten': 11}}}]
top_array_compressed = {'0/two': 3, '1/four/0/a': 2.3, '1/four/0/b': None, '1/four/1': 'c', '2/eight/nine/ten': 11}
compressed_version = self.object_compressor.compress(array_object)
self.assertEqual(top_array_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(array_object, decompressed_version)

def test_empty_array_compression(self):
dictionary_with_empty_array = {'one': {'two': 3, 'four': []}, 'eight': {'nine': {'ten': 11}}}
empty_dictionary_compressed = {'one/two': 3, 'one/four': [], 'eight/nine/ten': 11}
compressed_version = self.object_compressor.compress(dictionary_with_empty_array)
self.assertEqual(empty_dictionary_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(dictionary_with_empty_array, decompressed_version)

def test_empty_object_compression(self):
dictionary_with_empty_container = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {}}}
empty_dictionary_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine': {}}
compressed_version = self.object_compressor.compress(dictionary_with_empty_container)
self.assertEqual(empty_dictionary_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(dictionary_with_empty_container, decompressed_version)

def test_string_and_bool_and_float_compression(self):
object_to_test = {'a': {'d': True}, 'b': 'myString', 'c': 21.89}
decompressed_object_to_test = {'a/d': True, 'b': 'myString', 'c': 21.89}
compressed_version = self.object_compressor.compress(object_to_test)
self.assertEqual(decompressed_object_to_test, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(object_to_test, decompressed_version)

def test_empty_top_level_object_compression(self):
compressed_version = self.object_compressor.compress({})
self.assertEqual({}, compressed_version)

def test_single_value_compression(self):
target_object = {'a': 1}
compressed_version = self.object_compressor.compress(target_object)
self.assertEqual({'a': 1}, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(target_object, decompressed_version)

def test_single_value_empty_object_compression(self):
target_object = {'a': {}}
compressed_version = self.object_compressor.compress(target_object)
self.assertEqual({'a': {}}, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(target_object, decompressed_version)

def test_single_value_empty_array_compression(self):
target_object = {'a': []}
compressed_version = self.object_compressor.compress(target_object)
self.assertEqual({'a': []}, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(target_object, decompressed_version)

def test_empty_top_level_array_compression(self):
target_object = []
compressed_version = self.object_compressor.compress([])
self.assertEqual([], compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(target_object, decompressed_version)

def test_single_array_compression(self):
compressed_version = self.object_compressor.compress({'a': [0, 1, 2]})
self.assertEqual({'a/0': 0, 'a/1': 1, 'a/2': 2}, compressed_version)

# These are the example case tests using a custom Python object instead of dictionary/List as a container
def test_custom_object_compression(self):
compressed_version = self.object_compressor.compress(self.custom_object)
self.assertEqual(self.example_dictionary_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(self.example_dictionary, decompressed_version)

def test_empty_array_custom_object_compression(self):
dictionary_with_empty_array = {'one': {'two': 3, 'four': []}, 'eight': {'nine': {'ten': 11}}}
empty_dictionary_compressed = {'one/two': 3, 'one/four': [], 'eight/nine/ten': 11}
self.custom_object.one.four = []
compressed_version = self.object_compressor.compress(self.custom_object)
self.assertEqual(empty_dictionary_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(dictionary_with_empty_array, decompressed_version)
self.custom_object.one.four = [5, 6, 7]

def test_empty_object_custom_object_compression(self):
self.custom_object.eight.nine = MyCustomObject()
dictionary_with_empty_container = {'one': {'two': 3, 'four': [5, 6, 7]}, 'eight': {'nine': {}}}
empty_object_compressed = {'one/two': 3, 'one/four/0': 5, 'one/four/1': 6, 'one/four/2': 7, 'eight/nine': {}}
compressed_version = self.object_compressor.compress(self.custom_object)
self.assertEqual(empty_object_compressed, compressed_version)
decompressed_version = self.object_compressor.decompress(compressed_version)
self.assertEqual(dictionary_with_empty_container, decompressed_version)
self.custom_object.eight.nine.ten = 11


class MyCustomObject:
def __init__(self):
return


if __name__ == '__main__':
unittest.main()