Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 101 additions & 0 deletions bq_dbt/seeds/stg_seed__fake_phone_numbers.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
number,imei,country
1-979-213-9120 x313,36-395843-652482-1,Angola
794-554-1917 x8798,30-658888-080210-8,Tajikistan
1-254-794-5162 x3167,84-007696-722472-0,Bhutan
(549) 229-2115,00-692922-450556-4,Burundi
626.315.4964 x45075,06-157905-895141-8,Togo
535-369-3357 x67186,66-733723-874890-9,Northern Mariana Islands
601.890.4359 x432,79-753076-135508-3,Pitcairn Islands
(597) 640-2986 x590,14-179164-027572-7,Turks and Caicos Islands
751-303-0105,21-825259-821220-2,Moldova
(945) 408-8227 x8634,87-745413-011909-1,Bolivia
367-256-0102 x93522,02-171195-390872-7,Fiji
369.881.7060 x048,67-543591-513552-1,Saint Barthelemy
1-742-307-3260 x20710,32-081196-558962-8,Gabon
(775) 879-1960,42-767939-952906-0,Tuvalu
1-441-605-4989 x336,90-131016-500499-8,Somalia
(241) 235-1885 x329,35-882809-944537-3,Mexico
349-630-9524 x288,32-885511-756000-5,Indonesia
739-608-9732,90-618254-632656-3,Saint Helena
(807) 869-8032,64-433240-453165-8,Ireland
1-946-712-4032,70-899765-202285-8,Macao
(815) 270-5470 x50483,60-737029-012888-7,Bangladesh
751.354.9342 x8583,19-594221-414437-8,Albania
626.804.1662,31-857421-266516-1,Libyan Arab Jamahiriya
1-971-305-8770,33-380835-086928-9,Nepal
(400) 777-7647 x70579,73-232266-377546-1,Sao Tome and Principe
943-913-9554,99-629956-581438-5,Angola
357-903-3788 x6464,18-455294-222226-4,Saint Helena
1-520-954-2773 x24025,08-135775-952978-3,Ecuador
671-539-0252,60-443186-083647-4,Russian Federation
(982) 588-6601 x706,06-054513-097933-0,Saint Barthelemy
(594) 646-4089 x47990,67-017660-651029-2,San Marino
1-975-971-0161 x86695,98-777299-452614-4,Botswana
1-881-826-8532 x99275,53-791394-052980-9,France
586.215.3388 x8556,24-864230-852385-2,South Sudan
1-641-393-0989 x58396,21-042331-186149-6,Slovenia
1-347-328-5433,02-206147-511559-4,Zimbabwe
(845) 393-9756,07-010048-339619-1,"Virgin Islands, British"
1-818-366-6043 x093,74-275444-613017-6,French Guiana
(991) 343-6507 x248,11-305668-731308-6,Lithuania
1-840-966-6745 x91680,14-385866-468522-8,Iran
865-428-0127 x2267,24-583302-874397-8,Austria
321.351.2143 x7011,96-676012-626967-0,Liechtenstein
234-844-6917 x0401,39-714294-544137-7,Gibraltar
264.892.8687,96-359207-390497-2,Ghana
887-343-0310,09-837544-512270-7,Sri Lanka
626-491-8761 x7971,95-609701-555731-7,Egypt
525-887-8128 x9098,35-746727-799240-0,Belize
1-482-785-5707 x73471,77-654260-077387-6,Democratic Republic of the Congo
202-439-8151 x2211,26-693044-408545-7,Greece
(722) 449-3467 x854,47-441691-141916-1,Syrian Arab Republic
959-538-4065 x89950,11-872975-599867-4,South Sudan
1-269-600-3498,00-808445-288645-6,Brunei Darussalam
(978) 442-8646 x60335,51-899221-511077-9,Cameroon
(270) 627-9607 x2416,61-133194-928459-0,Bermuda
(394) 753-2826 x423,77-403630-602285-4,Falkland Islands (Malvinas)
(684) 283-8819,52-854594-246963-3,Curacao
224-592-3577,19-371571-646720-4,Papua New Guinea
354.631.3602 x617,36-561151-851871-0,Wallis and Futuna
943.561.6382 x885,86-446017-552188-0,Oman
(771) 240-9996 x60146,55-549465-083745-7,Iceland
1-404-900-1194 x71165,09-260672-603010-2,Saint Helena
407.845.5400 x60302,04-931459-437765-5,Turks and Caicos Islands
1-732-349-5099 x05038,99-075608-501263-0,Wallis and Futuna
(864) 873-7244,52-133714-704346-3,Thailand
684.222.0748 x0823,54-931261-407719-3,Belarus
(517) 811-7326 x26033,23-963051-403146-4,Saint Martin
1-835-629-2896 x995,07-074238-934754-5,Togo
508-222-2337 x532,46-819008-122335-0,Burkina Faso
803-809-7832 x4573,55-297346-939703-8,Republic of Korea
1-274-814-5260 x93305,68-624671-530710-4,Colombia
1-470-864-5761 x4688,48-048268-829004-7,Czechia
204-582-0544 x1975,81-034324-999969-3,"Virgin Islands, British"
408.549.2896 x6205,86-766470-246286-7,Australia
378-666-5825 x2983,47-058023-868640-8,Montserrat
1-606-681-2784 x531,62-657281-314886-5,Switzerland
336.645.5650 x27795,52-167604-349156-9,Suriname
1-608-783-3910 x70451,95-902287-606380-6,Uganda
550.459.2584 x75924,39-382059-610498-0,Saint Kitts and Nevis
817.404.4646 x624,37-165666-219717-7,Portugal
1-727-363-7358 x66374,40-041938-198356-6,Panama
(520) 322-4513 x0478,78-358147-876458-4,Comoros
1-698-234-8898 x376,58-804881-784613-1,Democratic People's Republic of Korea
1-268-808-8639,95-818413-897369-2,Sri Lanka
811-243-8196 x4429,89-911314-054714-7,Mayotte
1-421-960-6851 x469,32-951218-825031-8,Afghanistan
(331) 527-8187,72-418944-322299-3,Haiti
502-285-3321 x74111,18-676522-368683-7,Malta
(489) 547-7285,94-131142-500357-4,Brunei Darussalam
(469) 286-9502 x5101,38-790943-374568-9,Niger
458.821.0513 x265,92-313979-613662-0,Hong Kong
(334) 730-2595,62-547555-739788-1,Cayman Islands
1-490-293-2664 x0729,53-193554-305356-0,Heard Island and McDonald Islands
1-766-561-5035 x6697,56-954672-017051-8,Aland Islands
425.277.0557 x84679,10-725937-954878-1,Netherlands
798.320.9040 x19202,83-694232-342095-3,Qatar
204-226-0688 x74317,83-031989-754938-2,Thailand
533-754-2679 x5773,60-912574-982951-1,Guadeloupe
(998) 990-5020 x93304,21-389641-255469-7,"Bonaire, Sint Eustatius and Saba"
1-848-880-8839 x431,92-095249-685961-1,Saint Vincent and the Grenadines
1-463-353-9144 x10911,15-675488-246402-1,Liechtenstein
17 changes: 17 additions & 0 deletions dbt_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from prefect import flow
from prefect_dbt import DbtCoreOperation

@flow
def dbt_flow():
dbt_job = DbtCoreOperation(
commands=["dbt run"],
project_dir="bq_dbt",
profiles_dir="bq_dbt",
)
dbt_job.run()


if __name__ == "__main__":
dbt_flow.serve(
cron="*/5 * * * *",
)
15 changes: 15 additions & 0 deletions dbt_flow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from prefect import flow
from prefect_dbt import DbtCoreOperation

@flow
def dbt_flow():
dbt_job = DbtCoreOperation(
commands=["dbt run"],
project_dir="bq_dbt",
profiles_dir="bq_dbt",
)
dbt_job.run()


if __name__ == "__main__":
dbt_flow()
Binary file added images/api-key.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/flow-run-graph.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/flow-run.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/key-create.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added images/schedules.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
187 changes: 187 additions & 0 deletions prefect_readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Prefect Readme

## Get started with Prefect and Codespaces

### Create a Prefect API key

Go to [Prefect Cloud](https://app.prefect.cloud/auth/sign-up) and create an account and workspace.

Once logged in, create an API key.

Navigate to the account dropdown in the top right, and select "API keys".

<img src="images/api-key.png" alt="isolated" width="200"/>

Once on the API Keys page, click the plus at the top of the page to open the "Create API Key" form. Enter a name and click "Create".

<img src="images/key-create.png" alt="isolated" width="400"/>

Copy and save your API key for later use.

### Start your Codespaces instance

Fork this repository and start a codespaces instance from it.

### Install Prefect and log in to Prefect Cloud

Connect to Codespaces and install `prefect` and `prefect-dbt`:

`pip install prefect prefect-dbt`

Once installed, log in to Prefect Cloud with your API key:

`prefect cloud login -k <your-api-key>`

Once you see the following message in your terminal, you're good to go!

`Authenticated with Prefect Cloud! Using workspace 'your-account/your-workspace'.`

## Run your first flow

### Write and execute a flow script

Clone this repository or create a file called `simple_flow.py` with the following contents:

```python
from prefect import flow, task


@flow(log_prints=True)
def simple_flow(name: str):
message = generate_message(name)
log_message(message)


@task
def generate_message(name: str):
return f"Welcome to Prefect, {name}!"


@task
def log_message(message: str):
print(message)


if __name__ == "__main__":
simple_flow("Kevin")
```

Replace the name "Kevin" on the last line with your own name, and save the file.
Run your flow by executing `python simple_flow.py` in your terminal.

Each time a function with a `@flow` decorator is called, a flow run is created.
Cmd+Click on the link generated in your terminal, or navigate to the "Runs" page in the [Prefect Cloud UI](app.prefect.cloud) and click on the **blue, generated flow run name** to view your flow run.

<img src="images/flow-run.png" alt="isolated" width="600"/>

### Understanding the flow run graph

Each flow run has an accompanying flow run graph.

<img src="images/flow-run-graph.png" alt="isolated" width="500"/>

At the center are **task runs**, a visual representation of `@task` decorated functions displayed at the time they were executed.
Task runs that share data are connected by an arrow. Since the `message` returned by the first task was passed as a parameter to the second task, `generate_message` points to `log_message`.

The colors across the bottom, which are also lightly shaded in the background of the graph, indicate the **state** of the flow run over time. This flow run went from `Pending` to `Running`, then to `Completed`. There are many other states, like `Scheduled`, `Paused`, `Failed`, and `Crashed`, each indicated by their own color.

Finally, the purple dots are **events**, which are small pieces of data emitted during a flow run that can trigger actions in other parts of the Prefect ecosystem.

**Task runs**, **states**, and **events** can all be clicked on for additional details.

### Create a dbt flow

dbt is supported though a Prefect integration package, `prefect-dbt`. You can execute `dbt run` against your existing dbt project by importing `DbtCoreOperation` from `prefect_dbt`.

In `dbt_flow.py`, a dbt job is defined by specifying a command and referencing the directory where the dbt project and profile are located relative to the flow script.

```python
from prefect import flow
from prefect_dbt import DbtCoreOperation

@flow
def dbt_flow():
dbt_job = DbtCoreOperation(
commands=["dbt run"],
project_dir="bq_dbt",
profiles_dir="bq_dbt",
)
dbt_job.run()


if __name__ == "__main__":
run_dbt_job()
```

Run your dbt flow by executing `python dbt_flow.py` in your terminal. Though this flow only runs your dbt job, it can easily be extended to do additional work, like calling APIs, transforming data, or running other dbt commands!

## Schedule your flow

### Create a deployment

In Prefect, flows can be scheduled by promoting them to **deployments**. Deployments are containers for instructions about how, where, and when your flows should run.

Creating a deployment for a flow is as simple as calling `.serve()` on your flow function.

`dbt_deployment.py` contains the same flow as before, but instead of calling it directly as a Python function, you'll deploy it inside your Codespaces instance to run once every 5 minutes.

```python
# Flow code above remains the same

if __name__ == "__main__":
dbt_flow.serve(
cron="*/5 * * * *",
)
```

Execute `python dbt_deployment.py` in your terminal to create your deployment. Cmd+Click on the link generated in your terminal to visit the `simple-flow` deployment page.

An auto-scheduled run for this deployment will appear under the "Runs" tab of the deployment page. Prefect deployments can have multiple schedules. You can disable all schedules with the top toggle, or disable schedules with their individual corresponding toggles.

<img src="images/schedules.png" alt="isolated" width="250"/>

Clicking the "Run"->"Quick run" buttons in the top right of the deployment page will create a scheduled run with a start time of _now_. Give it a try!

### The limitations of `.serve()`

Creating a deployment with `.serve()` is only useful in places where your flow code is already present and runnable as a script, like your local computer or Codespaces instance. This is because `.serve()` creates a flow runner to execute your flows right where you run your script. However, these environments don't persist forever — a served deployment won't run on your laptop if it's shut off!

`.serve()` is a convenient way to to get something off the ground and schedulable quickly, but its production readiness is limited. Keep this in mind as you think about building your data pipelines.

Press Ctrl+C in your terminal to shut down your served flow.

## Work Pools

To make your deployment truly resilient, it needs to be deployed to the cloud, where there are compute resources always ready to run your code. In Prefect, those execution environments are represented as **work pools**.

### Create a managed work pool

Go to the "Work Pools" page in the [Prefect Cloud UI](app.prefect.cloud) and click the plus button at the top of the page. Select "Prefect Managed" and name work your pool "managed-pool". Click "Next", then click "Create".

Let's deploy your `simple-flow` from earlier to your Prefect Managed work pool.
First, since this deployment will be running your flow in a remote execution environment where the code isn't currently present, we'll need to tell Prefect where to find your code. `flow.from_source()` allows you to specify a repository where your code is stored, as well as an `entrypoint`, which is the path to your `<flow_file>:<flow_function>` from the root of the repository.
Then all we need to do is swap `serve` with `deploy`, and add the `name="simple-flow-managed"` and `work_pool_name="managed-pool"` parameters.

```python
# Flow code above remains the same

if __name__ == "__main__":
flow.from_source(
source="https://github.com/Generation-Data/prefect-examples.git",
entrypoint="simple_flow.py:simple_flow",
).deploy(
name="simple-flow-managed",
work_pool_name="managed-pool",
cron="*/5 * * * *",
parameters={"name": "Kevin"}
)
```

Execute `python simple_deployment_managed.py` in your terminal to create your new work pool-based deployment. Cmd+Click on the link generated in your terminal to visit the `simple-flow-managed` deployment page. When this deployment is run, it'll be executed on always-available cloud infrastructure managed by Prefect.

Note that Prefect Managed work pools are free, but offer a limited number of compute hours per month. For unlimited compute, consider a Cloud Run v2 push work pool managed within your organization's GCP project.

Don't forget to shut off the schedule or delete your deployment to preserve your remaining Prefect Managed compute time.



24 changes: 24 additions & 0 deletions simple_deployment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from prefect import flow, task


@flow(log_prints=True)
def simple_flow(name: str):
message = generate_message(name)
log_message(message)


@task
def generate_message(name: str):
return f"Welcome to Prefect, {name}!"


@task
def log_message(message: str):
print(message)


if __name__ == "__main__":
simple_flow.serve(
cron="*/5 * * * *",
parameters={"name": "Kevin"}
)
29 changes: 29 additions & 0 deletions simple_deployment_managed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from prefect import flow, task


@flow(log_prints=True)
def simple_flow(name: str):
message = generate_message(name)
log_message(message)


@task
def generate_message(name: str):
return f"Welcome to Prefect, {name}!"


@task
def log_message(message: str):
print(message)


if __name__ == "__main__":
flow.from_source(
source="https://github.com/Generation-Data/prefect-examples.git",
entrypoint="simple_flow.py:simple_flow",
).deploy(
name="simple-flow-managed",
work_pool_name="managed-pool",
cron="*/5 * * * *",
parameters={"name": "Kevin"}
)
Loading