diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
index 3483135c..607d0882 100644
--- a/.devcontainer/Dockerfile
+++ b/.devcontainer/Dockerfile
@@ -2,7 +2,7 @@ FROM mcr.microsoft.com/devcontainers/miniconda:0-3
 
 # commenting out mamba install, is given an error, see:
 # https://github.com/conda/conda-libmamba-solver/issues/540
-RUN conda install -n base -c conda-forge mamba
+# RUN conda install -n base -c conda-forge mamba
 
 # Copy environment.yml (if found) to a temp location so we update the environment. Also
 # copy "noop.txt" so the COPY instruction does not fail if no environment.yml exists.
diff --git a/hsds/basenode.py b/hsds/basenode.py
index c30bfd4a..69a00531 100644
--- a/hsds/basenode.py
+++ b/hsds/basenode.py
@@ -236,6 +236,8 @@ async def docker_update_dn_info(app):
         log.error("HEAD node seems to be down.")
         app["dn_urls"] = []
         app["dn_ids"] = []
+    except HTTPServiceUnavailable:
+        log.warn("Head ServiceUnavailable")
     except OSError:
         log.error("failed to register")
         app["dn_urls"] = []
diff --git a/hsds/datanode_lib.py b/hsds/datanode_lib.py
index 08ecc52a..34fa832b 100644
--- a/hsds/datanode_lib.py
+++ b/hsds/datanode_lib.py
@@ -149,8 +149,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
         bucket = domain_bucket
 
     if obj_id in pending_s3_write:
-        msg = f"write_s3_key - not expected for key {obj_id} to be in "
-        msg += "pending_s3_write map"
+        msg = f"write_s3_key - not expected for key {obj_id} to be in pending_s3_write map"
         log.error(msg)
         raise KeyError(msg)
 
@@ -172,12 +171,10 @@ async def write_s3_obj(app, obj_id, bucket=None):
         # timestamp is first element of two-tuple
         last_update_time = dirty_ids[obj_id][0]
     else:
-        msg = f"write_s3_obj - {obj_id} not in dirty_ids, "
-        msg += "assuming flush write"
+        msg = f"write_s3_obj - {obj_id} not in dirty_ids, assuming flush write"
         log.debug(msg)
     if last_update_time > now:
-        msg = f"last_update time {last_update_time} is in the future for "
-        msg += f"obj_id: {obj_id}"
+        msg = f"last_update time {last_update_time} is in the future for obj_id: {obj_id}"
         log.error(msg)
         raise ValueError(msg)
 
@@ -198,8 +195,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
         dset_id = getDatasetId(obj_id)
         if dset_id in filter_map:
             filter_ops = filter_map[dset_id]
-            msg = f"write_s3_obj: got filter_op: {filter_ops} "
-            msg += f"for dset: {dset_id}"
+            msg = f"write_s3_obj: got filter_op: {filter_ops} for dset: {dset_id}"
             log.debug(msg)
         else:
             filter_ops = None
@@ -237,13 +233,11 @@ async def write_s3_obj(app, obj_id, bucket=None):
         # meta data update
         # check for object in meta cache
         if obj_id not in meta_cache:
-            msg = f"write_s3_obj: expected to find obj_id: {obj_id} "
-            msg += "in meta cache"
+            msg = f"write_s3_obj: expected to find obj_id: {obj_id} in meta cache"
             log.error(msg)
             raise KeyError(f"{obj_id} not found in meta cache")
         if not meta_cache.isDirty(obj_id):
-            msg = f"write_s3_obj: expected meta cache obj {obj_id} "
-            msg == "to be dirty"
+            msg = f"write_s3_obj: expected meta cache obj {obj_id} to be dirty"
             log.error(msg)
             raise ValueError("bad dirty state for obj")
         obj_json = meta_cache[obj_id]
@@ -264,8 +258,7 @@ async def write_s3_obj(app, obj_id, bucket=None):
         else:
             timestamp = 0
         if timestamp > last_update_time:
-            msg = f"write_s3_obj: {obj_id} got updated while s3 "
-            msg += "write was in progress"
+            msg = f"write_s3_obj: {obj_id} got updated while s3 write was in progress"
             log.info(msg)
         else:
             log.debug(f"write_s3obj: clear dirty for {obj_id} ")
@@ -279,11 +272,10 @@ async def write_s3_obj(app, obj_id, bucket=None):
     finally:
         # clear pending_s3_write item
-        log.debug(f"write_s3_obj finally block, success={success}")
+        log.debug(f"write_s3_obj {obj_id} finally block, success={success}")
         if obj_id in pending_s3_write:
             if pending_s3_write[obj_id] != now:
-                msg = "pending_s3_write timestamp got updated unexpectedly "
-                msg += f"for {obj_id}"
+                msg = f"pending_s3_write timestamp got updated unexpectedly for {obj_id}"
                 log.error(msg)
             del pending_s3_write[obj_id]
         # clear task
@@ -1259,10 +1251,9 @@ def callback(future):
 
         if obj_id in pending_s3_write:
             pending_time = s3sync_start - pending_s3_write[obj_id]
-            msg = f"s3sync - key {obj_id} has been pending for "
-            msg += f"{pending_time:.3f}"
+            msg = f"s3sync - key {obj_id} has been pending for {pending_time:.3f}"
             log.debug(msg)
-            if s3sync_start - pending_s3_write[obj_id] > s3_sync_task_timeout:
+            if pending_time > s3_sync_task_timeout:
                 msg = f"s3sync - obj {obj_id} has been in pending_s3_write "
                 msg += f"for {pending_time:.3f} seconds, restarting"
                 log.warn(msg)
diff --git a/hsds/headnode.py b/hsds/headnode.py
index 9b49517d..55547294 100755
--- a/hsds/headnode.py
+++ b/hsds/headnode.py
@@ -103,6 +103,8 @@ def is_healthy(self):
 async def isClusterReady(app):
     sn_count = 0
     dn_count = 0
+    active_sn_ids = app["active_sn_ids"]
+    active_dn_ids = app["active_dn_ids"]
     target_sn_count = await getTargetNodeCount(app, "sn")
     target_dn_count = await getTargetNodeCount(app, "dn")
     last_create_time = None
@@ -115,9 +117,11 @@ async def isClusterReady(app):
         if last_create_time is None or node.create_time > last_create_time:
             last_create_time = node.create_time
         if node.type == "sn":
-            sn_count += 1
+            if node_id in active_sn_ids:
+                sn_count += 1
         else:
-            dn_count += 1
+            if node_id in active_dn_ids:
+                dn_count += 1
     if sn_count == 0 or dn_count == 0:
         log.debug("no nodes, cluster not ready")
         return False
@@ -171,6 +175,20 @@ async def info(request):
     return resp
 
 
+def getNodeUrls(nodes, node_ids):
+    """ return a list of node urls for the given set of node ids """
+
+    node_urls = []
+    for node_id in node_ids:
+        if node_id:
+            node = nodes[node_id]
+            node_url = f"http://{node.host}:{node.port}"
+            node_urls.append(node_url)
+        else:
+            node_urls.append(None)
+    return node_urls
+
+
 async def register(request):
     """HTTP method for nodes to register with head node"""
     app = request.app
@@ -208,7 +226,7 @@ async def register(request):
         log.debug("register - get ip/port from request.transport")
         peername = request.transport.get_extra_info("peername")
         if peername is None:
-            msg = "Can not determine caller IP"
+            msg = "Cannot determine caller IP"
            log.error(msg)
            raise HTTPBadRequest(reason=msg)
         if peername[0] is None or peername[0] in ("::1", "127.0.0.1"):
@@ -255,10 +273,34 @@ async def register(request):
         node_host=node_host,
         node_port=node_port,
     )
-    # delete any existing node with the same port
+    # delete any existing node with the same port and IP
     removeNode(app, host=node_host, port=node_port)
     nodes[node_id] = node
 
+    # add to the active list if there's an open slot
+    if node_type == "sn":
+        active_list = app["active_sn_ids"]
+    else:
+        active_list = app["active_dn_ids"]
+
+    tgt_count = len(active_list)
+    active_count = sum(id is not None for id in active_list)
+    if tgt_count == active_count:
+        # all the slots are filled, see if there is any unhealthy node
+        # and remove that
+        for i in range(len(active_list)):
+            id = active_list[i]
+            node = nodes[id]
+            if not node.is_healthy():
+                active_list[i] = None  # clear the slot
+                break
+
+    for i in range(len(active_list)):
+        if not active_list[i]:
+            log.info(f"Node {node_id} added to {node_type} active list in slot: {i}")
+            active_list[i] = node_id
+            break
+
     resp = StreamResponse()
     resp.headers["Content-Type"] = "application/json"
     answer = {}
@@ -267,38 +309,14 @@ async def register(request):
         answer["cluster_state"] = "READY"
     else:
         answer["cluster_state"] = "WAITING"
-    sn_urls = []
-    dn_urls = []
-    sn_ids = []
-    dn_ids = []
-    for node_id in nodes:
-        node = nodes[node_id]
-        if not node.is_healthy():
-            continue
-        node_url = f"http://{node.host}:{node.port}"
-        if node.type == "sn":
-            sn_urls.append(node_url)
-            sn_ids.append(node_id)
-        else:
-            dn_urls.append(node_url)
-            dn_ids.append(node_id)
-    # sort dn_urls so node number can be determined
-    dn_id_map = {}
-    for i in range(len(dn_urls)):
-        dn_url = dn_urls[i]
-        dn_id = dn_ids[i]
-        dn_id_map[dn_url] = dn_id
-
-    dn_urls.sort()
-    dn_ids = []  # re-arrange to match url order
-    for dn_url in dn_urls:
-        dn_ids.append(dn_id_map[dn_url])
+    sn_urls = getNodeUrls(nodes, app["active_sn_ids"])
+    dn_urls = getNodeUrls(nodes, app["active_dn_ids"])
+    answer["sn_ids"] = app["active_sn_ids"]
     answer["sn_urls"] = sn_urls
+    answer["dn_ids"] = app["active_dn_ids"]
     answer["dn_urls"] = dn_urls
-    answer["sn_ids"] = sn_ids
-    answer["dn_ids"] = dn_ids
     answer["req_ip"] = node_host
     log.debug(f"register returning: {answer}")
     app["last_health_check"] = int(time.time())
@@ -410,7 +428,7 @@ async def nodeinfo(request):
 
 async def getTargetNodeCount(app, node_type):
     if node_type == "dn":
-        key = "target_sn_count"
+        key = "target_dn_count"
     elif node_type == "sn":
         key = "target_sn_count"
     else:
@@ -430,7 +448,12 @@
 def getActiveNodeCount(app, node_type):
     count = 0
     nodes = app["nodes"]
-    for node_id in nodes:
+    if node_type == "sn":
+        active_list = app["active_sn_ids"]
+    else:
+        active_list = app["active_dn_ids"]
+
+    for node_id in active_list:
         node = nodes[node_id]
         if node.type != node_type:
             continue
@@ -462,8 +485,6 @@ async def init():
 
     app["head_port"] = config.get("head_port")
 
-    nodes = {}
-
     # check to see if we are running in a DCOS cluster
     if "MARATHON_APP_ID" in os.environ:
         msg = "Found MARATHON_APP_ID environment variable, setting "
@@ -473,7 +494,12 @@
     else:
         log.info("not setting is_dcos")
 
-    app["nodes"] = nodes
+    target_sn_count = await getTargetNodeCount(app, "sn")
+    target_dn_count = await getTargetNodeCount(app, "dn")
+
+    app["nodes"] = {}
+    app["active_sn_ids"] = [None, ] * target_sn_count
+    app["active_dn_ids"] = [None, ] * target_dn_count
     app["dead_node_ids"] = set()
     app["start_time"] = int(time.time())  # seconds after epoch
     app["last_health_check"] = 0
diff --git a/hsds/util/azureBlobClient.py b/hsds/util/azureBlobClient.py
index 6b22c869..ba9a2eab 100644
--- a/hsds/util/azureBlobClient.py
+++ b/hsds/util/azureBlobClient.py
@@ -134,7 +134,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
             if isinstance(e, AzureError):
                 if e.status_code == 404:
                     msg = f"storage key: {key} not found "
-                    log.warn(msg)
+                    log.info(msg)
                     raise HTTPNotFound()
                 elif e.status_code in (401, 403):
                     msg = f"azureBlobClient.access denied for get key: {key}"
diff --git a/hsds/util/fileClient.py b/hsds/util/fileClient.py
index 1bc5e786..4e9c5ef0 100644
--- a/hsds/util/fileClient.py
+++ b/hsds/util/fileClient.py
@@ -156,7 +156,7 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
             log.info(msg)
         except FileNotFoundError:
             msg = f"fileClient: {key} not found "
-            log.warn(msg)
+            log.info(msg)
             raise HTTPNotFound()
         except IOError as ioe:
             msg = f"fileClient: IOError reading {bucket}/{key}: {ioe}"
@@ -166,8 +166,8 @@ async def get_object(self, key, bucket=None, offset=0, length=-1):
         except CancelledError as cle:
             self._file_stats_increment("error_count")
             msg = f"CancelledError for get file obj {key}: {cle}"
-            log.error(msg)
-            raise HTTPInternalServerError()
+            log.warn(msg)
+            raise
         except Exception as e:
             self._file_stats_increment("error_count")
             msg = f"Unexpected Exception {type(e)} get get_object {key}: {e}"
@@ -227,8 +227,8 @@ async def put_object(self, key, data, bucket=None):
         except CancelledError as cle:
             # file_stats_increment(app, "error_count")
             msg = f"CancelledError for put file obj {key}: {cle}"
-            log.error(msg)
-            raise HTTPInternalServerError()
+            log.warn(msg)
+            raise
 
         except Exception as e:
             # file_stats_increment(app, "error_count")
@@ -274,8 +274,8 @@ async def delete_object(self, key, bucket=None):
         except CancelledError as cle:
             self._file_stats_increment("error_count")
             msg = f"CancelledError deleting file obj {key}: {cle}"
-            log.error(msg)
-            raise HTTPInternalServerError()
+            log.warn(msg)
+            raise
 
         except Exception as e:
             self._file_stats_increment("error_count")
diff --git a/hsds/util/idUtil.py b/hsds/util/idUtil.py
index fe21bbb0..d7bf9169 100644
--- a/hsds/util/idUtil.py
+++ b/hsds/util/idUtil.py
@@ -536,5 +536,9 @@ def getDataNodeUrl(app, obj_id):
         raise HTTPServiceUnavailable()
     dn_number = getObjPartition(obj_id, dn_node_count)
     url = dn_urls[dn_number]
+    if not url:
+        msg = "Service not ready (no DN url set)"
+        log.warn(msg)
+        raise HTTPServiceUnavailable()
     log.debug(f"got dn_url: {url} for obj_id: {obj_id}")
     return url