Airflow 설치
AIRFLOW_VERSION=2.6.3
PYTHON_VERSION="$(python --version | cut -d " " -f 2| cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
Airflow Init
- Code
export AIRFLOW_HOME=`pwd`#(따옴표가 아니라 ~와 같이있는 `이다.)
echo $AIRFLOW_HOME
airflow db init
-
Result
export AIRFLOW_HOME=pwd`` 는 AIRFLOW_HOME이라는 환경 변수에 pwd(print working directory=현재 작업 디렉토리)로 설정하겠다는 것이다.echo $AIRFLOW_HOME는 제대로 환경 변수가 확인되었는지 확인하는 코드이다. 이 코드에서는 현재 디렉토리가 출력되어야 정상이다.(생략해도 AIRFLOW실행에는 문제 없다.)airflow db init는 Airflow의 Database를 초기화하는 코드이다. 제대로 되었다면 해당 폴더 내에 aiflow.cfg, airflow.db가 생성되었을 것이다.- airflow 계정 생성
airflow users create \
> --username [유저 이름]\
> --password ['password 지정'] \
> --firstname [이름] \
> --lastname [성] \
> --role Admin \
> --email [이메일]
- 위와 같이 생성한 후에
airflow users list를 입력해주면 생성된 것을 확인할 수 있다.
Airflow 실행
- Code
airflow webserver --port 8080
- Result
- 위의 그림과 같이 나오면 정상적으로 된 것이고, 빨간색 box를 보았을 때, 내가 지정한 port 번호로 생성된 것을 확인할 수 있다.
- 우리가 띄운 웹 서버는 http://localhost:8080으로 접속할 수 있다.
- 위 url로 접속하면 Sign In이 뜰 텐데, 우리가 아까 생성한 계정의 username과 password를 입력하면 된다.
- 들어가게 되면 위와 같은 문구들이 보일텐데 이 중에 빨간 box가 쳐진 것만 우선 해결해 볼 생각이다.
- 간단하게 현재 scheduler가 실행되어 있지 않아 발생한 것이다.
- scheduler를 실행하기 위해 새로운 터미널 창을 실행시켜준다.
# 새로운 터미널 창
export AIRFLOW_HOME=`pwd`
airflow scheduler
- init 할 때와 동일하게 export로 환경 변수를 설정해주고,
airflow scheduler를 작성하여 실행해주면 된다. - 이 때, 기존의 webserver를 띄운 터미널을 종료하면 안된다. 즉, Airflow를 사용하기 위해 webserver, scheduler를 띄운 최소 2개의 터미널이 필요하다.
- 위의 코드를 실행하고, 웹 사이트를 새로고침하면 사라진 것을 확인할 수 있다.
DAG 작성
- 새로운 터미널을 하나 더 띄우고, 내가 아까 지정한 AIRFLOW_HOME환경 변수 경로에 dags라는 폴더를 하나 만든다.(폴더 이름은 무조건 dags이어야 한다.)
- DAG파일은 크게 DAG 정의, Task 정의, Task 순서 정의로 나눌 수 있다.
- DAG 정의
from airflow import DAG
default_args = {
'owner':'', # 소유자 지정
'depends_on_past' : -> bool, # True일 경우, 이전 DAG이 성공으로 완료 되어야 이후의 DAG 실행
'start_date' : datetime(2024,1,1), # DAG 실행의 시작 날짜
'end_date' : datetime(2024, 1, 4) # DAG 실행의 종료 날짜
'retries' : int # Task 실패 시에 재시도 횟수
'retry_delay' : ->timedelta # 재시도 사이의 대기 시간 정
}
with DAG(
dag_id = "", # DAG의 고유 식별자 지정
defualt_args = default_args, # Task의 기본 인자 설정
description = "", # DAG의 설명
scheduler = "* * * * *", # DAG의 실행 주기(Cron 표현식이나 @daily, @hourly, @once 등으로 지정)
tags = [] -> list, # DAG을 태그로 그룹화할 수 있게 태그 지정
catchup = -> bool # True일 경우, DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG을 실행하게 된다.
) as dag:
- Task 정의
from airflow.operators.python import PythonOperator
from airflow import DAG
default_args = {
'owner':'', # 소유자 지정
'depends_on_past' : -> bool, # True일 경우, 이전 DAG이 성공으로 완료 되어야 이후의 DAG 실행
'start_date' : datetime(2024,1,1), # DAG 실행의 시작 날짜
'end_date' : datetime(2024, 1, 4) # DAG 실행의 종료 날짜
'retries' : int # Task 실패 시에 재시도 횟수
'retry_delay' : ->timedelta # 재시도 사이의 대기 시간 정
}
with DAG(
dag_id = "", # DAG의 고유 식별자 지정
defualt_args = default_args, # Task의 기본 인자 설정
description = "", # DAG의 설명
scheduler = "* * * * *", # DAG의 실행 주기(Cron 표현식이나 @daily, @hourly, @once 등으로 지정)
tags = [] -> list, # DAG을 태그로 그룹화할 수 있게 태그 지정
catchup = -> bool # True일 경우, DAG에서 정의한 start_date부터 현재까지 미실행된 모든 스케줄에 대해 DAG을 실행하게 된다.
) as dag:
t1 = PythonOperator(
tast_id = '',
python_callable = # 파이썬 함수 지정
- operator를 불러와서 사용한다. 이때 PythonOperator 이외에도 여러 Operator가 있다.
| Operator | |
|---|---|
| airflow.operators.python.PythonOperator | 파이썬에서 Callable한 객체를 파라미터로 넘겨 실행 |
| airflow.operators.bash.BashOperator | Bash 커맨드를 실행 |
| airlfow.operators.dummy.DummyOperator | 아무것도 실행하지 않음(여러 Task의 Success를 기다릴 때 사용) |
| airflow.provider.http.operators.http.SimpleHttpOperator | 특정 호스트로 HTTP 요청을 보내고 Response를 반환 |
| airflow.operators.python.BranchPythonOperator | 특정 조건에 따라 실행을 제어 |
PREVIOUS[Airflow]Airflow 실습