@@ -72,13 +72,16 @@ module PlaceOS
7272 # Caching
7373 # Background task to clear module metadata caches
7474 private getter cache_lock = Mutex .new
75+
76+ private getter write_channel = Channel (String ).new
77+
7578 private getter cache_timeout : Time ::Span = 10 .minutes
7679
7780 @metadata_cache = {} of String => Driver ::DriverModel ::Metadata
7881 @module_id_cache = {} of String => String
7982 @cache_cleaner : Tasker ::Task ?
8083
81- getter ws : HTTP ::WebSocket
84+ private getter ws : HTTP ::WebSocket
8285
8386 def initialize (
8487 @ws : HTTP ::WebSocket ,
@@ -107,8 +110,10 @@ module PlaceOS
107110 Driver ::Proxy ::RemoteDriver ::Clearance ::User
108111 end
109112
113+ # Perform writes
114+ spawn(name: " socket_writes_#{ request_id } " , same_thread: true ) { run_writes }
110115 # Begin clearing cache
111- spawn(name: " cache_cleaner " , same_thread: true ) { cache_plumbing }
116+ spawn(name: " cache_cleaner_ #{ request_id } " , same_thread: true ) { cache_plumbing }
112117 end
113118
114119 # Websocket API Messages
@@ -177,7 +182,9 @@ module PlaceOS
177182 @[JSON ::Field (key: " msg" )]
178183 getter message : String ?
179184
185+ @[JSON ::Field (converter: String ::RawConverter )]
180186 getter value : String ?
187+
181188 getter meta : Metadata ?
182189
183190 @[JSON ::Field (key: " mod" )]
@@ -246,8 +253,8 @@ module PlaceOS
246253 index: index,
247254 name: name,
248255 },
249- value: " %{} " ,
250- ), response )
256+ value: response ,
257+ ))
251258 rescue e : Driver ::Proxy ::RemoteDriver ::Error
252259 respond(error_response(request_id, e.error_code, e.message))
253260 rescue e
@@ -509,7 +516,7 @@ module PlaceOS
509516 # Parse an update from a subscription and pass to listener
510517 #
511518 def notify_update (value, request_id, system_id, module_name, index, status)
512- response = Response .new(
519+ respond( Response .new(
513520 id: request_id,
514521 type: Response ::Type ::Notify ,
515522 value: value,
@@ -518,15 +525,18 @@ module PlaceOS
518525 mod: module_name,
519526 index: index,
520527 name: status,
521- },
522- )
523- respond(response)
528+ }
529+ ))
530+ end
531+
532+ protected def write (data )
533+ write_channel.send(data)
524534 end
525535
526536 # Request handler
527537 #
528538 protected def on_message (data )
529- return @ws .send (" pong" ) if data == " ping"
539+ return write (" pong" ) if data == " ping"
530540
531541 # Execute the request
532542 request = parse_request(data)
@@ -581,7 +591,7 @@ module PlaceOS
581591 rescue e
582592 Log .warn { {message: " failed to parse" , data: data} }
583593 error_response(JSON .parse(data)[" id" ]?.try & .as_i64, ErrorCode ::BadRequest , " bad request: #{ e.message } " )
584- return
594+ nil
585595 end
586596
587597 protected def error_response (
@@ -609,18 +619,17 @@ module PlaceOS
609619 end
610620 end
611621
612- protected def respond (response : Response , payload = nil )
613- return if @ws .closed?
614-
615- if payload
616- # Avoids parsing and serialising when payload is already in JSON format
617- partial = response.to_json
618- @ws .send(partial.sub(%( "%{}") , payload))
619- else
620- @ws .send(response.to_json)
622+ private def run_writes
623+ while data = write_channel.receive?
624+ ws.send(data)
621625 end
622626 end
623627
628+ protected def respond (response : Response )
629+ return if @ws .closed?
630+ write(response.to_json)
631+ end
632+
624633 # Delegate request to correct handler
625634 #
626635 protected def __send__ (request : Request )
0 commit comments