Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions deathstar/demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@

class DeathstarDemo():
def __init__(self):
self.init_user = OpNode(user_op, InitClass())
self.init_hotel = OpNode(hotel_op, InitClass())
self.init_flight = OpNode(flight_op, InitClass())
self.init_user = OpNode(User, InitClass(), read_key_from="user_id")
self.init_hotel = OpNode(Hotel, InitClass(), read_key_from="key")
self.init_flight = OpNode(Flight, InitClass(), read_key_from="id")

def init_runtime(self, runtime, **kwargs):
self.runtime = runtime
Expand Down Expand Up @@ -139,7 +139,7 @@ def populate(self):
# populate users
self.users = [User(f"Cornell_{i}", str(i) * 10) for i in range(501)]
for user in self.users:
event = Event(self.init_user, [user.id], {"user_id": user.id, "password": user.password}, None)
event = Event(self.init_user, {"user_id": user.id, "password": user.password}, None)
self.runtime.send(event)

# populate hotels
Expand All @@ -150,7 +150,7 @@ def populate(self):
price = prices[i]
hotel = Hotel(str(i), 10, geo, rate, price)
self.hotels.append(hotel)
event = Event(self.init_hotel, [hotel.key],
event = Event(self.init_hotel,
{
"key": hotel.key,
"cap": hotel.cap,
Expand All @@ -163,13 +163,13 @@ def populate(self):
# populate flights
self.flights = [Flight(str(i), 10) for i in range(100)]
for flight in self.flights[:-1]:
event = Event(self.init_flight, [flight.id], {
event = Event(self.init_flight, {
"id": flight.id,
"cap": flight.cap
}, None)
self.runtime.send(event)
flight = self.flights[-1]
event = Event(self.init_flight, [flight.id], {
event = Event(self.init_flight, {
"id": flight.id,
"cap": flight.cap
}, None)
Expand All @@ -192,7 +192,7 @@ def search_hotel():
lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0

# We don't really use the in_date, out_date information
return Event(search_op.dataflow.entry, ["tempkey"], {"lat": lat, "lon": lon}, search_op.dataflow)
return Event(search_op.dataflow.entry, {"lat": lat, "lon": lon}, search_op.dataflow)

def recommend(req_param=None):
if req_param is None:
Expand All @@ -205,13 +205,15 @@ def recommend(req_param=None):
lat = 38.0235 + (random.randint(0, 481) - 240.5) / 1000.0
lon = -122.095 + (random.randint(0, 325) - 157.0) / 1000.0

return Event(recommend_op.dataflow.entry, ["tempkey"], {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow)

return Event(recommend_op.dataflow.entry, {"requirement": req_param, "lat": lat, "lon": lon}, recommend_op.dataflow)

def user_login(succesfull=True):
user_id = random.randint(0, 500)
username = f"Cornell_{user_id}"
password = str(user_id) * 10 if succesfull else ""
return Event(OpNode(user_op, InvokeMethod("login")), [username], {"password": password}, None)
return Event(OpNode(User, InvokeMethod("login"), read_key_from="user_key"), {"user_key": username, "password": password}, None)


def reserve():
hotel_id = random.randint(0, 99)
Expand All @@ -221,7 +223,14 @@ def reserve():
# user.order(flight, hotel)
user_id = "Cornell_" + str(random.randint(0, 500))

return Event(user_op.dataflows["order"].entry, [user_id], {"flight": str(flight_id), "hotel": str(hotel_id)}, user_op.dataflows["order"])
return Event(
user_op.dataflows["order"].entry,
{
"user_key": user_id,
"flight_key": str(flight_id),
"hotel_key": str(hotel_id)
},
user_op.dataflows["order"])

def deathstar_workload_generator():
search_ratio = 0.6
Expand Down Expand Up @@ -261,7 +270,7 @@ def benchmark_runner(proc_num) -> dict[int, dict]:
time.sleep(sleep_time)
event = next(deathstar_generator)
# func_name = event.dataflow.name if event.dataflow is not None else "login" # only login has no dataflow
key = event.key_stack[0]
# key = event.key_stack[0]
# params = event.variable_map
client.send(event)
# futures[event._id] = {"event": f'{func_name} {key}->{params}'}
Expand Down
3 changes: 1 addition & 2 deletions deathstar/entities/flight.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ def reserve(self) -> bool:

#### COMPILED FUNCTIONS (ORACLE) #####

def reserve_compiled(variable_map: dict[str, Any], state: Flight, key_stack: list[str]) -> Any:
key_stack.pop()
def reserve_compiled(variable_map: dict[str, Any], state: Flight) -> Any:
if state.cap <= 0:
return False
return True
Expand Down
11 changes: 4 additions & 7 deletions deathstar/entities/hotel.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Optional
from typing import Any
from cascade.dataflow.operator import StatefulOperator
from geopy.distance import distance

Expand Down Expand Up @@ -59,18 +59,15 @@ def __key__(self) -> int:

#### COMPILED FUNCTIONS (ORACLE) #####

def reserve_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any:
key_stack.pop()
def reserve_compiled(variable_map: dict[str, Any], state: Hotel) -> Any:
if state.cap <= 0:
return False
return True

def get_geo_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any:
key_stack.pop()
def get_geo_compiled(variable_map: dict[str, Any], state: Hotel) -> Any:
return state.geo

def get_price_compiled(variable_map: dict[str, Any], state: Hotel, key_stack: list[str]) -> Any:
key_stack.pop()
def get_price_compiled(variable_map: dict[str, Any], state: Hotel) -> Any:
return state.price

hotel_op = StatefulOperator(
Expand Down
60 changes: 26 additions & 34 deletions deathstar/entities/recommendation.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Literal
from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode
from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode, StatelessOpNode
from cascade.dataflow.operator import StatelessOperator
from deathstar.entities.hotel import Geo, Hotel, hotel_op
from deathstar.entities.hotel import Geo, Hotel

# Stateless
class Recommendation():
Expand All @@ -23,51 +23,43 @@ def get_recommendations(requirement: Literal["distance", "price"], lat: float, l

#### COMPILED FUNCTIONS (ORACLE) ####

def get_recs_if_cond(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_if_cond(variable_map: dict[str, Any]):
return variable_map["requirement"] == "distance"

# list comprehension entry
def get_recs_if_body_0(variable_map: dict[str, Any], key_stack: list[str]):
hotel_key = key_stack[-1]
# The body will need the hotel key (actually, couldn't we just take the top of the key stack again?)
variable_map["hotel_key"] = hotel_key
# The next node (Hotel.get_geo) will need the hotel key
key_stack.append(hotel_key)
def get_recs_if_body_0(variable_map: dict[str, Any]):
pass


# list comprehension body
def get_recs_if_body_1(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_if_body_1(variable_map: dict[str, Any]):
hotel_geo: Geo = variable_map["hotel_geo"]
lat, lon = variable_map["lat"], variable_map["lon"]
dist = hotel_geo.distance_km(lat, lon)
return (dist, variable_map["hotel_key"])

# after list comprehension
def get_recs_if_body_2(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_if_body_2(variable_map: dict[str, Any]):
distances = variable_map["distances"]
min_dist = min(distances, key=lambda x: x[0])[0]
variable_map["res"] = [hotel for dist, hotel in distances if dist == min_dist]


def get_recs_elif_cond(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_elif_cond(variable_map: dict[str, Any]):
return variable_map["requirement"] == "price"


# list comprehension entry
def get_recs_elif_body_0(variable_map: dict[str, Any], key_stack: list[str]):
hotel_key = key_stack[-1]
# The body will need the hotel key (actually, couldn't we just take the top of the key stack again?)
variable_map["hotel_key"] = hotel_key
# The next node (Hotel.get_geo) will need the hotel key
key_stack.append(hotel_key)
def get_recs_elif_body_0(variable_map: dict[str, Any]):
pass


# list comprehension body
def get_recs_elif_body_1(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_elif_body_1(variable_map: dict[str, Any]):
return (variable_map["hotel_price"], variable_map["hotel_key"])

# after list comprehension
def get_recs_elif_body_2(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_elif_body_2(variable_map: dict[str, Any]):
prices = variable_map["prices"]
min_price = min(prices, key=lambda x: x[0])[0]
variable_map["res"] = [hotel for price, hotel in prices if price == min_price]
Expand All @@ -76,7 +68,7 @@ def get_recs_elif_body_2(variable_map: dict[str, Any], key_stack: list[str]):

# a future optimization might instead duplicate this piece of code over the two
# branches, in order to reduce the number of splits by one
def get_recs_final(variable_map: dict[str, Any], key_stack: list[str]):
def get_recs_final(variable_map: dict[str, Any]):
return variable_map["res"]


Expand All @@ -93,24 +85,24 @@ def get_recs_final(variable_map: dict[str, Any], key_stack: list[str]):
}, None)

df = DataFlow("get_recommendations")
n1 = OpNode(recommend_op, InvokeMethod("get_recs_if_cond"), is_conditional=True)
n2 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_0"))
n3 = OpNode(hotel_op, InvokeMethod("get_geo"), assign_result_to="hotel_geo")
n4 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_1"), assign_result_to="distance")
n1 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_if_cond"), is_conditional=True)
n2 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_if_body_0"))
n3 = OpNode(Hotel, InvokeMethod("get_geo"), assign_result_to="hotel_geo", read_key_from="hotel_key")
n4 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_if_body_1"), assign_result_to="distance")
n5 = CollectNode("distances", "distance")
n6 = OpNode(recommend_op, InvokeMethod("get_recs_if_body_2"))
ns1 = SelectAllNode(Hotel, n5)
n6 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_if_body_2"))
ns1 = SelectAllNode(Hotel, n5, assign_key_to="hotel_key")

n7 = OpNode(recommend_op, InvokeMethod("get_recs_elif_cond"), is_conditional=True)
n8 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_0"))
n9 = OpNode(hotel_op, InvokeMethod("get_price"), assign_result_to="hotel_price")
n10 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_1"), assign_result_to="price")
n7 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_elif_cond"), is_conditional=True)
n8 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_elif_body_0"))
n9 = OpNode(Hotel, InvokeMethod("get_price"), assign_result_to="hotel_price", read_key_from="hotel_key")
n10 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_elif_body_1"), assign_result_to="price")
n11 = CollectNode("prices", "price")
n12 = OpNode(recommend_op, InvokeMethod("get_recs_elif_body_2"))
ns2 = SelectAllNode(Hotel, n11)
n12 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_elif_body_2"))
ns2 = SelectAllNode(Hotel, n11, assign_key_to="hotel_key")


n13 = OpNode(recommend_op, InvokeMethod("get_recs_final"))
n13 = StatelessOpNode(recommend_op, InvokeMethod("get_recs_final"))

df.add_edge(Edge(n1, ns1, if_conditional=True))
df.add_edge(Edge(n1, n7, if_conditional=False))
Expand Down
32 changes: 12 additions & 20 deletions deathstar/entities/search.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from typing import Any
from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode
from cascade.dataflow.dataflow import CollectNode, DataFlow, Edge, InvokeMethod, OpNode, SelectAllNode, StatelessOpNode
from cascade.dataflow.operator import StatelessOperator
from deathstar.entities.hotel import Geo, Hotel, hotel_op

Expand All @@ -21,19 +21,11 @@ def nearby(lat: float, lon: float, in_date: int, out_date: int):


# predicate 1
def search_nearby_compiled_0(variable_map: dict[str, Any], key_stack: list[str]):
# We assume that the top of the key stack is the hotel key.
# This assumption holds if the node before this one is a correctly
# configure SelectAllNode.

hotel_key = key_stack[-1]
# The body will need the hotel key (actually, couldn't we just take the top of the key stack again?)
variable_map["hotel_key"] = hotel_key
# The next node (Hotel.get_geo) will need the hotel key
key_stack.append(hotel_key)
def search_nearby_compiled_0(variable_map: dict[str, Any]):
pass

# predicate 2
def search_nearby_compiled_1(variable_map: dict[str, Any], key_stack: list[str]):
def search_nearby_compiled_1(variable_map: dict[str, Any]):
hotel_geo: Geo = variable_map["hotel_geo"]
lat, lon = variable_map["lat"], variable_map["lon"]
dist = hotel_geo.distance_km(lat, lon)
Expand All @@ -42,11 +34,11 @@ def search_nearby_compiled_1(variable_map: dict[str, Any], key_stack: list[str])


# body
def search_nearby_compiled_2(variable_map: dict[str, Any], key_stack: list[str]):
def search_nearby_compiled_2(variable_map: dict[str, Any]):
return (variable_map["dist"], variable_map["hotel_key"])

# next line
def search_nearby_compiled_3(variable_map: dict[str, Any], key_stack: list[str]):
def search_nearby_compiled_3(variable_map: dict[str, Any]):
distances = variable_map["distances"]
hotels = [hotel for dist, hotel in sorted(distances)[:5]]
return hotels
Expand All @@ -60,14 +52,14 @@ def search_nearby_compiled_3(variable_map: dict[str, Any], key_stack: list[str])
}, None)

df = DataFlow("search_nearby")
n1 = OpNode(search_op, InvokeMethod("search_nearby_compiled_0"))
n2 = OpNode(hotel_op, InvokeMethod("get_geo"), assign_result_to="hotel_geo")
n3 = OpNode(search_op, InvokeMethod("search_nearby_compiled_1"), is_conditional=True)
n4 = OpNode(search_op, InvokeMethod("search_nearby_compiled_2"), assign_result_to="search_body")
n1 = StatelessOpNode(search_op, InvokeMethod("search_nearby_compiled_0"))
n2 = OpNode(Hotel, InvokeMethod("get_geo"), assign_result_to="hotel_geo", read_key_from="hotel_key")
n3 = StatelessOpNode(search_op, InvokeMethod("search_nearby_compiled_1"), is_conditional=True)
n4 = StatelessOpNode(search_op, InvokeMethod("search_nearby_compiled_2"), assign_result_to="search_body")
n5 = CollectNode("distances", "search_body")
n0 = SelectAllNode(Hotel, n5)
n0 = SelectAllNode(Hotel, n5, assign_key_to="hotel_key")

n6 = OpNode(search_op, InvokeMethod("search_nearby_compiled_3"))
n6 = StatelessOpNode(search_op, InvokeMethod("search_nearby_compiled_3"))

df.add_edge(Edge(n0, n1))
df.add_edge(Edge(n1, n2))
Expand Down
33 changes: 15 additions & 18 deletions deathstar/entities/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,22 @@ def order(self, flight: Flight, hotel: Hotel):

#### COMPILED FUNCTIONS (ORACLE) #####

def check_compiled(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
key_stack.pop()
def check_compiled(variable_map: dict[str, Any], state: User) -> Any:
return state.password == variable_map["password"]

def order_compiled_entry_0(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
key_stack.append(variable_map["hotel"])
def order_compiled_entry_0(variable_map: dict[str, Any], state: User) -> Any:
pass

def order_compiled_entry_1(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
key_stack.append(variable_map["flight"])
def order_compiled_entry_1(variable_map: dict[str, Any], state: User) -> Any:
pass

def order_compiled_if_cond(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
def order_compiled_if_cond(variable_map: dict[str, Any], state: User) -> Any:
return variable_map["hotel_reserve"] and variable_map["flight_reserve"]

def order_compiled_if_body(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
key_stack.pop()
def order_compiled_if_body(variable_map: dict[str, Any], state: User) -> Any:
return True

def order_compiled_else_body(variable_map: dict[str, Any], state: User, key_stack: list[str]) -> Any:
key_stack.pop()
def order_compiled_else_body(variable_map: dict[str, Any], state: User) -> Any:
return False

user_op = StatefulOperator(
Expand All @@ -59,13 +56,13 @@ def order_compiled_else_body(variable_map: dict[str, Any], state: User, key_stac
# will try to automatically parallelize this.
# There is also no user entry (this could also be an optimization)
df = DataFlow("user_order")
n0 = OpNode(user_op, InvokeMethod("order_compiled_entry_0"))
n1 = OpNode(hotel_op, InvokeMethod("reserve"), assign_result_to="hotel_reserve")
n2 = OpNode(user_op, InvokeMethod("order_compiled_entry_1"))
n3 = OpNode(flight_op, InvokeMethod("reserve"), assign_result_to="flight_reserve")
n4 = OpNode(user_op, InvokeMethod("order_compiled_if_cond"), is_conditional=True)
n5 = OpNode(user_op, InvokeMethod("order_compiled_if_body"))
n6 = OpNode(user_op, InvokeMethod("order_compiled_else_body"))
n0 = OpNode(User, InvokeMethod("order_compiled_entry_0"), read_key_from="user_key")
n1 = OpNode(Hotel, InvokeMethod("reserve"), assign_result_to="hotel_reserve", read_key_from="hotel_key")
n2 = OpNode(User, InvokeMethod("order_compiled_entry_1"), read_key_from="user_key")
n3 = OpNode(Flight, InvokeMethod("reserve"), assign_result_to="flight_reserve", read_key_from="flight_key")
n4 = OpNode(User, InvokeMethod("order_compiled_if_cond"), is_conditional=True, read_key_from="user_key")
n5 = OpNode(User, InvokeMethod("order_compiled_if_body"), read_key_from="user_key")
n6 = OpNode(User, InvokeMethod("order_compiled_else_body"), read_key_from="user_key")

df.add_edge(Edge(n0, n1))
df.add_edge(Edge(n1, n2))
Expand Down
Loading
Loading