From dd716b662aa1149a1987ee117af53f72906d0a9c Mon Sep 17 00:00:00 2001 From: Teoh Han Hui Date: Wed, 3 Feb 2016 15:40:39 +0800 Subject: [PATCH] Add isolateSink and isolateSource to support cyclejs/isolate Shamelessly copied/adapted from cyclejs/cycle-http-driver --- src/index.js | 23 +++++++++++++++++++++++ test/index.js | 46 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/src/index.js b/src/index.js index 991729a..9bd6277 100644 --- a/src/index.js +++ b/src/index.js @@ -24,6 +24,27 @@ function byUrl (response$$, url) { .filter(response$ => getUrl(response$.request) === url) } +function isolateSink (request$, scope) { + return request$.map(request => { + if (typeof request === 'string') { + return { url: request, _namespace: [scope] } + } + request._namespace = request._namespace || [] + request._namespace.push(scope) + return request + }) +} + +function isolateSource (response$$, scope) { + const isolatedResponse$$ = response$$.filter(response$ => + Array.isArray(response$.request._namespace) && + response$.request._namespace.indexOf(scope) !== -1 + ) + isolatedResponse$$.isolateSource = isolateSource + isolatedResponse$$.isolateSink = isolateSink + return isolatedResponse$$ +} + // scheduler option is for testing because Reactive-Extensions/RxJS#976 export function makeFetchDriver (scheduler) { return function fetchDriver (request$) { @@ -42,6 +63,8 @@ export function makeFetchDriver (scheduler) { ) response$$.byKey = byKey.bind(null, response$$) response$$.byUrl = byUrl.bind(null, response$$) + response$$.isolateSource = isolateSource + response$$.isolateSink = isolateSink return response$$ } } diff --git a/test/index.js b/test/index.js index fd1e6bf..d93882c 100644 --- a/test/index.js +++ b/test/index.js @@ -201,6 +201,52 @@ test('byUrl should support input url', t => { ) }) +test('isolateSource and isolateSink should exist on the HTTP Source (response$$)', t => { + setup() + const fetchDriver = makeFetchDriver() + const request$ = new Rx.Subject() + const response$$ = fetchDriver(request$) + t.ok(typeof response$$.isolateSource === 'function') + t.ok(typeof response$$.isolateSink === 'function') + t.end() +}) + +test('isolateSource and isolateSink should exist on a scoped HTTP Source (response$$)', t => { + setup() + const fetchDriver = makeFetchDriver() + const request$ = new Rx.Subject() + const response$$ = fetchDriver(request$) + const scopedRequest$ = response$$.isolateSink(request$, 'foo') + const scopedResponse$$ = response$$.isolateSource(response$$, 'foo') + t.ok(typeof scopedResponse$$.isolateSource === 'function') + t.ok(typeof scopedResponse$$.isolateSink === 'function') + t.end() +}) + +test('isolateSource and isolateSink should hide responses from outside the scope', t => { + setup() + const fetchDriver = makeFetchDriver() + const proxyRequest$ = new Rx.Subject() + const response$$ = fetchDriver(proxyRequest$) + const request1 = 'http://api.test/resource1' + const request2 = 'http://api.test/resource2' + const ignoredRequest$ = Rx.Observable.just(request1) + const request$ = Rx.Observable.just(request2).delay(10) + const scopedRequest$ = response$$.isolateSink(request$, 'foo') + const scopedResponse$$ = response$$.isolateSource(response$$, 'foo') + scopedResponse$$.subscribe(response$ => { + t.equal(typeof response$.request, 'object') + t.equal(response$.request.url, request2) + response$.subscribe(response => { + t.equal(response.status, 200) + t.equal(response.data, 'resource2') + t.end() + }) + }) + Rx.Observable.merge(ignoredRequest$, scopedRequest$) + .subscribe(proxyRequest$.asObserver()) +}) + test('after', t => { global.fetch = originalFetch t.end()