Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 20 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,25 @@ edition = "2021"
# Common dependencies for all crates
[workspace.dependencies]
async-trait = "0.1.89"
cgroups-rs = "0.4.0"
crossbeam = "0.8.1"
futures = "0.3.19"
libc = "0.2.112"
log = {version = "0.4.2", features=["kv_unstable"]}
nix = "0.30"
oci-spec = "0.7"
os_pipe = "1.1"
prctl = "1.0.0"
prost = "0.14"
prost-build = "0.14"
prost-types = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
simple_logger = { version = "5.0", default-features = false }
tempfile = "3.6"
cgroups-rs = { version = "0.5", default-features = false }
crossbeam = { version = "0.8", default-features = false }
futures = { version = "0.3", default-features = false }
libc = { version = "0.2", default-features = false }
log = { version = "0.4", default-features = false }
nix = { version = "0.31", default-features = false }
oci-spec = { version = "0.9", default-features = false }
prost = { version = "0.14", default-features = false }
prost-build = { version = "0.14", default-features = false }
prost-types = { version = "0.14", default-features = false }
serde = { version = "1.0", default-features = false }
serde_json = { version = "1.0", default-features = false }
simple_logger = { version = "5.2", default-features = false }
tempfile = "3.25"
thiserror = "2.0"
time = { version = "0.3.29", features = ["serde", "std", "formatting"] }
tokio = "1.26"
tonic = "0.14"
time = { version = "0.3", default-features = false }
tokio = { version = "1.49", default-features = false }
tonic = { version = "0.14", default-features = false }
tonic-prost = "0.14"
tonic-prost-build = "0.14"
tower = "0.5"
uuid = { version = "1.0", features = ["v4"] }
tonic-prost-build = { version = "0.14", default-features = false }
tower = { version = "0.5", default-features = false }
uuid = { version = "1.21", default-features = false }
2 changes: 1 addition & 1 deletion clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1 @@
msrv = "1.66"
msrv = "1.91"
14 changes: 7 additions & 7 deletions crates/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ name = "version"
path = "examples/version.rs"

[dependencies]
hyper-util = "0.1.6" # https://github.com/hyperium/hyper/issues/3110
prost.workspace = true
prost-types.workspace = true
tokio = { workspace = true, optional = true }
tonic.workspace = true
hyper-util = { version = "0.1.20", default-features = false, features = ["tokio"] }
prost = { workspace = true, features = ["derive", "std"] }
prost-types = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["net"], optional = true }
tonic = { workspace = true, features = ["codegen", "channel"] }
tonic-prost.workspace = true
tower = { workspace = true, optional = true }
tower = { workspace = true, features = ["util"], optional = true }

[build-dependencies]
tonic-prost-build.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["rt", "macros"]}
tokio = { workspace = true, features = ["rt", "macros", "net"] }

[features]
connect = ["tokio", "tower"]
Expand Down
1 change: 1 addition & 0 deletions crates/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub mod types {

/// Generated `google.rpc` types, containerd services typically use some of these types.
pub mod google {
#[allow(rustdoc::broken_intra_doc_links)]
pub mod rpc {
tonic::include_proto!("google.rpc");
}
Expand Down
19 changes: 9 additions & 10 deletions crates/runc-shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,23 +28,22 @@ doc = false
containerd-shim = { path = "../shim", version = "0.10.0", features = ["async"] }
libc.workspace = true
log.workspace = true
nix = { workspace = true, features = ["socket", "uio", "term"] }
oci-spec.workspace = true
prctl.workspace = true
nix = { workspace = true, features = ["socket", "uio", "term", "signal"] }
oci-spec = { workspace = true, features = ["runtime"] }
runc = { path = "../runc", version = "0.3.0", features = ["async"] }
serde.workspace = true
serde_json.workspace = true
time.workspace = true
uuid.workspace = true
serde = { workspace = true, features = ["derive", "std"] }
serde_json = { workspace = true, features = ["std"] }
time = { workspace = true, features = ["std"] }
uuid = { workspace = true, features = ["v4"] }
# Async dependencies
async-trait.workspace = true
tokio = { workspace = true, features = ["full"] }
rustix = { version = "1", features = ["termios"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "sync", "fs", "io-util", "net", "time", "signal"] }
rustix = { version = "1.1", default-features = false, features = ["std", "termios"] }

[package.metadata.cargo-machete]
ignored = ["libc"]

[target.'cfg(target_os = "linux")'.dependencies]
cgroups-rs.workspace = true
nix = { workspace = true, features = ["event"] }
tokio-eventfd = "0.2.1"
tokio-eventfd = "0.2.2"
20 changes: 10 additions & 10 deletions crates/runc-shim/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,17 @@ impl Shim for Service {
// Our goal is to set thp disable = true on the shim side and then restore thp
// disable before starting runc. So we only need to focus on the return value
// of the function get_thp_disabled, which is Result<bool, i32>.
let thp_disabled = match prctl::get_thp_disable() {
Ok(x) => {
// The return value of the function set_thp_disabled is Result<(), i32>,
// we don't care if the setting is successful, because even if the
// setting failed, we should not exit the shim process, therefore,
// there is no need to pay attention to the set_thp_disabled function's
// return value.
let _ = prctl::set_thp_disable(true);
x.to_string()
let thp_disabled = {
let ret = unsafe { libc::prctl(libc::PR_GET_THP_DISABLE, 0, 0, 0, 0) };
if ret >= 0 {
let was_disabled = ret > 0;
// We don't care if the setting is successful, because even if the
// setting failed, we should not exit the shim process.
let _ = unsafe { libc::prctl(libc::PR_SET_THP_DISABLE, 1u64, 0, 0, 0) };
was_disabled.to_string()
} else {
String::new()
}
Err(_) => String::new(),
};
let vars: Vec<(&str, &str)> = vec![("THP_DISABLED", thp_disabled.as_str())];

Expand Down
17 changes: 7 additions & 10 deletions crates/runc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,18 @@ docs = []
libc.workspace = true
log.workspace = true
nix = { workspace = true, features = ["user", "fs"] }
oci-spec.workspace = true
path-absolutize = "3.0.11"
prctl.workspace = true
serde.workspace = true
serde_json.workspace = true
oci-spec = { workspace = true, features = ["runtime"] }
serde = { workspace = true, features = ["derive", "std"] }
serde_json = { workspace = true, features = ["std"] }
tempfile.workspace = true
thiserror.workspace = true
time.workspace = true
uuid.workspace = true
os_pipe.workspace = true
time = { workspace = true, features = ["serde", "std"] }
uuid = { workspace = true, features = ["v4"] }

# Async dependencies
async-trait = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"], optional = true }
tokio-pipe = { version = "0.2.10", optional = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "process", "sync", "fs", "io-util", "net", "time"], optional = true }
tokio-pipe = { version = "0.2.12", default-features = false, optional = true }

[package.metadata.docs.rs]
features = ["docs"]
4 changes: 3 additions & 1 deletion crates/runc/src/asynchronous/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@ impl PipedIo {
let rd = pipe.rd.try_clone()?;
nix::unistd::fchown(rd, uid, gid)?;
} else {
let wr = pipe.wr.try_clone()?;
let wr = pipe
.try_clone_wr()
.ok_or_else(|| std::io::Error::other("write end closed"))?;
nix::unistd::fchown(wr, uid, gid)?;
}
Ok(Some(pipe))
Expand Down
22 changes: 13 additions & 9 deletions crates/runc/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ mod runc;
use std::{fmt::Debug, io::Result, os::fd::AsRawFd};

use async_trait::async_trait;
use log::debug;
pub use pipe::Pipe;
pub use runc::{DefaultExecutor, Spawner};
use tokio::io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -57,10 +56,11 @@ pub struct PipedIo {
impl Io for PipedIo {
fn stdin(&self) -> Option<Box<dyn AsyncWrite + Send + Sync + Unpin>> {
self.stdin.as_ref().and_then(|pipe| {
let fd = pipe.wr.as_raw_fd();
tokio_pipe::PipeWrite::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncWrite + Send + Sync + Unpin>)
.ok()
pipe.wr_as_raw_fd().and_then(|fd| {
tokio_pipe::PipeWrite::from_raw_fd_checked(fd)
.map(|x| Box::new(x) as Box<dyn AsyncWrite + Send + Sync + Unpin>)
.ok()
})
})
}

Expand Down Expand Up @@ -91,12 +91,16 @@ impl Io for PipedIo {
}

if let Some(p) = self.stdout.as_ref() {
let pw = p.wr.try_clone()?;
let pw = p
.try_clone_wr()
.ok_or_else(|| std::io::Error::other("write end closed"))?;
cmd.stdout(pw);
}

if let Some(p) = self.stderr.as_ref() {
let pw = p.wr.try_clone()?;
let pw = p
.try_clone_wr()
.ok_or_else(|| std::io::Error::other("write end closed"))?;
cmd.stdout(pw);
}

Expand All @@ -105,11 +109,11 @@ impl Io for PipedIo {

async fn close_after_start(&self) {
if let Some(p) = self.stdout.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stdout: {}", e));
p.close_wr();
}

if let Some(p) = self.stderr.as_ref() {
nix::unistd::close(p.wr.as_raw_fd()).unwrap_or_else(|e| debug!("close stderr: {}", e));
p.close_wr();
}
}
}
57 changes: 42 additions & 15 deletions crates/runc/src/asynchronous/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
limitations under the License.
*/

use std::os::unix::io::OwnedFd;

use tokio::net::unix::pipe;
use std::{
os::unix::io::{AsRawFd, OwnedFd, RawFd},
sync::Mutex,
};

/// Struct to represent a pipe that can be used to transfer stdio inputs and outputs.
///
Expand All @@ -25,44 +26,70 @@ use tokio::net::unix::pipe;
#[derive(Debug)]
pub struct Pipe {
pub rd: OwnedFd,
pub wr: OwnedFd,
wr: Mutex<Option<OwnedFd>>,
}

impl Pipe {
pub fn new() -> std::io::Result<Self> {
let (tx, rx) = pipe::pipe()?;
let rd = rx.into_blocking_fd()?;
let wr = tx.into_blocking_fd()?;
Ok(Self { rd, wr })
let (rd, wr) = std::io::pipe()?;
Ok(Self {
rd: OwnedFd::from(rd),
wr: Mutex::new(Some(OwnedFd::from(wr))),
})
}

/// Return the raw fd of the write end. Returns `None` if closed.
pub fn wr_as_raw_fd(&self) -> Option<RawFd> {
self.wr.lock().unwrap().as_ref().map(|w| w.as_raw_fd())
}

/// Clone the write end. Returns `None` if closed.
pub fn try_clone_wr(&self) -> Option<OwnedFd> {
self.wr
.lock()
.unwrap()
.as_ref()
.and_then(|w| w.try_clone().ok())
}

/// Close the write end by dropping it. No-op if already closed.
pub fn close_wr(&self) {
let _ = self.wr.lock().unwrap().take();
}

/// Take ownership of the write end. Returns `None` if already closed.
pub fn take_wr(&self) -> Option<OwnedFd> {
self.wr.lock().unwrap().take()
}
}

#[cfg(test)]
mod tests {
use std::os::fd::IntoRawFd;

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::unix::pipe,
};

use super::*;

#[tokio::test]
async fn test_pipe_creation() {
let pipe = Pipe::new().expect("Failed to create pipe");
let wr = pipe.take_wr().unwrap();
assert!(
pipe.rd.into_raw_fd() >= 0,
"Read file descriptor is invalid"
);
assert!(
pipe.wr.into_raw_fd() >= 0,
"Write file descriptor is invalid"
);
assert!(wr.into_raw_fd() >= 0, "Write file descriptor is invalid");
}

#[tokio::test]
async fn test_pipe_write_read() {
let pipe = Pipe::new().expect("Failed to create pipe");
let mut write_end = pipe::Sender::from_owned_fd(pipe.take_wr().unwrap()).unwrap();
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();
let write_data = b"hello";

write_end
Expand All @@ -85,8 +112,8 @@ mod tests {
#[tokio::test]
async fn test_pipe_async_write_read() {
let pipe = Pipe::new().expect("Failed to create pipe");
let mut write_end = pipe::Sender::from_owned_fd(pipe.take_wr().unwrap()).unwrap();
let mut read_end = pipe::Receiver::from_owned_fd(pipe.rd).unwrap();
let mut write_end = pipe::Sender::from_owned_fd(pipe.wr).unwrap();

let write_data = b"hello";
tokio::spawn(async move {
Expand Down
13 changes: 10 additions & 3 deletions crates/runc/src/asynchronous/runc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ impl Runc {
#[cfg(target_os = "linux")]
if let Ok(thp) = std::env::var("THP_DISABLED") {
if let Ok(thp_disabled) = thp.parse::<bool>() {
if let Err(e) = prctl::set_thp_disable(thp_disabled) {
debug!("set_thp_disable err: {}", e);
};
let ret = libc::prctl(
libc::PR_SET_THP_DISABLE,
if thp_disabled { 1u64 } else { 0u64 },
0,
0,
0,
);
if ret < 0 {
debug!("set_thp_disable err: {}", std::io::Error::last_os_error());
}
}
}
Ok(())
Expand Down
8 changes: 5 additions & 3 deletions crates/runc/src/synchronous/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ impl PipedIo {
let rd = pipe.rd.try_clone()?;
nix::unistd::fchown(rd, uid, gid)?;
} else {
let wr = pipe.wr.try_clone()?;
let wr = pipe
.try_clone_wr()
.ok_or_else(|| std::io::Error::other("write end closed"))?;
nix::unistd::fchown(wr, uid, gid)?;
}
Ok(Some(pipe))
Expand Down Expand Up @@ -235,7 +237,7 @@ mod tests {
buf[0] = 0xce;
io.stdout
.as_ref()
.map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap());
.map(|v| v.try_clone_wr().unwrap().write(&buf).unwrap());
buf[0] = 0x0;
stdout.read_exact(&mut buf).unwrap();
assert_eq!(&buf, &[0xceu8]);
Expand All @@ -244,7 +246,7 @@ mod tests {
buf[0] = 0xa5;
io.stderr
.as_ref()
.map(|v| v.wr.try_clone().unwrap().write(&buf).unwrap());
.map(|v| v.try_clone_wr().unwrap().write(&buf).unwrap());
buf[0] = 0x0;
stderr.read_exact(&mut buf).unwrap();
assert_eq!(&buf, &[0xa5u8]);
Expand Down
Loading
Loading