Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
358 changes: 196 additions & 162 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
members = [
"atsc",
"tools",
"lib_vsri",
"wavbrro",
"vsri",
"csv-compressor"
]
resolver = "2"
Expand All @@ -16,4 +16,4 @@ opt-level = 3
codegen-units = 1

[workspace.dependencies]
clap = {version = "4.3.14", features = ["derive"] }
clap = {version = "4.5", features = ["derive"] }
11 changes: 6 additions & 5 deletions atsc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ description = "An Advanced Time-Series Compressor"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
env_logger = "0.11.0"
log = "0.4.0"
clap = {version = "4.3.14", features = ["derive"] }
log = "0.4.22"
clap = {version = "4.5", features = ["derive"] }
bincode = "2.0.0-rc.3"
rustfft = "6.1.0"
tempfile = "3.2"
rustfft = "6.2.0"
tempfile = "3.14"
average = "0.15.1"
regex = "1.9.1"
regex = "1.11"
hound = "3.5"
median = "0.3.2"
wavbrro = { path = "../wavbrro" }
lib_vsri = { path = "../lib_vsri" }
splines = "4.3.0"
inverse_distance_weight = "0.1.1"
num-traits = "0.2"
Expand Down
11 changes: 11 additions & 0 deletions atsc/src/compressor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ use self::constant::{constant_compressor, constant_to_data};
use self::fft::{fft, fft_compressor, fft_to_data};
use self::noop::{noop, noop_to_data};
use self::polynomial::{polynomial, polynomial_allowed_error, to_data, PolynomialType};
use self::vsri::{vsri_compressor_bytes, vsri_to_data};

pub mod constant;
pub mod fft;
pub mod noop;
pub mod polynomial;
pub mod vsri;

#[derive(Encode, Decode, Default, Debug, Clone, Copy, Eq, Hash, PartialEq)]
pub enum Compressor {
Expand All @@ -38,6 +40,7 @@ pub enum Compressor {
Constant,
Polynomial,
Auto,
VSRI,
}

/// Struct to store the results of a compression round. Will be used to pick the best compressor.
Expand Down Expand Up @@ -110,6 +113,14 @@ impl Compressor {
_ => todo!(),
}
}

/// Compresses a sequence of `i32` points as a VSRI index and returns the encoded bytes.
///
/// This forwards `data` straight to `vsri_compressor_bytes`; `self` is not consulted, so the
/// compressor variant the method is called on has no influence on the output.
pub fn compress_vsri(&self, data: &[i32]) -> Vec<u8> {
vsri_compressor_bytes(data)
}

/// Decodes VSRI bytes back into the `i32` points they hold.
///
/// This forwards `data` straight to `vsri_to_data`; `self` is not consulted, so the
/// compressor variant the method is called on has no influence on the output.
pub fn decompress_vsri(&self, data: &[u8]) -> Vec<i32> {
vsri_to_data(data)
}
}

pub struct BinConfig {
Expand Down
117 changes: 117 additions & 0 deletions atsc/src/compressor/vsri.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Copyright 2024 NetApp, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

use crate::compressor::CompressorResult;

use super::BinConfig;
use bincode::{Decode, Encode};
use lib_vsri::vsri::Vsri;
use log::debug;

/// Identifier written into the `id` field of every [`VSRI`] frame created by `VSRI::new`.
const VSRI_COMPRESSOR_ID: u8 = 249;

/// Compressor frame for a VSRI timestamp index.
///
/// The frame holds the compressor identifier together with the index itself and is
/// serialized as a whole through the derived `Encode`/`Decode` implementations.
#[derive(Debug, Clone, Encode, Decode)]
pub struct VSRI {
/// Compressor identifier; set to `VSRI_COMPRESSOR_ID` by `VSRI::new`.
pub id: u8,
/// The index that the timestamps are fed into and read back from.
pub vsri: Vsri,
}

impl VSRI {
    /// Creates an empty VSRI frame.
    ///
    /// The frame is tagged with `VSRI_COMPRESSOR_ID` and wraps a fresh [`Vsri`] created with
    /// the fixed name `"placeholder"`.
    // NOTE(review): the meaning of the `"placeholder"` argument is defined by `Vsri::new`,
    // which is not visible here — presumably a name/path for the index; confirm.
    pub fn new() -> Self {
        debug!("VSRI compressor");
        VSRI {
            id: VSRI_COMPRESSOR_ID,
            vsri: Vsri::new("placeholder"),
        }
    }

    /// Rebuilds a frame from a byte slice, using the `BinConfig` bincode configuration.
    ///
    /// This is the inverse of [`VSRI::to_bytes`]. The number of bytes consumed by the decoder
    /// is discarded.
    ///
    /// # Panics
    /// Panics if `data` cannot be decoded into a `VSRI` frame.
    pub fn decompress(data: &[u8]) -> Self {
        let config = BinConfig::get();
        let (frame, _bytes_read) = bincode::decode_from_slice(data, config)
            .expect("failed to decode a VSRI frame from the provided bytes");
        frame
    }

    /// Serializes the frame into a byte vector, using the `BinConfig` bincode configuration.
    ///
    /// # Panics
    /// Panics if the frame cannot be encoded.
    pub fn to_bytes(&self) -> Vec<u8> {
        // Use Bincode and flate2-rs? Do this at the Stream Level?
        let config = BinConfig::get();
        bincode::encode_to_vec(self, config).expect("failed to encode the VSRI frame")
    }

    /// Returns every timestamp held by the index, as reported by `Vsri::get_all_timestamps`.
    pub fn to_data(&self) -> Vec<i32> {
        self.vsri.get_all_timestamps()
    }
}

pub fn vsri_compressor(data: &[i32]) -> CompressorResult {
debug!("Initializing VSRI Compressor. Error and Stats provided");
// Initialize the compressor
let mut c = VSRI::new();
for ts in data {
c.vsri.update_for_point(*ts).unwrap();
}
// Convert to bytes
CompressorResult::new(c.to_bytes(), 0.0)
}

/// Builds a VSRI frame from `data` and returns its serialized bytes.
///
/// Every point is fed, in slice order, into the frame's index before the frame is encoded.
///
/// # Panics
/// Panics if the index returns an error for any of the points.
pub fn vsri_compressor_bytes(data: &[i32]) -> Vec<u8> {
    debug!("Initializing VSRI Compressor. Error and Stats provided");
    let mut frame = VSRI::new();
    // Feed the points one by one; a rejected point aborts the whole compression.
    data.iter().copied().for_each(|point| {
        frame.vsri.update_for_point(point).unwrap();
    });
    frame.to_bytes()
}

/// Decodes a serialized VSRI frame and returns the timestamps stored in its index.
///
/// # Panics
/// Panics if `compressed_data` cannot be decoded by `VSRI::decompress`.
pub fn vsri_to_data(compressed_data: &[u8]) -> Vec<i32> {
    VSRI::decompress(compressed_data).to_data()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Timestamps that are all 20 apart must survive a compress/decompress round trip.
    #[test]
    fn test_vsri_simple() {
        let expected: Vec<i32> = vec![
            1729606100, 1729606120, 1729606140, 1729606160, 1729606180, 1729606200, 1729606220,
            1729606240, 1729606260,
        ];
        let compressed = vsri_compressor(&expected);
        let decoded = vsri_to_data(&compressed.compressed_data);
        assert_eq!(expected, decoded);
    }

    /// Timestamps whose spacing changes several times (20, 40, 100, 21, 1) must survive a
    /// compress/decompress round trip.
    #[test]
    fn test_vsri_several_segments() {
        let expected: Vec<i32> = vec![
            1729606100, 1729606120, 1729606140, 1729606160, 1729606180, 1729606200, 1729606220,
            1729606260, 1729606360, 1729606460, 1729606560, 1729606660, 1729606760, 1729606860,
            1729606881, 1729606882, 1729606883, 1729606884, 1729606885, 1729606886, 1729606887,
        ];
        let compressed = vsri_compressor(&expected);
        let decoded = vsri_to_data(&compressed.compressed_data);
        assert_eq!(expected, decoded);
    }
}
2 changes: 1 addition & 1 deletion atsc/src/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub enum Error {

type Result<T> = std::result::Result<T, Error>;

#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Copy, Clone)]
pub struct Sample {
pub timestamp: i64,
pub value: f64,
Expand Down
8 changes: 8 additions & 0 deletions atsc/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ impl CompressedStream {
}
}

/// Compresses a chunk of `i32` points as a VSRI index and appends it to the stream as a new frame.
///
/// A fresh `CompressorFrame` is created, filled through `CompressorFrame::compress_vsri`,
/// closed, and pushed onto `data_frames`.
// NOTE(review): the frame is created with `CompressorFrame::new(None)`, so no compressor is
// passed explicitly — confirm the frame is still tagged in a way that lets it be decoded as VSRI.
pub fn compress_vsri(&mut self, chunk: &[i32]) {
let mut compressor_frame = CompressorFrame::new(None);
compressor_frame.compress_vsri(chunk);
compressor_frame.close();
self.data_frames.push(compressor_frame);
}

/// Compress a chunk of data adding it as a new frame to the current stream
pub fn compress_chunk(&mut self, chunk: &[f64]) {
let mut compressor_frame = CompressorFrame::new(None);
Expand Down
6 changes: 6 additions & 0 deletions atsc/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ impl CompressorFrame {
self.frame_size = size;
}

/// Compresses a VSRI index and stores the result in the frame.
///
/// Records the number of input points in `sample_count` and replaces `data` with the bytes
/// returned by the frame's compressor via `compress_vsri`.
pub fn compress_vsri(&mut self, data: &[i32]) {
self.sample_count = data.len();
self.data = self.compressor.compress_vsri(data);
}

/// Compress a data and stores the result in the frame
pub fn compress(&mut self, data: &[f64]) {
self.sample_count = data.len();
Expand Down
2 changes: 0 additions & 2 deletions atsc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ limitations under the License.
*/

#![allow(clippy::new_without_default)]
// TODO: re-enable dead code checks
#![allow(dead_code)]
extern crate core;

pub mod compressor;
Expand Down
48 changes: 45 additions & 3 deletions atsc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

use atsc::compressor::Compressor;
use atsc::compressor::{vsri, Compressor};
use atsc::csv::{read_samples, read_samples_with_headers};
use atsc::data::CompressedStream;
use atsc::optimizer::OptimizerPlan;
Expand All @@ -25,6 +25,18 @@ use std::error::Error;
use std::path::PathBuf;
use wavbrro::wavbrro::WavBrro;

/// Compresses a sequence of timestamps as a VSRI index and writes it to disk.
///
/// The index needs no further processing, so it is compressed and written out immediately.
///
/// `file_path` is modified in place: its extension is replaced with `vsri`, and the serialized
/// stream is written to the resulting path.
///
/// # Errors
/// Returns the error from `std::fs::write` if the file cannot be written.
fn write_vsri(vsri_data: &[i32], file_path: &mut PathBuf) -> Result<(), Box<dyn Error>> {
    // The VSRI index gets a stream of its own, separate from the value data.
    let mut cs = CompressedStream::new();
    // `vsri_data` is already a slice reference, so it is passed through as-is.
    cs.compress_vsri(vsri_data);
    // The index is stored next to the input, under the `.vsri` extension.
    file_path.set_extension("vsri");
    std::fs::write(file_path, cs.to_bytes())?;
    Ok(())
}

/// Processes the given input based on the provided arguments.
fn process_args(arguments: &Args) -> Result<(), Box<dyn Error>> {
let metadata = std::fs::metadata(&arguments.input)?;
Expand Down Expand Up @@ -96,13 +108,40 @@ fn process_single_file(mut file_path: PathBuf, arguments: &Args) -> Result<(), B
// Assuming that header[0] is a time field and header[1] is value field
read_samples_with_headers(&file_path, headers[0], headers[1])?
};

// Timestamp needs to be compressed as VSRI
let timestamps: Vec<i32> = (&samples)
.into_iter()
.map(|sample| sample.timestamp as i32)
.collect();
let data: Vec<f64> = samples.into_iter().map(|sample| sample.value).collect();

// Compress the timestamps and write them
write_vsri(&timestamps, &mut file_path.clone())?;
if arguments.verbose {
println!("Input={:?}", data);
}
/* let compressed_data = match arguments.compressor {
CompressorType::Vsri => {
// Get data in i32 format!
let vsri_data = [
1729606100, 1729606120, 1729606140, 1729606160, 1729606180, 1729606200,
1729606220,
];
if arguments.verbose {
println!("Input={:?}", vsri_data);
}

}
_ => {
// Read an WavBRRO file and compress it
let data = WavBrro::from_file(&file_path)?;
if arguments.verbose {
println!("Input={:?}", data);
}
file_path.set_extension("bro");
//compress
compress_data(&data, arguments)
}
}; */
// Compress
let compressed_data = compress_data(&data, arguments);

Expand Down Expand Up @@ -140,6 +179,7 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec<u8> {
CompressorType::Fft => op.set_compressor(Compressor::FFT),
CompressorType::Polynomial => op.set_compressor(Compressor::Polynomial),
CompressorType::Idw => op.set_compressor(Compressor::Idw),
CompressorType::Vsri => panic!("Can't provide f64 to VRSI. VSRI is i32 only."),
CompressorType::Auto => op.set_compressor(Compressor::Auto),
}
for (cpr, data) in op.get_execution().into_iter() {
Expand All @@ -155,6 +195,7 @@ fn compress_data(vec: &[f64], arguments: &Args) -> Vec<u8> {
arguments.error as f32 / 100.0,
arguments.compression_selection_sample_level as usize,
),
CompressorType::Vsri => panic!("Can't provide f64 to VRSI. VSRI is i32 only."),
_ => cs.compress_chunk_with(data, cpr.to_owned()),
}
}
Expand Down Expand Up @@ -225,6 +266,7 @@ enum CompressorType {
Constant,
Polynomial,
Idw,
Vsri,
}

fn main() {
Expand Down
8 changes: 4 additions & 4 deletions csv-compressor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ description = "Utilizes ATSC functionalities to compress CSV formatted data"
[dependencies]
atsc = { version = "0.5.0", path = "../atsc" }
clap = { workspace = true, features = ["derive"] }
csv = "1.3.0"
serde = { version = "1.0.171", features = ["derive"] }
csv = "1.3.1"
serde = { version = "1.0", features = ["derive"] }
wavbrro = { version = "0.1.0", path = "../wavbrro" }
log = "0.4.19"
log = "0.4.22"
env_logger = "0.11.0"
vsri = { version = "0.1.0", path = "../vsri" }
lib_vsri = { version = "0.2.0", path = "../lib_vsri" }
[dev-dependencies]
tempdir = "0.3.7"
2 changes: 1 addition & 1 deletion csv-compressor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ use atsc::data::CompressedStream;
use atsc::optimizer::OptimizerPlan;
use atsc::utils::readers::bro_reader::read_file;
use clap::{arg, Parser};
use lib_vsri::vsri::Vsri;
use log::debug;
use std::fs;
use std::path::{Path, PathBuf};
use vsri::Vsri;
use wavbrro::wavbrro::WavBrro;

mod csv;
Expand Down
2 changes: 1 addition & 1 deletion csv-compressor/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ limitations under the License.
*/

use crate::csv::Sample;
use lib_vsri::vsri::{day_elapsed_seconds, Vsri};
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
use vsri::{day_elapsed_seconds, Vsri};
use wavbrro::wavbrro::WavBrro;

/// Metric is responsible for generating WavBrro and VSRI from parsed Samples
Expand Down
14 changes: 14 additions & 0 deletions lib_vsri/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "lib_vsri"
version = "0.2.0"
edition = "2021"
authors = ["Carlos Rolo <carlos.rolo@netapp.com>"]
license = "Apache-2.0"
description = "Very small index for a sequence of timestamps"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = "0.4.38"
log = "0.4.22"
bincode = "2.0.0-rc.3"
3 changes: 3 additions & 0 deletions lib_vsri/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# VSRI (Very Small Rolo Index)

Under construction!
Loading
Loading