반응형
op_kwargs 와 op_args 이용
두 개의 차이는 op_kwargs는 dict이고 op_args는 list이다.
PythonOperator
def __init__(
self,
*,
python_callable: Callable,
op_args: Optional[List] = None,
op_kwargs: Optional[Dict] = None,
templates_dict: Optional[Dict] = None,
templates_exts: Optional[List[str]] = None,
**kwargs,
)
op_args 이용
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
def my_func(*op_args):
return op_args[0]
dag = DAG(
dag_id='print_test',
schedule_interval='0 12 * * *',
start_date=datetime(2021, 8, 4),
catchup=False
)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
print_operator = PythonOperator(
task_id='python_task',
python_callable=my_func,
op_args=["some", "and", "text"],
dag=dag
)
dummy_operator >> print_operator
op_kwargs 이용
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
def my_func(**kwargs):
return "hello " + kwargs['name']
dag = DAG(
dag_id='print_test',
schedule_interval='0 12 * * *',
start_date=datetime(2021, 8, 4),
catchup=False
)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
print_operator = PythonOperator(
task_id='python_task',
python_callable=my_func,
op_kwargs={"name": "Patrick", "City": "Vienna", "ID": "Qwalk_123"},
dag=dag
)
dummy_operator >> print_operator
Jinja template variables 이용
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
def my_func(**kwargs):
return "hello " + kwargs['params'].get('name')
dag = DAG(
dag_id='print_test',
schedule_interval='0 12 * * *',
start_date=datetime(2021, 8, 4),
catchup=False
)
dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
print_operator = PythonOperator(
task_id='python_task',
python_callable=my_func,
provide_context=True,
dag=dag
)
dummy_operator >> print_operator
Config로 값을 넘겨주기 위해서는 provide_context를 True로 설정한다.
Airflow UI에서 실행할 때 Trigger DAG w/ config를 클릭한다.
원하는 값을 입력하고 실행하면 된다.
결과를 보면 입력한 값이 잘 넘어간 걸 확인할 수 있다.
참고
반응형