Skip to content
93 changes: 69 additions & 24 deletions src/gen_tcp_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
addr => any
}).
-define(DEFAULT_SOCKET_OPTIONS, #{}).
%% Smaller chunks work better with lwIP's limited buffers
-define(MAX_SEND_CHUNK, 1460). %% TCP MSS - fits in single packet without fragmentation

%%
%% API
Expand Down Expand Up @@ -130,16 +132,29 @@ handle_info({tcp, Socket, Packet}, State) ->
case Handler:handle_receive(Socket, Packet, HandlerState) of
{reply, ResponsePacket, ResponseState} ->
?TRACE("Sending reply to endpoint ~p", [socket:peername(Socket)]),
try_send(Socket, ResponsePacket),
{noreply, State#state{handler_state=ResponseState}};
case try_send(Socket, ResponsePacket) of
ok ->
{noreply, State#state{handler_state=ResponseState}};
{error, closed} ->
?TRACE("Connection closed during send, cleaning up", []),
{noreply, State#state{handler_state=ResponseState}};
{error, _Reason} ->
try_close(Socket),
{noreply, State#state{handler_state=ResponseState}}
end;
{noreply, ResponseState} ->
?TRACE("no reply", []),
{noreply, State#state{handler_state=ResponseState}};
{close, ResponsePacket} ->
?TRACE("Sending reply to endpoint ~p and closing socket: ~p", [socket:peername(Socket), Socket]),
try_send(Socket, ResponsePacket),
% timer:sleep(500),
try_close(Socket),
case try_send(Socket, ResponsePacket) of
ok ->
try_close(Socket);
{error, closed} ->
ok; %% Already closed, nothing to do
{error, _Reason} ->
try_close(Socket)
end,
{noreply, State};
close ->
?TRACE("Closing socket ~p", [Socket]),
Expand Down Expand Up @@ -168,20 +183,11 @@ try_send(Socket, Packet) when is_binary(Packet) ->
"Trying to send binary packet data to socket ~p. Packet (or len): ~p", [
Socket, case byte_size(Packet) < 32 of true -> Packet; _ -> byte_size(Packet) end
]),
case socket:send(Socket, Packet) of
ok ->
?TRACE("sent.", []),
ok;
{ok, Rest} ->
?TRACE("sent. remaining: ~p", [Rest]),
try_send(Socket, Rest);
Error ->
io:format("Send failed due to error ~p~n", [Error])
end;
try_send(Socket, Char) when is_integer(Char) ->
%% TODO handle unicode
?TRACE("Sending char ~p as ~p", [Char, <<Char:8>>]),
try_send(Socket, <<Char:8>>);
try_send_binary(Socket, Packet);
try_send(Socket, Byte) when is_integer(Byte) ->
%% Handles bytes (0-255) in iolists. Unicode must be pre-encoded to UTF-8.
?TRACE("Sending byte ~p as ~p", [Byte, <<Byte:8>>]),
try_send(Socket, <<Byte:8>>);
try_send(Socket, List) when is_list(List) ->
case is_string(List) of
true ->
Expand All @@ -193,8 +199,46 @@ try_send(Socket, List) when is_list(List) ->
try_send_iolist(_Socket, []) ->
ok;
try_send_iolist(Socket, [H | T]) ->
try_send(Socket, H),
try_send_iolist(Socket, T).
case try_send(Socket, H) of
ok ->
try_send_iolist(Socket, T);
{error, _Reason} = Error ->
Error
end.

try_send_binary(_Socket, <<>>) ->
ok;
try_send_binary(Socket, Packet) when is_binary(Packet) ->
TotalSize = byte_size(Packet),
ChunkSize = erlang:min(TotalSize, ?MAX_SEND_CHUNK),
<<Chunk:ChunkSize/binary, Rest/binary>> = Packet,
case socket:send(Socket, Chunk) of
ok ->
%% Give the scheduler a chance to run and let TCP drain
maybe_yield(Rest),
try_send_binary(Socket, Rest);
{ok, Remaining} ->
%% Partial send - combine remaining with rest and retry
try_send_binary(Socket, <<Remaining/binary, Rest/binary>>);
{error, closed} ->
%% Only log if we actually had more data to send
case byte_size(Rest) of
0 -> ok; %% Sent everything, client just closed after - that's fine
_ -> io:format("Connection closed mid-transfer (~p/~p bytes sent)~n",
[ChunkSize, TotalSize])
end,
{error, closed};
{error, Reason} ->
io:format("Send error: ~p (chunk: ~p, total: ~p)~n",
[Reason, ChunkSize, TotalSize]),
{error, Reason}
end.

%% Lightweight yield using receive timeout - works in AtomVM
maybe_yield(<<>>) ->
ok;
maybe_yield(_) ->
receive after 0 -> ok end.

is_string([]) ->
true;
Expand All @@ -216,7 +260,6 @@ try_close(Socket) ->
set_socket_options(Socket, SocketOptions) ->
maps:fold(
fun(Option, Value, Accum) ->
erlang:display({setopt, Socket, Option, Value}),
ok = socket:setopt(Socket, Option, Value),
Accum
end,
Expand All @@ -232,8 +275,10 @@ accept(ControllingProcess, ListenSocket) ->
?TRACE("Accepted connection from ~p", [socket:peername(Connection)]),
spawn(fun() -> accept(ControllingProcess, ListenSocket) end),
loop(ControllingProcess, Connection);
_Error ->
?TRACE("Error accepting connection: ~p", [Error])
Error ->
?TRACE("Error accepting connection: ~p", [Error]),
timer:sleep(100),
accept(ControllingProcess, ListenSocket)
end.


Expand Down
Loading