Skip to content

tcp Recv-Q accumulation appears on the tls server #110

@stanley-jp

Description

@stanley-jp

I wrote a TLS server, but after a few days of real-world operation the program occasionally gets stuck.
The TCP listening port is 50011.
I found that the Recv-Q of the listening socket is always 1025.

$ ss -ltnp
(Not all processes could be identified, non-owned process info
will not be shown, you would have to be root to see it all.)
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 1025 0 0.0.0.0:50011 0.0.0.0:* LISTEN -
tcp6 0 0 :::111 :::* LISTEN -
tcp6 0 0 :::22 :::* LISTEN -
tcp6 0 0 ::1:631 :::* LISTEN -
tcp6 0 0 :::5900 :::* LISTEN -

$ netstat -anp |grep 50011 |grep CLOSE_WAIT |wc -l
878

The server code:

impl DiscoveryServer {
    pub async fn new(s: &Settings, n: &Arc<RwLock<HashMap<String, NodeDescription>>>) -> Self {
        log::info!(
            "create discovery server listener on {:?}",
            format!("{}:{}", "0.0.0.0", s.server.listen_port)
        );
        DiscoveryServer {
            tcp_socket: new_listener(format!("{}:{}", "0.0.0.0", s.server.listen_port), false)
                .await
                .unwrap(),
            settings: s.clone(),
            nodes: n.clone(),
        }
    }

    pub async fn start(self) -> ResultType<()> {
        log::info!("start discovery server");
        let tls_acceptor = new_tls_acceptor();
        tokio::spawn(async move {
            loop {
                match self.tcp_socket.accept().await {
                    Ok((stream, addr)) => {
                        let acceptor = tls_acceptor.clone();
                        let res_servers = self.nodes.clone();
                        let res_cities = self.settings.config_item.city_list.clone();

                        tokio::spawn(async move {
                            match TlsFrameStream::from(stream, acceptor).await {
                                Ok(mut tls_stream) => {
                                    if let Some(Ok(bytes)) = tls_stream.next_timeout(MESSAGE_TIMEOUT).await {
                                        if let Ok(msg_in) = DiscoveryMessage::parse_from_bytes(&bytes) {
                                            match msg_in.union {
                                                Some(discovery_message::Union::request(req)) => {
                                                    log::info!("msg from client:{}, request:{}", addr, req);
                                                    handle_request(&res_servers, res_cities, tls_stream, req).await;
                                                }
                                                _ => {
                                                    log::warn!("unknown union type from msg_in, type:{:?}", msg_in.union);
                                                }
                                            }
                                        }
                                    }
                                },
                                Err(e) => log::error!("error accept client, err: {}", e),
                            }
                        });
                    }
                    Err(err) => {
                        log::error!("error accept tcp socket, err: {}", err);
                    }
                }
            }
        });
        Ok(())
    }
}

TLS wrapper:

/// Reads every PEM-encoded certificate found in `filename`.
///
/// # Panics
///
/// Panics if the file cannot be opened or its PEM content cannot be parsed.
pub fn load_certs(filename: &str) -> Vec<rustls::Certificate> {
    let certfile = File::open(filename).expect("cannot open certificate file");
    let mut reader = BufReader::new(certfile);
    rustls_pemfile::certs(&mut reader)
        .expect("cannot parse certificate .pem file")
        .into_iter()
        // Move each DER buffer into its wrapper instead of cloning it.
        .map(rustls::Certificate)
        .collect()
}

/// Returns the first RSA or PKCS#8 private key stored in the PEM file
/// `filename`; any other PEM item is skipped.
///
/// # Panics
///
/// Panics if the file cannot be opened, cannot be parsed, or contains no
/// RSA/PKCS#8 key.
pub fn load_private_key(filename: &str) -> rustls::PrivateKey {
    let mut reader = BufReader::new(File::open(filename).expect("cannot open private key file"));

    while let Some(item) =
        rustls_pemfile::read_one(&mut reader).expect("cannot parse private key .pem file")
    {
        match item {
            rustls_pemfile::Item::RSAKey(der) | rustls_pemfile::Item::PKCS8Key(der) => {
                return rustls::PrivateKey(der);
            }
            // Not a supported key: keep scanning the remaining PEM items.
            _ => {}
        }
    }

    panic!(
        "no keys found in {:?} (encrypted keys not supported)",
        filename
    );
}

/// Resolves `host:port` and returns the first IPv4 socket address found.
///
/// # Panics
///
/// Panics if name resolution fails or if the host resolves to no IPv4
/// address (for example an IPv6-only host).
pub fn lookup_ipv4(host: &str, port: u16) -> SocketAddr {
    (host, port)
        .to_socket_addrs()
        .unwrap_or_else(|e| panic!("cannot resolve {}:{}: {}", host, port, e))
        .find(SocketAddr::is_ipv4)
        // This path is reachable for IPv6-only hosts, so report it as a
        // lookup failure rather than as an impossible state.
        .unwrap_or_else(|| panic!("no IPv4 address found for {}:{}", host, port))
}

/// Builds the shared rustls client configuration.
///
/// Trust roots are read from `ca_file`; the client certificate chain and
/// private key presented to the server come from `certs_file` and `key_file`.
/// rustls' default cipher suites and protocol versions are used.
///
/// # Panics
///
/// Panics if any of the files cannot be read or parsed, or if rustls rejects
/// the certificate/key pair.
fn make_client_config(
    ca_file: &str,
    certs_file: &str,
    key_file: &str,
) -> Arc<rustls::ClientConfig> {
    let mut ca_reader = BufReader::new(File::open(ca_file).expect("Cannot open CA file"));
    let ca_certs = rustls_pemfile::certs(&mut ca_reader).unwrap();

    let mut trusted_roots = RootCertStore::empty();
    trusted_roots.add_parsable_certificates(&ca_certs);

    let client_chain = load_certs(certs_file);
    let client_key = load_private_key(key_file);

    Arc::new(
        rustls::ClientConfig::builder()
            .with_cipher_suites(rustls::DEFAULT_CIPHER_SUITES)
            .with_safe_default_kx_groups()
            .with_protocol_versions(rustls::DEFAULT_VERSIONS)
            .expect("inconsistent cipher-suite/versions selected")
            .with_root_certificates(trusted_roots)
            .with_single_cert(client_chain, client_key)
            .expect("invalid client auth certs/key"),
    )
}

/// Builds the shared rustls server configuration.
///
/// The server certificate chain is loaded from `certs` and its private key
/// from `key_file`. Client certificates are not requested, and every cipher
/// suite and protocol version that rustls ships is enabled.
///
/// # Panics
///
/// Panics if the files cannot be loaded or if rustls rejects the
/// certificate/key pair.
fn make_server_config(certs: &str, key_file: &str) -> Arc<rustls::ServerConfig> {
    let cert_chain = load_certs(certs);
    let private_key = load_private_key(key_file);

    let mut server_config = rustls::ServerConfig::builder()
        .with_cipher_suites(rustls::ALL_CIPHER_SUITES)
        .with_safe_default_kx_groups()
        .with_protocol_versions(rustls::ALL_VERSIONS)
        .expect("inconsistent cipher-suites/versions specified")
        .with_client_cert_verifier(NoClientAuth::new())
        // No OCSP response and no SCT list are stapled.
        .with_single_cert_with_ocsp_and_sct(cert_chain, private_key, vec![], vec![])
        .expect("bad certificates/private key");

    // NOTE(review): a key-log sink is installed unconditionally; confirm that
    // exporting TLS secrets is intended outside of debugging.
    server_config.key_log = Arc::new(rustls::KeyLogFile::new());
    // Session cache sized with the value 256.
    server_config.session_storage = rustls::server::ServerSessionMemoryCache::new(256);
    Arc::new(server_config)
}

/// Opens a TCP connection to `addr` and performs a client-side TLS handshake,
/// verifying the server against `domain`.
///
/// `ca_file`, `cert_file` and `key_file` are forwarded to
/// `make_client_config`.
///
/// # Errors
///
/// Returns an error if `domain` is not a valid server name, if the TCP
/// connection cannot be established, or if the TLS handshake fails.
///
/// # Panics
///
/// Panics if the certificate or key files cannot be loaded (see
/// `make_client_config`).
pub async fn new_tls_stream(
    domain: &str,
    addr: std::net::SocketAddr,
    ca_file: &str,
    cert_file: &str,
    key_file: &str,
) -> ResultType<ClientTlsStream<TcpStream>> {
    // Validate the server name first and report it as an error instead of
    // panicking, so that no socket is opened for an unusable name.
    let server_name = rustls::ServerName::try_from(domain)
        .map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "invalid dnsname"))?;

    let connector = TlsConnector::from(make_client_config(ca_file, cert_file, key_file));

    let tcp_stream = TcpStream::connect(&addr).await?;
    let tls_stream = connector.connect(server_name, tcp_stream).await?;
    Ok(tls_stream)
}

/// Creates a TLS acceptor from the server certificate and key files named in
/// `CERT`.
pub fn new_tls_acceptor() -> TlsAcceptor {
    TlsAcceptor::from(make_server_config(
        CERT.server_cert_file,
        CERT.server_key_file,
    ))
}

/// A TLS connection that is either the client side or the server side of a
/// session.
///
/// The constructors in this file populate exactly one of the two stream
/// fields and leave the other `None`.
pub struct TlsFrameStream {
    /// Set by `new_for_client`; `None` for streams created by `from`.
    pub client_stream: Option<ClientTlsStream<TcpStream>>,
    /// Set by `from`; `None` for streams created by `new_for_client`.
    pub server_stream: Option<ServerTlsStream<TcpStream>>,
    /// Address of the remote end, used in log messages.
    peer_addr: SocketAddr,
}

impl TlsFrameStream {
    pub async fn from(stream: TcpStream, acceptor: TlsAcceptor) -> ResultType<Self> {
        let addr = stream.peer_addr()?;
        let tls_stream = match acceptor.accept(stream).await {
            Ok(tls_stream) => tls_stream,
            Err(e) => {
                return Err(anyhow!("accept stream failed.., error: {:?}", e));
            }
        };

        Ok(TlsFrameStream {
            client_stream: None,
            server_stream: Some(tls_stream),
            peer_addr: addr,
        })
    }

    pub async fn new_for_client(server_addr: SocketAddr, ms_timeout: u64) -> ResultType<Self> {
        let tls_stream = super::timeout(
            ms_timeout,
            new_tls_stream(
                "localhost",
                server_addr,
                CERT.ca_file,
                CERT.client_cert_file,
                CERT.client_key_file,
            ),
        )
        .await??;

        Ok(TlsFrameStream {
            client_stream: Some(tls_stream),
            server_stream: None,
            peer_addr: server_addr,
        })
    }

    #[inline]
    pub async fn next(&mut self) -> Option<Result<BytesMut, Error>> {
        let mut bytes = BytesMut::with_capacity(DEFAULT_BUFFER_SIZE);
        match self.client_stream.as_mut() {
            None => {}
            Some(stream) => {
                stream.read_buf(&mut bytes).await.unwrap();
                return Some(Ok(bytes));
            }
        };
        match self.server_stream.as_mut() {
            None => None,
            Some(stream) => {
                stream.read_buf(&mut bytes).await.unwrap();
                return Some(Ok(bytes));
            }
        }
    }

    #[inline]
    pub async fn send(&mut self, msg: &impl Message) -> ResultType<()> {
        self.send_raw(msg.write_to_bytes()?).await
    }

    #[inline]
    pub async fn send_raw(&mut self, msg: Vec<u8>) -> ResultType<()> {
        match self.client_stream.as_mut() {
            None => {}
            Some(stream) => {
                stream.write_all(&msg).await.unwrap();
                return Ok(());
            }
        }
        match self.server_stream.as_mut() {
            None => return Ok(()),
            Some(stream) => {
                stream.write_all(&msg).await.unwrap();
                return Ok(());
            }
        }
    }

    #[inline]
    pub async fn next_timeout(&mut self, ms: u64) -> Option<Result<BytesMut, Error>> {
        if let Ok(res) =
            tokio::time::timeout(std::time::Duration::from_millis(ms), self.next()).await
        {
            res
        } else {
            None
        }
    }

    pub async fn shutdown(&mut self) -> ResultType<()> {
        log::info!("shutdown connection {:?}", self.peer_addr);
        match self.client_stream.as_mut() {
            None => {}
            Some(stream) => {
                stream.shutdown().await?;
                return Ok(());
            }
        }
        match self.server_stream.as_mut() {
            None => return Ok(()),
            Some(stream) => {
                stream.shutdown().await?;
                return Ok(());
            }
        }
    }
}

impl Drop for TlsFrameStream {
    fn drop(&mut self) {
        match block_on(self.shutdown()) {
            Err(e) => {
                log::error!("close connection {:?} failed, reson: {:?}", self.peer_addr, e);
            }
            _ => {}
        };
    }
}

Is there something wrong with my code?

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions