From 47c8a406939273e837f0c07cadd5db95ec17c132 Mon Sep 17 00:00:00 2001 From: Pascal Knoth Date: Thu, 5 Sep 2024 19:33:23 +0200 Subject: [PATCH 1/3] [gateway] WIP handler refactoring with a tcp server --- .../lib/boruta_gateway/application.ex | 28 +- .../lib/boruta_gateway/gateway.ex | 344 ++++++++++++++++++ .../boruta_gateway/gateway/authorization.ex | 40 ++ .../integration/requests_test.exs | 242 +----------- 4 files changed, 400 insertions(+), 254 deletions(-) create mode 100644 apps/boruta_gateway/lib/boruta_gateway/gateway.ex create mode 100644 apps/boruta_gateway/lib/boruta_gateway/gateway/authorization.ex diff --git a/apps/boruta_gateway/lib/boruta_gateway/application.ex b/apps/boruta_gateway/lib/boruta_gateway/application.ex index 925537211..f1f1270eb 100644 --- a/apps/boruta_gateway/lib/boruta_gateway/application.ex +++ b/apps/boruta_gateway/lib/boruta_gateway/application.ex @@ -27,11 +27,11 @@ defmodule BorutaGateway.Application do [ %{ start: - {BorutaGateway.Server, :start_link, + {BorutaGateway.Gateway, :start, [ [ port: Application.fetch_env!(:boruta_gateway, :port), - router: BorutaGateway.Router + num_acceptors: 10 ] ]}, id: :server @@ -43,30 +43,8 @@ defmodule BorutaGateway.Application do children end - children = - case Application.get_env(:boruta_gateway, :sidecar_server) do - true -> - [ - %{ - start: - {BorutaGateway.Server, :start_link, - [ - [ - port: Application.fetch_env!(:boruta_gateway, :sidecar_port), - router: BorutaGateway.SidecarRouter - ] - ]}, - id: :sidecar_server - } - | children - ] - - _ -> - children - end - setup_database() - Supervisor.start_link(children, strategy: :one_for_one, name: BorutaGateway.Supervisor) + Supervisor.start_link(children, strategy: :one_for_one, name: BorutaGateway.Supervisor, shutdown: 5_000) end def setup_database do diff --git a/apps/boruta_gateway/lib/boruta_gateway/gateway.ex b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex new file mode 100644 index 000000000..147f720e4 --- /dev/null +++ b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex @@ -0,0 +1,344 @@ +defmodule BorutaGateway.Gateway do + @moduledoc false + + defmodule Token do + @moduledoc false + + use Joken.Config + + def token_config, do: %{} + end + + use GenServer + + alias BorutaGateway.Gateway.Authorization + alias BorutaGateway.Upstreams + alias BorutaGateway.Upstreams.Upstream + + @connect_timeout 5_000 + + def start(args) do + {:ok, listen_socket} = + :gen_tcp.listen(args[:port], [{:packet, :raw}, :binary, {:active, false}]) + + children = + Enum.map(1..args[:num_acceptors], fn i -> + Supervisor.child_spec({__MODULE__, [listen_socket: listen_socket]}, + id: :"http_proxy_server_acceptor_#{i}" + ) + end) + + # TODO monitor children + with {:ok, pid} <- Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__) do + spawn(fn -> + Process.monitor(pid) + Process.flag(:trap_exit, true) + receive do + _ -> + :gen_tcp.close(listen_socket) + end + end) + {:ok, pid} + end + end + + defmodule State do + @moduledoc false + + defstruct [:listen_socket, :socket, :client_socket, :start, :request, :response] + end + + def start_link(args) do + GenServer.start_link(__MODULE__, args) + end + + @impl GenServer + def init(args) do + Process.flag(:trap_exit, true) + + send(self(), :accept) + + {:ok, %State{listen_socket: args[:listen_socket]}} + end + + @impl GenServer + def handle_info({:EXIT, _pid, reason}, state) do + {:stop, reason, state} + end + + def handle_info(:accept, state) do + {:ok, socket} = :gen_tcp.accept(state.listen_socket) + :inet.setopts(socket, active: :once) + + {:noreply, %{state | socket: socket, client_socket: nil}} + end + + def handle_info({:tcp_closed, socket}, %State{socket: socket} = state) do + {:ok, socket} = :gen_tcp.accept(state.listen_socket) + :inet.setopts(socket, active: :once) + + {:noreply, %{state | socket: socket, client_socket: nil}} + end + + def handle_info({:tcp_closed, socket}, %State{client_socket: socket} = state) do + :gen_tcp.shutdown(state.socket, :write) + + {:noreply, state} + end + + def handle_info({:ssl_closed, socket}, %State{client_socket: socket} = state) do + :gen_tcp.shutdown(state.socket, :write) + + {:noreply, state} + end + + def handle_info({:tcp, socket, payload}, %State{socket: socket, client_socket: nil} = state) do + start = :os.system_time(:microsecond) + [_, method, path] = Regex.run(~r{^(GET|POST|PUT|PATCH|OPTIONS|DELETE) ([^\s]+)}, payload) + # [_, host] = Regex.run(~r{[H|h]ost\: ([^\r]+)}, payload) + + path_info = String.split(path, "/", trim: true) + + with %Upstream{} = upstream <- Upstreams.match(path_info) , + {:ok, token} <- Authorization.authorize(payload, method, upstream) do + _connect = :os.system_time(:microsecond) - start + case upstream.scheme do + "http" -> + case :gen_tcp.connect( + upstream.host |> String.to_charlist(), + upstream.port, + [:binary, {:packet, :raw}, {:active, false}], + @connect_timeout + ) do + {:ok, client_socket} -> + _connected = :os.system_time(:microsecond) - start + :ok = :gen_tcp.send(client_socket, transform_header(payload, upstream, token)) + + # :inet.setopts(socket, active: :once) + :inet.setopts(client_socket, active: :once) + {:noreply, %{state | client_socket: client_socket, start: start}} + + {:error, _error} -> + :gen_tcp.send(socket, "HTTP/1.1 503 Service Unavailable\r\n\r\n") + :gen_tcp.shutdown(socket, :write) + + {:noreply, state} + end + + "https" -> + case :ssl.connect( + upstream.host |> String.to_charlist(), + upstream.port, + [ + :binary, + {:packet, :raw}, + {:active, false}, + {:verify, :verify_peer}, + {:cacerts, :public_key.cacerts_get()} + ], + @connect_timeout + ) do + {:ok, client_socket} -> + _connected = :os.system_time(:microsecond) - start + :ok = :ssl.send(client_socket, transform_header(payload, upstream, token)) + + :inet.setopts(socket, active: :once) + :ssl.setopts(client_socket, active: :once) + {:noreply, %{state | client_socket: client_socket, start: start}} + + {:error, _error} -> + :gen_tcp.send(socket, "HTTP/1.1 503 Service Unavailable\r\n\r\n") + :gen_tcp.shutdown(socket, :write) + + {:noreply, state} + end + end + else + # TODO close client socket in case of failure + nil -> + response = "No upstream has been found corresponding to the given request." + :gen_tcp.send( + socket, + "HTTP/1.1 404 Not Found\r\n" <> + "Content-Length: 62\r\n\r\n" <> + response + ) + + _response = :os.system_time(:microsecond) - start + :gen_tcp.shutdown(socket, :write) + + {:noreply, state} + + {:unauthorized, content_type, response} -> + :gen_tcp.send( + socket, + "HTTP/1.1 401 Unauthorized\r\n" <> + "Content-Type: #{content_type}\r\n" <> + "Content-Length: #{byte_size(response)}\r\n\r\n" <> + response + ) + + _response = :os.system_time(:microsecond) - start + :gen_tcp.shutdown(socket, :write) + + {:noreply, state} + + {:forbidden, content_type, response} -> + :gen_tcp.send( + socket, + "HTTP/1.1 403 Forbidden\r\n" <> + "Content-Type: #{content_type}\r\n" <> + "Content-Length: #{byte_size(response)}\r\n\r\n" <> + response + ) + + _response = :os.system_time(:microsecond) - start + :gen_tcp.shutdown(socket, :write) + + {:noreply, state} + end + end + + def handle_info({:tcp, socket, payload}, %State{client_socket: socket} = state) do + # TODO clean response headers (connection, strict-transport-security) + # clean_response_headers(payload) + :gen_tcp.send(state.socket, payload) + case message_complete?(payload) do + true -> + _response = :os.system_time(:microsecond) - state.start + :gen_tcp.shutdown(state.socket, :write) + false -> + :inet.setopts(socket, active: :once) + end + {:noreply, state} + end + + def handle_info({:ssl, socket, payload}, %State{client_socket: socket} = state) do + # TODO clean response headers (connection, strict-transport-security) + # clean_response_headers(payload) + :gen_tcp.send(state.socket, payload) + case message_complete?(payload) do + true -> + _response = :os.system_time(:microsecond) - state.start + :gen_tcp.shutdown(state.socket, :write) + false -> + :ssl.setopts(socket, active: :once) + end + + {:noreply, %{state | response: payload}} + end + + def handle_info({:tcp, socket, payload}, %State{socket: socket} = state) do + :inet.setopts(socket, active: :once) + case state.client_socket do + {:sslsocket, _, _} -> + :ssl.send(state.client_socket, payload) + _ -> + :gen_tcp.send(state.client_socket, payload) + end + {:noreply, state} + end + + def handle_info(_info, state) do + {:noreply, state} + end + + @impl GenServer + def terminate(reason, _state) do + {:stop, reason} + end + + defp transform_header(payload, upstream, nil) do + [_, _method, path] = + Regex.run(~r{^(GET|POST|PUT|PATCH|OPTIONS|DELETE) ([^\s]+)}, payload) + upstream_path = + case upstream do + %Upstream{strip_uri: true, uris: uris} -> + + Enum.reduce(uris, path, fn + "/", _ -> + path + uri, path -> + String.replace_prefix(path, uri, "") + end) + + %Upstream{strip_uri: false} -> + path + end + + # TODO clean request headers + # (x-forwarded-authorization, connection, content-length, expect, host, keep-alive, transfer-encoding, upgrade) + # clean_request_headers(payload) + + [_, _method, path] = Regex.run(~r{^(GET|POST|PUT|PATCH|OPTIONS|DELETE) ([^\s]+)}, payload) + [_, host] = Regex.run(~r{[H|h]ost\: ([^\r]+)}, payload) + payload = String.replace(payload, path, upstream_path) + String.replace(payload, host, upstream.host) + end + + defp transform_header(payload, upstream, token) do + claims = %{ + "scope" => token.scope, + "sub" => token.sub, + "value" => token.value, + "exp" => token.expires_at, + "client_id" => token.client && token.client.id, + "iat" => token.inserted_at && DateTime.to_unix(token.inserted_at) + } + + jwt = + with %Joken.Signer{} = signer <- signer(upstream), + {:ok, jwt, _claims} <- Token.encode_and_sign(claims, signer) do + jwt + else + _ -> nil + end + + payload = Regex.replace( + ~r{[A|a]uthorization: ([^\r]+)\r\n}, + payload, + "Authorization: bearer #{jwt}\r\nX-Forwarded-Authorization: \\1\r\n" + ) + + transform_header(payload, upstream, nil) + end + + def signer( + %Upstream{ + forwarded_token_signature_alg: signature_alg, + forwarded_token_secret: secret, + forwarded_token_private_key: private_key + } = upstream + ) do + case signature_alg && signature_type(upstream) do + :symmetric -> + Joken.Signer.create(signature_alg, secret) + + :asymmetric -> + Joken.Signer.create(signature_alg, %{"pem" => private_key}) + + nil -> + nil + end + end + + defp signature_type(%Upstream{forwarded_token_signature_alg: signature_alg}) do + case signature_alg && String.match?(signature_alg, ~r/HS/) do + true -> :symmetric + false -> :asymmetric + nil -> nil + end + end + + defp message_complete?(payload) do + case String.split(payload, "\r\n\r\n") do + [_header] -> true + [header|body] -> + [_, content_length] = Regex.run(~r{[C|c]ontent-[L|l]ength\: ([^\r]+)}, header) + + body_length = body |> Enum.join("\r\n\r\n") |> byte_size() + + body_length == String.to_integer(content_length) + end + end +end diff --git a/apps/boruta_gateway/lib/boruta_gateway/gateway/authorization.ex b/apps/boruta_gateway/lib/boruta_gateway/gateway/authorization.ex new file mode 100644 index 000000000..3b46e4a5f --- /dev/null +++ b/apps/boruta_gateway/lib/boruta_gateway/gateway/authorization.ex @@ -0,0 +1,40 @@ +defmodule BorutaGateway.Gateway.Authorization do + @moduledoc false + + alias Boruta.Oauth + alias Boruta.Oauth.Scope + alias Boruta.Oauth.Token + alias BorutaGateway.Upstreams.Upstream + + def authorize(_authorization_header, _method, %Upstream{authorize: false}) do + {:ok, nil} + end + + def authorize(payload, method, upstream) do + with [_, authorization_header] <- Regex.run(~r{[A|a]uthorization\: ([^\r]+)}, payload), + [_header, value] <- Regex.run(~r/[B|b]earer (.+)/, authorization_header), + {:ok, %Token{scope: scope} = token} <- Oauth.Authorization.AccessToken.authorize(value: value), + {:ok, _} <- validate_scopes(scope, upstream.required_scopes, method) do + {:ok, token} + else + {:error, "required scopes are not present."} -> + {:forbidden, upstream.error_content_type, upstream.forbidden_response} + + _error -> + {:unauthorized, upstream.error_content_type, upstream.unauthorized_response} + end + end + + defp validate_scopes(_scope, required_scopes, _method) when required_scopes == %{}, + do: {:ok, []} + + defp validate_scopes(scope, required_scopes, method) do + scopes = Scope.split(scope) + default_scopes = Map.get(required_scopes, "*", [:not_authorized]) + + case Enum.empty?(Map.get(required_scopes, method, default_scopes) -- scopes) do + true -> {:ok, scopes} + false -> {:error, "required scopes are not present."} + end + end +end diff --git a/apps/boruta_gateway/test/boruta_gateway/integration/requests_test.exs b/apps/boruta_gateway/test/boruta_gateway/integration/requests_test.exs index 84a380f64..1148135b8 100644 --- a/apps/boruta_gateway/test/boruta_gateway/integration/requests_test.exs +++ b/apps/boruta_gateway/test/boruta_gateway/integration/requests_test.exs @@ -1,5 +1,5 @@ defmodule BorutaGateway.RequestsIntegrationTest do - use ExUnit.Case + use ExUnit.Case, async: false use Plug.Test use BorutaGateway.DataCase @@ -7,6 +7,7 @@ defmodule BorutaGateway.RequestsIntegrationTest do alias Boruta.ClientsAdapter alias Boruta.Ecto.Admin alias BorutaGateway.ConfigurationLoader + alias BorutaGateway.Gateway alias BorutaGateway.Repo alias BorutaGateway.RequestsIntegrationTest.HttpClient alias BorutaGateway.Upstreams @@ -20,7 +21,6 @@ defmodule BorutaGateway.RequestsIntegrationTest do :ok end - @tag :skip describe "requests" do setup do {:ok, %Boruta.Ecto.Client{id: client_id}} = Admin.create_client(%{}) @@ -95,7 +95,7 @@ defmodule BorutaGateway.RequestsIntegrationTest do request = Finch.build(:get, "http://localhost:7777/unauthorized", [], "") assert {:ok, %Finch.Response{body: body, headers: headers, status: 401}} = - Finch.request(request, HttpClient) + Finch.request(request, HttpClient) assert body == upstream.unauthorized_response @@ -164,9 +164,9 @@ defmodule BorutaGateway.RequestsIntegrationTest do Sandbox.unboxed_run(Repo, fn -> try do Upstreams.create_upstream(%{ - scheme: "http", + scheme: "https", host: "httpbin.patatoid.fr", - port: 80, + port: 443, uris: ["/httpbin"], strip_uri: true, authorize: true, @@ -198,9 +198,9 @@ defmodule BorutaGateway.RequestsIntegrationTest do Sandbox.unboxed_run(Repo, fn -> try do Upstreams.create_upstream(%{ - scheme: "http", + scheme: "https", host: "httpbin.patatoid.fr", - port: 80, + port: 443, uris: ["/"], strip_uri: true, authorize: true, @@ -235,9 +235,9 @@ defmodule BorutaGateway.RequestsIntegrationTest do try do {:ok, upstream} = Upstreams.create_upstream(%{ - scheme: "http", + scheme: "https", host: "httpbin.patatoid.fr", - port: 80, + port: 443, uris: ["/httpbin"], strip_uri: true, authorize: true, @@ -260,14 +260,14 @@ defmodule BorutaGateway.RequestsIntegrationTest do assert %{ "headers" => %{ - "Authorization" => forwarded_authorization, - "X-Forwarded-Authorization" => authorization + "Authorization" => authorization, + "X-Forwarded-Authorization" => forwarded_authorization } } = Jason.decode!(body) assert [_authorization_header, token] = Regex.run(~r/bearer (.+)/, authorization) - signer = Client.signer(upstream) - assert {:ok, claims} = Client.Token.verify(token, signer) + signer = Gateway.signer(upstream) + assert {:ok, claims} = Gateway.Token.verify(token, signer) assert claims["client_id"] == access_token.client.id assert claims["value"] == access_token.value @@ -279,7 +279,6 @@ defmodule BorutaGateway.RequestsIntegrationTest do end end - @tag :skip describe "requests (from configuration file)" do setup do {:ok, %Boruta.Ecto.Client{id: client_id}} = Admin.create_client(%{}) @@ -464,219 +463,4 @@ defmodule BorutaGateway.RequestsIntegrationTest do end) end end - - @tag :skip - describe "sidecar requests" do - setup do - {:ok, %Boruta.Ecto.Client{id: client_id}} = Admin.create_client(%{}) - - {:ok, access_token} = - AccessTokensAdapter.create( - %{ - client: ClientsAdapter.get_client(client_id), - scope: "test" - }, - [] - ) - - {:ok, access_token: access_token} - end - - test "returns a 404 when no upstream found" do - Sandbox.unboxed_run(Repo, fn -> - try do - Upstreams.create_upstream(%{ - node_name: Atom.to_string(node()), - scheme: "http", - host: "should.not.be.called", - port: 80, - uris: ["/upstream"] - }) - - Process.sleep(100) - - request = Finch.build(:get, "http://localhost:7778/no_upstream", [], "") - - assert {:ok, %Finch.Response{body: body, status: 404}} = - Finch.request(request, HttpClient) - - assert body == "No upstream has been found corresponding to the given request." - after - Repo.delete_all(Upstream) - end - end) - end - - test "returns a 401 when unauthorized" do - Sandbox.unboxed_run(Repo, fn -> - try do - {:ok, upstream} = - Upstreams.create_upstream(%{ - node_name: Atom.to_string(node()), - scheme: "http", - host: "should.not.be.called", - port: 80, - uris: ["/unauthorized"], - authorize: true, - error_content_type: "text", - unauthorized_response: "boom" - }) - - Process.sleep(100) - - request = Finch.build(:get, "http://localhost:7778/unauthorized", [], "") - - assert {:ok, %Finch.Response{body: body, headers: headers, status: 401}} = - Finch.request(request, HttpClient) - - assert body == upstream.unauthorized_response - - assert Enum.any?(headers, fn - {"content-type", content_type} -> - upstream.error_content_type - |> Regex.compile!() - |> Regex.match?(content_type) - - _ -> - false - end) - after - Repo.delete_all(Upstream) - end - end) - end - - test "returns a 403 when forbidden", %{access_token: access_token} do - Sandbox.unboxed_run(Repo, fn -> - try do - {:ok, upstream} = - Upstreams.create_upstream(%{ - node_name: Atom.to_string(node()), - scheme: "http", - host: "should.not.be.called", - port: 80, - uris: ["/forbidden"], - authorize: true, - required_scopes: %{"GET" => ["required"]}, - error_content_type: "text", - forbidden_response: "boom" - }) - - Process.sleep(100) - - request = - Finch.build( - :get, - "http://localhost:7778/forbidden", - [{"authorization", "Bearer #{access_token.value}"}], - "" - ) - - assert {:ok, %Finch.Response{body: body, headers: headers, status: 403}} = - Finch.request(request, HttpClient) - - assert body == upstream.forbidden_response - - assert Enum.any?(headers, fn - {"content-type", content_type} -> - upstream.error_content_type - |> Regex.compile!() - |> Regex.match?(content_type) - - _ -> - false - end) - after - Repo.delete_all(Upstream) - end - end) - end - - @tag :skip - test "returns response when authorized", %{access_token: access_token} do - Sandbox.unboxed_run(Repo, fn -> - try do - Upstreams.create_upstream(%{ - node_name: Atom.to_string(node()), - scheme: "http", - host: "httpbin.patatoid.fr", - port: 80, - uris: ["/httpbin"], - strip_uri: true, - authorize: true, - required_scopes: %{"GET" => ["test"]} - }) - - Process.sleep(100) - - request = - Finch.build( - :get, - "http://localhost:7778/httpbin/status/418", - [{"authorization", "Bearer #{access_token.value}"}], - "" - ) - - assert {:ok, %Finch.Response{body: body, status: 418}} = - Finch.request(request, HttpClient) - - assert body =~ ~r/teapot/ - after - Repo.delete_all(Upstream) - end - end) - end - - @tag :skip - test "returns authorization header with introspected token when authorized", %{ - access_token: access_token - } do - Sandbox.unboxed_run(Repo, fn -> - try do - {:ok, upstream} = - Upstreams.create_upstream(%{ - node_name: Atom.to_string(node()), - scheme: "http", - host: "httpbin.patatoid.fr", - port: 80, - uris: ["/httpbin"], - strip_uri: true, - authorize: true, - required_scopes: %{"GET" => ["test"]}, - forwarded_token_signature_alg: "HS256" - }) - - Process.sleep(100) - - request = - Finch.build( - :get, - "http://localhost:7778/httpbin/anything", - [{"authorization", "bearer #{access_token.value}"}], - "" - ) - - assert {:ok, %Finch.Response{body: body, status: 200}} = - Finch.request(request, HttpClient) - - assert %{ - "headers" => %{ - "Authorization" => forwarded_authorization, - "X-Forwarded-Authorization" => authorization - } - } = Jason.decode!(body) - - assert [_authorization_header, token] = Regex.run(~r/bearer (.+)/, authorization) - signer = Client.signer(upstream) - assert {:ok, claims} = Client.Token.verify(token, signer) - assert claims["client_id"] == access_token.client.id - assert claims["value"] == access_token.value - - assert forwarded_authorization == "bearer #{access_token.value}" - after - Repo.delete_all(Upstream) - end - end) - end - end end From b3c77533f3e6638bde81d099d26150c2a4d8b855 Mon Sep 17 00:00:00 2001 From: Pascal Knoth Date: Sun, 13 Jul 2025 14:23:41 +0200 Subject: [PATCH 2/3] [gateway] WIP close tcp socket on gateway server stop --- .../lib/boruta_gateway/application.ex | 5 +- .../lib/boruta_gateway/gateway.ex | 72 ++++++++++++------- 2 files changed, 50 insertions(+), 27 deletions(-) diff --git a/apps/boruta_gateway/lib/boruta_gateway/application.ex b/apps/boruta_gateway/lib/boruta_gateway/application.ex index f1f1270eb..4b5411628 100644 --- a/apps/boruta_gateway/lib/boruta_gateway/application.ex +++ b/apps/boruta_gateway/lib/boruta_gateway/application.ex @@ -9,6 +9,7 @@ defmodule BorutaGateway.Application do alias BorutaGateway.Upstreams alias BorutaGateway.Upstreams.ClientSupervisor + @impl Application def start(_type, _args) do children = [ BorutaGateway.Repo, @@ -27,7 +28,7 @@ defmodule BorutaGateway.Application do [ %{ start: - {BorutaGateway.Gateway, :start, + {BorutaGateway.Gateway.Server, :start, [ [ port: Application.fetch_env!(:boruta_gateway, :port), @@ -44,7 +45,7 @@ defmodule BorutaGateway.Application do end setup_database() - Supervisor.start_link(children, strategy: :one_for_one, name: BorutaGateway.Supervisor, shutdown: 5_000) + Supervisor.start_link(children, strategy: :one_for_one, name: BorutaGateway.Supervisor) end def setup_database do diff --git a/apps/boruta_gateway/lib/boruta_gateway/gateway.ex b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex index 147f720e4..497560972 100644 --- a/apps/boruta_gateway/lib/boruta_gateway/gateway.ex +++ b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex @@ -17,28 +17,46 @@ defmodule BorutaGateway.Gateway do @connect_timeout 5_000 - def start(args) do - {:ok, listen_socket} = - :gen_tcp.listen(args[:port], [{:packet, :raw}, :binary, {:active, false}]) - - children = - Enum.map(1..args[:num_acceptors], fn i -> - Supervisor.child_spec({__MODULE__, [listen_socket: listen_socket]}, - id: :"http_proxy_server_acceptor_#{i}" - ) - end) - - # TODO monitor children - with {:ok, pid} <- Supervisor.start_link(children, strategy: :one_for_one, name: __MODULE__) do - spawn(fn -> - Process.monitor(pid) - Process.flag(:trap_exit, true) - receive do - _ -> - :gen_tcp.close(listen_socket) - end - end) - {:ok, pid} + defmodule Server do + use GenServer + + alias BorutaGateway.Gateway + + def start(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + @impl GenServer + def init(args) do + {:ok, listen_socket} = + :gen_tcp.listen(args[:port], [{:packet, :raw}, :binary, {:active, false}]) + + children = + Enum.map(1..args[:num_acceptors], fn i -> + Supervisor.child_spec({Gateway, [listen_socket: listen_socket]}, + id: :"http_proxy_server_acceptor_#{i}" + ) + end) + + Process.flag(:trap_exit, true) + + with {:ok, supervisor} <- Supervisor.start_link(children, strategy: :one_for_one) do + {:ok, supervisor: supervisor, listen_socket: listen_socket} + end + end + + @impl GenServer + def handle_info({:EXIT, _pid, reason}, state) do + :gen_tcp.close(state[:listen_socket]) + + {:stop, reason, state} + end + + @impl GenServer + def terminate(_reason, state) do + :gen_tcp.close(state[:listen_socket]) + + :ok end end @@ -67,10 +85,14 @@ defmodule BorutaGateway.Gateway do end def handle_info(:accept, state) do - {:ok, socket} = :gen_tcp.accept(state.listen_socket) - :inet.setopts(socket, active: :once) + case :gen_tcp.accept(state.listen_socket) do + {:ok, socket} -> + :inet.setopts(socket, active: :once) - {:noreply, %{state | socket: socket, client_socket: nil}} + {:noreply, %{state | socket: socket, client_socket: nil}} + {:error, _error} -> + {:stop, :shutdown, state} + end end def handle_info({:tcp_closed, socket}, %State{socket: socket} = state) do From e330dc612027adf1b8565ddf5780ec39a7683475 Mon Sep 17 00:00:00 2001 From: Pascal Knoth Date: Sun, 13 Jul 2025 22:48:54 +0200 Subject: [PATCH 3/3] [gateway] WIP fix server socket management --- .../lib/boruta_gateway/gateway.ex | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/apps/boruta_gateway/lib/boruta_gateway/gateway.ex b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex index 497560972..2948953c0 100644 --- a/apps/boruta_gateway/lib/boruta_gateway/gateway.ex +++ b/apps/boruta_gateway/lib/boruta_gateway/gateway.ex @@ -63,7 +63,7 @@ defmodule BorutaGateway.Gateway do defmodule State do @moduledoc false - defstruct [:listen_socket, :socket, :client_socket, :start, :request, :response] + defstruct [:listen_socket, :socket, :client_socket, :start, :request, :response, :content_length] end def start_link(args) do @@ -89,7 +89,7 @@ defmodule BorutaGateway.Gateway do {:ok, socket} -> :inet.setopts(socket, active: :once) - {:noreply, %{state | socket: socket, client_socket: nil}} + {:noreply, %{state | socket: socket, client_socket: nil, response: nil}} {:error, _error} -> {:stop, :shutdown, state} end @@ -99,17 +99,17 @@ defmodule BorutaGateway.Gateway do {:ok, socket} = :gen_tcp.accept(state.listen_socket) :inet.setopts(socket, active: :once) - {:noreply, %{state | socket: socket, client_socket: nil}} + {:noreply, %{state | socket: socket, client_socket: nil, response: nil}} end def handle_info({:tcp_closed, socket}, %State{client_socket: socket} = state) do - :gen_tcp.shutdown(state.socket, :write) + :gen_tcp.close(state.socket) {:noreply, state} end def handle_info({:ssl_closed, socket}, %State{client_socket: socket} = state) do - :gen_tcp.shutdown(state.socket, :write) + :gen_tcp.close(state.socket) {:noreply, state} end @@ -142,7 +142,7 @@ defmodule BorutaGateway.Gateway do {:error, _error} -> :gen_tcp.send(socket, "HTTP/1.1 503 Service Unavailable\r\n\r\n") - :gen_tcp.shutdown(socket, :write) + :gen_tcp.close(socket) {:noreply, state} end @@ -170,7 +170,7 @@ defmodule BorutaGateway.Gateway do {:error, _error} -> :gen_tcp.send(socket, "HTTP/1.1 503 Service Unavailable\r\n\r\n") - :gen_tcp.shutdown(socket, :write) + :gen_tcp.close(socket) {:noreply, state} end @@ -187,7 +187,7 @@ defmodule BorutaGateway.Gateway do ) _response = :os.system_time(:microsecond) - start - :gen_tcp.shutdown(socket, :write) + :gen_tcp.close(socket) {:noreply, state} @@ -201,7 +201,7 @@ defmodule BorutaGateway.Gateway do ) _response = :os.system_time(:microsecond) - start - :gen_tcp.shutdown(socket, :write) + :gen_tcp.close(socket) {:noreply, state} @@ -215,7 +215,7 @@ defmodule BorutaGateway.Gateway do ) _response = :os.system_time(:microsecond) - start - :gen_tcp.shutdown(socket, :write) + :gen_tcp.close(socket) {:noreply, state} end @@ -224,30 +224,34 @@ defmodule BorutaGateway.Gateway do def handle_info({:tcp, socket, payload}, %State{client_socket: socket} = state) do # TODO clean response headers (connection, strict-transport-security) # clean_response_headers(payload) + response = (state.response || "") <> payload + :gen_tcp.send(state.socket, payload) - case message_complete?(payload) do + case message_complete?(response) do true -> - _response = :os.system_time(:microsecond) - state.start - :gen_tcp.shutdown(state.socket, :write) + :gen_tcp.close(state.socket) + + {:noreply, %{state | socket: socket}} + false -> :inet.setopts(socket, active: :once) + {:noreply, %{state | response: response}} end - {:noreply, state} end def handle_info({:ssl, socket, payload}, %State{client_socket: socket} = state) do # TODO clean response headers (connection, strict-transport-security) # clean_response_headers(payload) + response = (state.response || "") <> payload :gen_tcp.send(state.socket, payload) - case message_complete?(payload) do + case message_complete?(response) do true -> - _response = :os.system_time(:microsecond) - state.start - :gen_tcp.shutdown(state.socket, :write) + :gen_tcp.close(state.socket) false -> :ssl.setopts(socket, active: :once) end - {:noreply, %{state | response: payload}} + {:noreply, %{state | response: response}} end def handle_info({:tcp, socket, payload}, %State{socket: socket} = state) do @@ -352,13 +356,13 @@ defmodule BorutaGateway.Gateway do end end - defp message_complete?(payload) do - case String.split(payload, "\r\n\r\n") do + defp message_complete?(response) do + case String.split(response, "\r\n\r\n") do [_header] -> true - [header|body] -> + [header|[body]] -> [_, content_length] = Regex.run(~r{[C|c]ontent-[L|l]ength\: ([^\r]+)}, header) - body_length = body |> Enum.join("\r\n\r\n") |> byte_size() + body_length = body |> byte_size() body_length == String.to_integer(content_length) end