Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions perfact/zodbsync/tests/test_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -2329,3 +2329,27 @@ def test_layer_info_datafs(self):
)
self.run('record', '/')
assert getattr(self.app.blob, 'zodbsync_layer', None) is None

def test_fail_when_meta_is_missing(self):
"""
Check that playing back a structure where no layer has a meta file for
a given folder does not work.
"""
root = f'{self.repo.path}/__root__'
os.mkdir(f'{root}/newfolder')
os.mkdir(f'{root}/newobj')
with open(f'{root}/newobj/__source__.py', 'w'):
pass
with pytest.raises(AssertionError):
self.run('playback', '/')

def test_fail_when_meta_missing_layers(self):
"""
Check that playing back a structure where no layer has a meta file for
a given folder does not work (multi-layer).
"""
with self.addlayer() as layer:
os.mkdir(f'{self.repo.path}/__root__/newfolder')
os.mkdir(f'{layer}/workdir/__root__/newfolder')
with pytest.raises(AssertionError):
self.run('playback', '/')
33 changes: 22 additions & 11 deletions perfact/zodbsync/zodbsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def mod_read(obj=None, onerrorstop=False, default_owner=None,

if meta_type not in known_types:
if onerrorstop:
assert False, "Unsupported type: %s" % meta_type
raise AssertionError(f"Unsupported type: {meta_type}")
else:
meta['unsupported'] = meta_type
return meta
Expand Down Expand Up @@ -139,7 +139,8 @@ def mod_write(data, parent=None, obj_id=None, override=False, root=None,
temp_obj = None
# ID exists? Check for type
if obj and obj.meta_type != meta_type:
assert override, "Type mismatch for object " + repr(data)
if not override:
raise AssertionError(f"Type mismatch for object {data!r}")
contents = obj_contents(obj)
if contents:
# Rename so we can cut+paste the children
Expand Down Expand Up @@ -322,10 +323,11 @@ def start_transaction(self, note=''):
# Log in as a manager
uf = self.app.acl_users
user = uf.getUser(self.manager_user).__of__(uf)
assert user is not None, (
'User %s is not available in database. Perhaps you need to set'
' create_manager_user in config.py?' % self.manager_user
)
if user is None:
raise AssertionError(
f'User {self.manager_user} is not available in database.'
' Perhaps you need to set create_manager_user in config.py?'
)

self.logger.info('Using user %s' % self.manager_user)
AccessControl.SecurityManagement.newSecurityManager(None, user)
Expand Down Expand Up @@ -424,7 +426,8 @@ def fs_pathinfo(self, path):
'layeridx': None,
}
path = path.lstrip('/')
children = set()
candidates = set() # subfolders on any layer
children = set() # those with a __meta__ file on a some layer
for idx, layer in enumerate(layers):
fspath = os.path.join(layer['workdir'], self.site, path)
if not os.path.isdir(fspath):
Expand All @@ -437,8 +440,14 @@ def fs_pathinfo(self, path):
for entry in os.listdir(fspath):
if entry in children or entry.startswith('__'):
continue
candidates.add(entry)
if os.path.exists(os.path.join(fspath, entry, '__meta__')):
children.add(entry)
missing = candidates - children
if missing:
raise AssertionError(
f"No __meta__ file on any layer: {path}/{children}"
)

result['children'] = sorted(children)
return result
Expand Down Expand Up @@ -632,11 +641,13 @@ def fs_parse(self, fspath, data=None):
if data is None:
data = self.fs_read(fspath)

assert 'meta' in data, 'Missing meta file: ' + fspath
if 'meta' not in data:
raise AssertionError(f"Missing meta file: {fspath}")
src_fnames = data.get('src_fnames', [])
assert len(src_fnames) <= 1, (
"Multiple source files in " + fspath
)
if len(src_fnames) > 1:
raise AssertionError(
f"Multiple source files in {fspath}"
)
result = dict(literal_eval(data['meta']))
if src_fnames:
src_fname = src_fnames[0]
Expand Down