diff --git a/nemo-cli/src/main.rs b/nemo-cli/src/main.rs index 77634b1b0..a3b9fbf10 100644 --- a/nemo-cli/src/main.rs +++ b/nemo-cli/src/main.rs @@ -36,7 +36,7 @@ use nemo::{ datavalues::AnyDataValue, error::Error, execution::{ - DefaultExecutionEngine, ExecutionEngine, + DefaultExecutionStrategy, ExecutionEngine, execution_parameters::ExecutionParameters, tracing::{node_query::TableEntriesForTreeNodesQuery, tree_query::TreeForTableQuery}, }, @@ -150,7 +150,7 @@ fn print_timing_details() { } /// Prints detailed memory information. -fn print_memory_details(engine: &DefaultExecutionEngine) { +fn print_memory_details(engine: &ExecutionEngine) { println!("\nMemory report:\n\n{}", engine.memory_usage()); } @@ -169,7 +169,7 @@ fn parse_trace_facts(cli: &CliApp) -> Result, Error> { } /// Deal with tracing -async fn handle_tracing(cli: &CliApp, engine: &mut DefaultExecutionEngine) -> Result<(), CliError> { +async fn handle_tracing(cli: &CliApp, engine: &mut ExecutionEngine) -> Result<(), CliError> { let tracing_facts = parse_trace_facts(cli)?; if !tracing_facts.is_empty() { log::info!("Starting tracing of {} facts...", tracing_facts.len()); @@ -214,10 +214,7 @@ async fn handle_tracing(cli: &CliApp, engine: &mut DefaultExecutionEngine) -> Re Ok(()) } -async fn handle_tracing_tree( - cli: &CliApp, - engine: &mut DefaultExecutionEngine, -) -> Result<(), CliError> { +async fn handle_tracing_tree(cli: &CliApp, engine: &mut ExecutionEngine) -> Result<(), CliError> { if let Some(query_json) = &cli.tracing_tree.trace_tree_json { let tree_query: TreeForTableQuery = serde_json::from_str(query_json).map_err(|_| CliError::TracingInvalidFact { @@ -233,10 +230,7 @@ async fn handle_tracing_tree( Ok(()) } -async fn handle_tracing_node( - cli: &CliApp, - engine: &mut DefaultExecutionEngine, -) -> Result<(), CliError> { +async fn handle_tracing_node(cli: &CliApp, engine: &mut ExecutionEngine) -> Result<(), CliError> { if let Some(query_file) = &cli.tracing_node.trace_node_json { let query_string = read_to_string(query_file).expect("Unable to read file"); @@ -297,7 +291,7 @@ async fn run(mut cli: CliApp) -> Result<(), CliError> { TimedCode::instance().sub("Reasoning").start(); log::info!("Reasoning ... "); - engine.execute().await?; + engine.execute::().await?; log::info!("Reasoning done"); TimedCode::instance().sub("Reasoning").stop(); diff --git a/nemo-physical/src/datatypes.rs b/nemo-physical/src/datatypes.rs index d4fd326bd..52486dd3f 100644 --- a/nemo-physical/src/datatypes.rs +++ b/nemo-physical/src/datatypes.rs @@ -5,7 +5,7 @@ pub(crate) mod storage_type_name; pub(crate) use storage_type_name::StorageTypeName; /// Module for defining [StorageValueT] pub(crate) mod storage_value; -pub(crate) use storage_value::StorageValueT; +pub use storage_value::StorageValueT; /// Module for defining [Double] pub mod double; pub use double::Double; diff --git a/nemo-physical/src/datatypes/storage_value.rs b/nemo-physical/src/datatypes/storage_value.rs index 48666b0ae..12c6a71d9 100644 --- a/nemo-physical/src/datatypes/storage_value.rs +++ b/nemo-physical/src/datatypes/storage_value.rs @@ -12,7 +12,7 @@ use super::{StorageTypeName, double::Double, float::Float, into_datavalue::IntoD /// Ord and PartialOrd assume U32 < U64 < I64 < Float < Double. /// More information at #[derive(Clone, Copy, Debug, PartialEq, Eq, Ord, PartialOrd, Hash)] -pub(crate) enum StorageValueT { +pub enum StorageValueT { /// A value of type [StorageTypeName::Id32]. Such values always refer to an entry in a /// dictionary, rather than to the literal numerical integer value. Id32(u32), diff --git a/nemo-physical/src/datavalues/any_datavalue.rs b/nemo-physical/src/datavalues/any_datavalue.rs index 3899c4748..08ffc8b11 100644 --- a/nemo-physical/src/datavalues/any_datavalue.rs +++ b/nemo-physical/src/datavalues/any_datavalue.rs @@ -515,7 +515,7 @@ impl AnyDataValue { /// The correct process in this case is to use the dictionary to create any null value on which this /// method will later be called. It is not possible to newly create a dictionary id for an arbitrary /// null value (in such a way that the same ID will be returned if an equal null value is converted). - pub(crate) fn to_storage_value_t_dict(&self, dictionary: &mut Dict) -> StorageValueT { + pub fn to_storage_value_t_dict(&self, dictionary: &mut Dict) -> StorageValueT { match self.value_domain() { ValueDomain::Tuple | ValueDomain::Map diff --git a/nemo-physical/src/management/database.rs b/nemo-physical/src/management/database.rs index e5b38ef84..516d6f98c 100644 --- a/nemo-physical/src/management/database.rs +++ b/nemo-physical/src/management/database.rs @@ -20,6 +20,7 @@ use ascii_tree::write_tree; use crate::{ datasources::table_providers::TableProvider, + datatypes::StorageValueT, datavalues::AnyDataValue, error::Error, management::{bytesized::ByteSized, database::execution_series::ExecutionTreeNode}, @@ -150,6 +151,26 @@ impl DatabaseInstance { self.reference_manager.count_rows_in_memory(id) } + /// Provide an iterator over the rows of the table with the given [PermanentTableId] + /// which directly yields rows of [`StorageValueT`], without translation through a + /// dictionary. + /// + /// # Panics + /// Panics if the given id does not exist. + pub async fn table_raw_row_iterator( + &mut self, + id: PermanentTableId, + ) -> Result> + '_, Error> { + // Make sure trie is loaded + let storage_id = self + .reference_manager + .trie_id(&self.dictionary, id, ColumnOrder::default()) + .await.unwrap_or_else(|err| panic!("No table with the id {id} exists: {err}")); + let trie = self.reference_manager.trie(storage_id); + + Ok(trie.row_iterator()) + } + /// Provide an iterator over the rows of the table with the given [PermanentTableId]. /// /// # Panics @@ -158,7 +179,6 @@ impl DatabaseInstance { &mut self, id: PermanentTableId, ) -> Result> + '_, Error> { - // Make sure trie is loaded let storage_id = self .reference_manager .trie_id(&self.dictionary, id, ColumnOrder::default()) diff --git a/nemo-physical/src/tabular.rs b/nemo-physical/src/tabular.rs index 2fe311f9e..184d69c5f 100644 --- a/nemo-physical/src/tabular.rs +++ b/nemo-physical/src/tabular.rs @@ -5,5 +5,5 @@ pub mod operations; pub mod trie; pub(crate) mod triescan; -pub(crate) mod buffer; +pub mod buffer; pub(crate) mod rowscan; diff --git a/nemo-physical/src/tabular/buffer.rs b/nemo-physical/src/tabular/buffer.rs index 94630e059..59747d484 100644 --- a/nemo-physical/src/tabular/buffer.rs +++ b/nemo-physical/src/tabular/buffer.rs @@ -2,3 +2,5 @@ pub(crate) mod sorted_tuple_buffer; pub(crate) mod tuple_buffer; + +pub use tuple_buffer::TupleBuffer; diff --git a/nemo-physical/src/tabular/buffer/sorted_tuple_buffer.rs b/nemo-physical/src/tabular/buffer/sorted_tuple_buffer.rs index 8e1f4611e..c9eb17524 100644 --- a/nemo-physical/src/tabular/buffer/sorted_tuple_buffer.rs +++ b/nemo-physical/src/tabular/buffer/sorted_tuple_buffer.rs @@ -8,7 +8,7 @@ use super::tuple_buffer::TupleBuffer; /// Read-only wrapper for [TupleBuffer] which allows the retrieval of its tuples in a sorted manner #[derive(Debug)] -pub(crate) struct SortedTupleBuffer { +pub struct SortedTupleBuffer { /// Underlying [TupleBuffer] containing the actual values tuple_buffer: TupleBuffer, /// We imagine the tuple of the `tuple_buffer` to be arranged one after another in the order of its subtables. diff --git a/nemo-physical/src/tabular/buffer/tuple_buffer.rs b/nemo-physical/src/tabular/buffer/tuple_buffer.rs index 77dd549d7..814d566e3 100644 --- a/nemo-physical/src/tabular/buffer/tuple_buffer.rs +++ b/nemo-physical/src/tabular/buffer/tuple_buffer.rs @@ -253,7 +253,7 @@ impl TransformPosition { /// Represents a row-based table containing values of arbitrary data types #[derive(Debug)] -pub(crate) struct TupleBuffer { +pub struct TupleBuffer { /// Conceptionally, one may imagine the table represented by the [TupleBuffer] /// to be split into several subtables that only contain rows with certain fixed types. /// E.g. one subtable might contain tuples of type ([StorageTypeName::Id32], [StorageTypeName::Int64]) @@ -282,7 +282,7 @@ pub(crate) struct TupleBuffer { impl TupleBuffer { /// Create a new [TupleBuffer]. - pub(crate) fn new(column_number: usize) -> Self { + pub fn new(column_number: usize) -> Self { Self::with_patterns(column_number, Vec::new()) } @@ -334,7 +334,7 @@ impl TupleBuffer { /// Alternatively, a partially built tuple can be abandonded by calling `drop_current_tuple`. /// /// This must not be mixed with [add_tuple_data_value] on the same tuple. - pub(crate) fn add_tuple_value(&mut self, value: StorageValueT) { + pub fn add_tuple_value(&mut self, value: StorageValueT) { self.current_tuple_types[self.current_tuple_index] = value.get_type(); self.current_tuple[self.current_tuple_index] = value; self.current_tuple_index += 1; @@ -376,7 +376,7 @@ impl TupleBuffer { } /// Finish writing to the [TupleBuffer] and return a [SortedTupleBuffer]. - pub(crate) fn finalize(self) -> SortedTupleBuffer { + pub fn finalize(self) -> SortedTupleBuffer { SortedTupleBuffer::new(self) } @@ -387,7 +387,7 @@ impl TupleBuffer { } /// Returns the number of rows in the [TupleBuffer] - pub(crate) fn size(&self) -> usize { + pub fn size(&self) -> usize { self.typed_subtables .iter() .map(|record| record.current_length) diff --git a/nemo-physical/src/tabular/trie.rs b/nemo-physical/src/tabular/trie.rs index f7f809ab3..fb21d7710 100644 --- a/nemo-physical/src/tabular/trie.rs +++ b/nemo-physical/src/tabular/trie.rs @@ -92,7 +92,7 @@ impl Trie { } /// Return a row based iterator over this trie. - pub(crate) fn row_iterator(&self) -> impl Iterator> + '_ { + pub fn row_iterator(&self) -> impl Iterator> + use<'_> { RowScan::new_full(self.partial_iterator()) } @@ -180,7 +180,7 @@ impl Trie { } /// Create a new [Trie] from a [SortedTupleBuffer]. - pub(crate) fn from_tuple_buffer(buffer: SortedTupleBuffer) -> Self { + pub fn from_tuple_buffer(buffer: SortedTupleBuffer) -> Self { let mut intervalcolumn_builders = (0..buffer.column_number()) .map(|_| IntervalColumnTBuilderMatrix::::default()) .collect::>(); diff --git a/nemo-python/src/lib.rs b/nemo-python/src/lib.rs index ad45cac4b..64d806cfe 100644 --- a/nemo-python/src/lib.rs +++ b/nemo-python/src/lib.rs @@ -5,7 +5,7 @@ use nemo::{ chase_model::ChaseAtom, datavalues::{AnyDataValue, DataValue}, error::Error, - execution::{ExecutionEngine, tracing::trace::ExecutionTraceTree}, + execution::{DefaultExecutionStrategy, ExecutionEngine, tracing::trace::ExecutionTraceTree}, io::{ExportManager, ImportManager, resource_providers::ResourceProviders}, meta::timing::TimedCode, rule_model::{ @@ -331,7 +331,7 @@ impl NemoResults { #[pyclass(unsendable)] struct NemoEngine { - engine: nemo::execution::DefaultExecutionEngine, + engine: nemo::execution::ExecutionEngine, } #[pyclass] @@ -426,7 +426,8 @@ impl NemoEngine { .enable_all() .build()?; - rt.block_on(self.engine.execute()).py_res()?; + rt.block_on(self.engine.execute::()) + .py_res()?; TimedCode::instance().sub("Reasoning").stop(); TimedCode::instance().stop(); diff --git a/nemo-wasm/src/lib.rs b/nemo-wasm/src/lib.rs index f03830658..08fde57a4 100644 --- a/nemo-wasm/src/lib.rs +++ b/nemo-wasm/src/lib.rs @@ -16,7 +16,9 @@ use web_sys::{Blob, FileReaderSync}; use nemo::{ datavalues::{AnyDataValue, DataValue}, error::ReadingError, - execution::{ExecutionEngine, execution_parameters::ExecutionParameters}, + execution::{ + DefaultExecutionStrategy, ExecutionEngine, execution_parameters::ExecutionParameters, + }, io::{ ImportManager, resource_providers::{ResourceProvider, ResourceProviders, http}, @@ -126,7 +128,7 @@ impl ResourceProvider for BlobResourceProvider { #[wasm_bindgen] pub struct NemoEngine { - engine: nemo::execution::DefaultExecutionEngine, + engine: nemo::execution::ExecutionEngine, } #[cfg(feature = "web_sys_unstable_apis")] @@ -264,7 +266,7 @@ impl NemoEngine { #[wasm_bindgen] pub async fn reason(&mut self) -> Result<(), NemoError> { self.engine - .execute() + .execute::() .await .map_err(WasmOrInternalNemoError::Nemo) .map_err(NemoError) diff --git a/nemo/src/api.rs b/nemo/src/api.rs index e8b43b9c2..18471b988 100644 --- a/nemo/src/api.rs +++ b/nemo/src/api.rs @@ -27,7 +27,7 @@ use std::{fs::read_to_string, path::PathBuf}; use crate::{ error::{Error, ReadingError, report::ProgramReport}, execution::{ - DefaultExecutionEngine, ExecutionEngine, execution_parameters::ExecutionParameters, + DefaultExecutionStrategy, ExecutionEngine, execution_parameters::ExecutionParameters, }, rule_file::RuleFile, rule_model::{ @@ -41,7 +41,7 @@ use crate::{ use nemo_physical::resource::Resource; /// Reasoning Engine exposed by the API -pub type Engine = DefaultExecutionEngine; +pub type Engine = ExecutionEngine; /// Load the given `file` and load the program from the file. /// @@ -116,7 +116,7 @@ pub fn validate(input: String, label: String) -> ProgramReport { /// parsed rules, all relative paths are resolved with the current /// working directory pub async fn reason(engine: &mut Engine) -> Result<(), Error> { - engine.execute().await + engine.execute::().await } /// Get a [Vec] of all output predicates that are computed by the engine. diff --git a/nemo/src/execution.rs b/nemo/src/execution.rs index bebb59c99..6345b30be 100644 --- a/nemo/src/execution.rs +++ b/nemo/src/execution.rs @@ -12,6 +12,7 @@ use self::selection_strategy::{ pub mod execution_parameters; pub mod planning; pub mod rule_execution; +pub mod saturation; pub mod selection_strategy; pub mod tracing; @@ -19,6 +20,3 @@ pub mod tracing; pub type DefaultExecutionStrategy = StrategyStratifiedNegation< StrategyDependencyGraph, >; - -/// Shorthand for an execution engine using the default strategy -pub type DefaultExecutionEngine = ExecutionEngine; diff --git a/nemo/src/execution/execution_engine.rs b/nemo/src/execution/execution_engine.rs index 6caa77b0a..b8f0f4cfb 100644 --- a/nemo/src/execution/execution_engine.rs +++ b/nemo/src/execution/execution_engine.rs @@ -1,12 +1,17 @@ //! Functionality which handles the execution of a program -use std::collections::HashMap; +use core::slice; +use std::{collections::HashMap, sync::Arc}; use nemo_physical::{ datavalues::AnyDataValue, dictionary::DvDict, - management::database::sources::{SimpleTable, TableSource}, + management::{ + database::sources::{SimpleTable, TableSource}, + execution_plan::ColumnOrder, + }, meta::timing::TimedCode, + tabular::{buffer::TupleBuffer, trie::Trie}, }; use crate::{ @@ -16,6 +21,10 @@ use crate::{ translation::ProgramChaseTranslation, }, error::{Error, report::ProgramReport, warned::Warned}, + execution::saturation::{ + execution::{DataBase, saturate}, + model::{SaturationRule, SaturationRuleTranslation}, + }, io::{formats::Export, import_manager::ImportManager}, rule_file::RuleFile, rule_model::{ @@ -28,7 +37,7 @@ use crate::{ use super::{ execution_parameters::ExecutionParameters, rule_execution::RuleExecution, - selection_strategy::strategy::RuleSelectionStrategy, + selection_strategy::strategy::MetaStrategy, }; pub mod tracing; @@ -54,7 +63,7 @@ impl RuleInfo { /// Object which handles the evaluation of the program. #[derive(Debug)] -pub struct ExecutionEngine { +pub struct ExecutionEngine { /// Logical program nemo_program: Program, @@ -63,9 +72,6 @@ pub struct ExecutionEngine { /// Auxillary information for `program` analysis: ProgramAnalysis, - /// The picked selection strategy for rules - rule_strategy: RuleSelectionStrategy, - /// Management of tables that represent predicates table_manager: TableManager, /// Managermet of imports @@ -84,7 +90,7 @@ pub struct ExecutionEngine { current_step: usize, } -impl ExecutionEngine { +impl ExecutionEngine { /// Initialize a [ExecutionEngine] by parsing and translating /// the contents of the given file. pub async fn from_file( @@ -124,16 +130,10 @@ impl ExecutionEngine { .iter() .for_each(|_| rule_infos.push(RuleInfo::new())); - let rule_strategy = Strategy::new( - chase_program.rules().iter().collect(), - analysis.rule_analysis.iter().collect(), - )?; - Ok(Self { nemo_program: program, program: chase_program, analysis, - rule_strategy, table_manager, import_manager, predicate_fragmentation: HashMap::new(), @@ -209,6 +209,7 @@ impl ExecutionEngine { rule_index: usize, execution: &RuleExecution, ) -> Result, Error> { + TimedCode::instance().sub("Reasoning/Rules").start(); let timing_string = format!("Reasoning/Rules/Rule {rule_index}"); TimedCode::instance().sub(&timing_string).start(); @@ -233,6 +234,7 @@ impl ExecutionEngine { log::info!("Rule duration: {} ms", rule_duration.as_millis()); self.current_step += 1; + TimedCode::instance().sub("Reasoning/Rules").stop(); Ok(updated_predicates) } @@ -267,12 +269,121 @@ impl ExecutionEngine { Ok(()) } + async fn saturation_step( + &mut self, + rules: &mut [SaturationRule], + scc: &[usize], + ) -> Result { + TimedCode::instance().sub("Reasoning/Saturation").start(); + + let mut db: DataBase = Default::default(); + let mut new_facts = false; + + let predicates: Vec = self.table_manager.known_predicates().cloned().collect(); + for predicate in &predicates { + let Some(table_id) = self.table_manager.combine_predicate(predicate).await? else { + continue; + }; + + db.add_table( + Arc::from(predicate.name()), + self.table_manager.table_raw_row_iterator(table_id).await?, + ); + } + + log::trace!("{db:?}"); + + let timing_string = format!("Reasoning/Saturation/Saturate/{scc:?}"); + + TimedCode::instance().sub(&timing_string).start(); + saturate(&mut db, rules); + let duration = TimedCode::instance().sub(&timing_string).stop(); + + log::info!("Saturation took {}ms", duration.as_millis()); + + let timing_string = "Reasoning/Saturation/Save Tables".to_string(); + + TimedCode::instance().sub(&timing_string).start(); + for predicate in &predicates { + let mut buffer = TupleBuffer::new(self.predicate_arity(predicate).unwrap()); + + for value in db.new_facts(predicate.name()) { + buffer.add_tuple_value(value); + } + + if buffer.size() == 0 { + continue; + } + + log::debug!("derived {} facts for {predicate}", buffer.size()); + + let trie = Trie::from_tuple_buffer(buffer.finalize()); + self.table_manager.add_table( + predicate.clone(), + self.current_step, + ColumnOrder::default(), + trie, + ); + + new_facts = true; + } + let duration = TimedCode::instance().sub(&timing_string).stop(); + log::info!("Saved saturation results: {}ms", duration.as_millis()); + + self.current_step += 1; + TimedCode::instance().sub("Reasoning/Saturation").stop(); + Ok(new_facts) + } + /// Executes the program. - pub async fn execute(&mut self) -> Result<(), Error> { - TimedCode::instance().sub("Reasoning/Rules").start(); + pub async fn execute(&mut self) -> Result<(), Error> { TimedCode::instance().sub("Reasoning/Execution").start(); - let rule_execution: Vec = self + let mut new_derivations: Option = None; + + fn fill_saturation_rules( + this: &mut ExecutionEngine, + scc: &[usize], + store: &mut HashMap< + Box<[usize]>, + Result, HashMap>, + >, + ) { + if store.contains_key(scc) { + return; + } + + let saturation_rules: Vec> = { + let mut dict = this.table_manager.dictionary_mut(); + let mut translation = SaturationRuleTranslation::new(&mut dict); + scc.iter() + .map(|index| { + translation + .convert(this.nemo_program.rule(*index)) + .inspect_err(|reason| { + log::debug!("rule {index} does not support saturation ({reason})") + }) + .ok() + }) + .collect() + }; + + let saturation_rules = if saturation_rules.iter().all(Option::is_some) { + Ok(saturation_rules.into_iter().map(Option::unwrap).collect()) + } else { + Err(saturation_rules + .into_iter() + .zip(scc) + .flat_map(|(r, i)| Some(*i).zip(r)) + .collect()) + }; + + store.insert(Box::from(scc), saturation_rules); + } + + let mut saturation_rules = Default::default(); + + let executions: Vec<_> = self .program .rules() .iter() @@ -280,17 +391,46 @@ impl ExecutionEngine { .zip(self.analysis.rule_analysis.iter()) .map(|((index, rule), analysis)| RuleExecution::initialize(rule, index, analysis)) .collect(); - - let mut new_derivations: Option = None; - - while let Some(index) = self.rule_strategy.next_rule(new_derivations) { - let updated_predicates = self.step(index, &rule_execution[index]).await?; - new_derivations = Some(!updated_predicates.is_empty()); - - self.defrag(updated_predicates).await?; + let mut rule_strategy = Strategy::new(self.analysis.rule_analysis.iter().collect())?; + + let mut last_scc: Option> = None; + + while let Some(index) = rule_strategy.next_rule(new_derivations) { + let scc = rule_strategy.current_scc(); + if let Some(last) = &last_scc + && &scc == last { + log::debug!("skipping application of {index}"); + new_derivations = Some(false); + continue; + } + + fill_saturation_rules(self, &scc, &mut saturation_rules); + + match saturation_rules.get_mut(&scc).unwrap() { + Ok(rules) => { + log::info!("<<< {0}: APPLYING SCC {scc:?} >>>", self.current_step); + + new_derivations = Some(self.saturation_step(rules, &scc).await?); + last_scc = Some(scc); + } + Err(rules) => { + if let Some(rule) = rules.get_mut(&index) { + new_derivations = Some( + self.saturation_step(slice::from_mut(rule), &[index]) + .await?, + ); + last_scc = None; + } else { + let updated_predicates = self.step(index, &executions[index]).await?; + last_scc = None; + new_derivations = Some(!updated_predicates.is_empty()); + + self.defrag(updated_predicates).await?; + } + } + } } - TimedCode::instance().sub("Reasoning/Rules").stop(); TimedCode::instance().sub("Reasoning/Execution").stop(); Ok(()) diff --git a/nemo/src/execution/execution_engine/tracing/node_query.rs b/nemo/src/execution/execution_engine/tracing/node_query.rs index 2b2947761..e81da4e01 100644 --- a/nemo/src/execution/execution_engine/tracing/node_query.rs +++ b/nemo/src/execution/execution_engine/tracing/node_query.rs @@ -22,7 +22,6 @@ use crate::{ valid_tables_plan, variable_translation, }, }, - selection_strategy::strategy::RuleSelectionStrategy, tracing::{ node_query::{ TableEntriesForTreeNodesQuery, TableEntriesForTreeNodesQueryInner, @@ -38,7 +37,7 @@ use crate::{ mod manager; mod util; -impl ExecutionEngine { +impl ExecutionEngine { /// Phase 1 of `trace_node_execute` /// /// Collect the fact restriction in each node into tables. diff --git a/nemo/src/execution/execution_engine/tracing/simple.rs b/nemo/src/execution/execution_engine/tracing/simple.rs index d091daefa..233b9c91d 100644 --- a/nemo/src/execution/execution_engine/tracing/simple.rs +++ b/nemo/src/execution/execution_engine/tracing/simple.rs @@ -13,7 +13,6 @@ use crate::{ execution::{ ExecutionEngine, planning::plan_tracing::TracingStrategy, - selection_strategy::strategy::RuleSelectionStrategy, tracing::{ error::TracingError, trace::{ @@ -32,7 +31,7 @@ use crate::{ table_manager::SubtableExecutionPlan, }; -impl ExecutionEngine { +impl ExecutionEngine { /// Recursive part of `trace`. async fn trace_recursive( &mut self, diff --git a/nemo/src/execution/execution_engine/tracing/tree_query.rs b/nemo/src/execution/execution_engine/tracing/tree_query.rs index e79e91bde..b08315c9b 100644 --- a/nemo/src/execution/execution_engine/tracing/tree_query.rs +++ b/nemo/src/execution/execution_engine/tracing/tree_query.rs @@ -15,7 +15,6 @@ use crate::{ execution::{ ExecutionEngine, planning::plan_tracing::TracingStrategy, - selection_strategy::strategy::RuleSelectionStrategy, tracing::{ error::TracingError, shared::{PaginationResponse, Rule as TraceRule, TableEntryQuery, TableEntryResponse}, @@ -102,7 +101,7 @@ fn partial_grounding_for_rule_head_and_fact( Some(grounding) } -impl ExecutionEngine { +impl ExecutionEngine { /// The results for negated nodes is not computed as part of the normal /// evaluation of the query, and instead appendning the the full table, /// as an explanation for negated facts. diff --git a/nemo/src/execution/rule_execution.rs b/nemo/src/execution/rule_execution.rs index 7e78a8877..4a55e353d 100644 --- a/nemo/src/execution/rule_execution.rs +++ b/nemo/src/execution/rule_execution.rs @@ -28,7 +28,7 @@ pub(crate) type VariableTranslation = OperationTableGenerator; /// Object responsible for executing a "normal" rule. #[derive(Debug)] -pub(crate) struct RuleExecution { +pub struct RuleExecution { /// Translation of variables into markers used for creating execution plans variable_translation: VariableTranslation, /// List of variable orders which might be considered for this rule diff --git a/nemo/src/execution/saturation.rs b/nemo/src/execution/saturation.rs new file mode 100644 index 000000000..48bab25bd --- /dev/null +++ b/nemo/src/execution/saturation.rs @@ -0,0 +1,4 @@ +//! Saturate a set of rules. + +pub mod execution; +pub mod model; diff --git a/nemo/src/execution/saturation/execution.rs b/nemo/src/execution/saturation/execution.rs new file mode 100644 index 000000000..818d1c961 --- /dev/null +++ b/nemo/src/execution/saturation/execution.rs @@ -0,0 +1,783 @@ +//! Executing a set of saturation rules + +use core::panic; +use std::{ + collections::{BTreeMap, HashMap, btree_map}, + ops::{Bound, Deref, DerefMut, Index}, + sync::Arc, +}; + +use nemo_physical::datatypes::StorageValueT; +#[cfg(not(test))] +use nemo_physical::meta::timing::TimedCode; + +use super::model::{ + BodyTerm, Head, JoinOp, SaturationAtom, SaturationFact, SaturationRule, VariableIdx, +}; + +#[derive(Debug, Default, Clone)] +struct SaturationSubstitution(Vec>); + +impl Index for SaturationSubstitution { + type Output = Option; + + fn index(&self, index: VariableIdx) -> &Self::Output { + if self.0.len() <= usize::from(index) { + &None + } else { + &self.0[usize::from(index)] + } + } +} + +impl SaturationSubstitution { + fn insert(&mut self, var: VariableIdx, value: StorageValueT) -> Option { + if self.0.len() <= usize::from(var) { + self.0.resize_with(usize::from(var + 1), || None); + self.0[usize::from(var)] = Some(value); + None + } else { + let prev = self.0[usize::from(var)]; + self.0[usize::from(var)] = Some(value); + prev + } + } + + fn bind(&self, terms: &[BodyTerm]) -> Row { + terms + .iter() + .map(|term| match term { + BodyTerm::Constant(constant) => RowElement::Value(*constant), + BodyTerm::Variable(var) => self[*var] + .map(RowElement::Value) + .unwrap_or(RowElement::Bottom), + BodyTerm::Ignore => RowElement::Bottom, + }) + .collect() + } + + #[must_use] + fn update(&mut self, terms: &[BodyTerm], row: &[RowElement]) -> bool { + for (term, value) in terms.iter().zip(row) { + let BodyTerm::Variable(var) = term else { + continue; + }; + + if let Some(prev) = self.insert(*var, value.value()) + && prev != value.value() + { + return false; + } + } + + true + } + + fn satisfies(&self, equality: (VariableIdx, VariableIdx)) -> bool { + self.0[equality.0 as usize].unwrap() == self.0[equality.1 as usize].unwrap() + } +} + +impl SaturationAtom { + fn match_fact(&self, fact: &SaturationFact) -> Option { + if fact.predicate != self.predicate { + return None; + } + + let mut res = SaturationSubstitution::default(); + debug_assert_eq!(self.terms.len(), fact.values.len()); + + for (term, value) in self.terms.iter().zip(fact.values.iter()) { + match term { + BodyTerm::Constant(constant) => { + if value != constant { + return None; + } + } + BodyTerm::Variable(idx) => { + if let Some(prev) = res.insert(*idx, *value) + && prev != *value + { + return None; + } + } + BodyTerm::Ignore => {} + } + } + + if let Some(equality) = self.equality + && !res.satisfies(equality) + { + return None; + } + + Some(res) + } +} + +struct Triggers<'a, 'b> { + rule: &'a mut SaturationRule, + fact: &'b SaturationFact, + index: usize, +} + +impl Iterator for Triggers<'_, '_> { + type Item = ExecutionTree; + + fn next(&mut self) -> Option { + while self.index < self.rule.body_atoms.len() { + let Some(substitution) = self.rule.body_atoms[self.index].match_fact(self.fact) else { + self.index += 1; + continue; + }; + + let ops = self.rule.join_order(self.index); + let index = ops.len(); + + self.index += 1; + + return Some(ExecutionTree { + init: substitution, + ops, + index, + }); + } + + None + } +} + +impl SaturationRule { + fn trigger<'a, 'b>( + &'a mut self, + fact: &'b SaturationFact, + ) -> impl Iterator + use<'a, 'b> { + Triggers { + rule: self, + fact, + index: 0, + } + } +} + +#[derive(PartialEq, Eq, Clone, Copy, Debug)] +enum RowElement { + Value(StorageValueT), + Bottom, + Top, +} + +type Row = Box<[RowElement]>; + +impl PartialOrd for RowElement { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for RowElement { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + match (self, other) { + (RowElement::Bottom, RowElement::Bottom) => std::cmp::Ordering::Equal, + (RowElement::Top, RowElement::Top) => std::cmp::Ordering::Equal, + (RowElement::Value(a), RowElement::Value(b)) => a.cmp(b), + + (_, RowElement::Bottom) => std::cmp::Ordering::Greater, + (_, RowElement::Top) => std::cmp::Ordering::Less, + (RowElement::Bottom, _) => std::cmp::Ordering::Less, + (RowElement::Top, _) => std::cmp::Ordering::Greater, + } + } +} + +enum MatchResult { + Matches, + InBounds, + OutOfBounds, +} + +impl RowElement { + fn value(self) -> StorageValueT { + match self { + RowElement::Value(inner) => inner, + RowElement::Top | RowElement::Bottom => panic!("called value() on RowElement::Ghost"), + } + } +} + +fn match_rows(pattern: &[RowElement], row: &[RowElement]) -> MatchResult { + let mut index = 0; + + while index < pattern.len() { + let RowElement::Value(value) = pattern[index] else { + break; + }; + + match value.cmp(&row[index].value()) { + std::cmp::Ordering::Less => return MatchResult::OutOfBounds, + std::cmp::Ordering::Equal => {} + std::cmp::Ordering::Greater => panic!("pattern must always be a lower bound"), + } + + index += 1; + } + + // only here if pattern[index] == Ghost || index >= pattern.len() + index += 1; + + while index < pattern.len() { + let RowElement::Value(value) = pattern[index] else { + index += 1; + continue; + }; + + if value != row[index].value() { + return MatchResult::InBounds; + } + + index += 1; + } + + MatchResult::Matches +} + +#[derive(Debug)] +struct RowIterator<'a> { + lower_cursor: btree_map::Cursor<'a, Row, Age>, + upper_cursor_next: Option<&'a Row>, + pattern: Row, +} + +impl<'a> Iterator for RowIterator<'a> { + type Item = &'a [RowElement]; + + fn next(&mut self) -> Option { + while let Some((row, _)) = self.lower_cursor.next() { + if let Some(other_row) = self.upper_cursor_next + && other_row == row + { + return None; + } + + match match_rows(&self.pattern, row) { + MatchResult::Matches => return Some(row), + MatchResult::InBounds => continue, + MatchResult::OutOfBounds => { + log::trace!("OutOfBounds {row:?}, {:?}", self.pattern); + log::trace!("upper cursor next {:?}", self.upper_cursor_next); + unreachable!("this should have been caught early") + } + } + } + + None + } +} + +trait GhostBound { + fn invert_bound(&self) -> Self; +} + +impl GhostBound for Row { + fn invert_bound(&self) -> Self { + self.iter() + .map(|elem| match elem { + RowElement::Bottom => RowElement::Top, + RowElement::Top => RowElement::Bottom, + value => *value, + }) + .collect() + } +} + +fn find_all_matches<'a>(pattern: Row, table: &'a BTreeMap) -> RowIterator<'a> { + let lower_cursor = table.lower_bound(Bound::Included(&pattern)); + let upper_cursor = table.upper_bound(Bound::Included(&pattern.invert_bound())); + let upper_cursor_next = upper_cursor.peek_next().map(|(r, _)| r); + RowIterator { + lower_cursor, + upper_cursor_next, + pattern, + } +} + +#[derive(Debug)] +struct RowMatcher<'a> { + substitution: SaturationSubstitution, + atom: SaturationAtom, + cursor: RowIterator<'a>, +} + +impl Iterator for RowMatcher<'_> { + type Item = SaturationSubstitution; + + fn next(&mut self) -> Option { + loop { + let row = self.cursor.next()?; + let mut subst = self.substitution.clone(); + if subst.update(&self.atom.terms, row) { + if let Some(equality) = &self.atom.equality + && !subst.satisfies(*equality) + { + continue; + } + + return Some(subst); + } + } + } +} + +fn join<'a>( + substitution: SaturationSubstitution, + atom: SaturationAtom, + table: &'a BTreeMap, +) -> RowMatcher<'a> { + let cursor = find_all_matches(substitution.bind(&atom.terms), table); + + RowMatcher { + substitution, + atom, + cursor, + } +} + +#[derive(Debug)] +struct ExecutionTree { + init: SaturationSubstitution, + ops: Arc<[JoinOp]>, + index: usize, +} + +#[derive(Debug)] +enum JoinIter<'a> { + Done, + NoOp(SaturationSubstitution), + Join { + inner: Box>, + atom: SaturationAtom, + table: &'a BTreeMap, + current: Option>, + }, +} + +#[derive(Debug, Clone, Copy)] +enum Age { + Old, + New, +} + +#[derive(Debug)] +enum Singular<'a, T> { + #[allow(unused)] + Ref(&'a mut T), + Owned(T), +} + +impl Default for Singular<'_, T> { + fn default() -> Self { + Self::Owned(T::default()) + } +} + +impl<'a, T> Deref for Singular<'a, T> { + type Target = T; + + fn deref(&self) -> &Self::Target { + match self { + Singular::Ref(r) => &*r, + Singular::Owned(o) => o, + } + } +} + +impl<'a, T> DerefMut for Singular<'a, T> { + fn deref_mut(&mut self) -> &mut T { + match self { + Singular::Ref(r) => r, + Singular::Owned(o) => o, + } + } +} + +#[derive(Debug, Default)] +pub(crate) struct DataBase<'a>(HashMap, Singular<'a, BTreeMap>>); + +impl ExecutionTree { + fn pop(&mut self) -> Option<&JoinOp> { + if self.index > 0 { + self.index -= 1; + Some(&self.ops[self.index]) + } else { + None + } + } + + fn execute<'a>(mut self, tables: &'a DataBase) -> JoinIter<'a> { + let Some(op) = self.pop() else { + return JoinIter::NoOp(self.init); + }; + + match op { + JoinOp::Join(atom) => { + let Some(table) = tables.0.get(&atom.predicate) else { + return JoinIter::Done; + }; + + let atom = atom.clone(); + let inner = Box::new(self.execute(tables)); + + JoinIter::Join { + inner, + atom, + table, + current: None, + } + } + // todo: more efficient implementation? + JoinOp::Filter(atom) => { + let Some(table) = tables.0.get(&atom.predicate) else { + return JoinIter::Done; + }; + + let atom = atom.clone(); + let inner = Box::new(self.execute(tables)); + + JoinIter::Join { + inner, + atom, + table, + current: None, + } + } + } + } +} + +impl Iterator for JoinIter<'_> { + type Item = SaturationSubstitution; + + fn next(&mut self) -> Option { + match self { + JoinIter::NoOp(saturation_substitution) => { + let res = saturation_substitution.clone(); + *self = Self::Done; + Some(res) + } + JoinIter::Join { + inner, + atom, + table, + current, + } => loop { + if let Some(current) = current + && let Some(next) = current.next() + { + return Some(next); + } + + let substitution = inner.next()?; + *current = Some(join(substitution, atom.clone(), table)); + }, + JoinIter::Done => None, + } + } +} + +fn fact_from_row(row: &Row, predicate: Arc) -> SaturationFact { + let values = row + .iter() + .map(|element| match element { + RowElement::Value(value) => Some(*value), + _ => None, + }) + .collect::>() + .unwrap_or_else(|| panic!("{row:?}")); + + SaturationFact { predicate, values } +} + +pub(crate) fn saturate(db: &mut DataBase, rules: &mut [SaturationRule]) { + let mut matches = Vec::new(); + + #[cfg(not(test))] + TimedCode::instance() + .sub("Reasoning/Saturation/update") + .start(); + + for (rule_index, rule) in rules.iter_mut().enumerate() { + let predicate = rule + .input_predicates() + .min_by_key(|p| db.0.get(p).map(|t| t.len()).unwrap_or(0)) + .unwrap(); + + for (row, _) in db.0.get(&predicate).iter().flat_map(|table| table.iter()) { + let fact = fact_from_row(row, predicate.clone()); + + for trigger in rule.trigger(&fact) { + matches.extend(trigger.execute(db).map(|row| (row, rule_index))); + } + } + } + + #[cfg(not(test))] + TimedCode::instance() + .sub("Reasoning/Saturation/update") + .stop(); + #[cfg(not(test))] + TimedCode::instance() + .sub("Reasoning/Saturation/loop") + .start(); + + let mut todo = Vec::new(); + while !matches.is_empty() { + todo.clear(); + + for (substitution, rule_index) in matches.drain(..) { + let rule = &rules[rule_index]; + + match &rule.head { + Head::Datalog(atoms) => { + for atom in atoms { + let row = substitution.bind(&atom.terms); + let table = db.0.entry(atom.predicate.clone()).or_default(); + + let mut cursor = table.lower_bound_mut(Bound::Included(&row)); + + if let Some((other_row, _)) = cursor.peek_next() + && other_row == &row + { + continue; + } + + let fact = fact_from_row(&row, atom.predicate.clone()); + + cursor.insert_after(row, Age::New).unwrap(); + todo.push(fact); + } + } + } + } + + for fact in &todo { + for (rule_index, rule) in rules.iter_mut().enumerate() { + for trigger in rule.trigger(fact) { + matches.extend(trigger.execute(db).map(|row| (row, rule_index))); + } + } + } + } + + #[cfg(not(test))] + TimedCode::instance() + .sub("Reasoning/Saturation/loop") + .stop(); +} + +impl DataBase<'_> { + pub fn add_table( + &mut self, + predicate: Arc, + table: impl Iterator>, + ) { + let table = Singular::Owned( + table + .map(|row| (row.into_iter().map(RowElement::Value).collect(), Age::Old)) + .collect(), + ); + + self.0.insert(predicate, table); + } + + pub fn new_facts(&self, predicate: &str) -> impl Iterator + use<'_> { + self.0.get(predicate).into_iter().flat_map(|table| { + table.iter().flat_map(|(row, age)| { + (matches!(age, Age::New)) + .then_some(row.iter().map(|v| match v { + RowElement::Value(storage_value_t) => *storage_value_t, + RowElement::Bottom => unreachable!("sentinel elements are never written"), + RowElement::Top => unreachable!("sentinel elements are never written"), + })) + .into_iter() + .flatten() + }) + }) + } +} + +#[cfg(test)] +mod test { + use std::{ + collections::{BTreeMap, HashMap}, + iter::repeat_n, + sync::Arc, + }; + + use nemo_physical::datatypes::StorageValueT; + + use super::Age; + + use crate::execution::saturation::{ + execution::{DataBase, Row, RowElement, Singular, find_all_matches, saturate}, + model::{BodyTerm, Head, SaturationAtom, SaturationRule, bench_rules}, + }; + + macro_rules! table { + [ $([ $($v:expr),* ],)* ] => { + BTreeMap::from([ $( (Box::from([ $(RowElement::Value(StorageValueT::Id32($v))),* ]), Age::Old), )* ]) + }; + } + + #[test] + fn find_all_matches_works() { + let table: BTreeMap = table![ + [0, 0, 0, 1, 0], + [0, 1, 0, 0, 0], + [0, 1, 0, 1, 2], + [0, 1, 1, 0, 0], + [0, 1, 2, 1, 2], + [1, 0, 0, 0, 0], + [1, 1, 0, 1, 2], + [2, 1, 0, 0, 0], + ]; + + let pattern1: Row = Box::from([ + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Bottom, + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Bottom, + ]); + + let matches: Vec<_> = find_all_matches(pattern1, &table).collect(); + let expected: Vec<&[RowElement]> = vec![ + &[ + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Value(StorageValueT::Id32(0)), + ], + &[ + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Value(StorageValueT::Id32(2)), + ], + ]; + + assert_eq!(matches, expected); + + let pattern = Box::from([ + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Bottom, + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Bottom, + ]); + + let mut iter = find_all_matches(pattern, &table); + let expected: &[RowElement] = &[ + RowElement::Value(StorageValueT::Id32(1)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + RowElement::Value(StorageValueT::Id32(0)), + ]; + assert_eq!( + iter.lower_cursor.peek_next().map(|(row, _)| { + let row: &[RowElement] = row; + row + }), + Some(expected) + ); + assert_eq!(iter.next(), Some(expected)); + assert_eq!(iter.next(), None); + } + + #[test] + fn saturate_bench_rules() { + let n = 10; + let (mut rules, predicate) = bench_rules(n); + let row: Row = repeat_n(RowElement::Value(StorageValueT::Int64(0)), n).collect(); + + let mut db = DataBase(HashMap::from([( + predicate.clone(), + Singular::Owned(BTreeMap::from([(row, Age::Old)])), + )])); + + saturate(&mut db, &mut rules); + + assert_eq!(db.0.get(&predicate).unwrap().len(), 2_usize.pow(n as u32)); + + let new_len = db.new_facts(&predicate).count() / n; + assert_eq!(new_len, 2_usize.pow(n as u32) - 1); + } + + #[test] + fn saturate_multi_join() { + let p1: Arc = Arc::from("p1"); + let p2: Arc = Arc::from("p2"); + let p3: Arc = Arc::from("p3"); + let p4: Arc = Arc::from("p4"); + + let x = BodyTerm::Variable(0); + let y = BodyTerm::Variable(1); + let z = BodyTerm::Variable(2); + + let head = Head::Datalog(Box::from([SaturationAtom { + predicate: p1.clone(), + terms: Box::from([x.clone(), y.clone(), z.clone()]), + equality: Default::default(), + }])); + + let p2_atom = SaturationAtom { + predicate: p2.clone(), + terms: Box::from([x.clone(), y.clone()]), + equality: Default::default(), + }; + + let p2_table: BTreeMap = table![[0, 0],]; + + let p3_atom = SaturationAtom { + predicate: p3.clone(), + terms: Box::from([x.clone(), y.clone()]), + equality: Default::default(), + }; + + let p3_table: BTreeMap = table![[0, 0],]; + + let p4_atom = SaturationAtom { + predicate: p4.clone(), + terms: Box::from([x.clone(), y.clone(), z.clone()]), + equality: Default::default(), + }; + + let p4_table: BTreeMap = table![[0, 0, 0], [0, 0, 1],]; + + let mut db = HashMap::new(); + db.insert(p2.clone(), Singular::Owned(p2_table)); + db.insert(p3.clone(), Singular::Owned(p3_table)); + db.insert(p4.clone(), Singular::Owned(p4_table.clone())); + + let rule = SaturationRule { + body_atoms: Arc::new([p2_atom, p3_atom, p4_atom]), + join_orders: Box::from([None, None, None]), + head, + }; + + let mut db = DataBase(db); + let mut rules = vec![rule]; + + saturate(&mut db, &mut rules); + + assert_eq!( + db.0.get(&p1) + .unwrap_or(&Default::default()) + .keys() + .collect::>(), + p4_table.keys().collect::>() + ); + } +} diff --git a/nemo/src/execution/saturation/model.rs b/nemo/src/execution/saturation/model.rs new file mode 100644 index 000000000..3c9b6b846 --- /dev/null +++ b/nemo/src/execution/saturation/model.rs @@ -0,0 +1,358 @@ +//! Model of rules supported by the saturation algorithm + +use std::{ + borrow::Cow, + collections::{HashMap, HashSet}, + sync::Arc, +}; + +use nemo_physical::{datatypes::StorageValueT, management::database::Dict}; + +use crate::rule_model::components::{ + atom::Atom, + literal::Literal, + rule::Rule, + term::{ + primitive::{ground::GroundTerm, variable::Variable, Primitive}, + Term, + }, + IterableVariables, +}; + +pub(crate) type VariableIdx = u16; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +/// Terms supported in the body of [`SaturationRule`]s +pub(crate) enum BodyTerm { + Constant(StorageValueT), + Variable(VariableIdx), + Ignore, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +/// Atoms whose arguments are represented by [`BodyTerm`] +pub(crate) struct SaturationAtom { + pub(super) predicate: Arc, + pub(super) terms: Box<[BodyTerm]>, + pub(super) equality: Option<(VariableIdx, VariableIdx)>, +} + +impl SaturationAtom { + /// Iterate over the variables in a [`SaturationAtom`] + pub(super) fn variables(&self) -> impl Iterator + use<'_> { + self.terms.iter().flat_map(|term| match term { + BodyTerm::Variable(var) => Some(*var), + _ => None, + }) + } +} + +#[derive(Debug, Clone)] +pub(crate) enum Head { + Datalog(Box<[SaturationAtom]>), +} + +pub(crate) type JoinOrder = Arc<[JoinOp]>; + +#[derive(Debug, Clone)] +pub(crate) enum JoinOp { + Join(SaturationAtom), + Filter(SaturationAtom), +} + +#[derive(Debug, Clone)] +pub(crate) struct SaturationRule { + pub(super) body_atoms: Arc<[SaturationAtom]>, + pub(super) join_orders: Box<[Option]>, + pub(super) head: Head, +} + +impl SaturationRule { + pub(super) fn join_order(&mut self, index: usize) -> JoinOrder { + if let Some(order) = &self.join_orders[index] { + order.clone() + } else { + let atom = &self.body_atoms[index]; + let variables: HashSet<_> = atom.variables().collect(); + let mut mask = vec![true; self.body_atoms.len()]; + mask[index] = false; + + let order = compute_join_order(variables, &self.body_atoms, &mut mask); + + self.join_orders[index] = Some(order.clone()); + order + } + } + + #[allow(unused)] + pub(crate) fn input_predicates(&self) -> impl Iterator> + use<'_> { + self.body_atoms.iter().map(|a| a.predicate.clone()) + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct SaturationFact { + pub(super) predicate: Arc, + pub(super) values: Arc<[StorageValueT]>, +} + +#[derive(Default)] +struct Variables(HashMap, u16); + +impl Variables { + fn get(&mut self, var: Cow) -> u16 { + match self.0.get(var.as_ref()) { + Some(index) => *index, + None => { + let index = self.add_fresh(); + self.0.insert(var.to_string(), index); + index + } + } + } + + fn add_fresh(&mut self) -> u16 { + let index = self.1; + self.1 += 1; + index + } +} + +impl SaturationRuleTranslation<'_> { + fn convert_term(&mut self, term: &Term) -> Result { + match GroundTerm::try_from(term.clone()) { + Ok(ground) => { + let value = ground.value().to_storage_value_t_dict(self.dict); + Ok(BodyTerm::Constant(value)) + } + Err(term) => { + let Term::Primitive(Primitive::Variable(var)) = term else { + return Err("not a ground-term or variable"); + }; + + let Variable::Universal(var) = var else { + return Err("existential"); + }; + + match var.name() { + Some(name) => Ok(BodyTerm::Variable(self.variables.get(Cow::Borrowed(name)))), + None => Ok(BodyTerm::Ignore), + } + } + } + } + + fn convert_body_atom(&mut self, atom: &Atom) -> Result { + let predicate = self.interner.create(atom.predicate().name()); + + let mut terms: Box<[BodyTerm]> = atom + .terms() + .map(|term| self.convert_term(term)) + .collect::>()?; + + let mut equality = None; + + for i in 0..terms.len() { + let (left, right) = terms.split_at_mut(i + 1); + if let BodyTerm::Variable(v) = &mut left[i] + && right.iter().any(|other| other == &BodyTerm::Variable(*v)) { + if equality.is_some() { + return Err("only supports a single equality"); + } + + let orig = *v; + *v = self.variables.add_fresh(); + + equality = Some((orig, *v)); + } + } + + Ok(SaturationAtom { + predicate, + terms, + equality, + }) + } + + fn convert_head_atom(&mut self, atom: &Atom) -> Result { + let predicate = self.interner.create(atom.predicate().name()); + + let terms: Box<[BodyTerm]> = atom + .terms() + .map(|term| self.convert_term(term)) + .collect::>()?; + + Ok(SaturationAtom { + predicate, + terms, + equality: Default::default(), + }) + } + + fn convert_literal(&mut self, lit: &Literal) -> Result { + match lit { + Literal::Positive(atom) => self.convert_body_atom(atom), + Literal::Negative(_) => Err("negation"), + Literal::Operation(_) => Err("unsupported operation"), + } + } + + pub(crate) fn convert(&mut self, rule: &Rule) -> Result { + let body: Arc<[SaturationAtom]> = rule + .body() + .iter() + .map(|lit| self.convert_literal(lit)) + .collect::>()?; + + let join_orders: Box<[_]> = std::iter::repeat_n(None, body.len()).collect(); + + let head = if rule.variables().any(Variable::is_existential) { + // existential variable are not supported yet + return Err("existential"); + } else { + Head::Datalog( + rule.head() + .iter() + .map(|atom| self.convert_head_atom(atom)) + .collect::>()?, + ) + }; + + Ok(SaturationRule { + body_atoms: body, + join_orders, + head, + }) + } +} + +pub(crate) struct SaturationRuleTranslation<'a> { + variables: Variables, + interner: Interner, + dict: &'a mut Dict, +} + +impl<'a> SaturationRuleTranslation<'a> { + /// Create at [`SaturationRuleTranslation`] referring to a [`Dict`] + pub(crate) fn new(dict: &'a mut Dict) -> Self { + Self { + variables: Variables::default(), + interner: Interner(HashSet::new()), + dict, + } + } +} + +struct Interner(HashSet>); + +impl Interner { + fn create(&mut self, input: &str) -> Arc { + if let Some(res) = self.0.get(input) { + res.clone() + } else { + self.0.insert(Arc::from(input)); + self.0.get(input).unwrap().clone() + } + } +} + +fn filter_index(variables: &HashSet, atom: &SaturationAtom) -> (i32, i32) { + let mut other_variables = 0; + let mut overlapping_variables = 0; + + for var in atom.variables() { + if variables.contains(&var) { + overlapping_variables += 1; + } else { + other_variables += 1; + } + } + + (other_variables, overlapping_variables) +} + +fn compute_join_order( + mut variables: HashSet, + body: &[SaturationAtom], + mask: &mut [bool], +) -> JoinOrder { + let mut operations = Vec::new(); + + loop { + let mut index = None; + let mut min_new_variables = i32::MAX; + let mut max_overlapping = 0; + + for (current_index, atom) in body + .iter() + .enumerate() + .zip(&mut *mask) + .filter_map(|(atom, flag)| flag.then_some(atom)) + { + let (other, overlap) = filter_index(&variables, atom); + + if other < min_new_variables + || (other == min_new_variables && max_overlapping < overlap) + { + min_new_variables = other; + max_overlapping = overlap; + index = Some(current_index); + } + } + + let Some(index) = index else { + break JoinOrder::from(operations); + }; + + mask[index] = false; + if min_new_variables == 0 { + operations.push(JoinOp::Filter(body[index].clone())); + } else { + operations.push(JoinOp::Join(body[index].clone())); + variables.extend(body[index].variables()); + } + } +} + +#[cfg(test)] +pub(super) fn bench_rules(n: usize) -> (Vec, Arc) { + use std::iter::repeat_n; + + let one = BodyTerm::Constant(StorageValueT::Int64(1)); + let zero = BodyTerm::Constant(StorageValueT::Int64(0)); + let predicate: Arc = Arc::from("p"); + + let rules: Vec<_> = (0..n) + .map(|i| { + let head = (0..VariableIdx::try_from(i).unwrap()) + .map(BodyTerm::Variable) + .chain(Some(one)) + .chain(repeat_n(zero, n - i - 1)); + + let head = SaturationAtom { + predicate: predicate.clone(), + terms: head.collect(), + equality: Default::default(), + }; + + let body = (0..VariableIdx::try_from(i).unwrap()) + .map(BodyTerm::Variable) + .chain(Some(zero)) + .chain(repeat_n(one, n - i - 1)); + + let body = SaturationAtom { + predicate: predicate.clone(), + terms: body.collect(), + equality: Default::default(), + }; + + SaturationRule { + body_atoms: Arc::from([body]), + join_orders: Box::from([None]), + head: Head::Datalog(Box::from([head])), + } + }) + .collect(); + + (rules, predicate) +} diff --git a/nemo/src/execution/selection_strategy/dependency_graph/graph_constructor.rs b/nemo/src/execution/selection_strategy/dependency_graph/graph_constructor.rs index 6de530b54..1c5e49be1 100644 --- a/nemo/src/execution/selection_strategy/dependency_graph/graph_constructor.rs +++ b/nemo/src/execution/selection_strategy/dependency_graph/graph_constructor.rs @@ -2,14 +2,31 @@ use petgraph::{Directed, Graph, adj::NodeIndex}; -use crate::chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}; +use crate::{ + chase_model::analysis::program_analysis::RuleAnalysis, rule_model::components::tag::Tag, +}; /// Graph that represents a prioritization between rules. pub type DependencyGraph = Graph, (), Directed>; +pub trait PositivePredicateAnalysis { + fn positive_body_predicates(&self) -> impl Iterator; + fn head_predicates(&self) -> impl Iterator; +} + +impl PositivePredicateAnalysis for &RuleAnalysis { + fn positive_body_predicates(&self) -> impl Iterator { + self.positive_body_predicates.iter() + } + + fn head_predicates(&self) -> impl Iterator { + self.head_predicates.iter() + } +} + /// Defines the trait for constructors of depedency graphs. -pub trait DependencyGraphConstructor: std::fmt::Debug { +pub trait DependencyGraphConstructor: std::fmt::Debug { /// Given a list of rules and some additional information, /// construct the dependency graph. - fn build_graph(rules: Vec<&ChaseRule>, rule_analyses: Vec<&RuleAnalysis>) -> DependencyGraph; + fn build_graph(rule_analyses: &[T]) -> DependencyGraph; } diff --git a/nemo/src/execution/selection_strategy/dependency_graph/graph_positive.rs b/nemo/src/execution/selection_strategy/dependency_graph/graph_positive.rs index 21f5f7ea5..51f5934aa 100644 --- a/nemo/src/execution/selection_strategy/dependency_graph/graph_positive.rs +++ b/nemo/src/execution/selection_strategy/dependency_graph/graph_positive.rs @@ -1,10 +1,10 @@ //! Module for defining a graph that checks //! when the application of a rule might lead to the application of another. -use std::collections::HashMap; +use std::{collections::HashMap, fmt::Debug}; use crate::{ - chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}, + execution::selection_strategy::dependency_graph::graph_constructor::PositivePredicateAnalysis, rule_model::components::tag::Tag, }; @@ -15,16 +15,17 @@ use super::graph_constructor::{DependencyGraph, DependencyGraphConstructor}; #[derive(Debug, Copy, Clone)] pub struct GraphConstructorPositive {} -impl DependencyGraphConstructor for GraphConstructorPositive { - fn build_graph(rules: Vec<&ChaseRule>, rule_analyses: Vec<&RuleAnalysis>) -> DependencyGraph { - debug_assert!(rules.len() == rule_analyses.len()); - let rule_count = rules.len(); +impl DependencyGraphConstructor + for GraphConstructorPositive +{ + fn build_graph(rule_analyses: &[T]) -> DependencyGraph { + let rule_count = rule_analyses.len(); let mut predicate_to_rules_body = HashMap::>::new(); let mut predicate_to_rules_head = HashMap::>::new(); for (rule_index, rule_analysis) in rule_analyses.iter().enumerate() { - for body_predicate in &rule_analysis.positive_body_predicates { + for body_predicate in rule_analysis.positive_body_predicates() { let indices = predicate_to_rules_body .entry(body_predicate.clone()) .or_default(); @@ -32,7 +33,7 @@ impl DependencyGraphConstructor for GraphConstructorPositive { indices.push(rule_index); } - for head_predicate in &rule_analysis.head_predicates { + for head_predicate in rule_analysis.head_predicates() { let indices = predicate_to_rules_head .entry(head_predicate.clone()) .or_default(); diff --git a/nemo/src/execution/selection_strategy/strategy.rs b/nemo/src/execution/selection_strategy/strategy.rs index b7767246d..c691b36a6 100644 --- a/nemo/src/execution/selection_strategy/strategy.rs +++ b/nemo/src/execution/selection_strategy/strategy.rs @@ -2,7 +2,7 @@ use thiserror::Error; -use crate::chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}; +use crate::chase_model::analysis::program_analysis::RuleAnalysis; /// Errors that can occur while creating a strategy. #[derive(Error, Debug, Copy, Clone)] @@ -14,17 +14,18 @@ pub enum SelectionStrategyError { /// Trait that defines a strategy for rule execution, /// namely the order in which the rules are applied in. -pub trait RuleSelectionStrategy: std::fmt::Debug { +pub trait RuleSelectionStrategy: std::fmt::Debug + Sized { /// Create a new [RuleSelectionStrategy] object. - fn new( - rules: Vec<&ChaseRule>, - rule_analyses: Vec<&RuleAnalysis>, - ) -> Result - where - Self: Sized; + fn new(rule_analyses: Vec<&RuleAnalysis>) -> Result; /// Return the index of the next rule that should be executed. /// Returns `None` if there are no more rules to be applied /// and the execution should therefore stop. fn next_rule(&mut self, new_derivations: Option) -> Option; } + +/// A [`RuleSelectionStrategy`] which is aware of the SCCs of a program +pub trait MetaStrategy: RuleSelectionStrategy { + /// Get the SCC which the currently selected rule belongs to + fn current_scc(&self) -> Box<[usize]>; +} diff --git a/nemo/src/execution/selection_strategy/strategy_graph.rs b/nemo/src/execution/selection_strategy/strategy_graph.rs index 144187fd9..a9306d642 100644 --- a/nemo/src/execution/selection_strategy/strategy_graph.rs +++ b/nemo/src/execution/selection_strategy/strategy_graph.rs @@ -2,7 +2,10 @@ use std::marker::PhantomData; -use crate::chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}; +use crate::{ + chase_model::analysis::program_analysis::RuleAnalysis, + execution::selection_strategy::strategy::MetaStrategy, +}; use super::{ dependency_graph::graph_constructor::DependencyGraphConstructor, @@ -11,10 +14,7 @@ use super::{ /// Defines a rule execution strategy which respects certain dependencies between rules #[derive(Debug)] -pub struct StrategyDependencyGraph< - GraphConstructor: DependencyGraphConstructor, - SubStrategy: RuleSelectionStrategy, -> { +pub struct StrategyDependencyGraph { _constructor: PhantomData, ordered_sccs: Vec>, @@ -23,14 +23,13 @@ pub struct StrategyDependencyGraph< current_scc_index: usize, } -impl - RuleSelectionStrategy for StrategyDependencyGraph +impl< + GraphConstructor: for<'a> DependencyGraphConstructor<&'a RuleAnalysis>, + SubStrategy: RuleSelectionStrategy, + > RuleSelectionStrategy for StrategyDependencyGraph { - fn new( - rules: Vec<&ChaseRule>, - rule_analyses: Vec<&RuleAnalysis>, - ) -> Result { - let dependency_graph = GraphConstructor::build_graph(rules.clone(), rule_analyses.clone()); + fn new(rule_analyses: Vec<&RuleAnalysis>) -> Result { + let dependency_graph = GraphConstructor::build_graph(&rule_analyses); let graph_scc = petgraph::algo::condensation(dependency_graph, true); let scc_sorted = petgraph::algo::toposort(&graph_scc, None) .expect("The input graph is assured to be acyclic"); @@ -41,12 +40,11 @@ impl = scc_rule_indices.iter().map(|&i| rules[i]).collect(); let sub_analyses: Vec<&RuleAnalysis> = scc_rule_indices.iter().map(|&i| rule_analyses[i]).collect(); ordered_sccs.push(scc_rule_indices); - substrategies.push(SubStrategy::new(sub_rules, sub_analyses)?); + substrategies.push(SubStrategy::new(sub_analyses)?); } Ok(Self { @@ -72,3 +70,16 @@ impl DependencyGraphConstructor<&'a RuleAnalysis>, + SubStrategy: RuleSelectionStrategy, + > MetaStrategy for StrategyDependencyGraph +{ + fn current_scc(&self) -> Box<[usize]> { + self.ordered_sccs[self.current_scc_index] + .iter() + .cloned() + .collect() + } +} diff --git a/nemo/src/execution/selection_strategy/strategy_random.rs b/nemo/src/execution/selection_strategy/strategy_random.rs index 9668a0a56..a5d597d6d 100644 --- a/nemo/src/execution/selection_strategy/strategy_random.rs +++ b/nemo/src/execution/selection_strategy/strategy_random.rs @@ -4,7 +4,7 @@ use std::collections::HashSet; use rand::Rng; -use crate::chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}; +use crate::chase_model::analysis::program_analysis::RuleAnalysis; use super::strategy::{RuleSelectionStrategy, SelectionStrategyError}; @@ -20,10 +20,7 @@ pub struct StrategyRandom { impl RuleSelectionStrategy for StrategyRandom { /// Create new [StrategyRandom]. - fn new( - _rules: Vec<&ChaseRule>, - rule_analyses: Vec<&RuleAnalysis>, - ) -> Result { + fn new(rule_analyses: Vec<&RuleAnalysis>) -> Result { Ok(Self { rule_count: rule_analyses.len(), no_derivations: HashSet::new(), diff --git a/nemo/src/execution/selection_strategy/strategy_round_robin.rs b/nemo/src/execution/selection_strategy/strategy_round_robin.rs index 6a72e341a..972594414 100644 --- a/nemo/src/execution/selection_strategy/strategy_round_robin.rs +++ b/nemo/src/execution/selection_strategy/strategy_round_robin.rs @@ -1,6 +1,6 @@ //! Defines the execution strategy by which each rule is applied in the order it appears. -use crate::chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}; +use crate::chase_model::analysis::program_analysis::RuleAnalysis; use super::strategy::{RuleSelectionStrategy, SelectionStrategyError}; @@ -21,10 +21,7 @@ pub struct StrategyRoundRobin { impl RuleSelectionStrategy for StrategyRoundRobin { /// Create new [StrategyRoundRobin]. - fn new( - _rules: Vec<&ChaseRule>, - rule_analyses: Vec<&RuleAnalysis>, - ) -> Result { + fn new(rule_analyses: Vec<&RuleAnalysis>) -> Result { let self_recursive = rule_analyses.iter().map(|a| a.is_recursive).collect(); Ok(Self { diff --git a/nemo/src/execution/selection_strategy/strategy_stratified_negation.rs b/nemo/src/execution/selection_strategy/strategy_stratified_negation.rs index acc26dd76..380dda59e 100644 --- a/nemo/src/execution/selection_strategy/strategy_stratified_negation.rs +++ b/nemo/src/execution/selection_strategy/strategy_stratified_negation.rs @@ -5,8 +5,8 @@ use std::collections::HashMap; use petgraph::Directed; use crate::{ - chase_model::{analysis::program_analysis::RuleAnalysis, components::rule::ChaseRule}, - rule_model::components::tag::Tag, + chase_model::analysis::program_analysis::RuleAnalysis, + execution::selection_strategy::strategy::MetaStrategy, rule_model::components::tag::Tag, util::labeled_graph::LabeledGraph, }; @@ -22,7 +22,7 @@ type NegationGraph = LabeledGraph; /// Defines a strategy where rule are divided into different strata /// which are executed in succession. -/// Entering a new statum implies that the table for every negated atom +/// Entering a new stratum implies that the table for every negated atom /// will not get any new elements. #[derive(Debug)] pub struct StrategyStratifiedNegation { @@ -107,10 +107,7 @@ impl RuleSelectionStrategy for StrategyStratifiedNegation { /// Create new [StrategyStratifiedNegation]. - fn new( - rules: Vec<&ChaseRule>, - rule_analyses: Vec<&RuleAnalysis>, - ) -> Result { + fn new(rule_analyses: Vec<&RuleAnalysis>) -> Result { let graph = Self::build_graph(&rule_analyses); if let Some(mut strata) = graph.stratify(&[EdgeLabel::Negative]) { @@ -119,11 +116,10 @@ impl RuleSelectionStrategy for stratum in &mut strata { stratum.sort(); - let sub_rules: Vec<&ChaseRule> = stratum.iter().map(|&i| rules[i]).collect(); let sub_analyses: Vec<&RuleAnalysis> = stratum.iter().map(|&i| rule_analyses[i]).collect(); - substrategies.push(SubStrategy::new(sub_rules, sub_analyses)?); + substrategies.push(SubStrategy::new(sub_analyses)?); } for stratum in &mut strata { @@ -159,3 +155,14 @@ impl RuleSelectionStrategy None } } + +impl MetaStrategy for StrategyStratifiedNegation { + fn current_scc(&self) -> Box<[usize]> { + let inner = self.substrategies[self.current_stratum].current_scc(); + + inner + .iter() + .map(|i| self.ordered_strata[self.current_stratum][*i]) + .collect() + } +} diff --git a/nemo/src/lib.rs b/nemo/src/lib.rs index 77e750ace..3222d3fb6 100644 --- a/nemo/src/lib.rs +++ b/nemo/src/lib.rs @@ -19,6 +19,7 @@ #![feature(iter_intersperse)] #![feature(str_from_raw_parts)] #![feature(associated_type_defaults)] +#![feature(btree_cursors)] /// The crate for underlying physical operations. pub extern crate nemo_physical; diff --git a/nemo/src/table_manager.rs b/nemo/src/table_manager.rs index 05904e0af..5e8759954 100644 --- a/nemo/src/table_manager.rs +++ b/nemo/src/table_manager.rs @@ -3,6 +3,7 @@ use crate::{error::Error, rule_model::components::tag::Tag}; use nemo_physical::{ + datatypes::StorageValueT, datavalues::any_datavalue::AnyDataValue, management::{ bytesized::ByteSized, @@ -392,6 +393,13 @@ impl TableManager { Ok(self.database.trie_row_iterator(trie)?) } + pub(crate) async fn table_raw_row_iterator( + &mut self, + id: PermanentTableId, + ) -> Result> + '_, Error> { + Ok(self.database.table_raw_row_iterator(id).await?) + } + /// Combine all subtables of a predicate into one table /// and return the [PermanentTableId] of that new table. pub(crate) async fn combine_predicate( @@ -491,7 +499,7 @@ impl TableManager { /// Add a [Trie] as a subtable of a predicate. /// Predicate must be registered before calling this function. #[allow(dead_code)] - fn add_table(&mut self, predicate: Tag, step: usize, order: ColumnOrder, trie: Trie) { + pub fn add_table(&mut self, predicate: Tag, step: usize, order: ColumnOrder, trie: Trie) { let name = self.generate_table_name(&predicate, &order, step); let table_id = self.database.register_add_trie(&name, order, trie); @@ -763,6 +771,10 @@ impl TableManager { .execute_plan_trie(subtable_plan.execution_plan) .await?) } + + pub fn known_predicates(&self) -> impl Iterator { + self.predicate_subtables.keys() + } } #[cfg(test)]