diff --git a/packages/app/src/app/jobs/jobs-manager.service.ts b/packages/app/src/app/jobs/jobs-manager.service.ts index ebd1bcbd..2f851749 100644 --- a/packages/app/src/app/jobs/jobs-manager.service.ts +++ b/packages/app/src/app/jobs/jobs-manager.service.ts @@ -5,7 +5,6 @@ import { CreateJobResponse, JobDetailsResponse, ListJobsResponse } from '@reefgu import { BehaviorSubject, finalize, - interval, map, shareReplay, switchMap, @@ -15,7 +14,8 @@ import { Observable, of, Subject, - takeUntil + takeUntil, + repeat } from 'rxjs'; import { retryHTTPErrors } from '../../util/http-util'; import { AuthService } from '../auth/auth.service'; @@ -66,6 +66,10 @@ export class JobsManagerService { // poll job details every milliseconds jobDetailsInterval: number = 2_000; + /** + * Jobs manager reset + * @see reset() + */ private _reset$ = new Subject(); constructor() { @@ -174,26 +178,34 @@ export class JobsManagerService { } /** - * Create a job and poll its details every period. Returns 3 observables to track the job + * Create a job and poll its details. Returns 3 observables to track the job * creation, job details, and status. * @param jobType job type * @param payload job type's payload - * @param period how often to request job status in milliseconds (default 2 seconds) * @returns StartedJob with observables to track creation, job details, and status. + * @see _watchJobDetails */ private _startJob(jobType: JobType, payload: Record): StartedJob { - const createJob$ = this.webApi.createJob(jobType, payload).pipe(shareReplay(1)); - const cancel$ = new Subject(); const status$ = new BehaviorSubject('CREATING'); + const cancel$ = new Subject(); - const jobDetails$ = createJob$.pipe( + const createJob$ = this.webApi.createJob(jobType, payload).pipe( retryHTTPErrors(3), - tap({ - error: err => { - status$.next('CREATE_FAILED'); + tap(value => { + if (!value.cached) { + // eagerly change to next status (PENDING) so UI can reflect this before jobDetails responds + status$.next('PENDING'); } + // REVIEW else does cached always mean SUCCEEDED? }), - switchMap(createJobResp => this._watchJobDetails(createJobResp.jobId, status$, cancel$)) + // this observable is returned and of interest to multiple subscribers + shareReplay(1) + ); + + const jobDetails$ = createJob$.pipe( + switchMap(createJobResp => this._watchJobDetails(createJobResp.jobId, status$, cancel$)), + // replay needed here, otherwise each subscription will execute a new switchMap + shareReplay({ bufferSize: 1, refCount: true }) ); return { createJob$, jobDetails$, status$, cancel$ }; @@ -216,45 +228,59 @@ export class JobsManagerService { const cancel$ = new Subject(); const status$ = new BehaviorSubject(job.status); - const jobDetails$ = this._watchJobDetails(job.id, status$, cancel$); + const jobDetails$ = this._watchJobDetails(job.id, status$, cancel$).pipe( + shareReplay({ bufferSize: 1, refCount: true }) + ); return { createJob$, jobDetails$, status$, cancel$ }; } + /** + * Poll the job's details on jobDetailsInterval until unsubscribed. + * Completes when status is not PENDING or IN_PROGRESS + * Errors when FAILED, CANCELLED, TIMED_OUT + * Caller is responsible for reploy + * @param jobId + * @param status$ + * @param cancel$ + * @private + */ private _watchJobDetails( jobId: number, status$: BehaviorSubject, cancel$: Observable ): StartedJob['jobDetails$'] { - status$.next('PENDING'); - return interval(this.jobDetailsInterval).pipe( - // Future: if client is tracking many jobs, it would be more efficient to - // share the query/request for all of them (i.e. switchMap to shared observable), - // but this is simplest for now. - switchMap(() => this.webApi.getJob(jobId).pipe(retryHTTPErrors(3))), + // FUTURE if client is tracking many jobs, it would be more efficient to + // share the query/request for all of them (i.e. switchMap to shared observable), + // but this is simplest for now. + return this.webApi.getJob(jobId).pipe( + // infinite retry + retryHTTPErrors(undefined, 50), // discard extra wrapping object, which has no information. - map(v => v.job), + map(details => details.job), // only emit when job status changes. distinctUntilKeyChanged('status'), // convert job error statuses to thrown errors. - tap(job => { - const s = job.status; + tap(details => { + const s = details.status; status$.next(s); if (s === 'FAILED' || s === 'CANCELLED' || s === 'TIMED_OUT') { - throw new Error(`Job id=${job.id} ${s}`); + throw new Error(`Job id=${details.id} ${s}`); } }), // complete observable when not pending/in-progress; emit the last value takeWhile( - x => x.status === 'PENDING' || x.status === 'IN_PROGRESS', + details => details.status === 'PENDING' || details.status === 'IN_PROGRESS', true // inclusive: emit the first value that fails the predicate ), + repeat({ + delay: this.jobDetailsInterval + }), takeUntil(cancel$), takeUntil(this._reset$), finalize(() => { status$.complete(); - }), - shareReplay(1) + }) ); } } diff --git a/packages/app/src/util/http-util.ts b/packages/app/src/util/http-util.ts index ce911314..32808c06 100644 --- a/packages/app/src/util/http-util.ts +++ b/packages/app/src/util/http-util.ts @@ -105,11 +105,14 @@ function isRetryableHTTPError(err: HttpErrorResponse): boolean { /** * Retry HTTP 5XX errors. * Expects HttpErrorResponse value from HttpClient. - * @param count how many times to retry (default 1) + * @param count how many times to retry (default 1), undefined for infinite retry * @param delayTime delay in milliseconds before retrying (default no delay) * @returns */ -export function retryHTTPErrors(count = 1, delayTime = 0): MonoTypeOperatorFunction { +export function retryHTTPErrors( + count: number | undefined = 1, + delayTime = 0 +): MonoTypeOperatorFunction { // retryWhen deprecated, delay is recommended instead. return retry({ count, diff --git a/packages/app/src/util/rxjs-util.ts b/packages/app/src/util/rxjs-util.ts new file mode 100644 index 00000000..8482a9a4 --- /dev/null +++ b/packages/app/src/util/rxjs-util.ts @@ -0,0 +1,27 @@ +import { MonoTypeOperatorFunction, tap } from 'rxjs'; + +export function tapDebug(name: string): MonoTypeOperatorFunction { + const prefix = `TAP ${name}: `; + let nextCount = 0; + let subCount = 0; + return tap({ + next: v => { + nextCount++; + console.log(prefix + `next count=${nextCount}`, v); + }, + error: err => { + console.error(prefix + 'error', err); + }, + complete: () => { + console.log(prefix + 'complete'); + }, + subscribe: () => { + subCount++; + console.log(prefix + `subscribe subCount=${subCount}`); + }, + unsubscribe: () => { + subCount--; + console.log(prefix + `unsubscribe subCount=${subCount}`); + } + }); +} diff --git a/packages/infra/scripts/connect-efs.sh b/packages/infra/scripts/connect-efs.sh index 1c8bbb85..23d2a0f0 100755 --- a/packages/infra/scripts/connect-efs.sh +++ b/packages/infra/scripts/connect-efs.sh @@ -199,6 +199,24 @@ start_interactive_session() { exec aws ssm start-session --target "$instance_id" } +# TODO add to command arg design, add port forwarding config file +# Start a port forwarding session +start_port_forwarding_session() { + local instance_id=$1 + + log_blue "Starting port forwarding SSM session with instance: $instance_id" + echo "" + + local LOCAL_PORT=25432 + local REMOTE_HOST="" # DB hostname + local REMOTE_PORT=5432 + + exec aws ssm start-session \ + --target "$instance_id" \ + --document-name AWS-StartPortForwardingSessionToRemoteHost \ + --parameters "{\"portNumber\":[\"$REMOTE_PORT\"],\"localPortNumber\":[\"$LOCAL_PORT\"],\"host\":[\"$REMOTE_HOST\"]}" +} + # Main function main() { local stack_name="${1:-}"