Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions raco/backends/cpp/cpp.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,10 +596,7 @@ def opt_rules(self, **kwargs):
if kwargs.get('external_indexing'):
CBaseLanguage.set_external_indexing(True)

# flatten the rules lists
rule_list = list(itertools.chain(*rule_grps_sequence))
for rule_list in rule_grps_sequence:
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())

# disable specified rules
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())

return rule_list
return rule_grps_sequence
19 changes: 6 additions & 13 deletions raco/backends/logical.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,9 @@ class OptLogicalAlgebra(Algebra):

@staticmethod
def opt_rules(**kwargs):
return [rules.RemoveTrivialSequences(),
rules.SimpleGroupBy(),
rules.SplitSelects(),
rules.PushSelects(),
rules.MergeSelects(),
rules.ProjectToDistinctColumnSelect(),
rules.JoinToProjectingJoin(),
rules.PushApply(),
rules.RemoveUnusedColumns(),
rules.PushApply(),
rules.RemoveUnusedColumns(),
rules.PushApply(),
rules.DeDupBroadcastInputs()]
return [rules.remove_trivial_sequences,
rules.simple_group_by,
rules.push_select,
rules.push_project,
rules.push_apply,
[rules.DeDupBroadcastInputs()]]
21 changes: 10 additions & 11 deletions raco/backends/myria/myria.py
Original file line number Diff line number Diff line change
Expand Up @@ -1524,7 +1524,8 @@ def add_hyper_shuffle():
if not isinstance(expr, algebra.NaryJoin):
return expr
# check if HC shuffle has been placed before
shuffled_child = [isinstance(op, algebra.HyperCubeShuffle)
shuffled_child = [isinstance(op, algebra.HyperCubeShuffle) or
isinstance(op, algebra.OrderBy)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this only happen if OrderByBeforeNaryJoin were somehow applied before HCShuffleBeforeNaryJoin?

for op in list(expr.children())]
if all(shuffled_child): # already shuffled
assert len(expr.children()) > 0
Expand Down Expand Up @@ -1731,6 +1732,8 @@ def insert_split_before_heavy(self, op):
encounter a heavyweight operator."""
if isinstance(op, MyriaAlgebra.fragment_leaves):
return op
if isinstance(op, algebra.Split):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug?

return op

if isinstance(op, InsertSplit.heavy_ops):
return algebra.Split(op)
Expand Down Expand Up @@ -2197,13 +2200,11 @@ def opt_rules(self, **kwargs):

rule_grps_sequence = opt_grps_sequence + compile_grps_sequence

# flatten the rules lists
rule_list = list(itertools.chain(*rule_grps_sequence))

# disable specified rules
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())
for l in rule_grps_sequence:
rules.Rule.apply_disable_flags(l, *kwargs.keys())

return rule_list
return rule_grps_sequence


class MyriaHyperCubeAlgebra(MyriaAlgebra):
Expand Down Expand Up @@ -2261,13 +2262,11 @@ def opt_rules(self, **kwargs):

rule_grps_sequence = opt_grps_sequence + compile_grps_sequence

# flatten the rules lists
rule_list = list(itertools.chain(*rule_grps_sequence))

# disable specified rules
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())
for l in rule_grps_sequence:
rules.Rule.apply_disable_flags(l, *kwargs.keys())

return rule_list
return rule_grps_sequence

def __init__(self, catalog=None):
self.catalog = catalog
Expand Down
12 changes: 6 additions & 6 deletions raco/backends/radish/radish.py
Original file line number Diff line number Diff line change
Expand Up @@ -1638,7 +1638,7 @@ def __init__(self, array_rep, memory_scan_class=GrappaMemoryScan):

def fire(self, expr):
if isinstance(expr, algebra.Scan) \
and not isinstance(expr, GrappaFileScan):
and not isinstance(expr, cppcommon.CBaseFileScan):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug?

return self._memory_scan_class(
GrappaFileScan(self._array_rep,
expr.relation_key,
Expand Down Expand Up @@ -2314,6 +2314,8 @@ class GrappaWhileCondition(rules.Rule):
result of the condition table"""

def fire(self, expr):
if isinstance(expr, GrappaDoWhile):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

was this a bug?

return expr
if isinstance(expr, algebra.DoWhile):
newdw = rules.OneToOne(algebra.DoWhile, GrappaDoWhile).fire(expr)
newdw.copy(expr)
Expand Down Expand Up @@ -2552,10 +2554,8 @@ def opt_rules(self,
if external_indexing:
CBaseLanguage.set_external_indexing(True)

# flatten the rules lists
rule_list = list(itertools.chain(*rule_grps_sequence))

# disable specified rules
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())
for rule_list in rule_grps_sequence:
rules.Rule.apply_disable_flags(rule_list, *kwargs.keys())

return rule_list
return rule_grps_sequence
4 changes: 2 additions & 2 deletions raco/backends/sparql/sparql.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def formatemitter(name, exp):
class SPARQLAlgebra(Algebra):

def opt_rules(self, **kwargs):
rules = [
rules = [[
raco.rules.CrossProduct2Join(),
raco.rules.SplitSelects(),
raco.rules.PushSelects(),
Expand All @@ -162,6 +162,6 @@ def opt_rules(self, **kwargs):
raco.rules.OneToOne(algebra.Select, SPARQLSelect),
raco.rules.OneToOne(algebra.Apply, SPARQLApply),
raco.rules.OneToOne(algebra.Join, SPARQLJoin),
]
]]

return rules
54 changes: 32 additions & 22 deletions raco/compile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import raco.viz as viz
import os
from raco.utility import colored
from rules import RemoveUnusedColumns
from raco.backends.myria import MyriaLeftDeepTreeAlgebra

import logging
LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,31 +37,39 @@ def write_if_enabled(self, plan, title):
self.ind += 1


def optimize_by_rules(expr, rules):
def optimize_by_rules(expr, rules_lists):
writer = PlanWriter()
writer.write_if_enabled(expr, "before rules")

for rule in rules:
def recursiverule(e):
newe = rule(e)
writer.write_if_enabled(newe, str(rule))
if newe.stop_recursion:
return newe

# log the optimizer step
if str(e) == str(newe):
LOG.debug("apply rule %s (no effect)\n" +
" %s \n", rule, e)
else:
LOG.debug("apply rule %s\n" +
colored(" -", "red") + " %s" + "\n" +
colored(" +", "green") + " %s", rule, e, newe)

newe.apply(recursiverule)

return newe
expr = recursiverule(expr)

for rules in rules_lists:
for i in range(20):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this lands in an infinite recursion, it'll blow the Python stack soon enough, right? Do we really need an explicit cutoff?

changed = False
for rule in rules:
def recursiverule(e):
newe = rule(e)
writer.write_if_enabled(newe, str(rule))
if newe.stop_recursion:
return newe

# log the optimizer step
if str(e) == str(newe):
LOG.debug("apply rule %s (no effect)\n %s \n", rule, e)
else:
LOG.debug("apply rule %s\n" +
colored(" -", "red") + " %s" + "\n" +
colored(" +", "green") + " %s",
rule, e, newe)

newe.apply(recursiverule)
return newe
expr_old = str(expr)
expr = recursiverule(expr)
if expr_old != str(expr):
changed = True
if not changed:
break
if changed:
print "Rule set %s did not converge in 20 iterations" % rules
return expr


Expand Down
33 changes: 8 additions & 25 deletions raco/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -557,30 +557,16 @@ def convert(n):

elif isinstance(child, algebra.ProjectingJoin):
emits = op.get_unnamed_emit_exprs()

# If this apply is only AttributeRefs and the columns already
# have the correct names, we can push it into the ProjectingJoin
if (all(isinstance(e, expression.AttributeRef) for e in emits) and
len(set(emits)) == len(emits)):
new_cols = [child.output_columns[e.position] for e in emits]
# We need to ensure that left columns come before right cols
left_sch = child.left.scheme()
right_sch = child.right.scheme()
combined = left_sch + right_sch
left_len = len(left_sch)
new_cols = [expression.to_unnamed_recursive(e, combined)
for e in new_cols]
side = [e.position >= left_len for e in new_cols]
if sorted(side) == side:
# Left columns do come before right cols
new_pj = algebra.ProjectingJoin(
condition=child.condition, left=child.left,
right=child.right, output_columns=new_cols)
if new_pj.scheme() == op.scheme():
return new_pj

accessed = sorted(set(itertools.chain(*(accessed_columns(e)
for e in emits))))
new_cols = [child.output_columns[p] for p in accessed]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you explain why you removed all this code?

# We need to ensure that left columns come before right cols
left_sch = child.left.scheme()
right_sch = child.right.scheme()
combined = left_sch + right_sch
new_cols = [expression.to_unnamed_recursive(e, combined)
for e in new_cols]

index_map = {a: i for (i, a) in enumerate(accessed)}
child.output_columns = [child.output_columns[i] for i in accessed]
for e in emits:
Expand Down Expand Up @@ -847,9 +833,6 @@ def __str__(self):
# For now, run twice and finish with PushApply.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this obsolete comment.

PushApply(),
RemoveUnusedColumns(),
PushApply(),
RemoveUnusedColumns(),
PushApply(),
RemoveNoOpApply(),
]

Expand Down