Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 51 additions & 25 deletions packages/app/src/app/jobs/jobs-manager.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { CreateJobResponse, JobDetailsResponse, ListJobsResponse } from '@reefgu
import {
BehaviorSubject,
finalize,
interval,
map,
shareReplay,
switchMap,
Expand All @@ -15,7 +14,8 @@ import {
Observable,
of,
Subject,
takeUntil
takeUntil,
repeat
} from 'rxjs';
import { retryHTTPErrors } from '../../util/http-util';
import { AuthService } from '../auth/auth.service';
Expand Down Expand Up @@ -66,6 +66,10 @@ export class JobsManagerService {
// poll job details every milliseconds
jobDetailsInterval: number = 2_000;

/**
* Jobs manager reset
* @see reset()
*/
private _reset$ = new Subject<void>();

constructor() {
Expand Down Expand Up @@ -174,26 +178,34 @@ export class JobsManagerService {
}

/**
* Create a job and poll its details every period. Returns 3 observables to track the job
* Create a job and poll its details. Returns 3 observables to track the job
* creation, job details, and status.
* @param jobType job type
* @param payload job type's payload
* @param period how often to request job status in milliseconds (default 2 seconds)
* @returns StartedJob with observables to track creation, job details, and status.
* @see _watchJobDetails
*/
private _startJob(jobType: JobType, payload: Record<string, any>): StartedJob {
const createJob$ = this.webApi.createJob(jobType, payload).pipe(shareReplay(1));
const cancel$ = new Subject<void>();
const status$ = new BehaviorSubject<ExtendedJobStatus>('CREATING');
const cancel$ = new Subject<void>();

const jobDetails$ = createJob$.pipe(
const createJob$ = this.webApi.createJob(jobType, payload).pipe(
retryHTTPErrors(3),
tap({
error: err => {
status$.next('CREATE_FAILED');
tap(value => {
if (!value.cached) {
// eagerly change to next status (PENDING) so UI can reflect this before jobDetails responds
status$.next('PENDING');
}
// REVIEW else does cached always mean SUCCEEDED?
}),
switchMap(createJobResp => this._watchJobDetails(createJobResp.jobId, status$, cancel$))
// this observable is returned and of interest to multiple subscribers
shareReplay(1)
);

const jobDetails$ = createJob$.pipe(
switchMap(createJobResp => this._watchJobDetails(createJobResp.jobId, status$, cancel$)),
// replay needed here, otherwise each subscription will execute a new switchMap
shareReplay({ bufferSize: 1, refCount: true })
);

return { createJob$, jobDetails$, status$, cancel$ };
Expand All @@ -216,45 +228,59 @@ export class JobsManagerService {
const cancel$ = new Subject<void>();
const status$ = new BehaviorSubject<ExtendedJobStatus>(job.status);

const jobDetails$ = this._watchJobDetails(job.id, status$, cancel$);
const jobDetails$ = this._watchJobDetails(job.id, status$, cancel$).pipe(
shareReplay({ bufferSize: 1, refCount: true })
);

return { createJob$, jobDetails$, status$, cancel$ };
}

/**
* Poll the job's details on jobDetailsInterval until unsubscribed.
* Completes when status is not PENDING or IN_PROGRESS
* Errors when FAILED, CANCELLED, TIMED_OUT
* Caller is responsible for reploy
* @param jobId
* @param status$
* @param cancel$
* @private
*/
private _watchJobDetails(
jobId: number,
status$: BehaviorSubject<ExtendedJobStatus>,
cancel$: Observable<void>
): StartedJob['jobDetails$'] {
status$.next('PENDING');
return interval(this.jobDetailsInterval).pipe(
// Future: if client is tracking many jobs, it would be more efficient to
// share the query/request for all of them (i.e. switchMap to shared observable),
// but this is simplest for now.
switchMap(() => this.webApi.getJob(jobId).pipe(retryHTTPErrors(3))),
// FUTURE if client is tracking many jobs, it would be more efficient to
// share the query/request for all of them (i.e. switchMap to shared observable),
// but this is simplest for now.
return this.webApi.getJob(jobId).pipe(
// infinite retry
retryHTTPErrors(undefined, 50),
// discard extra wrapping object, which has no information.
map(v => v.job),
map(details => details.job),
// only emit when job status changes.
distinctUntilKeyChanged('status'),
// convert job error statuses to thrown errors.
tap(job => {
const s = job.status;
tap(details => {
const s = details.status;
status$.next(s);
if (s === 'FAILED' || s === 'CANCELLED' || s === 'TIMED_OUT') {
throw new Error(`Job id=${job.id} ${s}`);
throw new Error(`Job id=${details.id} ${s}`);
}
}),
// complete observable when not pending/in-progress; emit the last value
takeWhile(
x => x.status === 'PENDING' || x.status === 'IN_PROGRESS',
details => details.status === 'PENDING' || details.status === 'IN_PROGRESS',
true // inclusive: emit the first value that fails the predicate
),
repeat({
delay: this.jobDetailsInterval
}),
takeUntil(cancel$),
takeUntil(this._reset$),
finalize(() => {
status$.complete();
}),
shareReplay(1)
})
);
}
}
Expand Down
7 changes: 5 additions & 2 deletions packages/app/src/util/http-util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,14 @@ function isRetryableHTTPError(err: HttpErrorResponse): boolean {
/**
* Retry HTTP 5XX errors.
* Expects HttpErrorResponse value from HttpClient.
* @param count how many times to retry (default 1)
* @param count how many times to retry (default 1), undefined for infinite retry
* @param delayTime delay in milliseconds before retrying (default no delay)
* @returns
*/
export function retryHTTPErrors<T>(count = 1, delayTime = 0): MonoTypeOperatorFunction<T> {
export function retryHTTPErrors<T>(
count: number | undefined = 1,
delayTime = 0
): MonoTypeOperatorFunction<T> {
// retryWhen deprecated, delay is recommended instead.
return retry({
count,
Expand Down
27 changes: 27 additions & 0 deletions packages/app/src/util/rxjs-util.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import { MonoTypeOperatorFunction, tap } from 'rxjs';

export function tapDebug<T>(name: string): MonoTypeOperatorFunction<T> {
const prefix = `TAP ${name}: `;
let nextCount = 0;
let subCount = 0;
return tap<T>({
next: v => {
nextCount++;
console.log(prefix + `next count=${nextCount}`, v);
},
error: err => {
console.error(prefix + 'error', err);
},
complete: () => {
console.log(prefix + 'complete');
},
subscribe: () => {
subCount++;
console.log(prefix + `subscribe subCount=${subCount}`);
},
unsubscribe: () => {
subCount--;
console.log(prefix + `unsubscribe subCount=${subCount}`);
}
});
}
18 changes: 18 additions & 0 deletions packages/infra/scripts/connect-efs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,24 @@ start_interactive_session() {
exec aws ssm start-session --target "$instance_id"
}

# TODO add to command arg design, add port forwarding config file
# Start a port forwarding session
start_port_forwarding_session() {
local instance_id=$1

log_blue "Starting port forwarding SSM session with instance: $instance_id"
echo ""

local LOCAL_PORT=25432
local REMOTE_HOST="" # DB hostname
local REMOTE_PORT=5432

exec aws ssm start-session \
--target "$instance_id" \
--document-name AWS-StartPortForwardingSessionToRemoteHost \
--parameters "{\"portNumber\":[\"$REMOTE_PORT\"],\"localPortNumber\":[\"$LOCAL_PORT\"],\"host\":[\"$REMOTE_HOST\"]}"
}

# Main function
main() {
local stack_name="${1:-}"
Expand Down
Loading