Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions raco/algebra.py
Original file line number Diff line number Diff line change
Expand Up @@ -1941,6 +1941,26 @@ def __repr__(self):
return "{op}({pl!r})".format(op=self.opname(), pl=self.input)


class Stream(UnaryOperator):

    """Logical operator that streams query results back to the client.

    Size and partitioning information are taken directly from the single
    input operator.
    """

    def __init__(self, input=None):
        """Create the operator over `input` (may be None)."""
        UnaryOperator.__init__(self, input)

    def num_tuples(self):
        """Return the tuple count reported by the input operator."""
        child = self.input
        return child.num_tuples()

    def partitioning(self):
        """Return the partitioning reported by the input operator."""
        child = self.input
        return child.partitioning()

    def shortStr(self):
        """Return the operator name only."""
        name = self.opname()
        return "{op}".format(op=name)

    def __repr__(self):
        """Return `<opname>(<repr of input>)`."""
        name = self.opname()
        child = self.input
        return "{op}({pl!r})".format(op=name, pl=child)


class Parallel(NaryOperator):

"""Execute a set of independent plans in parallel."""
Expand Down
58 changes: 54 additions & 4 deletions raco/backends/myria/myria.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,33 @@ def compileme(self, inputid):
}


class MyriaStreamingSink(algebra.Stream, MyriaOperator):

    """Physical Myria operator that compiles to a "StreamingSink"."""

    def __init__(self, input):
        """Create the sink over the required `input` operator."""
        algebra.UnaryOperator.__init__(self, input)

    def num_tuples(self):
        """Return the tuple count reported by the input operator."""
        child = self.input
        return child.num_tuples()

    def partitioning(self):
        """Return default (empty) representation properties."""
        # TODO: have a way to say it is on a specific worker
        properties = RepresentationProperties()
        return properties

    def shortStr(self):
        """Return the operator name only."""
        name = self.opname()
        return "%s" % name

    def __repr__(self):
        """Return `<opname>(<repr of input>)`."""
        name = self.opname()
        return "{op}({inp!r})".format(op=name, inp=self.input)

    def compileme(self, inputid):
        """Return the plan fragment for this sink.

        `inputid` is emitted as the "argChild" entry of the fragment.
        """
        fragment = {"opType": "StreamingSink"}
        fragment["argChild"] = inputid
        return fragment


class MyriaAppendTemp(algebra.AppendTemp, MyriaOperator):

def compileme(self, inputid):
Expand Down Expand Up @@ -622,6 +649,14 @@ def compileme(self, inputid):
raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC pair') # noqa


class MyriaStream(algebra.Stream, MyriaOperator):

    """Represents a streaming sink operator.

    This operator is never compiled directly: `compileme` always raises.
    Per its error message, it is expected to have been rewritten into a
    collect-producer / collect-consumer / StreamingSink triple beforehand.
    """

    def compileme(self, inputid):
        """Always raise NotImplementedError; see the class docstring."""
        # The message is double-quoted on purpose. The previous spelling,
        # 'shouldn''t ...', is SQL-style escaping; Python reads it as two
        # adjacent literals and concatenates them into "shouldnt ...".
        raise NotImplementedError(
            "shouldn't ever get here, should be turned into "
            "CP-CC-StreamingSink triple")


class MyriaDupElim(algebra.Distinct, MyriaOperator):

"""Represents duplicate elimination"""
Expand Down Expand Up @@ -1838,6 +1873,18 @@ def fire(self, expr):
return expr


class ExpandStreamSink(rules.Rule):

    """Expand a MyriaStream into producer -> consumer -> streaming sink."""

    def fire(self, expr):
        """Rewrite `expr` if it is a MyriaStream; otherwise return it as-is.

        The stream's input is wrapped in a MyriaCollectProducer, which is
        consumed by a MyriaCollectConsumer, which in turn feeds the returned
        MyriaStreamingSink.
        """
        if isinstance(expr, MyriaStream):
            # NOTE(review): the meaning of the second producer argument
            # (None) is not visible here -- confirm against
            # MyriaCollectProducer.
            collected = MyriaCollectConsumer(
                MyriaCollectProducer(expr.input, None))
            return MyriaStreamingSink(collected)

        return expr


# 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra
left_deep_tree_shuffle_logic = [
ShuffleBeforeSetop(),
Expand Down Expand Up @@ -1875,6 +1922,7 @@ def fire(self, expr):
rules.OneToOne(algebra.OrderBy, MyriaInMemoryOrderBy),
rules.OneToOne(algebra.Limit, MyriaLimit),
rules.OneToOne(algebra.Sink, MyriaSink),
rules.OneToOne(algebra.Stream, MyriaStream),
rules.OneToOne(algebra.IDBController, MyriaIDBController),
]

Expand Down Expand Up @@ -1904,7 +1952,7 @@ class MyriaAlgebra(Algebra):
MyriaScanTemp,
MyriaFileScan,
MyriaEmptyRelation,
MyriaSingleton
MyriaSingleton,
)


Expand Down Expand Up @@ -2189,6 +2237,7 @@ def opt_rules(self, **kwargs):
[AddAppendTemp()],
break_communication,
idb_until_convergence(kwargs.get('async_ft')),
[ExpandStreamSink()],
]

if kwargs.get('add_splits', True):
Expand Down Expand Up @@ -2243,7 +2292,7 @@ def opt_rules(self, **kwargs):
rules.push_select,
distributed_group_by(MyriaGroupBy),
[rules.DeDupBroadcastInputs()],
hyper_cube_shuffle_logic
hyper_cube_shuffle_logic,
]

if kwargs.get('push_sql', False):
Expand All @@ -2252,7 +2301,8 @@ def opt_rules(self, **kwargs):
compile_grps_sequence = [
myriafy,
[AddAppendTemp()],
break_communication
break_communication,
[ExpandStreamSink()],
]

if kwargs.get('add_splits', True):
Expand Down Expand Up @@ -2430,7 +2480,7 @@ def compile_to_json(raw_query, logical_plan, physical_plan,
string and passed along unchanged."""

# Store/StoreTemp is a reasonable physical plan... for now.
root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink)
root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink, algebra.Stream)
if isinstance(physical_plan, root_ops):
physical_plan = algebra.Parallel([physical_plan])

Expand Down
9 changes: 9 additions & 0 deletions raco/myrial/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,15 @@ def sink(self, _id):
uses_set = self.ep.get_and_clear_uses_set()
self.cfg.add_op(op, None, uses_set)

def stream(self, _id):
    """Add a Stream operator over the relation aliased by `_id`.

    The alias is evaluated to an operator, wrapped in
    `raco.algebra.Stream`, and registered with the control-flow graph
    together with the uses set collected during evaluation.
    """
    source_op = self.ep.evaluate(("ALIAS", _id))
    stream_op = raco.algebra.Stream(source_op)

    # Fetch (and reset) the uses set only after the alias was evaluated.
    dependencies = self.ep.get_and_clear_uses_set()
    self.cfg.add_op(stream_op, None, dependencies)

def dump(self, _id):
alias_expr = ("ALIAS", _id)
child_op = self.ep.evaluate(alias_expr)
Expand Down
5 changes: 5 additions & 0 deletions raco/myrial/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -565,6 +565,11 @@ def p_statement_sink(p):
'statement : SINK LPAREN unreserved_id RPAREN SEMI' # noqa
p[0] = ('SINK', p[3])

@staticmethod
def p_statement_stream(p):
    'statement : STREAM LPAREN unreserved_id RPAREN SEMI' # noqa
    # The docstring above is the grammar production and must stay the
    # first statement, unchanged. p[3] is the unreserved_id between the
    # parentheses; the result is the tagged tuple ('STREAM', <id>).
    target_id = p[3]
    p[0] = ('STREAM', target_id)

@staticmethod
def p_statement_dump(p):
'statement : DUMP LPAREN unreserved_id RPAREN SEMI'
Expand Down
2 changes: 1 addition & 1 deletion raco/myrial/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

# Builtin names known to the scanner. 'STREAM' is listed so that
# (presumably) a STREAM token exists for the parser's STREAM statement rule.
builtins = ['EMPTY', 'WORKER_ID', 'SCAN', 'COUNTALL', 'COUNT', 'STORE',
            'DIFF', 'CROSS', 'JOIN', 'UNION', 'UNIONALL', 'INTERSECT',
            'DISTINCT', 'LIMIT', 'SINK', 'STREAM', 'SAMPLESCAN', 'LIKE']


# identifiers with special meaning; case-insensitive
Expand Down