Skip to content

Commit 829e298

Browse files
authored
Merge pull request #17 from cloudblue/LITE-24285
LITE-24285: Added async execution capabilities
2 parents fcfef0d + a5011ff commit 829e298

7 files changed

Lines changed: 323 additions & 270 deletions

File tree

executor/executor.py

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
import asyncio
2+
import inspect
13
import logging
24
import sys
35
from datetime import datetime
46

57
import pytz
6-
from connect.client import ClientError, ConnectClient
8+
from connect.client import AsyncConnectClient, ClientError, ConnectClient
79
from connect.reports.constants import REPORTS_ENV
810
from connect.reports.datamodels import Account, Report
911
from connect.reports.renderers import get_renderer
@@ -72,20 +74,30 @@ def normalize_parameters(connect_parameters):
7274
return parameters
7375

7476

77+
async def execute_report_async(entrypoint, args, renderer, output_file):
78+
if inspect.iscoroutinefunction(entrypoint):
79+
data = await entrypoint(*args)
80+
else:
81+
data = entrypoint(*args)
82+
return await renderer.render_async(
83+
data,
84+
output_file,
85+
start_time=datetime.now(tz=pytz.utc),
86+
)
87+
88+
89+
def _run_render(is_async, entrypoint, args, renderer, output_file):
90+
if is_async:
91+
return asyncio.run(execute_report_async(entrypoint, args, renderer, output_file))
92+
else:
93+
data = entrypoint(*args)
94+
return renderer.render(data, output_file, start_time=datetime.now(tz=pytz.utc))
95+
96+
7597
def execute_report(control_client, report_definition, connect_report): # noqa: CCR001
7698
report_env = get_report_env()
7799
reports_dir = get_default_reports_dir()
78100

79-
report_client = ConnectClient(
80-
endpoint=report_env["api_endpoint"],
81-
use_specs=False,
82-
api_key=report_env["client_token"],
83-
max_retries=5,
84-
default_limit=500,
85-
default_headers=get_user_agent(),
86-
timeout=(360, 360),
87-
resourceset_append=False,
88-
)
89101
connect_parameters = connect_report.get('parameters', [])
90102
parameters = normalize_parameters(connect_parameters)
91103

@@ -120,6 +132,23 @@ def progress(current_value, max_value):
120132
logger.exception('An error ocurred while importing report entrypoint.')
121133
handle_preparation_exception(e, control_client)
122134

135+
is_async = (
136+
inspect.isasyncgenfunction(report_entry_point)
137+
or inspect.iscoroutinefunction(report_entry_point)
138+
)
139+
140+
client_class = AsyncConnectClient if is_async else ConnectClient
141+
report_client = client_class(
142+
endpoint=report_env["api_endpoint"],
143+
use_specs=False,
144+
api_key=report_env["client_token"],
145+
max_retries=5,
146+
default_limit=500,
147+
default_headers=get_user_agent(),
148+
timeout=(360, 360),
149+
resourceset_append=False,
150+
)
151+
123152
renderer_id = connect_report['renderer']
124153
renderer_definition = next(
125154
filter(
@@ -151,8 +180,7 @@ def progress(current_value, max_value):
151180
renderer.set_extra_context,
152181
],
153182
)
154-
data = report_entry_point(*args)
155-
return renderer.render(data, '/report', start_time=datetime.now(tz=pytz.utc))
183+
return _run_render(is_async, report_entry_point, args, renderer, '/report')
156184
except Exception as e:
157185
handle_exception(e, control_client, connect_report)
158186

0 commit comments

Comments
 (0)