python - Cloud Composer can't render dynamic DAG in webserver UI: "DAG seems to be missing"

I'm trying to make a DAG with 2 operators that are created dynamically, depending on the number of "pipelines" defined in a JSON config file. This config is stored in the variable dag_datafusion_args. I also have a standard bash operator, and a task called success at the end that sends a message to Slack saying that the DAG has finished. The other 2 tasks, which are Python operators, are generated dynamically and run in parallel.

I'm using Cloud Composer. When I put the DAG in the bucket it appears in the webserver UI, but when I click to see the DAG the following message appears: 'DAG "dag_lucas4" seems to be missing.' If I test the tasks directly via the CLI on the Kubernetes cluster, they work! But I can't get the web UI to show the DAG. Following a suggestion from some people here on SO, I tried restarting the webserver by installing a Python package; I tried 3 times but without success. Does anyone know what it could be?

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from aux_py_files.med.med_airflow_functions import *
from google.cloud import storage
from datetime import timedelta

TEMPLATE_SEARCH_PATH = '/home/airflow/gcs/plugins/'
INDEX = 1

default_args = {
    'owner': 'lucas',
    'start_date': '2021-01-10',
    'email': ['xxxx'],
    'email_on_failure': False,
    'email_on_success': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=2),
    'on_failure_callback': post_message_fail_to_slack
}

# Read the JSON config that defines which pipelines get tasks (runs at parse time).
dag_datafusion_args = return_datafusion_config_file('med')

with DAG('dag_lucas4', default_args=default_args, schedule_interval="30 23 * * *", template_searchpath=[TEMPLATE_SEARCH_PATH]) as dag:

    extract_ftp_csv_files_load_in_gcs = BashOperator(
        task_id='extract_ftp_csv_files_load_in_gcs',
        bash_command='aux_sh_files/med/script.sh'
    )

    success = PythonOperator(
        task_id='success',
        python_callable=post_message_success_to_slack,
        op_kwargs={'dag_name': 'dag_lucas2'}
    )

    # One configure/start pair of tasks is generated per pipeline in the config file.
    for pipeline, args in dag_datafusion_args.items():

        configure_pipeline = PythonOperator(
            task_id=f'configure_pipeline{str(INDEX)}',
            python_callable=setPipelineArguments,
            op_kwargs={'dag_name': 'med', 'pipeline_name': pipeline},
            provide_context=True
        )

        start_pipeline = PythonOperator(
            task_id=f'start_pipeline{str(INDEX)}',
            python_callable=start_pipeline_wrapper,
            op_kwargs={'configure_pipeline_task': f'configure_pipeline{str(INDEX)}'},
            retries=3,
            provide_context=True
        )

        [extract_ftp_csv_files_load_in_gcs, configure_pipeline] >> start_pipeline >> success
        INDEX += 1

1 Answer


It appears that in Cloud Composer the Airflow webserver runs in the tenant project, while the workers and the scheduler run in the customer project. The tenant project is simply a Google-managed environment that hosts some of the Airflow components. So the webserver UI doesn't have complete access to your project's resources, since it doesn't run inside your project's environment, and it can't read my config JSON file with return_datafusion_config_file. The best approach is to create an environment variable that holds that config.
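To illustrate that suggestion, here is a minimal sketch of how the DAG file could read the pipeline config from an environment variable instead of from a file under the plugins folder, so that parsing also succeeds on the webserver. The variable name DAG_DATAFUSION_ARGS and the way the value is set are assumptions, not something from the original post:

import json
import os

# Assumption: the JSON config is stored in an environment variable named
# DAG_DATAFUSION_ARGS (hypothetical name) set on the Composer environment.
# Reading it from the environment avoids touching GCS / the plugins folder
# at DAG-parse time, so the webserver in the tenant project can also build
# the DAG object.
dag_datafusion_args = json.loads(os.environ.get('DAG_DATAFUSION_ARGS', '{}'))

for pipeline, args in dag_datafusion_args.items():
    # ... create the configure_pipeline / start_pipeline tasks as before
    pass

The environment variable can be set on the Composer environment itself (for example through the environment's configuration in the Cloud Console or with gcloud), and the rest of the DAG stays the same.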

