diff --git a/nslsii/sync_experiment/__init__.py b/nslsii/sync_experiment/__init__.py index 5528c0f..eea544e 100644 --- a/nslsii/sync_experiment/__init__.py +++ b/nslsii/sync_experiment/__init__.py @@ -1 +1 @@ -from .sync_experiment import main, sync_experiment, validate_proposal, switch_redis_proposal +from .sync_experiment import main, sync_experiment, unsync_experiment, switch_proposal diff --git a/nslsii/sync_experiment/sync_experiment.py b/nslsii/sync_experiment/sync_experiment.py index 82d0f21..c55e229 100644 --- a/nslsii/sync_experiment/sync_experiment.py +++ b/nslsii/sync_experiment/sync_experiment.py @@ -1,292 +1,862 @@ import argparse +import base64 +import heapq +import httpx import json import os import re -import warnings -from datetime import datetime -from getpass import getpass -from typing import Any, Dict, Union, Optional - -import httpx import redis -import yaml -from ldap3 import NTLM, Connection, Server -from ldap3.core.exceptions import LDAPInvalidCredentialsResult, LDAPSocketOpenError + +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from datetime import datetime, timezone +from getpass import getpass +from pydantic.types import SecretStr from redis_json_dict import RedisJSONDict +from tiled.client.context import Context, password_grant, identity_provider_input +from tiled.profiles import load_profiles -data_session_re = re.compile(r"^pass-(?P<proposal_number>\d+)$") -nslsii_api_client = httpx.Client(base_url="https://api.nsls2.bnl.gov") +facility_api_client = httpx.Client(base_url="https://api.nsls2.bnl.gov") -def get_current_cycle() -> str: - cycle_response = nslsii_api_client.get( - "/v1/facility/nsls2/cycles/current" - ).raise_for_status() - return cycle_response.json()["cycle"] +class KeyRemapper(dict): + def __missing__(self, key): + self[key] = key + return key -def is_commissioning_proposal(proposal_number, beamline) -> bool: - 
"""True if proposal_number is registered as a commissioning proposal; else False.""" - commissioning_proposals_response = nslsii_api_client.get( - f"/v1/proposals/commissioning?beamline={beamline}" - ).raise_for_status() - commissioning_proposals = commissioning_proposals_response.json()[ - "commissioning_proposals" - ] - return proposal_number in commissioning_proposals +normalized_beamlines = KeyRemapper( + { + "sst1": "sst", + "sst2": "sst", + } +) -def validate_proposal(data_session_value, beamline) -> Dict[str, Any]: - proposal_data = {} - data_session_match = data_session_re.match(data_session_value) +def sync_experiment( + proposal_ids: list[int | str], + activate_id: int | str | None = None, + facility: str = "nsls2", + beamline: str | None = None, + endstation: str | None = None, + verbose: bool = False, +) -> RedisJSONDict: + """Sync a new experiment (proposal) at the beamline. + Authorizes the requested proposals, and activates one of them. + + Parameters + ---------- + proposal_ids : list[int or str] + the list of proposal IDs to authorize + activate_id : int or str (optional) + the ID number of the proposal to activate (defaults to the first proposal in proposal_ids) + facility : str (optional) + the facility that the beamline belongs to (defaults to "nsls2") + beamline : str or None (optional) + the TLA of the beamline from which the experiment is running, not case-sensitive + endstation : str or None (optional) + the endstation at the beamline from which the experiment is running, not case-sensitive + verbose : bool (optional) + turn on verbose printing - if data_session_match is None: + Returns + ------- + md : RedisJSONDict + The updated metadata dictionary + """ + env_beamline, env_endstation = get_beamline_env() + beamline = beamline or env_beamline + endstation = endstation or env_endstation + proposal_ids = [str(proposal_id) for proposal_id in proposal_ids] + activate_proposal = activate_id or proposal_ids[0] + activate_proposal = 
str(activate_proposal) + + if not beamline: raise ValueError( - f"RE.md['data_session']='{data_session_value}' " - f"is not matched by regular expression '{data_session_re.pattern}'" + "No beamline provided! Please provide a beamline argument, " + "or set the 'BEAMLINE_ACRONYM' environment variable." ) - - try: - current_cycle = get_current_cycle() - proposal_number = data_session_match.group("proposal_number") - proposal_commissioning = is_commissioning_proposal(proposal_number, beamline) - proposal_response = nslsii_api_client.get( - f"/v1/proposal/{proposal_number}" - ).raise_for_status() - proposal_data = proposal_response.json()["proposal"] - if "error_message" in proposal_data: + for proposal_id in proposal_ids: + if not re.fullmatch(r"^\d{6}$", proposal_id): raise ValueError( - f"while verifying data_session '{data_session_value}' " - f"an error was returned by {proposal_response.url}: " - f"{proposal_data}" + f"Provided proposal ID '{proposal_id}' is not valid.\n " + f"A proposal ID must be a 6 character integer." ) - else: - if ( - not proposal_commissioning - and current_cycle not in proposal_data["cycles"] - ): - raise ValueError( - f"Proposal {data_session_value} is not valid in the current NSLS2 cycle ({current_cycle})." - ) - if beamline.upper() not in proposal_data["instruments"]: - raise ValueError( - f"Wrong beamline ({beamline.upper()}) for proposal {data_session_value} ({', '.join(proposal_data['instruments'])})." - ) - # data_session is valid! 
- - except httpx.RequestError as rerr: - # give the user a warning but allow the run to start - warnings.warn( - f"while verifying data_session '{data_session_value}' " - f"the request {rerr.request.url!r} failed with " - f"'{rerr}'" - ) + if activate_proposal not in proposal_ids: + raise ValueError("Cannot activate a proposal which is not being authorized.") + + beamline = beamline.lower() + if endstation: + endstation = endstation.lower() + normalized_beamline = normalized_beamlines[beamline] + apikey_redis_client = redis.Redis( + host=f"info.{normalized_beamline}.nsls2.bnl.gov", + port=6379, + db=15, + ) - return proposal_data + username, password, duo_append = prompt_for_login( + facility, beamline, endstation, proposal_ids + ) + tiled_context = create_tiled_context( + username, password, duo_append, normalized_beamline, endstation + ) -config_files = [ - os.path.expanduser("~/.config/n2sn_tools.yml"), - "/etc/n2sn_tools.yml", -] + data_sessions = {"pass-" + proposal_id for proposal_id in proposal_ids} + if not proposals_can_be_authorized(username, facility, beamline, data_sessions): + tiled_context.logout() + raise ValueError( + f"You do not have permissions to authorize all proposal IDs: {', '.join(proposal_ids)}" + ) + try: + proposals = retrieve_proposals(facility, beamline, proposal_ids) + except Exception: + tiled_context.logout() + raise + + api_key = get_api_key( + apikey_redis_client, + username, + password, + normalized_beamline, + endstation, + data_sessions, + ) + if not api_key: + try: + api_key_info = create_api_key( + tiled_context, data_sessions, normalized_beamline + ) + except: + tiled_context.logout() + raise + cache_api_key( + apikey_redis_client, + username, + password, + normalized_beamline, + endstation, + data_sessions, + api_key_info, + ) + api_key = get_api_key( + apikey_redis_client, + username, + password, + normalized_beamline, + endstation, + data_sessions, + ) + set_api_key(apikey_redis_client, normalized_beamline, endstation, 
api_key) + tiled_context.logout() -def authenticate( - username, -): - config = None - for fn in config_files: - try: - with open(fn) as f: - config = yaml.safe_load(f) - except IOError: - pass - else: - break + md_redis_client = redis.Redis(host=f"info.{normalized_beamline}.nsls2.bnl.gov") + redis_prefix = ( + f"{normalized_beamline}-{endstation}-" + if endstation + else f"{normalized_beamline}-" + ) + md = RedisJSONDict(redis_client=md_redis_client, prefix=redis_prefix) + + activate_session = "pass-" + activate_proposal + proposal = proposals[activate_proposal] + + md["data_sessions_authorized"] = list(data_sessions) + users = proposal.pop("users") + pi_name = "" + for user in users: + if user.get("is_pi"): + pi_name = ( + f"{user.get('first_name', '')} {user.get('last_name', '')}".strip() + ) + md["data_session"] = activate_session # e.g. "pass-123456" + md["username"] = username + md["start_datetime"] = datetime.now().isoformat() + # tiled-access-tags used by bluesky-tiled-writer, not saved to metadata + md["tiled_access_tags"] = [activate_session] + md["cycle"] = ( + "commissioning" + if activate_proposal in get_commissioning_proposals(facility, beamline) + else get_current_cycle(facility) + ) + md["proposal"] = { + "proposal_id": proposal.get("proposal_id"), + "title": proposal.get("title"), + "type": proposal.get("type"), + "pi_name": pi_name, + } - if config is None: - raise RuntimeError("Unable to open a config file") + print( + f"Authorized experiments with data sessions {', '.join(md['data_sessions_authorized'])}\n" + ) + print( + f"Activated experiment with data session {md['data_session']} by {md['username']}." 
+ ) - server = config.get("common", {}).get("server") + if verbose: + print(json.dumps(md, indent=2)) - if server is None: - raise RuntimeError("Server name not found!") + return md - auth_server = Server(server, use_ssl=True) - try: - connection = Connection( - auth_server, - user=f"BNL\\{username}", - password=getpass("Password : "), - authentication=NTLM, - auto_bind=True, - raise_exceptions=True, - ) - print(f"\nAuthenticated as : {connection.extend.standard.who_am_i()}") +def unsync_experiment( + facility: str = "nsls2", + beamline: str | None = None, + endstation: str | None = None, + verbose: bool = False, +) -> RedisJSONDict: + """Unsync the currently active experiment (proposal) at the beamline. + Also deauthorizes all currently authorized proposals. - except LDAPInvalidCredentialsResult: - raise RuntimeError(f"Invalid credentials for user '{username}'.") from None - except LDAPSocketOpenError: - print(f"{server} server connection failed...") + Parameters + ---------- + facility : str (optional) + the facility that the beamline belongs to (defaults to "nsls2") + beamline : str or None (optional) + the TLA of the beamline from which the experiment is running, not case-sensitive + endstation : str or None (optional) + the endstation at the beamline from which the experiment is running, not case-sensitive + verbose : bool (optional) + turn on verbose printing + Returns + ------- + md : RedisJSONDict + The updated metadata dictionary + """ + env_beamline, env_endstation = get_beamline_env() + beamline = beamline or env_beamline + endstation = endstation or env_endstation -def should_they_be_here(username, new_data_session, beamline): - user_access_json = nslsii_api_client.get(f"/v1/data-session/{username}").json() + if not beamline: + raise ValueError( + "No beamline provided! Please provide a beamline argument, " + "or set the 'BEAMLINE_ACRONYM' environment variable." 
+ ) - if "nsls2" in user_access_json["facility_all_access"]: - return True + beamline = beamline.lower() + if endstation: + endstation = endstation.lower() + normalized_beamline = normalized_beamlines[beamline] + apikey_redis_client = redis.Redis( + host=f"info.{normalized_beamline}.nsls2.bnl.gov", + port=6379, + db=15, + ) - elif beamline.lower() in user_access_json["beamline_all_access"]: - return True + md_redis_client = redis.Redis(host=f"info.{normalized_beamline}.nsls2.bnl.gov") + md_redis_prefix = ( + f"{normalized_beamline}-{endstation}-" + if endstation + else f"{normalized_beamline}-" + ) + md = RedisJSONDict(redis_client=md_redis_client, prefix=md_redis_prefix) - elif new_data_session in user_access_json["data_sessions"]: - return True + set_api_key(apikey_redis_client, normalized_beamline, endstation, "") + data_sessions_deauthorized = md["data_sessions_authorized"] or [ + "" + ] + md["data_sessions_authorized"] = list() + data_session = md["data_session"] or "" + md["data_session"] = "" + username = md["username"] or "" + md["username"] = "" + md["start_datetime"] = "" + md["tiled_access_tags"] = list() + md["cycle"] = "" + md["proposal"] = { + "proposal_id": "", + "title": "", + "type": "", + "pi_name": "", + } - return False + print( + f"Deauthorized experiments with data sessions {', '.join(data_sessions_deauthorized)}\n" + ) + print(f"Deactivated experiment with data session {data_session} by {username}.") + if verbose: + print(json.dumps(md, indent=2)) -class AuthorizationError(Exception): ... 
+ return md -def switch_redis_proposal( - proposal_number: Union[int, str], - beamline: str, - username: Optional[str] = None, - prefix: str = "", +def switch_proposal( + proposal_id: int | str, + username: str | None = None, + facility: str = "nsls2", + beamline: str | None = None, + endstation: str | None = None, + verbose: bool = False, ) -> RedisJSONDict: - """Update information in RedisJSONDict for a specific beamline + """Switch the active experiment (proposal) at the beamline. Parameters ---------- - proposal_number : int or str - number of the desired proposal, e.g. `123456` - beamline : str - normalized beamline acronym, case-insensitive, e.g. `SMI` or `sst` - username : str or None - login name of the user assigned to the proposal; if None, current user will be kept - prefix : str - optional prefix to identify a specific endstation, e.g. `opls` + proposal_id : int or str + the ID number of the proposal to activate + username : str or None (optional) + the current user's username - will prompt if not provided. + facility : str (optional) + the facility that the beamline belongs to (defaults to "nsls2") + beamline : str or None (optional) + the TLA of the beamline from which the experiment is running, not case-sensitive + endstation : str or None (optional) + the endstation at the beamline from which the experiment is running, not case-sensitive + verbose : bool (optional) + turn on verbose printing Returns ------- md : RedisJSONDict - The updated redis dictionary. + The updated metadata dictionary """ + env_beamline, env_endstation = get_beamline_env() + beamline = beamline or env_beamline + endstation = endstation or env_endstation + + if not beamline: + raise ValueError( + "No beamline provided! Please provide a beamline argument, " + "or set the 'BEAMLINE_ACRONYM' environment variable." 
+ ) - redis_client = redis.Redis(host=f"info.{beamline.lower()}.nsls2.bnl.gov") - redis_prefix = f"{prefix}-" if prefix else "" - md = RedisJSONDict(redis_client=redis_client, prefix=redis_prefix) - username = username or md.get("username") - - new_data_session = f"pass-{proposal_number}" - if (new_data_session == md.get("data_session")) and ( - username == md.get("username") - ): - # The cycle needs to get updated regardless of experiment status - md["cycle"] = ( - "commissioning" - if is_commissioning_proposal(str(proposal_number), beamline) - else get_current_cycle() + beamline = beamline.lower() + if endstation: + endstation = endstation.lower() + normalized_beamline = normalized_beamlines[beamline] + username = username or input("Enter your username: ") + + md_redis_client = redis.Redis( + host=f"info.{normalized_beamline}.nsls2.bnl.gov", db=0 + ) + md_redis_prefix = ( + f"{normalized_beamline}-{endstation}-" + if endstation + else f"{normalized_beamline}-" + ) + md = RedisJSONDict(redis_client=md_redis_client, prefix=md_redis_prefix) + + activate_proposal = str(proposal_id) + activate_session = "pass-" + activate_proposal + data_sessions_authorized = md.get("data_sessions_authorized") + if not data_sessions_authorized: + raise ValueError( + "There are no currently authorized data sessions (proposals).\n" + "Please run sync-experiment before attempting to switch the active proposal." ) - warnings.warn( - f"Experiment {new_data_session} was already started by the same user." + if not username == md.get("username"): + raise ValueError( + "The currently authorized data sessions (proposals) were authorized by a different user.\n" + "Please re-run sync-experiment to authorize as the intended user." 
+ ) + if activate_session not in data_sessions_authorized: + raise ValueError( + f"Cannot switch to proposal which has not been authorized.\n" + f"The authorized data sessions are: {', '.join(data_sessions_authorized)}\n" + f"To authorize different proposals, re-run sync-experiment." ) - else: - if not should_they_be_here(username, new_data_session, beamline): - raise AuthorizationError( - f"User '{username}' is not allowed to take data on proposal {new_data_session}" + proposals = retrieve_proposals(facility, beamline, [activate_proposal]) + proposal = proposals[activate_proposal] + + users = proposal.pop("users") + pi_name = "" + for user in users: + if user.get("is_pi"): + pi_name = ( + f"{user.get('first_name', '')} {user.get('last_name', '')}".strip() ) + md["data_session"] = activate_session # e.g. "pass-123456" + md["username"] = username + md["start_datetime"] = datetime.now().isoformat() + # tiled-access-tags used by bluesky-tiled-writer, not saved to metadata + md["tiled_access_tags"] = [activate_session] + md["cycle"] = ( + "commissioning" + if activate_proposal in get_commissioning_proposals(facility, beamline) + else get_current_cycle(facility) + ) + md["proposal"] = { + "proposal_id": proposal.get("proposal_id"), + "title": proposal.get("title"), + "type": proposal.get("type"), + "pi_name": pi_name, + } + + print( + f"Switched to experiment wihh data session {md['data_session']} by {md['username']}." + ) + + return md + + +def encrypt_api_key(password, api_key): + """ + This is a reference implementation taken from the + cryptography library docs: + https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet + + The number of iterations was taken from Django's current settings. 
+ """ + salt = os.urandom(16) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=1_500_000, + ) + key = base64.urlsafe_b64encode( + kdf.derive(password.get_secret_value().encode("UTF-8")) + ) + f = Fernet(key) + token = f.encrypt(api_key.encode("UTF-8")) + return token, salt + + +def decrypt_api_key(password, salt, api_key_encrypted): + """ + This is a reference implementation taken from the + cryptography library docs: + https://cryptography.io/en/latest/fernet/#using-passwords-with-fernet + + The number of iterations was taken from Django's current settings. + """ + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + iterations=1_500_000, + ) + key = base64.urlsafe_b64encode( + kdf.derive(password.get_secret_value().encode("UTF-8")) + ) + f = Fernet(key) + token = f.decrypt(api_key_encrypted) + return token.decode("UTF-8") + + +def prompt_for_login(facility, beamline, endstation, proposal_ids): + print(f"\nWelcome to the {beamline.upper()} beamline at {facility.upper()}!\n") + if endstation: + print(f"This is the {endstation.upper()} endstation.\n") + print( + f"Attempting to sync experiment for proposal ID(s) {(', ').join(proposal_ids)}.\n" + ) + print("Please login with your BNL credentials (you may receive a Duo prompt):") + username = input("Username: ") + password = SecretStr(getpass(prompt="Password: ")) + duo_append = input("Duo Passcode or Method (press Enter to ignore): ") + return username, password, duo_append + + +def create_tiled_context(username, password, duo_append, beamline, endstation): + """ + Create a new Tiled context and authenticate. + + Loads the beamline Tiled profile, instantiates the new context, + selects an AuthN provider, attempts to retrieve tokens via password_grant, + then prints a confirmation message and authenticates the context. 
- proposal_data = validate_proposal(new_data_session, beamline) - users = proposal_data.pop("users") - pi_name = "" - for user in users: - if user.get("is_pi"): - pi_name = ( - f"{user.get('first_name', '')} {user.get('last_name', '')}".strip() - ) - md["data_session"] = new_data_session # e.g. "pass-123456" - md["username"] = username - md["start_datetime"] = datetime.now().isoformat() - # tiled-access-tags used by bluesky-tiled-writer, not saved to metadata - md["tiled_access_tags"] = [new_data_session] - md["cycle"] = ( - "commissioning" - if is_commissioning_proposal(str(proposal_number), beamline) - else get_current_cycle() + """ + profiles = load_profiles() + if endstation and endstation in profiles: + _, profile = profiles[endstation] + elif beamline in profiles: + _, profile = profiles[beamline] + else: + raise ValueError(f"Cannot find Tiled profile for beamline {beamline.upper()}") + + context, _ = Context.from_any_uri( + profile["uri"], verify=profile.get("verify", True) + ) + + providers = context.server_info.authentication.providers + http_client = context.http_client + if len(providers) == 1: + # There is only one choice, so no need to prompt the user. + spec = providers[0] + else: + spec = identity_provider_input(providers) + auth_endpoint = spec.links["auth_endpoint"] + provider = spec.provider + client_id = spec.links.get("client_id") + token_endpoint = spec.links.get("token_endpoint") + oauth2_spec = True if client_id and token_endpoint else False + mode = spec.mode + + if mode != "internal": + raise ValueError( + "Selected provider is not mode 'internal', and " + "sync-experiment only supports password auth currently." + "Please select a provider with mode='internal'." 
) - md["proposal"] = { - "proposal_id": proposal_data.get("proposal_id"), - "title": proposal_data.get("title"), - "type": proposal_data.get("type"), - "pi_name": pi_name, - } - print(f"Started experiment {md['data_session']} by {md['username']}.") + if duo_append: + password = SecretStr(f"{password.get_secret_value()},{duo_append}") + try: + tokens = password_grant( + http_client, auth_endpoint, provider, username, password.get_secret_value() + ) + except httpx.HTTPStatusError as err: + if err.response.status_code == httpx.codes.UNAUTHORIZED: + raise ValueError("Username or password not recognized.") from err + else: + raise - return md + confirmation_message = spec.confirmation_message + if confirmation_message: + username = "external user" if oauth2_spec else tokens["identity"]["id"] + print(confirmation_message.format(id=username)) + context.configure_auth(tokens, remember_me=False) -def sync_experiment(proposal_number, beamline, verbose=False, prefix=""): - # Authenticate the user - username = input("Username : ") - authenticate(username) + return context - normalized_beamlines = { - "sst1": "sst", - "sst2": "sst", - } - redis_beamline = normalized_beamlines.get(beamline.lower(), beamline) - md = switch_redis_proposal( - proposal_number, beamline=redis_beamline, username=username, prefix=prefix +def create_api_key(tiled_context, data_sessions, beamline): + access_tags = [data_session for data_session in data_sessions] + access_tags.append(f"_ROOT_NODE_{beamline.upper()}") + scopes = ["read:data", "read:metadata"] + expires_in = "7d" + hostname = os.getenv("HOSTNAME", "unknown host") + note = f"Auto-generated by sync-experiment from {hostname}" + + if expires_in and expires_in.isdigit(): + expires_in = int(expires_in) + info = tiled_context.create_api_key( + access_tags=access_tags, scopes=scopes, expires_in=expires_in, note=note ) + return info - if verbose: - print(json.dumps(md, indent=2)) - return md +def set_api_key(redis_client, beamline, endstation, 
api_key): + """ + Used to set the active API key in Redis. + + The active API key is stored with key: + <beamline>-<endstation>-apikey-active + + """ + redis_prefix = ( + f"{beamline}-{endstation}-apikey" if endstation else f"{beamline}-apikey" + ) + redis_client.set(f"{redis_prefix}-active", api_key) + + +def get_api_key(redis_client, username, password, beamline, endstation, data_sessions): + """ + Retrieve an API key from Redis. + Query Redis for key information, decrypt the API key if found, + ignore the key if it is expired, and finally return the key + if it is still fresh. + Failure to decrypt (e.g. a changed password) will also cause + existing keys to be ignored. + + """ + data_sessions_sanitized = sorted( + data_session.replace("-", "") for data_session in data_sessions + ) + redis_prefix = f"{beamline}-{endstation}" if endstation else f"{beamline}" + redis_prefix = ( + f"{redis_prefix}-{username}-{'-'.join(data_sessions_sanitized)}-apikey" + ) + + api_key_cached = {} + cursor = 0 + while True: + cursor, keys = redis_client.scan(cursor=cursor, match=f"{redis_prefix}*") + if keys: + values = redis_client.mget(keys) + keys = [key.decode("UTF-8") for key in keys] + api_key_cached.update(dict(zip(keys, values))) + if cursor == 0: + break + + if api_key_cached: + expires = datetime.fromisoformat( + api_key_cached[f"{redis_prefix}-ts-expires"].decode("UTF-8") + ) + now = datetime.now(timezone.utc) + if expires > now: + salt = api_key_cached[f"{redis_prefix}-salt"] + api_key_encrypted = api_key_cached[f"{redis_prefix}-encrypted"] + try: + api_key = decrypt_api_key(password, salt, api_key_encrypted) + except InvalidToken: + api_key = None + else: + api_key = None + else: + api_key = None + + return api_key + + +def cache_api_key( + redis_client, username, password, beamline, endstation, data_sessions, api_key_info +): + """ + Encrypt and cache an API key in Redis. + + Keys are rotated out via priority queue according to their + expiration dates. 
There is a limit set on the number of API keys that + may be cached at a time. + + The cached values have keys that are prefixed with: + <beamline>-<endstation>-<username>-<data_sessions>-apikey + + There are 4 keys per each API key: + <prefix>-ts-expires : the timestamp of when the key will expire + <prefix>-ts-created : the timestamp of when the key was created/cached + <prefix>-encrypted : the encrypted API key + <prefix>-salt : the salt used for encryption + + """ + data_sessions_sanitized = sorted( + data_session.replace("-", "") for data_session in data_sessions + ) + redis_prefix = f"{beamline}-{endstation}" if endstation else f"{beamline}" + redis_prefix = ( + f"{redis_prefix}-{username}-{'-'.join(data_sessions_sanitized)}-apikey" + ) + + MAX_ENTRIES = 5 + COUNT_PER_ENTRY = 4 + COUNT_NON_ENTRIES = 1 + length = redis_client.dbsize + + cursor = 0 + expiry_dates = [] + while True: + cursor, keys = redis_client.scan(cursor=cursor, match="*ts-expires") + if keys: + values = redis_client.mget(keys) + expiry_dates.extend( + [ + ( + datetime.fromisoformat(v.decode("UTF-8")), + k.decode("UTF-8").removesuffix("-ts-expires"), + ) + for k, v in zip(keys, values) + ] + ) + if cursor == 0: + break + + heapq.heapify(expiry_dates) + while (length() - COUNT_NON_ENTRIES) / COUNT_PER_ENTRY >= MAX_ENTRIES: + expiry_prefix = heapq.heappop(expiry_dates)[1] + cursor = 0 + while True: + cursor, keys = redis_client.scan( + cursor=cursor, match=f"{expiry_prefix}*", count=10 + ) + if keys: + redis_client.delete(*keys) + if cursor == 0: + break + + encrypted_key, salt = encrypt_api_key(password, api_key_info["secret"]) + redis_client.set( + f"{redis_prefix}-ts-expires", api_key_info["expiration_time"].isoformat() + ) + redis_client.set( + f"{redis_prefix}-ts-created", datetime.now(timezone.utc).isoformat() + ) + redis_client.set(f"{redis_prefix}-encrypted", encrypted_key) + redis_client.set(f"{redis_prefix}-salt", salt) + + +def get_current_cycle(facility): + cycle_response = facility_api_client.get(f"/v1/facility/{facility}/cycles/current") + 
cycle_response.raise_for_status() + cycle = cycle_response.json()["cycle"] + return cycle + + +def get_commissioning_proposals(facility, beamline): + proposals_response = facility_api_client.get( + f"/v1/proposals/commissioning?beamline={beamline}&facility={facility}" + ) + proposals_response.raise_for_status() + commissioning_proposals = proposals_response.json()["commissioning_proposals"] + return commissioning_proposals + + +def proposals_can_be_authorized(username, facility, beamline, data_sessions): + """ + Check that the user can authorize the requested proposals, + given their data_sessions. + + Activation will be allowed if the user has facility or + beamline "all access", or is listed on all of the proposals. + + Note: for this check to be effective, each proposal also + needs to be checked to ensure that the beamline matches + on the requested proposals. + This must be done in a subsequent validation step. + Otherwise, access could be granted to the wrong proposals. + """ + user_access_response = facility_api_client.get(f"/v1/data-session/{username}") + user_access_response.raise_for_status() + user_access = user_access_response.json() + + can_authorize = ( + facility.lower() in user_access["facility_all_access"] + or beamline.lower() in user_access["beamline_all_access"] + or all( + data_session in user_access["data_sessions"] + for data_session in data_sessions + ) + ) + return can_authorize + + +def retrieve_proposals(facility, beamline, proposal_ids): + """ + Retrieve the data for the proposals that are being authorized. + This is also a validation step, ensuring that all the + requested proposals match the beamline. + ***Without this validation, access could be granted to the wrong proposals.*** + + In the future, this should also match proposals by facility as well. + + If multiple proposals are to be authorized, they must all be allocated + for the same (current) cycle. For commissioning proposals, only one proposal + can be authorized at a time. 
+ """ + current_cycle = get_current_cycle(facility) + commissioning_proposals = get_commissioning_proposals(facility, beamline) + num_proposals = len(proposal_ids) + proposals = {} + for proposal_id in proposal_ids: + proposal_response = facility_api_client.get(f"/v1/proposal/{proposal_id}") + proposal_response.raise_for_status() + proposal = proposal_response.json()["proposal"] + if beamline.upper() not in proposal["instruments"]: + raise ValueError( + f"Proposal {proposal_id} is not at this beamline ({beamline.upper()})." + f"This proposal is at the following beamline(s): {', '.join(proposal['instruments'])}." + ) + is_commissioning_proposal = proposal_id in commissioning_proposals + if num_proposals > 1 and is_commissioning_proposal: + raise ValueError( + f"Cannot authorize multiple experiments alongside a commmissioning proposal." + f"Proposal {proposal_id} is a commissioning proposal." + ) + if not is_commissioning_proposal and current_cycle not in proposal["cycles"]: + raise ValueError( + f"Proposal {proposal_id} is not allocated for the current {facility.upper()} cycle ({current_cycle})." + ) + proposals[proposal_id] = proposal + + return proposals + + +def get_beamline_env(): + beamline = os.getenv("BEAMLINE_ACRONYM") + endstation = os.getenv("ENDSTATION_ACRONYM") + return beamline, endstation def main(): # Used by the `sync-experiment` command parser = argparse.ArgumentParser( - description="Start or switch beamline experiment and record it in Redis" + description="Activate an experiment (proposal) - requires authentication" + ) + parser.add_argument( + "-f", + "--facility", + dest="facility", + type=str, + help="The facility for the experiment (e.g. NSLS2)", + required=False, + default="nsls2", ) parser.add_argument( "-b", "--beamline", dest="beamline", type=str, - help="Which beamline (e.g. CHX)", - required=True, + help="The beamline for the experiment (e.g. 
CHX)", + required=False, ) parser.add_argument( "-e", "--endstation", - dest="prefix", + dest="endstation", type=str, - help="Prefix for redis keys (e.g. by endstation)", + help="The beamline endstation for the experiment, if applicable", required=False, ) - parser.add_argument( + + # Mutually exclusive modes: sync (proposals+activate), switch, unsync + modes_group = parser.add_mutually_exclusive_group(required=True) + + modes_group.add_argument( "-p", - "--proposal", - dest="proposal", + "--proposals", + dest="proposals", + nargs="+", type=int, - help="Which proposal (e.g. 123456)", - required=True, + help="The proposal ID(s) to authorize for the experiment", + ) + parser.add_argument( + "-a", + "--activate", + dest="activate", + type=int, + help="The ID of the proposal to activate, defaults to the first in the proposals list.", + required=False, + ) + modes_group.add_argument( + "-s", + "--switch", + dest="switch", + type=int, + help="Switch the active proposal to this ID. The proposal must already be authorized.", + ) + modes_group.add_argument( + "-u", + "--unsync", + dest="unsync", + help="Unsync experiment - deauthorize all proposals and deactivate the experiment.", + action=argparse.BooleanOptionalAction, ) parser.add_argument("-v", "--verbose", action=argparse.BooleanOptionalAction) args = parser.parse_args() - sync_experiment( - proposal_number=args.proposal, - beamline=args.beamline, - verbose=args.verbose, - prefix=args.prefix, - ) + if args.activate is not None and args.proposals is None: + parser.error("--activate can only be used when --proposals is provided") + + if args.unsync: + unsync_experiment( + facility=args.facility, + beamline=args.beamline, + endstation=args.endstation, + verbose=args.verbose, + ) + elif args.switch is not None: + switch_proposal( + facility=args.facility, + beamline=args.beamline, + endstation=args.endstation, + proposal_id=args.switch, + verbose=args.verbose, + ) + else: + sync_experiment( + facility=args.facility, + 
beamline=args.beamline, + endstation=args.endstation, + proposal_ids=args.proposals, + activate_id=args.activate, + verbose=args.verbose, + ) diff --git a/requirements.txt b/requirements.txt index 9146b53..a94212c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,12 +2,12 @@ appdirs bluesky-kafka>=0.8.0 bluesky>=1.8.1 caproto +cryptography databroker h5py httpx ipython ipywidgets -ldap3 matplotlib msgpack >=1.0.0 msgpack-numpy @@ -19,9 +19,11 @@ packaging pillow psutil pycryptodome +pydantic pyolog redis redis-json-dict requests setuptools shortuuid +tiled