diff --git a/server/src/testflinger/api/schemas.py b/server/src/testflinger/api/schemas.py index dd62cf2ea..a7abc8e78 100644 --- a/server/src/testflinger/api/schemas.py +++ b/server/src/testflinger/api/schemas.py @@ -49,16 +49,37 @@ class ProvisionLogsIn(Schema): detail = fields.String(required=False) +class AgentName(Schema): + """Agent name schema.""" + + agent_name = fields.String(required=True) + + class AgentIn(Schema): """Agent data input schema.""" identifier = fields.String(required=False) - job_id = fields.String(required=False) - location = fields.String(required=False) - log = fields.List(fields.String(), required=False) + job_id = fields.String( + required=False, + metadata={"description": "Job ID the device is running, if any"}, + ) + location = fields.String( + required=False, metadata={"description": "Location of the device"} + ) + log = fields.List( + fields.String(), + required=False, + metadata={"description": "Push and keep only the last 100 lines"}, + ) provision_type = fields.String(required=False) - queues = fields.List(fields.String(), required=False) - state = fields.String(required=False) + queues = fields.List( + fields.String(), + required=False, + metadata={"description": "Queues the device is listening on"}, + ) + state = fields.String( + required=False, metadata={"description": "State the device is in"} + ) comment = fields.String(required=False) @@ -92,6 +113,12 @@ class Attachment(Schema): device = fields.String(required=False) +class FileUpload(Schema): + """Schema for file upload requests.""" + + file = fields.File(required=True) + + class CM3ProvisionData(Schema): """Schema for the `provision_data` section of a CM3 job.""" @@ -372,6 +399,18 @@ class JobId(Schema): job_id = fields.String(required=True) +class JobGetQuery(Schema): + """Job GET query schema.""" + + queue = fields.List( + fields.String(), + required=True, + metadata={ + "description": "List of queue name(s) that the agent can process" + }, + ) + + class 
JobSearchRequest(Schema): """Job search request schema.""" @@ -432,9 +471,18 @@ class ResultPost(Schema): keys=fields.String(validate=OneOf(TestPhases)), values=fields.Integer(), required=False, + metadata={ + "description": "Dictionary mapping phase names to exit codes" + }, + ) + device_info = fields.Dict( + required=False, + metadata={"description": "Device information"}, + ) + job_state = fields.String( + required=False, + metadata={"description": "Current job state"}, ) - device_info = fields.Dict(required=False) - job_state = fields.String(required=False) class JobEvent(Schema): @@ -467,20 +515,57 @@ class RestrictedQueueOut(Schema): owners = fields.List(fields.String(), required=True) +class LogTypeParam(Schema): + """Schema for Log type parameter.""" + + log_type = fields.String( + required=True, + validate=OneOf(["output", "serial"]), + metadata={"description": "Type of log to retrieve (output or serial)"}, + ) + + class LogPost(Schema): """Schema for POST of log fragments.""" - fragment_number = fields.Integer(required=True) - timestamp = fields.DateTime(required=True) - phase = fields.String(required=True, validate=OneOf(TestPhases)) - log_data = fields.String(required=True) + fragment_number = fields.Integer( + required=True, + metadata={ + "description": "Sequential fragment number of the log fragment being posted, starting from 0" + }, + ) + timestamp = fields.DateTime( + required=True, + metadata={ + "description": "Timestamp in ISO 8601 format of when the log fragment was created" + }, + ) + phase = fields.String( + required=True, + validate=OneOf(TestPhases), + metadata={ + "description": "Test phase name (setup, provision, firmware_update, test, allocate, reserve, cleanup)" + }, + ) + log_data = fields.String( + required=True, + metadata={"description": "The log content for this fragment"}, + ) class LogGetItem(Schema): """Schema for GET of logs for a single phase.""" - last_fragment_number = fields.Integer(required=True) - 
log_data = fields.String(required=True) + last_fragment_number = fields.Integer( + required=True, + metadata={"description": "The highest fragment number for this phase"}, + ) + log_data = fields.String( + required=True, + metadata={ + "description": "Combined log text from all matching fragments for this phase" + }, + ) class LogGet(Schema): @@ -500,10 +585,23 @@ class LogQueryParams(Schema): """Schema for Log GET Query parameters.""" start_fragment = fields.Integer( - required=False, validate=validators.Range(min=0) + required=False, + validate=validators.Range(min=0), + metadata={ + "description": "Starting fragment number to query from, defaults to 0" + }, + ) + start_timestamp = fields.DateTime( + required=False, + metadata={ + "description": "Starting timestamp to query from in ISO 8601 format" + }, + ) + phase = fields.String( + required=False, + validate=OneOf(TestPhases), + metadata={"description": "Test phase name to filter logs"}, ) - start_timestamp = fields.DateTime(required=False) - phase = fields.String(required=False, validate=OneOf(TestPhases)) job_empty = { @@ -537,6 +635,58 @@ class LogQueryParams(Schema): }, } + +class QueueName(Schema): + """Queue name schema.""" + + queue = fields.String(required=True) + + +class QueueDict(Schema): + """Queue input schema.""" + + data = fields.Dict( + keys=fields.Nested(QueueName), + values=fields.String(metadata={"description": "Queue description"}), + required=True, + ) + + +class QueueList(Schema): + """Queue list schema.""" + + queue = fields.List( + fields.Nested(QueueName), + required=False, + metadata={"description": "List of queue names"}, + ) + + +class QueueWaitTimePercentilesOut(Schema): + """Queue wait time percentiles output schema.""" + + data = fields.Dict( + keys=fields.Nested(QueueName), + values=fields.Dict( + keys=fields.String(validate=OneOf(["5", "10", "50", "90", "95"])), + values=fields.Float(), + metadata={ + "description": "Percentile statistics for job wait times in seconds" + }, + ), 
+ required=True, + ) + + +class JobInQueueOut(Schema): + """Job in queue output schema.""" + + job_id = fields.String(required=True) + created_at = fields.String(required=True) + job_state = fields.String(required=True) + job_queue = fields.String(required=True) + + images_out = { 200: { "description": "Mapping of image names and provision data", @@ -558,6 +708,28 @@ class LogQueryParams(Schema): } +class ImagePostIn(Schema): + """Agent image input schema.""" + + data = fields.Dict( + keys=fields.String(metadata={"description": "Queue name"}), + values=fields.Dict( + keys=fields.String(metadata={"description": "Image name"}), + values=fields.String( + metadata={"description": "Image provision data"} + ), + metadata={"description": "Image data for the queue"}, + ), + required=True, + ) + + +class ClientId(Schema): + """Client ID input schema.""" + + client_id = fields.String(required=True) + + class ClientPermissionsIn(Schema): """Client Permissions output schema.""" @@ -600,6 +772,12 @@ class SecretIn(Schema): value = fields.String(required=True) +class SecretPath(Schema): + """Secret path schema.""" + + path = fields.String(required=True) + + class ResultLegacy(Schema): """Legacy Result Post schema for backwards compatibility.""" @@ -666,3 +844,29 @@ def _dump(self, obj, **kwargs): # So we need to remove it result.pop(self.type_field) return result + + +class Oauth2Token(Schema): + """Token output schema.""" + + access_token = fields.String(required=True) + token_type = fields.String(required=True) + expires_in = fields.Integer(required=True) + refresh_token = fields.String(required=True) + + +class Oauth2RefreshTokenIn(Schema): + """Refresh token input schema.""" + + refresh_token = fields.String( + required=True, + metadata={"description": "Opaque refresh token"}, + ) + + +class Oauth2RefreshTokenOut(Schema): + """Refresh token output schema.""" + + access_token = fields.String(required=True) + token_type = fields.String(required=True) + expires_in = 
fields.Integer(required=True) diff --git a/server/src/testflinger/api/v1.py b/server/src/testflinger/api/v1.py index 858e30415..cbb925181 100644 --- a/server/src/testflinger/api/v1.py +++ b/server/src/testflinger/api/v1.py @@ -22,7 +22,7 @@ from http import HTTPStatus import requests -from apiflask import APIBlueprint, abort +from apiflask import APIBlueprint, abort, security from flask import current_app, g, jsonify, request, send_file from marshmallow import ValidationError from prometheus_client import Counter @@ -76,10 +76,36 @@ def get_version(): @v1.post("/job") @authenticate -@v1.input(schemas.Job, location="json") -@v1.output(schemas.JobId) +@v1.input( + schemas.Job, + location="json", + example={ + "job_queue": "myqueue", + "name": "Example Test Job", + "tags": ["test", "sample"], + "provision_data": {"url": ""}, + "test_data": {"test_cmds": "lsb_release -a"}, + }, +) +@v1.output( + schemas.JobId, + status_code=200, + description="(OK) Returns the job_id (UUID) of the newly created job", + example={"job_id": "550e8400-1234-1234-1234-446655440000"}, +) +@v1.doc( + responses={ + 422: { + "description": "(Unprocessable Content) The submitted job contains references to secrets that are inaccessible" + } + } +) def job_post(json_data: dict): - """Add a job to the queue.""" + """ + Create a test job request and place it on the specified queue. + + Most parameters passed in the data section of this API will be specific to the type of agent receiving them. The `job_queue` parameter is used to designate the queue used, but all others will be passed along to the agent. 
+ """ try: job_queue = json_data.get("job_queue") except (AttributeError, BadRequest): @@ -191,10 +217,35 @@ def job_builder(data: dict): @v1.get("/job") -@v1.output(schemas.Job) -@v1.doc(responses=schemas.job_empty) +@v1.input( + schema=schemas.JobGetQuery, + location="query", + arg_name="queue", + example=["foo", "bar"], +) +@v1.output( + schemas.Job, + status_code=200, + description="(OK) JSON job data that was submitted by the requester", +) +@v1.doc( + responses={ + 204: { + "description": "(No Content) No jobs available in the specified queues" + }, + 400: { + "description": "(Bad request) No queue is specified in the request" + }, + } +) def job_get(): - """Request a job to run from supported queues.""" + """Get a test job from the specified queue(s). + + When an agent wants to request a job for processing, it can make this request along with a list of one or more queues that it is configured to process. The server will only return one job. + + Note: + Any secrets that are referenced in the job are "resolved" when the job is retrieved by an agent through this endpoint. Any secrets that are inaccessible at the time of retrieval will be resolved to the empty string. + """ queue_list = request.args.getlist("queue") if not queue_list: return "No queue(s) specified in request", HTTPStatus.BAD_REQUEST @@ -242,16 +293,18 @@ def retrieve_secrets(data: dict) -> dict | None: @v1.get("/job/") -@v1.output(schemas.Job) +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.output( + schemas.Job, status_code=200, description="(OK) JSON data for the job" +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 204: {"description": "(No Content) Job not found"}, + } +) def job_get_id(job_id): - """Request the json job definition for a specified job, even if it has - already run. 
- - :param job_id: - UUID as a string for the job - :return: - JSON data for the job or error string and http error - """ + """Request the json job definition for a specified job, even if it has already run.""" if not check_valid_uuid(job_id): abort(400, message="Invalid job_id specified") response = database.mongo.db.jobs.find_one( @@ -265,13 +318,17 @@ def job_get_id(job_id): @v1.get("/job//attachments") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 204: {"description": "(No Content) No attachments found for this job"}, + } +) def attachment_get(job_id): - """Return the attachments bundle for a specified job_id. + """Download the attachments bundle for a specified job_id. - :param job_id: - UUID as a string for the job - :return: - send_file stream of attachment tarball to download + Returns a gzip-compressed tarball containing all files that were uploaded as attachments. """ if not check_valid_uuid(job_id): return "Invalid job id\n", 400 @@ -283,11 +340,24 @@ def attachment_get(job_id): @v1.post("/job//attachments") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schema=schemas.FileUpload, + location="files", +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 422: { + "description": "(Unprocessable Entity) Job not awaiting attachments or the job_id is not valid" + }, + } +) def attachments_post(job_id): """Post attachment bundle for a specified job_id. - :param job_id: - UUID as a string for the job + Upload a gzip-compressed tarball containing files to be used as attachments for the job. + The job must be in a state where it's awaiting attachments. 
""" if not check_valid_uuid(job_id): return "Invalid job id\n", 400 @@ -313,10 +383,33 @@ def attachments_post(job_id): @v1.get("/job/search") -@v1.input(schemas.JobSearchRequest, location="query") -@v1.output(schemas.JobSearchResponse) +@v1.input( + schemas.JobSearchRequest, + location="query", + example={"tags": ["foo", "bar"], "match": "all"}, +) +@v1.output( + schemas.JobSearchResponse, + status_code=200, + example={ + "jobs": [ + { + "job_id": "550e8400-e29b-41d4-a716-446655440000", + "job_queue": "myqueue", + } + ] + }, +) def search_jobs(query_data): - """Search for jobs by tags.""" + """Search for jobs by tag(s) and state(s). + + Parameters: + + - `tags` (array): List of string tags to search for + - `match` (string): Match mode for + - `tags` (string, "all" or "any", default: "any") + - `state` (array): List of job states to include (or "active" to search all states other than cancelled and completed) + """ tags = query_data.get("tags") match = request.args.get("match", "any") states = request.args.getlist("state") @@ -352,11 +445,20 @@ def search_jobs(query_data): @v1.post("/result//artifact") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schema=schemas.FileUpload, + location="files", +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + } +) def artifacts_post(job_id): - """Post artifact bundle for a specified job_id. + """Upload a file artifact for the specified job_id. - :param job_id: - UUID as a string for the job + Upload a gzip-compressed tarball containing test artifacts or results files. 
""" if not check_valid_uuid(job_id): return "Invalid job id\n", 400 @@ -368,13 +470,17 @@ def artifacts_post(job_id): @v1.get("/result//artifact") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 204: {"description": "(No Content) No artifact found for this job"}, + } +) def artifacts_get(job_id): - """Return artifact bundle for a specified job_id. + """Download previously submitted artifact for the specified job_id. - :param job_id: - UUID as a string for the job - :return: - send_file stream of artifact tarball to download + Returns a gzip-compressed tarball containing test artifacts or results files. """ if not check_valid_uuid(job_id): return "Invalid job id\n", 400 @@ -401,14 +507,58 @@ def to_url(self, obj): @v1.get("/result//log/") -@v1.output(schemas.LogGet) +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schema=schemas.LogTypeParam, + location="path", + arg_name="log_type", + examples={ + "Get all output logs for a job": {"log_type": "output"}, + }, +) +@v1.input( + schemas.LogQueryParams, + location="query", + examples={ + "Get only setup phase output logs": {"phase": "setup"}, + "Get logs from fragment 5 onwards": {"start_fragment": 5}, + "Get logs after a specific timestamp": { + "start_timestamp": "2025-10-15T10:30:00Z" + }, + }, +) +@v1.output( + schemas.LogGet, + status_code=200, + description="(OK) JSON object with logs organized by phase", + example={ + "output": { + "setup": { + "last_fragment_number": 5, + "log_data": "Starting setup...\nSetup complete\n", + }, + "provision": { + "last_fragment_number": 12, + "log_data": "Provisioning device...\nDevice ready\n", + }, + } + }, +) +@v1.doc( + responses={ + 204: { + "description": "(No Content) No logs found for this job_id and log_type" + }, + 400: { + "description": "(Bad Request) Invalid job_id, log_type or query parameter specified" + }, + } +) def 
log_get(job_id: str, log_type: LogType): - """Get logs for a specified job_id. + """Retrieve logs for the specified job_id and log type. - :param job_id: UUID as a string for the job - :param log_type: LogType enum value for the type of log requested - :raises HTTPError: If the job_id is not a valid UUID or if invalid query - :return: Dictionary with log data + This endpoint supports querying logs with optional filtering by phase, fragment number, + or timestamp. Logs are persistent and can be retrieved multiple times. """ args = request.args if not check_valid_uuid(job_id): @@ -444,14 +594,37 @@ def log_get(job_id: str, log_type: LogType): @v1.post("/result//log/") -@v1.input(schemas.LogPost, location="json") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schema=schemas.LogTypeParam, + location="path", + arg_name="log_type", +) +@v1.input( + schemas.LogPost, + location="json", + example={ + "fragment_number": 0, + "timestamp": "2025-10-15T10:00:00+00:00", + "phase": "setup", + "log_data": "Starting setup phase...", + }, +) +@v1.output( + None, status_code=200, description="(OK) Log fragment posted successfully" +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Invalid job_id or log_type specified" + } + } +) def log_post(job_id: str, log_type: LogType, json_data: dict) -> str: - """Post logs for a specified job ID. + """ + Post a log fragment for the specified job_id and log type - :param job_id: UUID as a string for the job - :param log_type: LogType enum value for the type of log being posted - :raises HTTPError: If the job_id is not a valid UUID - :param json_data: Dictionary with log data + This is the new logging endpoint that agents use to stream log data in fragments. Each fragment includes metadata for tracking and querying. 
""" if not check_valid_uuid(job_id): abort(HTTPStatus.BAD_REQUEST, message="Invalid job_id specified") @@ -469,12 +642,33 @@ def log_post(job_id: str, log_type: LogType, json_data: dict) -> str: @v1.post("/result/") -@v1.input(schemas.ResultSchema, location="json") +@v1.input(schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schemas.ResultSchema, + location="json", + example={ + "status": {"setup": 0, "provision": 0, "test": 0}, + "device_info": {}, + }, +) +@v1.output( + None, + status_code=200, + description="(OK) Job outcome data posted successfully", +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 413: { + "description": "(Request Entity Too Large) Payload exceeds 16MB BSON size limit" + }, + } +) def result_post(job_id: str, json_data: dict) -> str: - """Post a result for a specified job_id. + """Post job outcome data for the specified job_id. - :param job_id: UUID as a string for the job - :raises HTTPError: If the job_id is not a valid UUID + Submit test results including exit codes for each phase, device information, + and job state. The payload must not exceed the 16MB BSON size limit. """ if not check_valid_uuid(job_id): abort(HTTPStatus.BAD_REQUEST, message="Invalid job_id specified") @@ -490,12 +684,31 @@ def result_post(job_id: str, json_data: dict) -> str: @v1.get("/result/") -@v1.output(schemas.ResultGet) +@v1.input(schemas.JobId, location="path", arg_name="job_id") +@v1.output( + schemas.ResultGet, + status_code=200, + description="(OK) JSON data with flattened structure", +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 204: {"description": "(No Content) No results found for this job_id"}, + } +) def result_get(job_id: str): - """Return results for a specified job_id. + """Return previously submitted job outcome data for the specified job_id. 
+ + This endpoint reconstructs results from the new logging system to maintain backward compatibility. It combines phase status information with logs to provide a complete view of job results. + + Returns: + + JSON data with flattened structure including: + - `{phase}_status`: Exit code for each phase + - `{phase}_output`: Standard output logs for each phase (if available) + - `{phase}_serial`: Serial console logs for each phase (if available) + - Additional metadata fields (device_info, job_state, etc.) - :param job_id: UUID as a string for the job - :raises HTTPError: If the job_id is not a valid UUID """ if not check_valid_uuid(job_id): abort(HTTPStatus.BAD_REQUEST, message="Invalid job_id specified") @@ -516,12 +729,27 @@ def result_get(job_id: str): @v1.post("/job//action") -@v1.input(schemas.ActionIn, location="json") +@v1.input(schemas.JobId, location="path", arg_name="job_id") +@v1.input(schemas.ActionIn, location="json", example={"action": "cancel"}) +@v1.output( + None, status_code=200, description="(OK) Action executed successfully" +) +@v1.doc( + responses={ + 400: { + "description": " (Bad Request) The job is already completed or cancelled" + }, + 404: {"description": "(Not Found) The job isn't found"}, + 422: { + "description": "(Unprocessable Entity) The action or the argument to it could not be processed" + }, + } +) def action_post(job_id, json_data): - """Take action on the job status for a specified job ID. + """Execute action for the specified job_id. - :param job_id: - UUID as a string for the job + Supported actions: + - cancel: Cancel a job that hasn't been completed yet """ if not check_valid_uuid(job_id): return "Invalid job id\n", 400 @@ -555,6 +783,11 @@ def queues_get(): @v1.post("/agents/queues") +@v1.input( + schema=schemas.QueueDict, + location="json", + example={"myqueue": "queue 1", "myqueue2": "queue 2"}, +) def queues_post(): """Tell testflinger the queue names that are being serviced. 
@@ -573,9 +806,14 @@ def queues_post(): @v1.get("/agents/images/") +@v1.input(schema=schemas.QueueName, location="path", arg_name="queue") @v1.doc(responses=schemas.images_out) def images_get(queue): - """Get a dict of known images for a given queue.""" + """Get a dict of all known image names and the associated provisioning data for a given queue. + + Returns a dictionary mapping image names to their provisioning URLs or data. + Returns an empty dict if the queue doesn't exist or has no images. + """ queue_data = database.mongo.db.queues.find_one( {"name": queue}, {"_id": False, "images": True} ) @@ -586,19 +824,21 @@ def images_get(queue): @v1.post("/agents/images") -def images_post(): - """Tell testflinger about known images for a specified queue - images will be stored in a dict of key/value pairs as part of the queues - collection. That dict will contain image_name:provision_data mappings, ex: - { - "some_queue": { +@v1.input( + schema=schemas.ImagePostIn, + location="json", + example={ + "myqueue": { "core22": "http://cdimage.ubuntu.com/.../core-22.tar.gz", - "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz" + "jammy": "http://cdimage.ubuntu.com/.../ubuntu-22.04.tar.gz", }, - "other_queue": { - ... - } - }. + "other_queue": {"image1": "data1", "image2": "data2"}, + }, +) +def images_post(): + """Tell testflinger about known images for a specified queue + + Images will be stored in a dict of key/value pairs as part of the queues collection. """ image_dict = request.get_json() # We need to delete and recreate the images in case some were removed @@ -612,9 +852,17 @@ def images_post(): @v1.get("/agents/data") -@v1.output(schemas.AgentOut(many=True)) +@v1.output( + schemas.AgentOut(many=True), + status_code=200, + description="JSON data for all known agents, useful for external systems that need to gather this information", +) def agents_get_all(): - """Get all agent data.""" + """Get all agent data. 
+ + Returns JSON data for all known agents, including their state, queues, location, and + information about restricted queue ownership. Useful for external systems monitoring agents. + """ agents = database.get_agents() restricted_queues = database.get_restricted_queues() restricted_queues_owners = database.get_restricted_queues_owners() @@ -631,14 +879,22 @@ def agents_get_all(): @v1.get("/agents/data/") -@v1.output(schemas.AgentOut) +@v1.input(schemas.AgentName, location="path", arg_name="agent_name") +@v1.output( + schemas.AgentOut, + status_code=200, + description="JSON data for the specified agent, useful for getting information from a single agent. ", +) +@v1.doc( + responses={ + 404: {"description": "(Not Found) The specified agent was not found"} + } +) def agents_get_one(agent_name): """Get the information from a specified agent. - :param agent_name: - String with the name of the agent to retrieve information from. - :return: - JSON data with the specified agent information. + Returns JSON data for the specified agent, including state, queues, location, and + restricted queue ownership information. """ agent_data = database.get_agent_info(agent_name) @@ -685,9 +941,23 @@ def agents_post(agent_name, json_data): @v1.post("/agents/provision_logs/") -@v1.input(schemas.ProvisionLogsIn, location="json") +@v1.input(schemas.AgentName, location="path", arg_name="agent_name") +@v1.input( + schemas.ProvisionLogsIn, + location="json", + example={ + "job_id": "00000000-0000-0000-0000-000000000000", + "exit_code": 1, + "detail": "foo", + }, +) def agents_provision_logs_post(agent_name, json_data): - """Post provision logs for the agent to the server.""" + """Post provision logs for the agent to the server. + + Submit provision log entries including job_id, exit_code, and detail information. + The server maintains the last 100 provision log entries per agent and tracks provision + success/failure streaks. 
+ """ agent_record = {} # timestamp this agent record and provision log entry @@ -726,25 +996,43 @@ def agents_provision_logs_post(agent_name, json_data): @v1.post("/job//events") -@v1.input(schemas.StatusUpdate, location="json") -def agents_status_post(job_id, json_data): - """Post status updates from the agent to the server to be forwarded - to TestObserver. - - The json sent to this endpoint may contain data such as the following: - { - "agent_id": "", - "job_queue": "", - "job_status_webhook": "", +@v1.input(schemas.JobId, location="path", arg_name="job_id") +@v1.input( + schemas.StatusUpdate, + location="json", + example={ + "agent_id": "agent-00", + "job_queue": "myqueue", + "job_status_webhook": "http://mywebhook", "events": [ - { - "event_name": "", - "timestamp": "", - "detail": "" + { + "event_name": "started_provisioning", + "timestamp": "2024-05-03T19:11:33.541130+00:00", + "detail": "my_detailed_message", + } + ], + }, +) +@v1.output( + None, + status_code=200, + description="(OK) Text response from the webhook if the server was successfully able to post.", +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Invalid job_id or JSON data specified" + }, + 504: { + "description": "(Gateway Timeout) The webhook did not respond in time" }, - ... - ] } +) +def agents_status_post(job_id, json_data): + """Post job status updates from the agent to the server and posts them to the specified webhook URL. + + The `job_status_webhook` parameter is required for this endpoint. Other + parameters included here will be forwarded to the webhook. 
""" _ = job_id @@ -784,8 +1072,20 @@ def check_valid_uuid(job_id): @v1.get("/job//position") +@v1.input(schema=schemas.JobId, location="path", arg_name="job_id") +@v1.output( + str, + status_code=200, + description="(OK) Zero-based position indicating how many jobs are ahead of this job in the queue.", +) +@v1.doc( + responses={ + 400: {"description": "(Bad Request) Invalid job_id specified"}, + 410: {"description": "(Gone) Job not found or already started"}, + } +) def job_position_get(job_id): - """Return the position of the specified jobid in the queue.""" + """Return the position of the specified job_id in the queue.""" job_data, status = job_get_id(job_id) if status == 204: return "Job not found or already started\n", 410 @@ -830,8 +1130,22 @@ def cancel_job(job_id): @v1.get("/queues/wait_times") +@v1.input(schema=schemas.QueueList, location="query", arg_name="queue") +@v1.output( + schemas.QueueWaitTimePercentilesOut, + status_code=200, + description="(OK) JSON mapping of queue names to wait time metrics percentiles", + example={ + "myqueue": {5: 10.5, 10: 15.2, 50: 30.0, 90: 60.8, 95: 75.3}, + "otherqueue": {5: 8.0, 10: 12.5, 50: 25.0, 90: 50.0, 95: 65.0}, + }, +) def queue_wait_time_percentiles_get(): - """Get wait time metrics - optionally take a list of queues.""" + """Get wait time metrics for queues, optionally take a list of queues. + + Returns percentile statistics (p5, p10, p50, p90, p95) for job wait times in the specified queues. + If no queues are specified, returns metrics for all queues. 
+ """ queues = request.args.getlist("queue") wait_times = database.get_queue_wait_times(queues) queue_percentile_data = {} @@ -843,7 +1157,20 @@ def queue_wait_time_percentiles_get(): @v1.get("/queues//agents") -@v1.output(schemas.AgentOut(many=True)) +@v1.input(schemas.QueueName, location="path", arg_name="queue_name") +@v1.output( + schemas.AgentOut(many=True), + status_code=200, + description="JSON data with an array of agent objects listening to the specified queue, including the agent state, location, and current job information.", +) +@v1.doc( + responses={ + 204: { + "description": "(No Content) No agents found listening to the specified queue" + }, + 404: {"description": "(Not Found) The specified queue does not exist"}, + } +) def get_agents_on_queue(queue_name): """Get the list of all data for agents listening to a specified queue.""" if not database.queue_exists(queue_name): @@ -859,13 +1186,24 @@ def get_agents_on_queue(queue_name): @v1.get("/queues//jobs") +@v1.input(schemas.QueueName, location="path", arg_name="queue_name") +@v1.output( + schemas.JobInQueueOut(many=True), + status_code=200, + description="JSON data with an array of job objects including job_id, created_at timestamp, job_state, and job_queue for all jobs in the specified queue.", +) +@v1.doc( + responses={ + 204: { + "description": "(No Content) No jobs found in the specified queue" + }, + } +) def get_jobs_by_queue(queue_name): - """Get the jobs in a specified queue along with its state. + """Get the jobs in a specified queue along with their state. - :param queue_name - String with the queue name where to perform the query. - :return: - JSON data with the jobs allocated to the specified queue. + Returns JSON data with an array of job objects including job_id, created_at timestamp, + job_state, and job_queue for all jobs in the specified queue. 
""" jobs = database.get_jobs_on_queue(queue_name) @@ -892,6 +1230,27 @@ def get_jobs_by_queue(queue_name): @v1.post("/oauth2/token") +@v1.auth_required( + auth=security.HTTPBasicAuth( + description="Base64 encoded pair of client_id:client_key" + ) +) +@v1.output( + schemas.Oauth2Token, + status_code=200, + description="(OK) JSON object containing access token, token type, expiration time, and refresh token", + example={ + "access_token": "", + "token_type": "Bearer", + "expires_in": 30, + "refresh_token": "", + }, +) +@v1.doc( + responses={ + 401: {"description": "(Unauthorized) Invalid client id or client key"}, + } +) def retrieve_token(): """ Issue both access token and refresh token for a client. @@ -909,6 +1268,10 @@ def retrieve_token(): max_reservation_time: , } } + Notes: + - `expires_in` is the lifetime (in seconds) of the access token. + - Refresh tokens default to 30 days; admin may issue non-expiring tokens for trusted integrations. + """ auth_header = request.authorization if auth_header is None: @@ -948,6 +1311,28 @@ def retrieve_token(): @v1.post("/oauth2/refresh") +@v1.input( + schema=schemas.Oauth2RefreshTokenIn, + location="json", + example={"refresh_token": ""}, +) +@v1.output( + schemas.Oauth2RefreshTokenOut, + status_code=200, + description="(OK) JSON object containing new access token, token type, and expiration time", + example={ + "access_token": "", + "token_type": "Bearer", + "expires_in": 30, + }, +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Missing, invalid, revoked, or expired refresh token" + }, + } +) def refresh_access_token(): """Refresh access token using a valid refresh token.""" data = request.get_json() @@ -970,6 +1355,27 @@ def refresh_access_token(): @v1.post("/oauth2/revoke") +@v1.input( + schema=schemas.Oauth2RefreshTokenIn, + location="json", + example={"refresh_token": ""}, +) +@v1.output( + str, + status_code=200, + description="(OK) Text response indicating successful revocation of the refresh 
token", + example={"message": "Refresh token revoked successfully"}, +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Missing, invalid, or already revoked refresh token" + }, + 401: { + "description": "(Unauthorized) Admin privileges required to revoke refresh tokens" + }, + } +) @authenticate @require_role(ServerRoles.ADMIN) def revoke_refresh_token(): @@ -995,6 +1401,20 @@ def revoke_refresh_token(): @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER, ServerRoles.CONTRIBUTOR) @v1.output(schemas.RestrictedQueueOut(many=True)) +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with permissions to access restricted queues", + ) +) +@v1.doc( + responses={ + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to access restricted queues" + }, + } +) def get_all_restricted_queues() -> list[dict]: """List all agent's restricted queues and its owners.""" restricted_queues = database.get_restricted_queues() @@ -1017,6 +1437,24 @@ def get_all_restricted_queues() -> list[dict]: @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER, ServerRoles.CONTRIBUTOR) @v1.output(schemas.RestrictedQueueOut) +@v1.input(schemas.QueueName, location="path", arg_name="queue_name") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with permissions to access restricted queues", + ) +) +@v1.doc( + responses={ + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to access restricted queues" + }, + 404: { + "description": "(Not Found) The specified restricted queue was not found" + }, + } +) def get_restricted_queue(queue_name: str) -> dict: """Get restricted queues for a specific agent.""" if not 
database.check_queue_restricted(queue_name): @@ -1034,11 +1472,37 @@ def get_restricted_queue(queue_name: str) -> dict: @v1.post("/restricted-queues/") +@v1.input( + schemas.RestrictedQueueIn, + location="json", + example={"client_id": "myclient"}, +) +@v1.input(schemas.QueueName, location="path", arg_name="queue_name") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with admin or manager permissions", + ) +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Missing client_id to set as owner of restricted queue" + }, + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to associate restricted queues" + }, + 404: { + "description": "(Not Found) The specified restricted queue does not exist or is not associated with an agent" + }, + } +) @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER) -@v1.input(schemas.RestrictedQueueIn, location="json") def add_restricted_queue(queue_name: str, json_data: dict) -> dict: - """Add an owner to the specific restricted queue.""" + """Add an owner to the specific restricted queue. 
+ If the queue does not exist yet, it will be created automatically.""" client_id = json_data.get("client_id", "") # Validate client ID is available in request data @@ -1064,9 +1528,34 @@ def add_restricted_queue(queue_name: str, json_data: dict) -> dict: @v1.delete("/restricted-queues/") +@v1.input( + schemas.RestrictedQueueIn, + location="json", + example={"client_id": "myclient"}, +) +@v1.input(schemas.QueueName, location="path", arg_name="queue_name") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with admin or manager permissions", + ) +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Missing client_id to remove as owner of restricted queue" + }, + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to remove restricted queues" + }, + 404: { + "description": "(Not Found) The specified queue was not found or it is not in the restricted queue list" + }, + } +) @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER) -@v1.input(schemas.RestrictedQueueIn, location="json") def delete_restricted_queue(queue_name: str, json_data: dict) -> dict: """Delete an owner from the specific restricted queue.""" if not database.check_queue_restricted(queue_name): @@ -1084,16 +1573,54 @@ def delete_restricted_queue(queue_name: str, json_data: dict) -> dict: @v1.get("/client-permissions") @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER) -@v1.output(schemas.ClientPermissionsOut(many=True)) +@v1.output( + schemas.ClientPermissionsOut(many=True), + description="JSON data with a list of all client IDs and their permissions excluding the hashed secret stored in database", +) +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with admin or manager permissions", + ) +) +@v1.doc( + responses={ + 401: 
{"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user" + }, + } +) def get_all_client_permissions() -> list[dict]: - """Retrieve all client permissions from database.""" + """Retrieve all client IDs and their permissions from database.""" return database.get_client_permissions() @v1.get("/client-permissions/") @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER) -@v1.output(schemas.ClientPermissionsOut) +@v1.output( + schemas.ClientPermissionsOut, + description="JSON data with the permissions of a specified client excluding the hashed secret stored in database", +) +@v1.input(schemas.ClientId, location="path", arg_name="client_id") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with admin or manager permissions", + ) +) +@v1.doc( + responses={ + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user" + }, + 404: { + "description": "(Not Found) The specified client_id was not found" + }, + } +) def get_client_permissions(client_id) -> list[dict]: """Retrieve single client-permissions from database.""" if not database.check_client_exists(client_id): @@ -1108,7 +1635,41 @@ def get_client_permissions(client_id) -> list[dict]: @v1.put("/client-permissions/") @authenticate @require_role(ServerRoles.ADMIN, ServerRoles.MANAGER) -@v1.input(schemas.ClientPermissionsIn) +@v1.input( + schemas.ClientPermissionsIn, + location="json", + example={ + "client_id": "myclient", + "client_secret": "my-secret-password", + "max_priority": {}, + "max_reservation_time": {"*": 40000}, + "role": "contributor", + }, +) +@v1.input(schemas.ClientId, location="path", arg_name="client_id") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access 
token with admin or manager permissions", + ) +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) Missing client_secret when creating new client permissions" + }, + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to modify client permissions" + }, + 404: { + "description": "(Not Found) The specified client_id was not found" + }, + 422: { + "description": "(Unprocessable Entity) System admin client cannot be modified using the API" + }, + } +) def set_client_permissions(client_id: str, json_data: dict) -> str: """Add or create client permissions for a specified user.""" # Testflinger Admin credential can't be modified from API!' @@ -1167,6 +1728,27 @@ def set_client_permissions(client_id: str, json_data: dict) -> str: @v1.delete("/client-permissions/") @authenticate @require_role(ServerRoles.ADMIN) +@v1.input(schemas.ClientId, location="path", arg_name="client_id") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with admin permissions", + ) +) +@v1.doc( + responses={ + 401: {"description": "(Unauthorized) Missing or invalid access token"}, + 403: { + "description": "(Forbidden) Insufficient permissions for the authenticated user to delete client permissions" + }, + 404: { + "description": "(Not Found) The specified client_id was not found" + }, + 422: { + "description": "(Unprocessable Entity) System admin can't be removed using the API" + }, + } +) def delete_client_permissions(client_id: str) -> str: """Delete client id along with its permissions.""" # Testflinger Admin credential can't be removed from API!' 
@@ -1190,6 +1772,27 @@ def delete_client_permissions(client_id: str) -> str: @v1.put("/secrets//") @authenticate @v1.input(schemas.SecretIn, location="json") +@v1.input(schemas.ClientId, location="path", arg_name="client_id") +@v1.input(schemas.SecretPath, location="path", arg_name="path") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with permissions to store secrets", + ) +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) No secrets store configured or access error" + }, + 403: { + "description": "(Forbidden) client_id does not match authenticated client id" + }, + 500: { + "description": "(Internal Server Error) Error storing the secret value" + }, + } +) def secrets_put(client_id, path, json_data): """Store a secret value for the specified client_id and path.""" if current_app.secrets_store is None: @@ -1211,6 +1814,27 @@ def secrets_put(client_id, path, json_data): @v1.delete("/secrets//") @authenticate +@v1.input(schemas.ClientId, location="path", arg_name="client_id") +@v1.input(schemas.SecretPath, location="path", arg_name="path") +@v1.auth_required( + auth=security.HTTPTokenAuth( + scheme="Bearer", + description="Base64-encoded JWT access token with permissions to delete secrets", + ) +) +@v1.doc( + responses={ + 400: { + "description": "(Bad Request) No secrets store configured or access error" + }, + 403: { + "description": "(Forbidden) client_id does not match authenticated client id" + }, + 500: { + "description": "(Internal Server Error) Error deleting the secret value" + }, + } +) def secrets_delete(client_id, path): """Remove a secret value for the specified client_id and path.""" if current_app.secrets_store is None: @@ -1231,6 +1855,7 @@ def secrets_delete(client_id, path): @v1.get("/result//output") +@v1.doc(deprecated=True) def legacy_output_get(job_id: str) -> str: """Legacy endpoint to get job output for a specified job_id. 
@@ -1252,6 +1877,7 @@ def legacy_output_get(job_id: str) -> str: @v1.post("/result//output") +@v1.doc(deprecated=True) def legacy_output_post(job_id: str) -> str: """Legacy endpoint to post output for a specified job_id. @@ -1274,6 +1900,7 @@ def legacy_output_post(job_id: str) -> str: @v1.get("/result//serial_output") +@v1.doc(deprecated=True) def legacy_serial_output_get(job_id: str) -> str: """Legacy endpoint to get latest serial output for a specified job ID. @@ -1295,6 +1922,7 @@ def legacy_serial_output_get(job_id: str) -> str: @v1.post("/result//serial_output") +@v1.doc(deprecated=True) def legacy_serial_output_post(job_id: str) -> str: """Legacy endpoint to post serial output for a specified job ID.