Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@ function byUrl (response$$, url) {
.filter(response$ => getUrl(response$.request) === url)
}

function isolateSink (request$, scope) {
return request$.map(request => {
if (typeof request === 'string') {
return { url: request, _namespace: [scope] }
}
request._namespace = request._namespace || []
request._namespace.push(scope)
return request
})
}

function isolateSource (response$$, scope) {
const isolatedResponse$$ = response$$.filter(response$ =>
Array.isArray(response$.request._namespace) &&
response$.request._namespace.indexOf(scope) !== -1
)
isolatedResponse$$.isolateSource = isolateSource
isolatedResponse$$.isolateSink = isolateSink
return isolatedResponse$$
}

// scheduler option is for testing because Reactive-Extensions/RxJS#976
export function makeFetchDriver (scheduler) {
return function fetchDriver (request$) {
Expand All @@ -42,6 +63,8 @@ export function makeFetchDriver (scheduler) {
)
response$$.byKey = byKey.bind(null, response$$)
response$$.byUrl = byUrl.bind(null, response$$)
response$$.isolateSource = isolateSource
response$$.isolateSink = isolateSink
return response$$
}
}
46 changes: 46 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,52 @@ test('byUrl should support input url', t => {
)
})

test('isolateSource and isolateSink should exist on the HTTP Source (response$$)', t => {
setup()
const fetchDriver = makeFetchDriver()
const request$ = new Rx.Subject()
const response$$ = fetchDriver(request$)
t.ok(typeof response$$.isolateSource === 'function')
t.ok(typeof response$$.isolateSink === 'function')
t.end()
})

test('isolateSource and isolateSink should exist on a scoped HTTP Source (response$$)', t => {
setup()
const fetchDriver = makeFetchDriver()
const request$ = new Rx.Subject()
const response$$ = fetchDriver(request$)
const scopedRequest$ = response$$.isolateSink(request$, 'foo')
const scopedResponse$$ = response$$.isolateSource(response$$, 'foo')
t.ok(typeof scopedResponse$$.isolateSource === 'function')
t.ok(typeof scopedResponse$$.isolateSink === 'function')
t.end()
})

test('isolateSource and isolateSink should hide responses from outside the scope', t => {
setup()
const fetchDriver = makeFetchDriver()
const proxyRequest$ = new Rx.Subject()
const response$$ = fetchDriver(proxyRequest$)
const request1 = 'http://api.test/resource1'
const request2 = 'http://api.test/resource2'
const ignoredRequest$ = Rx.Observable.just(request1)
const request$ = Rx.Observable.just(request2).delay(10)
const scopedRequest$ = response$$.isolateSink(request$, 'foo')
const scopedResponse$$ = response$$.isolateSource(response$$, 'foo')
scopedResponse$$.subscribe(response$ => {
t.equal(typeof response$.request, 'object')
t.equal(response$.request.url, request2)
response$.subscribe(response => {
t.equal(response.status, 200)
t.equal(response.data, 'resource2')
t.end()
})
})
Rx.Observable.merge(ignoredRequest$, scopedRequest$)
.subscribe(proxyRequest$.asObserver())
})

test('after', t => {
global.fetch = originalFetch
t.end()
Expand Down