Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,047 changes: 597 additions & 450 deletions Cargo.lock

Large diffs are not rendered by default.

31 changes: 19 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ perf-literal = ["regex/perf-literal"]

[dependencies]
anyhow = {version = "1.0.71", features = ["backtrace"]}
async-compression = { version = "0.4.0", features = ["all", "all-algorithms", "tokio"] }
async-compression = { version = "0.4.11", features = ["all", "all-algorithms", "tokio"] }
async-stream = "0.3.5"
async-trait = "0.1.68"
async_zip = {version = "0.0.12", features = ["full"]}
async_zip = {version = "0.0.18", features = ["full"]}
bincode = "1.3.3"
bytes = "1.4.0"
bytes = "1.6.0"
clap = {version = "4.3.0", features = ["wrap_help"]}
crossbeam = "0.8.2"
crossbeam-channel = "0.5.8"
Expand All @@ -40,36 +40,43 @@ glob = "0.3.1"
json_comments = "0.2.1"
lazy_static = "1.4.0"
log = "0.4.17"
mailparse = "0.14.0"
memchr = "2.5.0"
mailparse = "0.15.0"
memchr = "2.7.4"
mime2ext = "0.1.52"
open = "5"
paste = "1.0.12"
path-clean = "1.0.1"
pretty-bytes = "0.2.2"
regex = "1.8.2"
rusqlite = {version = "0.30.0", features = ["vtab", "bundled"]}
regex = "1.10.4"
zstd = "0.13"
rusqlite = {version = "0.32.0", features = ["vtab", "bundled"]}
schemars = {version = "0.8.12", features = ["preserve_order"]}
serde = {version = "1.0.163", features = ["derive"]}
serde_json = "1.0.96"
size_format = "1.0.2"
structopt = "0.3.26"
tempfile = "3.5.0"
tokio = {version = "1.28.1", features = ["full"]}
tokio-rusqlite = "0.5.0"
tokio-stream = {version = "0.1.14", features = ["io-util", "tokio-util"]}
tokio = {version = "1.39.2", features = ["full"]}
tokio-rusqlite = "0.6.0"
tokio-stream = {version = "0.1.15", features = ["io-util", "tokio-util"]}
astral-tokio-tar = "0.5.6"
tokio-util = {version = "0.7.8", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.0.3"}
tar = "0.4.40"
tokio-util = {version = "0.7.11", features = ["io", "full"]}
tree_magic = {package = "tree_magic_mini", version = "3.1.0"}
futures-lite = "2.6.1"
futures = "0.3.31"

[dev-dependencies]
async-recursion = "1.0.4"
ctor = "0.2.0"
pretty_assertions = "1.3.0"
tempfile = "3.5.0"
tokio-test = "0.4.2"
tar = "0.4.40"

[profile.release]
debug = true
lto = "thin"
split-debuginfo = "packed"
codegen-units = 1
opt-level = "z"
3 changes: 2 additions & 1 deletion src/adapters/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,8 @@ mod test {

let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
// let r = adapter.adapt(a, &d)?;
let r = loop_adapt(&adapter, d, a).await?;
let engine = crate::preproc::make_engine(&a.config)?;
let r = loop_adapt(engine, &adapter, d, a).await?;
let o = adapted_to_vec(r).await?;
assert_eq!(
String::from_utf8(o)?,
Expand Down
3 changes: 2 additions & 1 deletion src/adapters/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,8 @@ mod tests {
let filepath = test_data_dir().join("short.pdf.gz");

let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let r = loop_adapt(&adapter, d, a).await?;
let engine = crate::preproc::make_engine(&a.config)?;
let r = loop_adapt(engine, &adapter, d, a).await?;
let o = adapted_to_vec(r).await?;
assert_eq!(
String::from_utf8(o)?,
Expand Down
3 changes: 2 additions & 1 deletion src/adapters/mbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ mod tests {
let filepath = test_data_dir().join("mail_with_attachment.mbox");

let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));
let mut r = loop_adapt(&adapter, d, a).await?;
let engine = crate::preproc::make_engine(&a.config)?;
let mut r = loop_adapt(engine, &adapter, d, a).await?;
let mut count = 0;
while let Some(file) = r.next().await {
let mut file = file?;
Expand Down
56 changes: 35 additions & 21 deletions src/adapters/postproc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,38 +92,42 @@ async fn postproc_encoding(
let has_binary = fourk.contains(&0u8);

let enc = Encoding::for_bom(&fourk);
let inp = Cursor::new(fourk).chain(beginning.into_inner());
let combined = Cursor::new(fourk).chain(beginning.into_inner());
match enc {
Some((enc, _)) if enc != encoding_rs::UTF_8 => {
// detected UTF16LE or UTF16BE, convert to UTF8 in separate thread
// TODO: parse these options from ripgrep's configuration
let encoding = None; // detect bom but usually assume utf8
let encoding = None;
let bom_sniffing = true;
let mut decode_builder = DecodeReaderBytesBuilder::new();
// https://github.com/BurntSushi/ripgrep/blob/a7d26c8f144a4957b75f71087a66692d0b25759a/grep-searcher/src/searcher/mod.rs#L706
// this detects utf-16 BOMs and transcodes to utf-8 if they are present
// it does not detect any other char encodings. that would require https://github.com/hsivonen/chardetng or similar but then binary detection is hard (?)
let mut inp = decode_builder
let mut builder = DecodeReaderBytesBuilder::new();
let reader = builder
.encoding(encoding)
.utf8_passthru(true)
.strip_bom(bom_sniffing)
.bom_override(true)
.bom_sniffing(bom_sniffing)
.build(SyncIoBridge::new(inp));
let oup = tokio::task::spawn_blocking(move || -> Result<Vec<u8>> {
let mut oup = Vec::new();
std::io::Read::read_to_end(&mut inp, &mut oup)?;
Ok(oup)
.build(SyncIoBridge::new(combined));
let (w, r) = tokio::io::duplex(64 * 1024);
let mut writer = SyncIoBridge::new(w);
tokio::task::spawn_blocking(move || -> Result<()> {
let mut buf = [0u8; 1 << 15];
let mut rdr = reader;
loop {
let n = std::io::Read::read(&mut rdr, &mut buf)?;
if n == 0 {
break;
}
std::io::Write::write_all(&mut writer, &buf[..n])?;
}
Ok(())
})
.await??;
Ok(Box::pin(Cursor::new(oup)))
Ok(Box::pin(r))
}
_ => {
if has_binary {
log::debug!("detected binary");
return Ok(Box::pin(Cursor::new("[rga: binary data]")));
}
Ok(Box::pin(inp))
Ok(Box::pin(combined))
}
}
}
Expand All @@ -133,9 +137,8 @@ pub fn postproc_prefix<T: AsyncRead + Send>(
line_prefix: &str,
inp: T,
) -> impl AsyncRead + Send + use<T> {
let line_prefix_n = format!("\n{line_prefix}"); // clone since we need it later
let line_prefix_n = format!("\n{line_prefix}");
let line_prefix_o = Bytes::copy_from_slice(line_prefix.as_bytes());
let regex = regex::bytes::Regex::new("\n").unwrap();
let inp_stream = ReaderStream::new(inp);
let oup_stream = stream! {
yield Ok(line_prefix_o);
Expand All @@ -144,7 +147,15 @@ pub fn postproc_prefix<T: AsyncRead + Send>(
Err(e) => yield Err(e),
Ok(chunk) => {
if chunk.contains(&b'\n') {
yield Ok(Bytes::copy_from_slice(&regex.replace_all(&chunk, line_prefix_n.as_bytes())));
let mut out = Vec::with_capacity(chunk.len() + 16);
let mut last = 0usize;
for pos in memchr::memchr_iter(b'\n', &chunk) {
out.extend_from_slice(&chunk[last..pos]);
out.extend_from_slice(line_prefix_n.as_bytes());
last = pos + 1;
}
out.extend_from_slice(&chunk[last..]);
yield Ok(Bytes::from(out));
} else {
yield Ok(chunk);
}
Expand Down Expand Up @@ -182,7 +193,9 @@ impl FileAdapter for PostprocPageBreaks {
a: super::AdaptInfo,
_detection_reason: &crate::matching::FileMatcher,
) -> Result<AdaptedFilesIterBox> {
let read = postproc_pagebreaks(postproc_encoding(&a.line_prefix, a.inp).await?);
let read: Pin<Box<dyn AsyncRead + Send>> = Box::pin(postproc_pagebreaks(
postproc_encoding(&a.line_prefix, a.inp).await?,
));
// keep adapt info (filename etc) except replace inp
let ai = AdaptInfo {
inp: Box::pin(read),
Expand Down Expand Up @@ -293,7 +306,8 @@ mod tests {
let fname = test_data_dir().join("twoblankpages.pdf");
let rd = File::open(&fname).await?;
let (a, d) = simple_adapt_info(&fname, Box::pin(rd));
let res = loop_adapt(&adapter, d, a).await?;
let engine = crate::preproc::make_engine(&a.config)?;
let res = loop_adapt(engine, &adapter, d, a).await?;

let buf = adapted_to_vec(res).await?;

Expand Down
3 changes: 2 additions & 1 deletion src/adapters/tar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ mod tests {
let (a, d) = simple_adapt_info(&filepath, Box::pin(File::open(&filepath).await?));

let adapter = TarAdapter::new();
let r = loop_adapt(&adapter, d, a).await.context("adapt")?;
let engine = crate::preproc::make_engine(&a.config)?;
let r = loop_adapt(engine, &adapter, d, a).await.context("adapt")?;
let o = adapted_to_vec(r).await.context("adapted_to_vec")?;
assert_eq!(
String::from_utf8(o).context("parsing utf8")?,
Expand Down
Loading