Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/buildstream/_assetcache.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ def push_directory(
# to establish a connection to this remote at initialization time.
#
class RemotePair:
def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec):
def __init__(self, casd: CASDProcessManager, spec: RemoteSpec):
self.index: Optional[AssetRemote] = None
self.storage: Optional[CASRemote] = None
self.error: Optional[str] = None
Expand All @@ -275,7 +275,7 @@ def __init__(self, casd: CASDProcessManager, cas: CASCache, spec: RemoteSpec):
index.check()
self.index = index
if spec.remote_type in [RemoteType.STORAGE, RemoteType.ALL]:
storage = CASRemote(spec, cas)
storage = CASRemote(spec, casd)
storage.check()
self.storage = storage
except RemoteError as e:
Expand Down Expand Up @@ -322,7 +322,7 @@ def setup_remotes(self, specs: Iterable[RemoteSpec], project_specs: Dict[str, Li
if spec in self._remotes:
continue

remote = RemotePair(casd, self.cas, spec)
remote = RemotePair(casd, spec)
if remote.error:
self.context.messenger.warn("Failed to initialize remote {}: {}".format(spec.url, remote.error))

Expand Down
45 changes: 9 additions & 36 deletions src/buildstream/_cas/cascache.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,9 @@ def __init__(self, path, *, casd, remote_cache=False):
else:
assert not self._remote_cache

self._default_remote = CASRemote(None, self)
self._default_remote = CASRemote(None, casd)
self._default_remote.init()

# get_cas():
#
# Return ContentAddressableStorage stub for buildbox-casd channel.
#
def get_cas(self):
assert self._casd, "CASCache was created without buildbox-casd"
return self._casd.get_cas()

# get_local_cas():
#
# Return LocalCAS stub for buildbox-casd channel.
#
def get_local_cas(self):
assert self._casd, "CASCache was created without buildbox-casd"
return self._casd.get_local_cas()

# preflight():
#
# Preflight check.
Expand Down Expand Up @@ -133,7 +117,7 @@ def contains_files(self, digests):
# Returns: True if the directory is available in the local cache
#
def contains_directory(self, digest):
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

# Without a remote cache, `FetchTree` simply checks the local cache.
request = local_cas_pb2.FetchTreeRequest()
Expand Down Expand Up @@ -241,7 +225,7 @@ def checkout(self, dest, tree, *, can_link=False, _fetch=True):
#
def ensure_tree(self, tree):
if self._remote_cache:
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

request = local_cas_pb2.FetchTreeRequest()
request.root_digest.CopyFrom(tree)
Expand All @@ -260,7 +244,7 @@ def ensure_tree(self, tree):
# dir_digest (Digest): Digest object for the directory to fetch.
#
def fetch_directory(self, remote, dir_digest):
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

request = local_cas_pb2.FetchTreeRequest()
request.instance_name = remote.local_cas_instance_name
Expand Down Expand Up @@ -399,7 +383,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
for path in paths:
request.path.append(path)

local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

response = local_cas.CaptureFiles(request)

Expand Down Expand Up @@ -432,7 +416,7 @@ def add_objects(self, *, paths=None, buffers=None, instance_name=None):
# (Digest): The digest of the imported directory
#
def import_directory(self, path: str, properties: Optional[List[str]] = None) -> SourceRef:
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

request = local_cas_pb2.CaptureTreeRequest()
request.path.append(path)
Expand Down Expand Up @@ -478,7 +462,7 @@ def import_directory(self, path: str, properties: Optional[List[str]] = None) ->
#
@contextlib.contextmanager
def stage_directory(self, directory_digest):
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

request = local_cas_pb2.StageTreeRequest()
request.root_digest.CopyFrom(directory_digest)
Expand Down Expand Up @@ -535,7 +519,7 @@ def missing_blobs_for_directory(self, digest, *, remote=None):
# Returns: List of missing Digest objects
#
def missing_blobs(self, blobs, *, remote=None):
cas = self.get_cas()
cas = self._casd.get_cas()

if remote:
instance_name = remote.local_cas_instance_name
Expand Down Expand Up @@ -576,7 +560,7 @@ def required_blobs_for_directory(self, directory_digest, *, excluded_subdirs=Non

if self._remote_cache and _fetch_tree:
# Ensure we have the directory protos in the local cache
local_cas = self.get_local_cas()
local_cas = self._casd.get_local_cas()

request = local_cas_pb2.FetchTreeRequest()
request.root_digest.CopyFrom(directory_digest)
Expand Down Expand Up @@ -719,17 +703,6 @@ def _send_directory(self, remote, digest):
def get_cache_usage(self):
return self._cache_usage_monitor.get_cache_usage()

# get_casd()
#
# Get the underlying buildbox-casd process
#
# Returns:
# (subprocess.Process): The casd process that is used for the current cascache
#
def get_casd(self):
assert self._casd is not None, "Only call this with a running buildbox-casd process"
return self._casd


# _CASCacheUsage
#
Expand Down
28 changes: 5 additions & 23 deletions src/buildstream/_cas/casremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ def __init__(self, blob, msg):
# Represents a single remote CAS cache.
#
class CASRemote(BaseRemote):
def __init__(self, spec, cascache, **kwargs):
def __init__(self, spec, casd, **kwargs):
super().__init__(spec, **kwargs)

self.cascache = cascache
self.casd = casd
self.local_cas_instance_name = None

# check_remote
Expand All @@ -55,30 +55,12 @@ def _configure_protocols(self):
self.local_cas_instance_name = ""
return

local_cas = self.cascache.get_local_cas()
local_cas = self.casd.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
self.spec.to_localcas_remote(request.content_addressable_storage)
response = local_cas.GetInstanceNameForRemotes(request)
self.local_cas_instance_name = response.instance_name

# push_message():
#
# Push the given protobuf message to a remote.
#
# Args:
# message (Message): A protobuf message to push.
#
# Raises:
# (CASRemoteError): if there was an error
#
def push_message(self, message):

message_buffer = message.SerializeToString()

self.init()

return self.cascache.add_object(buffer=message_buffer, instance_name=self.local_cas_instance_name)


# Represents a batch of blobs queued for fetching.
#
Expand Down Expand Up @@ -107,7 +89,7 @@ def send(self, *, missing_blobs=None):
if not self._requests:
return

local_cas = self._remote.cascache.get_local_cas()
local_cas = self._remote.casd.get_local_cas()

for request in self._requests:
batch_response = local_cas.FetchMissingBlobs(request)
Expand Down Expand Up @@ -161,7 +143,7 @@ def send(self):
if not self._requests:
return

local_cas = self._remote.cascache.get_local_cas()
local_cas = self._remote.casd.get_local_cas()

for request in self._requests:
batch_response = local_cas.UploadMissingBlobs(request)
Expand Down
2 changes: 1 addition & 1 deletion src/buildstream/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,7 @@ def _run(self, *, announce_session: bool = False):
self._session_start_callback()

self._running = True
status = self._scheduler.run(self.queues, self._context.get_cascache().get_casd())
status = self._scheduler.run(self.queues, self._context.get_casd())
self._running = False

if status == SchedStatus.ERROR:
Expand Down
13 changes: 6 additions & 7 deletions src/buildstream/sandbox/_reremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@


class RERemote(CASRemote):
def __init__(self, cas_spec, remote_execution_specs, cascache):
super().__init__(cas_spec, cascache)
def __init__(self, cas_spec, remote_execution_specs, casd):
super().__init__(cas_spec, casd)

self.remote_execution_specs = remote_execution_specs
self.exec_service = None
self.operations_service = None
self.ac_service = None

def _configure_protocols(self):
local_cas = self.cascache.get_local_cas()
local_cas = self.casd.get_local_cas()
request = local_cas_pb2.GetInstanceNameForRemotesRequest()
if self.remote_execution_specs.storage_spec:
self.remote_execution_specs.storage_spec.to_localcas_remote(request.content_addressable_storage)
Expand All @@ -50,10 +50,9 @@ def _configure_protocols(self):
response = local_cas.GetInstanceNameForRemotes(request)
self.local_cas_instance_name = response.instance_name

casd = self.cascache.get_casd()
self.exec_service = casd.get_exec_service()
self.operations_service = casd.get_operations_service()
self.ac_service = casd.get_ac_service()
self.exec_service = self.casd.get_exec_service()
self.operations_service = self.casd.get_operations_service()
self.ac_service = self.casd.get_ac_service()

def _check(self):
super()._check()
Expand Down
7 changes: 3 additions & 4 deletions src/buildstream/sandbox/_sandboxbuildboxrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

context = self._get_context()
cascache = context.get_cascache()
casd = context.get_casd()

re_specs = context.remote_execution_specs
if re_specs and re_specs.action_spec:
self.re_remote = RERemote(context.remote_cache_spec, re_specs, cascache)
self.re_remote = RERemote(context.remote_cache_spec, re_specs, casd)
try:
self.re_remote.init()
self.re_remote.check()
Expand Down Expand Up @@ -110,8 +110,7 @@ def _execute_action(self, action, flags):
stdout, stderr = self._get_output()

context = self._get_context()
cascache = context.get_cascache()
casd = cascache.get_casd()
casd = context.get_casd()
config = self._get_config()

if config.remote_apis_socket_path and context.remote_cache_spec and not self.re_remote:
Expand Down
4 changes: 2 additions & 2 deletions src/buildstream/sandbox/_sandboxremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

context = self._get_context()
cascache = context.get_cascache()
casd = context.get_casd()

specs = context.remote_execution_specs
if specs is None or specs.exec_spec is None:
Expand All @@ -48,7 +48,7 @@ def __init__(self, *args, **kwargs):
self.action_spec = specs.action_spec
self.operation_name = None

self.re_remote = RERemote(context.remote_cache_spec, specs, cascache)
self.re_remote = RERemote(context.remote_cache_spec, specs, casd)
try:
self.re_remote.init()
except grpc.RpcError as e:
Expand Down
7 changes: 6 additions & 1 deletion tests/artifactcache/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,12 @@ def test_pull_tree(cli, tmpdir, datafiles):
# Push the Tree as a regular message
_, remotes = artifactcache.get_remotes(project.name, True)
assert len(remotes) == 1
tree_digest = remotes[0].push_message(tree)

remotes[0].init()
tree_digest = cas.add_object(
buffer=tree.SerializeToString(), instance_name=remotes[0].local_cas_instance_name
)

tree_hash, tree_size = tree_digest.hash, tree_digest.size_bytes
assert tree_hash and tree_size

Expand Down
56 changes: 0 additions & 56 deletions tests/artifactcache/push.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

from buildstream import _yaml
from buildstream._project import Project
from buildstream._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildstream._testing import cli # pylint: disable=unused-import

from tests.testutils import create_artifact_share, create_split_share, dummy_context
Expand Down Expand Up @@ -131,58 +130,3 @@ def test_push_split(cli, tmpdir, datafiles):
cli.get_artifact_name(project_dir, "test", "target.bst", cache_key=element_key)
)
assert storage.get_cas_files(proto) is not None


@pytest.mark.datafiles(DATA_DIR)
def test_push_message(tmpdir, datafiles):
project_dir = str(datafiles)

# Set up an artifact cache.
artifactshare = os.path.join(str(tmpdir), "artifactshare")
with create_artifact_share(artifactshare) as share:
# Configure artifact share
rootcache_dir = os.path.join(str(tmpdir), "cache")
user_config_file = str(tmpdir.join("buildstream.conf"))
user_config = {
"scheduler": {"pushers": 1},
"artifacts": {
"servers": [
{
"url": share.repo,
"push": True,
}
]
},
"cachedir": rootcache_dir,
}

# Write down the user configuration file
_yaml.roundtrip_dump(user_config, file=user_config_file)

with dummy_context(config=user_config_file) as context:
# Load the project manually
project = Project(project_dir, context)
project.ensure_fully_loaded()

# Create a local artifact cache handle
artifactcache = context.artifactcache

# Initialize remotes
context.initialize_remotes(True, True, None, None)
assert artifactcache.has_push_remotes()

command = remote_execution_pb2.Command(
arguments=["/usr/bin/gcc", "--help"],
working_directory="/buildstream-build",
output_directories=["/buildstream-install"],
)

# Push the message object
_, remotes = artifactcache.get_remotes(project.name, True)
assert len(remotes) == 1
command_digest = remotes[0].push_message(command)
message_hash, message_size = command_digest.hash, command_digest.size_bytes

assert message_hash and message_size
message_digest = remote_execution_pb2.Digest(hash=message_hash, size_bytes=message_size)
assert share.has_object(message_digest)
Loading