- 1. ๋ชจ๋ ๋ฐ์ดํฐ ์์ง๋์ด๋ง
- 2. Batch & Stream Processing
- 3. Dataflow Orchestration
- 4. Apache Spark ํ๊ฒฝ ์ค์
- 5. spark ๊ธฐ๋ณธ
- hadoop์ ํน์ง
- spark์ ํน์ง
- spark Cluster
- ๊ฐ์ธ ์ปดํจํฐ์์๋ spark๊ฐ ๋๋ฆฐ ์ด์
- spark์ ํต์ฌ ๋ฐ์ดํฐ ๋ชจ๋ธ RDD
- Pandas vs Spark
- Spark ๋ฒ์ ๋ณ ํน์ง
- Spark ๊ตฌ์ฑ
- RDD๋
- RDD ํน์ง - 1. ๋ฐ์ดํฐ ์ถ์ํ
- RDD ํน์ง - 2. ํ๋ ฅ์ & ๋ถ๋ณ
- RDD ํน์ง - 3. Type-safe
- RDD ํน์ง - 4. Unstructured / Structured Data
- RDD ํน์ง - 5. Lazy
- Spark Operation
- RDD๋ฅผ ์ฐ๋ ์ด์
- ์ฐ๋ฒ ํธ๋ฆฝ์ ์ธ๊ธฐ
- 6. ๋ณ๋ด์ฒ๋ฆฌ์ ๋ถ์ฐ์ฒ๋ฆฌ
- 7. RDD
- Key-Value RDD ์ด๋
- Single-Value RDD vs Key-Value RDD
- Key-Value RDD ๊ฐ๋
- Key-Value RDD - Reduction
- Key-Value RDD - Join
- Key-Value RDD - Mapping values
- Key-Value RDD - ์์
- RDD Transformations vs Actions
- Transformations
- Narrow Transformations
- Wide Transformations
- Lazy ์ฐ์ฐ์ ์ฅ์
- Storage Level
- Cache & Persist
- Master Worker Topology
- Spark ๋์ ๊ณผ์
- Reduction Operations
- Parallel Reduction
- Reduction Actions
- Reduce
- Partition
- Fold
- Fold & Partition
- GroupBy
- Aggregate
- Key-Value RDD Transformations & Actions
- Key-Value RDD - GroupByKey
- Key-Value RDD - ReduceByKey
- Key-Value RDD - mapValues
- Key-Value RDD - countByKey
- Key-Value RDD - keys()
- Key-Value RDD - Joins
- Shuffling
- Partitioner๋ฅผ ์ด์ฉํ ์ฑ๋ฅ ์ต์ ํ (Shuffle ์ต์ํ)
- Partition์ ๋ชฉ์
- Partition ํน์ง
- Partition์ ์ข ๋ฅ
- Hash Partitioning
- Range Partitioning
- Memory & Disk Partition
- Disk Partition
- Repartition & Coalesce
- ์ฐ์ฐ ์ค์ ํํฐ์ ์ ๋ง๋๋ ์์ ๋ค
- map vs mapValues
- 8. Spark SQL
- Structured Data vs Unstructured Data
- Structured Data vs RDDs
- Spark SQL
- Spark SQL์ ๋ชฉ์
- Spark SQL ์๊ฐ
- DataFrame
- SparkSession
- DataFrame ๋ง๋๋ ๋ฒ
- RDD๋ก๋ถํฐ DataFrame ๋ง๋ค๊ธฐ
- ํ์ผ๋ก๋ถํฐ DataFrame ๋ง๋ค๊ธฐ
- DataFrame์ ๋ฐ์ดํฐ๋ฒ ์ด์ค ํ ์ด๋ธ์ฒ๋ผ ์ฌ์ฉํ๊ธฐ
- Spark์์ ์ฌ์ฉํ ์ ์๋ SQL๋ฌธ
- Python์์ Spark SQL ์ฌ์ฉํ๊ธฐ
- RDD๋ฅผ ์ฌ์ฉ์ํ๊ณ DataFrame์ ์ฌ์ฉํ์ ๋์ ์ฅ์
- Datasets
- SQL ์ค์ต
- DataFrame ํน์ง
- DataFrame์ ์คํค๋ง๋ฅผ ํ์ธํ๋ ๋ฒ
- DataFrame Operations
- DataFrame Select
- DataFrame Agg
- DataFrame GroupBy
- DataFrame Join
- Spark SQL๋ก ํธ๋ฆฝ ์ ์ธ๊ธฐ
- Spark SQL๋ก ๋ด์์ ๊ฐ ํ์ ๊ตฌ ๋ณ ๋ฐ์ดํฐ ์ถ์ถํ๊ธฐ
- Spark์ ๋๊ฐ์ ์์ง
- Logical Plan์ด๋
- Physical Plan์ด๋
- Catalyst ๋
- Catalyst Logical Plan -> Physical Plan ๋์ ์์
- Catalyst Pipeline
- Logical Planning ์ต์ ํ
- Explain
- Tungsten
- UDF
- ๋ด์ ํ์ ๋ฐ์ดํฐ ๋ถ์
- 9. MLlib
- MLlib์ด๋
- Machine Learning ์ด๋
- MLlib์ ์ฌ๋ฌ ์ปดํฌ๋ํธ
- ML ํ์ดํ๋ผ์ธ ๊ตฌ์ฑ
- MLlib์ผ๋ก ํ ์ ์๋ ๊ฒ๋ค
- MLlib์ DataFrame์์์ ๋์ํ๋ค.
- MLlib์ ์ฃผ์ Components
- MLlib - Transformer
- MLlib - Estimator
- MLlib - Evaluator
- MLlib - Pipeline
- ์ฒซ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
- ALS ์ถ์ฒ ์๊ณ ๋ฆฌ์ฆ
- ์ถ์ฒ์ด๋
- ์ํ ์ถ์ฒ ํ์ดํ๋ผ์ธ ๊ตฌ์ถ
- Supervised Leaning
- ํ์๋น ์์ธกํ๊ธฐ1
- ํ์๋น ์์ธกํ๊ธฐ2
- ํ์ดํผ ํ๋ผ๋ฏธํฐ ์ต์ ํ
- ๋ชจ๋ธ ์ ์ฅ & ๋ก๋ฉ
- 10. Spark Streaming
- 11. Apache Airflow
- Apache Airflow๋
- ์ํฌํ๋ก์ฐ ๊ด๋ฆฌ ๋ฌธ์
- cron script์ ๊ฐ์ ๊ธฐ์กด ๋ฐฉ์์ ๋ฌธ์ ์
- AirFlow๋
- Workflow๋
- Airflow์ ๊ตฌ์ฑ์์
- Operator
- ์์ (Task)
- Airflow์ ์ ์ฉ์ฑ
- Airflow์ One Node Architecture
- Airflow์ Multi Node Architecture
- Airflow ๋์ ๋ฐฉ์
- DAG์ ์์ฑ๊ณผ ์คํ
- Airflow ์ค์น
- Airflow CLI command
- Airflow DAGs ๋์๋ณด๋
- Airflow DAG View
- NFT ํ์ดํ๋ผ์ธ ํ๋ก์ ํธ ์๊ฐ
- NFT ํ์ดํ๋ผ์ธ - DAG Skeleton
- Airflow - ๋ด์ฅ Operators
- Airflow - Action Operator
- NFT ํ์ดํ ๋ผ์ธ - create table task ์ถ๊ฐ
- NFT ํ์ดํ ๋ผ์ธ - Sensor ๋ก API ํ์ธํ๊ธฐ
- NFT ํ์ดํ ๋ผ์ธ - OpenSea API ์ค๋ฅ ๋์ฒ๋ฒ
- NFT ํ์ดํ ๋ผ์ธ - HttpOperator๋ก ๋ฐ์ดํฐ ๋ถ๋ฌ์ค๊ธฐ
- NFT ํ์ดํ ๋ผ์ธ - process
- NFT ํ์ดํ ๋ผ์ธ - store
- NFT ํ์ดํ ๋ผ์ธ - ํ ์คํฌ๊ฐ ์์กด์ฑ ๋ง๋ค๊ธฐ
- Backfill
- Airflow๋ก Spark ํ์ดํ๋ผ์ธ ๊ด๋ฆฌํ๊ธฐ - Airflow์ Spark ํ๊ฒฝ์ธํ ๋ฐ ์ฌ์ฉํ๊ธฐ
- ํ์๋น ์์ธก ํ์ดํ๋ผ์ธ ๋ง๋ค๊ธฐ
- 12. Kafka
- ์ ํต์ ์ธ ์ํคํ ์ณ
- ์ ํต์ ์ธ ์ํคํ ์ฒ์ ๋ฌธ์ ์
- Kafka ์๊ฐ 1
- Kafka ์๊ฐ 2
- Kafka๋ฅผ ์ด์ฉํ ์ํคํ ์ณ
- Kafka์ ์ฅ์ ๋ค
- Kafka ์ฌ์ฉ ์
- Kafka ๊ตฌ์ฑ
- Kafka๋ฅผ ์ด์ฉํ ์ํคํ ์ฒ - ์์ธ
- Kafka Topic
- Kafka Partition
- Kafka Message
- Kafka Offset
- Kafka Cluster
- Kafka Broker
- Kafka Producer & Consumer
- Kafka Consumer Group
- Rebalancing
- Zookeeper
- Key ์ ๋ฐ๋ฅธ Message ์ ์ก
- Replication Factor
- ํํฐ์ ๋ฆฌ๋
- Consumer Group & Partition & Producer
- Kafka python ์ค์น
- Kafka pyhton Consumer Producer ๊ฐ๋จ์์
- zookeeper, kafka, kafdrop ๋ฅผ docker-compose๋ก ์คํํ๊ธฐ
- kafka topic ์์ฑ
- CSV๋ฅผ ์คํธ๋ฆผ์ผ๋ก ๋ฐ๊ฟ์ฃผ๋ Producer
- ๋น์ ์ ๋ฐ์ดํฐ ํ์ง
- 13. Flink์ ์คํธ๋ฆฌ๋ฐ ํ๋ก์ธ์ฑ
- Apache Flink ๋
- Flink ์๊ฐ
- Stream Processing์ ์ธ์ ์ฐ์ผ๊น
- Batch Processing vs Stream Processing
- Flink์ ๊ธฐ๋ณธ์ ์ธ ์ฒ๋ฆฌ ๊ตฌ์กฐ
- Hadoop vs Spark vs Flink ํน์ง ๋น๊ต
- Hadoop vs Spark vs Flink ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐฉ์ ๋น๊ต
- Hadoop vs Spark vs Flink ๊ฐ๋ฐ ํธ์์ฑ ๋น๊ต
- Spark vs Flink ๋น๊ต
- ๋ง์ดํฌ๋ก ๋ฐฐ์น vs Window
- Spark vs Flink ๊ฐ๋ฐ ๋น๊ต
- Flink์ ๋๋จํ ์
- Flink ๊ตฌ์ฑ
- Flink Storage Streaming
- Flink Deployment
- Flink ๋ด๋ถ ๊ตฌ์กฐ
- Flink์ Connectors
- Flink์ ์จ๋ํํฐ ํ๋ก์ ํธ
- Flink ํ๋ก๊ทธ๋จ์ ์ผ๋ฐ์ ์ธ ํ๋ก์ฐ
- State
- State Backend
- Keyed State
- State ์ ์ฅ
- ์ฒดํฌํฌ์ธํ
- Barriers
- Snapshotting
- ์ฒดํฌํฌ์ธํธ ์ ๋ ฌ
- Recovery
- Savepoints
- Exactly once vs At least once
- ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ์๊ฐ ๊ฐ๋ ์ด ๋ค์ด๊ฐ ๋
- Time์ ์ข ๋ฅ
- Processing Time
- Event Time
- Evemt Time๊ณผ Processing Time์ด ์ ๋ง์ ๋
- Watermark
- ๋ณ๋ ฌ ํ๊ฒฝ์์์ Watermark
- Flink์ ํด๋ฌ์คํฐ ๋งค๋์
- Flink์ ์ํคํ ์ฒ - Job Manager
- Flink์ ์ํคํ ์ฒ - Task Manager
- Flink์ ์ํคํ ์ฒ - Task Slots
- Pyflink ์ญ์ฌ
- Pyflink ๋
- Pyflink์ ํผํฌ๋จผ์ค ์ต์ ํ
- flink ์ค์น
- flink ํด๋ฌ์คํฐ ์คํ ๋ฐ ์ข ๋ฃ
- ETL
- E: ์ถ์ถ Extract
- T: ์ค๋ฏธ์นด์ ๋ง๊ฒ ๋ณํ Transform
- L: ๋๋น์ ์ ์ฅ Load
- ๋ฐ์ดํฐ๋ก ํ ์ ์๋ ์ผ์ด ๋ค์ํด์ง๊ณ ํํ๋ฅผ ์์ธกํ๊ธฐ ๋ถ๊ฐ๋ฅํด์ง๋ฉด์ ์คํค๋ง๋ฅผ ์ ์ํ๊ธฐ ํ๋ค์ด ์ก๋ค.
- ์ค์๊ฐ์ฑ์ ์๊ตฌํ๋ ๊ธฐ๋ฅ๋ค
- ๋นจ๋ผ์ง๋ ๊ธฐ๋ฅ ์ถ๊ฐ
- ์ค์๊ฐ ๋ก๊ทธ
- ๋น์ ํ ๋ฐ์ดํฐ
- ์๋ ํํฐ ๋ฐ์ดํฐ
- ์ต๋ํ ๋ง์ ๋ฐ์ดํฐ๋ฅผ ๋ฏธ๋ฆฌ ์ ์ฅํด๋๊ณ ๋ง์ ์์ ํ๋ก์ธ์ฑ์ ํ ์ ์๊ฒ ๋๋ค.
- ์ปดํจํ ํ์์ ๋ํ ๋น์ฉ ์ต์ ํ๋ณด๋ค ๋น์ฆ๋์ค์ ์๋๋ฅผ ์ต์ ํํ๋ ์ชฝ์ด ์ด๋์ด ํฌ๊ฒ ๋๋ค.
- ELT
- E: ๋ฐ์ดํฐ ์ถ์ถ Extract
- L: ์ผ๋จ ์ ์ฅ
- T: ์ฐ์์์ ๋ฐ๋ผ ๋ณํ
- ์
- ๋ฐ์ดํฐ์ ๋ก๊ทธ๋ฅผ Spark๋ FLink๋ฅผ ํตํด ์ด๋์ ๋ ์ ๋ฆฌ ํ ์ ์ฅ (E&L)
- ์ดํ๋ฆฌ์ผ์ด์ ํน์ ๋ถ์ ํด์์ ์ด์ฉ ๊ฐ๋ฅํ๋๋ก ๋ณํ (T)
- ์์คํ ์ ๋ณต์ก๋์ ๋ฐ๋ผ ๋ฐ์ดํฐ ์ถ์ถ๊ณผ ์ ์ฌ๋ฅผ ํ๋ฒ์ ํ๊ธฐ๋ ํ๋ค.
- ํด๋ผ์ฐ๋ ์จ์ดํ์ฐ์ค - Snowflake, Google Big Query
- Hadoop -> Databricks, Presto
- ์ค์๊ฐ ๋น ๋ฐ์ดํฐ ์ฒ๋ฆฌ (Stream Processing)
- ETL -> ELT
- Dataflow ์๋ํ (Airflow)
- ๋ฐ์ดํฐ ๋ถ์ ํ์ ๋๊ธฐ ๋ณด๋จ ๋๊ตฌ๋ ๋ถ์ํ ์ ์๋๋ก
- ์ค์ํ ๋๋ ๋ฐ์ดํฐ ํ๋ซํผ ๊ด๋ฆฌ (access control, data book)
- ์์ค: ๋น์ฆ๋์ค์ ์ด์ ๋ฐ์ดํฐ ์์ฑ
- ์์ง ๋ฐ ๋ณํ: ์ด์ ์์คํ ์์ ๋ฐ์ดํฐ ์ถ์ถ -> ์ถ์ถ๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๊ณ ์คํค๋ง ๊ด๋ฆฌ -> ๋ฐ์ดํฐ๋ฅผ ๋ถ์ํ ์ ์๋๋ก ๋ณํ
- ์ ์ฅ: ๋ฐ์ดํฐ๋ฅผ ์ฟผ๋ฆฌ์ ์ฒ๋ฆฌ ์์คํ ์ด ์ธ ์ ์๋๋ก ์ ์ฅ, ๋น์ฉ๊ณผ ํ์ฅ์ฑ๋ฉด์ผ๋ก ์ต์ ํ
- ๊ณผ๊ฑฐ&์์ธก: ๋ฐ์ดํฐ ๋ถ์์ ์ํ ์ธ์ฌ์ดํธ ๋ง๋ค๊ธฐ(Query), ์ ์ฅ๋ ๋ฐ์ดํฐ๋ฅผ ์ด์ฉํด ์ฟผ๋ฆฌ๋ฅผ ์คํํ๊ณ ํ์์ ๋ถ์ฐ์ฒ๋ฆฌ(Processing), ๊ณผ๊ฑฐ์ ๋ฌด์จ ์ผ์ด ์ผ์ด๋ฌ๋์ง ํน์ ๋ฏธ๋์ ๋ฌด์จ์ผ์ด ์ผ์ด๋ ์ง(ML)
- ์ถ๋ ฅ: ๋ฐ์ดํฐ ๋ถ์์ ๋ด๋ถ์ ์ธ๋ถ ์ ์ ์๊ฒ ์ ๊ณต, ๋ฐ์ดํฐ ๋ชจ๋ธ์ ์ด์ ์์คํ ์ ์ ์ฉ
- Sources, Storage, Query: ์๋น์ค ๋ ๋ฒจ ๋ณด๋ค๋ ๋ก์ฐ ๋ ๋ฒจ ๋ฌธ์ ๋ค์ ํธ๋ ๋ถ์ผ
- Ingestion & Transformation, Processing: ์ผ๋ฐ์ ์ธ ์์ง๋์ด๋ง์ด ์ง์คํ๋ ๋ถ์ผ
- ๋ฐฐ์น(Batch) == ์ผ๊ด
- ๋ฐฐ์น ํ๋ก์ธ์ฑ(Batch Processing) == ์ผ๊ด์ฒ๋ฆฌ
- ๋ง์ ์์ ๋ฐ์ดํฐ๋ฅผ ์ ํด์ง ์๊ฐ์ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ๋ ๊ฒ
- ํ์ ๋ ๋๋์ ๋ฐ์ดํฐ
- ํน์ ์๊ฐ
- ์ผ๊ด ์ฒ๋ฆฌ
- ์ ํต์ ์ผ๋ก ์ฐ์ด๋ ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ
- ์ค์๊ฐ์ฑ์ ๋ณด์ฅํ์ง ์์๋ ๋ ๋
- ๋ฐ์ดํฐ๋ฅผ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ ์ ์์ ๋
- ๋ฌด๊ฑฐ์ด ์ฒ๋ฆฌ๋ฅผ ํ ๋ ex) ML ํ์ต
- ์์
- ๋งค์ผ ๋ค์ 14์ผ์ ์์์ ๊ณต๊ธ์ ์์ธก
- ๋งค์ฃผ ์ฌ์ดํธ์์ ๊ด์ฌ์ ๋ณด์ธ ์ ์ ๋ค์๊ฒ ๋ง์ผํ ์ด๋ฉ์ผ ์ ์ก
- ๋งค์ฃผ ๋ฐํํ๋ ๋ด์ค๋ ํฐ
- ๋งค์ฃผ ์๋ก์ด ๋ฐ์ดํฐ๋ก ๋จธ์ ๋ฌ๋ ์๊ณ ๋ฆฌ์ฆ ํ์ต
- ๋งค์ผ ์์นจ ์น ์คํฌ๋ํ/ํฌ๋กค๋ง
- ๋งค๋ฌ ์๊ธ ์ง๊ธ
- ์ค์๊ฐ์ผ๋ก ์์์ง๋ ๋ฐ์ดํฐ๋ฅผ ๊ณ์ ์ฒ๋ฆฌํ๋ ๊ฒ
- ์ด๋ฒคํธ๊ฐ ์๊ธธ ๋ ๋ง๋ค, ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ฌ ๋ ๋ง๋ค ์ฒ๋ฆฌ
- ๋ทธ๊ท์น์ ์ผ๋ก ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ค๋ ํ๊ฒฝ์ผ ๋
- ์ฌ๋ฌ๊ฐ์ ์ด๋ฒคํธ๊ฐ ํ๊บผ๋ฒ์ ๋ค์ด์ฌ ๋
- ์ค๋ ์๊ฐ ๋์ ์ด๋ฒคํธ๊ฐ ํ๋๋ ๋ค์ด์ค์ง ์์ ๋
๋ถ๊ท์น์ ์ผ๋ก ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ฌ ๋๋ฅผ ๊ฐ์
- ๋ฐฐ์น ํ๋ก์ธ์ฑ
- ๋ฐฐ์น๋น ์ฒ๋ฆฌํ๋ ๋ฐ์ดํฐ ์๊ฐ ๋ฌ๋ผ์ง๋ฉด์ ๋ฆฌ์์ค๋ฅผ ๋นํจ์จ์ ์ผ๋ก ์ฌ์ฉํ๊ฒ ๋๋ค.
- ์คํธ๋ฆผ ํ๋ก์ธ์ฑ
- ๋ฐ์ดํฐ๊ฐ ์์ฑ๋์ด ์์ฒญ์ด ๋ค์ด๋ก ๋ ๋ง๋ค ์ฒ๋ฆฌํ ์ ์๋ค.
- ์ค์๊ฐ์ฑ์ ๋ณด์ฅํด์ผ ๋ ๋
- ๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ ์์ค๋ก๋ถํฐ ๋ค์ด์ฌ ๋
- ๋ฐ์ดํฐ๊ฐ ๊ฐ๋ ๋ค์ด์ค๊ฑฐ๋ ์ง์์ ์ผ๋ก ๋ค์ด์ฌ ๋
- ๊ฐ๋ฒผ์ด ์ฒ๋ฆฌ๋ฅผ ํ ๋ (Rule-based)
- ์์
- ์ฌ๊ธฐ ๊ฑฐ๋ ํ์ง (Fraud Detection)
- ์ด์ ํ์ง (Anomaly Detection)
- ์ค์๊ฐ ์๋ฆผ
- ๋น์ฆ๋์ค ๋ชจ๋ํฐ๋ง
- ์ค์๊ฐ ์์/๊ณต๊ธ ์ธก์ ๋ฐ ๊ฐ๊ฒฉ ์ฑ ์
- ์ค์๊ฐ ๊ธฐ๋ฅ์ด ๋ค์ด๊ฐ๋ ์ ํ๋ฆฌ์ผ์ด์
- ์ผ๋ฐ์ ์ธ ๋ฐฐ์น ํ๋ก์ฐ
- ๋ฐ์ดํฐ๋ฅผ ๋ชจ์์
- ๋ฐ์ดํฐ๋ฒ ์ด์ค์์ ์ฝ์ด์ ์ฒ๋ฆฌ
- ๋ค์ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ ๋ด๊ธฐ
- ์ผ๋ฐ์ ์ธ ์คํธ๋ฆผ ์ฒ๋ฆฌ ํ๋ก์ฐ
- ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ฌ ๋ ๋ง๋ค(ingest)
- ์ฟผ๋ฆฌ/์ฒ๋ฆฌ ํ State๋ฅผ ์ ๋ฐ์ดํธ
- DB์ ๋ด๊ธฐ
- ๋ฐ์ดํฐ๋ฅผ ์กฐ๊ธ์ฉ ๋ชจ์์ ํ๋ก์ธ์ฑํ๋ ๋ฐฉ์
- Batch ํ๋ก์ธ์ฑ์ ์๊ฒ ์ชผ๊ฐ์ ์คํธ๋ฆฌ๋ฐ์ ํ๋ด๋ด๋ ๋ฐฉ์
- ํ ์คํฌ ์ค์ผ์ค๋ง
- ๋ถ์ฐ ์คํ
- ํ ์คํฌ๊ฐ ์์กด์ฑ ๊ด๋ฆฌ
- ์๋น์ค๊ฐ ์ปค์ง๋ฉด์ ๋ฐ์ดํฐ ํ๋ซํผ์ ๋ณต์ก๋๊ฐ ์ปค์ง
- ๋ฐ์ดํฐ๊ฐ ์ฌ์ฉ์์ ์ง์ ์ฐ๊ด๋๋ ๊ฒฝ์ฐ๊ฐ ๋์ด๋จ (์ํฌํ๋ก์ฐ๊ฐ ๋ง๊ฐ์ง๋ฉด ์๋น์ค๋ ๋ง๊ฐ์ง)
- ํ ์คํฌ ํ๋ํ๋๊ฐ ์ค์ํด์ง
- ํ ์คํฌ๊ฐ ์์กด์ฑ๋ ์๊น
- Apache Airflow
- python
- ์ฃผํผํฐ ๋ ธํธ๋ถ
- java
- spark
- pyspark
- https://www.anaconda.com/products/distribution
- ์๋์ฝ๋ค ์ค์นํ๋ฉด python๊ณผ python์ ๊ธฐ๋ณธ ํจํค์ง๋ค์ ์๋์ผ๋ก ์ค์น๋๊ณ , python๊ณผ ์ฃผํผํฐ ๋ ธํธ๋ถ์ ๋์์ ์ฝ๊ฒ ์ค์น๊ฐ ๊ฐ๋ฅํ๋ค.
brew install --cask adoptopenjdk8
brew install scala
brew install apache-spark
pip --version # ๊ฒฝ๋ก๊ฐ anaconda ์ธ๊ฒ์ ํ์ธ
pip install pyspark
pyspark # spark ํฐ๋ฏธ๋์ด ๋จ๋์ง ํ์ธ
git clone https://github.com/keon/data-engineering.git
- https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
- 2020 March - High Volume For-Hire Vehicle Trip Records ๋ค์ด๋ก๋
- clone ํ ๋ฆฌํฌ์ ๋ฃ๊ธฐ
- data-engineering/01-spark/data/fhvhv_tripdata_2020-03.parquet
| ํ๋ ์ด๋ฆ | ์ค๋ช |
|---|---|
| hvfhs_license_num | ํ์ฌ ๋ฉดํ ๋ฒํธ |
| dispatching_base_num | ์ง์ญ ๋ผ์ด์ผ์ค ๋ฒํธ |
| pickup_datetime | ์น์ฐจ ์๊ฐ |
| dropoff_datetime | ํ์ฐจ ์๊ฐ |
| PULocationID | ์น์ฐจ ์ง์ญ ID |
| DOLocationID | ํ์ฐจ ์ง์ญ ID |
| SR_Flag | ํฉ์น ์ฌ๋ถ Flag |
- HDFS ํ์ผ ์์คํ
- Yarn ๋ฆฌ์์ค ๊ด๋ฆฌ
- Map Reduce ์ฐ์ฐ ์์ง -> Spark๊ฐ ์ด๊ฒ์ ๋์ฒดํ๋ค.
- ๋น ๋ฅด๋ค = ๋น ๋ฐ์ดํฐ์ In-Memory ์ฐ์ฐ
- ๋ ธ๋๋ ํ์์ ๋ฐ๋ผ ๊ณ์ ๋๋ฆด ์ ์๋ค.
- ์ํ์ ํ์ฅ์ด ๊ฐ๋ฅํ๋ค.
- Hadoop MapReduce ๋ณด๋ค ๋น ๋ฅด๋ค
- ๋ฉ๋ชจ๋ฆฌ ์์์ 100๋ฐฐ
- ๋์คํฌ ์์์ 10๋ฐฐ
- Lazy Evaluation
- ํ์คํฌ๋ฅผ ์ ์ํ ๋๋ ์ฐ์ฐ์ ํ์ง ์๋ค๊ฐ ๊ฒฐ๊ณผ๊ฐ ํ์ํ ๋ ์ฐ์ฐํ๋ค.
- ๊ธฐ๋ค๋ฆฌ๋ฉด์ ์ฐ์ฐ ๊ณผ์ ์ ์ต์ ํ ํ ์ ์๋ค.
- Driver Program, Cluster Manager, Worker Node ๋ก ์ด๋ฃจ์ด์ ธ ์๋ค.
- Driver Program: ์ฐ๋ฆฌ๊ฐ ์ฌ์ฉํ๋ ์ปดํจํฐ, python | java | scala ์ ๊ฐ์ script๋ก task์ ์ ์ํ๋ค.
- Cluster Manager: ์ ์๋ task ์ฆ ์ผ๊ฑฐ๋ฆฌ๋ฅผ ๋ถ๋ฐฐ ํ๋ค.
- hadoop์์๋ yarn cluster manager์ ์ฌ์ฉํ ์ ์๋ค.
- aws์์๋ elastic mapreduce manager์ ์ฌ์ฉํ ์ ์๋ค.
- Worker Node
- 1CPU์ฝ์ด ๋น 1Node ๋ฐฐ์น
- ์ธ๋ฉ๋ชจ๋ฆฌ ์ฐ์ฐ์ ์งํํ๋ค.
- spark๋ ํ์ฅ์ฑ์ ๊ณ ๋ คํด์ ์ค๊ณ ํ๊ธฐ ๋๋ฌธ
- Resilient Distributed Dataset (RDD)
- ์ฌ๋ฌ ๋ถ์ฐ ๋ ธ๋์ ๊ฑธ์ณ์ ์ ์ฅ
- ๋ณ๊ฒฝ์ด ๋ถ๊ฐ๋ฅ
- ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ผ๋ก ๋ถ๋ฆฌ
| Pandas | Spark |
|---|---|
| 1๊ฐ์ ๋ ธ๋ | ์ฌ๋ฌ๊ฐ์ ๋ ธ๋ |
| Eager Execution - ์ฝ๋๊ฐ ๋ฐ๋ก ์คํ | Lazy Execution - ์คํ์ด ํ์ํ ๋ ๊น์ง ๊ธฐ๋ค๋ฆผ |
| ์ปดํจํฐ ํ๋์จ์ด์ ์ ํ์ ๋ฐ์ | ์ํ์ ํ์ฅ์ด ๊ฐ๋ฅ |
| In-Memory ์ฐ์ฐ | In-Memory ์ฐ์ฐ |
| Mutable Data | Immutable Data |
- Spark 1.0
- 2014 ๋ ์ ์ ๋ฐํ
- RDD๋ฅผ ์ด์ฉํ ์ธ๋ฉ๋ชจ๋ฆฌ ์ฒ๋ฆฌ ๋ฐฉ์
- DataFrame (V1.3)
- Project Tungsten - ์์ง ์ ๊ทธ๋ ์ด๋๋ก ๋ฉ๋ชจ๋ฆฌ์ CPU ํจ์จ ํ์ ํ
- Spark 2.0
- 2016 ๋ ๋ฐํ
- ๋จ์ํ ๋๊ณ ์ฑ๋ฅ ๊ฐ์
- Structed Streaming
- Dataset ์ด๋ผ๋ DataFrame์ ํ์ฅํ ์๋ฃ๊ตฌ์กฐ ๋ฑ์ฅ
- Catalyst Optimizer ํ๋ก์ ํธ - ์ธ์ด์ ์๊ด์์ด ๋์ผํ ์ฑ๋ฅ์ ๋ณด์ฅ - Scala, Java, Python, R
- Spark 3.0
- 2020 ๋ ๋ฐํ
- Mlib ๊ธฐ๋ฅ ์ถ๊ฐ
- Spark SQL ๊ธฐ๋ฅ ์ถ๊ฐ
- Spark 2.4๋ณด๋ค ์ฝ 2๋ฐฐ ๋นจ๋ผ์ง - Adaptive execution, Dynamic partition pruning
- PySpark ์ฌ์ฉ์ฑ ๊ฐ์
- ๋ฅ๋ฌ๋ ์ง์ ๊ฐํ - GPU๋ ธ๋ ์ง์, ๋จธ์ ๋ฌ๋ ํ๋ ์์ํฌ์ ์ฐ๊ณ ๊ฐ๋ฅ
- GraphX - ๋ถ์ฐ ๊ทธ๋ํ ์ฐ์ฐ
- Python2 ์ง์์ด ๋๊น
- ์ฟ ๋ฒ๋คํฐ์ค ์ง์ ๊ฐํ
- ์ ๊ธฐ๋ฅ์ด ์ถ๊ฐ๋๊ณ ์ฑ๋ฅ์ด ์ข์์ง๊ณ ์์ง๋ง, ๊ทผ๋ณธ์ ๋ฐ๋์ง ์๋๋ค.
- Spark Core
- Spark SQL
- Spark Streaming
- MLlib
- GraphX
lines = sc.textFile("") # lines == RDD
- Resilient Distributed Dataset
- ๋ฐ์ดํฐ๋ ํด๋ฌ์คํฐ์ ํฉ์ด์ ธ์์ง๋ง ํ๋์ ํ์ผ์ธ๊ฒ ์ฒ๋ผ ์ฌ์ฉ ๊ฐ๋ฅ
- ํ๋ ฅ์ ์ด๊ณ ๋ถ๋ณํ๋ ์ฑ์ง์ด ์๋ค (Resilient & Immutable)
- ๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ๊ตฐ๋ฐ์ ์ฐ์ฐ์ ํ๋ค๊ฐ ์ฌ๋ฌ ๋ ธ๋ ์ค ํ๋๊ฐ ๋ง๊ฐ์ง๋ค๋ฉด? (๋คํธ์ํฌ ์ฅ์ | ํ๋์จ์ด / ๋ฉ๋ชจ๋ฆฌ ๋ฌธ์ | ์์์๋ ๊ฐ๊ฐ์ง ์ด์ ๋๋ฌธ์)
- ๋ฐ์ดํฐ๊ฐ ๋ถ๋ณ(Imuutable) ํ๋ฉด ๋ฌธ์ ๊ฐ ์ผ์ด๋ ๋ ๋ณต์์ด ๊ฐ๋ฅํด์ง๋ค.
- RDD1์ด ๋ณํ์ ๊ฑฐ์น๋ฉด, RDD1์ด ๋ฐ๋๋๊ฒ ์๋๋ผ ์๋ก์ด RDD2๊ฐ ๋ง๋ค์ด์ง๋ค. (Imuutable)
- ๋ณํ์ ๊ฑฐ์น ๋ ๋ง๋ค ์ฐ์ฐ์ ๊ธฐ๋ก์ด ๋จ๋๋ค.
- RDD์ ๋ณํ ๊ณผ์ ์ ํ๋์ ๋น์ํ ๊ทธ๋ํ(Acyclic Graph)๋ก ๊ทธ๋ฆด ์ ์๋๋ฐ, ์ด ํน์ง ๋๋ถ์ ๋ฌธ์ ๊ฐ ์๊ธธ ๊ฒฝ์ฐ์ ์ฝ๊ฒ ์ RDD๋ก ๋์๊ฐ ์ ์๋ค.
- Node 1์ด ์ฐ์ฐ ์ค ๋ฌธ์ ๊ฐ ์๊ธฐ๋ฉด ๋ค์ ๋ณต์ ํ Node2 ์์ ์ฐ์ฐํ๋ฉด ๋๋ค. (Resillient)
- ์ปดํ์ผ์ Type์ ํ๋ณํ ์ ์์ด ๋ฌธ์ ๋ฅผ ์ผ์ฐ ๋ฐ๊ฒฌํ ์ ์๋ค.
- Structured / Unstructured ๋๋ค ๋ด์ ์ ์๋ค.
- Unstructured Data - ๋ก๊ทธ or ์์ฐ์ด
- Structured Data - RDB or DataFrame
- ๊ฒฐ๊ณผ๊ฐ ํ์ํ ๋ ๊น์ง ์ฐ์ฐ์ ํ์ง ์๋๋ค
- ๋๊ฐ์ง ์ฐ์ฐ์ด ์๋๋ฐ, T = ๋ณํ A = ์ก์ , ์) RDD1 -> T -> RDD2 -> T -> RDD3 -> A --> RDD4
- ์ก์ ์ ํ ๋ ๊น์ง ๋ณํ์ ์คํ๋์ง ์๋๋ค.
- Action์ ๋ง๋๋ฉด ๊ทธ๋ ๋ณํ(T) ์ฐ์ฐ์ ์งํํ๋ค.
- Spark Operation = Transform + Action
- ์ ์ฐํ๋ค
- ์งง์ ์ฝ๋๋ก ํ ์ ์๋๊ฒ ๋ง๋ค
- ๊ฐ๋ฐํ ๋ ๋ฌด์๋ณด๋ค๋ ์ด๋ป๊ฒ์ ๋ํด ๋ ์๊ฐํ๊ฒ ํ๋ค (how-to)
- ๊ฒ์ผ๋ฅธ ์ฐ์ฐ ๋๋ถ์ ๋ฐ์ดํฐ๊ฐ ์ด๋ป๊ฒ ๋ณํ๋ ์ง ์๊ฐํ๊ฒ ๋๋ค
- ๋ฐ์ดํฐ๊ฐ ์ง๋๊ฐ ๊ธธ์ ๋ฆ๋ ๋๋
spark-submit count_trips.py # ํธ๋ฆฝ ์ ์ธ๊ธฐ
python3 visualiza_trips_date.py # ์ฐจํธ๋ก ๊ทธ๋ฆฌ๊ธฐ
jupyter notebook . # ์ฃผํผํฐ๋ก count_trips.ipynb ์ด๊ธฐ -> ์ฝ๋์ ๋ํ ์์ธํ ์ค๋ช
๋ค
RDD.map(<task>)
- ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ๊ฐ๋ก ์ชผ๊ฐ๊ณ
- ์ฌ๋ฌ ์ฐ๋ ๋์์ ๊ฐ์ task๋ฅผ ์ ์ฉ
- ๊ฐ์ ๋ง๋ ๊ฒฐ๊ณผ๊ฐ์ ํฉ์น๋ ๊ณผ์
- ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ๊ฐ๋ก ์ชผ๊ฐ์ ์ฌ๋ฌ ๋ ธ๋๋ก ๋ณด๋ธ๋ค.
- ์ฌ๋ฌ ๋ ธ๋์์ ๊ฐ์ ๋ ๋ฆฝ์ ์ผ๋ก task๋ฅผ ์ ์ฉ
- ๊ฐ์ ๋ง๋ ๊ฒฐ๊ณผ๊ฐ์ ํฉ์น๋ ๊ณผ์
- ๋ ธ๋๊ฐ ํต์ ๊ฐ์ด ์ ๊ฒฝ์จ์ผ๋ ๊ฒ์ด ๋์ด๋๋ค
- Spark๋ฅผ ์ด์ฉํ๋ฉด ๋ถ์ฐ๋ ํ๊ฒฝ์์๋ ์ผ๋ฐ์ ์ธ ๋ณ๋ ฌ์ฒ๋ฆฌ๋ฅผ ํ๋ฏ์ด ์ฝ๋๋ฅผ ์ง๋๊ฒ ๊ฐ๋ฅํ๋ค.
- Spark๋ ๋ถ์ฐ๋ ํ๊ฒฝ์์ ๋ฐ์ดํฐ ๋ณ๋ ฌ ๋ชจ๋ธ์ ๊ตฌํํด์ ์ถ์ํ ์์ผ์ฃผ๊ธฐ ๋๋ฌธ์ ๊ฐ๋ฅํ๊ฒ์ด๋ค. (RDD)
- ๊ทธ๋ ๋ค๊ณ ์๊ฐ ์์ด spark ์ฝ๋ฉ์ ํ๋ฉด ์ฑ๋ฅ์ ๋์ด๋ด๊ธฐ๋ ํ๋ค๋ค. (๋ ธ๋๊ฐ ํต์ ์๋๋ฅผ ์ ๊ฒฝ์จ์ผ ํจ)
- ๋ถ์ฐ์ฒ๋ฆฌ๋ก ๋์ด๊ฐ๋ฉด์ ์ ๊ฒฝ์จ์ผ๋ ๋ฌธ์ ๊ฐ ๋ง์์ก๋ค.
- ๋ถ๋ถ ์คํจ - ๋ ธ๋ ๋ช๊ฐ๊ฐ ํ๋ก๊ทธ๋จ๊ณผ ์๊ด ์๋ ์ด์ ๋ก ์ธํด ์คํจ
- ์๋ - ๋ง์ ๋คํธ์ํฌ ํต์ ์ ํ์๋ก ํ๋ ์์ ์ ์๋๊ฐ ์ ํ
RDD.map(A).filter(B).reduceByKey(C).take(100) # 1
RDD.map(A).reduceByKey(C).filter(B).take(100) # 2
"""
1๋ฒ์ด ๋ ์ข์ ์ฑ๋ฅ์ ์ฝ๋์ด๋ค.
reduceByKey๋ ์ฌ๋ฌ๋
ธ๋์์ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ธฐ ๋๋ฌธ์ ํต์ ์ ํ์๋ก ํ๋๋ฐ,
filter๋ฅผ ํตํด์ ๋ฐ์ดํฐ์์ ์ค์ด๊ณ ์ฒ๋ฆฌํ๋๊ฒ์ด ํจ์จ์ ์ด๊ธฐ ๋๋ฌธ
๋ฉ๋ชจ๋ฆฌ > ๋์คํฌ > ๋คํธ์ํฌ ์์ผ๋ก ๋น ๋ฅด๊ธฐ๋๋ฌธ์ ๋ฉ๋ชจ๋ฆฌ์์ ์ต๋ํ ๋ง์ด ์ฒ๋ฆฌํ๋ ๊ฒ์ด ์ข๋ค.
๋คํธ์ํฌ๋ ๋ฉ๋ชจ๋ฆฌ ์ฐ์ฐ์ ๋นํด 100๋ง๋ฐฐ ์ ๋ ๋๋ฆฌ๋ค
"""- Structured Data๋ฅผ Spark์ ์ฐ๊ณํด์ ์ธ์ ์๊ฒ ํด์ฃผ๋ ๋๊ตฌ ์ค ํ๋์ด๋ค.
- Key์ Value ์์ ๊ฐ๋ Key-Value RDD
- (Key, Value) ์์ ๊ฐ๊ธฐ ๋๋ฌธ์ Pairs RDD๋ผ๋๊ณ ๋ถ๋ฆผ
- ๊ฐ๋จํ ๋ฐ์ดํฐ๋ฒ ์ด์ค์ฒ๋ผ ๋ค๋ฃฐ ์ ์๋ค.
- Single-Value RDD: ํ ์คํธ์ ๋ฑ์ฅํ๋ ๋จ์ด ์ ์ธ๊ธฐ (๋ ์ง) -> 1์ฐจ์ ์ ์ธ ์ฐ์ฐ
- Key-Value RDD: ๋ทํ๋ฆญ์ค ๋๋ผ๋ง๊ฐ ๋ฐ์ ํ๊ท ๋ณ์ (๋ ์ง, ์น๊ฐ์) -> ๊ณ ์ฐจ์ ์ ์ธ ์ฐ์ฐ
- Key์ Value ์์ ๊ฐ์ง๋ค
- ์) ์ง์ญ ID ๋ณ๋ก ํ์ ์ดํ์๋ ์ด๋ป๊ฒ ๋ ๊น?
- Key: ์ง์ญ ID
- Value: ์ดํ ์
- ๋ค๋ฅธ์) ๋๋ผ๋ง ๋ณ๋ก ๋ณ์ ์ ๋ชจ์๋ณด๊ธฐ, ํ๊ท ๊ตฌํ๊ธฐ
- ๋ค๋ฅธ์) ์ด์ปค๋จธ์ค ์ฌ์ดํธ์์ ์ํ๋น ๋ณ ํ์ ๊ตฌํ๊ธฐ
- ์) ์ง์ญ ID ๋ณ๋ก ํ์ ์ดํ์๋ ์ด๋ป๊ฒ ๋ ๊น?
- ์ฝ๋์์ผ๋ก๋ ๋ง์ด ๋ค๋ฅด์ง ์๋ค
pairs = rdd.map(lambda x: (x,1))
"""
[
์ง์ญ
์ง์ญ
]
[
(์ง์ญ, 1)
(์ง์ญ, 1)
]
"""- reduceByKey() - ํค ๊ฐ์ ๊ธฐ์ค์ผ๋ก ํ ์คํฌ ์ฒ๋ฆฌ
- groupByKey() - ํค ๊ฐ์ ๊ธฐ์ค์ผ๋ก ๋ฒจ๋ฅ๋ฅผ ๋ฌถ๋๋ค
- sortByKey() - ํค ๊ฐ์ ๊ธฐ์ค์ผ๋ก ์ ๋ ฌ
- keys() - ํค ๊ฐ ์ถ์ถ
- values() - ๋ฒจ๋ฅ๊ฐ ์ถ์ถ
pairs = rdd.map(lambda x: (x,1))
count = pairs.reduceByKey(lambda a, b,: a+b)
"""
์ง์ฅ๋ฉด
์ง์ฅ๋ฉด
์งฌ๋ฝ
์งฌ๋ฝ
(์ง์ฅ๋ฉด, 1)
(์ง์ฅ๋ฉด, 1)
(์งฌ๋ฝ, 1)
(์งฌ๋ฝ, 1)
(์ง์ฅ๋ฉด, 2)
(์งฌ๋ฝ, 2)
"""- join
- rightOuterJoin
- leftOuterJoin
- subtractByKey
- key๋ฅผ ๋ฐ๊พธ์ง ์๋๊ฒฝ์ฐ map()๋์ value๋ง ๋ค๋ฃจ๋ mapValues() ํจ์๋ฅผ ์ฐ๋๊ฒ ์ข๋ค
- spark ๋ด๋ถ์์ ํํฐ์ ์ ์ ์งํ ์ ์์ด์ ๋์ฑ ํจ์จ์ ์ด๋ค.
- mapValues(), flatMapValues() ๋๊ฐ๋ค Value๋ง ๋ค๋ฃจ๋ ์ฐ์ฐ๋ค์ด๊ณ RDD์์ key๋ ์ ์ง๋จ
1-spark/category-review-average.ipynb
- Transformation
- ๊ฒฐ๊ณผ๊ฐ์ผ๋ก ์๋ก์ด RDD๋ฅผ ๋ฐํ
- ์ง์ฐ ์คํ - Lazy Execution
- map()
- flatMap()
- filter()
- distinct()
- reduceByKey()
- groupByKey()
- mapValues()
- flatMapValues()
- sortByKey()
- Actions
- ๊ฒฐ๊ณผ๊ฐ์ ์ฐ์ฐํ์ฌ ์ถ๋ ฅํ๊ฑฐ๋ ์ ์ฅ (python object ๋ฐํ )
- ์ฆ์ ์คํ - Eager Execution
- collect()
- count()
- countByValues()
- take()
- top()
- reduce()
- fold()
- foreach()
1-spark/rdd_transformations_actions.ipynb
- transformations = Narrow + Wide
- 1:1 ๋ณํ
- filter(), map(), flatMap(), sample(), union()
- 1์ด์ ์กฐ์ํ๊ธฐ ์ํด ๋ค๋ฅธ ์ด/ํํฐ์ ์ ๋ฐ์ดํฐ๋ฅผ ์ธ ํ์๊ฐ ์์.
- Shuffling
- Intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
- ์์ํ RDD์ ํํฐ์ ์ ๋ค๋ฅธ ํํฐ์ ์ ๋ฐ์ดํฐ๊ฐ ๋ค์ด๊ฐ ์ ์์
- ์ฑ๋ฅ์ ๋ง์ ๋ฆฌ์์ค๋ฅผ ์๊ตฌํ๊ฒ ๋๊ณ , ์ต์ํํ๊ณ ์ต์ ํ๊ฐ ํ์ํ๋ค.
- ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์ต๋ํ ํ์ฉํ ์ ์๋ค. (๋์คํฌ, ๋คํธ์ํฌ ์ฐ์ฐ์ ์ต์ํ ํ ์ ์๋ค.)
- ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๋ task๋ ๋ฐ๋ณต๋๋ ๊ฒฝ์ฐ๊ฐ ๋ง์์(ex ๋จธ์ ๋ฌ๋ ํ์ต), Lazy๋ก ์ฒ๋ฆฌํ๋ฉด ๋นํจ์จ์ ์ธ๋ถ๋ถ์ ํจ์จ์ ์ผ๋ก ์ฒ๋ฆฌํ ์ ์๋ค.
- Task -> Disk -> Task -> Disk ๋ก ์์ ์ ํ๋ฉด Disk์ ์์ฃผ๋ค๋ฅด๊ฒ ๋์ด์ ๋นํจ์จ์ ์ด๋ค.
- Task -> Task ๋ก ๋์ด๊ฐ ๋ in-memory๋ก ์ฃผ๊ณ ๋ฐ์ผ๋ฉด ํจ์จ์ ์ด๋ค.
- in-memory๋ก ์ฃผ๊ณ ๋ฐ์ผ๋ ค๋ฉด ์ด๋ค ๋ฐ์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋จ๊ฒจ์ผ ํ ์ง ์์์ผ ๊ฐ๋ฅํ๋ค.
- Transformations๋ ์ง์ฐ ์คํ ๋๊ธฐ ๋๋ฌธ์ ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํด๋ ์ ์๋ค.
- MEMORY_ONLY: ๋ฉ๋ชจ๋ฆฌ์๋ง ์ ์ฅ
- MEMORY_AND_DISK: ๋ฉ๋ชจ๋ฆฌ์ ๋์คํฌ ๋ชจ๋ ์ ์ฅ, ๋ฉ๋ชจ๋ฆฌ์ ์์๊ฒฝ์ฐ ๋์คํฌ๊น์ง ๋ณด๊ฒ ๋ค.
- MEMORY_ONLY_SER: ๋ฉ๋ชจ๋ฆฌ๋ฅผ ์๋ผ๊ธฐ ์ํด์ serialize (๊บผ๋ด์ฌ ๋ deserialize ๊ณผ์ ์ด ์ถ๊ฐ๋จ)
- MEMORY_AND_DISK_SER: ๋ฉ๋ชจ๋ฆฌ์ ๋์คํฌ์ serialize
- DISK_ONLY: ๋์คํฌ์๋ง
- ๋ฐ์ดํฐ๋ฅผ ๋ฉ๋ชจ๋ฆฌ์ ๋จ๊ฒจ๋๊ณ ์ถ์ ๋ ์ฌ์ฉํ ์ ์๋ ํจ์
categoryReviews = filtered_lines.map(parse)
result1 = categoryReviews.take(10)
result2 = categoryReviews.mapValues(lambda x: (x,1)).collect()
# categoryReviews๋ result1๊ณผ result2๋ฅผ ๋ง๋ค๋ฉด์ 2๋ฒ ๋ง๋ค์ด์ง.
# .persist()๋ฅผ ์ถ๊ฐํ๋ฉด ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํด๋๊ณ ์ธ ์ ์์
# categoryReviews = filtered_lines.map(parse).cache()
- Cache
- ๋ํดํธ Storage Level ์ฌ์ฉ
- RDD: MEMORY_ONLY
- DF: MEMORY_AND_DISK
- Persist
- Storage Level์ ์ฌ์ฉ์๊ฐ ์ํ๋๋๋ก ์ง์ ๊ฐ๋ฅ
- spark๋ Master Worker Topology๋ก ๊ตฌ์ฑ ๋์ด ์๋ค.
- ์คํํฌ๋ฅผ ์ฐ๋ฉด์ ์์ง ๋ง์์ผ ํ ์
- ํญ์ ๋ฐ์ดํฐ๊ฐ ์ฌ๋ฌ ๊ณณ์ ๋ถ์ฐ๋์ด ์๋ค๋ ๊ฒ
- ๊ฐ์ ์ฐ์ฐ์ด์ด๋ ์ฌ๋ฌ ๋ ธ๋์ ๊ฑธ์ณ์ ์คํ ๋๋ค๋ ์
- Driver Program์ด Spark Context๋ฅผ ์์ฑํด์ ์ดํ๋ฆฌ์ผ์ด์ ์ ๋ง๋ ๋ค.
- Spark Context๊ฐ Cluster Manager์ ์ฐ๊ฒฐ์ ํ๋ค.
- Cluster Manager๊ฐ ์์๋ค์ ํ ๋นํ๋ค.
- Cluster Manager๊ฐ ํด๋ฌ์คํฐ์ ์๋ ๋ ธ๋๋ค์ Executor๋ฅผ ์์งํ๋ค.
- Executor๋ค์ ์ฐ์ฐ์ ์ํํ๊ณ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ค.
- Spark Context๊ฐ Executor ๋ค์๊ฒ ์คํํ Task๋ฅผ ์ ์กํ๋ค์์
- ์คํ๋ Task๋ค์ด ๊ฒฐ๊ณผ๊ฐ๋ค์ ๋ด๋ฑ๋๋ฐ, ์ด๊ฒ์ Driver Program์ ๋ณด๋ด๊ฒ ๋๋ค.
RDD.foreach(lambda x: print(x))
"""
Driver Program์์ ์ ์ฝ๋๋ฅผ ์คํํ๋ฉด ์คํ๊ฒฐ๊ณผ๊ฐ ์๋ฌด๊ฒ๋ ๋์ค์ง ์๋๋ค.
์๋ํ๋ฉด foreach๊ฐ ์ก์
์ด๊ธฐ ๋๋ฌธ์, Driver๊ฐ ์๋ Executor์์ ๋ฐ๋ก ์คํ ๋๊ธฐ ๋๋ฌธ์ด๋ค.
"""foods = sc.parallelize(["์ง์ฅ๋ฉด","๋ง๋ผํ", ...])
three = foods.take(3)
"""
three ๊ฒฐ๊ณผ๊ฐ์ Driver Program์ ์ ์ฅ ๋๋ค.
์ผ๋ฐ์ ์ผ๋ก ์ก์
์ Driver Pgogram์ด Worker Node๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ๋ ๊ฒ ๊น์ง ํฌํจ ํ๋ค.
๊ฒฐ๊ตญ, Executor์๊ฒ take ์ฐ์ฐ์ ์ํํ๋ผ๊ณ ๋ช
๋ นํ๊ณ , ๊ทธ๊ฒฐ๊ณผ๋ฅผ driver node์๊ฒ ๋๋ ค๋ฌ๋ผ๊ณ ์์ฒญํ๋ ๊ฒ์ด๋ค.
"""- Reduction: ์์๋ค์ ๋ชจ์์ ํ๋๋ก ํฉ์น๋ ์์ , ๋ง์ Spark์ ์ฐ์ฐ๋ค์ด reduction์ด๋ค.
- ๋๋ถ๋ถ์ Action์ Reduction์ด๋ค.
- Reduction: ๊ทผ์ ํ๋ ์์๋ค์ ๋ชจ์์ ํ๋์ ๊ฒฐ๊ณผ๋ก ๋ง๋๋ ์ผ
- ํ์ผ ์ ์ฅ, collect() ๋ฑ๊ณผ ๊ฐ์ด Reduction์ด ์๋ ์ก์ ๋ ์๋ค.
- ํํฐ์ ๋ง๋ค ๋ ๋ฆฝ์ ์ผ๋ก ์์ ์ ์ฒ๋ฆฌํ ์ ์์ด์ผ ๋ถ์ฐ๋ ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.
- ํํฐ์ ์ด ๋ค๋ฅธ ํํฐ์ ์ ๊ฒฐ๊ณผ์ ์์กดํ๊ฒ ๋๋ฉด, ํ ํ ์คํฌ๊ฐ ์ ํ ์คํฌ๋ฅผ ๊ธฐ๋ค๋ ค์ผ ๋๊ธฐ ๋๋ฌธ์ ์์ ์ ๋์์ ์ฒ๋ฆฌํ ์ ์๊ฒ ๋๊ณ ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๋ถ๊ฐ๋ฅํด์ง๋ฏ๋ก ๋ถ์ฐ์ ์๋ฏธ๊ฐ ์์ด์ง๋ค.
- ๋ํ์ ์ธ Reduction Actions: Reduce, Fold, GroupBy, Aggregate
from operator import add
sc.parallelize([1,2,3,4,5]).reduce(add)
# 15 - ํํฐ์ ์ด ์ด๋ป๊ฒ ๋๋ ์ง ํ๋ก๊ทธ๋๋จธ๊ฐ ์ ํํ ์๊ธฐ ์ด๋ ต๋ค.
- ์ฐ์ฐ์ ์์์ ์๊ด ์์ด ๊ฒฐ๊ณผ ๊ฐ์ ๋ณด์ฅํ๋ ค๋ฉด
- ๊ตํ ๋ฒ์น (ab = ba)
- ๊ฒฐํฉ ๋ฒ์น (ab)c = a(bc)
# ํํฐ์
์ ๋ฐ๋ผ ๊ฒฐ๊ณผ ๊ฐ์ด ๋ฌ๋ผ์ง๊ฒ ๋๋ค.
# ๋ถ์ฐ๋ ํํฐ์
๋ค์ ์ฐ์ฐ๊ณผ ํฉ์น๋ ๋ถ๋ถ์ ๋๋ ์ ์๊ฐํด์ผ ํ๋ค.
>>> sc.parallelize([1,2,3,4]).reduce(lambda x,y: (x*2)+y) # ํํฐ์
์ง์ X
26
>>> sc.parallelize([1,2,3,4],1).reduce(lambda x,y: (x*2)+y) # ํํฐ์
1๊ฐ๋ก ์ง์
26
>>> sc.parallelize([1,2,3,4],2).reduce(lambda x,y: (x*2)+y) # ํํฐ์
2๊ฐ๋ก ์ง์
18
>>> sc.parallelize([1,2,3,4],3).reduce(lambda x,y: (x*2)+y) # ํํฐ์
3๊ฐ๋ก ์ง์
18
>>> sc.parallelize([1,2,3,4],4).reduce(lambda x,y: (x*2)+y) # ํํฐ์
4๊ฐ๋ก ์ง์
26
"""
(1,2,3,4) -> ((1*2+2)*2+3)*2+4=26 # ํํฐ์
1
(1,2)(3,4) -> ((1*2+2)*2 + (3*2)+4) = 18 # ํํฐ์
2
"""
- Reduce์ ์ ์ฌํ์ง๋ง, Fold๋ ์์๊ฐ์ ์ค์ ํด์ค๋ค ๋ผ๋ ๋ถ๋ถ๋ง ๋ค๋ฆ.
from operator import add
sc.parallelize([1,2,3,4,5]).fold(0, add)
# 15 rdd = sc.parallelize([2,3,4],4)
rdd.reduce(lambda x, y: x*y) # 24
rdd.fold(1, lambda x, y: x*y) # 24
rdd.reduce(lambda x, y: x+y) # 9 (0+2+3+4 =9)
rdd.fold(1, lambda x, y: x+y) # 14 (1+1) + (1+2) + (1+3) + (1+4) = 14 , ๊ฐ ํํฐ์
์ ์์๊ฐ์ด 1rdd = sc.parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x,y) in result])
# [(0, [2,8]), (1, [1,1,3,5])]- RDD ๋ฐ์ดํฐ ํ์ ๊ณผ Action ๊ฒฐ๊ณผ ํ์ ์ด ๋ค๋ฅผ ๊ฒฝ์ฐ ์ฌ์ฉ
- ํํฐ์ ๋จ์์ ์ฐ์ฐ ๊ฒฐ๊ณผ๋ฅผ ํฉ์น๋ ๊ณผ์ ์ ๊ฑฐ์น๋ค
- RDD.aggregate(zeroValue, seqOp, combOp)
- zeroValue: ๊ฐ ํํฐ์ ์์ ๋์ ํ ์์ ๊ฐ
- seqOp: ํ์ ๋ณ๊ฒฝ ํจ์
- combOp: ํฉ์น๋ ํจ์
- ๋ง์ด ์ฐ์ด๋ reduction action
- ๋๋ถ๋ถ์ ๋ฐ์ดํฐ ์์ ์ ํฌ๊ณ ๋ณต์กํ ๋ฐ์ดํฐ ํ์ -> ์ ์ ๋ ๋ฐ์ดํฐ
seqOp = (lambda x,y: (x[0] + y, x[1] + 1))
combOp = (lambda x,y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1,2,3,4]).aggregate((0,0), seqOp, combOp) # (10,4)
sc.parallelize([]).aggregate((0,0), seqOp, combOp) # (0,0)
- Transformations
- groupByKey
- reduceByKey
- mapValues
- keys
- join (+leftOuterJoin, rightOuterJoin)
- Actions
- countByKey
- Key-Value RDD์์ Tranformations๊ฐ ๋ง์ ์ด์ : ์ฒ๋ฆฌ๊ณผ์ ์์ ๋์จ ๊ฒฐ๊ณผ๊ฐ์ด ํํฐ์ ์ด ์ ์ง๊ฐ ์๋๋๋ผ๋ ๊ฐ์ด ๊ต์ฅํ ํฌ๊ธฐ ๋๋ฌธ์
- groupBy: ํจ์๋ฅผ ๊ธฐ์ค์ผ๋ก Group
rdd = parallelize([1,1,2,3,5,8])
result = rdd.groupBy(lambda x: x % 2).collect()
sorted([(x, sorted(y)) for (x,y) in result()])
# [(0, [2,8]), (1, [1,1,3,5])]- groupByKey: Key๋ฅผ ๊ธฐ์ค์ผ๋ก Group
rdd = parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a',2), ('b',1)]
sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1,1]), ('b', [1])]- reduce: ํจ์๋ฅผ ๊ธฐ์ค์ผ๋ก ์์๋ค์ ํฉ์นจ (action)
sc.parallelize([1,2,3,4,5]).reduce(add)
# 15- reduceBykey: key๋ฅผ ๊ธฐ์ค์ผ๋ก ๊ทธ๋ฃน์ ๋ง๋ค๊ณ ํฉ์นจ (trans)
rdd = sc.parallelize([("a",1), ("b",1), ("a",1)])
sorted(rdd.reduceByKey(add).collect())
# [('a',2), ('b',1)]- ๊ฐ๋ ์ ์ผ๋ก๋ groupByKey + reduction
- ํ์ง๋ง, groupByKey๋ณด๋ค ํจ์ฌ ๋น ๋ฅด๋ค
- ํจ์๋ฅผ ๋ฐธ๋ฅ์๊ฒ๋ง ์ ์ฉํ๋ค
- ํํฐ์ ๊ณผ ํค๋ ๊ทธ๋๋ก ๋ฉ๋๋ค. (ํํฐ์ ๊ณผ ํค๋ฅผ ์๋ค๊ฐ๋ค ํ์ง์์์ ๋คํธ์ํฌ ๋น์ฉ์ ์ค์ผ ์ ์๋ค)
x = sc.parallelize([("a", ["apple","banana","lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
# [('a',3), ('b',1)]- ๊ฐ ํค๊ฐ ๊ฐ์ง ์์๋ค์ ์ผ๋ค
rdd = sc.parallelize([("a",1), ("b",1), ("a",1)])
sorted(rdd.countByKey().items())
# [('a',2), ('b',1)]- Transformation
- ๋ชจ๋ Key๋ฅผ ๊ฐ์ง RDD๋ฅผ ์์ฑ
m = sc.parallelize([(1,2), (3,4)]).keys()
m.collect()
# [1,3]- Transformation
- ์ฌ๋ฌ๊ฐ์ RDD๋ฅผ ํฉ์น๋๋ฐ ์ฌ์ฉ
- ๋ํ์ ์ผ๋ก ๋๊ฐ์ง์ join ๋ฐฉ์์ด ์กด์ฌํ๋ค.
- Inner Join (join)
- Outer join (left outer, right outer)
rdd1 = sc.parallelize([("foo",1), ("bar",2), ("baz",3)])
rdd2 = sc.parallelize([("foo",4), ("bar",5), ("bar", 6), ("zoo", 1)])
rdd1.join(rdd2).collect()
# [('bar',(2,5)), ('bar', (2,6)), ('foo', (1,4))]
rdd1.leftOuterJoin(rdd2).collect()
# [('baz', (3, None)), ('bar', (2,5)), ('bar', (2,6)), ('foo', (1,4))]
rdd1.rightOuterJoin(rdd2).collect()
# [('bar', (2,5)), ('bar', (2,6)), ('zoo', (None,1)), ('foo', (1,4))]
- ๊ทธ๋ฃนํ์ ๋ฐ์ดํฐ๋ฅผ ํ ๋ ธ๋์์ ๋ค๋ฅธ๋ ธ๋๋ก ์ฎ๊ธธ ๋ ์ฌ์ฉ
- ์ฑ๋ฅ์ (๋ง์ด) ์ ํ์ํจ๋ค
- groupByKey๋ฅผ ํ ๋๋ ๋ฐ์ํ๋ค.
- ์ฌ๋ฌ ๋ ธ๋์์ ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ๊ฒ ๋์ ๋คํธ์ํฌ ์ฐ์ฐ์ ๋น์ฉ์ด ๋๋ค
- Shuffle์ ์ผ์ผํฌ ์ ์๋ ์์
๋ค
- Join, leftOuterJoin, rightOuterJoin
- GroupByKey
- ReduceByKey
- ComebineByKey
- Distinct
- Intersection
- Repartition
- Coalesce
- Shuffle์ด ๋ฐ์ํ๋ ์์
- ๊ฒฐ๊ณผ๋ก ๋์ค๋ RDD๊ฐ ์๋ณธ RDD์ ๋ค๋ฅธ ์์๋ฅผ ์ฐธ์กฐํ๊ฑฐ๋ ๋ค๋ฅธ RDD๋ฅผ ์ฐธ์กฐํ ๋
- ๋ฏธ๋ฆฌ ํํฐ์ ์ ๋ง๋ค์ด ๋๊ณ ์บ์ฑ ํ reduceByKey ์คํ
- ๋ฏธ๋ฆฌ ํํฐ์ ์ ๋ง๋ค์ด ๋๊ณ ์บ์ฑ ํ join ์คํ
- ๋๋ค ํํฐ์ ๊ณผ ์บ์ฑ์ ์กฐํฉํด์ ์ต๋ํ ๋ก์ปฌ ํ๊ฒฝ์์ ์ฐ์ฐ์ด ์คํ๋๋๋ก ํ๋ ๋ฐฉ์
- ์ ํ์ ์ต์ํํ๋ฉด 10๋ฐฐ์ ์ฑ๋ฅ ํฅ์์ด ๊ฐ๋ฅํ๋ค.
์์ groupByKey vs reduceByKey
# reduceByKey
(textRDD
.flatMap(lambda lin: line.split()) # ๋์ผํ ๋
ธ๋์์ ์คํ
.map(lambda work: (word, 1)) # ๋์ผํ ๋
ธ๋์์ ์คํ
.reduceByKey(lambda a, b: a+b)) # ์
ํ ๋ฐ์
# groupByKey
(textRDD
.flatMap(lambda line: line.split())
.map(lambda word: (word,1))
.groupByKey() # ์
ํ ๋ฐ์
.map(lambda (w, counts): (w, sum(counts))))
# ๊ฐ๊ธ์ ์ด๋ฉด, groupByKey๋์ ์ reduceByKey๋ก ๋์ฒด ๊ฐ๋ฅํ๋๊น reduceByKey๋ฅผ ์ฌ์ฉํ์.
- ๋ฐ์ดํฐ๋ฅผ ์ต๋ํ ๊ท ์ผํ๊ฒ ํผํธ๋ฆฌ๊ณ , ์ฟผ๋ฆฌ๊ฐ ๊ฐ์ด ๋๋ ๋ฐ์ดํฐ๋ฅผ ์ต๋ํ ์์ ๋์ด ๊ฒ์ ์ฑ๋ฅ์ ํฅ์
- RDD๋ ์ชผ๊ฐ์ ธ์ ์ฌ๋ฌ ํํฐ์ ์ ์ ์ฅ๋จ
- ํ๋์ ํํฐ์ ์ ํ๋์ ๋ ธ๋(์๋ฒ)์
- ํ๋์ ๋ ธ๋๋ ์ฌ๋ฌ๊ฐ์ ํํฐ์ ์ ๊ฐ์ง ์ ์์
- ํํฐ์ ์ ํฌ๊ธฐ์ ๋ฐฐ์น๋ ์์ ๋กญ๊ฒ ์ค์ ๊ฐ๋ฅํ๋ฉฐ ์ฑ๋ฅ์ ํฐ ์ํฅ์ ๋ฏธ์นจ
- Key-Value RDD๋ฅผ ์ฌ์ฉํ ๋๋ง ์๋ฏธ๊ฐ ์๋ค.
- ์คํํฌ์ ํํฐ์ ๋ == ์ผ๋ฐ ํ๋ก๊ทธ๋๋ฐ์์ ์๋ฃ๊ตฌ์กฐ๋ฅผ ์ ํํ๋ ๊ฒ
- Hash Partitioning
- Range Partitioning
- ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ํํฐ์ ์ ๊ท ์ผํ๊ฒ ๋ถ๋ฐฐํ๋ ๋ฐฉ์
- ๋์ ๋๋ฆฌ์ ๋น์ทํ ๋ฐฉ์์ผ๋ก ๋ถ๋ฐฐ
- ์๋ชป๋ ์ฌ์ฉ
- ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ํํฐ์ ์ ๊ท ์ผํ๊ฒ ๋ถ๋ฐฐํ๋ ๋ฐฉ์์ธ๋ฐ,
- [๊ทน๋จ์ ์ธ ์] 2๊ฐ์ ํํฐ์ ์ด ์๋ ์ํฉ์์ ์ง์์ Key๋ง ์๋ ๋ฐ์ดํฐ์ ์ Hash ํจ์๊ฐ (x%2)์ธ ๊ฒฝ์ฐ (ํ์ชฝ ํํฐ์ ๋ง ์ฌ์ฉ.)
- ์์๊ฐ ์๋, ์ ๋ ฌ๋ ํํฐ์ ๋
- ํค์ ์์์ ๋ฐ๋ผ ์ ๋ ฌ
- ํค์ ์งํฉ์ ์์์ ๋ฐ๋ผ ์ ๋ ฌ
- ์๋น์ค์ ์ฟผ๋ฆฌ ํจํด์ด ๋ ์ง ์์ฃผ๋ฉด ์ผ๋ณ Range Partition ๊ณ ๋ ค
- Disk์์: partitionBy() (๋ณดํต ์ด๊ฒ์ ๋ง์ด ์ฌ์ฉ)
- ๋ฉ๋ชจ๋ฆฌ์์: repartition(), coalesce()
- ์ฌ์ฉ์๊ฐ ์ง์ ํ ํํฐ์ ์ ๊ฐ์ง๋ RDD๋ฅผ ์์ฑํ๋ ํจ์: partitionBy()
- ํํฐ์
์ ๋ง๋ ํ์ persist()๋ฅผ ํด์ผ ํ๋ค
- ํ์ง์์ผ๋ฉด, ๋ค์ ์ฐ์ฐ์ ๋ถ๋ฆด๋ ๋ง๋ค ๋ฐ๋ณตํ๊ฒ ๋๋ค (์ ํ๋ง์ด ๋ฐ๋ณต์ ์ผ๋ก ์ผ์ด๋๋ค)
pairs = sc.parallelize([1,2,3,4,2,4,1]).map(lambda x: (x,x))
pairs.collect()
# [(1,1),(2,2),(3,3),(4,4),(2,2),(4,4),(1,1)]
pairs.partitionBy(2).glom().collect()
# [[(2,2), (4,4), (2,2), (4,4)], [(1,1), (3,3), (1,1)]]
pairs.partitionBy(2, lambda x: x%2).glom().collect()
# [[(2,2), (4,4), (2,2), (4,4)], [(1,1), (3,3), (1,1)]]
# glom์ ํํฐ์
์ ๋ณด๊น์ง ๊ฐ์ด ๋ณด๋ ํจ์
- ๋๋ค ํํฐ์ ์ ๊ฐฏ์๋ฅผ ์กฐ์ ํ๋๋ฐ ์ฌ์ฉ
- ๋๋ค shuffling์ ๋๋ฐํ์ฌ ๋งค์ฐ ๋น์ผ ์์
- Repartition: ํํฐ์ ์ ํฌ๊ธฐ๋ฅผ ์ค์ด๊ฑฐ๋ ๋๋ฆฌ๋๋ฐ ์ฌ์ฉ
- Coalesce: ํํฐ์ ์ ํฌ๊ธฐ๋ฅผ ์ค์ด๋๋ฐ ์ฌ์ฉ (์ค์ผ๋ Repartition๋ณด๋ค ์ฑ๋ฅ์ด ์ข์ )
- Join (+ Outer join)
- groupByKey
- reduceByKey
- foldByKey
- partitionBy
- Sort
- mapValues (parent)
- flatMapValues (parent)
- filter (parent)
- ๋ฑ
- mapValues, flatMapValues, filter๋ parent RDD์์ ํํฐ์ ์ด ์ ์๋์ด ์์ผ๋ฉด ๊ทธ๊ฑธ ๊ทธ๋๋ก ์ฌ์ฉ
- map, flatMap์ ์ ํํฐ์ ์ ์๋ง๋ค๊น? => map, flatMap์ key๊ฐ์ด ๋ฐ๋ ์ ์๊ธฐ ๋๋ฌธ์ ํํฐ์ ์ ํด๋์๊ฒ ์๋ฏธ๊ฐ ์์ด์ง ์ ์๊ธฐ ๋๋ฌธ
- ๊ทธ๋์ ํํฐ์ ์ด ์ ์ ์๋์ด ์๋ค๋ฉด mapValues, flatMapValues๋ฅผ ์ฐ๋๊ฒ์ด ์ข๋ค.
join().filter() vs filter().join() ์ ๋น๊ตํ๋ฉด ๋น์ฐํ filter().join()์ด ์ฑ๋ฅ์ด ๋ ๋น ๋ฅด๋ค.
์์ ๊ฐ์ ๊ณ ๋ฏผ์ ์คํํฌ๊ฐ ์์์ ํด์ฃผ๋ฉด ์ข๊ฒ ๋๋ฐ, ์ด๋ป๊ฒ ๊ฐ๋ฅํ ๊น?
๋ฐ์ดํฐ๊ฐ ๊ตฌ์กฐํ ๋์ด ์๋ค๋ฉด ์๋์ผ๋ก ์ต์ ํ๊ฐ ๊ฐ๋ฅํ๋ค.
- Unstructured: Free Form
- ๋ก๊ทธ ํ์ผ
- ์ด๋ฏธ์ง
- Semi Structured: ํ๊ณผ ์ด
- CSV
- JSON
- XML
- Structured: ํ๊ณผ ์ด + ๋ฐ์ดํฐ ํ์
(์คํค๋ง)
- ๋ฐ์ดํฐ๋ฒ ์ด์ค
- RDD์์๋
- ๋ฐ์ดํฐ์ ๊ตฌ์กฐ๋ฅผ ๋ชจ๋ฅด๊ธฐ ๋๋ฌธ์ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๋ ๊ฒ์ ๊ฐ๋ฐ์์๊ฒ ์์กดํ๋ค.
- map, flatMap, filter ๋ฑ์ ํตํด ์ ์ ๊ฐ ๋ง๋ function์ ์ํ
- Structured Data์์๋
- ๋ฐ์ดํฐ์ ๊ตฌ์กฐ๋ฅผ ์ด๋ฏธ ์๊ณ ์์ผ๋ฏ๋ก ์ด๋ค ํ ์คํฌ๋ฅผ ์ํํ ๊ฒ์ธ์ง ์ ์๋ง ํ๋ฉด ๋จ
- ์ต์ ํ๋ ์๋์ผ๋ก ํ ์ ์์
- ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ์ ์๊ฒ ํด์ค๋ค.
- ์ ์ ๊ฐ ์ผ์ผ์ด function์ ์ ์ํ๋ ์ผ ์์ด ์์ ์ ์ํ ํ ์ ์๋ค.
- ์๋์ผ๋ก ์ฐ์ฐ์ด ์ต์ ํ ๋๋ค
- ์คํํฌ ํ๋ก๊ทธ๋๋ฐ ๋ด๋ถ์์ ๊ด๊ณํ ์ฒ๋ฆฌ๋ฅผ ํ๊ธฐ ์ํด ์ฌ์ฉ
- ์คํค๋ง์ ์ ๋ณด๋ฅผ ์ด์ฉํด ์๋์ผ๋ก ์ต์ ํ๋ฅผ ํ๊ธฐ ์ํด ์ฌ์ฉ
- ์ธ๋ถ ๋ฐ์ดํฐ์ ์ ์ฌ์ฉํ๊ธฐ ์ฝ๊ฒ ํ๊ธฐ ์ํด ์ฌ์ฉ
- ์คํํฌ ์์ ๊ตฌํ๋ ํ๋์ ํจํค์ง
- 3๊ฐ์ ์ฃผ์ API
- SQL
- DataFrame
- Datasets
- 2๊ฐ์ ๋ฐฑ์๋ ์ปดํฌ๋ํธ
- Catalyst - ์ฟผ๋ฆฌ ์ต์ ํ ์์ง
- Tungsten - ์๋ฆฌ์ผ๋ผ์ด์ (์ฉ๋ ์ต์ ํ)
- Spark Core์ RDD๊ฐ ์๋ค๋ฉด, Spark SQL์๋ DataFrame์ด ์๋ค.
- DataFrame์ ํ ์ด๋ธ ๋ฐ์ดํฐ์ ์ด๋ผ๊ณ ๋ณด๋ฉด ๋จ
- ๊ฐ๋ ์ ์ผ๋ก๋ RDD์ ์คํค๋ง๊ฐ ์ ์ฉ๋ ๊ฒ์ด๋ผ๊ณ ๋ณด๋ฉด ๋จ
- Spark Core์ SparkContext๊ฐ ์๋ค๋ฉด Spark SQL์๋ SparkSession์ด ์๋ค.
spark = SparkSession.builder.appName("test-app").getOrCreate()- RDD์์ ์คํค๋ง๋ฅผ ์ ์ํ๋ค์ ๋ณํ์ ํ๊ฑฐ๋
- CSV, JSON๋ฑ์ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์์ค๋ฉด ๋๋ค
- Schema๋ฅผ ์๋์ผ๋ก ์ ์ถํด์ DataFrame ๋ง๋ค๊ธฐ
- Schema๋ฅผ ์ฌ์ฉ์๊ฐ ์ ์ํ๊ธฐ
# RDD ๋ง๋ค๊ธฐ
lines = sc.textfile("example.csv")
data = lines.map(lambda x: x.split(","))
preprocessed = data.map(lambda x: Row(name=x[0], price=int(x[1])))
# Infer (Schema๋ฅผ ์ ์ถํด์ ๋ง๋ค๊ธฐ)
df = spark.createDataFrame(preprocessed)
# Specify (Schema๋ฅผ ์ฌ์ฉ์๊ฐ ์ ์ํ๊ธฐ)
schema = StructType(
StructField("name", StringType(), True),
StructField("price", StringType(), True)
)
spark.createDataFrame(preprocessed, schema).show()from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test-app").getOrCreate()
# JSON
dataframe = spark.read.json('dataset/nyt2.json')
# TXT FILE
dataframe_txt = spark.read.text('text_data.txt')
# CSV FILE
dataframe_csv = spark.read.csv('csv_data.csv')
# PARQUET FILE
dataframe_parquet = spark.read.load('parquet.data.parquet')- createOrReplaceTempView() ํจ์๋ก temporary view๋ฅผ ๋ง๋ค์ด ์ค์ผ ํจ.
data.createOrReplaceTempView("mobility_data")
spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()- Hive Query Language์ ๊ฑฐ์ ๋์ผ
- Select
- From
- Where
- Count
- Having
- Group By
- Order By
- Sort By
- Distinct
- Join
- Spark SQL์ ์ฌ์ฉํ๊ธฐ ์ํด ์ฌ์ฉํ๋ SparkSession
- SparkSession ์ผ๋ก ๋ถ๋ฌ์ค๋ ๋ฐ์ดํฐ๋ DataFrame
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("test-app").getOrCreate()
# JSON
dataframe = spark.read.json('dataset/nyt2.json')
# TXT FILE
dataframe_txt = spark.read.text('text_data.txt')
# CSV FILE
dataframe_csv = spark.read.csv('csv_data.csv')
# PARQUET FILE
dataframe_parquet = spark.read.load('parquet.data.parquet')- SQL๋ฌธ์ ์ฌ์ฉํด์ ์ฟผ๋ฆฌ๊ฐ ๊ฐ๋ฅํ๋ค.
data.createOrReplaceTempView("mobility_data")
spark.sql("SELECT pickup_datetime FROM mobility_data LIMIT 5").show()- ํจ์๋ฅผ ์ฌ์ฉํด์ ์ฟผ๋ฆฌ๋ ๊ฐ๋ฅํ๋ค.
df.select(df['name'], df['age'] + 1).show()
df.filter(df['age'] > 21).show()
df.groupBy("age").count().show()- DataFrame์ RDD๋ก ๋ณํํด ์ฌ์ฉํ ์๋ ์๋ค.
rdd = df.rdd.map(tuple)- (ํ์ง๋ง, RDD๋ฅผ ๋ ์ฌ์ฉํ๋ ์ชฝ์ด ์ข๋ค)
- MLLib์ด๋ Spark Streaming ๊ฐ์ ๋ค๋ฅธ ์คํํฌ ๋ชจ๋๋ค๊ณผ ์ฌ์ฉํ๊ธฐ ํธํ๋ค.
- ๊ฐ๋ฐํ๊ธฐ ํธํ๋ค.
- ์ต์ ํ๋ ์์์ ๋๋ค.
- Type์ด ์๋ DataFrame
- PySpark์์ ํฌ๊ฒ ์ ๊ฒฝ์ฐ์ง ์์๋ ๋๋ค.
./1-spark/learn-sql.ipynb
- ๊ด๊ณํ ๋ฐ์ดํฐ์ด๋ค.
- ํ๋ง๋๋ก ๊ด๊ณํ ๋ฐ์ดํฐ์ = RDD + Relation
- RDD๊ฐ ํจ์ํ API๋ฅผ ๊ฐ์ก๋ค๋ฉด DataFrame์ ์ ์ธํ API
- ์๋์ผ๋ก ์ต์ ํ๊ฐ ๊ฐ๋ฅ
- ํ์ ์ด ์๋ค
- RDD์ ํ์ฅํ
- ์ง์ฐ ์คํ (Lazy Execution)
- ๋ถ์ฐ ์ ์ฅ
- Immutable
- ์ด(Row) ๊ฐ์ฒด๊ฐ ์๋ค
- SQL ์ฟผ๋ฆฌ๋ฅผ ์คํํ ์ ์๋ค.
- ์คํค๋ง๋ฅผ ๊ฐ์ง ์ ์๊ณ ์ด๋ฅผ ํตํด ์ฑ๋ฅ์ ๋์ฑ ์ต์ ํ ํ ์ ์๋ค
- CSV, JSON, Hive ๋ฑ์ผ๋ก ์ฝ์ด์ค๊ฑฐ๋ ๋ณํ์ด ๊ฐ๋ฅ
- dtypes
- show()
- ํ ์ด๋ธ ํํ๋ก ๋ฐ์ดํฐ๋ฅผ ์ถ๋ ฅ
- ์ฒซ 20๊ฐ์ ์ด๋ง ๋ณด์ฌ์ค๋ค
- printSchema()
- ์คํค๋ง๋ฅผ ํธ๋ฆฌ ํํ๋ก ๋ณผ ์ ์๋ค.
- SQL ๊ณผ ๋น์ทํ ์์ ์ด ๊ฐ๋ฅํ๋ค.
- Select
- Where
- Limit
- OrderBy
- GroupBy
- Join
- ์ฌ์ฉ์๊ฐ ์ํ๋ Column์ด๋ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถ ํ๋๋ฐ ์ฌ์ฉ
df.select('*').collect()
df.select('name','age').collect()
df.select(df.name, (df.age+10).alias('age')).collect()- Aggregate์ ์ฝ์๋ก, ๊ทธ๋ฃนํ ํ ๋ฐ์ดํฐ๋ฅผ ํ๋๋ก ํฉ์น๋ ์์
df.agg({"age",: "max"}).collect()
# [Row(max(age)=5)]
from pyspark.sql improt functions as F
df.agg(F.min(df.age)).collect()
# [Row(min(age)=2)]- ์ฌ์ฉ์๊ฐ ์ง์ ํ column์ ๊ธฐ์ค์ผ๋ก ๋ฐ์ดํฐ๋ฅผ Groupingํ๋ ์์
df.groupBy().avg().collect()
# [Row(avg(age)=3.5)]
sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
# [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
sorted(df.groupBy(df.name).avg().collect())
# [Row(name='Alice', avg(age)=2.0), Row(name='Bob', avg(age)=5.0)]
sorted(df.groupBy(['name', df.age]).count().collect())
# [Row(name='Alice', age=2, count=1), Row(name='Bob', age=5, count=1)]- ๋ค๋ฅธ DataFrame๊ณผ ์ฌ์ฉ์๊ฐ ์ง์ ํ Column์ ๊ธฐ์ค์ผ๋ก ํฉ์น๋ ์์
df.join(df2, 'name').select(df.name, df2.height).collect()
# [Row(name='Bob', height=85)]- ์ด์ ์ RDD๋ก ์ค์ตํด๋ณด์๋๋ฐ, ์ด๋ฒ์ Spark SQL๋ก ํด๋ณด์.
./1-spark/trip_count_sql.ipynb
- ๋ ํ ์ด๋ธ์ JOIN ์ค์ต
./1-spark/trip_count_sql_by_zone-Copy1.ipynb
- ์คํํฌ๋ ์ฟผ๋ฆฌ๋ฅผ ๋๋ฆฌ๊ธฐ ์ํด ๋๊ฐ์ง ์์ง์ ์ฌ์ฉํ๋ค.
- Catalyst, Tungsten
- ์ํ ํด์ผ ํ๋ ๋ชจ๋ transformation ๋จ๊ณ์ ๋ํ ์ถ์ํ
- ๋ฐ์ดํฐ๊ฐ ์ด๋ป๊ฒ ๋ณํด์ผ ํ๋์ง ์ ์ํ์ง๋ง,
- ์ค์ ์ด๋์ ์ด๋ป๊ฒ ๋์ ํ๋์ง๋ ์ ์ํ์ง ์์
- Logical Plan์ด ์ด๋ป๊ฒ ํด๋ฌ์คํฐ ์์์ ์คํ ๋ ์ง ์ ์
- ์คํ ์ ๋ต์ ๋ง๋ค๊ณ Cost Model์ ๋ฐ๋ผ ์ต์ ํ
- SQL๊ณผ DataFrame์ด ๊ตฌ์กฐ๊ฐ ์๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ์ ์๊ฒ ํด์ฃผ๋ ๋ชจ๋
- Logical Plan์ Physical Plan์ผ๋ก ๋ฐ๊พธ๋ ์ผ์ ํ๋ค.
- ๋ถ์: DataFrame ๊ฐ์ฒด์ relation์ ๊ณ์ฐ, ์นผ๋ผ์ ํ์ ๊ณผ ์ด๋ฆ ํ์ธ
- Logical Plan ์ต์ ํ
- ์์๋ก ํํ๋ ํํ์์ Compile Time์ ๊ณ์ฐ (x runtime)
- Predicate Pushdown: join & filter -> filter & join
- Projection Pruning: ์ฐ์ฐ์ ํ์ํ ์นผ๋ผ๋ง ๊ฐ์ ธ์ค๊ธฐ
- Physical Plan ๋ง๋ค๊ธฐ: Spark์์ ์คํ ๊ฐ๋ฅํ Plan์ผ๋ก ๋ณํ
- ์ฝ๋ ์ ๋ค๋ ์ด์ : ์ต์ ํ๋ Physical Plan์ Java Bytecode๋ก
SELECT zone_data.Zone, count(*) AS trips \
FROM trip_data JOIN zone_data \
ON trip_data.PULocationID = zone_data.LocationID \
WHERE trip_data.hvfhs_license_num = 'HV0003' \
GROUP BY zone_data.Zone order by trips desc๊ธฐ๋ณธ ์์
- Scan: ๋๊ฐ์ ํ ์ด๋ธ์์ ๋ฐ์ดํฐ ์ถ์ถ
- Join:
join - Filter:
trip_data.hvfhs_license_num = 'HV0003 - Project:
count(*) AS trips - Aggregate:
group by
์ต์ ํ
- Scan: ๋๊ฐ์ ํ ์ด๋ธ์์ ๋ฐ์ดํฐ ์ถ์ถ
- Filter:
trip_data.hvfhs_license_num = 'HV0003 - Join:
join - Project:
count(*) AS trips - Aggregate:
group by
spark.sql(query).explain(True)- explain(True) ๋ช
๋ น์ด๋ฅผ ์
๋ ฅํ๋ฉด ์๋์ ์ ๋ณด๋ค์ ๋ณด์ฌ์ค๋ค
- Parsed Logical Plan: ์ฌ์ฉ์๊ฐ ์ด ์ฝ๋ ๊ทธ๋๋ก
- Analyzed Logical Plan: ์ฌ์ฉ์๊ฐ ์ง์ ํ ํ ์ด๋ธ์ ๋ฌด์จ ์ปฌ๋ผ์ด ์๋์ง ํ์ธํ๋ค.
- Optimized Logical Plan: Filtering์ฝ๋๋ฅผ ๋ ๋นจ๋ฆฌ ํ๋ ๋ฑ ์ต์ ํ๋ ์ฝ๋๋ฅผ ๋ณด์ฌ์ค๋ค
- Physical Plan: ๋ํ ์ผํ Plan์ ๋ณด์ฌ์ค
- explain(True ์์ด) ๋ช
๋ น์ด๋ฅผ ์
๋ ฅํ๋ฉด ์๋ ์ ๋ณด๋ง ๋์จ๋ค.
- Physical Plan
- Physical Plan์ด ์ ํ๋๊ณ ๋๋ฉด ๋ถ์ฐ ํ๊ฒฝ์์ ์คํ๋ Bytecode๊ฐ ๋ง๋ค์ด์ง๋ค. (Code Generation)
- ์คํํฌ ์์ง์ ์ฑ๋ฅ ํฅ์์ด ๋ชฉ์
- ๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ ์ต์ ํ
- ์บ์ ํ์ฉ ์ฐ์ฐ
- ์ฝ๋ ์์ฑ
- user-defined-functions
- sql ๋ฌธ์์์ ์ธ ์ ์๋ function์ ๋ง๋๋๊ฒ
์ค์ต
./1-spark/user-defined-functions.ipynb
./1-spark/taxi-analysis.ipynb
- Machine Learning Library
- ML์ ์ฝ๊ณ ํ์ฅ์ฑ ์๊ฒ ์ ์ฉํ๊ธฐ ์ํด ์ฌ์ฉ
- ๋จธ์ ๋ฌ๋ ํ์ดํ๋ผ์ธ ๊ฐ๋ฐ์ ์ฝ๊ฒ ํ๊ธฐ ์ํด
- ๋ฐ์ดํฐ๋ฅผ ์ด์ฉํด ์ฝ๋ฉ์ ํ๋ ์ผ
- ์ต์ ํ์ ๊ฐ์ ๋ฐฉ๋ฒ์ ํตํด ํจํด์ ์ฐพ๋์ผ
- ์๊ณ ๋ฆฌ์ฆ
- Classification
- Regression
- Clustering
- Recommendation
- ํ์ดํ๋ผ์ธ
- Training
- Evaluating
- Tuning
- Persistence
- Feature Engineering
- Extraction
- Transformation
- Utils
- Linear algebra
- Statistics
- ๋ฐ์ดํฐ ๋ก๋ฉ -> ์ ์ฒ๋ฆฌ -> ํ์ต -> ๋ชจ๋ธ ํ๊ฐ
- ํ๋ผ๋ฏธํฐ ํ๋ ํ ์ ๊ณผ์ ์ ๋ค์ ์๋
- ํผ์ณ ์์ง๋์ด๋ง
- ํต๊ณ์ ์ฐ์ฐ
- ํํ ์ฐ์ด๋ ML์๊ณ ๋ฆฌ์ฆ๋ค
- Regression (Linea, Logistic)
- Support Vector Machines
- Naive Bayes
- Decision Tree
- K-Means clustering
- ์ถ์ฒ (Alternating Least Squares)
- ์์ง RDD API๊ฐ ์์ง๋ง, "maintenance mode"
- ์๋ก์ด API๋ ๊ฐ๋ฐ์ด ๋๊น
- DataFrame์ ์ฐ๋ MLlib API๋ฅผ Spark ML์ด๋ผ๊ณ ๋ ๋ถ๋ฆ
- DataFrame
- Transformer
- Estimator
- Evaluator
- Pipeline
- Parameter
- ํผ์ณ ๋ณํ๊ณผ ํ์ต๋ ๋ชจ๋ธ์ ์ถ์ํ
- ๋ชจ๋ Transformer๋ transform() ํจ์๋ฅผ ๊ฐ๊ณ ์๋ค
- ๋ฐ์ดํฐ๋ฅผ ํ์ต์ด ๊ฐ๋ฅํ ํฌ๋ฉง์ผ๋ก ๋ฐ๊พผ๋ค
- DF๋ฅผ ๋ฐ์ ์๋ก์ด DF๋ฅผ ๋ง๋๋๋ฐ, ๋ณดํต ํ๋ ์ด์์ column์ ๋ํ๊ฒ ๋๋ค
- ์)
- Data Normalization
- Tokenization
- ์นดํ ๊ณ ๋ฆฌ์ปฌ ๋ฐ์ดํฐ๋ฅผ ์ซ์๋ก (one-hot encoding)
- ๋ชจ๋ธ์ ํ์ต ๊ณผ์ ์ ์ถ์ํ
- ๋ชจ๋ Estimator๋ fit() ํจ์๋ฅผ ๊ฐ๊ณ ์๋ค
- fit()์ DataFrame์ ๋ฐ์ Model์ ๋ฐํ
- ๋ชจ๋ธ์ ํ๋์ Transformer
- ์)
- lr = LinearRegression()
- model = lr.fit(data)
- metric์ ๊ธฐ๋ฐ์ผ๋ก ๋ชจ๋ธ์ ์ฑ๋ฅ์ ํ๊ฐ
- ์) Root mean squared error (RMSE)
- ๋ชจ๋ธ์ ์ฌ๋ฌ๊ฐ ๋ง๋ค์ด์, ์ฑ๋ฅ์ ํ๊ฐ ํ ๊ฐ์ฅ ์ข์ ๋ชจ๋ธ์ ๋ฝ๋ ๋ฐฉ์์ผ๋ก ๋ชจ๋ธ ํ๋์ ์๋ํ ํ ์ ์๋ค.
- ์)
- BinarClassificationEvaluator
- CrossValidator
- ML์ ์ํฌํ๋ก์ฐ๋ฅผ ์ ์ํ ๋ ์ฌ์ฉ
- ์ฌ๋ฌ stage๋ฅผ ๋ด๊ณ ์๋ค
- ์ ์ฅ๋ ์ ์๋ค. (persist)
- ํ์ดํ๋ผ์ธ ์: ๋ฐ์ดํฐ๋ก๋ฉ -> ์ ์ฒ๋ฆฌ -> ํ์ต -> ๋ชจ๋ธํ๊ฐ
- Transformer -> Tranformer -> Estimator -> Evaluator -> Model
./1-spark/logistic-regression.ipynb
./1-spark/pipeline.ipynb
- Alternating Least Squares
- ์์ง ๋ชป๋ณธ ์ํ๋ค์ ํ์ ์ ์ํ๊ณ ,
- ๊ฐ์ ์ ๋ ฌํด์ ์ ์ผ ์์์ ๋ถํฐ ์ ์ ์๊ฒ ์ ๋ฌํ๋ ๊ฒ์ด ์ถ์ฒ์ด๋ค.
./1-spark/movie-recommendation.ipynb
- ์ง๋ ํ์ต
- Regression, Classification ๋๋ค ์ง๋ํ์ต์ด๋ค.
- Regression: ์์ธก๋ ๊ฐ์ด ์ค์
- Classification: ์์ธก๋ ๊ฐ์ด ํด๋์ค(์นดํ ๊ณ ๋ฆฌ)
./1-spark/taxi-fare-prediction.ipynb
./1-spark/taxi-fare-prediction-2.ipynb
./1-spark/taxi-fare-prediction-hyper.ipynb
./1-spark/taxi-fare-prediction-hyper.ipynb
- SQL ์์ง ์์ ๋ง๋ค์ด์ง ๋ถ์ฐ ์คํธ๋ฆผ ์ฒ๋ฆฌ ํ๋ก์ธ์ฑ
- ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์ฒ๋ฆฌํ ๋ ์ฌ์ฉ
- ์๊ฐ๋ ๋ณ๋ก ๋ฐ์ดํฐ๋ฅผ ํฉ์ณ(aggregate) ๋ถ์ ํ ์ ์์
- kafka, Amazon Kinesis, HDFS ๋ฑ๊ณผ ์ฐ๊ฒฐ ๊ฐ๋ฅ
- ์ฒดํฌํฌ์ธํธ๋ฅผ ๋ง๋ค์ด์ ๋ถ๋ถ์ ์ธ ๊ฒฐํจ์ด ๋ฐ์ํด๋ ๋ค์ ๋์๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ ์ ์๋ค.
- ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ๋ฌดํํ ํ ์ด๋ธ์ด๋ค.
- input Data Stream --SparkStreaming--> batches of input data --SparkEngine--> batches of processed data
- Spark Stream์ ๊ธฐ๋ณธ์ ์ธ ์ถ์ํ
- ๋ด๋ถ์ ์ผ๋ก RDD์ ์ฐ์์ด๊ณ RDD์ ์์ฑ์ ์ด์ด๋ฐ์
- ์ง๊ธ์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ์ด์ ๋ฐ์ดํฐ์ ๋ํ ์ ๋ณด๊ฐ ํ์ํ ๋
- ๋ฐ์ดํฐ๋ฅผ ์ด๋์์ ์ฝ์ด์ฌ ์ง ๋ช ์
- ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ค๋ฅผ ์ฌ์ฉํด join()์ด๋ union()์ผ๋ก ํฉ์ณ ์ธ ์ ์๋ค
spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()
.selectExpr("cast(value as string) as json")
.select(from_json("json", schema).as("data"))spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", ...)
.option("subscribe","topic")
.load()
.selectExpr("cast(value as string) as json")
.select(from_json("json", schema).as("data"))
.writeStream.format("parquet")
.trigger("1 minute") # <-- micro-batch ์คํ ๊ฐ๊ฒฉ
.option("checkpointLocation", "...")
.start()- Map
- FlatMap
- Filter
- ReduceByKey
- ์ด์ ๋ฐ์ดํฐ์ ๋ํ ์ ๋ณด๋ฅผ State๋ก ์ฃผ๊ณ ๋ฐ์ ์ ์๋ค.
- ์) ์นดํ ๊ณ ๋ฆฌ๋ณ (ํค๊ฐ ๋ณ) ์ดํฉ
terminal1) nc -lk 9999 # ์์ผ ์ด๊ธฐ
terminal2) python3 ./1-spark/streaming.py
terminal1) test testa testb
terminal1) test test testa
- ์์ด๋น์ค๋น์์ ๊ฐ๋ฐํ ์ํฌํ๋ก์ฐ ์ค์ผ์ค๋ง, ๋ชจ๋ํฐ๋ง ํ๋ซํผ
- ์ค์ ๋ฐ์ดํฐ์ ์ฒ๋ฆฌ๊ฐ ์ด๋ฃจ์ด์ง๋ ๊ณณ์ ์๋๋ค.
- 2016๋ ์ํ์น ์ฌ๋จ incubator program
- ํ์ฌ ์ํ์น ํ๋ ๋ฒจ ํ๋ก์ ํธ
- Airbnb, Yahoo, Paypal, Intel, Stripe
- ๋งค์ผ 10์์ ์ฃผ๊ธฐ์ ์ผ๋ก ๋์๊ฐ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ ๋ง๋ค๋ ค๋ฉด?
- ๊ธฐ์กด ๋ฐฉ์: cron script๋ก ์ฌ์ฉ
- ๋งค์ผ 10์์ ์ฃผ๊ธฐ์ ์ผ๋ก ๋์๊ฐ๋ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ (์ธ๋ถ api๋ก download -> process(Spark Job) -> store(DB))๋ค์ ์์ญ๊ฐ ๋ง๋ค์ด์ผ ํ๋ค๋ฉด?
- ์คํจ ๋ณต๊ตฌ: ์ธ์ ์ด๋ป๊ฒ ๋ค์ ์คํํ ๊ฒ์ธ๊ฐ? Backfill
- ๋ชจ๋ํฐ๋ง: ์ ๋์๊ฐ๊ณ ์๋์ง ํ์ธํ๊ธฐ ํ๋ค๋ค
- ์์กด์ฑ ๊ด๋ฆฌ: ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ๊ฐ ์์กด์ฑ์ด ์๋ ๊ฒฝ์ฐ ์์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์ด ์ ๋์๊ฐ๊ณ ์๋์ง ํ์ ์ด ํ๋ค๋ค
- ํ์ฅ์ฑ: ์ค์ํ ํด์ ๊ด๋ฆฌํ๋ ํด์ด ์๊ธฐ ๋๋ฌธ์ ๋ถ์ฐ๋ ํ๊ฒฝ์์ ํ์ดํ๋ผ์ธ๋ค์ ๊ด๋ฆฌํ๊ธฐ ์ด๋ ต๋ค
- ๋ฐฐํฌ: ์๋ก์ด ์ํฌํ๋ก์ฐ๋ฅผ ๋ฐฐํฌํ๊ธฐ ํ๋ค๋ค
- ์ํฌํ๋ก์ฐ๋ฅผ ์์ฑํ๊ณ ์ค์ผ์ค๋งํ๊ณ ๋ชจ๋ํฐ๋ง ํ๋ ์์ ์ ํ๋ก๊ทธ๋๋ฐ ํ ์ ์๊ฒ ํด์ฃผ๋ ํ๋ซํผ
- ํ์ด์ฌ์ผ๋ก ์ฌ์ด ํ๋ก๊ทธ๋๋ฐ์ด ๊ฐ๋ฅ
- ๋ถ์ฐ๋ ํ๊ฒฝ์์ ํ์ฅ์ฑ์ด ์์
- ์น ๋์๋ณด๋ (UI)
- ์ปค์คํฐ๋ง์ด์ง์ด ๊ฐ๋ฅ
- ์์กด์ฑ์ผ๋ก ์ฐ๊ฒฐ๋ ์์ (task)๋ค์ ์งํฉ == DAG == Directed Acyclic Graph
- ์น ์๋ฒ - ์น ๋์๋ณด๋ UI
- ์ค์ผ์ค๋ฌ - ์ํฌํ๋ก์ฐ๊ฐ ์ธ์ ์คํ๋๋์ง ๊ด๋ฆฌ
- Metastore - ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ
- Executor - ํ ์คํฌ๊ฐ ์ด๋ป๊ฒ ์คํ๋๋์ง ์ ์
- Worker - ํ ์คํฌ๋ฅผ ์คํํ๋ ํ๋ก์ธ์ค
- ์์ ์ ์ ์ํ๋๋ฐ ์ฌ์ฉ
- Action Operators: ์ค์ ์ฐ์ฐ์ ์ํ
- Transfer Operators: ๋ฐ์ดํฐ๋ฅผ ์ฎ๊น
- Sensor Operators: ํ ์คํฌ๋ฅผ ์ธ์ ์คํ์ํฌ ํธ๋ฆฌ๊ฑฐ๋ฅผ ๊ธฐ๋ค๋ฆผ
- Operator๋ฅผ ์คํ์ํค๋ฉด Task๊ฐ ๋๋ค
- Task = Operator Instance
- ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ํ๊ฒฝ์์ ์ ์ฉํ๊ฒ ์ฐ์ผ ์ ์๋ค
- ๋ฐ์ดํฐ ์จ์ดํ์ฐ์ค
- ๋จธ์ ๋ฌ๋
- ๋ถ์
- ์คํ
- ๋ฐ์ดํฐ ์ธํ๋ผ ๊ด๋ฆฌ
- WebServer, Metastore, Scheduler, Executor๊ฐ ์กด์ฌ
- ๋์ ๊ณผ์
- Metastore์์ dag์ ๋ํ ์ ๋ณด๋ฅผ ๋ด๊ณ ์์ด์, Web server์ Scheduler๊ฐ ๊ทธ ์ ๋ณด๋ฅผ ์ฝ์ด ์ค๊ณ Executor๋ก ์ด ์ ๋ณด๋ฅผ ๋ณด๋ด์ ์คํ์ ํ๋ค.
- ์ด๋ ๊ฒ ์คํ๋ Task Instance๋ metastore๋ก ๋ณด๋ด์ ธ์ ์ํ๋ฅผ ์ ๋ฐ์ดํธ ํ๋ค.
- ์ด๋ ๊ฒ ์ ๋ฐ์ดํธ๋ ์ํ๋ฅผ ๋ค์ Web Server์ Scheduler๊ฐ ์ฝ์ด์์ Task๊ฐ ์ ์๋ฃ๊ฐ ๋์๋์ง ํ์ธ์ ํ๋ค.
- Executor์ Queue๊ฐ ์กด์ฌํด์ ์์๋ฅผ ์ ํ ์ ์๊ฒ ๋๋ค.
- Queue๊ฐ Executor ๋ฐ๊นฅ์ ์กด์ฌ ํ๋ค (One Node Architecture์์ ํฐ ์ฐจ์ด์ )
- Celery Broker๊ฐ Queue์ด๋ค.
- ๋์ ๊ณผ์
- MetaStore์์ dag์ ๋ณด๋ฅผ webserver์ scheduler๊ฐ ์ ๋ณด๋ฅผ ์ฝ๊ณ , celery executor๋ฅผ ํตํด์ celery broker์ task ์์๋๋ก ๋ด๋๋ค.
- ์์๋๋ก ๋ด๊ธด task๋ฅผ worker๋ค์ด ํ๋์จ ๊ฐ์ ธ๊ฐ์ ์์๋๋ก ์คํ๋๋ค.
- ์ด๋ ๊ฒ ์คํ๋ dag๋ค์ ์๋ฃ๋๋ฉด celery executor ๊ทธ๋ฆฌ๊ณ metastore์ ๋ณด๊ณ ๊ฐ ๋๋ค.
- ์ด๋ ๊ฒ ์๋ฃ๋ ์ํ๋ฅผ UI์ Scheduler๊ฐ ๋ค์์ฝ์ด์์ ์๋ฃ๋๋ ๊ฒ์ ํ์ธํ๋ค.
- DAG๋ฅผ ์์ฑํ์ฌ Workflow๋ฅผ ๋ง๋ ๋ค. DAG๋ Task๋ก ๊ตฌ์ฑ๋์ด ์๋ค
- Task๋ Operator๊ฐ ์ธ์คํด์คํ ๋ ๊ฒ
- DAG๋ฅผ ์คํ์ํฌ ๋ Scheduler๊ฐ DagRun ์ค๋ธ์ ํธ๋ฅผ ๋ง๋ ๋ค
- DagRun ์ค๋ธ์ ํธ๋ Task Instance๋ฅผ ๋ง๋ ๋ค
- Worker๊ฐ Task๋ฅผ ์ํ ํ DagRun ์ ์ํ๋ฅผ "์๋ฃ"๋ก ๋ฐ๊ฟ๋๋๋ค.
- ์ ์ ๊ฐ ์๋ก์ด DAG๋ฅผ ์์ฑ ํ Folder DAGs ์์ ๋ฐฐ์น
- Web Server์ Scheduler๊ฐ DAG๋ฅผ ํ์ฑ
- Scheduler๊ฐ Metastore๋ฅผ ํตํด DagRun ์ค๋ธ์ ํธ๋ฅผ ์์ฑ
- DagRun์ ์ฌ์ฉ์๊ฐ ์์ฑํ DAG์ ์ธ์คํด์ค
- DagRun status: Running
- Scheduler๊ฐ Task Instance ์ค๋ธ์ ํธ (Dag run ์ค๋ธ์ ํธ์ ์ธ์คํด์ค == Task Instance) ๋ฅผ ์ค์ผ์ค๋ง
- Trigger๊ฐ ์ํฉ์ ๋ง์ผ๋ฉด Scheduler๊ฐ Task Instance๋ฅผ Executor๋ก ๋ณด๋
- Executor๊ฐ ๊ทธ Task๋ฅผ ์คํ์ํจ ๋ค์, ์๋ฃํ Metastore์ ์๋ฃํ๋ค๊ณ ๋ณด๊ณ ํ๋ค. (์๋ฃ๋ Task Instance๋ Dag Run์ ์ ๋ฐ์ดํธ ํ๋ค)
- Scheduler๊ฐ Metastore๋ฅผ ํตํด์ DAG ์คํ์ด ์๋ฃ๋๋ ํ์ธ์ ํ๊ณ DagRun Status๋ฅผ Completed๋ก ๋ณ๊ฒฝํ๋ค.
- Web Server๊ฐ Metastore๋ฅผ ํตํด์ DAG ์คํ์ด ์๋ฃ๋๋ ํ์ธ์ ํ๊ณ UI ์ ๋ฐ์ดํธ๋ฅผ ํ๋ค.
# m1 ์์๋ ์ด ๋ฐฉ๋ฒ์ผ๋ก ์ค์น ์๋จ.
pip --version # anaconda ๋ก ์ค์น๋์ง ํ์ธ
pip install apache-airflow
airflow db init
airflow werbserver -p 8080
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
# m1
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.1/docker-compose.yaml'
docker-compose up airflow-init
docker-compose up -d
docker exec -it 64bb1d858ab5ad7babfad795a6e3dc60121e27b15a83c37bda4f54a6a /bin/sh # webserver container ์ ์
airflow users create --role Admin --username admin --email admin --firstname admin --lastname admin --password admin
airflow -h: ๊ฐ์ข ๋ช ๋ น์ด ์ค๋ช ๋ณด๊ธฐairflow webserver: webserver ์์airflow users create ~~: user ์ถ๊ฐairflow scheduler: scheduler ์์airflow db init: db์ ๊ธฐ๋ณธ์ ์ธ ํ์ดํ๋ผ์ธ ์์ฑ ๋ฐ ๊ธฐ๋ณธ ์ค์ airflow dags list: ํ์ฌ ๋์๊ฐ๋ dag๋ค ์ถ๋ ฅairflow tasks list example_xcom: example_xcom ์์ ์กด์ฌํ๋ task๋ค ์กฐํairflow dgas trigger -e 2022-01-01 example_xcom: ํน์ dag๋ฅผ ํธ๋ฆฌ๊ฑฐ
Owner: Dag ๊ด๋ฆฌ์Runs: ์คํ ์ค์ธ DAG์ ์ํSchedule: ์ฃผ๊ธฐ๋ฅผ ๋ํ๋ด๋ ์ค์ Last Run: ์ต๊ทผ ์คํ ๋ ์งNext Run: ๋ค์ ์คํ์ด ์ธ์ ๋ ์ง ๋ํ๋Recent Tasks: ๋ฐฉ๊ธ ์คํ๋ Task๋ค์ ๋ณด์ฌ์คActions: DAG๋ฅผ ์ง์ฐ๊ฑฐ๋ ์คํLinks: ๋ง์ฐ์ค ๊ฐ๋ค๋๋ฉด ์ฌ๋ฌ๊ฐ์ง Link๋ค์ด ๋ณด์
Tree: Task๋ค์ ์ํ๋ฅผ ๋ณด๊ธฐ ํธํจGraph: Task๋ค์ ์์กด์ฑ์ ํ์ธํ ๋ ์ข์, ๊ฐ Task๋ค์ Log ์ ๋ณด ๋ฑ์ ํ์ธํ๊ธฐ์๋ ์ข์Calendar: ๋ ์ง๋ณ๋ก ์คํจ ์์ด ์ ๋์๊ฐ๋ ํ์ธ ๊ฐ๋ฅTask Duration,Task Tries,Landing Times: ๋ ์ง๊ธฐ๋ฐ์ผ๋ก ๋ญ๊ฐํ์ธ์ธ๋ฐ ์ค์น ์งํ์ ๋ณผ๊ฒ ์์Gantt: ๊ฐ๊ฐ์ task๊ฐ ์คํํ๋ฉด์ ์ผ๋งํผ์ ์๊ฐ์ ์๋นํ๋ ๋ณผ ์ ์๋ค.Details: ์ฌ๋ฌ๊ฐ์ง Metadata ํ์ธCode: DAG ์ฝ๋ ํ์ธ
- OpenSea ์ฌ์ดํธ์ NFT๋ฐ์ดํฐ๋ฅผ ์ถ์ถํด ํ ์ด๋ธ์ ์ ์ฅํ๊ธฐ
- ํ ์ด๋ธ ์์ฑ -> API ํ์ธ -> NFT ์ ๋ณด ์ถ์ถ -> NFT ์ ๋ณด ๊ฐ๊ณต -> NFT ์ ๋ณด ์ ์ฅ
./2-airflow/01-sqlite.py # ๊ธฐ๋ณธ dag ๊ตฌ์ฑ
# ์์ฑ ํ dag ๋์๋ณด๋์ ๋ฑ์ฅํ๋์ง ํ์ธ
- BashOperator
- PythonOperator
- EmailOperator
- Action Operator๋ ์ก์ ์ ์คํํ๋ค (๋ฐ์ดํฐ๋ฅผ ์ถ์ถ, ๋ฐ์ดํฐ ํ๋ก์ธ์ฑ ๋ฑ)
- Transfer Operator๋ ๋ฐ์ดํฐ๋ฅผ ์ฎ๊ธธ ๋ ์ฌ์ฉ
- Sensors: ์กฐ๊ฑด์ด ๋ง์ ๋ ๊น์ง ๊ธฐ๋ค๋ฆฐ๋ค
Airflow ๋์๋ณด๋ -> Admin -> Connections -> ์ถ๊ฐ -> connection id =db_sqlite, conneciton Type = Sqlite ๋ก Save
./2-airflow/02-create-table.py
airflow tasks test nft-pipeline creating_table 2021-01-01 # task ์คํ
Airflow ๋์๋ณด๋ -> Admin -> Connections -> ์ถ๊ฐ -> connection id = opensea_api, conneciton Type = http, host: https://api.opensea.io/ ๋ก Save
./2-airflow/03-sensor.py
airflow tasks test nft-pipeline is_api_available 2021-01-01 # task ์คํ
์ถ์ฒ: https://github.com/keon/data-engineering/tree/main/02-airflow
Airflow ๋์๋ณด๋ -> Admin -> Connections -> ์ถ๊ฐ -> connection id = githubcontent_api, conneciton Type = http, host: https://raw.githubusercontent.com/ ๋ก Save
./2-airflow/03-sensor.py
airflow tasks test nft-pipeline is_api_available 2021-01-01 # task ์คํ
./2-airflow/04-extract-data.py
airflow tasks test nft-pipeline extract_nft 2021-01-01 # task ์คํ
./2-airflow/05-process.py
airflow tasks test nft-pipeline process_nft 2021-01-01 # task ์คํ
cat /tmp/processed_nft.csv # ๊ฒฐ๊ณผ ํ์ธ
./2-airflow/06-store.py
airflow tasks test nft-pipeline store_nft 2021-01-01 # task ์คํ
# docker์์๋ 'airflow.db' ๊ฐ ๋ฐ๋ก ์๋๋ฏ. ๊ทธ๋์ ํด๊ฒฐ์ ๋ชปํ์.
./2-airflow/07-dependency.py
airflow์์ DAG ํ์ฑํ ํด์ ์์ฐจ์ ์ผ๋ก ์คํ๋๋์ง ํ์ธ.
- ๋งค์ผ ์ฃผ๊ธฐ์ ์ผ๋ก ๋์๊ฐ๋ ํ์ดํ๋ผ์ธ์ ๋ฉ์ท๋ค๊ฐ ๋ช์ผ ๋ค ์คํ์ํค๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
- ์๋ฅผ ๋ค์ด, ํ๋ฃจ์ ํ๋ฒ์ฉ ๋์๊ฐ๋ DAG๊ฐ 1์1์ผ์ ์คํ๋๋ค๊ฐ, 1์2์ผ์ ๋ฉ์ท์๊ณ 1์4์ผ์ ๋ค์ ์์ํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น?
- DAG ์ค์ ์ฝ๋์
catchup(False)์ด๋ฉด 1์ 4์ผ์ ๋ค์ ์์ํ๋ฉด 1์ 4์ผ๊ธฐ์ค์ผ๋ก ๋์๊ฐ๋ค. - DAG ์ค์ ์ฝ๋์
catchup(True)์ด๋ฉด 1์ 4์ผ์ ๋ค์ ์์ํ๋ฉด 1์ 2์ผ๊ธฐ์ค์ผ๋ก ๋์๊ฐ๋ค.
- DAG ์์ ๋ ์ง๋ฅผ
2021-01-01๋ก ํด๋๊ณ , ํ์ฌ2022-08-06์catcup(True)๋กํ๋ฉด ์ด๋ป๊ฒ ๋ ๊น?- ๊ธฐ์กด์ ์ด๋ฏธ ์คํ๋๊ฒ ์์ผ๋ฉด ๋์๊ฐ์ง ์๋๋ค. -> ๊ธฐ์กด DAG๋ฅผ ์ง์ฐ๊ณ , Browse -> DAG Run -> nft-pipeline ์ ๊ฑฐ
- ์ ๊ฑฐํ๊ณ ๋๋ฉด ๋ฐ๋ก 1์1์ผ๋ถํฐ ๊ฑฐ์ 1๋ ์น๊ฐ ๋์์ ๋์๊ฐ๊ฒ ๋๋ค.
1. webserver docker ์ ์
2. pip install apache-airflow-providers-apache-spark
3. fhvhv_tripdata_2020-03.csv ํ์ผ webserver๋ก ์ ์ก
# webserver docker์์ count_trips.py ์์ฑ
# ํจํค์ง๋ฅผ ๊ฐ์ ธ์ค๊ณ
from pyspark import SparkConf, SparkContext
import pandas as pd
# Spark ์ค์
conf = SparkConf().setMaster("local").setAppName("uber-date-trips")
sc = SparkContext(conf=conf)
# ์ฐ๋ฆฌ๊ฐ ๊ฐ์ ธ์ฌ ๋ฐ์ดํฐ๊ฐ ์๋ ํ์ผ
directory = "/home/airflow/data"
filename = "fhvhv_tripdata_2020-03.csv"
# ๋ฐ์ดํฐ ํ์ฑ
lines = sc.textFile(f"file:///{directory}/{filename}")
header = lines.first()
filtered_lines = lines.filter(lambda row:row != header)
# ํ์ํ ๋ถ๋ถ๋ง ๊ณจ๋ผ๋ด์ ์ธ๋ ๋ถ๋ถ
# countByValue๋ก ๊ฐ์ ๋ ์ง๋ฑ์ฅํ๋ ๋ถ๋ถ์ ์ผ๋ค
dates = filtered_lines.map(lambda x: x.split(",")[2].split(" ")[0])
result = dates.countByValue()
# ์๋๋ Spark์ฝ๋๊ฐ ์๋ ์ผ๋ฐ์ ์ธ ํ์ด์ฌ ์ฝ๋
# CSV๋ก ๊ฒฐ๊ณผ๊ฐ ์ ์ฅ
pd.Series(result, name="trips").to_csv("trips_date.csv")Admin -> Connectors -> ์ถ๊ฐ
Connect id: spark_local
Connection Type: Spark
Host: local
Save
airflow tasks test spark-example submit_job 2021-01-01
./2-airflow/dags/spark-example.py # ์ฝ๋ ์์น
./2-airflow/taxi-price.py
- SystemA, SystemB ๊ฐ๊ฐ ๋ฐ์ดํฐ ์์ธ ๊ฒ์ Data Lake๋ก ๋ณด๋ด๋ ํ์ดํ๋ผ์ธ์ ๊ฐ๊ฐ ๋ง๋ค์ด์ค์ผ ํจ.
- ์์คํ ์ ๋ํ ์๋ก ๊ธฐํ๊ธ์์ ์ผ๋ก ๋ณต์กํด์ง๋ค.
- ์ฌ๋ฌ๊ฐ์ง ํต์ ํ๋กํ ์ฝ์ ์ง์ํด์ผ ํ๋ค (HTTP, GRPC, TCP, MQ)
- ๋ฐ์ดํฐ ํฌ๋ฉง๋ ๋ค๋ฅด๋ค (CSV, JSON, XML)
- Point-of-failure ๊ฐ ๋ง๋ค
- ์์คํ A,B,C,D,E,F ๊ฐ๊ฐ์ ์ ๋ขฐ๋๊ฐ 99% ๋ผ๊ณ ํ์ ๋
- ์์คํ A,B,C,D,E,F๋ฅผ ๋ฌถ์์ ๋์ ์ ๋ขฐ๋ = 99% ^6 = 94.1%
- ๊ฐ๊ฐ์ ์ฐ๊ฒฐ๊ณ ๋ฆฌ ์ด๋์ ์๋ฌ๊ฐ ๋๊ณ ์๋์ง ๋ชจ๋ํฐ๋ง ํ๊ธฐ๋ ํ๋ค๋ค
- LinkedIn์์ ๊ฐ๋ฐ
- Apache Software๋ก ๋์ด๊ฐ 2011๋ ์คํ์์คํ
- Apple, eBay, Uber, ArBnB, Netflix ๋ฑ์์ ์ฌ์ฉ์ค
- ๋ถ์ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ
- Source ์์คํ ์ Kafka๋ก ๋ฉ์์ง๋ฅผ ๋ณด๋ด๊ณ
- Destination ์์คํ ์ Kafka๋ก ๋ถํฐ ๋ฉ์์ง๋ฅผ ๋ฐ๋๋ค
- ํ์ฅ์ฑ์ด ์๊ณ , ์ฅ์ ํ์ฉ (fault tolerant)์ ํ๋ฉฐ, ์ฑ๋ฅ์ด ์ข๋ค.
- ์์คํ ๊ฐ ์์กด์ฑ์ ๊ฐ์ ์ ์ผ๋ก ๋ง๋ ๋ค
- ํ์ฅ์ฑ: ์ ์์คํ ์ ๋ํ ๋ ๋ง๋ค ๋ณต์ก๋๊ฐ ์ ํ์ ์ผ๋ก ์ฌ๋ผ๊ฐ๋ค
- Kafka๋ฅผ ์ด์ฉํด ํต์ ํ๋กํ ์ฝ์ ํตํฉํ๊ธฐ ์ฝ๋ค
- ํ์ฅ์ฑ: ํ๋ฃจ์ 1์กฐ๊ฐ์ ๋ฉ์์ง๋ฅผ ์ฒ๋ฆฌํ ์ ์๊ณ , Petabyte์ ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ๋ฉ์์ง ์ฒ๋ฆฌ ์๋: 2MS
- ๊ฐ์ฉ์ฑ(availability): ํด๋ฌ์คํฐ ํ๊ฒฝ์์ ์๋
- ๋ฐ์ดํฐ ์ ์ฅ ์ฑ๋ฅ: ๋ถ์ฐ ์ฒ๋ฆฌ, ๋ด๊ตฌ์ฑ, ์ฅ์ ํ์ฉ (fault tolerant)
- ์์คํ ๊ฐ ๋ฉ์์ง ํ
- ๋ก๊ทธ ์์ง
- ์คํธ๋ฆผ ํ๋ก์ธ์ฑ
- ์ด๋ฒคํธ ๋๋ฆฌ๋ธ ๊ธฐ๋ฅ๋ค
- Netflix: ์ค์๊ฐ ๋ชจ๋ํฐ๋ง
- Expedia: ์ด๋ฒคํธ ๋๋ฆฌ๋ธ ์ํคํ ์ฒ
- Uber: ์ค์๊ฐ ๊ฐ๊ฒฉ ์กฐ์ , ์ค์๊ฐ ์์ ์์ธก
- Topic
- Kafka Broker
- Kafka Producer
- Kafka Consumer
- Kafka Partition
- Kafka Message
- Kafka Offset
- Kafka Consumer Group
- Kafka Cluster
- Zookeeper
- Producer ์ Consumer๊ฐ ์ํต์ ํ๋ ํ๋์ ์ฑ๋
- ๋ฐ์ดํฐ ์คํธ๋ฆผ์ด ์ด๋์ Publish ๋ ์ง ์ ํ๋๋ฐ ์ฐ์
- ํ ํฝ์ ํ์ผ ์์คํ ์ ํด๋์ ๊ฐ๋ ๊ณผ ์ ์ฌํ๋ค.
- Producer๋ ํ ํฝ์ ์ง์ ํ๊ณ ๋ฉ์์ง๋ฅผ ๊ฒ์ (Post)
- Consumer๋ ํ ํฝ์ผ๋ก๋ถํฐ ๋ฉ์์ง๋ฅผ ๋ฐ์์ด
- ์นดํ์นด์ ๋ฉ์์ง๋ ๋์คํฌ์ ์ ๋ ฌ๋์ด ์ ์ฅ ๋๋ฉฐ, ์๋ก์ด ๋ฉ์์ง๊ฐ ๋์ฐฉํ๋ฉด ์ง์์ ์ผ๋ก ๋ก๊ทธ์ ๊ธฐ๋ก
- Kafka Topic์ด Partition์ผ๋ก ๋๋๋ค.
- Partition์ ๋์คํฌ์ ์ด๋ป๊ฒ ์ ์ฅ์ด ๋๋์ง ๊ฐ๋ฅด๋ ๊ธฐ์ค์ด ๋๋ค.
- ์นดํ์นด์ ํ ํฝ์ ํํฐ์ ์ ๊ทธ๋ฃน์ด๋ผ๊ณ ํ ์ ์๋ค.
- ๋์คํฌ์๋ ํํฐ์ ๋จ์๋ก ์ ์ฅ
- ํํฐ์ ๋ง๋ค commit Log ๊ฐ ์์ด๊ฒ ๋๋ค
- ํํฐ์ ์ ์์ด๋ ๊ธฐ๋ก๋ค์ ์ ๋ ฌ์ด ๋์ด ์๊ณ ๋ถ๋ณ(immutable)ํ๋ค
- ํํฐ์ ์ ๋ชจ๋ ๊ธฐ๋ก๋ค์ Offset์ด๋ผ๋ ID๋ฅผ ๋ถ์ฌ๋ฐ๋๋ค.
- ์นดํ์นด์ ๋ฉ์์ง๋ Byte์ ๋ฐฐ์ด
- ํํ ๋จ์ String, JSON์ด๋ Avro ์ฌ์ฉ
- ํฌ๊ธฐ์๋ ์ ํ์ด ์์ง๋ง, ์ฑ๋ฅ์ ์ํด์๋ ์๊ฒ ์ ์งํ๋๊ฒ์ด ์ข๋ค
- ๋ฐ์ดํฐ๋ ์ฌ์ฉ์๊ฐ ์ง์ ํ ์๊ฐ๋งํผ ์ ์ฅํ๋ค (Retention Period), topic ๋ณ๋ก ์ง์ ๋ ๊ฐ๋ฅ
- Consumer๊ฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์๊ฐ๊ณ ๋์๋ ๋ฐ์ดํฐ๋ ์ ์ฅ๋๋ค
- Retention Period๊ฐ ์ง๋๋ฉด ๋ฐ์ดํฐ๋ ์๋์ผ๋ก ์ญ์
- ์ฅ์ ๊ฐ ์์ ๊ฒฝ์ฐ, Retention Period ๊ธฐ๊ฐ ์์ ํด๊ฒฐ์ ํด์ผ ํ๋ค.
- Retention Period ์ง๋ ํ์ ๋ฌธ์ ๊ฐ ์๊ฒผ์ ๊ฒฝ์ฐ, Data Lake ๊น์ง ๋ด๋ ค๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด์์ ํ๋ก์ธ์ฑ ํด์ผ ํ๋ค
- ๋ณด๋ด๋ ๋ฉ์์ง๋ Offset์ ๊ฐ์ง๊ฒ๋๋ค.
- Offset์ Partition์์ ๋ฉ์์ง๊ฐ ์์๋๋ก ์ ๋ ฌ๋๋๋ฐ, ์ ๋ ฌ๋ ์์ ๋ฐ ๊ฐ์ ์๋ฏธํ๋ค.
- ์นดํ์นด ํด๋ฌ์คํฐ๋ ์ฌ๋ฌ๊ฐ์ ์นดํ์นด ๋ธ๋ก์ปค(์๋ฒ)๋ฅผ ๊ฐ์ง ์ ์๋ฐ
- ์นดํ์นด ํ ํฝ์ ์์ฑํ๋ฉด ๋ชจ๋ ์นดํ์นด ๋ธ๋ก์ปค์ ์์ฑ๋๋ค
- ์นดํ์นด ํํฐ์ ์ ์ฌ๋ฌ ๋ธ๋ก์ปค์ ๊ฑธ์ณ์ ์์ฑ๋๋ค
- ์นดํ์นด์ ์๋ฒ๋ก๋ ๋ถ๋ฆฐ๋ค.
- Topic์ ์ ๋ฌํ๋ ์ญํ ์ ํ๋ค.
- Producer: ๋ฉ์์ง๋ฅผ ์ ๋ฌํ๋ ์ฃผ์ฒด
- ์นดํ์นด ํ ํฝ์ผ๋ก ๋ฉ์์ง๋ฅผ ๊ฒ์(post)ํ๋ ํด๋ผ์ด์ธํธ ์ ํ๋ฆฌ์ผ์ด์
- ๋ฉ์์ง๋ฅผ ์ด๋ ํํฐ์ ์ ๋ฃ์์ง ๊ฒฐ์ (key)
- Consumer: ๋ฉ์์ง๋ฅผ ์ ๋ฌ๋ฐ๋ ์ฃผ์ฒด
- Consumer๋ฅผ ๋ฌถ์ด์ Consumer Group์ด๋ผ๊ณ ํ๋ค.
- Consumer 1๊ฐ๊ฐ Consumer Group์ด ๋ ์ ์๊ณ , ์ฌ๋ฌ๊ฐ๊ฐ ๋ ์ ๋ ์๋ค.
- Consumer Group์ ๋ณ๋๋ก ์ง์ ์ํ๋ฉด, Consumer 1๊ฐ๋น Group1๊ฐ์ฉ ์ง์ ๋๋ค.
- ๊ฐ Consumer Group์ ๋ชจ๋ ํํฐ์
์ผ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ ์๋ค.
- Consumer๋ ์ง์ ๋ ํํฐ์ ์ผ๋ก๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ๋ฐ์ ์ ์๋ค.
- Consumer1,2๊ฐ Consumer Group์ผ๋ก ์ด๋ฃจ์ด์ ธ ์๋ ๊ฒฝ์ฐ, ๊ฐ Consumer๋ง๋ค ํน์ ์ง์ ๋ ํํฐ์ ์ ๋ํด์๋ง ๋ฐ์ดํธ๋ฅด ์ ๋ฌ ๋ฐ๊ฒ ๋๋ค.
- Partition 4๊ฐ, Consumer Group์์ Consumer๊ฐ 3๊ฐ ์๋๊ฒฝ์ฐ, 3๊ฐ ๊ฐ ํํฐ์ ๋ง๋ค Consumer์ ํ ๋น๋๊ณ ๋จ์ 1๊ฐ์ ํํฐ์ ์ Consumer์ค์ ๋๋ค์ผ๋ก ๋ฐฐ์ ๋๋ค.
- ๊ทผ๋ฐ ์ฌ๊ธฐ์์ Consumer Group์์ Consumer๊ฐ 1๊ฐ๊ฐ ์ถ๊ฐ๋๋ ๊ฒฝ์ฐ Rebalancing์ด ์ผ์ด๋๋ค.
- ๋จ์ 1๊ฐ์ ํํฐ์ ์ด ์๋ก ์ถ๊ฐ๋ Consumer๋ก ์ ๋ฌ๋๋๋ก Rebalancing์ด ์ผ์ด๋๋ค.
- Consumer๊ฐ ์ ๊ฑฐ๋๊ฑฐ๋ ์ถ๊ฐ๋ ๋ rebalancing์ด ์ด๋ฃจ์ด ์ง๋ค.
- ์นดํ์นด ํด๋ฌ์คํฐ์ ์ฌ๋ฌ ์์๋ค์ ์ค์ ํ๋๋ฐ ์ฌ์ฉ๋จ
- ๋ฉํ๋ฐ์ดํฐ ์ค์ , ํ ํฝ ์ค์ , Replication Factor ๋ฑ์ ์กฐ์ ํ๋๋ฐ ์ฌ์ฉ
- ๋ถ์ฐ ์์คํ ๊ฐ์ ์ ๋ณด ๊ณต์ , ์ํ ์ฒดํฌ, ์๋ฒ๋ค ๊ฐ์ ๋๊ธฐํ
- ๋ถ์ฐ ์์คํ ์ ์ผ๋ถ์ด๊ธฐ ๋๋ฌธ์ ๋์์ ๋ฉ์ถ๋ค๋ฉด ๋ถ์ฐ ์์คํ ์ ์ํฅ
- ์ฃผํคํผ ์ญ์ ํด๋ฌ์คํฐ๋ก ๊ตฌ์ฑ
- ํด๋ฌ์คํฐ๋ ํ์๋ก ๊ตฌ์ฑ๋์ด ๋ฌธ์ ๊ฐ ์๊ฒผ์ ๊ฒฝ์ฐ ๊ณผ๋ฐ์๊ฐ ๊ฐ์ง ๋ฐ์ดํฐ๋ฅผ ๊ธฐ์ค์ผ๋ก ์ผ๊ด์ฑ ์ ์ง
- ํ๋์ผ
- ํด๋ฌ์คํฐ๊ด๋ฆฌ: ํด๋ฌ์คํฐ์ ์กด์ฌํ๋ ๋ธ๋ก์ปค๋ฅผ ๊ด๋ฆฌํ๊ณ ๋ชจ๋ํฐ๋ง
- Topic ๊ด๋ฆฌ: ํ ํฝ ๋ฆฌ์คํธ๋ฅผ ๊ด๋ฆฌํ๊ณ ํ ํฝ์ ํ ๋น๋ ํํฐ์ ๊ณผ Replication๊ด๋ฆฌ
- ํํฐ์ ๋ฆฌ๋ ๊ด๋ฆฌ: ํํฐ์ ์ ๋ฆฌ๋๊ฐ ๋ ๋ธ๋ก์ปค๋ฅผ ์ ํํ๊ณ , ๋ฆฌ๋๊ฐ ๋ค์ด๋ ๊ฒฝ์ฐ ๋ค์ ๋ฆฌ๋๋ฅผ ์ ํ
- ๋ธ๋ก์ปค๋ค๋ผ๋ฆฌ ์๋ก๋ฅผ ๋ฐ๊ฒฌํ ์ ์๋๋ก ์ ๋ณด ์ ๋ฌ
- Key ์์ด ์ ์ก: Producer๊ฐ ๋ฉ์์ง๋ฅผ ๊ฒ์ํ๋ฉด Round-Robin ๋ฐฉ์์ผ๋ก ํํฐ์ ์ ๋ถ๋ฐฐํ๋ค.
- Key ์ํจ๊ป ์ ์ก: ๊ฐ์ Key๋ฅผ ๊ฐ์ง ๋ฉ์์ง๋ค์ ๊ฐ์ ํํฐ์ ์๊ฒ ๋ณด๋ด์ง๋ค
- ๊ฐ ๋ธ๋ก์ปค๋ ๋ณต์ ๋ ํํฐ์ ์ค ๋ํ๋ฅผ ํ๋ ํํฐ์ ๋ฆฌ๋๋ฅผ ๊ฐ์ง๊ฒ ๋๋ค.
- ๋ชจ๋ Read/Write๋ ํํฐ์ ๋ฆฌ๋๋ฅผ ํตํด์ ์ด๋ฃจ์ด์ง๊ฒ ๋จ
- ๋ค๋ฅธ ํํฐ์ ๋ค์ ํํฐ์ ๋ฆฌ๋๋ฅผ ๋ณต์
- Partition์ 1๊ฐ๋ก ๋ง๋ค์ด๋๊ณ Consumer Group์์ Consumer๋ฅผ 2๊ฐ๋ก ๋ง๋ ๋ค๋ฉด
- Producer์์ ๋ฐ์ดํฐ๋ฅผ ์๋ฌด๋ฆฌ ๋ณด๋ด๋ Consumer1 ๋ก๋ง ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด๊ฒ๋๋ค.
- Partition์ 2๊ฐ๋ก ๋ง๋ค์ด๋๊ณ Consumer Group์์ Consumer๋ฅผ 2๊ฐ๋ก ๋ง๋ ๋ค๋ฉด
- Producer์์ ๋ฐ์ดํฐ๋ฅผ ๋ณด๋ด๋ฉด, Consumer1,2 ๊ฐ๊ฐ์ ๊ท ๋ฑํ๊ฒ ๋ณด๋ด๊ฒ ๋๋ค.
pip install kafka-python
./3-kafka/consumer.py
./3-kafka/producer.py
# m1 ์์๋ ์ ์๋ํจ.
./3-kafka/docker-compose.yml
docker exec -it 03-kafka_kafka1_1 kafka-topics --bootstrap-server=localhost:19091 --create --topic first-cluster-topic --partitions 3 --replication-factor 1
# kafdrop ์์๋ ui๋ก topic ์์ฑ ๊ฐ๋ฅ
./3-kafka/trips_producer.py
./3-kafka/trips_consumer.py
- payment_producer์์ ๋๋ค payment ๋ฐ์ดํฐ๋ค์ kafka payment ํ ํฝ์ผ๋ก ์ ์ก ํ๋ค. (producer)
- fraud_detector์์ payment ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ์ ๋ฌ๋ฐ์ ๋นํธ์ฝ์ธ ๋ฐ์ดํฐ๋ฉด fraud_payments(์ฌ๊ธฐ) ํ ํฝ์ผ๋ก ์ ์กํ๊ณ ์ ์ ๋ฐ์ดํฐ๋ฉด legit_payments ํ ํฝ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ค. ์ฆ, consumer์ producer๊ฐ ๋๋ค ๊ณต์กดํ๊ณ ์๋ค.
- legit_processor๋ ์ ์ ๋ฐ์ดํฐ๋ค์ ์ ๋ฌ๋ฐ์ ์ฒ๋ฆฌํ๋ consumer์ด๋ค.
- fraud_processor๋ ๋น์ ์ ๋ฐ์ดํฐ๋ค์ ์ ๋ฌ๋ฐ์ ์ฒ๋ฆฌํ๋ consumer์ด๋ค.
./3-kafka/fraud_detection/*
- Spark: ๋ฐฐ์น ํ๋ก์ธ์ฑ์ ์ํ ํ๋ ์์ํฌ
- Flink: ์คํธ๋ฆผ ํ๋ก์ธ์ฑ์ ์ํ ํ๋ ์์ํฌ
- 2009๋ ๊ฐ๋ฐ ์์ ~ 2016๋ ์ฒซ stable ๋ฒ์ ๊ณต๊ฐ
- ์คํ์์ค ์คํธ๋ฆผ ํ๋ก์ธ์ฑ ํ๋ ์์ํฌ
- ๋ถ์ฐ์ฒ๋ฆฌ / ๊ณ ์ฑ๋ฅ / ๊ณ ๊ฐ์ฉ์ฑ
- ๋ฐฐ์น ํ๋ก์ธ์ฑ ๋ํ ์ง์ํ๋ค
- Spark๋ณด๋ค ๋น ๋ฅธ ์๋
- Fault-tolerance: ์์คํ ์ฅ์ ์ ์ฅ์ ์ง์ ์ผ๋ก ๋์๊ฐ์ ๋ค์ ์์ํ ์ ์๋ค
- ํ๋ฐํ ๊ฐ๋ฐ - ๊ทธ๋ํ ํ๋ก์ธ์ฑ, ๋จธ์ ๋ฌ๋, ํ ์คํธ ์ฒ๋ฆฌ, ๋ฑ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ฌ๋ฌ๊ฐ์ง ๋ผ์ด๋ธ๋ฌ๋ฆฌ / ํ๋ ์์ํฌ์ ์ฐ๋
- Rescalability: ์คํ ๋์ค ๋ฆฌ์์ค ์ถ๊ฐ ๊ฐ๋ฅ
- ๋ฐฐ์น ํ๋ก์ธ์ฑ์ ํ์ ๋ ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ง๊ณ ๋ค๋ค๋ค๋ฉด, ์คํธ๋ฆผ ํ๋ก์ธ์ฑ์ ๋ฌดํํ๊ฒ ๋ฐ์ดํฐ๊ฐ ๋ค์ด์ฌ ์ ์์ ๋ ๋ค๋ฃฌ๋ค.
- ์ฃผ์ ๊ฑฐ๋์
- ์น ์๋ฒ
- ์ผ์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ
- ์ด๋ฒคํธ ๋๋ฆฌ๋ธ ์ดํ๋ฆฌ์ผ์ด์
- ๋น์ ์ ๊ฑฐ๋ ํ์ง
- Batch
- ํ์ ๋ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ๋ ์ฌ์ฉ
- ๋ชจ๋ ๋ฐ์ดํฐ์ ์ ์ฝ์ ํ ์ฒ๋ฆฌ ๊ฐ๋ฅ
- ์ฃผ๊ธฐ์ ์ผ๋ก ์คํ๋๋ ์์
- ์ฒ๋ฆฌ์๋๋ณด๋ค๋ ์ฒ๋ฆฌ๋์ ํฌ์ปค์ค
- Stream
- ๋ฐ์ดํฐ๊ฐ ๋ฌดํ์ด๋ผ๊ณ ๊ฐ์
- ๋ฐ์ดํฐ๊ฐ ๋์ฐฉํ ๋ ๋ง๋ค ์ฒ๋ฆฌ
- ์ค์๊ฐ์ผ๋ก ์คํ๋๋ ์์
- ์ฒ๋ฆฌ๋๋ณด๋ค ์ฒ๋ฆฌ์๋์ ํฌ์ปค์ค
- Streaming Dataflow:
- Sources: ํ๊ฐ ํน์ ์ฌ๋ฌ๊ฐ์ ๋ฐ์ดํฐ ์์ค๊ฐ ์์ ์ ์๋ค
- Operators: ๋ฐ์ดํฐ๋ฅผ ๋ณํ (transformation)
- Sink: ๋ฐ์ดํฐํ๋ก์ฐ์ ๋ง์ง๋ง ๋ถ๋ถ
- ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ค๋ก ๋ถํฐ ์ฝ์ด์์, Sink๋ฅผ ํตํด ์ฌ๋ฌ ๋ฐ์ดํฐ ์์ค๋ก ๋ณด๋ผ ์ ์๋ค.
- Hadoop
- Batch Processing
- Disk์์ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ณ ์ฒ๋ฆฌ
- Spark
- Hadoop์์ ๊ฐ์ ํด์ ๋ง๋ ํ๋ก์ ํธ
- Hadoop์ ๋นํด ์๋๊ฐ ๋น ๋ฅด๋ค
- Batch Processing
- (Batch based Streaming) -> micro batch๋ก streaming์ ํ ์ ์๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ์๋ค
- In-Memory ๋ฐ์ดํฐ ์ฒ๋ฆฌ
- Flink
- Stream Processing
- In-Memory ๋ฐ์ดํฐ ์ฒ๋ฆฌ
- Hadoop
- Input ---Mapper--> ์ํ1 ---Mapper(Disk)--> Reducer --> Output
- Mapper๋ฅผ ํตํด์ Reducer์ ์ ๋ฌ์ด ๋ ๋ Disk๋ฅผ ๊ฑฐ์น๊ธฐ ๋๋ฌธ์ ๊ณ ์ฑ๋ฅ์ ๋ด๊ธฐ ํ๋ค๋ค (Disk๋ฅผ ๊ฑฐ์น๋๊ฒ ์๊ฐ ์์๊ฐ ๋ง์ด ๋๋ค)
- Spark
- Input --> ์ํ1 --Transformation(in-memory)--> ์ํ2 --> Output
- in-memory tranformation์ ํตํด์ Hadoop์ ๋นํด ํจ์ฌ ์ฑ๋ฅ์ด ๋น ๋ฅด๋ค.
- Flink
- Input --> ์ํ1 --Transformation(in-memory)--> ์ํ2 --> Output
- Flow์์ฒด๋ Spark์ ๋งค์ฐ ์ ์ฌํ๋ฐ, Batch Processing์ด๋ Stream Processing์ด๋ ์ฐจ์ด๊ฐ ์๋ค.
- Hadoop
- ๋ฐ์ดํฐ ์ฒ๋ฆฌ ๋ฐฉ๋ฒ์ ์์ ์ฝ๋ฉํด์ค์ผ ํ๋ค
- ๋ฎ์ ๋จ๊ณ์ ์ถ์ํ
- Spark
- ๋์ ๋จ๊ณ์ ์ถ์ํ
- ์ฌ์ด ํ๋ก๊ทธ๋๋ฐ
- RDD
- Flink
- ๋์ ๋จ๊ณ์ ์ถ์ํ
- ์ฌ์ด ํ๋ก๊ทธ๋๋ฐ
- Dataflows
- Spark & Flink ๋ชจ๋ ๊ฐ๋ฐ ์ปค๋ฎค๋ํฐ๊ฐ ํ์ฑํ ๋์ด ์๊ณ , API ๋ผ์ด๋ธ๋ฌ๋ฆฌ๊ฐ ๊ฐ๋ฐ์ด ์ ๋์ด ์๋ค
- ์) Spark - MLlib, Flink - FlinkML
- Spark
- Spark๋ ์ง์ ํ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๊ฐ ์๋๋ค
- ์คํํฌ์ ์์ง์ ๋ฐฐ์น ํ๋ก์ธ์ฑ ๊ธฐ์ค
- ๋ง์ดํฌ๋ก ๋ฐฐ์นญ
- Flink
- ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ
- ํ๋งํฌ์ ์์ง์ ์คํธ๋ฆผ ํ๋ก์ธ์ฑ ๊ธฐ์ค
- ๋ง์ดํฌ๋ก ๋ฐฐ์น: ๋ฐ์ดํฐ ์ค ์ผ๋ถ๋ถ ๋ผ์์ ๋ฐฐ์น ํ๋ก์ธ์ฑ
- Window: ์๊ฐ์ ์ ํ ํ, ๊ทธ ์๊ฐ๋ถํฐ 10์ด ์ฌ์ด์ ๋ฐ์ดํฐ๋ฅผ window๋ก ๋ฌถ์ด ์ฌ์ฉ
- Spark
- Scala๋ก ๊ฐ๋ฐ๋์ด ์์
- ํจ์จ์ ์ธ ๋ฉ๋ชจ๋ฆฌ ๊ด๋ฆฌ๊ฐ ์ด๋ ต๋ค
- Out of Memory ์๋ฌ๊ฐ ์์ฃผ ๋ฐ์
- ์์กด์ฑ ๊ด๋ฆฌ๋ก DAG ์ฌ์ฉ
- Flink
- Java๋ก ๊ฐ๋ฐ๋์ด ์์
- ๋ด์ฅ ๋ฉ๋ชจ๋ฆฌ ๋งค๋์
- Out of Memory ์๋ฌ๊ฐ ์์ฃผ ์๋๋ค
- Controlled cyclic dependency graph (ML ๊ฐ์ด ๋ฐ๋ณต์ ์ธ ์์ ์ ์ต์ ํ)
- Flink๋ ์๋ ์คํ๋ค์ ๊ฐ๊ณ ์๋ ์ฒซ๋ฒ์งธ ์คํ์์ค ํ๋ ์์ํฌ
- ํด๋ฌ์คํฐ๋ฅผ ์ด๋ฃจ๊ณ 100๋ง ๋จ์์ ์ด๋ฒคํธ๋ฅผ ์ฒ๋ฆฌ
- Latency ๊ฐ 1์ด ์ดํ(sub-second)
- Exactly-once: 1๋ฒ ์ด์์ ์ฒ๋ฆฌ๋ฅผ ๋ณด์ฅ -> ๋ณดํต ๋ค๋ฅธ ์์คํ ๋ค์ at least once๊ฐ ๋๋ถ๋ถ์ด๋ค (ํ ๋ฒ ์ด์์ ์ฒ๋ฆฌ๋ฅผ ํ๊ฑฐ๋ ๋ณด์ฅ์ ๋ชปํ๊ณ ์ค๋ณต์ผ๋ก ์ฒ๋ฆฌํ ์ ๋ ์๊ณ ๋ฐ์ดํฐ๋ฅผ ์์ด๋ฒ๋ฆด์๋ ์์)
- ์ ํํ ๊ฒฐ๊ณผ๋ฅผ ๋ณด์ฅ
- Storage
- Deployment/Environment
- Engine
- Flink๋ Spark์ ๋ง์ฐฌ๊ฐ์ง๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌ๋ง ํ๋ ์์คํ ์ด๋ค.
- ๋ฐ๋ผ์, ๊ฐ์ข
์ ์ฅ ์์คํ
๋ค๊ณผ ์ฐ๋์ด ๊ฐ๋ฅํ๋๋ก ์ค๊ณ
- HDFS
- Local File System
- Mongo DB
- RDBMS (MySQL, Postgres)
- S3
- Rabbit MQ
- ๋ฆฌ์์ค ๊ด๋ฆฌ๋ ์ฌ๋ฌ ์์คํ
๊ณผ ์ฐ๋ํ์ฌ ์ด์ฉ ๊ฐ๋ฅํ๋ค.
- Local
- Standalone ํด๋ฌ์คํฐ
- YARN
- Mesos
- AWS / GCP
- SQL: High-level Language
- Table API: Declarative DSL
- Data Stream / DataSet API: Core APIs
- Stateful Stream Processing: Low-level building block (streams, state, [event] time)
- 4๋ฒ์ ๊ทธ๋๋ก ์ธ์๋ ์์ง๋ง, ๋ณดํต์ ์ด ํํธ๋ฅผ ์ฐ์ง ์๋๋ค.
- ์ค์ ๋ก๋ 3๋ฒ์ ์ฌ์ฉํ๊ฒ ๋๋๋ฐ, Data Stream์ ์คํธ๋ฆผ ํ๋ก์ธ์ฑํ ๋ ์ฌ์ฉํ๊ณ , Dataset API๋ ๋ฐฐ์น ํ๋ก์ธ์ฑํ ๋ ์ฌ์ฉํ๋๋ฐ
- Data Set API๋ ์ ์ ์์ฐ๋ ์ถ์ธ์ด๋ฉฐ ๊ณง Deprecated ๋ ์ ์๋ค.
- 2๋ฒ: SparkSQL๊ณผ ๋น์ทํ๊ฒ ํ๋ก๊ทธ๋๋ฐ์ ์ ์ธ์ ์ผ๋ก ํ ์ ์๋๋ก ํด์ค. Spark์๋ ๋ค๋ฅด๊ฒ Table์ด Dynamicํ๊ฒ ๋ณ๊ฒฝ๋๋์ ์ด ๋ค๋ฅด๋ค.
- 1๋ฒ: ๊ฐ์ฅ ๋์ ๋จ๊ณ์ ์ถ์ํ, SQL๋ก ํ๋ก๊ทธ๋๋ฐ์ ํ ์ ์๋ค.
- Flink๋ ์ฌ๋ฌ Connector ๋ค๊ณผ ์ฐ๊ฒฐ ๊ฐ๋ฅ
- sink๋ ๋ฐ์ดํฐ๋ฅผ ์ ์ฅํ๋ ๊ณณ, source๋ ๋ฐ์ดํฐ๋ฅผ ์
๋ ฅ์ ๋ฐ์ ์ ์๋ ๊ณณ
- Apache Kafka (sink / source)
- Elastic Search (sink)
- HDFS (sink)
- RabbitMQ (sink, source)
- Amazon Kinesis (sink, source)
- Twitter Streaming API (source)
- Apache Cassandra (sink)
- Redis (sink)
- Apache Zepplin - ์น ๋ฒ ์ด์ค ๋ ธํธ๋ถ
- Apache Mahout - ๋จธ์ ๋ฌ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
- Cascading - Workflows ๋งค๋์ง๋จผํธ
- Apache Beam - Data pipeline ์์ฑ / ๊ด๋ฆฌ ํด
- Source -> Operations,Transformations -> Sink
- Source: RDB, Kafka, Local file
- Sink: Kafka, HDFS, RDB ๋ฑ
- event ๊ฐ๊ฐ์ ๋ ๋ฆฝ์ ์ผ๋ก ์ฒ๋ฆฌํ๋ฉด state๊ฐ ํ์ ์๋ค
- ์ฌ๋ฌ event๋ฅผ ํ๊บผ๋ฒ์ ๋ณด๋ ค๊ณ ํ ๋ state๊ฐ ํ์ํ๋ค - stateful
- ์
- ํจํด์ ์ฐพ๋ ์ผ
- ๋ฐ์ดํฐ๋ฅผ ์๊ฐ๋ณ๋ก ํฉ์น๋ ์ผ
- ๋จธ์ ๋ฌ๋ ํธ๋ ์ด๋
- ๊ณผ๊ฑฐ์ ๋ฐ์ดํฐ๋ฅผ ์ฐธ๊ณ ํด์ผํ๋ ์ผ
- flink๋ ๋ฐ๋ผ์ state๋ฅผ ๊ฐ๊ณ ์๋ค
- checkpoints์ savepoints๋ก state๋ฅผ ์ ์ฅํด์ ๋ด๊ฒฐํจ์ฑ์ ๊ฐ๋๋ก ์ค๊ณ
- queryable state๋ฅผ ์ด์ฉํด์ ๋ฐ์์ state๋ฅผ ๊ด์ฐฐํ ์๋ ์๋ค
- Data Stream API๋ฅผ ์ฌ์ฉํ ๋ ์ฌ๋ฌ๊ฐ์ง ๊ฒฝ์ฐ๋ก state๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ค
- Window๋ก ๋ฐ์ดํฐ ๋ชจ์๋ณด๊ธฐ
- Transformations (key-value state)
- CheckpointedFunction์ผ๋ก ๋ก์ปฌ ๋ณ์๋ฅผ fault tolerantํ๊ฒ ๋ง๋ค๊ธฐ
- HashMapStateBackend
- Java Heap์ ์ ์ฅ
- Hash Table์ ๋ณ์์ Trigger๋ฅผ ์ ์ฅ
- ํฐ state, ๊ธด windows, ํฐ key/value ์์ ์ ์ฅํ ๋ ๊ถ์ฅ
- ๊ณ ๊ฐ์ฉ์ฑ ํ๊ฒฝ
- ๋ฉ๋ชจ๋ฆฌ ์ฌ์ฉ์ผ๋ก ๋น ๋ฅธ ์ฒ๋ฆฌ
- EmbeddedRocksDBStateBackend
- RocksDB์ ์ ์ฅ
- ๋ฐ์ดํฐ๋ byte array๋ก ์๋ฆฌ์ผ๋ผ์ด์ฆ ๋์ด ์ ์ฅ
- ๋งค์ฐ ํฐ state, ๊ธด window, ํฐ key/value state ์ ์ฅ
- ๊ณ ๊ฐ์ฉ์ฑ ํ๊ฒฝ
- Disk์ Serialize ์ฌ์ฉ์ผ๋ก ์ฑ๋ฅ์ ๋ค๋จ์ด์ง๊ณ / ์ฒ๋ฆฌ๋์ด ๋์ด๋๋ค (tradeoff)
- Key-Value store
- Keyed stream์์๋ง ์ด์ฉ ๊ฐ๋ฅ
- ์๋ฅผ๋ค์ด ๊ฐ ์ด๋ฒคํธ๋ id, value ์คํค๋ง๋ฅผ ๊ฐ๊ณ
- ๊ฐ id๋ง๋ค value๋ฅผ ๋ํ๊ณ ์ถ์ ๋ keyed state๋ฅผ ์ด์ฉ
- ์ฅ์ ํ์ฉ์ ๊ฐ๋ฅํ๊ฒ ํด์ฃผ๋ ๊ธฐ๋ฅ๋ค
- Stream replay
- Checkpointing
- checkpoint๋ฅผ ์ผ๋ง๋ ์์ฃผ ์ ์ฅํด์ผ ํ๋.
- Trade off ์กด์ฌ
- ๊ฐ๋ฒผ์ด state๋ฅผ ๊ฐ์ง ํ๋ก๊ทธ๋จ์ ์์ฃผ ์ ์ฅํด์ฃผ์ด๋ ๋๋ค
- checkpoint๋ฅผ ํ ์ดํ์ ์์คํ
์ด ๋ง๊ฐ์ง ๊ฒฝ์ฐ
- ํ๋งํฌ๋ ์๋์ ๋ฉ์ถ๊ณ
- ์ฒดํฌํฌ์ธํธ๋ก ๋ฆฌ์
- ๋ถ์ฐ๋ ๋ฐ์ดํฐ ์คํธ๋ฆผ์์ ์ด๋ป๊ฒ snapshot์ ๋ง๋ค๊น
- Chandy-Lamport ์๊ณ ๋ฆฌ์ฆ.
- ๋น๋๊ธฐ์ ์ผ๋ก ์คํ
- ๋ฐ์ดํฐ๋ฅผ ์๊ฐ๋ณ๋ก ๋๋๋ barrier๋ฅผ ์ฝ์ ํด snapshot์ด ๊ฐ๋ฅํ๋ค
- barrier๋ ๊ฐ๋ฒผ์์ ์คํธ๋ฆผ์ ๋ฐฉํด๋์ง ์๋๋ก ์ค๊ณ
- Sink operator๊ฐ barrier๋ฅผ ๋ฐ์์ ์๋ก์ด checkpoint๋ฅผ ๋ง๋ ๋ค
- ์ฌ์ด์ฌ์ด์ ๋ผ์ด๋์ barrier๋ฅผ ๊ธฐ๋ก์ ํ๋ ๊ณผ์ ์ด๋ค.
- barrier์ state๋ฅผ ๊ธฐ๋กํ๋ค.
- ๋ฐ์ดํฐ๊ฐ ์ค๋๋๋ก ๋ฐ์๋ค์ฌ ์ฒดํฌํฌ์ธํธ ๋ง๋ค๊ธฐ
- ๋น ๋ฅธ ์๋๋ฅผ ์ํ ํ๋ก๊ทธ๋จ์ ๋ง๋ค ๋ ์ฌ์ฉ
- Exactly-once ๋ณด๋ค๋ at-least-once๋ฅผ ๋ณด์ฅํ๋ค.
- ์ฅ์ ๊ฐ ๋๋ฉด ๋ง์ง๋ง ์ฒดํฌํฌ์ธํธ๋ฅผ ๋ถ๋ฌ์จ๋ค
- ์์คํ ์ dataflow ์ ์ฒด๋ฅผ re-deploy ํ๋ค
- ๊ฐ operator์๊ฒ ์ฒดํฌํฌ์ธํธ์ state ์ ๋ณด๋ฅผ ์ฃผ์ ํ๋ค
- ์ ๋ ฅ stream๋ ์ฒดํฌํฌ์ธํธ์ผ ๋๋ก ๋๋ ค๋๋๋ค - ์ ๋ ฅ ์คํธ๋ฆผ ์์ฒด๊ฐ ์ฒดํฌํฌ์ธํธ๋ก ๋๋ ค๋๋ ์์ ์ ์ง์ํด์ผ ํ๋ค(์ด๋ฐ์ ์์ kafka๋ ๊ถํฉ์ด ์ ๋ง๋๋ค)
- ์ฌ์์
- ์ฌ์ฉ์๊ฐ ์ง์ ํ ์ฒดํฌํฌ์ธํธ
- ๋ค๋ฅธ ์ฒดํฌํฌ์ธํธ์ฒ๋ผ ์๋์ผ๋ก ์์ด์ง์ง ์๋๋ค
- ๋ถ์ฐ ํ๊ฒฝ์์ ์ฒดํฌํฌ์ธํธ ์ ๋ ฌ ์ฌ๋ถ
- ์๋๊ฐ ์ค์ํ ๊ฒฝ์ฐ at least once ์ฌ์ฉ
- Time Series Analysisํ ๋
- Windows์ธ ๋
- Event time์ด ์ค์ ํ ๋
- Event Time
- Processing Time
- ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ ์์คํ ์ ์๊ฐ
- Hourly time window
- 9:15๋ถ ์์คํ ์์
- 9:15 - 10:00
- 10:00 - 11:00
- ๊ฐ์ฅ ๋น ๋ฅธ ์ฑ๋ฅ๊ณผ Low Latency
- ํ์ง๋ง ๋ถ์ฐ๋๊ณ ๋น๋๊ธฐ์ ์ธ ํ๊ฒฝ์์๋ ๊ฒฐ์ ์ (deterministic) ์ด์ง ๋ชปํ๋ค
- ์ด๋ฒคํธ๊ฐ ์์คํ ์ ๋๋ฌํ๋ ์๋์ ๋ฌ๋ ธ๊ธฐ ๋๋ฌธ์
- Event๊ฐ ์์ฑ๋ ๊ณณ์์ ๋ง๋ค์ด์ง ์๊ฐ
- Flink์ ๋๋ฌํ๊ธฐ ์ ์ด๋ฒคํธ ์์ฒด์ ๊ธฐ๋ก ๋ณด๊ด
- ์๊ฐ์ ์์คํ ์ด ์๋๋ผ data ์์ฒด์ ์์กด
- ์ด๋ฒคํธ ํ์ ํ๋ก๊ทธ๋จ์ Event Time Watermark๋ฅผ ์์ฑํด์ผ ๋๋ค
- Evnet Time์ ์์กดํ๋ ์์คํ ์ ์๊ฐ์ ํ๋ฆ์ ์ฌ๋ ๋ฐฉ๋ฒ์ด ๋ฐ๋ก ํ์ํ๋ค
- ์) 1์๊ฐ ์ง๋ฆฌ window operation์ด๋ฉด 1์๊ฐ์ด ํ๋ ๋ค๋ ๊ฒ์ ์์์ผ ํ๋ค
- Event Time๊ณผ Processing Time์ ์ฑํฌ๊ฐ ์๋ง์ ์ ์๋ค
- ์) 1์ฃผ ์ง๋ฆฌ ๋ฐ์ดํฐ๋ฅผ ๋ช์ด ๋ง์ ๊ณ์ฐํ ์ ์๋ค
- ๊ทธ๋์ ๋์จ๊ฒ์ด watermark
- Flink๊ฐ event time์ ํ๋ฆ์ ์ฌ๋ ๋ฐฉ๋ฒ
- Watermark(t): timestamp t <= t (์ ์ด๋ t๊น์ง ์๋ค)
- ์ฌ๋ฌ input stream์ ๋ฐ๋ operator์ ๊ฒฝ์ฐ ๊ฐ์ฅ ๋ฎ์ event time์ ์ฌ์ฉ
- ๋ถ์ฐ ์์คํ ์ผ๋ก์ ์ปดํจํ ๋ฆฌ์์ค ๋ถ๋ฐฐ๊ฐ ํจ์จ์ ์ด์ด์ผ ํ๋ค
- ๋ฆฌ์์ค ๋งค๋์ ์ ์ข
๋ฅ
- YARN
- Kubernetes
- Task ์ค์ผ์ค๋ง (๋ค์ Task๊ฐ ์ธ์ ์คํ ๋ ์ง)
- ์คํจ/์๋ฃ๋ Tasks ๊ด๋ฆฌ
- ์ฒดํฌํฌ์ธํธ ๊ด๋ฆฌ
- ์คํจ์ Recovery
- 3๊ฐ์ง์ ์ปดํฌ๋ํธ
- Resource Manager - task solt ๊ด๋ฆฌ
- Dispatcher - Flink app์ ๋ฑ๋ก ํ๋ REST API & web UI
- JobMaster - 1๊ฐ์ JobGraph ๊ด๋ฆฌ
- aka workers
- Dataflow์ task๋ฅผ ์คํํ๋ ์ฃผ์ฒด
- Task slot - ํ ์คํฌ ๋งค๋์ ๋ฅผ ์ค์ผ์ค๋งํ๋ ๊ฐ์ฅ ์์ ๋จ์
- Task slot์ผ๋ก ๋์์ ์คํ๋ ์ ์๋ tasks ์ค์
- Task Worker (TaskManager)๋ JVM ํ๋ก์ธ์ค
- ์ฌ๋ฌ ์ฐ๋ ๋์์ ํ๋ ํน์ ์ฌ๋ฌ๊ฐ์ sub task๋ฅผ ์คํ ๊ฐ๋ฅ
- ํ๋์ TaskManager๊ฐ ๊ฐ์ง ์ ์๋ Task ์๋ Task Slot์ผ๋ก ์กฐ์
- 2019๋ 8์ Pyflink (Table API) ๋ฒ ํ ๋ฒ์
- 2020๋ 2์ Apache-flink๊ฐ pypi์์ ๋ค์ด๋ก๋ ๊ฐ๋ฅ ํด์ง
- 2020๋ 7์ Python UDF, SQL, UDF metrics, Pandas UDFs
- ํ์ด์ฌ์ธ ์ด์
- Data Sciencedhk ๊ฐ์ฅ ๊ฐ๊น์ด ์ธ์ด
- ๋จธ์ ๋ฌ๋ ํ๋ ์ด๋ฌด์ดํฌ, ํ๋ค์ค ๋ฑ์ ๋ผ์ด๋ธ๋ฌ๋ฆฌ
- Apache Flink์์ ์ฌ๋ ค์ง Python API
- ํ์ด์ฌ์ผ๋ก ์คํธ๋ฆผ ํ๋ก์ธ์ฑ์ ํ ์ ์๋ค
- Data๋ฅผ ์ฃผ๊ณ ๋ฐ๋ ๊ฒ์ ์ต์ํ
- Serialization / Deserialization
- ๋น ๋ฅธ Python UDF
- CPU utilization
https://www.apache.org/dyn/closer.lua/flink/flink-1.15.1/flink-1.15.1-bin-scala_2.12.tgz # ์ ์ ํ ๋ค์ด๋ก๋
pyflink ์ค์น
pip install apache-flink
WordCount java ์คํ
./bin/flink run examples/streaming/WordCount.jar
tail log/flink-*-taskexecutor-*.out
WordCount python ์คํ
./bin/flink run --python examples/python/datastream/word_count.py
# Job has been submitted with JobID cc2541ee4ded7e34d7b12d812134fd96
- ์คํ
$ ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host.
Starting taskexecutor daemon on host.
- ์คํ ํ์ธ
$ ps aux | grep flink
- ์ข ๋ฃ
$ ./bin/stop-cluster.sh
- ์น ui ํ์ธ
http://localhost:8081/#/overview

















