I'm trying to build a DAG with two operators that are created dynamically, depending on the number of "pipelines" in a JSON config file. The parsed config is stored in the variable dag_datafusion_args. There is also a standard BashOperator and, at the end, a task called success that sends a message to Slack saying the DAG has finished. The other two tasks are PythonOperators that are generated dynamically and run in parallel.

I'm using Cloud Composer. When I put the DAG file in the bucket, the DAG appears in the webserver UI list, but when I click on it to view it I get the message 'DAG "dag_lucas4" seems to be missing.' If I test the tasks directly from the CLI on the Kubernetes cluster, they work, but I can't get the DAG to display in the web UI. Following suggestions from other people here on SO, I tried restarting the webserver by installing a Python package. I did this three times, without success. Does anyone know what the cause could be?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from aux_py_files.med.med_airflow_functions import *
from google.cloud import storage
from datetime import timedelta

TEMPLATE_SEARCH_PATH = '/home/airflow/gcs/plugins/'
INDEX = 1
default_args = {
    'owner': 'lucas',
    'start_date': '2021-01-10',
    'email': ['xxxx'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
    'on_failure_callback': post_message_fail_to_slack
}

dag_datafusion_args = return_datafusion_config_file('med')

with DAG('dag_lucas4', default_args=default_args, schedule_interval="30 23 * * *", template_searchpath=[TEMPLATE_SEARCH_PATH]) as dag:

    extract_ftp_csv_files_load_in_gcs = BashOperator(
        task_id='extract_ftp_csv_files_load_in_gcs',
        bash_command='aux_sh_files/med/script.sh'
    )

    success = PythonOperator(
        task_id='success',
        python_callable=post_message_success_to_slack,
        op_kwargs={'dag_name': 'dag_lucas2'}
    )

    for pipeline, args in dag_datafusion_args.items():

        configure_pipeline = PythonOperator(
            task_id=f'configure_pipeline{str(INDEX)}',
            python_callable=setPipelineArguments,
            op_kwargs={'dag_name': 'med', 'pipeline_name': pipeline},
            provide_context=True
        )
        start_pipeline = PythonOperator(
            task_id=f'start_pipeline{str(INDEX)}',
            python_callable=start_pipeline_wrapper,
            op_kwargs={'configure_pipeline_task': f'configure_pipeline{str(INDEX)}'},
            retries=3,
            provide_context=True
        )

        [extract_ftp_csv_files_load_in_gcs, configure_pipeline] >> start_pipeline >> success
        INDEX += 1
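
To make the intended structure explicit, here is a minimal sketch in plain Python (no Airflow needed) of the task IDs and dependencies I expect the loop to produce. The config dict and the helper build_task_graph are hypothetical stand-ins for illustration only; the real pipeline names come from the JSON file, whose contents are not shown here.

```python
# Hypothetical stand-in for the dict returned by
# return_datafusion_config_file('med'); names and values are made up.
dag_datafusion_args = {
    "pipeline_a": {"some_arg": "value"},
    "pipeline_b": {"some_arg": "value"},
}


def build_task_graph(config):
    """Return (task_ids, edges) mirroring the loop in the DAG file.

    Each edge is an (upstream_task_id, downstream_task_id) pair.
    """
    extract = "extract_ftp_csv_files_load_in_gcs"
    task_ids = [extract, "success"]
    edges = []
    # enumerate(..., start=1) plays the role of the INDEX counter.
    for index, pipeline in enumerate(config, start=1):
        configure = f"configure_pipeline{index}"
        start = f"start_pipeline{index}"
        task_ids += [configure, start]
        # [extract, configure] >> start >> success
        edges += [(extract, start), (configure, start), (start, "success")]
    return task_ids, edges


task_ids, edges = build_task_graph(dag_datafusion_args)
for upstream, downstream in edges:
    print(f"{upstream} >> {downstream}")
```

With two pipelines in the config this gives six tasks in total: the extract task, two configure/start pairs, and success.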