diff --git a/kcp2k/kcp2k/highlevel/Common.cs b/kcp2k/kcp2k/highlevel/Common.cs index e96246a..2a86a9e 100644 --- a/kcp2k/kcp2k/highlevel/Common.cs +++ b/kcp2k/kcp2k/highlevel/Common.cs @@ -24,6 +24,30 @@ public static bool ResolveHostname(string hostname, out IPAddress[] addresses) } } + // if connections drop under heavy load, increase to OS limit. + // if still not enough, increase the OS limit. + public static void ConfigureSocketBuffers(ISocket socket, int recvBufferSize, int sendBufferSize) + { + // log initial size for comparison. + // remember initial size for log comparison + int initialReceive = socket.ReceiveBufferSize; + int initialSend = socket.SendBufferSize; + + // set to configured size + try + { + socket.ReceiveBufferSize = recvBufferSize; + socket.SendBufferSize = sendBufferSize; + } + catch (SocketException) + { + Log.Warning($"[KCP] failed to set Socket RecvBufSize = {recvBufferSize} SendBufSize = {sendBufferSize}"); + } + + + Log.Info($"[KCP] RecvBuf = {initialReceive}=>{socket.ReceiveBufferSize} ({socket.ReceiveBufferSize/initialReceive}x) SendBuf = {initialSend}=>{socket.SendBufferSize} ({socket.SendBufferSize/initialSend}x)"); + } + // if connections drop under heavy load, increase to OS limit. // if still not enough, increase the OS limit. public static void ConfigureSocketBuffers(Socket socket, int recvBufferSize, int sendBufferSize) diff --git a/kcp2k/kcp2k/highlevel/Extensions.cs b/kcp2k/kcp2k/highlevel/Extensions.cs index 20725a9..ad7564c 100644 --- a/kcp2k/kcp2k/highlevel/Extensions.cs +++ b/kcp2k/kcp2k/highlevel/Extensions.cs @@ -45,39 +45,6 @@ public static bool SendToNonBlocking(this Socket socket, ArraySegment data } } - // non-blocking UDP send. - // allows for reuse when overwriting KcpServer/Client (i.e. for relays). - // => wrapped with Poll to avoid WouldBlock allocating new SocketException. - // => wrapped with try-catch to ignore WouldBlock exception. - // make sure to set socket.Blocking = false before using this! - public static bool SendNonBlocking(this Socket socket, ArraySegment data) - { - try - { - // when using non-blocking sockets, SendTo may return WouldBlock. - // in C#, WouldBlock throws a SocketException, which is expected. - // unfortunately, creating the SocketException allocates in C#. - // let's poll first to avoid the WouldBlock allocation. - // note that this entirely to avoid allocations. - // non-blocking UDP doesn't need Poll in other languages. - // and the code still works without the Poll call. - if (!socket.Poll(0, SelectMode.SelectWrite)) return false; - - // SendTo allocates. we used bound Send. - socket.Send(data.Array, data.Offset, data.Count, SocketFlags.None); - return true; - } - catch (SocketException e) - { - // for non-blocking sockets, SendTo may throw WouldBlock. - // in that case, simply drop the message. it's UDP, it's fine. - if (e.SocketErrorCode == SocketError.WouldBlock) return false; - - // otherwise it's a real socket error. throw it. - throw; - } - } - // non-blocking UDP receive. // allows for reuse when overwriting KcpServer/Client (i.e. for relays). // => wrapped with Poll to avoid WouldBlock allocating new SocketException. @@ -120,47 +87,5 @@ public static bool ReceiveFromNonBlocking(this Socket socket, byte[] recvBuffer, throw; } } - - // non-blocking UDP receive. - // allows for reuse when overwriting KcpServer/Client (i.e. for relays). - // => wrapped with Poll to avoid WouldBlock allocating new SocketException. - // => wrapped with try-catch to ignore WouldBlock exception. - // make sure to set socket.Blocking = false before using this! - public static bool ReceiveNonBlocking(this Socket socket, byte[] recvBuffer, out ArraySegment data) - { - data = default; - - try - { - // when using non-blocking sockets, ReceiveFrom may return WouldBlock. - // in C#, WouldBlock throws a SocketException, which is expected. - // unfortunately, creating the SocketException allocates in C#. - // let's poll first to avoid the WouldBlock allocation. - // note that this entirely to avoid allocations. - // non-blocking UDP doesn't need Poll in other languages. - // and the code still works without the Poll call. - if (!socket.Poll(0, SelectMode.SelectRead)) return false; - - // ReceiveFrom allocates. we used bound Receive. - // returns amount of bytes written into buffer. - // throws SocketException if datagram was larger than buffer. - // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 - // - // throws SocketException if datagram was larger than buffer. - // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 - int size = socket.Receive(recvBuffer, 0, recvBuffer.Length, SocketFlags.None); - data = new ArraySegment(recvBuffer, 0, size); - return true; - } - catch (SocketException e) - { - // for non-blocking sockets, Receive throws WouldBlock if there is - // no message to read. that's okay. only log for other errors. - if (e.SocketErrorCode == SocketError.WouldBlock) return false; - - // otherwise it's a real socket error. throw it. - throw; - } - } } } \ No newline at end of file diff --git a/kcp2k/kcp2k/highlevel/ISocket.cs b/kcp2k/kcp2k/highlevel/ISocket.cs new file mode 100644 index 0000000..db250fa --- /dev/null +++ b/kcp2k/kcp2k/highlevel/ISocket.cs @@ -0,0 +1,17 @@ +using System; +using System.Net; + +namespace kcp2k +{ + public interface ISocket + { + EndPoint LocalEndPoint { get; } + bool Blocking { get; set; } + int ReceiveBufferSize { get; set; } + int SendBufferSize { get; set; } + void Connect(EndPoint remoteEP); + void Close(); + bool SendNonBlocking(ArraySegment data); + bool ReceiveNonBlocking(byte[] recvBuffer, out ArraySegment data); + } +} diff --git a/kcp2k/kcp2k/highlevel/KcpClient.cs b/kcp2k/kcp2k/highlevel/KcpClient.cs index 827cff0..e4ce9b6 100644 --- a/kcp2k/kcp2k/highlevel/KcpClient.cs +++ b/kcp2k/kcp2k/highlevel/KcpClient.cs @@ -9,7 +9,7 @@ namespace kcp2k public class KcpClient : KcpPeer { // IO - protected Socket socket; + protected ISocket socket; public EndPoint remoteEndPoint; // expose local endpoint for users / relays / nat traversal etc. @@ -43,12 +43,14 @@ public class KcpClient : KcpPeer bool active = false; // active between when connect() and disconnect() are called public bool connected; - public KcpClient(Action OnConnected, - Action, KcpChannel> OnData, - Action OnDisconnected, - Action OnError, - KcpConfig config) - : base(config, 0) // client has no cookie yet + public KcpClient( + Action OnConnected, + Action, KcpChannel> OnData, + Action OnDisconnected, + Action OnError, + KcpConfig config + ) + : base(config, 0) // client has no cookie yet { // initialize callbacks first to ensure they can be used safely. OnConnectedCallback = OnConnected; @@ -96,6 +98,7 @@ public void Connect(string address, ushort port) return; } +#if UNITY_EDITOR || !UNITY_WEBGL // resolve host name before creating peer. // fixes: https://github.com/MirrorNetworking/Mirror/issues/3361 if (!Common.ResolveHostname(address, out IPAddress[] addresses)) @@ -105,18 +108,27 @@ public void Connect(string address, ushort port) OnDisconnectedCallback(); return; } - +#endif // create fresh peer for each new session // client doesn't need secure cookie. Reset(config); Log.Info($"[KCP] Client: connect to {address}:{port}"); +#if UNITY_EDITOR || !UNITY_WEBGL // create socket remoteEndPoint = new IPEndPoint(addresses[0], port); - socket = new Socket(remoteEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + socket = new KcpSocket( + remoteEndPoint.AddressFamily, + SocketType.Dgram, + ProtocolType.Udp + ); +#elif UNITY_WEBGL && WECHATMINIGAME + socket = new WXSocket(address, port); +#endif active = true; +#if UNITY_EDITOR || !UNITY_WEBGL // recv & send are called from main thread. // need to ensure this never blocks. // even a 1ms block per connection would stop us from scaling. @@ -127,7 +139,7 @@ public void Connect(string address, ushort port) // bind to endpoint so we can use send/recv instead of sendto/recvfrom. socket.Connect(remoteEndPoint); - +#endif // immediately send a hello message to the server. // server will call OnMessage and add the new connection. // note that this still has cookie=0 until we receive the server's hello. diff --git a/kcp2k/kcp2k/highlevel/KcpSocket.cs b/kcp2k/kcp2k/highlevel/KcpSocket.cs new file mode 100644 index 0000000..d8729b6 --- /dev/null +++ b/kcp2k/kcp2k/highlevel/KcpSocket.cs @@ -0,0 +1,98 @@ +using System; +using System.Net; +using System.Net.Sockets; + +namespace kcp2k +{ + public class KcpSocket : Socket, ISocket + { + public KcpSocket(AddressFamily addressFamily, SocketType socketType, ProtocolType protocolType) : base(addressFamily, socketType, protocolType) + { + } + + public KcpSocket(SocketInformation socketInformation) : base(socketInformation) + { + } + + public KcpSocket(SocketType socketType, ProtocolType protocolType) : base(socketType, protocolType) + { + } + + // non-blocking UDP send. + // allows for reuse when overwriting KcpServer/Client (i.e. for relays). + // => wrapped with Poll to avoid WouldBlock allocating new SocketException. + // => wrapped with try-catch to ignore WouldBlock exception. + // make sure to set socket.Blocking = false before using this! + public bool SendNonBlocking(ArraySegment data) + { + try + { + // when using non-blocking sockets, SendTo may return WouldBlock. + // in C#, WouldBlock throws a SocketException, which is expected. + // unfortunately, creating the SocketException allocates in C#. + // let's poll first to avoid the WouldBlock allocation. + // note that this entirely to avoid allocations. + // non-blocking UDP doesn't need Poll in other languages. + // and the code still works without the Poll call. + if (!base.Poll(0, SelectMode.SelectWrite)) return false; + + // SendTo allocates. we used bound Send. + base.Send(data.Array, data.Offset, data.Count, SocketFlags.None); + //Log.Info($"[KcpSocket] SendNonBlocking, data: {BitConverter.ToString(data.Array!, data.Offset, data.Count)}"); + return true; + } + catch (SocketException e) + { + // for non-blocking sockets, SendTo may throw WouldBlock. + // in that case, simply drop the message. it's UDP, it's fine. + if (e.SocketErrorCode == SocketError.WouldBlock) return false; + + // otherwise it's a real socket error. throw it. + throw; + } + } + + // non-blocking UDP receive. + // allows for reuse when overwriting KcpServer/Client (i.e. for relays). + // => wrapped with Poll to avoid WouldBlock allocating new SocketException. + // => wrapped with try-catch to ignore WouldBlock exception. + // make sure to set socket.Blocking = false before using this! + public bool ReceiveNonBlocking(byte[] recvBuffer, out ArraySegment data) + { + data = default; + + try + { + // when using non-blocking sockets, ReceiveFrom may return WouldBlock. + // in C#, WouldBlock throws a SocketException, which is expected. + // unfortunately, creating the SocketException allocates in C#. + // let's poll first to avoid the WouldBlock allocation. + // note that this entirely to avoid allocations. + // non-blocking UDP doesn't need Poll in other languages. + // and the code still works without the Poll call. + if (!base.Poll(0, SelectMode.SelectRead)) return false; + + // ReceiveFrom allocates. we used bound Receive. + // returns amount of bytes written into buffer. + // throws SocketException if datagram was larger than buffer. + // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 + // + // throws SocketException if datagram was larger than buffer. + // https://learn.microsoft.com/en-us/dotnet/api/system.net.sockets.socket.receive?view=net-6.0 + int size = base.Receive(recvBuffer, 0, recvBuffer.Length, SocketFlags.None); + //if (size > 0) Log.Info($"[KcpSocket] ReceiveNonBlocking, data: {BitConverter.ToString(recvBuffer, 0, size)}"); + data = new ArraySegment(recvBuffer, 0, size); + return true; + } + catch (SocketException e) + { + // for non-blocking sockets, Receive throws WouldBlock if there is + // no message to read. that's okay. only log for other errors. + if (e.SocketErrorCode == SocketError.WouldBlock) return false; + + // otherwise it's a real socket error. throw it. + throw; + } + } + } +} diff --git a/kcp2k/kcp2k/highlevel/WXSocket.cs b/kcp2k/kcp2k/highlevel/WXSocket.cs new file mode 100644 index 0000000..8f2b61d --- /dev/null +++ b/kcp2k/kcp2k/highlevel/WXSocket.cs @@ -0,0 +1,119 @@ +using System; +using System.Collections.Generic; +using System.Net; +using kcp2k; +using WeChatWASM; + +public class WXSocket : ISocket +{ + private string address; + private int port; + private WXUDPSocket socket; + + private Queue recvQueue = new(); + + public WXSocket(string address, int port) + { + this.address = address; + this.port = port; + + socket = WX.CreateUDPSocket(); + socket.OnMessage(OnMessageHandler); + socket.OnError(OnErrorHandler); + socket.OnClose(OnCloseHandler); + socket.Bind(); + } + + public bool Blocking + { + get => true; + set + { /* ignored */ + } + } + + public EndPoint LocalEndPoint + { + get { throw new NotSupportedException(); } + } + + public int SendBufferSize + { + get => 0; + set + { /* ignored */ + } + } + + public int ReceiveBufferSize + { + get => 0; + set + { /* ignored */ + } + } + + public void Connect(EndPoint remoteEP) + { + throw new NotSupportedException("Use Connect(string address, int port)."); + } + + public bool ReceiveNonBlocking(byte[] recvBuffer, out ArraySegment data) + { + if (recvQueue.Count > 0) + { + var packet = recvQueue.Dequeue(); + int len = Math.Min(packet.Length, recvBuffer.Length); + Array.Copy(packet, 0, recvBuffer, 0, len); + data = new ArraySegment(recvBuffer, 0, len); + return true; + } + + data = default; + return false; + } + + public bool SendNonBlocking(ArraySegment data) + { + socket.Send( + new UDPSocketSendOption() + { + address = this.address, + port = this.port, + message = data.Array, + length = data.Count, + offset = data.Offset, + } + ); + // Log.Info($"[WXSocket] SendNonBlocking, address: {this.address}, port: {port}, message: {BitConverter.ToString(data.Array!, data.Offset, data.Count)}"); + return true; + } + + private void OnCloseHandler(GeneralCallbackResult result) + { + Log.Warning($"[WXSocket] onCloseHandler: {result.errMsg}"); + } + + private void OnErrorHandler(GeneralCallbackResult result) + { + Log.Warning($"[WXSocket] onErrorHandler: {result.errMsg}"); + } + + private void OnMessageHandler(UDPSocketOnMessageListenerResult result) + { + if (result.message != null) + { + recvQueue.Enqueue(result.message); + // Log.Info( $"[WXSocket] OnMessageHandler, message: {BitConverter.ToString(result.message, result.message.Length)}"); + } + } + + public void Close() + { + socket?.OffMessage(OnMessageHandler); + socket?.OffError(OnErrorHandler); + socket?.OffClose(OnCloseHandler); + socket?.Close(); + socket = null; + } +}