45 changes: 36 additions & 9 deletions agent/src/testflinger_agent/client.py
@@ -316,22 +316,49 @@ def post_log(
    ):
        """Post log data to the testflinger server for this job.

        :param job_id: id for the job
        :param log_input: Dataclass with all of the keys for the log endpoint
        :param log_type: Enum of different log types the server accepts
        """
        endpoint = urljoin(self.server, f"/v1/result/{job_id}/log/{log_type}")

        # Response from the new endpoint; stays None if the post fails
        request = None

        # TODO: Remove legacy endpoint support in future versions
        # Response from the legacy endpoint; stays None if the post fails
        legacy_request = None

        # The enum value is "serial", but for compatibility the legacy
        # endpoint expects "serial_output" instead
        suffix = (
            "serial_output"
            if log_type == LogType.SERIAL_OUTPUT
            else log_type.value
        )
        legacy_endpoint = urljoin(self.server, f"/v1/result/{job_id}/{suffix}")
        # Prioritize writing to the legacy endpoint
        try:
            legacy_request = self.session.post(
                legacy_endpoint,
                data=log_input.log_data.encode("utf-8"),
                timeout=60,
            )
        except requests.exceptions.RequestException as exc:
            logger.error(exc)
            logger.info("Falling back to new log endpoint")

        # Write logs to the new endpoint
        try:
            request = self.session.post(
                endpoint, json=asdict(log_input), timeout=60
            )
        except requests.exceptions.RequestException as exc:
            logger.error(exc)

        # Return True if either request was successful
        return any(
            req is not None and req.ok for req in (legacy_request, request)
        )

    def post_advertised_queues(self):
        """Post the list of advertised queues to testflinger server."""
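A note on post_log's combined return value above: a post that raises never assigns its response variable, so it stays None, and requests.Response.ok is truthy only for status codes below 400. A minimal runnable sketch of that success logic, with FakeResponse as a hypothetical stand-in for requests.Response:

from dataclasses import dataclass


@dataclass
class FakeResponse:
    """Hypothetical stand-in for requests.Response in this sketch."""

    status_code: int

    @property
    def ok(self) -> bool:
        # Mirrors requests.Response.ok: truthy for status codes < 400
        return self.status_code < 400


def post_succeeded(legacy_request, request) -> bool:
    # A post that raised leaves its variable as None; .ok is only
    # checked on responses that actually came back
    return any(
        req is not None and req.ok for req in (legacy_request, request)
    )


assert post_succeeded(FakeResponse(200), None)  # legacy worked
assert post_succeeded(None, FakeResponse(204))  # fallback worked
assert not post_succeeded(None, None)  # both posts raised
assert not post_succeeded(FakeResponse(500), FakeResponse(503))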
2 changes: 1 addition & 1 deletion agent/uv.lock


11 changes: 10 additions & 1 deletion cli/testflinger_cli/__init__.py
@@ -1089,6 +1089,7 @@ def do_poll(
        if job_state == "waiting":
            print("This job is waiting on a node to become available.")
        cur_fragment = start_fragment
        consecutive_empty_polls = 0
        while True:
            try:
                job_state = self.get_job_state(job_id)
@@ -1117,8 +1118,16 @@
f"of it in the queue are complete"
)
elif last_fragment_number < 0:
print("Waiting on output...")
consecutive_empty_polls += 1
# Only show message after 90 seconds of no output
# Indicates the CLI is actively attempting to poll data
if consecutive_empty_polls == 9:
print("Waiting on output...", file=sys.stderr)
else:
# Reset counter when we get data
consecutive_empty_polls = 0

# Print the retrieved log data
print(log_data, end="", flush=True)
cur_fragment = last_fragment_number + 1
time.sleep(10)
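The notice in do_poll goes to sys.stderr so that redirected job output on stdout stays clean, and nine consecutive empty polls at the 10-second interval add up to roughly 90 seconds of silence before it appears. A minimal runnable sketch of that throttling, with replay_polls as a hypothetical helper that replays poll results from a list:

import io
import sys

QUIET_POLLS = 9  # 9 empty polls * 10-second interval ~= 90 s of silence


def replay_polls(polls, out=sys.stdout, err=sys.stderr):
    """Replay a list of (last_fragment_number, log_data) poll results."""
    consecutive_empty_polls = 0
    for last_fragment_number, log_data in polls:
        if last_fragment_number < 0:
            consecutive_empty_polls += 1
            # Warn exactly once, on stderr, so stdout stays clean
            # when the job output is redirected to a file
            if consecutive_empty_polls == QUIET_POLLS:
                print("Waiting on output...", file=err)
        else:
            consecutive_empty_polls = 0
            print(log_data, end="", flush=True, file=out)


out, err = io.StringIO(), io.StringIO()
replay_polls(9 * [(-1, "")] + [(0, "hello\n")], out, err)
assert err.getvalue() == "Waiting on output...\n"
assert out.getvalue() == "hello\n"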
53 changes: 50 additions & 3 deletions cli/tests/test_cli.py
@@ -22,6 +22,7 @@
import re
import sys
import tarfile
import time
import uuid
from http import HTTPStatus
from pathlib import Path
@@ -33,8 +34,8 @@

import testflinger_cli
from testflinger_cli.client import HTTPError
from testflinger_cli.enums import LogType
from testflinger_cli.errors import NetworkError

URL = "https://testflinger.canonical.com"

@@ -1193,8 +1194,6 @@ def test_live_polling_with_fragments_progression(
    capsys, requests_mock, monkeypatch
):
    """Test live polling uses cur_fragment and progresses through fragments."""
    job_id = str(uuid.uuid1())

    # Mock time.sleep
@@ -1275,3 +1274,51 @@ def combined_log_wrapper_2(

    # Should have slept between iterations
    assert len(sleep_calls) >= 2

def test_live_polling_with_empty_poll(capsys, requests_mock, monkeypatch):
    """Test that live output handles empty polls correctly."""
    job_id = str(uuid.uuid1())

    sleep_calls = []
    monkeypatch.setattr(
        time, "sleep", lambda duration: sleep_calls.append(duration)
    )

    # Mock job status
    requests_mock.get(
        f"{URL}/v1/result/{job_id}",
        10 * [{"json": {"job_state": "active"}}]
        + [{"json": {"job_state": "complete"}}],
    )

    # Mock log output: 10 empty responses, then one with data
    requests_mock.get(
        f"{URL}/v1/result/{job_id}/log/output",
        10
        * [
            {
                "json": {
                    "output": {
                        "test": {"last_fragment_number": -1, "log_data": ""}
                    }
                }
            }
        ]
        + [
            {
                "json": {
                    "output": {
                        "test": {"last_fragment_number": 0, "log_data": "data"}
                    }
                }
            }
        ],
    )

    sys.argv = ["", "poll", job_id]
    tfcli = testflinger_cli.TestflingerCli()
    tfcli.run()

    captured = capsys.readouterr()
    assert "Waiting on output..." in captured.err
    assert len(sleep_calls) >= 9
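In the mocks above, requests_mock consumes a registered list of responses in order (repeating the last entry once the list is exhausted), so the first ten log polls return an empty fragment and the eleventh finally carries data, which is enough to cross the nine-poll threshold and trigger the stderr notice.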
2 changes: 1 addition & 1 deletion common/pyproject.toml
@@ -6,7 +6,7 @@ build-backend = "uv_build"
name = "testflinger-common"
description = "Testflinger common modules"
readme = "README.md"
version = "1.1.3"
version = "1.1.4"
requires-python = ">=3.10"
dependencies = ["strenum>=0.4.15"]

2 changes: 1 addition & 1 deletion common/uv.lock


68 changes: 68 additions & 0 deletions server/src/testflinger/api/schemas.py
@@ -597,3 +597,71 @@ class SecretIn(Schema):
"""Secret input schema."""

value = fields.String(required=True)


class ResultLegacy(Schema):
    """Legacy Result Post schema for backwards compatibility."""

    # TODO: Remove this schema after deprecating legacy endpoints
    setup_status = fields.Integer(required=False)
    setup_output = fields.String(required=False)
    setup_serial = fields.String(required=False)
    provision_status = fields.Integer(required=False)
    provision_output = fields.String(required=False)
    provision_serial = fields.String(required=False)
    firmware_update_status = fields.Integer(required=False)
    firmware_update_output = fields.String(required=False)
    firmware_update_serial = fields.String(required=False)
    test_status = fields.Integer(required=False)
    test_output = fields.String(required=False)
    test_serial = fields.String(required=False)
    allocate_status = fields.Integer(required=False)
    allocate_output = fields.String(required=False)
    allocate_serial = fields.String(required=False)
    reserve_status = fields.Integer(required=False)
    reserve_output = fields.String(required=False)
    reserve_serial = fields.String(required=False)
    cleanup_status = fields.Integer(required=False)
    cleanup_output = fields.String(required=False)
    cleanup_serial = fields.String(required=False)
    device_info = fields.Dict(required=False)
    job_state = fields.String(required=False)


class ResultSchema(OneOfSchema):
    """Polymorphic schema for posting results in new and legacy formats."""

    type_schemas = {
        "new": ResultPost,
        "legacy": ResultLegacy,
    }

    def get_obj_type(self, obj):
        """Get the object type from whichever schema parses it correctly."""
        return self.get_data_type(obj)

    def get_data_type(self, data):
        """Get the schema type from whichever schema parses the data."""
        # Try legacy first
        try:
            ResultLegacy().load(data)
            return "legacy"
        except ValidationError:
            # If legacy fails, try the new format
            try:
                ResultPost().load(data)
                return "new"
            except ValidationError as err:
                # Re-raise the last validation error with more context
                raise ValidationError(
                    "Invalid result data schema. "
                    f"Data does not match either legacy or new format: {err}"
                ) from err

    def _dump(self, obj, **kwargs):
        result = super()._dump(obj, **kwargs)
        # The parent dump injects the type field:
        #   result[self.type_field] = self.get_obj_type(obj)
        # so we need to remove it here
        result.pop(self.type_field)
        return result
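The load-side dispatch in get_data_type leans on marshmallow's default handling of unknown fields: a payload in the new structured format carries keys that ResultLegacy does not declare, so ResultLegacy().load() raises and the data falls through to ResultPost. A trimmed-down sketch of the same pattern, assuming the marshmallow-oneofschema package that provides OneOfSchema, with NewResult and LegacyResult as hypothetical stand-ins for ResultPost and ResultLegacy:

from marshmallow import Schema, ValidationError, fields
from marshmallow_oneofschema import OneOfSchema


class NewResult(Schema):  # hypothetical stand-in for ResultPost
    log = fields.Dict(required=True)


class LegacyResult(Schema):  # hypothetical stand-in for ResultLegacy
    test_status = fields.Integer(required=False)
    test_output = fields.String(required=False)


class AnyResult(OneOfSchema):
    type_schemas = {"new": NewResult, "legacy": LegacyResult}

    def get_data_type(self, data):
        # Unknown keys make load() raise, so a structured payload is
        # rejected by the legacy schema and dispatched to "new"
        try:
            LegacyResult().load(data)
            return "legacy"
        except ValidationError:
            NewResult().load(data)
            return "new"


# Flat phase fields parse as legacy...
assert AnyResult().load({"test_status": 0, "test_output": "ok"}) == {
    "test_status": 0,
    "test_output": "ok",
}
# ...while the structured format parses as new.
assert AnyResult().load({"log": {"output": "done"}}) == {
    "log": {"output": "done"}
}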