[Airflow]Airflow 실습

 

Airflow 설치

AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2| cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Airflow Init

  • Code
export AIRFLOW_HOME=`pwd`#(따옴표가 아니라 ~와 같이있는 `이다.)
echo $AIRFLOW_HOME

airflow db init
  • Result

    스크린샷 2024-12-19 124119

  • export AIRFLOW_HOME=pwd`` 는 AIRFLOW_HOME이라는 환경 변수에 pwd(print working directory=현재 작업 디렉토리)로 설정하겠다는 것이다.
  • echo $AIRFLOW_HOME 는 제대로 환경 변수가 확인되었는지 확인하는 코드이다. 이 코드에서는 현재 디렉토리가 출력되어야 정상이다.(생략해도 AIRFLOW실행에는 문제 없다.)
  • airflow db init 는 Airflow의 Database를 초기화하는 코드이다. 제대로 되었다면 해당 폴더 내에 aiflow.cfg, airflow.db가 생성되었을 것이다.
  • airflow 계정 생성
airflow users create \
> --username [유저 이름]\
> --password ['password 지정'] \
> --firstname [이름] \
> --lastname [성] \
> --role Admin \
> --email [이메일]
  • 위와 같이 생성한 후에 airflow users list 를 입력해주면 생성된 것을 확인할 수 있다.

Airflow 실행

  • Code
airflow webserver --port 8080
  • Result

스크린샷 2024-12-19 125730

  • 위의 그림과 같이 나오면 정상적으로 된 것이고, 빨간색 box를 보았을 때, 내가 지정한 port 번호로 생성된 것을 확인할 수 있다.
  • 우리가 띄운 웹 서버는 http://localhost:8080으로 접속할 수 있다.
  • 위 url로 접속하면 Sign In이 뜰 텐데, 우리가 아까 생성한 계정의 username과 password를 입력하면 된다.

스크린샷 2024-12-19 130423

  • 들어가게 되면 위와 같은 문구들이 보일텐데 이 중에 빨간 box가 쳐진 것만 우선 해결해 볼 생각이다.
  • 간단하게 현재 scheduler가 실행되어 있지 않아 발생한 것이다.
  • scheduler를 실행하기 위해 새로운 터미널 창을 실행시켜준다.
# 새로운 터미널 창
export AIRFLOW_HOME=`pwd`

airflow scheduler
  • init 할 때와 동일하게 export로 환경 변수를 설정해주고, airflow scheduler 를 작성하여 실행해주면 된다.
  • 이 때, 기존의 webserver를 띄운 터미널을 종료하면 안된다. 즉, Airflow를 사용하기 위해 webserver, scheduler를 띄운 최소 2개의 터미널이 필요하다.

스크린샷 2024-12-19 131139

  • 위의 코드를 실행하고, 웹 사이트를 새로고침하면 사라진 것을 확인할 수 있다.

DAG 작성

  • 새로운 터미널을 하나 더 띄우고, 내가 아까 지정한 AIRFLOW_HOME환경 변수 경로에 dags라는 폴더를 하나 만든다.(폴더 이름은 무조건 dags이어야 한다.)
  • DAG파일은 크게 DAG 정의, Task 정의, Task 순서 정의로 나눌 수 있다.
  • DAG 정의
from airflow import DAG

default_args = {
    'owner':'', # 소유자 지정
    'depends_on_past' : -> bool, # True일 경우, 이전 DAG이 성공으로 완료 되어야 이후의 DAG 실행
    'start_date' : datetime(2024,1,1), # DAG 실행의 시작 날짜
    'end_date' : datetime(2024, 1, 4) # DAG 실행의 종료 날짜
    'retries' : int # Task 실패 시에 재시도 횟수
    'retry_delay' : ->timedelta # 재시도 사이의 대기 시간 정
}

with DAG(
	dag_id = "", # DAG의 고유 식별자 지정
	defualt_args = default_args, # Task의 기본 인자 설정
	description = "", # DAG의 설명
	scheduler = "* * * * *", # DAG의 실행 주기(Cron 표현식이나 @daily, @hourly, @once 등으로 지정)
	tags = [] -> list, # DAG을 태그로 그룹화할 수 있게 태그 지정
	catchup = -> bool # True일 경우, DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG을 실행하게 된다.
) as dag:
  • Task 정의
from airflow.operators.python import PythonOperator
from airflow import DAG

default_args = {
    'owner':'', # 소유자 지정
    'depends_on_past' : -> bool, # True일 경우, 이전 DAG이 성공으로 완료 되어야 이후의 DAG 실행
    'start_date' : datetime(2024,1,1), # DAG 실행의 시작 날짜
    'end_date' : datetime(2024, 1, 4) # DAG 실행의 종료 날짜
    'retries' : int # Task 실패 시에 재시도 횟수
    'retry_delay' : ->timedelta # 재시도 사이의 대기 시간 정
}

with DAG(
	dag_id = "", # DAG의 고유 식별자 지정
	defualt_args = default_args, # Task의 기본 인자 설정
	description = "", # DAG의 설명
	scheduler = "* * * * *", # DAG의 실행 주기(Cron 표현식이나 @daily, @hourly, @once 등으로 지정)
	tags = [] -> list, # DAG을 태그로 그룹화할 수 있게 태그 지정
	catchup = -> bool # True일 경우, DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG을 실행하게 된다.
) as dag:
		t1 = PythonOperator(
				tast_id = '',
				python_callable = # 파이썬 함수 지정
  • operator를 불러와서 사용한다. 이때 PythonOperator 이외에도 여러 Operator가 있다.
Operator  
airflow.operators.python.PythonOperator 파이썬에서 Callable한 객체를 파라미터로 넘겨 실행
airflow.operators.bash.BashOperator Bash 커맨드를 실행
airlfow.operators.dummy.DummyOperator 아무것도 실행하지 않음(여러 Task의 Success를 기다릴 때 사용)
airflow.provider.http.operators.http.SimpleHttpOperator 특정 호스트로 HTTP 요청을 보내고 Response를 반환
airflow.operators.python.BranchPythonOperator 특정 조건에 따라 실행을 제어