[Airflow] PythonOperator에 arugments 넘기기

반응형

op_kwargs 와 op_args 이용

두 개의 차이는 op_kwargs는 dict이고 op_args는 list이다.

 

PythonOperator

def __init__(
    self,
    *,
    python_callable: Callable,
    op_args: Optional[List] = None,
    op_kwargs: Optional[Dict] = None,
    templates_dict: Optional[Dict] = None,
    templates_exts: Optional[List[str]] = None,
    **kwargs,
)

 

op_args 이용

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

def my_func(*op_args):
    return op_args[0]

dag = DAG(
    dag_id='print_test',
    schedule_interval='0 12 * * *',
    start_date=datetime(2021, 8, 4),
    catchup=False
)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

print_operator = PythonOperator(
    task_id='python_task',
    python_callable=my_func,
    op_args=["some", "and", "text"],
    dag=dag
)

dummy_operator >> print_operator

 

 

op_kwargs 이용

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def my_func(**kwargs):
    return "hello " + kwargs['name']


dag = DAG(
    dag_id='print_test',
    schedule_interval='0 12 * * *',
    start_date=datetime(2021, 8, 4),
    catchup=False
)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

print_operator = PythonOperator(
    task_id='python_task',
    python_callable=my_func,
    op_kwargs={"name": "Patrick", "City": "Vienna", "ID": "Qwalk_123"},
    dag=dag
)

dummy_operator >> print_operator

 

 

 

Jinja template variables 이용

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def my_func(**kwargs):
    return "hello " + kwargs['params'].get('name')


dag = DAG(
    dag_id='print_test',
    schedule_interval='0 12 * * *',
    start_date=datetime(2021, 8, 4),
    catchup=False
)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)

print_operator = PythonOperator(
    task_id='python_task',
    python_callable=my_func,
    provide_context=True,
    dag=dag
)

dummy_operator >> print_operator

Config로 값을 넘겨주기 위해서는 provide_context를 True로 설정한다.

 

Airflow UI에서 실행할 때 Trigger DAG w/ config를 클릭한다.

 

 

원하는 값을 입력하고 실행하면 된다.

 

 

결과를 보면 입력한 값이 잘 넘어간 걸 확인할 수 있다.

 

 

참고