
KiranRajeev-KV/devstats-pipeline


devstats-pipeline


A Go-based concurrent data processing pipeline that extracts demographic metrics from developer survey CSV data.


Overview

devstats-pipeline is a concurrent data processing application built with Go that reads and analyzes developer survey data from CSV files. It uses Go's concurrency primitives (goroutines and channels) to split the work into stages: one goroutine reads rows, a pool of worker goroutines parses them, and each metric is extracted and aggregated in its own goroutines.

The pipeline currently extracts two demographic metrics:

  • Age Distribution - Categorized age ranges
  • Employment Status - Employment types including students and retirees

Prerequisites

Quick Start

1. Clone and Navigate

cd devstats-pipeline

2. Prepare Your Data

Create the data directory and place your survey CSV file at:

data/survey2024/survey_results_public.csv

The CSV header row must include the Age and Employment columns.

3. Run the Pipeline

go run main.go

4. Build Binary (Optional)

go build -o devstats-pipeline main.go
./devstats-pipeline

Architecture

The pipeline follows a producer-worker-consumer pattern with broadcast distribution:

┌───────────────────────────────────────────────────────────────────┐
│                             CSV File                              │
└─────────────────────────────────┬─────────────────────────────────┘
                                  │
                                  ▼
                   ┌──────────────────────────────┐
                   │   readRows (Producer)        │
                   │   Goroutine #1               │
                   └──────────────┬───────────────┘
                                  │
                                  ▼
                   ┌──────────────────────────────┐
                   │   parseRowsToMaps (Workers)  │
                   │   Goroutine #2-4 (3 workers) │
                   └──────────────┬───────────────┘
                                  │
                                  ▼
                   ┌──────────────────────────────┐
                   │   processedRowDistributor    │
                   │   (Broadcast)                │
                   └──────────────┬───────────────┘
                                  │
            ┌─────────────────────┼─────────────────────┐
            ▼                     ▼                     ▼
    ┌──────────────┐      ┌──────────────┐      ┌──────────────┐
    │    AgeCh     │      │ EmploymentCh │      │  Future...   │
    └───────┬──────┘      └───────┬──────┘      └──────────────┘
            │                     │
            ▼                     ▼
  ┌───────────────────┐ ┌───────────────────┐
  │ageMetricExtractor │ │employmentMetric   │
  │                   │ │Extractor          │
  └─────────┬─────────┘ └─────────┬─────────┘
            │                     │
            ▼                     ▼
  ┌───────────────────┐ ┌───────────────────┐
  │ageMetricReducer   │ │employmentMetric   │
  │                   │ │Reducer            │
  └─────────┬─────────┘ └─────────┬─────────┘
            │                     │
            └──────────┬──────────┘
                       ▼
              ┌─────────────────┐
              │  Print Results  │
              └─────────────────┘

Components

Component                    Description
readHeaders()                Reads the CSV header row and returns the headers and the CSV reader
readRows()                   Producer goroutine: reads raw CSV rows and sends them to a channel
parseRowsToMaps()            Worker goroutines (3): convert raw CSV rows to map[string]string
processedRowDistibutor()     Broadcasts each processed row to every metric extractor
ageMetricExtractor()         Transforms age categories into simplified ranges
ageMetricReducer()           Aggregates age counts into the final metric
employmentMetricExtractor()  Transforms employment statuses into standardized categories
employmentMetricReducer()    Aggregates employment counts into the final metric
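The broadcast step in the table can be illustrated with a small fan-out function. This sketch assumes each extractor receives its own channel and that a shared row map is never modified after it is sent; broadcastRows and the sample rows are hypothetical, not taken from processedRowDistibutor().

```go
package main

import (
	"fmt"
	"sync"
)

// broadcastRows is a hypothetical stand-in for the distributor: it forwards
// every row from in to each output channel, then closes all outputs once in
// is drained. Outputs share the same map, so consumers must only read it.
func broadcastRows(in <-chan map[string]string, outs []chan map[string]string) {
	defer func() {
		for _, out := range outs {
			close(out)
		}
	}()
	for row := range in {
		for _, out := range outs {
			out <- row // blocks if this consumer falls behind, stalling all outputs
		}
	}
}

func main() {
	in := make(chan map[string]string, 10)
	ageCh := make(chan map[string]string, 10)
	employmentCh := make(chan map[string]string, 10)
	outs := []chan map[string]string{ageCh, employmentCh}

	in <- map[string]string{"Age": "25-34 years old", "Employment": "Employed, full-time"}
	in <- map[string]string{"Age": "18-24 years old", "Employment": "Student, full-time"}
	close(in)

	go broadcastRows(in, outs)

	// Each consumer runs concurrently and counts the rows it receives.
	counts := make([]int, len(outs))
	var wg sync.WaitGroup
	for i, ch := range outs {
		wg.Add(1)
		go func(i int, ch chan map[string]string) {
			defer wg.Done()
			for range ch {
				counts[i]++
			}
		}(i, ch)
	}
	wg.Wait()
	fmt.Println("age rows:", counts[0], "employment rows:", counts[1]) // age rows: 2 employment rows: 2
}
```

Because every output receives every row, the slowest extractor sets the pace for all of them; buffering each extractor channel (the example in Extending the Pipeline uses a buffer of 1000) absorbs short stalls.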

Metrics

Age Distribution

The pipeline maps raw age values to simplified categories:

Raw Value           Mapped Value
Under 18 years old  0-17
18-24 years old     18-24
25-34 years old     25-34
35-44 years old     35-44
45-54 years old     45-54
55-64 years old     55-64
65 years or older   65+
Prefer not to say   Prefer not to say
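One way to express the age mapping above is a lookup table. In this sketch, ageRanges and mapAge are hypothetical names, and dropping unrecognized values (such as an empty cell) is an assumption rather than documented behavior.

```go
package main

import "fmt"

// ageRanges mirrors the mapping table above; the map-based lookup is an
// illustrative sketch, not necessarily how ageMetricExtractor is written.
var ageRanges = map[string]string{
	"Under 18 years old": "0-17",
	"18-24 years old":    "18-24",
	"25-34 years old":    "25-34",
	"35-44 years old":    "35-44",
	"45-54 years old":    "45-54",
	"55-64 years old":    "55-64",
	"65 years or older":  "65+",
	"Prefer not to say":  "Prefer not to say",
}

// mapAge (hypothetical helper) returns the simplified range and whether the
// raw value was recognized.
func mapAge(raw string) (string, bool) {
	mapped, ok := ageRanges[raw]
	return mapped, ok
}

func main() {
	counts := make(map[string]int)
	// The empty string stands in for a missing answer and is skipped.
	for _, raw := range []string{"25-34 years old", "Under 18 years old", "25-34 years old", ""} {
		if mapped, ok := mapAge(raw); ok {
			counts[mapped]++
		}
	}
	fmt.Println(counts) // map[0-17:1 25-34:2]
}
```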

Employment Status

A respondent can report several employment statuses in one semicolon-separated value; the pipeline handles each status separately and maps it to a standardized category:

Raw Value                                             Mapped Value
Employed, full-time                                   Full-time
Employed, part-time                                   Part-time
Independent contractor, freelancer, or self-employed  Self-employed
Not employed, but looking for work                    Looking for work
Not employed, and not looking for work                Not looking for work
Student, full-time                                    Student Full-time
Student, part-time                                    Student Part-time
Retired                                               Retired
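The split-then-map behavior for multi-valued answers can be sketched as below. employmentCategories and mapEmployment are hypothetical names; trimming whitespace around each status and ignoring unrecognized statuses are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// employmentCategories mirrors the mapping table above (illustrative; the
// real employmentMetricExtractor may be written differently).
var employmentCategories = map[string]string{
	"Employed, full-time": "Full-time",
	"Employed, part-time": "Part-time",
	"Independent contractor, freelancer, or self-employed": "Self-employed",
	"Not employed, but looking for work":                   "Looking for work",
	"Not employed, and not looking for work":               "Not looking for work",
	"Student, full-time":                                   "Student Full-time",
	"Student, part-time":                                   "Student Part-time",
	"Retired":                                              "Retired",
}

// mapEmployment (hypothetical helper) splits a multi-valued Employment cell
// on ";" and returns the standardized category for each recognized status.
func mapEmployment(raw string) []string {
	var out []string
	for _, status := range strings.Split(raw, ";") {
		if mapped, ok := employmentCategories[strings.TrimSpace(status)]; ok {
			out = append(out, mapped)
		}
	}
	return out
}

func main() {
	fmt.Println(mapEmployment("Student, full-time;Employed, part-time"))
	// [Student Full-time Part-time]
}
```

Each mapped category would then be counted once by the reducer, so a respondent listing two statuses contributes to two categories.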

Extending the Pipeline

To add a new metric (e.g., "Programming Languages"):

1. Add a New Extractor Function

func languagesMetricExtractor(langCh <-chan map[string]string, langMetricCh chan<- map[string]string) {
    defer close(langMetricCh)
    for row := range langCh {
        if languages, exists := row["Language"]; exists {
            // Transform the raw value here if needed (e.g., map it to a category), then emit it
            langMetricCh <- map[string]string{"Language": languages}
        }
    }
}

2. Add a New Reducer Function

func languagesMetricReducer(langCh <-chan map[string]string, outputCh chan<- map[string]int) {
    langMetric := make(map[string]int)
    for row := range langCh {
        if lang, exists := row["Language"]; exists {
            langMetric[lang]++
        }
    }
    outputCh <- langMetric
}

3. Wire It Up in main()

// Add channel
langCh := make(chan map[string]string, 1000)

// Add to distributor
metricExtractorChans := []chan map[string]string{ageCh, employmentCh, langCh}

// Start extractor and reducer
langMetricCh := make(chan map[string]string, 1000)
go languagesMetricExtractor(langCh, langMetricCh)

langOutputCh := make(chan map[string]int, 100)
go languagesMetricReducer(langMetricCh, langOutputCh)

// Print results
langMetric := <-langOutputCh
fmt.Println("Language Metrics:")
for lang, count := range langMetric {
    fmt.Printf("  %-25s : %d\n", lang, count)
}

Project Structure

devstats-pipeline/
├── main.go           # All application logic (single file)
├── go.mod            # Go module definition
├── go.sum            # Go dependencies (auto-generated)
├── .gitignore        # Ignores data/ directory
└── data/             # Survey data (gitignored)
    └── survey2024/
        └── survey_results_public.csv

This is a single-file Go application with no external dependencies (uses only Go standard library).

Data Format

The pipeline expects a CSV file with the following columns:

Age,Employment,...
"25-34 years old","Employed, full-time",...
"18-24 years old","Student, full-time",...

Required Columns

Column      Description                                                  Example Values
Age         Respondent's age range                                       "Under 18 years old", "25-34 years old", "65 years or older"
Employment  Employment status (can be semicolon-separated for multiple)  "Employed, full-time", "Student, full-time;Employed, part-time"

Expected CSV Format

  • Header row with column names
  • Comma-separated values
  • Values may contain spaces; values that contain commas (such as "Employed, full-time") must be enclosed in double quotes
  • Multiple employment statuses separated by semicolons (;)
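The standard library encoding/csv reader already handles the quoting rules listed above. This sketch parses rows modeled on the sample in this section (readRecords is a hypothetical helper, and the multi-valued second row is invented for illustration) and prints each record's fields, showing that a quoted comma does not split a field while the semicolon separator is left for the extractor.

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// readRecords is a hypothetical helper that parses CSV text with the
// standard library reader, which honors double-quoted fields.
func readRecords(data string) ([][]string, error) {
	return csv.NewReader(strings.NewReader(data)).ReadAll()
}

func main() {
	// Modeled on the sample above; the second Employment cell is a
	// hypothetical multi-valued answer.
	data := "Age,Employment\n" +
		"\"25-34 years old\",\"Employed, full-time\"\n" +
		"\"18-24 years old\",\"Student, full-time;Employed, part-time\"\n"

	records, err := readRecords(data)
	if err != nil {
		panic(err)
	}
	for _, rec := range records[1:] {
		// The quoted comma stays inside one field; the ";" separator is
		// left intact for the employment extractor to split later.
		fmt.Printf("%d fields: %q\n", len(rec), rec)
	}
}
```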
