From 42831986fe403bdb294ae4eb6a646224fff96979 Mon Sep 17 00:00:00 2001 From: jingjingwang Date: Tue, 25 Jul 2017 23:16:56 -0700 Subject: [PATCH 1/2] apply lists of rules recursively --- raco/backends/cpp/cpp.py | 9 ++---- raco/backends/logical.py | 19 ++++-------- raco/backends/myria/myria.py | 21 +++++++------ raco/backends/radish/radish.py | 8 ++--- raco/backends/sparql/sparql.py | 4 +-- raco/compile.py | 54 ++++++++++++++++++++-------------- raco/rules.py | 33 +++++---------------- 7 files changed, 64 insertions(+), 84 deletions(-) diff --git a/raco/backends/cpp/cpp.py b/raco/backends/cpp/cpp.py index d60cc801..3dd71018 100644 --- a/raco/backends/cpp/cpp.py +++ b/raco/backends/cpp/cpp.py @@ -596,10 +596,7 @@ def opt_rules(self, **kwargs): if kwargs.get('external_indexing'): CBaseLanguage.set_external_indexing(True) - # flatten the rules lists - rule_list = list(itertools.chain(*rule_grps_sequence)) + for rule_list in rule_grps_sequence: + rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) - # disable specified rules - rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) - - return rule_list + return rule_grps_sequence diff --git a/raco/backends/logical.py b/raco/backends/logical.py index a8cd1105..77ff7eb8 100644 --- a/raco/backends/logical.py +++ b/raco/backends/logical.py @@ -6,16 +6,9 @@ class OptLogicalAlgebra(Algebra): @staticmethod def opt_rules(**kwargs): - return [rules.RemoveTrivialSequences(), - rules.SimpleGroupBy(), - rules.SplitSelects(), - rules.PushSelects(), - rules.MergeSelects(), - rules.ProjectToDistinctColumnSelect(), - rules.JoinToProjectingJoin(), - rules.PushApply(), - rules.RemoveUnusedColumns(), - rules.PushApply(), - rules.RemoveUnusedColumns(), - rules.PushApply(), - rules.DeDupBroadcastInputs()] + return [rules.remove_trivial_sequences, + rules.simple_group_by, + rules.push_select, + rules.push_project, + rules.push_apply, + [rules.DeDupBroadcastInputs()]] diff --git a/raco/backends/myria/myria.py b/raco/backends/myria/myria.py index 53db7a55..0087e80e 100644 --- a/raco/backends/myria/myria.py +++ b/raco/backends/myria/myria.py @@ -1524,7 +1524,8 @@ def add_hyper_shuffle(): if not isinstance(expr, algebra.NaryJoin): return expr # check if HC shuffle has been placed before - shuffled_child = [isinstance(op, algebra.HyperCubeShuffle) + shuffled_child = [isinstance(op, algebra.HyperCubeShuffle) or + isinstance(op, algebra.OrderBy) for op in list(expr.children())] if all(shuffled_child): # already shuffled assert len(expr.children()) > 0 @@ -1731,6 +1732,8 @@ def insert_split_before_heavy(self, op): encounter a heavyweight operator.""" if isinstance(op, MyriaAlgebra.fragment_leaves): return op + if isinstance(op, algebra.Split): + return op if isinstance(op, InsertSplit.heavy_ops): return algebra.Split(op) @@ -2197,13 +2200,11 @@ def opt_rules(self, **kwargs): rule_grps_sequence = opt_grps_sequence + compile_grps_sequence - # flatten the rules lists - rule_list = list(itertools.chain(*rule_grps_sequence)) - # disable specified rules - rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) + for l in rule_grps_sequence: + rules.Rule.apply_disable_flags(l, *kwargs.keys()) - return rule_list + return rule_grps_sequence class MyriaHyperCubeAlgebra(MyriaAlgebra): @@ -2261,13 +2262,11 @@ def opt_rules(self, **kwargs): rule_grps_sequence = opt_grps_sequence + compile_grps_sequence - # flatten the rules lists - rule_list = list(itertools.chain(*rule_grps_sequence)) - # disable specified rules - rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) + for l in rule_grps_sequence: + rules.Rule.apply_disable_flags(l, *kwargs.keys()) - return rule_list + return rule_grps_sequence def __init__(self, catalog=None): self.catalog = catalog diff --git a/raco/backends/radish/radish.py b/raco/backends/radish/radish.py index 0844e448..f31adcf3 100644 --- a/raco/backends/radish/radish.py +++ b/raco/backends/radish/radish.py @@ -2552,10 +2552,8 @@ def opt_rules(self, if external_indexing: CBaseLanguage.set_external_indexing(True) - # flatten the rules lists - rule_list = list(itertools.chain(*rule_grps_sequence)) - # disable specified rules - rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) + for rule_list in rule_grps_sequence: + rules.Rule.apply_disable_flags(rule_list, *kwargs.keys()) - return rule_list + return rule_grps_sequence diff --git a/raco/backends/sparql/sparql.py b/raco/backends/sparql/sparql.py index 33a84596..0cd92882 100644 --- a/raco/backends/sparql/sparql.py +++ b/raco/backends/sparql/sparql.py @@ -152,7 +152,7 @@ def formatemitter(name, exp): class SPARQLAlgebra(Algebra): def opt_rules(self, **kwargs): - rules = [ + rules = [[ raco.rules.CrossProduct2Join(), raco.rules.SplitSelects(), raco.rules.PushSelects(), @@ -162,6 +162,6 @@ def opt_rules(self, **kwargs): raco.rules.OneToOne(algebra.Select, SPARQLSelect), raco.rules.OneToOne(algebra.Apply, SPARQLApply), raco.rules.OneToOne(algebra.Join, SPARQLJoin), - ] + ]] return rules diff --git a/raco/compile.py b/raco/compile.py index a0470478..f8ab7a46 100644 --- a/raco/compile.py +++ b/raco/compile.py @@ -5,6 +5,8 @@ import raco.viz as viz import os from raco.utility import colored +from rules import RemoveUnusedColumns +from raco.backends.myria import MyriaLeftDeepTreeAlgebra import logging LOG = logging.getLogger(__name__) @@ -35,31 +37,39 @@ def write_if_enabled(self, plan, title): self.ind += 1 -def optimize_by_rules(expr, rules): +def optimize_by_rules(expr, rules_lists): writer = PlanWriter() writer.write_if_enabled(expr, "before rules") - for rule in rules: - def recursiverule(e): - newe = rule(e) - writer.write_if_enabled(newe, str(rule)) - if newe.stop_recursion: - return newe - - # log the optimizer step - if str(e) == str(newe): - LOG.debug("apply rule %s (no effect)\n" + - " %s \n", rule, e) - else: - LOG.debug("apply rule %s\n" + - colored(" -", "red") + " %s" + "\n" + - colored(" +", "green") + " %s", rule, e, newe) - - newe.apply(recursiverule) - - return newe - expr = recursiverule(expr) - + for rules in rules_lists: + for i in range(20): + changed = False + for rule in rules: + def recursiverule(e): + newe = rule(e) + writer.write_if_enabled(newe, str(rule)) + if newe.stop_recursion: + return newe + + # log the optimizer step + if str(e) == str(newe): + LOG.debug("apply rule %s (no effect)\n %s \n", rule, e) + else: + LOG.debug("apply rule %s\n" + + colored(" -", "red") + " %s" + "\n" + + colored(" +", "green") + " %s", + rule, e, newe) + + newe.apply(recursiverule) + return newe + expr_old = str(expr) + expr = recursiverule(expr) + if expr_old != str(expr): + changed = True + if not changed: + break + if changed: + print "Rule set %s did not converge in 20 iterations" % rules return expr diff --git a/raco/rules.py b/raco/rules.py index 664744ed..c9f85c3b 100644 --- a/raco/rules.py +++ b/raco/rules.py @@ -557,30 +557,16 @@ def convert(n): elif isinstance(child, algebra.ProjectingJoin): emits = op.get_unnamed_emit_exprs() - - # If this apply is only AttributeRefs and the columns already - # have the correct names, we can push it into the ProjectingJoin - if (all(isinstance(e, expression.AttributeRef) for e in emits) and - len(set(emits)) == len(emits)): - new_cols = [child.output_columns[e.position] for e in emits] - # We need to ensure that left columns come before right cols - left_sch = child.left.scheme() - right_sch = child.right.scheme() - combined = left_sch + right_sch - left_len = len(left_sch) - new_cols = [expression.to_unnamed_recursive(e, combined) - for e in new_cols] - side = [e.position >= left_len for e in new_cols] - if sorted(side) == side: - # Left columns do come before right cols - new_pj = algebra.ProjectingJoin( - condition=child.condition, left=child.left, - right=child.right, output_columns=new_cols) - if new_pj.scheme() == op.scheme(): - return new_pj - accessed = sorted(set(itertools.chain(*(accessed_columns(e) for e in emits)))) + new_cols = [child.output_columns[p] for p in accessed] + # We need to ensure that left columns come before right cols + left_sch = child.left.scheme() + right_sch = child.right.scheme() + combined = left_sch + right_sch + new_cols = [expression.to_unnamed_recursive(e, combined) + for e in new_cols] + index_map = {a: i for (i, a) in enumerate(accessed)} child.output_columns = [child.output_columns[i] for i in accessed] for e in emits: @@ -847,9 +833,6 @@ def __str__(self): # For now, run twice and finish with PushApply. PushApply(), RemoveUnusedColumns(), - PushApply(), - RemoveUnusedColumns(), - PushApply(), RemoveNoOpApply(), ] From 9ed2b5ad43f15e0f581eb45a44baf96e0a03b349 Mon Sep 17 00:00:00 2001 From: jingjingwang Date: Wed, 26 Jul 2017 02:35:55 -0700 Subject: [PATCH 2/2] fix grappa bug --- raco/backends/radish/radish.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/raco/backends/radish/radish.py b/raco/backends/radish/radish.py index f31adcf3..dc57f5f1 100644 --- a/raco/backends/radish/radish.py +++ b/raco/backends/radish/radish.py @@ -1638,7 +1638,7 @@ def __init__(self, array_rep, memory_scan_class=GrappaMemoryScan): def fire(self, expr): if isinstance(expr, algebra.Scan) \ - and not isinstance(expr, GrappaFileScan): + and not isinstance(expr, cppcommon.CBaseFileScan): return self._memory_scan_class( GrappaFileScan(self._array_rep, expr.relation_key, @@ -2314,6 +2314,8 @@ class GrappaWhileCondition(rules.Rule): result of the condition table""" def fire(self, expr): + if isinstance(expr, GrappaDoWhile): + return expr if isinstance(expr, algebra.DoWhile): newdw = rules.OneToOne(algebra.DoWhile, GrappaDoWhile).fire(expr) newdw.copy(expr)