From ad977640d3f0dcf552c0f4b374cbf6e1f91bdddd Mon Sep 17 00:00:00 2001 From: Tobin Baker Date: Wed, 14 Sep 2016 17:05:47 -0700 Subject: [PATCH] first draft --- raco/algebra.py | 20 +++++++++++++ raco/backends/myria/myria.py | 58 +++++++++++++++++++++++++++++++++--- raco/myrial/interpreter.py | 9 ++++++ raco/myrial/parser.py | 5 ++++ raco/myrial/scanner.py | 2 +- 5 files changed, 89 insertions(+), 5 deletions(-) diff --git a/raco/algebra.py b/raco/algebra.py index 209f34c9..a8be4db8 100644 --- a/raco/algebra.py +++ b/raco/algebra.py @@ -1941,6 +1941,26 @@ def __repr__(self): return "{op}({pl!r})".format(op=self.opname(), pl=self.input) +class Stream(UnaryOperator): + + """Stream query results back to the client.""" + + def __init__(self, input=None): + UnaryOperator.__init__(self, input) + + def num_tuples(self): + return self.input.num_tuples() + + def partitioning(self): + return self.input.partitioning() + + def shortStr(self): + return "{op}".format(op=self.opname()) + + def __repr__(self): + return "{op}({pl!r})".format(op=self.opname(), pl=self.input) + + class Parallel(NaryOperator): """Execute a set of independent plans in parallel.""" diff --git a/raco/backends/myria/myria.py b/raco/backends/myria/myria.py index c065cab2..bf9075a1 100644 --- a/raco/backends/myria/myria.py +++ b/raco/backends/myria/myria.py @@ -388,6 +388,33 @@ def compileme(self, inputid): } +class MyriaStreamingSink(algebra.Stream, MyriaOperator): + + """A Myria StreamingSink""" + + def __init__(self, input): + algebra.UnaryOperator.__init__(self, input) + + def num_tuples(self): + return self.input.num_tuples() + + def partitioning(self): + # TODO: have a way to say it is on a specific worker + return RepresentationProperties() + + def shortStr(self): + return "%s" % self.opname() + + def compileme(self, inputid): + return { + "opType": "StreamingSink", + "argChild": inputid, + } + + def __repr__(self): + return "{op}({inp!r})".format(op=self.opname(), inp=self.input) + + class MyriaAppendTemp(algebra.AppendTemp, MyriaOperator): def compileme(self, inputid): @@ -622,6 +649,14 @@ def compileme(self, inputid): raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC pair') # noqa +class MyriaStream(algebra.Stream, MyriaOperator): + + """Represents a streaming sink operator""" + + def compileme(self, inputid): + raise NotImplementedError('shouldn''t ever get here, should be turned into CP-CC-StreamingSink triple') # noqa + + class MyriaDupElim(algebra.Distinct, MyriaOperator): """Represents duplicate elimination""" @@ -1838,6 +1873,18 @@ def fire(self, expr): return expr +class ExpandStreamSink(rules.Rule): + + def fire(self, expr): + if not isinstance(expr, MyriaStream): + return expr + + producer = MyriaCollectProducer(expr.input, None) + consumer = MyriaCollectConsumer(producer) + sink = MyriaStreamingSink(consumer) + return sink + + # 6. shuffle logics, hyper_cube_shuffle_logic is only used in HCAlgebra left_deep_tree_shuffle_logic = [ ShuffleBeforeSetop(), @@ -1875,6 +1922,7 @@ def fire(self, expr): rules.OneToOne(algebra.OrderBy, MyriaInMemoryOrderBy), rules.OneToOne(algebra.Limit, MyriaLimit), rules.OneToOne(algebra.Sink, MyriaSink), + rules.OneToOne(algebra.Stream, MyriaStream), rules.OneToOne(algebra.IDBController, MyriaIDBController), ] @@ -1904,7 +1952,7 @@ class MyriaAlgebra(Algebra): MyriaScanTemp, MyriaFileScan, MyriaEmptyRelation, - MyriaSingleton + MyriaSingleton, ) @@ -2189,6 +2237,7 @@ def opt_rules(self, **kwargs): [AddAppendTemp()], break_communication, idb_until_convergence(kwargs.get('async_ft')), + [ExpandStreamSink()], ] if kwargs.get('add_splits', True): @@ -2243,7 +2292,7 @@ def opt_rules(self, **kwargs): rules.push_select, distributed_group_by(MyriaGroupBy), [rules.DeDupBroadcastInputs()], - hyper_cube_shuffle_logic + hyper_cube_shuffle_logic, ] if kwargs.get('push_sql', False): @@ -2252,7 +2301,8 @@ def opt_rules(self, **kwargs): compile_grps_sequence = [ myriafy, [AddAppendTemp()], - break_communication + break_communication, + [ExpandStreamSink()], ] if kwargs.get('add_splits', True): @@ -2430,7 +2480,7 @@ def compile_to_json(raw_query, logical_plan, physical_plan, string and passed along unchanged.""" # Store/StoreTemp is a reasonable physical plan... for now. - root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink) + root_ops = (algebra.Store, algebra.StoreTemp, algebra.Sink, algebra.Stream) if isinstance(physical_plan, root_ops): physical_plan = algebra.Parallel([physical_plan]) diff --git a/raco/myrial/interpreter.py b/raco/myrial/interpreter.py index ed34f418..f9804275 100644 --- a/raco/myrial/interpreter.py +++ b/raco/myrial/interpreter.py @@ -438,6 +438,15 @@ def sink(self, _id): uses_set = self.ep.get_and_clear_uses_set() self.cfg.add_op(op, None, uses_set) + def stream(self, _id): + alias_expr = ("ALIAS", _id) + child_op = self.ep.evaluate(alias_expr) + + op = raco.algebra.Stream(child_op) + + uses_set = self.ep.get_and_clear_uses_set() + self.cfg.add_op(op, None, uses_set) + def dump(self, _id): alias_expr = ("ALIAS", _id) child_op = self.ep.evaluate(alias_expr) diff --git a/raco/myrial/parser.py b/raco/myrial/parser.py index 37acf8a7..70c51900 100644 --- a/raco/myrial/parser.py +++ b/raco/myrial/parser.py @@ -565,6 +565,11 @@ def p_statement_sink(p): 'statement : SINK LPAREN unreserved_id RPAREN SEMI' # noqa p[0] = ('SINK', p[3]) + @staticmethod + def p_statement_stream(p): + 'statement : STREAM LPAREN unreserved_id RPAREN SEMI' # noqa + p[0] = ('STREAM', p[3]) + @staticmethod def p_statement_dump(p): 'statement : DUMP LPAREN unreserved_id RPAREN SEMI' diff --git a/raco/myrial/scanner.py b/raco/myrial/scanner.py index 9d81514e..125f0605 100644 --- a/raco/myrial/scanner.py +++ b/raco/myrial/scanner.py @@ -19,7 +19,7 @@ builtins = ['EMPTY', 'WORKER_ID', 'SCAN', 'COUNTALL', 'COUNT', 'STORE', 'DIFF', 'CROSS', 'JOIN', 'UNION', 'UNIONALL', 'INTERSECT', - 'DISTINCT', 'LIMIT', 'SINK', 'SAMPLESCAN', 'LIKE'] + 'DISTINCT', 'LIMIT', 'SINK', 'STREAM', 'SAMPLESCAN', 'LIKE'] # identifiers with special meaning; case-insensitive