Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA


Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that lets you use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation), such as using environment variables and external files. One approach is to use the DAG Factory YAML-based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open source, making it simple to create and customize new functionality.
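To make the declarative idea concrete, the following is a minimal sketch of how a shared default section and per-DAG sections could be combined into complete DAG settings. It uses plain dictionaries in place of a parsed YAML file. The function name resolve_dag_configs, the DAG names dag_a and dag_b, and the merge rule are assumptions made for illustration; this is not the internal implementation of dag-factory.

```python
from copy import deepcopy


def resolve_dag_configs(config):
    """Merge the shared 'default' section into every other top-level section.

    Hypothetical helper for illustration; it is not part of dag-factory.
    """
    defaults = config.get("default", {})
    resolved = {}
    for dag_id, dag_settings in config.items():
        if dag_id == "default":
            continue
        merged = deepcopy(defaults)
        for key, value in dag_settings.items():
            # Nested dictionaries such as default_args are merged key by key;
            # any other value in the DAG section overrides the default.
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = {**merged[key], **value}
            else:
                merged[key] = value
        resolved[dag_id] = merged
    return resolved


# Plain dictionaries standing in for a parsed YAML file (made-up values).
example_config = {
    "default": {
        "default_args": {"owner": "airflow", "retries": 1},
        "schedule_interval": "*/10 * * * *",
    },
    "dag_a": {"default_args": {"retries": 3}},
    "dag_b": {"schedule_interval": "@daily"},
}

resolved = resolve_dag_configs(example_config)
print(resolved["dag_a"])
print(resolved["dag_b"])
```

In this sketch, dag_a keeps the shared owner and schedule but overrides retries, while dag_b only changes the schedule. This is the kind of reuse a default section is meant to provide.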

In this post, we explore the process of creating dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
  3. Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and improving overall workflow runs.
  5. Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.

By harnessing the power of YAML files and the DAG Factory library, we unleash a versatile approach to building and managing DAGs, empowering you to create robust, scalable, and maintainable data pipelines.

Overview of solution

In this post, we use an example DAG file that is designed to process a COVID-19 data set. The workflow processes an open source data set offered by WHO-COVID-19-Global. After we install the DAG-Factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.

The following diagram illustrates the overall solution along with data flows within logical blocks.

Overview of the Solution

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don’t have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure that the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

  1. Create a unique (within an account) Amazon S3 bucket name while creating your Amazon MWAA environment, and create folders called dags and requirements.
    Amazon S3 Bucket
  2. Create and upload a requirements.txt file with the following content to the requirements folder. Replace {Airflow-version} with your environment’s Apache Airflow version number, and {Python-version} with the version of Python that’s compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is needed only for the example use case described in this post, and dag-factory is the only required plug-in. We recommend checking the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.
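If you maintain several environments, it can help to generate the requirements.txt content rather than edit the constraint URL by hand. The following is a minimal sketch that fills in the two placeholders of the constraints URL. The helper names constraints_line and build_requirements are hypothetical, and the version values 2.8.1 and 3.11 are example values that you should replace with the versions of your own environment.

```python
def constraints_line(airflow_version, python_version):
    """Return the --constraint line for a requirements.txt file."""
    url = (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )
    return f'--constraint "{url}"'


def build_requirements(airflow_version, python_version, packages):
    """Return the full requirements.txt text: constraint line, then packages."""
    lines = [constraints_line(airflow_version, python_version)]
    lines.extend(packages)
    return "\n".join(lines) + "\n"


# Example values only; use the versions that match your environment.
requirements_text = build_requirements(
    "2.8.1", "3.11", ["dag-factory==0.19.0", "pandas==2.1.4"]
)
print(requirements_text)
```

The printed text can be saved as requirements.txt and uploaded to the requirements folder as described in the preceding step.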

  1. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that you are pointing to the latest Amazon S3 bucket version of your requirements.txt file for the additional package installation to happen. This typically takes 15–20 minutes, depending on your environment configuration.

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.

Validate the DAG

Verify the existing DAGs by navigating to the DAGs tab.

Verify the DAG

Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py, and process_s3_data.py on your local machine.
  2. Edit the process_s3_data.py file and save it with the following code content, then upload the file to the dags folder of your Amazon S3 bucket. The code does some basic data processing:
    1. Read the file from an Amazon S3 location.
    2. Rename the Country_code column to country.
    3. Filter data by the given country.
    4. Write the processed final data in CSV format and upload it back to the S3 prefix.
import io

import boto3
import pandas as pd


def process_s3_data(COUNTRY):
    """Filter the COVID-19 data set for one country and write the result to S3."""
    # Top-level variables: replace S3_BUCKET with your bucket name.
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"

    # Get the CSV file from S3.
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response['ResponseMetadata']['HTTPStatusCode']
    if status == 200:
        # Read the CSV file and keep only the rows for the given country.
        df = pd.read_csv(response.get("Body"))
        df.rename(columns={"Country_code": "country"}, inplace=True)
        filtered_df = df[df['country'] == COUNTRY]
        # Write the filtered rows to an in-memory buffer and upload it to S3.
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET,
                Key=OUTPUT_KEY + '_' + COUNTRY + '.csv',
                Body=csv_buffer.getvalue(),
            )
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
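Before uploading the script, you may want to check the filter logic locally without touching Amazon S3. The following sketch reproduces the read, filter, and write steps on an in-memory CSV string using only the Python standard library instead of pandas and boto3. The function name filter_csv_rows, the simplified column names, and the sample rows are made up for illustration and are not real WHO figures.

```python
import csv
import io


def filter_csv_rows(csv_text, column, value):
    """Return CSV text containing only the rows where `column` equals `value`."""
    reader = csv.DictReader(io.StringIO(csv_text))
    output = io.StringIO()
    writer = csv.DictWriter(
        output, fieldnames=reader.fieldnames, lineterminator="\n"
    )
    writer.writeheader()
    for row in reader:
        if row[column] == value:
            writer.writerow(row)
    return output.getvalue()


# Made-up sample rows for illustration only; not real WHO data.
sample_csv = (
    "Date_reported,Country,New_deaths\n"
    "2024-01-01,India,1\n"
    "2024-01-01,Japan,2\n"
    "2024-01-02,India,3\n"
)

filtered = filter_csv_rows(sample_csv, "Country", "India")
print(filtered)
```

The same pattern of filtering into an in-memory text buffer is what the script does with io.StringIO before calling put_object.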

  1. Edit the dynamic_dags.yml file and save it with the following code content, then upload the file to the dags folder. We define a DAG for each country as follows:
    1. Define the default arguments that are passed to all DAGs.
    2. Create a DAG definition for individual countries by passing op_args.
    3. Map the process_s3_data function with python_callable_name.
    4. Use the PythonOperator to process the CSV file data stored in the Amazon S3 bucket.
    5. We have set schedule_interval to 10 minutes, but feel free to adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"
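Because the country sections differ only in the country name, the repeated part of the YAML file can also be generated from a template instead of being written by hand. The following is a rough sketch of that idea using plain string formatting. The names DAG_TEMPLATE and render_country_dags are hypothetical, and the output covers only the per-country sections, so the default section still needs to be added separately.

```python
# Template for one per-country section; {country} is filled in for each entry.
DAG_TEMPLATE = """\
module3_dynamic_dag_{country}:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "{country}"
"""


def render_country_dags(countries):
    """Return the YAML text for one DAG section per country name."""
    return "\n".join(DAG_TEMPLATE.format(country=name) for name in countries)


rendered = render_country_dags(["Australia", "Brazil"])
print(rendered)
```

Adding another country then becomes a one-word change to the list, which is the kind of situation where only a parameter differs between DAGs.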

  1. Edit the file example_dag_factory.py and save it with the following code content, then upload the file to the dags folder. The code cleans up the existing DAGs using the clean_dags() method and creates the new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory

# Path to the YAML configuration file in the Amazon MWAA dags folder.
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)

# Clean up or delete any existing DAGs.
example_dag_factory.clean_dags(globals())
# Generate and create new DAGs.
example_dag_factory.generate_dags(globals())
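Airflow picks up DAG objects that it finds in the module-level namespace of each file in the dags folder, which is why the example passes globals() to the factory. The following sketch illustrates that mechanism with a stand-in class and a plain dictionary. FakeDag and register_dags are hypothetical names used only for this illustration, and the code is not how dag-factory is implemented internally.

```python
class FakeDag:
    """Stand-in for an Airflow DAG object, used only for this illustration."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def register_dags(namespace, dag_ids):
    """Create one object per ID and bind it to a name in the given namespace."""
    for dag_id in dag_ids:
        namespace[dag_id] = FakeDag(dag_id)


# A plain dict plays the role of globals() so the effect is easy to inspect.
module_namespace = {}
register_dags(
    module_namespace,
    ["module3_dynamic_dag_India", "module3_dynamic_dag_Japan"],
)

# Collect the names that a scan of the namespace would discover.
found = sorted(
    name for name, obj in module_namespace.items() if isinstance(obj, FakeDag)
)
print(found)
```

Each entry written into the namespace corresponds to one DAG that would appear in the Airflow UI after the file is parsed.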

  1. After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find the new DAGs.
    List the new DAGs

You can enable the DAGs by making them active and test them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.

Cleaning up

There may be costs associated with using the various AWS services discussed in this post. To prevent incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post, we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:

  • Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between different DAGs, streamlining the workflow management process
  • Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
  • Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
  • Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
  • Creating data-driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the Authors

Jayesh Shinde is a Sr. Software Architect with AWS ProServe India. He focuses on creating various cloud-centered solutions using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is a Sr. Cloud Architect with AWS ProServe India, helping customers migrate and modernize their infrastructure into AWS. He focuses on building DevSecOps and scalable infrastructure using containers, AIOps, and AWS Developer Tools and services.
