Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 84 additions & 36 deletions Sources/Nes/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public enum ConnectionStatus {

public class Client: NSObject {
public private(set) var connectionStatus = CurrentValueSubject<ConnectionStatus, Never>(.disconnected)

private let subject = PassthroughSubject<PubMessage, NesError>()
private var operationQueue = OperationQueue()
private var urlSession: URLSession!
Expand All @@ -21,7 +21,7 @@ public class Client: NSObject {
private var cancellables: Set<AnyCancellable> = []
private var pendingRequests: Dictionary<NesID, (Result<Data, NesError>) -> Void> = [:]
private var fetchAuth: FetchAuthHeadersFuture? = nil

typealias MessageCallback<Message> = (_ message: Message) -> ()
public typealias FetchAuthHeadersFuture = () -> Future<[String:String], Error>

Expand All @@ -30,7 +30,7 @@ public class Client: NSObject {
urlSession = URLSession(configuration: .default, delegate: self, delegateQueue: operationQueue)
webSocketTask = urlSession.webSocketTask(with: url)
}

deinit {
disconnect()
}
Expand All @@ -41,9 +41,9 @@ public class Client: NSObject {
}

webSocketTask.resume()

connectionStatus.send(.connecting)

return connectionStatus
.first(where: { status in
switch (status) {
Expand All @@ -56,17 +56,17 @@ public class Client: NSObject {
.setFailureType(to: NesError.self)
.timeout(.seconds(30), scheduler: DispatchQueue.main, options: nil) { [self] () in
connectionStatus.send(.disconnected)
return NesError(message: "timed out connecting")
return .timedOut(message: "timed out connecting")
}
.map { _ in
return self
}
.eraseToAnyPublisher()
}

public func disconnect(err: Error? = nil) {
connectionStatus.send(.disconnected)
subject.send(completion: err.map { _ in Subscribers.Completion.failure(NesError(message: err?.localizedDescription ?? "")) } ?? .finished)
subject.send(completion: err.map { Subscribers.Completion.failure(NesError(error: $0)) } ?? .finished)
subscriptions.forEach(unsubscribe)
webSocketTask.cancel(with: .goingAway, reason: nil)
cancellables.forEach { $0.cancel() }
Expand All @@ -85,10 +85,10 @@ public class Client: NSObject {

if case .connected = connectionStatus.value {
webSocketTask.send(.data(data), completionHandler: { _ in

})
}

return subject
.filter { pubMessage in
return pubMessage.path.compare(path, options: .caseInsensitive) == .orderedSame
Expand All @@ -97,10 +97,20 @@ public class Client: NSObject {
let pub = try! JSONDecoder().decode(PubMessageContent<Message>.self, from: pubMessage.content)
return pub.message
}
.mapError { NesError(message: $0.localizedDescription) }
.mapError {
let error = NesError(error: $0)
switch error {
case .connectionAborted, .connectionReset:
self.connectionStatus.send(.disconnected)
default:
break
}

return error
}
.eraseToAnyPublisher()
}

public func request<RequestPayload: Encodable, ResponsePayload: Decodable>(
method: HTTPMethod,
path: String,
Expand All @@ -111,7 +121,7 @@ public class Client: NSObject {
let clientRequest = ClientRequest(method: method, path: path, payload: payload, headers: headers)
return request(clientRequest, for: type)
}

public func request<ResponsePayload: Decodable>(
method: HTTPMethod,
path: String,
Expand All @@ -121,7 +131,7 @@ public class Client: NSObject {
let clientRequest = ClientRequest(method: method, path: path, headers: headers)
return request(clientRequest, for: type)
}

func request<RequestPayload: Encodable, ResponsePayload: Decodable>(
_ clientRequest: ClientRequest<RequestPayload>,
for type: ResponsePayload.Type
Expand All @@ -131,17 +141,24 @@ public class Client: NSObject {
return Future<Data, NesError> { [self] promise in
pendingRequests[clientRequest.id] = promise
}
.map { requestResponse in
let response = try! JSONDecoder().decode(RequestResponse<ResponsePayload>.self, from: requestResponse)
return response.payload
.tryMap { requestResponse in
do {
let response = try JSONDecoder().decode(RequestResponse<ResponsePayload>.self, from: requestResponse)
return response.payload
} catch {
throw NesError.decodingFailed(message: error.localizedDescription)
}
}
.mapError { error -> NesError in
(error as? NesError) ?? NesError(error: error)
}
.timeout(.seconds(30), scheduler: DispatchQueue.main, options: nil) { [self] () in
pendingRequests.removeValue(forKey: clientRequest.id)
return NesError(message: "Timed out waiting for response")
return .timedOut(message: "Timed out waiting for response")
}
.eraseToAnyPublisher()
}

public func unsubscribe(_ path: String) {
let id = NesID(string: UUID().uuidString)
let outgoingMessage = ClientUnsub(id: id, path: path)
Expand All @@ -150,13 +167,22 @@ public class Client: NSObject {
subscriptions.remove(path)
webSocketTask.send(.data(data)) { _ in }
}

func readNextMessage() {
webSocketTask.receive { result in
switch(result) {
case .failure(let error):
self.subject.send(completion: .failure(NesError(message: error.localizedDescription)))
let nesError = NesError(error: error)
switch nesError {
case .connectionAborted, .connectionReset:
self.connectionStatus.send(.disconnected)
default:
break
}

self.subject.send(completion: .failure(NesError(error: nesError)))
case .success(.data(let data)):
let string = String(data: data, encoding: .utf8)!
self.parseData(data)
self.readNextMessage()
case .success(.string(let string)):
Expand All @@ -167,15 +193,15 @@ public class Client: NSObject {
}
}
}

func parseData(_ data: Data) {
guard let message = try? JSONDecoder().decode(IncomingMessage.self, from: data) else {
subject.send(completion: .failure(NesError(message: "Error")))
subject.send(completion: .failure(.general(message: "Error")))
return
}

switch message {
case .hello(let hello):
case .hello:
connectionStatus.send(.connected)
case .ping:
let pong = ClientPing(id: NesID(string: UUID().uuidString))
Expand Down Expand Up @@ -206,7 +232,7 @@ public class Client: NSObject {
subscriptions.remove(revoke.path)
}
}

func send<T: OutgoingMessage>(message: T) {
let data = try! JSONEncoder().encode(message)
let message = URLSessionWebSocketTask.Message.data(data)
Expand All @@ -216,7 +242,7 @@ public class Client: NSObject {
}
}
}

private func authenticate() -> AnyPublisher<AuthHeader?, Error> {
switch self.fetchAuth {
case nil:
Expand All @@ -225,7 +251,7 @@ public class Client: NSObject {
return auth().map { AuthHeader(headers: $0) }.eraseToAnyPublisher()
}
}

func sendHello() {
let id = NesID(string: UUID().uuidString)
let subscriptions = self.subscriptions
Expand All @@ -244,26 +270,26 @@ public class Client: NSObject {
})
.store(in: &cancellables)
}

struct PubMessage {
let path: String
let content: Data

init(path: String, content: String) {
self.path = path
self.content = Data(content.utf8)
}

init(path: String, content: Data) {
self.path = path
self.content = content
}
}

struct PubMessageContent<Message: Decodable>: Decodable {
let message: Message
}

struct RequestResponse<ResponsePayload: Decodable>: Decodable {
let id: NesID
let statusCode: Int
Expand All @@ -276,12 +302,34 @@ extension Client: URLSessionWebSocketDelegate {
public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
sendHello()
}

public func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
connectionStatus.send(.disconnected)
}
}

public struct NesError: Error {
let message: String
public enum NesError: Error {
case connectionAborted(message: String)
case connectionReset(message: String)
case timedOut(message: String)
case decodingFailed(message: String)
case general(message: String)

init(error: Error) {
let nsError = error as NSError
if nsError.domain == NSPOSIXErrorDomain {
switch nsError.code {
case 53:
self = .connectionAborted(message: nsError.localizedDescription)
case 54:
self = .connectionReset(message: nsError.localizedDescription)
case 60:
self = .timedOut(message: nsError.localizedDescription)
default:
self = .general(message: nsError.localizedDescription)
}
} else {
self = .general(message: error.localizedDescription)
}
}
}
Loading