diff --git a/src/buildstream/_assetcache.py b/src/buildstream/_assetcache.py index df70fc2ff..9406b9cf9 100644 --- a/src/buildstream/_assetcache.py +++ b/src/buildstream/_assetcache.py @@ -264,7 +264,7 @@ def push_directory( # to establish a connection to this remote at initialization time. # class RemotePair: - def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec): + def __init__(self, casd: CASDProcessManager, spec: RemoteSpec): self.index: Optional[AssetRemote] = None self.storage: Optional[CASRemote] = None self.error: Optional[str] = None @@ -275,7 +275,7 @@ def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec): index.check() self.index = index if spec.remote_type in [RemoteType.STORAGE, RemoteType.ALL]: - storage = CASRemote(spec, cas) + storage = CASRemote(spec, casd) storage.check() self.storage = storage except RemoteError as e: @@ -322,7 +322,7 @@ def setup_remotes(self, specs: Iterable[RemoteSpec], project_specs: Dict[str, Li if spec in self._remotes: continue - remote = RemotePair(casd, self.cas, spec) + remote = RemotePair(casd, spec) if remote.error: self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error)) diff --git a/src/buildstream/_cas/cascache.py b/src/buildstream/_cas/cascache.py index fbfc150f3..68fd4b610 100644 --- a/src/buildstream/_cas/cascache.py +++ b/src/buildstream/_cas/cascache.py @@ -71,25 +71,9 @@ def __init__(self, path, *, casd, remote_cache=False): else: assert not self._remote_cache - self._default_remote = CASRemote(None, self) + self._default_remote = CASRemote(None, casd) self._default_remote.init() - # get_cas(): - # - # Return ContentAddressableStorage stub for buildbox-casd channel. - # - def get_cas(self): - assert self._casd, "CASCache was created without buildbox-casd" - return self._casd.get_cas() - - # get_local_cas(): - # - # Return LocalCAS stub for buildbox-casd channel. - # - def get_local_cas(self): - assert self._casd, "CASCache was created without buildbox-casd" - return self._casd.get_local_cas() - # preflight(): # # Preflight check. @@ -133,7 +117,7 @@ def contains_files(self, digests): # Returns: True if the directory is available in the local cache # def contains_directory(self, digest): - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() # Without a remote cache, `FetchTree` simply checks the local cache. request = local_cas_pb2.FetchTreeRequest() @@ -241,7 +225,7 @@ def checkout(self, dest, tree, *, can_link=False, _fetch=True): # def ensure_tree(self, tree): if self._remote_cache: - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() request = local_cas_pb2.FetchTreeRequest() request.root_digest.CopyFrom(tree) @@ -260,7 +244,7 @@ def ensure_tree(self, tree): # dir_digest (Digest): Digest object for the directory to fetch. # def fetch_directory(self, remote, dir_digest): - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() request = local_cas_pb2.FetchTreeRequest() request.instance_name = remote.local_cas_instance_name @@ -399,7 +383,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None): for path in paths: request.path.append(path) - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() response = local_cas.CaptureFiles(request) @@ -432,7 +416,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None): # (Digest): The digest of the imported directory # def import_directory(self, path: str, properties: Optional[List[str]] = None) -> SourceRef: - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() request = local_cas_pb2.CaptureTreeRequest() request.path.append(path) @@ -478,7 +462,7 @@ def import_directory(self, path: str, properties: Optional[List[str]] = None) -> # @contextlib.contextmanager def stage_directory(self, directory_digest): - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() request = local_cas_pb2.StageTreeRequest() request.root_digest.CopyFrom(directory_digest) @@ -535,7 +519,7 @@ def missing_blobs_for_directory(self, digest, *, remote=None): # Returns: List of missing Digest objects # def missing_blobs(self, blobs, *, remote=None): - cas = self.get_cas() + cas = self._casd.get_cas() if remote: instance_name = remote.local_cas_instance_name @@ -576,7 +560,7 @@ def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=Non if self._remote_cache and _fetch_tree: # Ensure we have the directory protos in the local cache - local_cas = self.get_local_cas() + local_cas = self._casd.get_local_cas() request = local_cas_pb2.FetchTreeRequest() request.root_digest.CopyFrom(directory_digest) @@ -719,17 +703,6 @@ def _send_directory(self, remote, digest): def get_cache_usage(self): return self._cache_usage_monitor.get_cache_usage() - # get_casd() - # - # Get the underlying buildbox-casd process - # - # Returns: - # (subprocess.Process): The casd process that is used for the current cascache - # - def get_casd(self): - assert self._casd is not None, "Only call this with a running buildbox-casd process" - return self._casd - # _CASCacheUsage # diff --git a/src/buildstream/_cas/casremote.py b/src/buildstream/_cas/casremote.py index ae4aa9004..3fefb9d47 100644 --- a/src/buildstream/_cas/casremote.py +++ b/src/buildstream/_cas/casremote.py @@ -37,10 +37,10 @@ def __init__(self, blob, msg): # Represents a single remote CAS cache. # class CASRemote(BaseRemote): - def __init__(self, spec, cascache, **kwargs): + def __init__(self, spec, casd, **kwargs): super().__init__(spec, **kwargs) - self.cascache = cascache + self.casd = casd self.local_cas_instance_name = None # check_remote @@ -55,30 +55,12 @@ def _configure_protocols(self): self.local_cas_instance_name = "" return - local_cas = self.cascache.get_local_cas() + local_cas = self.casd.get_local_cas() request = local_cas_pb2.GetInstanceNameForRemotesRequest() self.spec.to_localcas_remote(request.content_addressable_storage) response = local_cas.GetInstanceNameForRemotes(request) self.local_cas_instance_name = response.instance_name - # push_message(): - # - # Push the given protobuf message to a remote. - # - # Args: - # message (Message): A protobuf message to push. - # - # Raises: - # (CASRemoteError): if there was an error - # - def push_message(self, message): - - message_buffer = message.SerializeToString() - - self.init() - - return self.cascache.add_object(buffer=message_buffer, instance_name=self.local_cas_instance_name) - # Represents a batch of blobs queued for fetching. # @@ -107,7 +89,7 @@ def send(self, *, missing_blobs=None): if not self._requests: return - local_cas = self._remote.cascache.get_local_cas() + local_cas = self._remote.casd.get_local_cas() for request in self._requests: batch_response = local_cas.FetchMissingBlobs(request) @@ -161,7 +143,7 @@ def send(self): if not self._requests: return - local_cas = self._remote.cascache.get_local_cas() + local_cas = self._remote.casd.get_local_cas() for request in self._requests: batch_response = local_cas.UploadMissingBlobs(request) diff --git a/src/buildstream/_stream.py b/src/buildstream/_stream.py index da8d48550..5b7de307e 100644 --- a/src/buildstream/_stream.py +++ b/src/buildstream/_stream.py @@ -1810,7 +1810,7 @@ def _run(self, *, announce_session: bool = False): self._session_start_callback() self._running = True - status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd()) + status = self._scheduler.run(self.queues, self._context.get_casd()) self._running = False if status == SchedStatus.ERROR: diff --git a/src/buildstream/sandbox/_reremote.py b/src/buildstream/sandbox/_reremote.py index 79853fa34..923a93efd 100644 --- a/src/buildstream/sandbox/_reremote.py +++ b/src/buildstream/sandbox/_reremote.py @@ -22,8 +22,8 @@ class RERemote(CASRemote): - def __init__(self, cas_spec, remote_execution_specs, cascache): - super().__init__(cas_spec, cascache) + def __init__(self, cas_spec, remote_execution_specs, casd): + super().__init__(cas_spec, casd) self.remote_execution_specs = remote_execution_specs self.exec_service = None @@ -31,7 +31,7 @@ def __init__(self, cas_spec, remote_execution_specs, cascache): self.ac_service = None def _configure_protocols(self): - local_cas = self.cascache.get_local_cas() + local_cas = self.casd.get_local_cas() request = local_cas_pb2.GetInstanceNameForRemotesRequest() if self.remote_execution_specs.storage_spec: self.remote_execution_specs.storage_spec.to_localcas_remote(request.content_addressable_storage) @@ -50,10 +50,9 @@ def _configure_protocols(self): response = local_cas.GetInstanceNameForRemotes(request) self.local_cas_instance_name = response.instance_name - casd = self.cascache.get_casd() - self.exec_service = casd.get_exec_service() - self.operations_service = casd.get_operations_service() - self.ac_service = casd.get_ac_service() + self.exec_service = self.casd.get_exec_service() + self.operations_service = self.casd.get_operations_service() + self.ac_service = self.casd.get_ac_service() def _check(self): super()._check() diff --git a/src/buildstream/sandbox/_sandboxbuildboxrun.py b/src/buildstream/sandbox/_sandboxbuildboxrun.py index c87642f6b..007ae95b4 100644 --- a/src/buildstream/sandbox/_sandboxbuildboxrun.py +++ b/src/buildstream/sandbox/_sandboxbuildboxrun.py @@ -38,11 +38,11 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) context = self._get_context() - cascache = context.get_cascache() + casd = context.get_casd() re_specs = context.remote_execution_specs if re_specs and re_specs.action_spec: - self.re_remote = RERemote(context.remote_cache_spec, re_specs, cascache) + self.re_remote = RERemote(context.remote_cache_spec, re_specs, casd) try: self.re_remote.init() self.re_remote.check() @@ -110,8 +110,7 @@ def _execute_action(self, action, flags): stdout, stderr = self._get_output() context = self._get_context() - cascache = context.get_cascache() - casd = cascache.get_casd() + casd = context.get_casd() config = self._get_config() if config.remote_apis_socket_path and context.remote_cache_spec and not self.re_remote: diff --git a/src/buildstream/sandbox/_sandboxremote.py b/src/buildstream/sandbox/_sandboxremote.py index 8072938bf..28d6fcb32 100644 --- a/src/buildstream/sandbox/_sandboxremote.py +++ b/src/buildstream/sandbox/_sandboxremote.py @@ -37,7 +37,7 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) context = self._get_context() - cascache = context.get_cascache() + casd = context.get_casd() specs = context.remote_execution_specs if specs is None or specs.exec_spec is None: @@ -48,7 +48,7 @@ def __init__(self, *args, **kwargs): self.action_spec = specs.action_spec self.operation_name = None - self.re_remote = RERemote(context.remote_cache_spec, specs, cascache) + self.re_remote = RERemote(context.remote_cache_spec, specs, casd) try: self.re_remote.init() except grpc.RpcError as e: diff --git a/tests/artifactcache/pull.py b/tests/artifactcache/pull.py index 114957b85..f1e0751b5 100644 --- a/tests/artifactcache/pull.py +++ b/tests/artifactcache/pull.py @@ -195,7 +195,12 @@ def test_pull_tree(cli, tmpdir, datafiles): # Push the Tree as a regular message _, remotes = artifactcache.get_remotes(project.name, True) assert len(remotes) == 1 - tree_digest = remotes[0].push_message(tree) + + remotes[0].init() + tree_digest = cas.add_object( + buffer=tree.SerializeToString(), instance_name=remotes[0].local_cas_instance_name + ) + tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes assert tree_hash and tree_size diff --git a/tests/artifactcache/push.py b/tests/artifactcache/push.py index 7c2160f6e..4594ae6e8 100644 --- a/tests/artifactcache/push.py +++ b/tests/artifactcache/push.py @@ -21,7 +21,6 @@ from buildstream import _yaml from buildstream._project import Project -from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2 from buildstream._testing import cli # pylint: disable=unused-import from tests.testutils import create_artifact_share, create_split_share, dummy_context @@ -131,58 +130,3 @@ def test_push_split(cli, tmpdir, datafiles): cli.get_artifact_name(project_dir, "test", "target.bst", cache_key=element_key) ) assert storage.get_cas_files(proto) is not None - - -@pytest.mark.datafiles(DATA_DIR) -def test_push_message(tmpdir, datafiles): - project_dir = str(datafiles) - - # Set up an artifact cache. - artifactshare = os.path.join(str(tmpdir), "artifactshare") - with create_artifact_share(artifactshare) as share: - # Configure artifact share - rootcache_dir = os.path.join(str(tmpdir), "cache") - user_config_file = str(tmpdir.join("buildstream.conf")) - user_config = { - "scheduler": {"pushers": 1}, - "artifacts": { - "servers": [ - { - "url": share.repo, - "push": True, - } - ] - }, - "cachedir": rootcache_dir, - } - - # Write down the user configuration file - _yaml.roundtrip_dump(user_config, file=user_config_file) - - with dummy_context(config=user_config_file) as context: - # Load the project manually - project = Project(project_dir, context) - project.ensure_fully_loaded() - - # Create a local artifact cache handle - artifactcache = context.artifactcache - - # Initialize remotes - context.initialize_remotes(True, True, None, None) - assert artifactcache.has_push_remotes() - - command = remote_execution_pb2.Command( - arguments=["/usr/bin/gcc", "--help"], - working_directory="/buildstream-build", - output_directories=["/buildstream-install"], - ) - - # Push the message object - _, remotes = artifactcache.get_remotes(project.name, True) - assert len(remotes) == 1 - command_digest = remotes[0].push_message(command) - message_hash, message_size = command_digest.hash, command_digest.size_bytes - - assert message_hash and message_size - message_digest = remote_execution_pb2.Digest(hash=message_hash, size_bytes=message_size) - assert share.has_object(message_digest)