
Overview

Key Terms

  • DAG: Directed Acyclic Graph, a graph with directed edges and no cycles
    • DAG files are written in Python
  • Backfilling: the process of running a DAG for specified points in the past
  • Task: a unit of work within a DAG
  • Operator: a template that defines what a task does (e.g., BashOperator, PythonOperator)

Properties of the Graph-Based Representation

  • ๋…๋ฆฝ์ ์ธ ํƒœ์Šคํฌ์˜ ๊ฒฝ์šฐ ํƒœ์Šคํฌ๋ฅผ ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Œ
    • ๋‚ ์”จ ์˜ˆ๋ณด ๊ฐ€์ ธ์˜ค๊ธฐ, ํŒ๋งค ๋ฐ์ดํ„ฐ ๊ฐ€์ ธ์˜ค๊ธฐ๋Š” ๋…๋ฆฝ์ ์ž„
  • ๋ชจ๋†€๋ฆฌ์‹(๋‹จ์ผ) ์Šคํฌ๋ฆฝํŠธ๊ฐ€ X, ์ ์ง„์ ์ธ ํƒœ์Šคํฌ๋กœ ๋ช…ํ™•ํ•˜๊ฒŒ ๋ถ„๋ฆฌํ•  ์ˆ˜ ์žˆ์Œ
    • ์ค‘๊ฐ„์— ์‹คํŒจํ•˜๋ฉด ์‹คํŒจํ•œ ํƒœ์Šคํฌ๋งŒ ์žฌ์‹คํ–‰ํ•˜๋ฉด ๋จ

Main Components

  • Airflow scheduler: when a DAG's schedule has passed at the current point in time, it queues the DAG's tasks for the Airflow workers
  • Airflow workers: pick up the queued tasks and execute them
  • Airflow web server: provides an interface for visualizing the DAGs parsed by the scheduler and for monitoring DAG runs and their results

Airflow์˜ ์žฅ์ 

  • Bash shell and Python can be used together in a single pipeline.
  • If an upstream task fails, downstream tasks are not executed, and only the failed task needs to be rerun.

 

Tutorial

์‹œ์ž‘์„ ํ•ด๋ณด์ž! 3๊ฐœ์˜ ํƒœ์Šคํฌ๋กœ ๊ตฌ์„ฑ๋˜์–ด์žˆ๋Š” ํŒŒ์ดํ”„๋ผ์ธ์„ ๋งŒ๋“œ๋ ค๊ณ  ํ•œ๋‹ค. ๋จผ์ €, curl์„ ์ด์šฉํ•ด ๋กœ์ผ“์˜ ๋ฐœ์‚ฌ ์ •๋ณด๋ฅผ ๊ฐ€์ ธ์˜ฌ ์ˆ˜ ์žˆ๋Š” api๋ฅผ ์ด์šฉํ•ด์„œ launches.json ํŒŒ์ผ์„ ๋ฐ›์•„์˜จ๋‹ค. ๊ทธ๋‹ค์Œ launches.json ํŒŒ์ผ ์•ˆ์— image url์„ ํŒŒ์ด์ฌ์„ ์‚ฌ์šฉํ•ด ๋‹ค์šด ๋ฐ›๋Š”๋‹ค. ๋งˆ์ง€๋ง‰์œผ๋กœ ์ด๋ฏธ์ง€ ๊ฐฏ์ˆ˜๋ฅผ ํ™•์ธํ•˜๋Š” bash shell ๋ช…๋ น์–ด๋ฅผ ์ž…๋ ฅํ•œ๋‹ค.

The code below is adapted from the book Data Pipelines with Apache Airflow; the Docker image was swapped for another one because the original image was unavailable.

# download_rocket_launches.py

import json
import pathlib

import airflow.utils.dates
import requests
import requests.exceptions as requests_exceptions
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

dag = DAG(
    dag_id="download_rocket_launches",
    description="Download rocket pictures of recently launched rockets.",
    start_date=airflow.utils.dates.days_ago(14),
    schedule_interval="@daily",
)

download_launches = BashOperator(
    task_id="download_launches",
    bash_command="curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'",  # noqa: E501
    dag=dag,
)

def _get_pictures():
    # Ensure directory exists
    pathlib.Path("/tmp/images").mkdir(parents=True, exist_ok=True)

    # Download all pictures in launches.json
    with open("/tmp/launches.json") as f:
        launches = json.load(f)
        image_urls = [launch["image"] for launch in launches["results"]]
        for image_url in image_urls:
            try:
                response = requests.get(image_url)
                image_filename = image_url.split("/")[-1]
                target_file = f"/tmp/images/{image_filename}"
                with open(target_file, "wb") as f_img:
                    f_img.write(response.content)
                print(f"Downloaded {image_url} to {target_file}")
            except requests_exceptions.MissingSchema:
                print(f"{image_url} appears to be an invalid URL.")
            except requests_exceptions.ConnectionError:
                print(f"Could not connect to {image_url}.")

get_pictures = PythonOperator(
    task_id="get_pictures", python_callable=_get_pictures, dag=dag
)

notify = BashOperator(
    task_id="notify",
    bash_command='echo "There are now $(ls /tmp/images/ | wc -l) images."',
    dag=dag,
)

download_launches >> get_pictures >> notify

We use Docker to run Airflow. When the container starts, you can see the logs being printed.

docker run \
-ti \
-p 8080:8080 \
-v {path to download_rocket_launches.py}:/opt/airflow/dags/download_rocket_launches.py \
--name airflow \
--entrypoint=/bin/bash \
apache/airflow:slim-latest-python3.8 \
-c '( \
airflow db init && \
airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email admin@example.org \
); \
airflow webserver & \
airflow scheduler \
'

์„ฑ๊ณต์ ์œผ๋กœ ์‹คํ–‰๋๋‹ค๋ฉด http://localhost:8080์œผ๋กœ ๋“ค์–ด๊ฐ€๋ณด์ž

์™ผ์ชฝ ํ† ๊ธ€์„ ์˜ค๋ฅธ์ชฝ์œผ๋กœ ์˜ฎ๊ธฐ๊ณ  ์žฌ์ƒ์„ ๋ˆ„๋ฅด๋ฉด ์‹คํ–‰๋œ๋‹ค!

 

Schedules

๊ธฐ๋ณธ์ ์ธ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ, cron ๊ธฐ๋ฐ˜์˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ, ๋นˆ๋„ ๊ธฐ๋ฐ˜์˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์ด ์กด์žฌํ•œ๋‹ค.

Let's look at the basic presets first. With schedule_interval set to @daily, the DAG runs every day at midnight. For example, if you deploy the DAG on 2019-01-01, the first run happens at midnight on January 2. With a start_date of 2019-01-01 and an end_date of 2019-01-05, it runs five times, as follows.

  • 2019-01-02: first run
  • 2019-01-03: second run
  • 2019-01-04: third run
  • 2019-01-05: fourth run
  • 2019-01-06: fifth run
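
The five run dates above follow from Airflow's interval semantics: a daily interval is executed once it has fully elapsed, i.e. at midnight of the next day. A plain-datetime sketch (not Airflow code) reproduces the schedule:

```python
import datetime as dt

start_date = dt.date(2019, 1, 1)
end_date = dt.date(2019, 1, 5)
interval = dt.timedelta(days=1)

# Each daily interval runs right after it ends, i.e. at midnight of the next day.
run_moments = []
day = start_date
while day <= end_date:
    run_moments.append(day + interval)
    day += interval

print(run_moments[0], run_moments[-1], len(run_moments))
# → 2019-01-02 2019-01-06 5
```
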

cron ๊ธฐ๋ฐ˜์˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์€ 5๊ฐœ์˜ ๊ตฌ์„ฑ ์š”์†Œ๊ฐ€ ์žˆ๊ณ  ๋งจ ์•ž ์ˆœ์„œ๋ถ€ํ„ฐ ๋ถ„(0~59), ์‹œ๊ฐ„(0~23), ์ผ(1~31), ์›”(1~12), ์š”์ผ(0~6)์œผ๋กœ ๋˜์–ด์žˆ๋‹ค. *๋Š” ์• ์Šคํ„ฐ๋ฆฌ์Šคํฌ๋กœ ์ œํ•œ๋˜์ง€ ์•Š์€ ํ•„๋“œ๋กœ ์ •์˜ํ•œ๋‹ค.

  • 0 0 * * 0 runs at midnight every Sunday
  • 2 13 2 * * runs at 13:02 on the 2nd of every month
  • 0 0,12 * * MON-FRI runs at midnight and noon, Monday through Friday
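
To make the field semantics above concrete, here is a minimal matcher for a simplified five-field cron expression (only numbers, comma lists, and '*'; no ranges or weekday names). This is a teaching sketch, not how Airflow actually parses cron.

```python
import datetime as dt

def cron_matches(expr: str, t: dt.datetime) -> bool:
    """Check a datetime against a simplified five-field cron expression."""
    fields = expr.split()
    # cron field order: minute, hour, day of month, month, day of week (0 = Sunday)
    values = (t.minute, t.hour, t.day, t.month, t.isoweekday() % 7)
    for spec, value in zip(fields, values):
        if spec == "*":
            continue
        if value not in {int(part) for part in spec.split(",")}:
            return False
    return True

print(cron_matches("0 0 * * 0", dt.datetime(2019, 1, 6)))          # Sunday midnight → True
print(cron_matches("2 13 2 * *", dt.datetime(2019, 2, 2, 13, 2)))  # 2nd, 13:02 → True
```
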

๋นˆ๋„ ๊ธฐ๋ฐ˜์˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์€ cron์˜ ์ œ์•ฝ์„ ํ•ด๊ฒฐํ•  ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, 3์ผ๋งˆ๋‹ค ํ•œ ๋ฒˆ์”ฉ ์‹คํ–‰ํ•˜๊ธฐ ์œ„ํ•ด์„  cron ๊ธฐ๋ฐ˜์€ ๊ฐ€๋Šฅํ•˜์ง€ ์•Š๋‹ค. ์ด๋Ÿด๋–ˆ timedelta ์ธ์Šคํ„ด์Šค๋ฅผ ์‚ฌ์šฉํ•ด ๋นˆ๋„ ๊ธฐ๋ฐ˜์˜ ์Šค์ผ€์ค„ ๊ฐ„๊ฒฉ์„ ์ •์˜ํžŒ๋‹ค. ์•„๋ž˜์™€ ๊ฐ™์ด dag๋ฅผ ๋งŒ๋“ค๋ฉด 2019๋…„ 1์›” 4์ผ์— ์‹คํ–‰๋  ๊ฒƒ์ด๋‹ค.

import datetime as dt

dag = DAG(
    dag_id="time_delta",
    schedule_interval=dt.timedelta(days=3),
    start_date=dt.datetime(year=2019, month=1, day=1),
    end_date=dt.datetime(year=2019, month=1, day=5),
)
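
The January 4 first run can be checked with plain datetime arithmetic: the first three-day interval covers January 1-3, and the run fires once that interval has fully elapsed.

```python
import datetime as dt

start_date = dt.datetime(2019, 1, 1)
interval = dt.timedelta(days=3)

# The run for the first interval fires only after the interval has elapsed.
first_run = start_date + interval
print(first_run.date())
# → 2019-01-04
```
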

 

Dependencies

๋‘ ํƒœ์Šคํฌ๊ฐ€ ์˜์กด์„ฑ์„ ๊ฐ€์งˆ๋•Œ๋Š” ์•„๋ž˜ ์˜์กด์„ฑ์„ ์ถ”๊ฐ€ํ•ด์•ผํ•œ๋‹ค.

fetch_weather >> clean_weather
fetch_sales >> clean_sales

ํ•˜๋‚˜์˜ ํƒœ์Šคํฌ๊ฐ€ 2๊ฐœ์˜ ํƒœ์Šคํฌ์— ์˜์กด์„ฑ์„ ๊ฐ€์ง„๋‹ค๋ฉด (์˜ˆ๋ฅผ ๋“ค์–ด, ๋ฐ์ดํ„ฐ์…‹์„ ๋งŒ๋“ค๊ธฐ ์œ„ํ•ด ๋‚ ์”จ ๋ฐ์ดํ„ฐ์™€ ํŒ๋งค ๋ฐ์ดํ„ฐ ๋ชจ๋‘๋ฅผ ๊ฐ€์ ธ์™€์•ผํ•œ๋‹ค๋ฉด) ์•„๋ž˜์™€ ๊ฐ™์ด ํŒฌ์ธ(์ผ ๋Œ€ ๋‹ค) ์˜์กด์„ฑ ํƒœ์Šคํฌ๋ฅผ ์ƒ์„ฑํ•ด์•ผํ•œ๋‹ค.

[fetch_weather, fetch_sales] >> join_datasets

๋ฐ˜๋Œ€๋ผ๋ฉด ํŒฌ์•„์›ƒ(๋‹ค ๋Œ€ ์ผ)์ด๋ผ๊ณ  ํ•œ๋‹ค.

start >> [fetch_weather, fetch_sales]

์œ„์™€ ๊ฐ™์€ ์˜์กด์„ฑ์ผ ๋•Œ DAG๋ฅผ ์‹คํ–‰ํ•˜๋ฉด fetch_weather์™€ fetch_sales๋Š” ๋ณ‘๋ ฌ๋กœ ์‹คํ–‰๋œ๋‹ค.

 

Branching

์กฐ๊ฑด์— ์˜ํ•ด ๋‹ค๋ฅธ task๋ฅผ ์‹คํ–‰ํ•ด์•ผํ•œ๋‹ค๋ฉด BranchPythonOperator๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ์„ ํƒํ•˜์ง€ ์•Š์€ ๋ธŒ๋žœ์น˜ ์ž‘์—…์€ ๋ชจ๋‘ ๊ฑด๋„ˆ๋›ด๋‹ค.

 

XCom

ํƒœ์Šคํฌ๋“ค์€ ๋…๋ฆฝ์ ์œผ๋กœ ์‹คํ–‰๋˜๊ธฐ ๋•Œ๋ฌธ์— ํƒœ์Šคํฌ ๊ฐ„์— ๋ฐ์ดํ„ฐ๋ฅผ ๊ณต์œ ํ•˜๊ธฐ ์œ„ํ•ด์„  XCom์„ ์‚ฌ์šฉํ•ด์•ผ ํ•œ๋‹ค.

import uuid

def _train_model(**context):
    model_id = str(uuid.uuid4())
    context["task_instance"].xcom_push(key="model_id", value=model_id)

def _deploy_model(**context):
    model_id = context["task_instance"].xcom_pull(
        task_ids="train_model", key="model_id"
    )
    print(f"Deploying model {model_id}")

join_datasets >> train_model >> deploy_model

์ด ์ ‘๊ทผ ๋ฐฉ์‹์€ ํ•จ์ˆ˜๋ฅผ ์ •์˜ํ•œ ํ›„ PythonOperator๋ฅผ ์ด์šฉํ•ด Airflow ํƒœ์Šคํฌ๋ฅผ ์ƒ์„ฑํ•ด์•ผํ•˜๊ธฐ ๋•Œ๋ฌธ์— Taskflow API๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. @task ๋ผ๋Š” ๋ฐ์ฝ”๋ ˆ์ดํ„ฐ๋กœ ๋ณ€ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค.

import uuid

from airflow.decorators import task

with DAG(...) as dag:

    @task
    def train_model():
        model_id = str(uuid.uuid4())
        return model_id

    @task
    def deploy_model(model_id: str):
        print(f"Deploying model {model_id}")

    # Passing the return value wires the dependency and the XCom transfer
    deploy_model(train_model())

 

์ฐธ๊ณ ์ž๋ฃŒ

  • Apache Airflow ๊ธฐ๋ฐ˜์˜ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ ์ฑ…
์ตœ๊ทผ์— ์˜ฌ๋ผ์˜จ ๊ธ€
ยซ   2025/05   ยป
์ผ ์›” ํ™” ์ˆ˜ ๋ชฉ ๊ธˆ ํ† 
1 2 3
4 5 6 7 8 9 10
11 12 13 14 15 16 17
18 19 20 21 22 23 24
25 26 27 28 29 30 31
Total
Today
Yesterday