ํฐ์คํ ๋ฆฌ ๋ทฐ
๊ฐ์
์ฃผ์ ์ฉ์ด
- DAG: Directed Acyclic Graph(๋๊ทธ), ๋ฐฉํฅ์ฑ ๋น์ํ ๊ทธ๋ํ
- DAG ํ์ผ์ ํ์ด์ฌ์ผ๋ก ์ด๋ฃจ์ด์ ธ์์
- backfilling(๋ฐฑํ): DAG์ ๊ณผ๊ฑฐ ์์ ์ ์ง์ ํด ์คํํ๋ ํ๋ก์ธ์ค
- ํ์คํฌ
- ์คํผ๋ ์ดํฐ
๊ทธ๋ํ ๊ธฐ๋ฐ ํํ์ ํน์ฑ
- ๋
๋ฆฝ์ ์ธ ํ์คํฌ์ ๊ฒฝ์ฐ ํ์คํฌ๋ฅผ ๋ณ๋ ฌ๋ก ์คํํ ์ ์์
- ๋ ์จ ์๋ณด ๊ฐ์ ธ์ค๊ธฐ, ํ๋งค ๋ฐ์ดํฐ ๊ฐ์ ธ์ค๊ธฐ๋ ๋ ๋ฆฝ์ ์
- ๋ชจ๋๋ฆฌ์(๋จ์ผ) ์คํฌ๋ฆฝํธ๊ฐ X, ์ ์ง์ ์ธ ํ์คํฌ๋ก ๋ช
ํํ๊ฒ ๋ถ๋ฆฌํ ์ ์์
- ์ค๊ฐ์ ์คํจํ๋ฉด ์คํจํ ํ์คํฌ๋ง ์ฌ์คํํ๋ฉด ๋จ
์ฃผ์ ๊ตฌ์ฑ ์์
- Airflow ์ค์ผ์ค๋ฌ: ํ์ฌ ์์ ์์ DAG์ ์ค์ผ์ค์ด ์ง๋ ๊ฒฝ์ฐ Airflow ์์ปค์ DAG์ ํ์คํฌ๋ฅผ ์์ฝํจ
- Airflow ์์ปค: ์์ฝ๋ ํ์คํฌ๋ฅผ ์ ํํ๊ณ ์คํํจ
- Airflow ์น ์๋ฒ: ์ค์ผ์ค๋ฌ์์ ๋ถ์ํ DAG๋ฅผ ์๊ฐํํ๊ณ DAG ์คํ๊ณผ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ ์ธํฐํ์ด์ค๋ฅผ ์ ๊ณตํจ
Airflow์ ์ฅ์
- bash ์๊ณผ python์ ๋์์ ํ์ฉํ ์ ์๋ค.
- ์ ํ์คํฌ๊ฐ ์คํจํ์ ๊ฒฝ์ฐ, ๋ท ํ์คํฌ๋ ์คํ๋์ง ์์ผ๋ฉฐ ์คํจํ ํ์คํฌ๋ง ์ฌ์คํํ ์ ์๋ค.
ํํ ๋ฆฌ์ผ
์์์ ํด๋ณด์! 3๊ฐ์ ํ์คํฌ๋ก ๊ตฌ์ฑ๋์ด์๋ ํ์ดํ๋ผ์ธ์ ๋ง๋๋ ค๊ณ ํ๋ค. ๋จผ์ , curl์ ์ด์ฉํด ๋ก์ผ์ ๋ฐ์ฌ ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ฌ ์ ์๋ api๋ฅผ ์ด์ฉํด์ launches.json ํ์ผ์ ๋ฐ์์จ๋ค. ๊ทธ๋ค์ launches.json ํ์ผ ์์ image url์ ํ์ด์ฌ์ ์ฌ์ฉํด ๋ค์ด ๋ฐ๋๋ค. ๋ง์ง๋ง์ผ๋ก ์ด๋ฏธ์ง ๊ฐฏ์๋ฅผ ํ์ธํ๋ bash shell ๋ช ๋ น์ด๋ฅผ ์ ๋ ฅํ๋ค.
์๋ ์ฝ๋๋ Apache Airflow ๊ธฐ๋ฐ์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ์์ ๋ฐ์ทํ์ผ๋ฉฐ ๋์ปค ์ด๋ฏธ์ง๊ฐ ์์ด ๋ค๋ฅธ ์ด๋ฏธ์ง๋ก ๋ณ๊ฒฝํ์๋ค.
# download_rocket_launches.py
import json
import pathlib
import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
dag = DAG(
dag_id="download_rocket_launches",
description="Download rocket pictures of recently launched rockets.",
start_date=airflow.utils.dates.days_ago(14),
schedule_interval="@daily",
)
download_launches = BashOperator(
task_id="download_launches",
bash_command="curl -o /tmp/launches.json -L '<https://ll.thespacedevs.com/2.0.0/launch/upcoming>'", # noqa: E501
dag=dag,
)
def _get_pictures():
# Ensure directory exists
pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)
# Download all pictures in launches.json
with open("/tmp/launches.json") as f:
launches = json.load(f)
image_urls = [launch["image"] for launch in launches["results"]]
for image_url in image_urls:
try:
response = requests.get(image_url)
image_filename = image_url.split("/")[-1]
target_file = f"/tmp/images/{image_filename}"
with open(target_file, "wb") as f:
f.write(response.content)
print(f"Downloaded {image_url} to {target_file}")
except requests_exceptions.MissingSchema:
print(f"{image_url} appears to be an invalid URL.")
except requests_exceptions.ConnectionError:
print(f"Could not connect to {image_url}.")
get_pictures = PythonOperator(
task_id="get_pictures", python_callable=_get_pictures, dag=dag
)
notify = BashOperator(
task_id="notify",
bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
dag=dag,
)
download_launches >> get_pictures >> notify
airflow๋ฅผ ์คํํ๊ธฐ ์ํด์ docker๋ฅผ ์ฌ์ฉํ๋ค. docker๋ฅผ ์คํํ์๋ ๋ก๊ทธ๊ฐ ์ถ๋ ฅ๋๋ ๊ฑธ ํ์ธํ ์ ์๋ค.
docker run \\
-ti \\
-p 8080:8080 \\
-v {download_rocket_launches.py ํ์ผ ์์น}:/opt/airflow/dags/download_rocket_launches.py \\
--name airflow \\
--entrypoint=/bin/bash \\
apache/airflow:slim-latest-python3.8 \\
-c '( \\
airflow db init && \\
airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org \\
); \\
airflow webserver & \\
airflow scheduler \\
'
์ฑ๊ณต์ ์ผ๋ก ์คํ๋๋ค๋ฉด http://localhost:8080์ผ๋ก ๋ค์ด๊ฐ๋ณด์
์ผ์ชฝ ํ ๊ธ์ ์ค๋ฅธ์ชฝ์ผ๋ก ์ฎ๊ธฐ๊ณ ์ฌ์์ ๋๋ฅด๋ฉด ์คํ๋๋ค!
์ค์ผ์ค
๊ธฐ๋ณธ์ ์ธ ์ค์ผ์ค ๊ฐ๊ฒฉ, cron ๊ธฐ๋ฐ์ ์ค์ผ์ค ๊ฐ๊ฒฉ, ๋น๋ ๊ธฐ๋ฐ์ ์ค์ผ์ค ๊ฐ๊ฒฉ์ด ์กด์ฌํ๋ค.
๊ฐ์ฅ ๋จผ์ ๊ธฐ๋ณธ์ ์ธ ์ค์ผ์ค ๊ฐ๊ฒฉ์ ์์๋ณด์. schedule_interval์ @daily๋ก ํ๋ค๋ฉด ์คํ์ ๋งค์ผ ์์ ์ด๋ค. ์๋ฅผ ๋ค์ด 2019-01-01์ DAG๋ฅผ ์คํํ๋ฉด 1์ 2์ผ ์์ ์ ์ฒ์์ผ๋ก ์คํ๋๋ค. start_date๊ฐ 2019-01-01, end date๊ฐ 2019-01-05๋ผ๋ฉด ์๋์ ๊ฐ์ด ๋ค์ฏ๋ฒ ์คํ๋๋ค.
- 2019-01-02: ์ต์ด ์คํ
- 2019-01-03: ๋๋ฒ์งธ ์คํ
- 2019-01-04: ์ธ๋ฒ์งธ ์คํ
- 2019-01-05: ๋ค๋ฒ์งธ ์คํ
- 2019-01-06: ๋ค์ฏ๋ฒ์งธ ์คํ
cron ๊ธฐ๋ฐ์ ์ค์ผ์ค ๊ฐ๊ฒฉ์ 5๊ฐ์ ๊ตฌ์ฑ ์์๊ฐ ์๊ณ ๋งจ ์ ์์๋ถํฐ ๋ถ(0~59), ์๊ฐ(0~23), ์ผ(1~31), ์(1~12), ์์ผ(0~6)์ผ๋ก ๋์ด์๋ค. *๋ ์ ์คํฐ๋ฆฌ์คํฌ๋ก ์ ํ๋์ง ์์ ํ๋๋ก ์ ์ํ๋ค.
- 0 0 * * 0์ด๋ผ๋ฉด ์ผ์์ผ ์์ ์ ์คํ
- 2 13 2 * *์ด๋ผ๋ฉด ๋งค์ 2์ผ 13์ 2๋ถ์ ์คํ
- 0 0,12 * * MON-FRI๋ผ๋ฉด ๋งค์ ์-๊ธ ์์ , ์ ์ค์ ์คํ
๋น๋ ๊ธฐ๋ฐ์ ์ค์ผ์ค ๊ฐ๊ฒฉ์ cron์ ์ ์ฝ์ ํด๊ฒฐํ ์ ์๋ค. ์๋ฅผ ๋ค์ด, 3์ผ๋ง๋ค ํ ๋ฒ์ฉ ์คํํ๊ธฐ ์ํด์ cron ๊ธฐ๋ฐ์ ๊ฐ๋ฅํ์ง ์๋ค. ์ด๋ด๋ timedelta ์ธ์คํด์ค๋ฅผ ์ฌ์ฉํด ๋น๋ ๊ธฐ๋ฐ์ ์ค์ผ์ค ๊ฐ๊ฒฉ์ ์ ์ํ๋ค. ์๋์ ๊ฐ์ด dag๋ฅผ ๋ง๋ค๋ฉด 2019๋ 1์ 4์ผ์ ์คํ๋ ๊ฒ์ด๋ค.
dag=DAG(
dag_id="time_delta",
schedule_interval=dt.timedelta(days=3),
start_date=dt.datetime(year=2019, month=1, day=1),
end_date=dt.datetime(year=2019, month=1, day=5)
์์กด์ฑ
๋ ํ์คํฌ๊ฐ ์์กด์ฑ์ ๊ฐ์ง๋๋ ์๋ ์์กด์ฑ์ ์ถ๊ฐํด์ผํ๋ค.
fetch_weather >> clean_weather
fetch_sales >> clean_sales
ํ๋์ ํ์คํฌ๊ฐ 2๊ฐ์ ํ์คํฌ์ ์์กด์ฑ์ ๊ฐ์ง๋ค๋ฉด (์๋ฅผ ๋ค์ด, ๋ฐ์ดํฐ์ ์ ๋ง๋ค๊ธฐ ์ํด ๋ ์จ ๋ฐ์ดํฐ์ ํ๋งค ๋ฐ์ดํฐ ๋ชจ๋๋ฅผ ๊ฐ์ ธ์์ผํ๋ค๋ฉด) ์๋์ ๊ฐ์ด ํฌ์ธ(์ผ ๋ ๋ค) ์์กด์ฑ ํ์คํฌ๋ฅผ ์์ฑํด์ผํ๋ค.
[fetch_weather, fetch_sales] >> join_datasets
๋ฐ๋๋ผ๋ฉด ํฌ์์(๋ค ๋ ์ผ)์ด๋ผ๊ณ ํ๋ค.
start >> [fetch_weather, fetch_sales]
์์ ๊ฐ์ ์์กด์ฑ์ผ ๋ DAG๋ฅผ ์คํํ๋ฉด fetch_weather์ fetch_sales๋ ๋ณ๋ ฌ๋ก ์คํ๋๋ค.
๋ธ๋์น
์กฐ๊ฑด์ ์ํด ๋ค๋ฅธ task๋ฅผ ์คํํด์ผํ๋ค๋ฉด BranchPythonOperator๋ฅผ ์ฌ์ฉํ๋ค. ์ ํํ์ง ์์ ๋ธ๋์น ์์ ์ ๋ชจ๋ ๊ฑด๋๋ด๋ค.
XCom
ํ์คํฌ๋ค์ ๋ ๋ฆฝ์ ์ผ๋ก ์คํ๋๊ธฐ ๋๋ฌธ์ ํ์คํฌ ๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ๊ณต์ ํ๊ธฐ ์ํด์ XCom์ ์ฌ์ฉํด์ผ ํ๋ค.
def _train_mode(**context):
model_id=str(uuid.uuid4())
context["task_instance"].xcom_push(key="model_id", value="model_id")
def _deploy_model(**context):
model_id=context["task_instance"].xcom.pull(
"task_ids="train_model, key="model_id"
)
join_datasets >> train_model >> deploy_model
์ด ์ ๊ทผ ๋ฐฉ์์ ํจ์๋ฅผ ์ ์ํ ํ PythonOperator๋ฅผ ์ด์ฉํด Airflow ํ์คํฌ๋ฅผ ์์ฑํด์ผํ๊ธฐ ๋๋ฌธ์ Taskflow API๋ฅผ ์ฌ์ฉํ๋ค. @task ๋ผ๋ ๋ฐ์ฝ๋ ์ดํฐ๋ก ๋ณํํ ์ ์๋ค.
from airflow.decorators import task
with DAG(...) as dag:
@task
def train_model():
model_id = str(uuid.uuid4())
return model_id
@task
def deploy_model(model_id: str):
print(f"Deploying model {model_id")
์ฐธ๊ณ ์๋ฃ
- Apache Airflow ๊ธฐ๋ฐ์ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ์ฑ
'๐ป Dev' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
Lucene ๊ธฐ๋ฐ ElasticSearch์ ๋ํด ์์๋ณด์ (0) | 2024.06.18 |
---|---|
๐ฅ๏ธ [์ปดํจํฐ๊ตฌ์กฐ์ ์ด์์ฒด์ ] ์ปดํจํฐ ๋ถํ, ๋ฐ์ดํฐ, ๋ช ๋ น์ด (0) | 2024.05.06 |
๐ docker cache ์ญ์ ํ๊ธฐ (0) | 2024.05.02 |
๐โโ๏ธ [python] ray๋ก ์นผํด๋ฅผ ํด๋ณด์ (0) | 2024.05.02 |
๐ [python] jpeg๋ก ์ธ์ฝ๋ฉํ๋ฉด ์ด๋ฏธ์ง๊ฐ ๋ณํ๋ค? (0) | 2024.05.02 |
- vscode
- ๋ฒ ์ด์ฆ ์ ๋ฆฌ
- ์ฑ ๋ฆฌ๋ทฐ
- ๊ฐ๋ฐ์
- Computer Vision
- python
- ๋จธ์ ๋ฌ๋ ์ด๋ก
- GIT
- ๊ธ๋
- Multiprocessing
- linux
- Generative Model
- tmux
- ๋ ํ๊ฐ
- ํ๊ณ
์ผ | ์ | ํ | ์ | ๋ชฉ | ๊ธ | ํ |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- Total
- Today
- Yesterday