Draft

48 commits
978e672
add h5json package
jreadey Apr 14, 2025
abb5d0c
temp use of github branch for h5json ref
jreadey Apr 14, 2025
ed44afa
remove array_util test
jreadey Apr 14, 2025
bdff6e4
use h5json for ndarray_compare function
jreadey Apr 14, 2025
3904cf9
use h5json objid funcs
jreadey Apr 23, 2025
e1926c0
add nodeUtil.py
jreadey Apr 23, 2025
ae4579f
fix parameter for createObjId call
jreadey Apr 23, 2025
d6cad74
fix collection name for use with h5json
jreadey Apr 23, 2025
6add48a
use consistent collection name for isValidUuid
jreadey Apr 23, 2025
b13321c
fix flake8 format errors
jreadey Apr 23, 2025
fee9390
fix flake8 error in testall
jreadey Apr 23, 2025
f1b1cab
use h5json for unit test id
jreadey Apr 23, 2025
5dc3f76
restrict version on numcodecs
jreadey Apr 24, 2025
fb17e10
allow client to generate obj ids
jreadey Apr 30, 2025
3be18a0
enable attributes to be included with POST req
jreadey May 7, 2025
00d7c96
add create timestamps for attributes in obj create
jreadey May 7, 2025
47b9a6e
enable links to be initialized in post groups
jreadey May 7, 2025
d9c3e87
support dataset value init in post request
jreadey May 8, 2025
4ab24fc
add compound init value test
jreadey May 9, 2025
fc3ad68
added post data with compound data initializer
jreadey May 9, 2025
8a18945
add post_crawler class
jreadey May 15, 2025
a8ec66d
avoid exception for mkdir race condition
jreadey May 15, 2025
41e23e9
use domain crawler to create links for post group multi
jreadey May 15, 2025
7cfa3d6
added multi create for datatype objs
jreadey May 16, 2025
ef746d0
added datatype test with no type in body
jreadey May 18, 2025
b1af9bc
modularize dataset creation args processing
jreadey May 20, 2025
52f42f3
refactor post dataset args to service_lib.py
jreadey May 21, 2025
ce45804
add multi-dataset test with init data
jreadey May 21, 2025
88e0691
allow client group id for PUT domain
jreadey Jun 6, 2025
7561534
fix np.frombuffer error
jreadey Jun 8, 2025
25c4cf3
fix dsetUtil flake errors
jreadey Jun 8, 2025
5cc77e7
expanded link test
jreadey Jul 14, 2025
45f3aa5
added config to test high latency storage
jreadey Jul 16, 2025
ff1c043
added put_data action for DomainCrawler
jreadey Jul 22, 2025
cda56cf
fix for hang in DomainCrawler put_data handler
jreadey Jul 23, 2025
5a2d4d6
reduce log verbosity
jreadey Jul 23, 2025
053395c
fix for regression with h5pyd master branch
jreadey Jul 29, 2025
78127f1
enable client-based timestamps for attribute and link creation
jreadey Sep 8, 2025
f96b34c
remove python 3.9 from .git workflow
jreadey Sep 9, 2025
03e413f
adjust min time for time skew test
jreadey Sep 9, 2025
b6016e0
use hdf5-json util classes
jreadey Oct 29, 2025
61d38fd
update requirement.txt
jreadey Nov 13, 2025
73d8223
updates to support h5json latest
joshStillerman Dec 16, 2025
a2ca1ee
updated for new hdf5-json methods
jreadey Dec 26, 2025
55c8598
update for h5json changes
jreadey Jan 4, 2026
77042d8
added consolidated metadata support
jreadey Jan 6, 2026
23bb24b
fix for use of H5S_UNLIMITED in maxdims
jreadey Jan 6, 2026
c66d632
fix for domain_test
jreadey Jan 6, 2026
2 changes: 1 addition & 1 deletion .github/workflows/python-package.yml
@@ -17,7 +17,7 @@ jobs:
fail-fast: false
matrix:
os: [ubuntu-22.04, ubuntu-latest, windows-latest]
python-version: ["3.9", "3.10", "3.11", "3.12"]
python-version: ["3.10", "3.11", "3.12"]
build-method: ["manual", "docker"]

runs-on: ${{ matrix.os }}
7 changes: 6 additions & 1 deletion admin/config/config.yml
@@ -47,6 +47,7 @@ flush_sleep_interval: 1 # time to wait between checking on dirty objects
flush_timeout: 10 # max time to wait on all I/O operations to complete for a flush
min_chunk_size: 1m # 1 MB
max_chunk_size: 4m # 4 MB
default_vlen_type_size: 128 # guess for average size of variable length types
max_request_size: 100m # 100 MB - should be no smaller than client_max_body_size in nginx tmpl (if using nginx)
max_chunks_per_folder: 0 # max number of chunks per s3 folder. 0 for unlimited
max_task_count: 100 # maximum number of concurrent tasks per node before server will return 503 error
@@ -70,7 +71,7 @@ admin_group: null # enable admin privileges for any user in this group
openid_provider: azure # OpenID authentication provider
openid_url: null # OpenID connect endpoint if provider is not azure or google
openid_audience: null # OpenID audience. This is synonymous with azure_resource_id for azure
openid_claims: unique_name,appid,roles # Comma seperated list of claims to resolve to usernames.
openid_claims: unique_name,appid,roles # Comma separated list of claims to resolve to usernames.
chaos_die: 0 # if > 0, have nodes randomly die after n seconds (for testing)
standalone_app: false # True when run as a single application
blosc_nthreads: 2 # number of threads to use for blosc compression. Set to 0 to have blosc auto-determine thread count
@@ -88,6 +89,10 @@ allow_any_bucket_read: true # enable reads to buckets other than default bucket
allow_any_bucket_write: true # enable writes to buckets other than default bucket
bit_shuffle_default_blocksize: 2048 # default blocksize for bitshuffle filter
max_rangeget_gap: 1024 # max gap in byte for intelligent range get requests
predate_maxtime: 10.0 # max delta between object created timestamp in request and actual time
posix_delay: 0.0 # delay for POSIX IO operations for simulating cloud storage latencies
max_compact_dset_size: 65536 # size in bytes for maximum compact storage size
max_timestamp_drift: 300 # number of seconds a client-based timestamp can differ from current time
# DEPRECATED - the remaining config values are not currently used but kept for backward compatibility with older container images
aws_lambda_chunkread_function: null # name of aws lambda function for chunk reading
aws_lambda_threshold: 4 # number of chunks per node per request to reach before using lambda
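For context on how settings like these get consumed, here is a minimal sketch (not part of the diff) of the kind of checks a handler could perform. It assumes the config.get() helper that HSDS modules already use; the two functions themselves are invented for illustration, not code from this PR:

# minimal sketch - reads the new config values; checkClientTimestamp and
# applyPosixDelay are hypothetical helpers, not part of this change
import asyncio
import time

from . import config  # assumes this module lives inside the hsds package


def checkClientTimestamp(client_ts):
    """Return True if a client-supplied create timestamp is within the allowed drift."""
    max_drift = float(config.get("max_timestamp_drift"))  # 300 seconds by default
    return abs(time.time() - client_ts) <= max_drift


async def applyPosixDelay():
    """Sleep to simulate cloud-storage latency when posix_delay is non-zero."""
    delay = float(config.get("posix_delay"))  # 0.0 disables the delay
    if delay > 0.0:
        await asyncio.sleep(delay)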
210 changes: 194 additions & 16 deletions hsds/async_lib.py
@@ -15,18 +15,24 @@
from aiohttp.client_exceptions import ClientError
from aiohttp.web_exceptions import HTTPNotFound, HTTPInternalServerError
from aiohttp.web_exceptions import HTTPForbidden
from .util.idUtil import isValidUuid, isSchema2Id, getS3Key, isS3ObjKey
from .util.idUtil import getObjId, isValidChunkId, getCollectionForId
from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator
from .util.hdf5dtype import getItemSize, createDataType
from .util.arrayUtil import getNumElements, bytesToArray
from .util.dsetUtil import getHyperslabSelection, getFilterOps, getChunkDims, getFilters
from .util.dsetUtil import getDatasetLayoutClass, getDatasetLayout, getShapeDims
from h5json.hdf5dtype import getItemSize
from h5json.hdf5dtype import createDataType
from h5json.array_util import getNumElements, bytesToArray, bytesArrayToList
from h5json.objid import isValidUuid, isSchema2Id, getS3Key, isS3ObjKey
from h5json.objid import getObjId, isValidChunkId, getCollectionForId
from h5json.filters import getFilters
from h5json.shape_util import getShapeDims, getDataSize
from h5json.dset_util import getDatasetLayoutClass, getDatasetLayout, getChunkDims
from h5json.time_util import getNow

from .util.chunkUtil import getDatasetId, getNumChunks, ChunkIterator, getChunkIndex, getChunkIds
from .util.dsetUtil import getHyperslabSelection
from .util.storUtil import getStorKeys, putStorJSONObj, getStorJSONObj
from .util.storUtil import deleteStorObj, getStorBytes, isStorObj
from .datanode_lib import getFilterOps
from . import hsds_logger as log
from . import config
import time


# List all keys under given root and optionally update info.json
# Note: only works with schema v2 domains!
@@ -71,9 +77,10 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
msg += f"{dset_id}"
log.warn(msg)
return

type_json = dset_json["type"]
item_size = getItemSize(type_json)
if "layout" not in dset_json:
if not getDatasetLayout(dset_json):
msg = "updateDatasetInfo - expected to find layout in dataset_json "
msg += f"for {dset_id}"
log.warn(msg)
@@ -106,7 +113,7 @@ async def updateDatasetInfo(app, dset_id, dataset_info, bucket=None):
if layout_class == "H5D_CONTIGUOUS_REF":
# In H5D_CONTIGUOUS_REF a non-compressed part of the HDF5 is divided
# into equal size chunks, so we can just compute link bytes and num
# chunks based on the size of the coniguous dataset
# chunks based on the size of the contiguous dataset
layout_dims = getChunkDims(dset_json)
num_chunks = getNumChunks(selection, layout_dims)
chunk_size = item_size
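The hunk is cut off before the multiplication, but the arithmetic the comment describes is simple; as a hedged worked example (the shapes, layout, and item size below are made up, and the snippet is standalone rather than HSDS code):

# worked example: linked stats for an H5D_CONTIGUOUS_REF dataset can be derived
# from shape, layout, and item size alone - no chunk reads needed
import math

dims = [1000, 1000]       # dataset shape
layout_dims = [250, 500]  # equal-size chunk layout over the contiguous data
item_size = 8             # e.g. one float64 element

num_chunks = math.prod(math.ceil(d / c) for d, c in zip(dims, layout_dims))  # 4 * 2 = 8
chunk_size = item_size * math.prod(layout_dims)   # 1_000_000 bytes per chunk
linked_bytes = chunk_size * num_chunks            # 8_000_000 bytes linked
print(num_chunks, chunk_size, linked_bytes)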
@@ -262,20 +269,26 @@ def scanRootCallback(app, s3keys):
results = app["scanRoot_results"]
scanRoot_keyset = app["scanRoot_keyset"]
checksums = results["checksums"]

for s3key in s3keys.keys():

if not isS3ObjKey(s3key):
log.info(f"not s3obj key, ignoring: {s3key}")
log.info(f"scanRoot -not s3obj key, ignoring: {s3key}")
continue
if s3key in scanRoot_keyset:
log.warn(f"scanRoot - dejavu for key: {s3key}")
log.warn(f"scanRoot -scanRoot - dejavu for key: {s3key}")
continue
scanRoot_keyset.add(s3key)
msg = f"scanRoot adding key: {s3key} to keyset, "
msg = f"scanRoot - adding key: {s3key} to keyset, "
msg += f"{len(scanRoot_keyset)} keys"
log.debug(msg)

objid = getObjId(s3key)

if objid in app["deleted_ids"]:
log.debug(f"scanRoot - skipping deleted id: {objid}")
continue

etag = None
obj_size = None
lastModified = None
@@ -300,8 +313,15 @@ def scanRootCallback(app, s3keys):
is_chunk = True
results["num_chunks"] += 1
results["allocated_bytes"] += obj_size
chunk_index = getChunkIndex(objid)
if max(chunk_index) == 0:
# save the first chunk if present to the obj_ids set -
# it will be used to extract dataset values for small datasets
results["obj_ids"].add(objid)
else:
results["metadata_bytes"] += obj_size
results["obj_ids"].add(objid)

if is_chunk or getCollectionForId(objid) == "datasets":
if is_chunk:
@@ -339,6 +359,144 @@ def scanRootCallback(app, s3keys):
log.error(msg)


async def _getDatsetValueJson(app, dset_id, dset_json, obj_ids, size_limit=None, bucket=None):
""" If the dataset size is less than size_limit, and the chunk_ids for the dataset are
available, return a JSON representation of the dataset values. Otherwise, return None """

dims = getShapeDims(dset_json)
if dims is None:
return None # null dataspace
if "type" not in dset_json:
msg = f"_getDatsetValueJson - expected to find type in dataset_json for {dset_id}"
log.warn(msg)
return None
type_json = dset_json["type"]
item_size = getItemSize(type_json)
if item_size == "H5T_VARIABLE":
item_size = 1024 # make a guess for variable length types
dataset_size = getDataSize(dims, item_size)
if dataset_size > size_limit:
log.debug(f"_getDatasetValueJson - dataset size {dataset_size} exceeds limit {size_limit}")
return None

chunk_dims = getChunkDims(dset_json)
if not chunk_dims:
log.warning(f"_getDatasetValueJson - no layout found for dataset: {dset_id}")
return None
if chunk_dims != dims:
msg = f"_getDatasetValueJson - dataset layout {chunk_dims} does not match dims {dims} "
msg += f"for dataset: {dset_id}, ignoring"
log.warning(msg)
return None
select_all = getHyperslabSelection(dims) # select entire datashape
chunk_ids = getChunkIds(dset_id, select_all, dims)
if len(chunk_ids) == 0:
log.debug(f"_getDatasetValueJson - no chunk ids found for dataset: {dset_id}")
return None
if len(chunk_ids) > 1:
log.debug(f"_getDatasetValueJson - more than one chunk id found for dataset: {dset_id}")
return None
chunk_id = chunk_ids[0]
if chunk_id not in obj_ids:
log.debug(f"_getDatasetValueJson - chunk id {chunk_id} not in scanned obj_ids")
return None
log.debug(f"using chunk: {chunk_id} to get dataset value for {dset_id}")

# fetch the chunk - using getStorBytes since this will not be used with
# chunk cache or chunk crawlers
# TBD: need parameters for s3path, s3offset, s3size for ref layouts
# regular store read

filters = getFilters(dset_json)
dt = createDataType(type_json)
filter_ops = getFilterOps(app, dset_id, filters, dtype=dt, chunk_shape=chunk_dims)

kwargs = {
"filter_ops": filter_ops,
"offset": None,
"length": None,
"bucket": bucket
}
s3key = getS3Key(chunk_id)

try:
chunk_bytes = await getStorBytes(app, s3key, **kwargs)
except HTTPNotFound:
log.warning(f"_getDatasetValueJson - HTTPNotFound for chunk {chunk_id} bucket:{bucket}")
return None
except HTTPForbidden:
log.warning(f"_getDatasetValueJson - HTTPForbidden for chunk {chunk_id} bucket:{bucket}")
return None
except HTTPInternalServerError:
msg = "_getDatasetValueJson - "
msg += f"HTTPInternalServerError for chunk {chunk_id} bucket:{bucket}"
log.warning(msg)
return None

if chunk_bytes is None:
msg = f"_getDatasetValueJson -read {chunk_id} bucket: {bucket} returned None"
log.warning(msg)
return None

arr = bytesToArray(chunk_bytes, dt, chunk_dims)

json_value = bytesArrayToList(arr)
log.debug(f"_getDatsetValueJson - returning {json_value}")

return json_value


async def getConsolidatedMetaData(app, obj_ids, bucket=None):
# create a consolidated metadata summary for all objects in the domain
# return a dict of obj_ids to their metadata summaries
log.info("getConsolidatedMetaData - creating consolidated metadata summary")
consolidated_metadata = {}
for obj_id in obj_ids:
if isValidChunkId(obj_id):
# skip chunks - we may use the chunk later when processing its dataset object
continue
s3_key = getS3Key(obj_id)
try:
obj_json = await getStorJSONObj(app, s3_key, bucket=bucket)
except HTTPNotFound:
log.warn(f"HTTPNotFound for {s3_key} bucket:{bucket}")
continue
except HTTPForbidden:
log.warn(f"HTTPForbidden error for {s3_key} bucket:{bucket}")
continue
except HTTPInternalServerError:
msg = f"HTTPInternalServerError error for {s3_key} bucket:{bucket}"
log.warn(msg)
continue
log.debug(f"getConsolidatedMetaData - got json for obj_id: {obj_id}: {obj_json}")
# extract relevant metadata
metadata_summary = {}
if "type" in obj_json:
metadata_summary["type"] = obj_json["type"]
if "shape" in obj_json:
metadata_summary["shape"] = obj_json["shape"]
if "attributes" in obj_json:
metadata_summary["attributes"] = obj_json["attributes"]
if "links" in obj_json:
metadata_summary["links"] = obj_json["links"]
if "creationProperties" in obj_json:
metadata_summary["creationProperties"] = obj_json["creationProperties"]
if getCollectionForId(obj_id) == "datasets":
log.debug("getConsolidatedMetaData - got dataset")
size_limit = 4096 # TBD - make this a config
kwargs = {"size_limit": size_limit, "bucket": bucket}
json_value = await _getDatsetValueJson(app, obj_id, obj_json, obj_ids, **kwargs)
if json_value is not None:
log.debug(f"adding dataset value to metadata summary for dataset: {obj_id}")
metadata_summary["value"] = json_value
else:
log.debug("getConsolidatedMetaData - not a dataset")

consolidated_metadata[obj_id] = metadata_summary
log.info("getConsolidatedMetaData - done creating consolidated metadata summary")
return consolidated_metadata

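To make the shape of the result concrete, a consolidated metadata summary for a small domain might look roughly like the dict below. This is an invented example: the ids, link entries, and attribute layout are illustrative only, and the "value" key appears only for datasets small enough to pass the size_limit check above.

# illustrative only - not output produced by this PR's tests
example_summary = {
    "g-1e76d862-0000-0000-0000-000000000001": {
        "links": {
            "dset1": {
                "class": "H5L_TYPE_HARD",
                "id": "d-1e76d862-0000-0000-0000-000000000002",
            }
        },
        "attributes": {},
    },
    "d-1e76d862-0000-0000-0000-000000000002": {
        "type": {"class": "H5T_FLOAT", "base": "H5T_IEEE_F32LE"},
        "shape": {"class": "H5S_SIMPLE", "dims": [4]},
        "attributes": {},
        "value": [1.0, 2.0, 3.0, 4.0],
    },
}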

async def scanRoot(app, rootid, update=False, bucket=None):

# iterate through all s3 keys under the given root.
@@ -380,9 +538,10 @@ async def scanRoot(app, rootid, update=False, bucket=None):
results["num_linked_chunks"] = 0
results["linked_bytes"] = 0
results["logical_bytes"] = 0
results["checksums"] = {} # map of objid to checksums
results["obj_ids"] = set() # map of object ids scanned (and first chunk id for datasets)
results["checksums"] = {} # map of objid to checksums
results["bucket"] = bucket
results["scan_start"] = time.time()
results["scan_start"] = getNow(app=app)

app["scanRoot_results"] = results
app["scanRoot_keyset"] = set()
@@ -399,6 +558,9 @@ async def scanRoot(app, rootid, update=False, bucket=None):
num_objects += len(results["datasets"])
num_objects += results["num_chunks"]
log.info(f"scanRoot - got {num_objects} keys for rootid: {rootid}")
obj_ids = results["obj_ids"]
log.info(f"scanRoot - got {len(obj_ids)} unique object ids")
log.debug(f"scanRoot - obj_ids: {obj_ids}")

dataset_results = results["datasets"]
for dsetid in dataset_results:
@@ -437,7 +599,12 @@ async def scanRoot(app, rootid, update=False, bucket=None):
# free up memory used by the checksums
del results["checksums"]

results["scan_complete"] = time.time()
results["scan_complete"] = getNow(app=app)

# extract the obj_ids set - it won't go into .info.json
obj_ids = results["obj_ids"]
del results["obj_ids"]
log.debug(f"obj_ids set: {obj_ids}")

if update:
# write .info object back to S3
@@ -446,6 +613,17 @@ async def scanRoot(app, rootid, update=False, bucket=None):
msg += f"{results}"
log.info(msg)
await putStorJSONObj(app, info_key, results, bucket=bucket)

# create a json summary of objects in this domain
log.debug(f"Creating consolidated metadata summary for root {rootid}")
summary_key = root_prefix + ".summary.json"
summary_data = await getConsolidatedMetaData(app, obj_ids, bucket=bucket)
if summary_data:
log.info(f"Got consolidated metadata summary for root {rootid}")
log.debug(f"Summary data: {summary_data}")
await putStorJSONObj(app, summary_key, summary_data, bucket=bucket)
else:
log.info(f"No consolidated metadata summary for root {rootid}")
return results


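As a usage sketch for the new summary object (not part of the change): a handler could read .summary.json back by reconstructing the key from the root id. getS3Key and getStorJSONObj are the helpers already imported in this module; the prefix slicing mirrors what scanRoot does for .info.json and is an assumption about the key layout.

# hedged sketch - fetch the consolidated metadata written by scanRoot
from h5json.objid import getS3Key

from .util.storUtil import getStorJSONObj
from . import hsds_logger as log


async def getDomainSummary(app, rootid, bucket=None):
    root_key = getS3Key(rootid)
    # assumes schema v2 root keys end with ".group.json"; strip that to get the root prefix
    root_prefix = root_key[: -len(".group.json")]
    summary_key = root_prefix + ".summary.json"
    log.debug(f"getDomainSummary - reading {summary_key}")
    return await getStorJSONObj(app, summary_key, bucket=bucket)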