DataLeap 中支持数据生产任务的工作流调度,火山引擎 E-MapReduce(EMR)集群内的 Airflow 组件也支持数据生产任务的工作流调度。如果在实际的使用场景中,既使用到了 DataLeap 中的工作流调度,也使用到了 EMR 集群内的 Airflow 中的工作流调度,并且需要配置两个工作流调度之间的依赖关系,您便可以参考本文中提供的 Shell 触发 Airflow 工作流执行方案,来满足您的使用场景。
DataLeap 中支持 Shell 脚本类型的任务。Shell 脚本可以在您 EMR 集群所在的 VPC 内执行,因此可以通过该 Shell 脚本调用 EMR 集群内的 Airflow REST API,来触发 Airflow 工作流调度的执行,即可以实现 EMR 集群内 Airflow 工作流对于 DataLeap 中计算任务的依赖。
我们也建议您将 Airflow 中的工作流迁移到 DataLeap 平台上,实现:
我们将为您提供整体迁移支持及服务,帮助您轻松完成作业迁移和数据上云,提升数据研发效率,降低任务运维管理成本。
注意
登录 EMR 集群 Master 主节点。登录方式详见登录集群。
使用以下命令,创建并编辑 Airflow DAG 的工作流文件,以 py 格式创建:
vim airflow_test.py
参考以下 Airflow 官网示例,在 airflow_test.py 文件中编写 DAG 脚本:
""" ### Tutorial Documentation Documentation that goes along with the Airflow tutorial located [here](https://airflow.apache.org/tutorial.html) """ # [START tutorial] # [START import_module] from datetime import datetime, timedelta from textwrap import dedent # The DAG object; we'll need this to instantiate a DAG from airflow import DAG # Operators; we need this to operate! from airflow.operators.bash import BashOperator # [END import_module] # [START instantiate_dag] with DAG( 'tutorial', # [START default_args] # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args={ 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' }, # [END default_args] description='A simple tutorial DAG', schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False, tags=['example'], ) as dag: # [END instantiate_dag] # t1, t2 and t3 are examples of tasks created by instantiating operators # [START basic_task] t1 = BashOperator( task_id='print_date', bash_command='date', ) t2 = BashOperator( task_id='sleep', depends_on_past=False, bash_command='sleep 5', retries=3, ) # [END basic_task] # [START documentation] t1.doc_md = dedent( """\ #### Task Documentation You can document your task using the attributes `doc_md` (markdown), `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets rendered in the UI's Task Instance Details page. ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png) """ ) dag.doc_md = __doc__ # providing that you have a docstring at the beginning of the DAG dag.doc_md = """ This is a documentation placed anywhere """ # otherwise, type it like this # [END documentation] # [START jinja_template] templated_command = dedent( """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ ) t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, params={'my_param': 'Parameter I passed in'}, ) # [END jinja_template] t1 >> [t2, t3] # [END tutorial]
DAG 文件完成编写后,通过以下命令,将文件分发至集群下各个节点中:
dagdispatch ./airflow_test.py
切换至集群其他节点中查看是否已分发成功:
## 切换至 core 节点 ssh emr-core-1; ## 进入 dags 目录位置 cd /usr/lib/emr/current/airflow/dags
ls 查看 dags 下所有的文件,airflow_test.py 文件存在 core 节点的 dags 中,即表明已分发成功。
说明
Airflow UI 访问链接登录条件如下:
Curl 触发 Airflow API 需经过用户认证,需前往服务参数页面,修改 auth_backends 配置参数。用户认证详情请参考:https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/security/api.html。
airflow.api.auth.backend.basic_auth
。任务完成新建后,进入 Shell 任务编辑界面,编写 Shell 脚本,可以通过 curl 调用集群内 Airflow 的 REST API,触发 Airflow 的行为:
curl -X POST 'http://ip:port/api/v1/dags/{dag_name}/dagRuns' \ -H 'Content-Type: application/json' \ --user "username:password" \ -d '{"conf": {}}'
更多 API 调用方式,详见 Airflow API
替换参数说明:
参数 | 说明 |
---|---|
IP | EMR 集群中 Master 节点内网 IP 信息。详见 4.3 内网 IP 信息获取。 |
port | EMR 集群中 Airflow 组件的 Web 端口号配置,可以在 Airflow 的集群详情 > 服务参数配置页面中查询到:
|
{dag_name} | 填写 Airflow UI 控制台界面显示的 DAG 名称信息,示例中填写为 tutorial。 |
username | 在 OpenLDAP 的服务参数配置页面中查询 Airflow 登录的用户名信息:airflow_admin 参数对应的值。 |
password | 在 OpenLDAP 的服务参数配置页面中查询 Airflow 登录的用户密码信息:airflow_password 参数对应的值。 |
-d | 根据实际场景,可输入对应的请求参数。 |
脚本配置完成后,您可进行以下操作,完成任务执行资源配置:
在选取独享计算资源组设置后,网络配置中会默认将独享计算资源组绑定的私有网络、子网、安全组信息填入,且不可修改。
您可在创建独享计算资源组时,配置对应的私有网络信息,需和 EMR 集群中的网络配置信息保持一致,便于网络互通。创建资源组操作详见:资源组管理。
产出数据登记用于记录任务的数据血缘,不会对代码逻辑造成影响,此示例选择默认。
任务配置完成后,依次单击上方操作栏中保存和调试图标按钮,执行编辑好的 Shell 命令,执行成功后,可在界面下方查看运行日志和 API 返回参数信息。
结果验证确认无误后,您可进行后续的调度设置和将任务提交发布到运维中心离线任务运维中执行。
后续任务运维操作详见:离线任务运维。