Skip to content
This repository was archived by the owner on Aug 27, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 18 additions & 30 deletions src/eval/StateIPCClient.ml
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,23 @@ let ipcclient_exn_wrapper thunk =
fail0 ~kind:(Printf.sprintf "StateIPCClient: Unexpected error making JSON-RPC call: %s" e)
?inst:None

type state = { mutable client : Ezcurl.t option };;
let current_state = { client = None };;
type state = { mutable channels : (string, (Core.In_channel.t * Core.Out_channel.t)) Caml.Hashtbl.t }
let current_state = { channels = Caml.Hashtbl.create 64 }

let http_rpc ~socket_addr (call : Rpc.call) : Rpc.response M.t =
let client = match current_state.client with
| Some c -> c
let socket_rpc ~socket_addr (call : Rpc.call) : Rpc.response M.t =
let (ic, oc) = match (Caml.Hashtbl.find_opt current_state.channels socket_addr) with
| Some cs -> cs
| None ->
let c = Ezcurl.make () in
current_state.client <- Some c;
c
let cs = Core_unix.open_connection (Core_unix.ADDR_UNIX socket_addr) in
Caml.Hashtbl.replace current_state.channels socket_addr cs;
cs
in
let msg_buf = Jsonrpc.string_of_call ~version:Jsonrpc.V2 call in
DebugMessage.plog (Printf.sprintf "Sending: %s\n" msg_buf);
let exception Http_error of string in
let response =
match Ezcurl.post ~client ~headers:["content-type", "application/json"] ~content:(`String msg_buf) ~params:[] ~url:socket_addr () with
| Ok response -> response
| Error (_, err) -> (
DebugMessage.plog (Printf.sprintf "error calling RPC: %s" err);
raise (Http_error (Printf.sprintf "error calling RPC: %s" err))
)
in

let response = if response.code = 200 then response.body else (
DebugMessage.plog (Printf.sprintf "error response from RPC: code: %d, body: %s" response.code response.body);
raise (Http_error "error response from RPC")
)
in

(* Send data to the socket. *)
let _ = send_delimited oc msg_buf in
(* Get response. *)
let response = Caml.input_line ic in
DebugMessage.plog (Printf.sprintf "Response: %s\n" response);
M.return @@ Jsonrpc.response_of_string response

Expand Down Expand Up @@ -184,7 +172,7 @@ let fetch ~socket_addr ~fname ~keys ~tp =
let%bind q' = encode_serialized_query q in
let%bind res =
let thunk () =
translate_res @@ IPCClient.fetch_state_value (http_rpc ~socket_addr) q'
translate_res @@ IPCClient.fetch_state_value (socket_rpc ~socket_addr) q'
in
ipcclient_exn_wrapper thunk
in
Expand Down Expand Up @@ -227,7 +215,7 @@ let external_fetch ~socket_addr ~caddr ~fname ~keys ~ignoreval =
let%bind res =
let thunk () =
translate_res
@@ IPCClient.fetch_ext_state_value (http_rpc ~socket_addr) caddr q'
@@ IPCClient.fetch_ext_state_value (socket_rpc ~socket_addr) caddr q'
in
ipcclient_exn_wrapper thunk
in
Expand Down Expand Up @@ -263,7 +251,7 @@ let update ~socket_addr ~fname ~keys ~value ~tp =
let%bind () =
let thunk () =
translate_res
@@ IPCClient.update_state_value (http_rpc ~socket_addr) q' value'
@@ IPCClient.update_state_value (socket_rpc ~socket_addr) q' value'
in
ipcclient_exn_wrapper thunk
in
Expand All @@ -283,7 +271,7 @@ let is_member ~socket_addr ~fname ~keys ~tp =
let%bind q' = encode_serialized_query q in
let%bind res =
let thunk () =
translate_res @@ IPCClient.fetch_state_value (http_rpc ~socket_addr) q'
translate_res @@ IPCClient.fetch_state_value (socket_rpc ~socket_addr) q'
in
ipcclient_exn_wrapper thunk
in
Expand All @@ -306,7 +294,7 @@ let remove ~socket_addr ~fname ~keys ~tp =
let%bind () =
let thunk () =
translate_res
@@ IPCClient.update_state_value (http_rpc ~socket_addr) q' dummy_val
@@ IPCClient.update_state_value (socket_rpc ~socket_addr) q' dummy_val
in
ipcclient_exn_wrapper thunk
in
Expand All @@ -320,7 +308,7 @@ let fetch_bcinfo ~socket_addr ~query_name ~query_args =
let%bind res =
let thunk () =
translate_res
@@ IPCClient.fetch_bcinfo (http_rpc ~socket_addr) query_name query_args
@@ IPCClient.fetch_bcinfo (socket_rpc ~socket_addr) query_name query_args
in
ipcclient_exn_wrapper thunk
in
Expand Down