A distributed data analysis project that processes Wikipedia's entire revision history and category structure to compute topic classifications and bus factor metrics using Apache Spark RDD API.
- Overview
- Features
- Architecture
- Prerequisites
- Installation
- Dataset Setup
- Configuration
- Running Locally
- Running on AWS EMR
- Understanding the Output
- Project Structure
- Troubleshooting
- License
This project performs large-scale analysis on Wikipedia data (up to 100+ GB compressed) to:
- Category Classification: Maps Wikipedia articles to root topic categories using an iterative label propagation algorithm
- Bus Factor Analysis: Calculates the "bus factor" for each topic category (minimum contributors needed for 50% of content)
The implementation demonstrates advanced Spark optimization techniques including broadcast joins, strategic persistence, and efficient aggregation patterns, achieving more than a 2.22x speedup over the baseline implementation.
- Used Dataset Size: 37.5 GB compressed
- Processing Time: ~79 minutes (optimized) vs >177 minutes (baseline)
- Shuffle Reduction: 70% less data shuffled
Two Implementation Versions:
- Baseline (non-optimized) for performance comparison
- Optimized version with advanced Spark techniques

Advanced Optimizations:
- Broadcast joins to eliminate massive shuffles
- reduceByKey instead of groupByKey for a 50% shuffle reduction
- Strategic RDD caching and unpersisting
- Partition coalescing after filters
- LZ4 compression for shuffle optimization

Scalable Data Pipeline:
- Custom SQL dump parser that handles arbitrary Wikipedia dataset sizes
- Parallel download scripts with aria2c support
- Splittable compression (bzip2) for distributed processing
- Checkpoint-based lineage truncation for iterative algorithms

Comprehensive Analysis:
- Hierarchical category propagation with bitmask encoding
- Top-N contributor tracking with a PriorityQueue
- Dual output generation (bus factor + top contributors)
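To make the shuffle-reduction claim concrete, here is a conceptual sketch in plain Python (not the project's Scala/Spark code) of why `reduceByKey` shuffles less than `groupByKey`: combining values per key on each partition first means only one partial sum per key per partition crosses the network.

```python
from collections import defaultdict

def group_by_key_shuffle(partitions):
    """Simulates groupByKey: every (key, value) record crosses the 'network'."""
    shuffled = [rec for part in partitions for rec in part]  # all records move
    grouped = defaultdict(list)
    for k, v in shuffled:
        grouped[k].append(v)
    totals = {k: sum(vs) for k, vs in grouped.items()}
    return totals, len(shuffled)

def reduce_by_key_shuffle(partitions):
    """Simulates reduceByKey: map-side combine, then merge the partials."""
    shuffled = []
    for part in partitions:
        partial = defaultdict(int)
        for k, v in part:                     # combine locally per partition
            partial[k] += v
        shuffled.extend(partial.items())      # only one record per key moves
    totals = defaultdict(int)
    for k, v in shuffled:
        totals[k] += v
    return dict(totals), len(shuffled)

if __name__ == "__main__":
    parts = [[("a", 1), ("a", 2), ("b", 3)], [("a", 4), ("b", 5), ("b", 6)]]
    t1, moved1 = group_by_key_shuffle(parts)
    t2, moved2 = reduce_by_key_shuffle(parts)
    assert t1 == t2 == {"a": 7, "b": 14}      # same result either way
    print(moved1, moved2)                     # 6 vs 4 records shuffled
```

Both paths produce identical totals; only the volume of shuffled records differs, which is exactly the property the optimized version exploits.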
The category classification uses an iterative algorithm that propagates category labels through Wikipedia's hierarchy:
Iteration 0: [Root] → {Science, Arts, ...}
Iteration 1: [Root] → [Children] → {Physics, Biology, ...}
Iteration 2: [Root] → [Children] → [Grandchildren] → {...}
...
Convergence: All reachable nodes labeled
Uses bitmask encoding for efficient storage (64 categories in a single Long).
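A toy sketch of this idea in plain Python (the real implementation uses Spark RDDs with checkpoint-based lineage truncation): each root category is assigned one bit, and masks are OR-ed down parent→child edges until no node's mask changes.

```python
def propagate(roots, children):
    """Iterative label propagation with bitmask encoding.

    roots:    list of root category names (each gets one bit)
    children: dict mapping parent category -> list of child categories
    Returns a dict mapping every reachable node -> bitmask of root labels.
    """
    masks = {root: 1 << i for i, root in enumerate(roots)}  # seed the roots
    changed = True
    while changed:                      # one pass per level of the hierarchy
        changed = False
        for parent, kids in children.items():
            pmask = masks.get(parent, 0)
            for kid in kids:
                merged = masks.get(kid, 0) | pmask   # inherit parent labels
                if merged != masks.get(kid, 0):
                    masks[kid] = merged
                    changed = True
    return masks

if __name__ == "__main__":
    roots = ["Science", "Arts"]
    children = {"Science": ["Physics"], "Arts": ["Physics"],
                "Physics": ["Quantum"]}
    masks = propagate(roots, children)
    print(masks["Quantum"])  # 3: bits 0 (Science) and 1 (Arts) both set
```

A page reachable from several roots simply accumulates several set bits, which is why a single Long can carry up to 64 category memberships.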
- OS: Linux/macOS/Windows 11
- RAM: 8 GB minimum (16+ GB recommended)
- Disk: 150+ GB free space for full datasets
- Java: JDK 8 or 11
- Scala: 2.12.x
- Spark: 3.5.0 or later
- sbt 1.9.0+
- IntelliJ IDEA (recommended) or any Scala IDE
- Git
- aria2c: For faster parallel downloads
- AWS CLI: For S3 operations and EMR management
git clone https://github.com/yourusername/wikipedia-spark-analysis.git
cd wikipedia-spark-analysis

# Ubuntu/Debian
sudo apt update
sudo apt install openjdk-11-jdk scala

# macOS
brew install openjdk@11 scala sbt

# Verify
java -version   # Should show Java 11
scala -version # Should show Scala 2.12+
sbt --version   # Should show sbt 1.9+

# Download Spark
cd ~
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
sudo mv spark-3.5.0-bin-hadoop3 /opt/spark
# Add to PATH
echo 'export SPARK_HOME=/opt/spark' >> ~/.bashrc
echo 'export PATH=$PATH:$SPARK_HOME/bin' >> ~/.bashrc
source ~/.bashrc
# Verify
spark-submit --version

# Ubuntu/Debian
sudo apt install aria2
# macOS
brew install aria2

# Ubuntu/Debian
sudo apt install awscli
# macOS
brew install awscli
# Verify
aws --version

cd wikipedia-spark-analysis
sh scripts/setup.sh
sbt assembly

This creates: target/scala-2.12/wikipedia-analysis_2.12-1.0.jar
The project includes scripts to download Wikipedia dumps automatically.
# Download history for 2024 (adjust date range as needed)
./scripts/download_wiki_history.sh -s 2024-01 -e 2024-12
# Options:
# -v VERSION Dump version (default: 2025-11)
# -w WIKI Wiki name (default: enwiki)
# -d DIR Download directory (default: dataset/wikimedia_dumps)
# -s START_DATE Start date YYYY-MM (default: 2001-01)
# -e END_DATE End date YYYY-MM
# -n            Dry run (generate URLs only)

Download time: aria2c (parallel downloads) is roughly 3-4x faster than curl (sequential).
# Download latest category dumps
./scripts/download_categories.sh
# Options:
# -w, --wiki WIKI Wikipedia language code (default: enwiki)
# -d, --date DATE Dump date YYYYMMDD (default: 20251201)
# -D, --dir DIRECTORY Download directory (default: ./dataset/sql_dumps)
# -n, --dry-run       Show what would happen

If the scripts fail, download manually from:
- History: https://dumps.wikimedia.org/other/mediawiki_history/
- Categories: https://dumps.wikimedia.org/enwiki/
Place files in:
dataset/
├── wikimedia_dumps/    # History dumps (*.tsv.bz2)
└── categories_dump/    # Category dumps (*.sql.bz2)
    ├── enwiki-YYYYMMDD-categorylinks.sql.bz2
    ├── enwiki-YYYYMMDD-linktarget.sql.bz2
    └── enwiki-YYYYMMDD-page.sql.bz2
It is recommended to use the setup.sh script, but here are the steps for a manual configuration.
Create aws/ directory structure:
mkdir -p aws

File 1: aws/credentials
[default]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY

File 2: aws/config
[default]
region = us-east-1
output = json

Create the credentials file in the project:
mkdir -p src/main/resources

File: src/main/resources/aws_credentials
YOUR_ACCESS_KEY_ID
YOUR_SECRET_ACCESS_KEY
Edit src/main/scala/utils/Config.scala:
object Config {
  // S3 bucket name (create via AWS Console first)
  val s3BucketName = "your-bucket-name-here"

  // Local project directory (absolute path)
  val projectDir = "/home/youruser/wikipedia-spark-analysis"

  // S3 paths (relative to bucket root)
  val s3DatasetPath = "datasets/"
  val s3OutputPath = "output/"
  val s3HistoryPath = "spark-logs/"

  // Credentials file location (relative to resources/)
  val credentialsPath: String = "/aws_credentials"
}

# Create bucket
aws s3 mb s3://your-bucket-name-here --region us-east-1
# Upload datasets
aws s3 sync dataset/wikimedia_dumps/ s3://your-bucket-name-here/datasets/wikimedia_dumps/
aws s3 sync dataset/categories_dump/ s3://your-bucket-name-here/datasets/categories_dump/

# Build
sbt assembly
# Run optimized version on sample data
spark-submit \
--class JobLauncher \
--master local[*] \
--driver-memory 4g \
--executor-memory 4g \
target/scala-2.12/wikipedia-analysis_2.12-1.0.jar \
  local all overwrite optimized
# Category analysis only
spark-submit --class JobLauncher ... local cat overwrite optimized
# Bus factor analysis only
spark-submit --class JobLauncher ... local bus overwrite optimized
JobLauncher <deployment-mode> <job-type> <write-rule> <version>

| Argument | Options | Description |
|---|---|---|
| deployment-mode | local, remote | Where to run (local Spark vs AWS EMR) |
| job-type | cat, bus, all | Which analysis to run |
| write-rule | overwrite, skip | Overwrite existing outputs or skip |
| version | baseline, optimized | Which implementation to run |
# Spark UI (while running)
open http://localhost:4040
Results are saved to:
- Optimized: output/
- Baseline: output_baseline/
Files:
output/
├── page_to_root_categories/    # Article → Category mappings
├── root_category_indices.tsv   # Category index
├── bus_factor.tsv              # Bus factor per category
└── top_contributors.tsv        # Top contributors per category
1. Go to EMR → Clusters → Create cluster
2. Choose:
   - Release: emr-7.0.0 (Spark 3.5.0)
   - Applications: Spark, Hadoop
   - Instance type: m4.xlarge or larger
   - Number of instances: 1 master + n core
   - EC2 key pair: Select your SSH key
3. Under "Security configuration":
   - Enable "Cluster visible on the internet"
   - Configure security groups for SSH access
aws emr create-cluster \
--name "Wikipedia-Spark-Analysis" \
--release-label emr-7.0.0 \
--applications Name=Spark Name=Hadoop \
--ec2-attributes KeyName=your-key-pair,SubnetId=subnet-xxxxx \
--instance-type m4.xlarge \
--instance-count 5 \
--use-default-roles \
--log-uri s3://your-bucket-name-here/spark-logs/ \
  --region us-east-1

Wait 10-15 minutes for the cluster to start.
sbt package
aws s3 cp target/scala-2.12/wikipedia-analysis_2.12-1.0.jar \
  s3://your-bucket-name-here/jars/

Define a new Run/Debug Configuration with the following settings:
- Configuration type: Spark Submit - Cluster
- Name: Spark Cluster
- Region: us-east-1
- Remote Target: Add EMR connection
- Authentication type: Profile from credentials file
- Select "Set custom config" and give the paths to the "config" and "credential" files
- Click on "Test connection" to verify
- Enter a new SSH Configuration
- Host: the address of the primary node of the cluster, i.e., the MasterPublicDnsName (see AWS CLI cheatsheet)
- Username: hadoop
- Authentication type: Key pair
- Private key file: point to your .ppk
- Test the connection
- Application: point to the .jar file inside the target/scala-2.12 folder of this repository; if you don't find it, build the project
- Class: JobLauncher
- Run arguments: Select your desired configuration, e.g.:
  remote all overwrite optimized
- Before launch: Upload Files Through SFTP
- Hit the Run button and wait for the application to finish (you should see "final status: SUCCEEDED" in the last Application report).
- Check out the results in the "output" folder in your S3 bucket (via CLI or GUI).
# SSH to master node
ssh -i your-key.pem hadoop@<master-public-dns>
# Copy JAR from S3
aws s3 cp s3://your-bucket-name-here/jars/wikipedia-analysis_2.12-1.0.jar ./
# Submit job
spark-submit \
--class JobLauncher \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 8g \
--executor-cores 2 \
--num-executors 4 \
wikipedia-analysis_2.12-1.0.jar \
  remote all overwrite optimized

# SSH tunnel
ssh -i your-key.pem -L 8088:localhost:8088 hadoop@<master-public-dns>
# Open browser
open http://localhost:8088/cluster

# SSH tunnel
ssh -i your-key.pem -L 18080:localhost:18080 hadoop@<master-public-dns>
# Open browser
open http://localhost:18080

# Via SSH
ssh -i your-key.pem hadoop@<master-public-dns>
yarn logs -applicationId application_XXXXXXXXXXXXX_XXXX
# Or download from S3
aws s3 sync s3://your-bucket-name-here/spark-logs/ ./logs/

# Download outputs
aws s3 sync s3://your-bucket-name-here/output/ ./output_aws/
# View results
cat output_aws/bus_factor.tsv
cat output_aws/top_contributors.tsv

# Terminate the cluster when done
aws emr terminate-clusters --cluster-ids j-XXXXXXXXXXXXX

root_category_indices.tsv
12345 Science 0
67890 Arts 1
11111 Technology 2
Format: PageID \t CategoryName \t BitIndex
page_to_root_categories/part-***
987654 7 # Page 987654 belongs to categories at bit positions 0,1,2 (bitmask: 111 = 7)
123456 5 # Page 123456 belongs to categories at bit positions 0,2 (bitmask: 101 = 5)
Format: PageID \t BitMask
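A minimal Python helper for decoding a mask back into category names (hypothetical, assuming you have loaded the BitIndex → CategoryName mapping from root_category_indices.tsv):

```python
def decode_mask(mask, index_to_name):
    """Return the category names whose bits are set in mask."""
    names, bit = [], 0
    while mask:
        if mask & 1:                     # bit `bit` is set
            names.append(index_to_name[bit])
        mask >>= 1
        bit += 1
    return names

# Toy index table matching the example above.
index_to_name = {0: "Science", 1: "Arts", 2: "Technology"}
print(decode_mask(7, index_to_name))  # ['Science', 'Arts', 'Technology']
print(decode_mask(5, index_to_name))  # ['Science', 'Technology']
```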
Decode bitmask:
val mask = 7 // Binary: 0111
// Bit 0 set: Science
// Bit 1 set: Arts
// Bit 2 set: Technology
// Result: Page belongs to Science, Arts, Technology

bus_factor.tsv
Science 142 450000000
Arts 89 120000000
Technology 203 780000000
Format: Category \t BusFactor \t TotalBytes
Interpretation:
- Science: Minimum 142 contributors needed for 50% of content
- Arts: Minimum 89 contributors needed for 50% of content
- Lower bus factor = more concentrated contributions = higher risk
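The definition above can be expressed in a few lines of plain Python (a sketch of the metric only; the project computes it at scale with Spark):

```python
def bus_factor(contributions, threshold=0.5):
    """Minimum number of top contributors covering `threshold` of all bytes.

    contributions: dict mapping username -> bytes contributed.
    """
    total = sum(contributions.values())
    covered, count = 0, 0
    for contributed in sorted(contributions.values(), reverse=True):
        covered += contributed            # greedily take largest contributors
        count += 1
        if covered >= threshold * total:
            return count
    return count

# Toy example: Alice alone covers 500 of 1000 bytes = exactly 50%.
contribs = {"Alice": 500, "Bob": 300, "Carol": 150, "Dave": 50}
print(bus_factor(contribs))  # 1
```

Greedily taking contributors from largest to smallest is what makes the count a *minimum*: no other set of the same size can cover more bytes.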
top_contributors.tsv
Science Alice 12000000
Science Bob 8500000
Arts Charlie 4300000
Format: Category \t Username \t BytesContributed
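The top-contributor lists are tracked with a bounded priority queue, as mentioned in the features list. A plain-Python sketch of that pattern with a size-N min-heap (the Scala code uses a PriorityQueue; names here are illustrative):

```python
import heapq

def top_n(contributions, n):
    """Keep only the n largest (bytes, user) pairs using a size-n min-heap."""
    heap = []
    for user, nbytes in contributions:
        if len(heap) < n:
            heapq.heappush(heap, (nbytes, user))
        elif nbytes > heap[0][0]:            # beats the smallest kept entry
            heapq.heapreplace(heap, (nbytes, user))
    return sorted(heap, reverse=True)        # largest contributors first

data = [("Alice", 12_000_000), ("Bob", 8_500_000),
        ("Charlie", 4_300_000), ("Dana", 900_000)]
print(top_n(data, 2))  # [(12000000, 'Alice'), (8500000, 'Bob')]
```

Keeping the heap bounded at N entries means memory stays O(N) per category no matter how many contributors stream through, which is why this beats collecting and sorting the full list.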
Generating Charts
A Python script is provided to generate comprehensive visualizations of the analysis results:
# Generate all charts from output data
python scripts/chart_gen.py

Requirements:

pip install pandas matplotlib seaborn numpy

wikipedia-spark-analysis/
├── src/main/scala/
│   ├── JobLauncher.scala                             # Entry point - routes to jobs
│   ├── wikipediaCategoryAnalysis.scala               # Optimized category analysis
│   ├── NonOptimized_wikipediaCategoryAnalysis.scala  # Baseline (non-optimized)
│   ├── wikipediaBusFactorAnalysis.scala              # Optimized bus factor
│   ├── NonOptimized_wikipediaBusFactorAnalysis.scala # Baseline bus factor
│   ├── mediaWikiHistorySchema.scala                  # Schema definitions
│   └── utils/
│       ├── Commons.scala                             # Common utilities
│       └── Config.scala                              # Configuration
├── src/main/resources/
│   └── aws_credentials                               # AWS keys (DO NOT COMMIT)
├── scripts/
│   ├── download_wiki_history.sh                      # Download revision history
│   ├── download_categories.sh                        # Download category dumps
│   ├── setup.sh                                      # Initialization script
│   └── split_stream.py                               # SQL dump line splitter
├── dataset/                                          # Local datasets (gitignored)
│   ├── wikimedia_dumps/                              # Revision history
│   └── categories_dump/                              # Category structure
├── output/                                           # Optimized job outputs
├── output_baseline/                                  # Baseline job outputs
├── checkpoints/                                      # Spark checkpoints
├── docs/
│   ├── PERFORMANCE_ANALYSIS.md                       # Performance report template
│   └── OPTIMIZATION_DIFFERENCES.md                   # Optimization details
├── build.sbt                                         # SBT build configuration
├── .gitignore                                        # Git ignore rules
└── README.md                                         # This file
Problem: aria2c not found
# Solution: Install aria2c
sudo apt install aria2 # Ubuntu
brew install aria2       # macOS

Problem: Download stuck or slow
# Solution: Resume download
# The script uses --continue=true, so just re-run:
./scripts/download_wiki_history.sh -s 2024-01 -e 2024-01

Problem: Wikimedia dump date doesn't exist
# Solution: Check available dates
curl -s https://dumps.wikimedia.org/enwiki/ | grep -o 'href="[0-9]*/"' | grep -o '[0-9]*'
# Use an available date
./scripts/download_categories.sh -d 20241101

Problem: AWS credentials not found
# Solution: Verify credentials setup
cat ~/.aws/credentials # Should show your keys
aws s3 ls # Should list buckets
# Re-configure if needed
aws configure

Problem: Access Denied when accessing S3
# Solution: Check IAM permissions
# Your IAM user needs:
# - s3:GetObject
# - s3:PutObject
# - s3:ListBucket
# On your bucket
# Verify bucket exists
aws s3 ls s3://your-bucket-name-here/

Problem: Spark can't read from S3
# Solution: Verify aws_credentials file
cat src/main/resources/aws_credentials
# Should show:
# Line 1: access_key_id
# Line 2: secret_access_key
# Rebuild
sbt clean package

Problem: Job failed - executor lost
# Solution: Increase executor memory
spark-submit --executor-memory 12g ...
# Or reduce partition count
--conf spark.default.parallelism=100

Problem: OutOfMemoryError: GC overhead limit exceeded
# Solution: Tune GC settings
--conf spark.executor.extraJavaOptions="-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35"

Problem: Slow shuffle performance
# Solution: Enable compression
--conf spark.shuffle.compress=true \
--conf spark.io.compression.codec=lz4

Problem: Class not found: JobLauncher
# Solution: Verify JAR packaging
jar tf target/scala-2.12/wikipedia-analysis_2.12-1.0.jar | grep JobLauncher
# Should show: JobLauncher.class
# If missing, rebuild:
sbt clean package

Problem: Can't SSH to master node
# Solution 1: Check security group
# EMR master security group must allow:
# - Port 22 (SSH) from your IP
# Solution 2: Verify key permissions
chmod 400 your-key.pem
# Solution 3: Check cluster state
aws emr describe-cluster --cluster-id j-XXXXX | grep State
# Should be "WAITING" or "RUNNING"

Problem: Spark UI not accessible
# Solution: Use SSH tunnel
ssh -i your-key.pem -L 8088:localhost:8088 -L 18080:localhost:18080 hadoop@<master-dns>
# Then access:
# http://localhost:8088 (YARN)
# http://localhost:18080 (Spark History)

Problem: Job submitted but not running
# Solution: Check YARN logs
ssh -i your-key.pem hadoop@<master-dns>
yarn application -list
yarn logs -applicationId application_XXXXX

Problem: sbt: command not found
# Solution: Install SBT
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt update
sudo apt install sbt

Problem: java.lang.OutOfMemoryError during build
# Solution: Increase SBT memory
export SBT_OPTS="-Xmx4G -XX:+UseConcMarkSweepGC"
sbt clean package

Problem: Dependency resolution timeout
# Solution: Retry or use different resolver
sbt clean update
# If it still fails, check your internet connection

Problem: "Checkpoint directory does not exist"
# Solution: Create checkpoint directory
mkdir -p checkpoints/
# On S3, the checkpoint prefix is created automatically on first write

Problem: Results are empty
# Solution: Check input data paths in Config.scala
# Verify files exist:
ls -lh dataset/wikimedia_dumps/
ls -lh dataset/categories_dump/
# Check logs for parsing errors
grep -i "error\|exception" /tmp/spark-*.log

Problem: Different results between baseline and optimized
# This should NOT happen!
# Solution: Verify correctness
diff output/bus_factor.tsv output_baseline/bus_factor.tsv
# If different, there's a bug - please open an issue!

OutOfMemoryError:
# Increase driver/executor memory
spark-submit --driver-memory 8g --executor-memory 8g ...

Shuffle Fetch Failed:
# Increase shuffle partitions
--conf spark.sql.shuffle.partitions=400

Task Not Serializable:
// Make sure all closures only use serializable objects
// Move non-serializable variables outside map/filter functions

This project is licensed under the MIT License - see the LICENSE file for details.
Last Updated: February 2025
Version: 1.0.0

