[Refactor] Migrating Coinswap to the mill-io Eventloop library #675

@hulxv

Description


Overview

After the latest testing and improvements, mill-io, our event-loop library, is ready to be used inside Coinswap. The Coinswap codebase has many manual patterns and weak components (like this makeshift thread pool that is repeated here and here).
Also, as far as I can see, Coinswap follows a thread-per-connection model, which is not great for performance and can become a real problem (see my article about the event-loop library for details: https://hulxv.me/blog/mill-io-event-loop-library/).

The main selling point of mill-io is that it is very minimal, unlike heavy async runtimes such as tokio and async-io. On top of that, we can add further features to it that would be helpful for Coinswap.

This refactoring will be large, but it will improve code quality instead of repeating the same patterns, and it should also give us better performance.

Refactoring Examples

I did some research in the codebase and found the following spots that could be refactored and simplified:

1. Manual Accept Loop

Before:

let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port))?;
listener.set_nonblocking(true)?;

while !maker.shutdown.load(Relaxed) {
    match listener.accept() {
        // ... handle the connection
    }
    sleep(HEART_BEAT_INTERVAL);
}

After:

let config = TcpServerConfig::builder()
    .address(address.parse()?)
    .buffer_size(16384)
    .max_connections(Some(100))
    .build();

let handler = TakerHandler::new(maker.clone());
let server = Arc::new(TcpServer::new(config, handler)?);
server.start(&event_loop, Token(0))?;
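
For completeness, the handler and server need an event loop to run on. A rough wiring sketch could look like this (EventLoop::new() and event_loop.run() are placeholder names for illustration, not confirmed mill-io API):

// Placeholder names: EventLoop::new() / run() are assumptions for illustration.
let event_loop = EventLoop::new()?;
server.start(&event_loop, Token(0))?;

// Drives all registered servers and handlers until shutdown is requested.
event_loop.run()?;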

2. Manual Message Reading

Before:

fn handle_client(maker: &Arc<Maker>, stream: &mut TcpStream) -> Result<(), MakerError> {
    stream.set_nonblocking(false)?;
    
    while !maker.shutdown.load(Relaxed) {
        let mut bytes = Vec::new();
        match read_message(stream) {
            Ok(b) => bytes = b,
            Err(e) => {
                if let NetError::IO(e) = e {
                    if e.kind() == ErrorKind::UnexpectedEof {
                        log::info!("Connection ended.");
                        break;
                    } else {
                        log::error!("Net Error: {}", e);
                        continue;
                    }
                }
            }
        }
        // ... process message
    }
    Ok(())
}

After:

impl NetworkHandler for MakerHandler {
    fn on_data(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> MillResult<()> {
        // data is already read and ready to process
        let message = serde_cbor::from_slice::<MakerToTakerMessage>(data)?;
        // ... process message
        Ok(())
    }
}
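
One caveat: since TCP is a stream, on_data may deliver a partial message or several messages back to back, so we still need Coinswap's message framing on top of it. A rough sketch of a per-connection reassembly buffer, assuming the current length-prefixed wire format (the u32 prefix and its endianness are assumptions here; check read_message for the exact encoding):

// Hypothetical per-connection buffer; not existing Coinswap or mill-io code.
struct FrameBuffer {
    buf: Vec<u8>,
}

impl FrameBuffer {
    // Append freshly received bytes and pop every complete, length-prefixed message.
    fn push(&mut self, data: &[u8]) -> Vec<Vec<u8>> {
        self.buf.extend_from_slice(data);
        let mut messages = Vec::new();
        while self.buf.len() >= 4 {
            let len = u32::from_be_bytes([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]) as usize;
            if self.buf.len() < 4 + len {
                break; // body not fully received yet
            }
            messages.push(self.buf[4..4 + len].to_vec());
            self.buf.drain(..4 + len);
        }
        messages
    }
}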

3. Manual Thread Pool Management

Before:

let maker_clone = maker.clone();
let idle_conn_check_thread = thread::Builder::new()
    .name("Idle Client Checker Thread".to_string())
    .spawn(move || {
        log::info!("Spawning Client connection status checker thread");
        if let Err(e) = check_for_idle_states(maker_clone.clone()) {
            log::error!("Failed checking client's idle state {e:?}");
            maker_clone.shutdown.store(true, Relaxed);
        }
    })?;
maker.thread_pool.add_thread(idle_conn_check_thread);

let maker_clone = maker.clone();
let contract_watcher_thread = thread::Builder::new()
    .name("Contract Watcher Thread".to_string())
    .spawn(move || {
        log::info!("Spawning contract-watcher thread");
        if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
            maker_clone.shutdown.store(true, Relaxed);
            log::error!("Failed checking broadcasted contracts {e:?}");
        }
    })?;
maker.thread_pool.add_thread(contract_watcher_thread);

After:

// Uses mill-io's compute pool with priorities
event_loop.spawn_compute_with_priority(
    move || {
        if let Err(e) = check_for_idle_states(maker_clone.clone()) {
            log::error!("Failed checking client's idle state {e:?}");
        }
    },
    TaskPriority::High,
);

// Or without an explicit priority
event_loop.spawn_compute(
    move || {
        if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
            log::error!("Failed checking broadcasted contracts {e:?}");
        }
    }
);
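
One behavioral detail to keep from the current code: both threads set maker.shutdown when their check fails, and the closures passed to the compute pool can keep doing exactly that:

let maker_clone = maker.clone();
event_loop.spawn_compute(move || {
    if let Err(e) = check_for_broadcasted_contracts(maker_clone.clone()) {
        log::error!("Failed checking broadcasted contracts {e:?}");
        // Same shutdown behavior as the current thread-based version
        maker_clone.shutdown.store(true, Relaxed);
    }
});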

Code Quality Improvements

In addition to the previous examples from the codebase, these are some mill-io features that would be useful for us.

1. Error Handling

Before: Manual error handling with multiple code paths

match stream.read(&mut buffer) {
    Ok(0) => {/* disconnect? */},
    Ok(n) => {/* process */},
    Err(e) if e.kind() == ErrorKind::WouldBlock => {/* retry? */},
    Err(e) => {/* what now? */},
}

After: Unified error handling through NetworkHandler

fn on_data(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> MillResult<()> {
    // All the low-level I/O errors are already handled by mill-io before this is called
    Ok(())
}

fn on_error(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> MillResult<()> {
    // Connection errors are funneled into this single callback
    Ok(())
}

2. Connection Lifecycle

Before: Manual tracking with potential leaks

// Easy to forget cleanup on error paths
if let Err(e) = handle_client(&mut stream) {
    // Did we clean up the connection state?
    // Did we remove from ongoing_swaps?
}

After: Guaranteed cleanup

fn on_disconnect(&self, _ctx: &ServerContext, conn_id: ConnectionId) -> MillResult<()> {
    // Always called, even on errors
    let swap_id = format!("{:?}", conn_id);
    self.maker.ongoing_swap_state.lock().unwrap().remove(&swap_id);
    Ok(())
}
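
Since the swap id probably can't be recovered from the connection id alone, the handler would have to remember the association itself, e.g. a swaps_by_conn: Mutex<HashMap<ConnectionId, String>> field (hypothetical, not existing Coinswap code) that on_data fills in once it learns the swap id and on_disconnect drains:

fn on_disconnect(&self, _ctx: &ServerContext, conn_id: ConnectionId) -> MillResult<()> {
    // Always called, even on errors
    if let Some(swap_id) = self.swaps_by_conn.lock().unwrap().remove(&conn_id) {
        self.maker.ongoing_swap_state.lock().unwrap().remove(&swap_id);
    }
    Ok(())
}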

3. Separation of Concerns

Before: Mixed I/O and business logic

fn handle_client(maker: &Arc<Maker>, stream: &mut TcpStream) -> Result<()> {
    // i/o code
    let bytes = read_message(stream)?;
    // Business logic
    let message = serde_cbor::from_slice(&bytes)?;
    let response = handle_message(&maker, message)?;
    // more I/O code
    send_message(stream, &response)?;
}

After: Clean separation

impl NetworkHandler for MakerHandler {
    fn on_data(&self, ctx: &ServerContext, conn_id: ConnectionId, data: &[u8]) -> MillResult<()> {
        // only business logic - I/O handled by mill-io
        let message = serde_cbor::from_slice(data)?;
        let response = handle_message(&self.maker, message)?;
        let response_bytes = serde_cbor::to_vec(&response)?;
        ctx.send_to(conn_id, &response_bytes)?;
        Ok(())
    }
}
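
Note that if the taker keeps using the existing read_message/send_message framing, the response bytes also need the length prefix before ctx.send_to. A hypothetical helper (the u32 prefix and its endianness are assumptions, not taken from the Coinswap source):

// Hypothetical helper mirroring send_message's framing; prefix encoding is an assumption.
fn frame_response<T: serde::Serialize>(msg: &T) -> Result<Vec<u8>, serde_cbor::Error> {
    let body = serde_cbor::to_vec(msg)?;
    let mut framed = Vec::with_capacity(4 + body.len());
    framed.extend_from_slice(&(body.len() as u32).to_be_bytes());
    framed.extend_from_slice(&body);
    Ok(framed)
}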

Performance Improvements

These are the expected performance improvements. They are theoretical estimates, so we still need to run real-world benchmarks.

Thread Usage

Scenario          Before (Manual)   After (mill-io)   Improvement
Idle              3 threads         8 threads         Better baseline
10 connections    13 threads        8 threads         -38%
50 connections    53 threads        8 threads         -85%
100 connections   103 threads       8 threads         -92%

Memory Usage (Estimated)

Connections   Before     After
10            ~20 MB     ~10 MB
50            ~100 MB    ~15 MB
100           ~200 MB    ~20 MB

Connection Capacity

  • Before: ~200 concurrent connections (limited by thread creation overhead)
  • After: 1,000+ concurrent connections (limited only by system resources)
