TCP support for Machnet when a non-Machnet application connects to a Machnet application #54
TCP transport is important in any networking stack, so this is a first attempt at adding it to Machnet. Signed-off-by: Alireza Sanaee <sarsanaee@gmail.com>
Force-pushed from a2574e7 to 5f07af2.
Pull request overview
Adds an experimental TCP transport path to Machnet so non-Machnet applications (Linux TCP stack) can connect to Machnet applications, addressing issue #53 by introducing a TCP flow implementation and exposing protocol selection through the public API and bindings.
Changes:
- Introduce TCP wire header + `TcpFlow` (handshake, framing/deframing, basic timers) and integrate TCP dispatch/listeners into `MachnetEngine`.
- Extend the public C API (`machnet_connect`, `machnet_listen`) with a protocol selector and propagate it through examples and language bindings.
- Add a standalone `tcp_msg_gen` app to interop with Machnet TCP framing.
Reviewed changes
Copilot reviewed 30 out of 30 changed files in this pull request and generated 12 comments.
| File | Description |
|---|---|
| src/include/tcp_flow.h | New TCP flow implementation (state machine + framing over TCP). |
| src/include/tcp.h | New packed TCP header definition for parsing/construction. |
| src/include/packet.h | Add TCPv4 checksum offload helper. |
| src/include/machnet_engine.h | Add TCP listeners/flows, TCP control ops, and TCP RX/TX routing. |
| src/include/channel.h | Track active TCP flows per channel; add create/remove helpers. |
| src/ext/machnet_common.h | Add protocol constants and new TCP control-plane opcodes; fix STATUS macro semicolon. |
| src/ext/machnet.h | Extend public API: protocol parameter for connect/listen. |
| src/ext/machnet.c | Implement protocol-aware connect/listen opcodes in user library. |
| src/core/net/tcp.cc | Implement Tcp::ToString(). |
| src/core/drivers/shm/channel.cc | Implement Channel::RemoveTcpFlow(). |
| src/apps/tcp_msg_gen/main.cc | New Linux TCP tool speaking Machnet TCP framing. |
| src/apps/tcp_msg_gen/CMakeLists.txt | Build configuration for tcp_msg_gen. |
| src/apps/rocksdb_server/rocksdb_server.cc | Update call to new machnet_listen(..., protocol) signature. |
| src/apps/msg_gen/main.cc | Add --protocol={udp,tcp} flag and wire it into connect/listen. |
| src/apps/CMakeLists.txt | Add tcp_msg_gen subdirectory. |
| examples/sync.sh | Add rsync helper script for syncing a workspace to a remote machine. |
| examples/rust/src/main.rs | Update Rust example calls to include protocol argument. |
| examples/hello_world.cc | Update example calls to include protocol argument. |
| bindings/rust/src/lib.rs | Update Rust wrapper to pass protocol and adjust doc reference. |
| bindings/rust/src/bindings.rs | Update Rust FFI signatures for protocol argument. |
| bindings/rust/resources/machnet.h | Update bundled header used by Rust bindings with protocol parameter. |
| bindings/js/rocksdb_client.js | Update JS client to pass protocol argument. |
| bindings/js/machnet_shim.js | Update JS FFI signatures to include protocol argument. |
| bindings/js/latency.js | Update JS example to pass protocol argument. |
| bindings/js/hello_world.js | Update JS example to pass protocol argument. |
| bindings/go/msg_gen/main.go | Update Go example to pass protocol argument. |
| bindings/go/machnet/machnet.go | Update Go wrapper signatures to include protocol argument. |
| bindings/go/machnet/conversion.h | Update Go C shim signatures to include protocol argument. |
| bindings/csharp/HelloWorld/machnet_shim.cs | Update C# P/Invoke signatures to include protocol argument. |
| bindings/csharp/HelloWorld/Program.cs | Update C# example calls to pass protocol argument. |

    tcph->set_header_length(sizeof(Tcp));  // 20 bytes, no options.
    tcph->flags = flags;
    tcph->window = be16_t(rcv_wnd_);
    tcph->checksum = 0;
    tcph->urgent_ptr = be16_t(0);

The receive window advertised in outgoing TCP headers is always rcv_wnd_ (initialized to 65535) and is never reduced when the channel runs out of MsgBufs. If MsgBufAlloc fails, data is dropped but we continue to advertise a large window, breaking TCP flow control and risking stream desynchronization. Consider tying rcv_wnd_ to actual available channel/buffer capacity and advertising a zero/smaller window under pressure.
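The suggestion above could be sketched roughly as follows. This is only an illustration: `AdvertisedWindow`, `free_bufs`, and `buf_payload_bytes` are hypothetical names rather than Machnet APIs, and the sketch assumes no window-scale option (the header in this PR carries no TCP options), so the window is capped at 65535.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper (not part of Machnet): derive the advertised TCP
// receive window from the number of free message buffers instead of a
// fixed 65535. Without window scaling the field is 16 bits wide.
inline uint16_t AdvertisedWindow(std::size_t free_bufs,
                                 std::size_t buf_payload_bytes) {
  constexpr std::size_t kMaxWindow = 65535;
  // No buffers left: advertise a zero window so the peer stops sending.
  if (free_bufs == 0 || buf_payload_bytes == 0) return 0;
  // Clamp before multiplying so the product can neither overflow nor
  // exceed the 16-bit field.
  if (free_bufs > kMaxWindow / buf_payload_bytes) return kMaxWindow;
  return static_cast<uint16_t>(free_bufs * buf_payload_bytes);
}
```

A flow would call such a helper when building each outgoing header. Note that after advertising a zero window, the receiver also needs to send a window update once buffers are freed, otherwise the peer relies on its own zero-window probes to resume.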

     pub fn machnet_listen(channel: &mut MachnetChannel, local_ip: &str, local_port: u16, protocol: i32) -> i32 {
         unsafe {
             let channel_ptr = channel.get_ptr();
             let local_ip_cstr = CString::new(local_ip).unwrap();
    -        bindings::machnet_listen(channel_ptr, local_ip_cstr.as_ptr(), local_port)
    +        bindings::machnet_listen(channel_ptr, local_ip_cstr.as_ptr(), local_port, protocol)

The machnet_listen() signature was updated to include a protocol argument, but the Rustdoc examples above this function (and any other docs referencing machnet_listen) still show the old 3-arg call. Update the examples to include the protocol parameter so doctests/docs don’t mislead users.

    void InitiateHandshake() {
      CHECK(state_ == State::kClosed);
      SendSyn();
      state_ = State::kSynSent;
      rto_active_ = true;
      rto_remaining_ = rto_ticks_;
    }

TcpFlow introduces a new TCP state machine, framing, and retransmission/timer behavior, but there are no accompanying unit/integration tests in this PR (contrast with existing flow/channel tests). Adding tests for handshake (active+passive open), framing/deframing across segment boundaries, and FIN/RST handling would help prevent regressions as this evolves.
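As an illustration of the kind of framing test suggested above, here is a small self-contained sketch. It does not use Machnet's `TcpFlow`; `Deframer` and `Frame` are hypothetical, and the wire format (a 4-byte big-endian length followed by the payload) is an assumption made only for this example, since the PR's actual framing is not shown here.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Illustrative deframer, NOT Machnet's TcpFlow. ASSUMED wire format:
// a 4-byte big-endian length followed by that many payload bytes.
class Deframer {
 public:
  // Appends one segment's bytes and returns every message it completes.
  std::vector<std::string> Feed(const std::string& segment) {
    buf_ += segment;
    std::vector<std::string> out;
    while (buf_.size() >= kHdr) {
      uint32_t len = 0;
      for (std::size_t i = 0; i < kHdr; ++i) {
        len = (len << 8) | static_cast<uint8_t>(buf_[i]);
      }
      if (buf_.size() - kHdr < len) break;  // Message still incomplete.
      out.push_back(buf_.substr(kHdr, len));
      buf_.erase(0, kHdr + len);
    }
    return out;
  }

 private:
  static constexpr std::size_t kHdr = 4;
  std::string buf_;  // Bytes received but not yet delivered.
};

// Builds one framed message under the same assumed format.
inline std::string Frame(const std::string& payload) {
  const uint32_t n = static_cast<uint32_t>(payload.size());
  std::string f;
  for (int shift = 24; shift >= 0; shift -= 8) {
    f.push_back(static_cast<char>((n >> shift) & 0xff));
  }
  return f + payload;
}
```

A test would then split a stream of framed messages at awkward offsets (for example in the middle of a length header, or one byte at a time) and check that the same messages come out regardless of segmentation.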

     int __machnet_connect_go(MachnetChannelCtx_t* ctx, uint32_t local_ip,
                              uint32_t remote_ip, uint16_t remote_port,
    -                         MachnetFlow_t* flow) {
    -  return machnet_connect(ctx, local_ip, remote_ip, remote_port, flow);
    +                         MachnetFlow_t* flow, int protocol) {
    +  return machnet_connect(ctx, local_ip, remote_ip, remote_port, flow, protocol);
     }

     int __machnet_listen_go(MachnetChannelCtx_t* ctx, uint32_t local_ip,
    -                        uint16_t port) {
    -  return machnet_listen(ctx, local_ip, port);
    +                        uint16_t port, int protocol) {
    +  return machnet_listen(ctx, local_ip, port, protocol);
     }

Go bindings now call machnet_connect()/machnet_listen() with uint32_t IPs, but the C API expects const char* IP strings. This will compile with warnings (C) but pass invalid pointers at runtime (likely crash). Update the Go wrapper to pass string IPs (and remove ipv4_str_to_uint32 usage) or introduce separate C APIs (e.g., machnet_connect_ipv4_u32 / machnet_listen_ipv4_u32) and expose those instead.
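If the shim keeps its `uint32_t` parameters, one possible bridge is to convert each address to a dotted-quad string before calling the string-based C API. The helper below is a hypothetical sketch, not code from this PR, and it assumes the integer holds the first octet in its most significant byte; whether that matches the output of `ipv4_str_to_uint32` would need to be checked.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical helper for the Go C shim: turn an IPv4 address held in a
// uint32_t into the dotted-quad string that the C API expects.
// ASSUMPTION: the most significant byte holds the first octet.
inline std::string Ipv4ToString(uint32_t ip) {
  char buf[16];  // "255.255.255.255" plus the terminating NUL.
  std::snprintf(buf, sizeof(buf), "%u.%u.%u.%u",
                static_cast<unsigned>((ip >> 24) & 0xffu),
                static_cast<unsigned>((ip >> 16) & 0xffu),
                static_cast<unsigned>((ip >> 8) & 0xffu),
                static_cast<unsigned>(ip & 0xffu));
  return std::string(buf);
}
```

The shim could then pass `Ipv4ToString(local_ip).c_str()` where it currently passes the raw integer. Since the actual shim header is C, an equivalent there would write into a caller-provided `char[16]` instead of returning a `std::string`.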

    REMOTE_USER="sarsanaee"
    REMOTE_HOST="asas-westus2-vm-0"
    LOCAL_PATH="/home/sarsanaee/machnet/"
    REMOTE_PATH="${REMOTE_USER}@${REMOTE_HOST}:/home/sarsanaee/machnet/"

This script hard-codes a specific username, host, and absolute home-directory path. Checking in personal deployment details is risky and makes the script unusable for others; it can also leak internal hostnames. Consider removing this file from the repo or parameterizing via env vars/CLI args and documenting usage generically.

Suggested change:

    -REMOTE_USER="sarsanaee"
    -REMOTE_HOST="asas-westus2-vm-0"
    -LOCAL_PATH="/home/sarsanaee/machnet/"
    -REMOTE_PATH="${REMOTE_USER}@${REMOTE_HOST}:/home/sarsanaee/machnet/"
    +# Required: remote SSH username and host. Set these via environment variables.
    +: "${REMOTE_USER:?REMOTE_USER is not set. Please export REMOTE_USER before running this script.}"
    +: "${REMOTE_HOST:?REMOTE_HOST is not set. Please export REMOTE_HOST before running this script.}"
    +# Optional: override LOCAL_PATH and REMOTE_BASE_PATH via environment variables.
    +: "${LOCAL_PATH:=${HOME}/machnet/}"
    +: "${REMOTE_BASE_PATH:=/home/${REMOTE_USER}/machnet/}"
    +REMOTE_PATH="${REMOTE_USER}@${REMOTE_HOST}:${REMOTE_BASE_PATH}"


    // Retransmit based on state.
    switch (state_) {
      case State::kSynSent:
        LOG(INFO) << "TCP retransmitting SYN";
        SendSyn();
        break;
      case State::kSynReceived:
        LOG(INFO) << "TCP retransmitting SYN-ACK";
        SendSynAck();
        break;
      default:
        // For established connections, a full retransmission mechanism
        // would require buffering sent data. For now, we just reset the timer.
        break;
    }

PeriodicCheck() does not retransmit any application data when RTO expires in established states (it only retransmits SYN/SYN-ACK). With any packet loss, the connection can hang indefinitely because lost bytes are never resent. Either implement buffering+retransmission for unacked data or fail the connection (RST/close) on RTO until retransmission support is added.
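Both options in this comment (retransmit unacked data, or give up and close) can be combined in a small bookkeeping structure. The sketch below is illustrative only: `RetransmitQueue`, its methods, and the retry limit of 3 are hypothetical and not part of this PR. It keeps sent-but-unacked segments, drops them on cumulative ACKs, and tells the caller whether an RTO should trigger a resend or an abort.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>

// Illustrative only; names are hypothetical and this is not Machnet's TcpFlow.
class RetransmitQueue {
 public:
  enum class Action { kNone, kResend, kAbort };

  // Records a segment that was just sent and awaits acknowledgment.
  void OnSend(uint32_t seq, std::string payload) {
    unacked_.push_back({seq, std::move(payload)});
  }

  // Drops every segment fully covered by the cumulative ACK number.
  void OnAck(uint32_t ack) {
    while (!unacked_.empty() && SeqLeq(End(unacked_.front()), ack)) {
      unacked_.pop_front();
      retries_ = 0;  // Forward progress resets the retry budget.
    }
  }

  // Called when the RTO fires: resend the oldest unacked segment, or give
  // up (the caller would send RST and close) after kMaxRetries attempts.
  Action OnRto() {
    if (unacked_.empty()) return Action::kNone;
    if (retries_ >= kMaxRetries) return Action::kAbort;
    ++retries_;
    return Action::kResend;
  }

  // Payload to resend. Precondition: OnRto() just returned kResend.
  const std::string& Oldest() const { return unacked_.front().payload; }

 private:
  struct Segment {
    uint32_t seq;
    std::string payload;
  };
  static uint32_t End(const Segment& s) {
    return s.seq + static_cast<uint32_t>(s.payload.size());
  }
  // Wraparound-safe "a <= b" on 32-bit sequence numbers.
  static bool SeqLeq(uint32_t a, uint32_t b) {
    return static_cast<int32_t>(a - b) <= 0;
  }
  static constexpr int kMaxRetries = 3;
  std::deque<Segment> unacked_;
  int retries_ = 0;
};
```

In this shape, the `default:` branch of the switch above would call `OnRto()` and either resend `Oldest()` or tear the flow down, rather than silently resetting the timer. A real implementation would also need RTO backoff and a bound on buffered bytes.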

     // Client
     MachnetFlow_t flow = new MachnetFlow_t();
    -ret = MachnetShim.machnet_connect(channel_ctx, options.LocalIp, options.RemoteIp, kHelloWorldPort, ref flow);
    +ret = MachnetShim.machnet_connect(channel_ctx, options.LocalIp, options.RemoteIp, kHelloWorldPort, ref flow, 0);

Replace this call with a call to managed code if possible.

     {
         Console.WriteLine("Waiting for message from client");
    -    ret = MachnetShim.machnet_listen(channel_ctx, options.LocalIp, kHelloWorldPort);
    +    ret = MachnetShim.machnet_listen(channel_ctx, options.LocalIp, kHelloWorldPort, 0);

Replace this call with a call to managed code if possible.

     [DllImport(libmachnet_shim_location, CallingConvention = CallingConvention.Cdecl)]
    -public static extern int machnet_connect(IntPtr channel_ctx, string local_ip, string remote_ip, UInt16 port, ref MachnetFlow_t flow);
    +public static extern int machnet_connect(IntPtr channel_ctx, string local_ip, string remote_ip, UInt16 port, ref MachnetFlow_t flow, int protocol);

Minimise the use of unmanaged code.

     [DllImport(libmachnet_shim_location, CallingConvention = CallingConvention.Cdecl)]
    -public static extern int machnet_listen(IntPtr channel_ctx, string local_ip, UInt16 port);
    +public static extern int machnet_listen(IntPtr channel_ctx, string local_ip, UInt16 port, int protocol);

Minimise the use of unmanaged code.
Tests and readme added to describe tests used for the project on TCP side. Signed-off-by: Alireza Sanaee <sarsanaee@gmail.com>
Force-pushed from 5257833 to 4837db8.
This is not ready to merge, but it is a version in which TCP works. It addresses issue #53.