A comprehensive data pipeline and analysis project exploring Michelin-starred restaurants worldwide.
View our interactive dashboard: Michelin Star Explorer Dashboard
- Data is collected and processed through our ETL pipeline
- Results are stored in both Google BigQuery and MongoDB
- Interactive visualizations are created using Google Looker Studio
- Dashboard pulls data from both Google BigQuery and MongoDB for comprehensive analysis
This project is an Apache Airflow DAG that orchestrates tasks related to fetching, processing, and storing restaurant data in a MongoDB database.
Environment Setup:
- Loads environment variables using `dotenv`, specifically fetching the `GOOGLE_PLACES_API_KEY` required for Google Places API calls.
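The project itself uses python-dotenv's `load_dotenv()` for this step. As a rough illustration of what that loading does, here is a minimal stand-in parser — the helper name `load_env` is hypothetical, while the `.env` file name and the `GOOGLE_PLACES_API_KEY` key are the ones this project uses:

```python
import os
from pathlib import Path

def load_env(path: str = ".env") -> dict:
    """Minimal stand-in for python-dotenv's load_dotenv():
    reads KEY=VALUE lines and exports them into os.environ."""
    env = {}
    env_file = Path(path)
    if env_file.exists():
        for line in env_file.read_text().splitlines():
            line = line.strip()
            # Skip blank lines and comments; keep only KEY=VALUE pairs.
            if line and not line.startswith("#") and "=" in line:
                key, _, value = line.partition("=")
                env[key.strip()] = value.strip()
    os.environ.update(env)
    return env

# The DAG can then read the key at runtime:
# GOOGLE_PLACES_API_KEY = os.getenv("GOOGLE_PLACES_API_KEY")
```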
Function Definitions:
- `get_place_rating(name, address)`: Fetches the Google Places rating and user ratings total for a given restaurant name and address.
- `json_to_mongo(json_data)`: Inserts or updates restaurant data in a MongoDB collection.
- `csv_to_json(csv_data)`: Converts CSV data into JSON format.
- `call_api()`: Fetches CSV data from a specified GitHub URL containing restaurant information.
- `task_dag()`: Calls the API, converts the CSV data to JSON, and uploads it to MongoDB.
- `google_dag()`: Processes the CSV data to fetch Google ratings for each restaurant, updates the DataFrame, and uploads the results to MongoDB.
- `aggregate_michelin_data()`: Aggregates restaurant data from MongoDB, calculating average ratings and counts of Michelin stars, and stores the results in a new collection.
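The function bodies live in the DAG file, not in this README. As a sketch of what the CSV-to-JSON step might look like — the column names in the sample row are invented for illustration — one possible `csv_to_json` is:

```python
import csv
import io

def csv_to_json(csv_data: str) -> list[dict]:
    """Convert raw CSV text into a list of JSON-serializable
    records, one dict per restaurant row."""
    reader = csv.DictReader(io.StringIO(csv_data))
    return [dict(row) for row in reader]

# Hypothetical sample row; the real columns come from the GitHub CSV.
sample = "Name,Location,Award\nOsteria Francescana,Modena,3 Stars\n"
records = csv_to_json(sample)
# records → [{"Name": "Osteria Francescana", "Location": "Modena", "Award": "3 Stars"}]
```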
DAG Definition:
- The DAG is defined with default arguments, including the owner, start date, and retry settings.
- Scheduled to run daily.
Task Definitions:
- Several tasks are defined using `PythonOperator`:
  - `api_call_task`: Executes the `task_dag` function.
  - `google_dag_task`: Executes the `google_dag` function.
  - `aggregation_task`: Executes the `aggregate_michelin_data` function.
  - `aggregation_cuisine_task`: Executes the `aggregate_top_cuisines_by_rating` function (imported from another script).
Task Dependencies:
- The tasks are set to run in a specific order: `api_call_task` runs first, followed by `google_dag_task`, and then both `aggregation_task` and `aggregation_cuisine_task` run in parallel.
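Putting the DAG definition, tasks, and ordering together, the wiring might look roughly like the following. This is a sketch, not the project's actual DAG file: the `default_args` values are placeholders (the README only says they cover owner, start date, and retries), and it assumes the callables described above are importable from the project's modules and that Airflow is installed:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# Assumed import path; the real module names are not shown in this README.
# from michelin_dag import task_dag, google_dag, aggregate_michelin_data
# from aggregations import aggregate_top_cuisines_by_rating

# Hypothetical defaults covering owner, start date, and retry settings.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2024, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_api_call",
    default_args=default_args,
    schedule_interval="@daily",  # the DAG is scheduled to run daily
    catchup=False,
) as dag:
    api_call_task = PythonOperator(task_id="api_call_task", python_callable=task_dag)
    google_dag_task = PythonOperator(task_id="google_dag_task", python_callable=google_dag)
    aggregation_task = PythonOperator(
        task_id="aggregation_task", python_callable=aggregate_michelin_data
    )
    aggregation_cuisine_task = PythonOperator(
        task_id="aggregation_cuisine_task",
        python_callable=aggregate_top_cuisines_by_rating,
    )

    # api_call_task first, then google_dag_task, then both
    # aggregation tasks in parallel.
    api_call_task >> google_dag_task >> [aggregation_task, aggregation_cuisine_task]
```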
Clone the Repository:

```shell
git clone git@github.com:yourusername/michelin_star_explorer.git
cd michelin_star_explorer
```
Create a Virtual Environment (optional but recommended):

```shell
python -m venv venv
source venv/bin/activate  # On Windows use `venv\Scripts\activate`
```
Install Required Packages:

```shell
pip install -r requirements.txt
```
Set Up Environment Variables:
- Create a `.env` file in the root directory of the project and add your Google Places API key:

```
GOOGLE_PLACES_API_KEY=your_google_places_api_key_here
```
Set Up MongoDB:
- Ensure you have MongoDB installed and running on your local machine, or use a cloud MongoDB service.
- Update the `MONGO_URI` in the code if necessary.
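For reference, the connection string for a default local MongoDB instance typically looks like this — the value below is a placeholder, not the project's actual setting:

```python
# Default local MongoDB; replace with your own host/port or a
# MongoDB Atlas connection string for a cloud deployment.
MONGO_URI = "mongodb://localhost:27017/"
```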
Start Apache Airflow:
- Initialize the database:

```shell
airflow db init
```

- Start the web server:

```shell
airflow webserver --port 8080
```

- In a new terminal, start the scheduler:

```shell
airflow scheduler
```
Access the Airflow UI:
- Open your web browser and go to `http://localhost:8080`.
- You should see the `daily_api_call` DAG listed.
Trigger the DAG:
- Click on the DAG name to view its details.
- You can manually trigger the DAG by clicking the "Trigger DAG" button.
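The DAG can also be triggered from the command line, assuming the Airflow CLI installed in the steps above is on your PATH:

```shell
# Triggers a manual run of the project's DAG by its dag_id.
airflow dags trigger daily_api_call
```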
Monitor the Tasks:
- You can monitor the progress of each task in the Airflow UI.