Distributed Weather Data Pipeline

1. Overview

This project implements a distributed data pipeline that simulates the collection, processing, and storage of weather data from multiple weather stations. It leverages several key technologies including Docker, Kafka, MySQL, HDFS, and Protocol Buffers to create a scalable and resilient system.

The primary goal is to stream weather records (temperature readings) into a Kafka topic, consume this data, and store it in Parquet format within HDFS for long-term storage and potential future analysis, such as generating web-based dashboards or performing climate studies.

Pipeline diagram: see diagram.png in the repository root.

2. Why This Project?

  • Scalable Data Ingestion: Demonstrates how to handle continuous streams of data from multiple sources.
  • Resilient Processing: Implements fault tolerance, particularly in the consumer, to ensure data is not lost or reprocessed unnecessarily in case of crashes.
  • Big Data Technologies: Provides hands-on experience with industry-standard tools like Kafka for messaging, HDFS for distributed storage, and Parquet for efficient columnar storage.
  • Data Serialization: Uses Protocol Buffers for efficient and language-agnostic data serialization.
  • Microservices Architecture: The system is broken down into distinct services managed by Docker Compose, reflecting a microservices-like approach.

3. How It Works

The pipeline consists of several stages:

  1. Data Generation:

    • A Python script (src/weather_generator.py) continuously generates simulated daily weather data (date, temperature, station ID) for multiple imaginary stations, starting from January 1, 2000.
    • This raw data is inserted into a temperatures table in a MySQL database.
  2. Data Streaming (Kafka Producer):

    • The src/producer.py script acts as a Kafka producer.
    • It initializes a Kafka topic named temperatures (with 4 partitions by default).
    • It continuously polls the MySQL temperatures table for new records that haven't been processed yet (based on an incrementing id column).
    • Each weather record is serialized into a Report message using Protocol Buffers (defined in src/report.proto).
    • The serialized messages are sent to the temperatures Kafka topic. The station_id is used as the message key, so that, with the default partitioner, all records from a given station land in the same partition.
    • The producer is configured with retries and requires acknowledgment from all in-sync replicas (acks=all) to enhance data durability. A condensed sketch of this send path appears after this list.
  3. Data Consumption & Storage (Kafka Consumer):

    • The src/consumer.py script is a Kafka consumer. It is designed to be run as multiple instances, each assigned to a specific partition of the temperatures topic (manual partition assignment).
    • It reads messages in batches from its assigned partition.
    • The consumed messages (protobuf bytes) are deserialized back into Report objects.
    • Each batch of data is converted into a Pandas DataFrame and then written as a Parquet file to HDFS in the /data/ directory. Files are named partition-N-batch-M.parquet.
    • Atomic Writes: Parquet files are written atomically to HDFS. Data is first written to a temporary file (e.g., ....parquet.tmp), and upon successful completion, this temporary file is renamed to its final name. This prevents other applications from reading incomplete or corrupted files.
    • Crash Recovery:
      • After successfully writing a Parquet batch to HDFS, the consumer writes a checkpoint JSON file (e.g., partition-N.json) to the local disk of the Kafka container (at the root /). This file stores the batch_id of the last successfully processed batch and the next Kafka offset to read from for that partition.
      • When a consumer instance starts, it checks for its corresponding checkpoint file. If the file exists, the consumer loads the offset and batch_id, seeks to that offset in its assigned Kafka partition, and resumes processing from there. This way, a crashed consumer can restart without losing data or re-writing batches that are already stored in HDFS. If no checkpoint file is found, it starts from the beginning of the partition. A condensed sketch of the consumer's write-and-checkpoint loop appears after this list.
  4. Debugging:

    • The src/debug.py script is a simple Kafka consumer that subscribes to the temperatures topic (using automatic partition assignment).
    • It deserializes and prints the messages it receives to the console, along with the partition number. This is useful for verifying that the producer is sending data correctly and that messages are flowing through Kafka.
  5. Infrastructure:

    • Docker & Docker Compose: The entire system is containerized using Docker. docker-compose.yml defines and manages the following services:
      • kafka: A single Kafka broker. The src directory (containing Python scripts and protobuf definitions) is mounted into this container.
      • mysql: A MySQL database server to store the raw weather data.
      • nn (namenode): The HDFS namenode, which manages the HDFS filesystem metadata.
      • dn (datanode): HDFS datanodes (typically 3 replicas), which store the actual data blocks.
    • Dockerfiles: Separate Dockerfiles (Dockerfile.datanode, Dockerfile.hdfs, Dockerfile.kafka, Dockerfile.mysql, Dockerfile.namenode) are provided to build the custom Docker images for each service, ensuring all necessary dependencies and configurations are included.
    • The PROJECT environment variable (e.g., p7) is used for naming Docker containers and services, allowing for easier management and avoiding naming conflicts.
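
Below is a minimal sketch of the producer's serialize-and-send path, assuming kafka-python and the report_pb2 module generated from src/report.proto; the function name, retry count, and MySQL polling details are illustrative rather than the exact contents of src/producer.py.

    # Sketch of the producer's serialize-and-send path (illustrative, not the exact src/producer.py code).
    # Assumes kafka-python and the report_pb2 module generated from src/report.proto.
    from kafka import KafkaProducer
    import report_pb2

    producer = KafkaProducer(
        bootstrap_servers=["localhost:9092"],  # broker as seen from inside the kafka container
        acks="all",    # require acknowledgment from all in-sync replicas
        retries=10,    # retry transient send failures (illustrative value)
    )

    def send_report(date, degrees, station_id):
        # Serialize one MySQL row into a protobuf Report message
        report = report_pb2.Report(date=date, degrees=degrees, station_id=station_id)
        # station_id as the key keeps each station's records on one partition
        producer.send("temperatures",
                      key=station_id.encode("utf-8"),
                      value=report.SerializeToString())

    # The real producer first re-creates the 'temperatures' topic (4 partitions, 1 replica),
    # then loops over new MySQL rows (WHERE id > last_seen_id), sending one Report per row
    # and remembering the highest id processed.

Because the key determines the partition, each consumer instance sees a consistent subset of stations.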
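
The consumer's recovery, batching, atomic-write, and checkpointing loop can be summarized with the following condensed sketch, assuming kafka-python, pandas, and pyarrow's HadoopFileSystem client; the actual src/consumer.py may use different libraries, batch sizing, and names.

    # Condensed sketch of the consumer's recovery, batching, atomic write, and
    # checkpointing logic (illustrative; not the exact src/consumer.py code).
    import json, os, sys
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    from pyarrow import fs
    from kafka import KafkaConsumer, TopicPartition
    import report_pb2

    partition = int(sys.argv[1])                      # e.g. `python3 consumer.py 0`
    tp = TopicPartition("temperatures", partition)
    checkpoint_path = f"/partition-{partition}.json"  # local checkpoint file

    consumer = KafkaConsumer(bootstrap_servers=["localhost:9092"])
    consumer.assign([tp])                             # manual partition assignment

    # Crash recovery: resume from the checkpoint if one exists, else from offset 0
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            ckpt = json.load(f)
        batch_id = ckpt["batch_id"] + 1
        consumer.seek(tp, ckpt["offset"])
    else:
        batch_id = 0
        consumer.seek(tp, 0)

    hdfs = fs.HadoopFileSystem("boss", 9000)          # HDFS NameNode

    while True:
        messages = consumer.poll(timeout_ms=1000).get(tp, [])
        if not messages:
            continue
        rows = []
        for msg in messages:
            report = report_pb2.Report()
            report.ParseFromString(msg.value)
            rows.append({"date": report.date, "degrees": report.degrees,
                         "station_id": report.station_id})

        # Atomic write: write to a .tmp file, then rename to the final name
        final = f"/data/partition-{partition}-batch-{batch_id}.parquet"
        tmp = final + ".tmp"
        pq.write_table(pa.Table.from_pandas(pd.DataFrame(rows)), tmp, filesystem=hdfs)
        hdfs.move(tmp, final)

        # Checkpoint only after the HDFS write succeeded
        with open(checkpoint_path, "w") as f:
            json.dump({"batch_id": batch_id, "offset": consumer.position(tp)}, f)
        batch_id += 1

The ordering matters: because the checkpoint is updated only after the rename succeeds, a crash between the two steps at worst re-writes the last batch under the same file name rather than losing data.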

4. File & Directory Structure

.
├── diagram.png                 # Visual architecture of the pipeline
├── docker-compose.yml          # Defines and configures the multi-container application
├── Dockerfile.datanode         # Docker build instructions for HDFS DataNode
├── Dockerfile.hdfs             # (Likely a base or common HDFS image setup)
├── Dockerfile.kafka            # Docker build instructions for Kafka & Python environment
├── Dockerfile.mysql            # Docker build instructions for MySQL
├── Dockerfile.namenode         # Docker build instructions for HDFS NameNode
├── README.md                   # This file
├── run_producer.sh             # Convenience shell script to start the producer
└── src/                        # Source code directory
    ├── consumer.py             # Kafka consumer: reads from Kafka, writes to HDFS, handles recovery
    ├── debug.py                # Kafka consumer: for debugging, prints messages to console
    ├── producer.py             # Kafka producer: reads from MySQL, sends to Kafka
    ├── report_pb2.py           # Python code generated by protoc from report.proto
    ├── report.proto            # Protocol Buffer definition for weather Report messages
    ├── requirements.txt        # Python package dependencies
    └── weather_generator.py    # Generates simulated weather data and writes to MySQL

5. Key Files Explained

  • docker-compose.yml: Orchestrates the services (Kafka, MySQL, HDFS NameNode, HDFS DataNodes). It defines container names, image sources, volume mounts (e.g., mounting ./src into the Kafka container), environment variables, and resource limits.
  • Dockerfile.*: Each Dockerfile specifies the steps to build the Docker image for a particular service, including installing dependencies (e.g., Java for HDFS/Kafka, Python libraries, MySQL server).
  • src/weather_generator.py:
    • Connects to the MySQL database (service name mysql within the Docker network).
    • Enters an infinite loop, generating weather data (date, degrees, station ID) for stations A-J.
    • Inserts each generated record into the temperatures table in the CS544 database on MySQL. (A minimal sketch appears after this list.)
  • src/producer.py:
    • Connects to Kafka (service name kafka or localhost:9092 if running directly in the Kafka container) and MySQL.
    • Initializes (deletes if exists, then creates) the temperatures Kafka topic with 4 partitions and 1 replica.
    • Continuously queries the temperatures table in MySQL for rows with an id greater than the last processed id.
    • For each row, it creates a Report protobuf object (defined in src/report.proto), serializes it to bytes, and sends it to the temperatures topic using the station ID as the key.
  • src/consumer.py:
    • Takes a partition number as a command-line argument (e.g., python consumer.py 0).
    • Connects to Kafka and HDFS (NameNode at boss:9000).
    • Assigns itself to the specified partition of the temperatures topic.
    • Checkpointing & Recovery:
      • On startup, checks for a partition-<N>.json file in the container's root directory (/).
      • If found, reads the batch_id and offset, then seeks the Kafka consumer to this offset.
      • If not found, starts from the beginning of the partition (offset 0).
    • Polls Kafka for messages in batches.
    • Deserializes protobuf messages.
    • Converts the batch of messages to a Pandas DataFrame.
    • Writes the DataFrame to a Parquet file in HDFS (hdfs://boss:9000/data/partition-N-batch-M.parquet) atomically (write to .tmp then rename).
    • After a successful HDFS write, updates/creates the partition-N.json checkpoint file with the new batch_id and the next offset.
  • src/debug.py:
    • A simpler consumer that subscribes to the temperatures topic (automatic partition assignment).
    • Deserializes and prints messages to the console, showing station_id, date, degrees, and the partition it was read from. (A tiny sketch appears after this list.)
  • src/report.proto:
    • Defines the structure of the Report message used for weather data:
      syntax = "proto3";

      message Report {
        string date = 1;        // Date of observation (YYYY-MM-DD)
        double degrees = 2;     // Observed max temperature
        string station_id = 3;  // Station ID
      }
  • src/report_pb2.py: The Python module generated by the Protocol Buffer compiler (protoc) from report.proto. This file is imported by the producer and consumers to work with Report messages.
  • src/requirements.txt: Lists necessary Python libraries such as kafka-python, SQLAlchemy, mysql-connector-python, protobuf, pandas, pyarrow. These are typically installed in the Kafka container.
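
A minimal sketch of the generator loop described above, assuming mysql-connector-python; the column names, temperature model, and pacing are illustrative and may differ from src/weather_generator.py.

    # Minimal sketch of the weather generator (illustrative; not the exact
    # src/weather_generator.py code). Assumes mysql-connector-python.
    import random
    import time
    from datetime import date, timedelta
    import mysql.connector

    conn = mysql.connector.connect(
        host="mysql", user="root", password="abc", database="CS544")
    cursor = conn.cursor()

    stations = [chr(c) for c in range(ord("A"), ord("J") + 1)]  # stations A-J
    current = date(2000, 1, 1)                                  # simulation start date

    while True:
        for station_id in stations:
            degrees = random.uniform(-10, 40)  # illustrative temperature model
            cursor.execute(
                "INSERT INTO temperatures (date, degrees, station_id) VALUES (%s, %s, %s)",
                (current.isoformat(), degrees, station_id))
        conn.commit()
        current += timedelta(days=1)
        time.sleep(0.1)  # pacing; the real script's rate may differ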
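
The debug consumer amounts to little more than the following sketch, assuming kafka-python; the group id and output format are illustrative.

    # Sketch of the debug consumer (illustrative; not the exact src/debug.py code).
    from kafka import KafkaConsumer
    import report_pb2

    consumer = KafkaConsumer("temperatures",                    # automatic partition assignment
                             bootstrap_servers=["localhost:9092"],
                             group_id="debug")                  # group name is illustrative
    for msg in consumer:
        report = report_pb2.Report()
        report.ParseFromString(msg.value)
        print(f"partition={msg.partition} station_id={report.station_id} "
              f"date={report.date} degrees={report.degrees}")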

6. Steps to Build and Run

These instructions assume you have Docker and Docker Compose installed.

  1. Set Project Environment Variable: This variable is used by docker-compose.yml to name containers and services.

    • Linux/macOS (bash/zsh):
      export PROJECT=p7
    • Windows (Command Prompt):
      set PROJECT=p7
    • Windows (PowerShell):
      $env:PROJECT = "p7"
  2. Build Docker Images: Navigate to the project's root directory (where docker-compose.yml is located) and run:

    docker build . -f Dockerfile.hdfs     -t ${PROJECT}-hdfs
    docker build . -f Dockerfile.kafka    -t ${PROJECT}-kafka
    docker build . -f Dockerfile.namenode -t ${PROJECT}-nn
    docker build . -f Dockerfile.datanode -t ${PROJECT}-dn
    docker build . -f Dockerfile.mysql    -t ${PROJECT}-mysql

    (On Windows, you might need to replace ${PROJECT} with %PROJECT% in Command Prompt or $env:PROJECT in PowerShell if running these commands directly without a shell script that handles variable expansion).

  3. Start All Services: This command starts Kafka, MySQL, HDFS NameNode, and HDFS DataNodes in detached mode (-d).

    docker compose up -d

    Wait a minute or two for all services to initialize, especially HDFS.

  4. Compile Protocol Buffer Definition: This command needs to be run to generate src/report_pb2.py from src/report.proto. You can run this inside the Kafka container or locally if you have grpcio-tools installed.

    • Option A (Recommended: Inside the Kafka container):
      docker exec -w /src ${PROJECT}-kafka python -m grpc_tools.protoc -I=. --python_out=. report.proto
    • Option B (Locally, if Python and grpcio-tools are installed): Ensure your current directory is src or adjust paths.
      python -m grpc_tools.protoc -I=. --python_out=. report.proto
      (Use python3 if that's your Python 3 executable).
  5. Run the Weather Generator: This starts the script that populates the MySQL database. It runs in the background (-d).

    docker exec -d -w /src ${PROJECT}-kafka python3 weather_generator.py
  6. Run the Kafka Producer: This starts the script that reads from MySQL and produces messages to Kafka. It runs in the background (-d).

    docker exec -d -w /src ${PROJECT}-kafka python3 producer.py
  7. Run Kafka Consumer(s): These scripts consume messages from Kafka partitions and write them to HDFS. Run them in interactive mode (-it) to see their output or monitor them. You should run one consumer per partition you want to process (the default setup has 4 partitions: 0, 1, 2, 3).

    • For partition 0:
      docker exec -it -w /src ${PROJECT}-kafka python3 consumer.py 0
    • For partition 1 (in a new terminal):
      docker exec -it -w /src ${PROJECT}-kafka python3 consumer.py 1
    • And so on for other partitions (e.g., consumer.py 2, consumer.py 3).
  8. (Optional) Run the Debug Consumer: To see the raw flow of messages in Kafka from all partitions:

    docker exec -it -w /src ${PROJECT}-kafka python3 debug.py

7. Verification

  • Check MySQL Data: Connect to the MySQL container and query the temperatures table:

    docker exec -it ${PROJECT}-mysql-1 mysql -u root -pabc CS544

    Then, in the MySQL prompt:

    SELECT COUNT(*) FROM temperatures;
    SELECT * FROM temperatures ORDER BY id DESC LIMIT 10;
    exit
  • Check Kafka Topics (Optional, requires Kafka tools in container or locally): You can list topics to ensure temperatures was created.

    docker exec ${PROJECT}-kafka kafka-topics --bootstrap-server localhost:9092 --list
  • Check HDFS Output: List files in the HDFS /data/ directory:

    docker exec ${PROJECT}-kafka hdfs dfs -ls hdfs://boss:9000/data/

     You should see Parquet files appearing (e.g., partition-0-batch-0.parquet). To inspect a file's contents, first copy it out of HDFS to the Kafka container's local filesystem, then out of the container (a short pyarrow sketch for reading it locally appears after this list):

     # Example: copy a file out of HDFS, then out of the container, to inspect locally
     # docker exec ${PROJECT}-kafka hdfs dfs -get hdfs://boss:9000/data/partition-0-batch-0.parquet /tmp/
     # docker cp ${PROJECT}-kafka:/tmp/partition-0-batch-0.parquet ./
     # Then use a local Parquet viewer or Python with pyarrow
  • Check Consumer Logs: The output from the consumer.py scripts will show messages about batches being written and checkpoints being saved.

  • Check Checkpoint Files: The consumers write their checkpoint JSON files to the root directory (/) of the ${PROJECT}-kafka container. List them with:

    docker exec ${PROJECT}-kafka ls -l /

    You should see partition-0.json, partition-1.json, etc. To view a checkpoint file:

    docker exec ${PROJECT}-kafka cat /partition-0.json
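
Once a Parquet file has been copied out of the container (see the Check HDFS Output step above), a quick way to inspect it is with pandas and pyarrow; the file name below is just an example.

    # Quick local inspection of a copied-out Parquet file (requires pandas + pyarrow).
    import pandas as pd

    df = pd.read_parquet("partition-0-batch-0.parquet")
    print(df.head())         # first few rows: date, degrees, station_id
    print(len(df), "rows")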

8. Stopping the Application

  1. Stop and Remove Containers, Networks, and Volumes defined in docker-compose.yml:
    docker compose down
    If you want to remove volumes to clear HDFS and MySQL data:
    docker compose down -v
