From bd43445b6e56d5be91b2b5da88e7935f6ab092b9 Mon Sep 17 00:00:00 2001 From: Vidur Khanal Date: Mon, 30 Dec 2024 22:52:46 -0500 Subject: [PATCH 1/5] chore: fix test for columnquery --- src/columnquery/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/columnquery/mod.rs b/src/columnquery/mod.rs index 44d464f..b53acda 100644 --- a/src/columnquery/mod.rs +++ b/src/columnquery/mod.rs @@ -80,9 +80,9 @@ mod tests { .unwrap(), ); let column_query = ColumnQuery::new(&dal); - let qs = "arch=aarch64,node=focal|parca_agent_cpu:samples:count:cpu:nanoseconds"; + let qs = "arch=aarch64|parca_agent_cpu:samples:count:cpu:nanoseconds"; let x = column_query - .query(ColumnQueryRequest::GeneratePprof, qs, 0) + .query(ColumnQueryRequest::GeneratePprof, qs, 1734496813872) .await .unwrap(); } From 213bb37bc0bbb401c33ef3f2f3c6806480dcc37f Mon Sep 17 00:00:00 2001 From: Vidur Khanal Date: Mon, 30 Dec 2024 22:53:29 -0500 Subject: [PATCH 2/5] fix (in progress) symbolized stack record --- src/dal/mod.rs | 334 ++++++++++++++++++++++++++----------------- 1 file changed, 174 insertions(+), 160 deletions(-) diff --git a/src/dal/mod.rs b/src/dal/mod.rs index 134d5e2..5440a0d 100644 --- a/src/dal/mod.rs +++ b/src/dal/mod.rs @@ -7,16 +7,16 @@ use crate::{ }, utils, }, - schema_builder::{self, symbolized_record_schema}, + schema_builder::{self, locations_inner_field, symbolized_record_schema}, symbolizer::Symbolizer, }; use datafusion::{ arrow::{ array::{ - Array, ArrayBuilder, AsArray, BinaryDictionaryBuilder, GenericListBuilder, - Int64Builder, ListBuilder, NullArray, RecordBatch, StructBuilder, UInt64Builder, + Array, ArrayBuilder, AsArray, BinaryDictionaryBuilder, GenericListBuilder, Int64Array, + Int64Builder, ListBuilder, RecordBatch, StructBuilder, UInt64Builder, }, - datatypes::{DataType, Field, Fields, Int32Type}, + datatypes::Int32Type, }, catalog::TableProvider, datasource::{ @@ -94,9 +94,9 @@ impl DataAccessLayer { pub async fn get_provider(&self) -> 
anyhow::Result> { let mut cp = self.cached_provider.lock().unwrap(); - if cp.created_at.elapsed() < self.max_cache_stale_duration { - return Ok(Arc::clone(&cp.provider)); - } + //if cp.created_at.elapsed() < self.max_cache_stale_duration { + // return Ok(Arc::clone(&cp.provider)); + //} let cp_ = self.create_cached_provider()?; let p = Arc::clone(&cp.provider); @@ -113,6 +113,7 @@ impl DataAccessLayer { pub async fn select_single(&self, qs: &str, time: i64) -> anyhow::Result { let (records, value_col, meta) = self.find_single(qs, time).await?; + //println!("Non-Symbolized Records: {:?}", records); let symbolized_records: Vec = self.symbolize_records(records, value_col, &meta).await?; @@ -136,6 +137,24 @@ impl DataAccessLayer { qs: &str, time: i64, ) -> anyhow::Result<(Vec, &str, profile::Meta)> { + let ctx = SessionContext::new(); + let session_state = ctx.state(); + let table_path = ListingTableUrl::parse("evprofiler-data")?; + + let file_format = ParquetFormat::new(); + let listing_options = + ListingOptions::new(Arc::new(file_format)).with_file_extension(".parquet"); + + let resolved_schema = listing_options + .infer_schema(&session_state, &table_path) + .await?; + + let config = ListingTableConfig::new(table_path) + .with_listing_options(listing_options) + .with_schema(resolved_schema); + + let provider = Arc::new(ListingTable::try_new(config.clone())?); + let (mut meta, mut filter_expr) = qs_to_meta_and_filter_expr(qs)?; filter_expr.push(col(COLUMN_TIMESTAMP).eq(lit(time))); @@ -144,11 +163,10 @@ impl DataAccessLayer { .reduce(|acc, pred| and(acc, pred)) .unwrap(); - let ctx = SessionContext::new(); let value_column = "sum(value)"; let aggr_expr = vec![sum(col(COLUMN_VALUE)).alias(value_column)]; let group_expr = vec![col(COLUMN_STACKTRACE)]; - let df = ctx.read_table(self.get_provider().await?)?; + let df = ctx.read_table(provider)?; let df = df.filter(filter_expr)?; let df = df.aggregate(group_expr, aggr_expr)?; let record = df.collect().await?; @@ -171,11 
+189,12 @@ impl DataAccessLayer { Some(sc) => sc, None => anyhow::bail!("Missing column: {}", COLUMN_STACKTRACE), }); + let value_column = Arc::clone(match record.column_by_name(value_col) { Some(sc) => sc, None => anyhow::bail!("Missing column: {}", value_col), }); - let values_per_second = Arc::new(NullArray::new(value_col.len())); + let values_per_second = Arc::new(Int64Array::new_null(value_column.len())); let locations_record = self.resolve_stacks(stacktrace_col).await?; let records = vec![ @@ -184,7 +203,16 @@ impl DataAccessLayer { values_per_second, ]; - let record_batch = RecordBatch::try_new(Arc::new(symbolized_record_schema()), records)?; + let record_batch = + match RecordBatch::try_new(Arc::new(symbolized_record_schema()), records) { + Ok(rb) => rb, + Err(e) => { + anyhow::bail!( + "Failed to create record batch from locations array. Details: {}", + e + ) + } + }; res.push(record_batch); } @@ -196,13 +224,15 @@ impl DataAccessLayer { Some(sc) => sc, None => anyhow::bail!("stacktrace column couldnot be downcasted to binary array."), }; - let stacktrace_col = match stacktrace_col.values().as_binary_opt::() { + println!("(before) stacktrace_col: {:?}", stacktrace_col.len()); + let stacktrace_col_val = match stacktrace_col.values().as_binary_opt::() { Some(sc) => sc, None => anyhow::bail!("stacktrace column couldnot be downcasted to binary array."), }; + println!("(after)stacktrace_col: {:?}", stacktrace_col_val.len()); let symbolized_locations = - utils::symbolize_locations(stacktrace_col, Arc::clone(&self.symbolizer)).await?; + utils::symbolize_locations(stacktrace_col_val, Arc::clone(&self.symbolizer)).await?; let mut locations_list = locations_array_builder(); for (indx, stacktrace) in stacktrace_col.iter().enumerate() { @@ -210,188 +240,172 @@ impl DataAccessLayer { locations_list.append_null(); continue; } - locations_list.append(true); - let locations: &mut StructBuilder = locations_list.values(); - locations.append(true); + let start = 
stacktrace_col.offsets()[indx] as usize; + let end = stacktrace_col.offsets()[indx + 1] as usize; - let addresses = locations.field_builder::(0).unwrap(); - if let Some(symbolized_location) = &symbolized_locations[indx] { - addresses.append_value(symbolized_location.address); - - if let Some(mapping) = &symbolized_location.mapping { - let mapping_build_id = locations - .field_builder::>(5) - .unwrap(); - if !mapping.build_id.is_empty() { - mapping_build_id.append_value(mapping.build_id.as_bytes()); - } else { - mapping_build_id.append_value("".as_bytes()); - } + for j in start..end { + let locations: &mut StructBuilder = locations_list.values(); - let mapping_file = locations - .field_builder::>(4) - .unwrap(); - if !mapping.file.is_empty() { - mapping_file.append_value(mapping.file.as_bytes()); - } else { - mapping_file.append_value("".as_bytes()); - } + let addresses = locations.field_builder::(0).unwrap(); + if let Some(symbolized_location) = &symbolized_locations[j] { + //println!("Symbolized Location: {:?}", symbolized_locations[j]); + //println!("Symbolized Location: {:?}", symbolized_location); + addresses.append_value(symbolized_location.address); - let mapping_start = locations.field_builder::(1).unwrap(); - mapping_start.append_value(mapping.start); + if let Some(mapping) = &symbolized_location.mapping { + let mapping_build_id = locations + .field_builder::>(5) + .unwrap(); + if !mapping.build_id.is_empty() { + mapping_build_id.append_value(mapping.build_id.as_bytes()); + } else { + mapping_build_id.append_value("".as_bytes()); + } - let mapping_limit = locations.field_builder::(2).unwrap(); - mapping_limit.append_value(mapping.limit); + let mapping_file = locations + .field_builder::>(4) + .unwrap(); + if !mapping.file.is_empty() { + mapping_file.append_value(mapping.file.as_bytes()); + } else { + mapping_file.append_value("".as_bytes()); + } - let mapping_offset = locations.field_builder::(3).unwrap(); - mapping_offset.append_value(mapping.offset); - } 
else { - let mapping_build_id = locations - .field_builder::>(5) - .unwrap(); - mapping_build_id.append_value("".as_bytes()); + let mapping_start = locations.field_builder::(1).unwrap(); + mapping_start.append_value(mapping.start); - let mapping_file = locations - .field_builder::>(4) - .unwrap(); - mapping_file.append_value("".as_bytes()); + let mapping_limit = locations.field_builder::(2).unwrap(); + mapping_limit.append_value(mapping.limit); - let mapping_start = locations.field_builder::(1).unwrap(); - mapping_start.append_value(0); + let mapping_offset = locations.field_builder::(3).unwrap(); + mapping_offset.append_value(mapping.offset); + } else { + let mapping_build_id = locations + .field_builder::>(5) + .unwrap(); + mapping_build_id.append_value("".as_bytes()); - let mapping_limit = locations.field_builder::(2).unwrap(); - mapping_limit.append_value(0); + let mapping_file = locations + .field_builder::>(4) + .unwrap(); + mapping_file.append_value("".as_bytes()); - let mapping_offset = locations.field_builder::(3).unwrap(); - mapping_offset.append_value(0); - } + let mapping_start = locations.field_builder::(1).unwrap(); + mapping_start.append_value(0); - let lines = locations - .field_builder::>>(6) - .unwrap(); - if symbolized_location.lines.len() > 0 { - lines.append(true); - for ln in symbolized_location.lines.iter() { - let line = lines - .values() - .as_any_mut() - .downcast_mut::() - .unwrap(); - line.append(true); + let mapping_limit = locations.field_builder::(2).unwrap(); + mapping_limit.append_value(0); - let line_number = line.field_builder::(0).unwrap(); - line_number.append_value(ln.line); + let mapping_offset = locations.field_builder::(3).unwrap(); + mapping_offset.append_value(0); + } - if let Some(func) = &ln.function { - let function_name = line - .field_builder::>(1) + let lines = locations + .field_builder::>>(6) + .unwrap(); + if symbolized_location.lines.len() > 0 { + lines.append(true); + for ln in symbolized_location.lines.iter() { 
+ let line = lines + .values() + .as_any_mut() + .downcast_mut::() .unwrap(); - if !func.name.is_empty() { - function_name.append_value(func.name.as_bytes()); + line.append(true); + + let line_number = line.field_builder::(0).unwrap(); + line_number.append_value(ln.line); + + if let Some(func) = &ln.function { + let function_name = line + .field_builder::>(1) + .unwrap(); + if !func.name.is_empty() { + function_name.append_value(func.name.as_bytes()); + } else { + function_name.append_value("".as_bytes()); + } + + let function_system_name = line + .field_builder::>(2) + .unwrap(); + if !func.system_name.is_empty() { + function_system_name.append_value(func.system_name.as_bytes()); + } else { + function_system_name.append_value("".as_bytes()); + } + + let function_filename = line + .field_builder::>(3) + .unwrap(); + if !func.filename.is_empty() { + function_filename.append_value(func.filename.as_bytes()); + } else { + function_filename.append_value("".as_bytes()); + } + + let function_start_line = + line.field_builder::(4).unwrap(); + function_start_line.append_value(func.start_line); } else { + let function_name = line + .field_builder::>(1) + .unwrap(); function_name.append_value("".as_bytes()); - } - let function_system_name = line - .field_builder::>(2) - .unwrap(); - if !func.system_name.is_empty() { - function_system_name.append_value(func.system_name.as_bytes()); - } else { + let function_system_name = line + .field_builder::>(2) + .unwrap(); function_system_name.append_value("".as_bytes()); - } - let function_filename = line - .field_builder::>(3) - .unwrap(); - if !func.filename.is_empty() { - function_filename.append_value(func.filename.as_bytes()); - } else { + let function_filename = line + .field_builder::>(3) + .unwrap(); function_filename.append_value("".as_bytes()); - } - let function_start_line = - line.field_builder::(4).unwrap(); - function_start_line.append_value(func.start_line); + let function_start_line = + line.field_builder::(4).unwrap(); + 
function_start_line.append_value(0); + } } + } else { + lines.append(false); } } else { - lines.append(false); - } - } else { - addresses.append_value(0); + addresses.append_value(0); - let lines = locations - .field_builder::>>(6) - .unwrap(); - lines.append_null(); + let lines = locations + .field_builder::>>(6) + .unwrap(); + lines.append_null(); + } + locations.append(true); } + + locations_list.append(true); } let locations_array = locations_list.finish(); - Ok(RecordBatch::try_new( + + let rb = match RecordBatch::try_new( Arc::new(schema_builder::locations_arrow_schema()), vec![Arc::new(locations_array)], - )?) + ) { + Ok(rb) => rb, + Err(e) => { + anyhow::bail!( + "Failed to create record batch from locations array. Details: {}", + e + ) + } + }; + Ok(rb) } } fn locations_array_builder() -> GenericListBuilder { - ListBuilder::new(StructBuilder::from_fields( - vec![ - Field::new("address", DataType::UInt64, true), - Field::new("mapping_start", DataType::UInt64, true), - Field::new("mapping_limit", DataType::UInt64, true), - Field::new("mapping_offset", DataType::UInt64, true), - Field::new( - "mapping_file", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), - true, - ), - Field::new( - "mapping_build_id", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), - true, - ), - Field::new( - "lines", - DataType::List(Arc::new(Field::new_list_field( - DataType::Struct(Fields::from(vec![ - Field::new("line", DataType::Int64, true), - Field::new( - "function_name", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new( - "function_system_name", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new( - "function_filename", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new("function_start_line", DataType::Int64, true), - ])), - 
true, - ))), - true, - ), - ], - 0, - )) + ListBuilder::new(StructBuilder::from_fields(locations_inner_field(), 0)) } fn qs_to_meta_and_filter_expr(qs: &str) -> anyhow::Result<(profile::Meta, Vec)> { From f4e31538b948a71339ddf46cbc9286839d6cc5a8 Mon Sep 17 00:00:00 2001 From: Vidur Khanal Date: Mon, 30 Dec 2024 22:54:34 -0500 Subject: [PATCH 3/5] chore: make symbolizer tolerant --- src/profile/utils.rs | 2 +- src/symbolizer/mod.rs | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/profile/utils.rs b/src/profile/utils.rs index aa11e6e..67c03ee 100644 --- a/src/profile/utils.rs +++ b/src/profile/utils.rs @@ -184,7 +184,7 @@ pub async fn symbolize_locations( }], }; - symbolizer.symbolize(&mut sym_req).await?; + let _ = symbolizer.symbolize(&mut sym_req).await; // Update result_locations directly from the symbolized locations for (idx, loc) in locations_with_indices.iter() { diff --git a/src/symbolizer/mod.rs b/src/symbolizer/mod.rs index 2f7a4a4..f1664e3 100644 --- a/src/symbolizer/mod.rs +++ b/src/symbolizer/mod.rs @@ -66,7 +66,10 @@ impl Symbolizer { self.metadata .fetch(build_id, &DebuginfoType::DebuginfoUnspecified) .ok_or_else(|| { - Status::not_found(format!("Debuginfo for build_id {} not found", build_id)) + Status::not_found(format!( + "Debuginfo Metadata for build_id {} not found", + build_id + )) })? 
.clone() }; From 064799f75c8b028f9d22ee7264a7a47b1d2b4d13 Mon Sep 17 00:00:00 2001 From: Vidur Khanal Date: Mon, 30 Dec 2024 22:55:00 -0500 Subject: [PATCH 4/5] chore: remove debug statement --- src/symbols/addr_to_line/symbol.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/symbols/addr_to_line/symbol.rs b/src/symbols/addr_to_line/symbol.rs index e7a4818..1b03d62 100644 --- a/src/symbols/addr_to_line/symbol.rs +++ b/src/symbols/addr_to_line/symbol.rs @@ -168,6 +168,5 @@ mod tests { let x = l .pc_to_lines(NormalizedAddress(0x0000000000041290)) .unwrap(); - println!("{:?}", x); } } From 82a57480939fec05b7d9096a5bbfee97879fb52c Mon Sep 17 00:00:00 2001 From: Vidur Khanal Date: Mon, 30 Dec 2024 22:55:14 -0500 Subject: [PATCH 5/5] refactor: locations_inner_field --- src/schema_builder.rs | 102 ++++++++++++++++++++---------------------- 1 file changed, 49 insertions(+), 53 deletions(-) diff --git a/src/schema_builder.rs b/src/schema_builder.rs index 09138e2..2d3c446 100644 --- a/src/schema_builder.rs +++ b/src/schema_builder.rs @@ -3,64 +3,60 @@ use std::sync::Arc; //TODO: Add PprofLocationsArrowSchemaHere // +// + +pub fn locations_inner_field() -> Vec { + vec![ + Field::new("address", DataType::UInt64, true), + Field::new("mapping_start", DataType::UInt64, true), + Field::new("mapping_limit", DataType::UInt64, true), + Field::new("mapping_offset", DataType::UInt64, true), + Field::new( + "mapping_file", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)), + true, + ), + Field::new( + "mapping_build_id", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)), + true, + ), + Field::new( + "lines", + DataType::List(Arc::new(Field::new( + "lines_inner", + DataType::Struct(Fields::from(vec![ + Field::new("line", DataType::Int64, true), + Field::new( + "function_name", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)), + true, + ), + Field::new( + "function_system_name", + 
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)), + true, + ), + Field::new( + "function_filename", + DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Binary)), + true, + ), + Field::new("function_start_line", DataType::Int64, true), + ])), + true, + ))), + true, + ), + ] +} pub fn locations_field() -> Field { Field::new( "locations", DataType::List(Arc::new(Field::new( - "locations_inner", - DataType::Struct(Fields::from(vec![ - Field::new("address", DataType::UInt64, true), - Field::new("mapping_start", DataType::UInt64, true), - Field::new("mapping_limit", DataType::UInt64, true), - Field::new("mapping_offset", DataType::UInt64, true), - Field::new( - "mapping_file", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), - true, - ), - Field::new( - "mapping_build_id", - DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)), - true, - ), - Field::new( - "lines", - DataType::List(Arc::new(Field::new( - "lines_inner", - DataType::Struct(Fields::from(vec![ - Field::new("line", DataType::Int64, true), - Field::new( - "function_name", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new( - "function_system_name", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new( - "function_filename", - DataType::Dictionary( - Box::new(DataType::UInt32), - Box::new(DataType::Binary), - ), - true, - ), - Field::new("function_start_line", DataType::Int64, true), - ])), - true, - ))), - true, - ), - ])), + "item", + DataType::Struct(Fields::from(locations_inner_field())), true, ))), true,