Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .github/workflows/docker-image.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
name: Docker Image CI

on:
push:
paths:
- "Dockerfile"
- "docker-compose.yml"
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:

build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v3
- name: docker login
env:
DOCKER_USER: ${{secrets.DOCKER_USER}}
DOCKER_PASSWORD: ${{secrets.DOCKER_PASSWORD}}
run: |
docker login -u $DOCKER_USER -p $DOCKER_PASSWORD

- name: Build the Docker image
run: docker build --tag ${{secrets.DOCKER_USER}}/k_bike_airflow:latest .

- name: docker push
run: docker push ${{secrets.DOCKER_USER}}/k_bike_airflow:latest
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
/logs/
.env
/dags/__pycache__/
.idea/*
14 changes: 14 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
FROM apache/airflow:2.8.0-python3.8
USER root

RUN apt-get update \
&& apt-get install -y --no-install-recommends \
vim \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*


USER airflow
COPY ./dags /opt/airflow/dags
RUN pip install --user --upgrade pip
102 changes: 102 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
# 기상에 따른 서울시 자전거 대여 현황파악
## 프로젝트 주제

기상 정보에 따른 서울시 자전거 대여 현황을 파악할 수 있도록 데이터 파이프라인을 구성하고 이를 시각화

## 주제 선정이유 및 기대효과

적은 전처리와 Update 주기가 빠른 데이터를 사용하여 ETL 파이프라인 구축 경험을 위함

이 데이터들의 파이프라인을 통한 시각화로, 날씨정보와 서울시 공공자전거 현황을 함께 확인하여 사용자들의 편리한 이용에 도움을 줄 수 있음

## 프로젝트 역할분담

| 이름 | 역할 |
| --- | --- |
| 장태수 | 워크플로 관리, DatawareHouse 구축 |
| 김형인 | ETL관리, 대시보드 |
| 유승상 | AWS구축, ELT관리 |
| 주재민 | ETL관리, 대시보드 |
| 최아정 | ETL관리, 대시보드 |


## 프로젝트 아키텍처

![Untitled](./depj3/architecture.png)

- ec2
- Docker로 Airflow, Superset container 실행하기 위함
- 사양 - **Instance type** : **t3a.xlarge**(4 vCPU, 16GiB)

- airflow
- 개인 local에서 테스트 후 최종 DAG 사용

- snowflake
- redshift 비용문제로 snowflake 30일 무료제공 계정 사용
- 분석용 데이터(data warehouse), raw데이터(data lake) 모두 적재

## 프로젝트 진행과정

수집에 사용한 api

https://data.seoul.go.kr/dataList/OA-21285/F/1/datasetView.do

![Untitled](./depj3/api.png)

- 따릉이 관련 데이터 수집
- 따릉이 대여소 명
- 따릉이 대여소 ID
- 따릉이 주차건수
- 따릉이 거치대수
- 따릉이 거치율

- 날씨 관련 데이터 수집
- 온도
- 체감온도
- 강수확률
- 강수량
- 자외선 지수 단계
- 미세먼지농도
- 초미세먼지농도

### ETL 구성

![Untitled](./depj3/etl.png)

Api 자체 문제로 반환값이 없는 경우가 발생

⇒값이 없는경우 Task를 미리 실패하도록 예외처리

```python
if not records:
raise Exception('recodrds is empty')
```

### ELT 구성

분석용 데이터 data warehouse에 적재

![Untitled](./depj3/daily.png)
![Untitled](./depj3/week.png)


### 시각화 대시보드 구성

- 현재 기온, 강수확률, 평균 따릉이 거치율을 표시

![Untitled](./depj3/bignum.png)

- 서울 주요지역별 구체적인 기상정보 확인가능

![Untitled](./depj3/weather.png)

- 장소별 따릉이 거치소의 구체적 현황 확인가능

![Untitled](./depj3/local.png)







2 changes: 2 additions & 0 deletions create-env.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/bin/sh
echo "AIRFLOW_UID=$(id -u)" > .env
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uid를 특별히 명시해서 사용한 이유가 있나요?

216 changes: 216 additions & 0 deletions dags/etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

사용하고 계신 airflow 2버전대에는 TriggerDagRunOperator 패키지가 다릅니다.

Suggested change
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

from airflow.models import Variable
from airflow.providers.snowflake.hooks import snowflake

import requests
from bs4 import BeautifulSoup
from datetime import datetime
from datetime import timedelta


places = ['강남역', '미아사거리역', '건대입구역', '광화문·덕수궁', 'DDP(동대문디자인플라자)', '뚝섬한강공원', '여의도한강공원', '서울숲공원', '난지한강공원', '홍대입구역(2호선)']


def get_Snowflake_connection(autocommit=True):
hook = snowflake.SnowflakeHook(snowflake_conn_id = 'snowflake_conn_raw')
conn = hook.get_conn()
conn.autocommit = autocommit
return conn.cursor()

def extract(**context):
link = context["params"]["url"]
task_instance = context["task_instance"]
execution_date = context["execution_date"]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

실제 사용하지 않는 변수는 생성하지 않는 것이 좋겠습니다. (task_instance, execution_date)


return (link)

# 따릉이 transform

def sbike_transform(**context):
response = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
sbike_data = []
place_list = []

for place in places:
api_url = str(response) + place
place_list.append(api_url)

for res in place_list:
resp = requests.get(res)
data = BeautifulSoup(resp.text, "lxml")
sbikes = data.find('citydata').find('sbike_stts')

for sbike in sbikes:
sbike_spot = sbike.find('sbike_spot_nm').text # 따릉이 대여소 명
sbike_spot_id = sbike.find('sbike_spot_id').text # 따릉이 대여소 ID
sbike_parking_cnt = sbike.find('sbike_parking_cnt').text # 따릉이 주차 건수
sbike_rack_cnt = sbike.find('sbike_rack_cnt').text # 따릉이 거치대 수
sbike_shared = sbike.find('sbike_shared').text # 따릉이 거치율

sbike_data.append([res[80:], sbike_spot, sbike_spot_id, int(sbike_parking_cnt), int(sbike_rack_cnt), int(sbike_shared)])

return sbike_data

# 날씨 transform

def weather_transform(**context):
response = context["task_instance"].xcom_pull(key="return_value", task_ids="extract")
weather_data = []
place_list = []

for place in places:
api_url = str(response) + place
place_list.append(api_url)

for res in place_list:
resp = requests.get(res)
data = BeautifulSoup(resp.text, "lxml")
weathers = data.find('citydata').find('weather_stts')

for weather in weathers:
temp = weather.find('temp').text # 온도
sensible_temp = weather.find('sensible_temp').text # 체감온도
rain_chance = weather.find('rain_chance').text # 강수확률
precipitation = weather.find('precipitation').text # 강수량
uv_index_lvl = weather.find('uv_index_lvl').text # 자외선 지수 단계
pm10 = weather.find('pm10').text # 미세먼지농도
pm25 = weather.find('pm25').text # 초미세먼지농도

weather_data.append([res[80:], float(temp), float(sensible_temp), int(rain_chance), precipitation, int(uv_index_lvl), int(pm10), int(pm25)])


return weather_data

def sbike_load_func(**context):
schema = context["params"]["schema"]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

실제 사용하지 않는 변수는 생성하지 않는 것이 좋겠습니다. (schema)

table = context["params"]["table"]
# convert timezone UTC -> KST
tmp_dt = datetime.now() + timedelta(hours=9)
created_at = tmp_dt.strftime('%Y-%m-%d %H:%M:%S')

records = context["task_instance"].xcom_pull(key="return_value", task_ids="sbike_transform")

cur = get_Snowflake_connection()

if not records:
raise Exception('records is empty')

try:
cur.execute("BEGIN;")

for r in records:
place = r[0]
sbike_spot = r[1]
sbike_spot_id = r[2]
sbike_parking_cnt = r[3]
sbike_rack_cnt = r[4]
sbike_shared = r[5]
insert_sql = f"INSERT INTO {table} VALUES ('{place}','{created_at}','{sbike_spot}', '{sbike_spot_id}', '{sbike_parking_cnt}', '{sbike_rack_cnt}', '{sbike_shared}')"
cur.execute(insert_sql)
cur.execute("COMMIT;")

except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise

def weather_load_func(**context):
schema = context["params"]["schema"]
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

실제 사용하지 않는 변수는 생성하지 않는 것이 좋겠습니다. (schema)

table = context["params"]["table"]
# convert timezone UTC -> KST
tmp_dt = datetime.now() + timedelta(hours=9)
created_at = tmp_dt.strftime('%Y-%m-%d %H:%M:%S')

records = context["task_instance"].xcom_pull(key="return_value", task_ids="weather_transform")

# BEGIN과 END를 사용해서 SQL 결과를 트랜잭션으로 만들어주는 것이 좋음
cur = get_Snowflake_connection()

if not records:
raise Exception('records is empty')

try:
cur.execute("BEGIN;")

for r in records:
place = r[0]
temp = r[1]
sensible_temp = r[2]
rain_chance = r[3]
precipitation = r[4]
uv_index_lvl = r[5]
pm10 = r[6]
pm25 = r[7]
insert_sql = f"INSERT INTO {table} VALUES ('{place}','{created_at}', '{temp}', '{sensible_temp}', '{rain_chance}', '{precipitation}', '{uv_index_lvl}', '{pm10}', '{pm25}')"
cur.execute(insert_sql)
cur.execute("COMMIT;")

except Exception as error:
print(error)
cur.execute("ROLLBACK;")
raise


dag = DAG(
dag_id = 'Seoul_data',
start_date = datetime(2024,1,1),
schedule = timedelta(minutes = 30),
max_active_runs = 1,
catchup = False,
default_args = {
'retries': 1,
'retry_delay': timedelta(minutes=3),
}
)


extract = PythonOperator(
task_id = 'extract',
python_callable = extract,
params = {
'url': Variable.get("url")
},
dag = dag)

sbike_transform = PythonOperator(
task_id = 'sbike_transform',
python_callable = sbike_transform,
params = {
},
dag = dag)

weather_transform = PythonOperator(
task_id = 'weather_transform',
python_callable = weather_transform,
params = {
},
dag = dag)

sbike_load = PythonOperator(
task_id = 'sbike_load',
python_callable = sbike_load_func,
params = {
'schema': 'RAW_DATA',
'table': 'SBIKE',
},
dag = dag)

weather_load = PythonOperator(
task_id = 'weather_load',
python_callable = weather_load_func,
params = {
'schema': 'RAW_DATA',
'table': 'WEATHER',
},
dag = dag)

trigger = TriggerDagRunOperator(
task_id='trigger_next_dag',
trigger_dag_id="get_latest_data", # 트리거하려는 다음 DAG의 ID
dag=dag,
)

[extract >> sbike_transform >> sbike_load, extract >> weather_transform >> weather_load] >> trigger

Loading