-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtasks.py
More file actions
199 lines (174 loc) · 7.58 KB
/
tasks.py
File metadata and controls
199 lines (174 loc) · 7.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
from invoke import task
from pathlib import Path
import os
import shutil
import platform
from locovote.mcas import download_mcas_data
from locovote.mass_gov_div_local_services import download_tables as download_financial_data
from locovote.general_fund import download_general_fund_data
from locovote.download_dor_data import download_population_data, download_tax_levies_data, download_tax_rates_data
from locovote.mcas_parquet import clean_mcas
from locovote.finances import clean_dor_data
from locovote.clean_general_fund import main as clean_general_fund
from locovote.clean_population import clean_population_data
from locovote.clean_tax_levies import clean_tax_levies_data
from locovote.clean_tax_rates import clean_tax_rates_data
# Root of the data tree, relative to the current working directory.
DATA_DIR = Path("./data")
# Downloads land under "raw"; cleaned outputs are written under "processed".
RAW_DIR = DATA_DIR.joinpath("raw")
PROCESSED_DIR = DATA_DIR.joinpath("processed")
# Per-source subdirectories of the raw data directory
DOR_GENERAL_FUND_DIR = RAW_DIR.joinpath("dor-general-fund")
DOR_COMMUNITY_COMPARISON_DIR = RAW_DIR.joinpath("dor-community-comparison")
def get_download_dir():
    """Return the directory that downloads should be written to.

    On macOS (``platform.system() == 'Darwin'``) this is the ``Downloads``
    folder in the user's home directory, which avoids permissions issues.
    On every other platform it is ``RAW_DIR``.
    """
    on_macos = platform.system() == 'Darwin'
    return Path.home() / "Downloads" if on_macos else RAW_DIR
def handle_macos_download(download_func, target_dir):
    """Run a download so that its files end up in ``target_dir``.

    ``target_dir`` is created first if it does not exist yet. On macOS the
    download is pointed at the directory returned by ``get_download_dir()``
    and every file it reports is then moved into ``target_dir``. On any other
    platform the download writes into ``target_dir`` directly.

    Args:
        download_func: Callable that takes the directory to download into.
            On macOS its return value is iterated as the downloaded files,
            and each item must expose a ``name`` attribute (as
            ``pathlib.Path`` does).
        target_dir: Directory where files should end up.
    """
    target_dir.mkdir(parents=True, exist_ok=True)

    if platform.system() != 'Darwin':
        # Downloading straight into the target; the return value is not used.
        download_func(target_dir)
        return

    staging_dir = get_download_dir()
    for fetched in download_func(staging_dir):
        destination = target_dir / fetched.name
        shutil.move(str(fetched), str(destination))
@task
def download_mcas(c):
    """Download MCAS (Massachusetts Comprehensive Assessment System) data.

    Makes sure the raw data directory exists, then hands it to
    ``download_mcas_data`` as the download location.
    """
    destination = RAW_DIR
    destination.mkdir(parents=True, exist_ok=True)
    download_mcas_data(destination)
@task
def download_dor_community_comparisons(c):
    """Download Massachusetts Department of Revenue (DOR) Community Comparison Reports.

    The download is skipped when the target directory already holds at least
    one entry. An existing but empty directory does not count as downloaded:
    ``handle_macos_download`` creates the directory before it starts the
    download, so a failed run leaves an empty directory behind, and treating
    that as "already downloaded" would make every later run skip the download.

    Args:
        c: Context object passed in by invoke (unused).
    """
    target_dir = DOR_COMMUNITY_COMPARISON_DIR
    # Require actual content, not just the directory, before skipping.
    if target_dir.is_dir() and any(target_dir.iterdir()):
        print(f"Directory {DOR_COMMUNITY_COMPARISON_DIR} already exists. Skipping download.")
        return
    handle_macos_download(
        download_func=download_financial_data,
        target_dir=target_dir,
    )
@task
def download_dor_general_fund(c):
    """Download Massachusetts Department of Revenue (DOR) General Fund data.

    The download is skipped when the target directory already holds at least
    one entry. An existing but empty directory does not count as downloaded:
    ``handle_macos_download`` creates the directory before it starts the
    download, so a failed run leaves an empty directory behind, and treating
    that as "already downloaded" would make every later run skip the download.

    Args:
        c: Context object passed in by invoke (unused).
    """
    target_dir = DOR_GENERAL_FUND_DIR
    # Require actual content, not just the directory, before skipping.
    if target_dir.is_dir() and any(target_dir.iterdir()):
        print(f"Directory {DOR_GENERAL_FUND_DIR} already exists. Skipping download.")
        return
    handle_macos_download(
        download_func=download_general_fund_data,
        target_dir=target_dir,
    )
@task(download_mcas)
def clean_mcas_data(c):
    """Clean and process MCAS achievement data.

    Runs after ``download_mcas``. Passes ``MCAS_Achievement_Results.csv`` from
    the raw directory and ``mcas.db`` in the processed directory to
    ``clean_mcas`` as input and output paths (both as strings). Cleaning is
    skipped when the output file already exists.

    Args:
        c: Context object passed in by invoke (unused).
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    input_path = RAW_DIR / "MCAS_Achievement_Results.csv"
    output_path = PROCESSED_DIR / "mcas.db"
    if output_path.exists():
        # Message previously read "already exits"; corrected to match the
        # wording used by the other cleaning tasks.
        print(f"File {output_path} already exists. Skipping cleaning.")
        return
    clean_mcas(input_path=str(input_path), output_path=str(output_path))
@task(download_dor_community_comparisons, download_dor_general_fund)
def clean_finance_data(c):
    """Clean and process Massachusetts municipal financial data.

    Runs after both DOR download tasks. Builds a mapping from dataset label
    to file location and passes it, with every location converted to a
    string, to ``clean_dor_data``.
    """
    community_dir = DOR_COMMUNITY_COMPARISON_DIR
    sources = {
        "demographics": community_dir / "CommunityComparisonGeneral.xlsx",
        "revenue": community_dir / "CC_Revenue_by_Source.xlsx",
        "levies": community_dir / "CC_Levies_and_Tax_by_Class.xlsx",
        "spending": DOR_GENERAL_FUND_DIR / "GenFundExpenditures2023.xlsx",
        "municipalities": PROCESSED_DIR / "municipalities.csv",
    }
    # clean_dor_data receives plain strings rather than Path objects.
    clean_dor_data({label: str(location) for label, location in sources.items()})
@task(download_dor_general_fund)
def clean_dor_general_fund(c):
    """Clean and process Massachusetts DOR General Fund data.

    Runs after ``download_dor_general_fund``. ``clean_general_fund`` is
    called without arguments, and only when ``combined_general_fund.arrow``
    is not already present in the processed directory.
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    arrow_file = PROCESSED_DIR / "combined_general_fund.arrow"
    if not arrow_file.exists():
        clean_general_fund()
        return
    print(f"File {arrow_file} already exists. Skipping cleaning.")
@task
def download_population(c):
    """Download Massachusetts population data from DOR.

    Nothing is downloaded when ``population.xlsx`` is already present in the
    raw directory. Otherwise ``download_population_data`` is called and its
    return value decides whether a success or a failure message is printed.
    """
    print("Downloading population data...")
    expected_file = RAW_DIR / "population.xlsx"
    if expected_file.exists():
        print(f"Population data already downloaded: {expected_file}")
        return

    saved_to = download_population_data(download_dir=RAW_DIR)
    # A falsy return value is reported as a failed download.
    outcome = (
        f"Population data downloaded successfully to: {saved_to}"
        if saved_to
        else "Failed to download population data"
    )
    print(outcome)
@task(download_population)
def clean_population_data_task(c):
    """Clean and process Massachusetts population data.

    Runs after ``download_population``. Cleaning is skipped when
    ``population.arrow`` already exists in the processed directory; otherwise
    ``clean_population_data`` is called with the raw ``population.xlsx`` as
    input and that file as output.
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    arrow_path = PROCESSED_DIR / "population.arrow"
    if arrow_path.exists():
        print(f"File {arrow_path} already exists. Skipping cleaning.")
        return
    clean_population_data(
        input_path=RAW_DIR / "population.xlsx",
        output_path=arrow_path,
    )
@task
def download_tax_levies(c):
    """Download Massachusetts tax levies by class data from DOR.

    Nothing is downloaded when ``tax_levies_data.xlsx`` is already present in
    the raw directory. Otherwise ``download_tax_levies_data`` is called and
    its return value decides whether a success or a failure message is
    printed.
    """
    print("Downloading tax levies data...")
    expected_file = RAW_DIR / "tax_levies_data.xlsx"
    if expected_file.exists():
        print(f"Tax levies data already downloaded: {expected_file}")
        return

    saved_to = download_tax_levies_data(download_dir=RAW_DIR)
    # A falsy return value is reported as a failed download.
    outcome = (
        f"Tax levies data downloaded successfully to: {saved_to}"
        if saved_to
        else "Failed to download tax levies data"
    )
    print(outcome)
@task
def download_tax_rates(c):
    """Download Massachusetts tax rates by class data from DOR.

    Nothing is downloaded when ``tax_rates_data.xlsx`` is already present in
    the raw directory. Otherwise ``download_tax_rates_data`` is called and
    its return value decides whether a success or a failure message is
    printed.
    """
    print("Downloading tax rates data...")
    expected_file = RAW_DIR / "tax_rates_data.xlsx"
    if expected_file.exists():
        print(f"Tax rates data already downloaded: {expected_file}")
        return

    saved_to = download_tax_rates_data(download_dir=RAW_DIR)
    # A falsy return value is reported as a failed download.
    outcome = (
        f"Tax rates data downloaded successfully to: {saved_to}"
        if saved_to
        else "Failed to download tax rates data"
    )
    print(outcome)
@task(download_tax_levies)
def clean_tax_levies_data_task(c):
    """Clean and process Massachusetts tax levies data.

    Runs after ``download_tax_levies``. Cleaning is skipped when
    ``tax-levies.arrow`` already exists in the processed directory; otherwise
    ``clean_tax_levies_data`` is called with the raw ``tax_levies_data.xlsx``
    as input and that file as output.
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    arrow_path = PROCESSED_DIR / "tax-levies.arrow"
    if arrow_path.exists():
        print(f"File {arrow_path} already exists. Skipping cleaning.")
        return
    clean_tax_levies_data(
        input_path=RAW_DIR / "tax_levies_data.xlsx",
        output_path=arrow_path,
    )
@task(download_tax_rates)
def clean_tax_rates_data_task(c):
    """Clean and process Massachusetts tax rates data.

    Runs after ``download_tax_rates``. Cleaning is skipped when
    ``tax-rates.arrow`` already exists in the processed directory; otherwise
    ``clean_tax_rates_data`` is called with the raw ``tax_rates_data.xlsx``
    as input and that file as output.
    """
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    arrow_path = PROCESSED_DIR / "tax-rates.arrow"
    if arrow_path.exists():
        print(f"File {arrow_path} already exists. Skipping cleaning.")
        return
    clean_tax_rates_data(
        input_path=RAW_DIR / "tax_rates_data.xlsx",
        output_path=arrow_path,
    )
@task(
    clean_mcas_data,
    clean_finance_data,
    clean_dor_general_fund,
    clean_population_data_task,
    clean_tax_levies_data_task,
    clean_tax_rates_data_task,
)
def clean_data(c):
    """Run all data cleaning tasks in the correct order.

    The body is intentionally empty: the cleaning tasks are listed in the
    decorator, and this task does nothing beyond them.
    """