Combine AWS Glue and Amazon MWAA to build advanced VPC selection and failover strategies


AWS Glue is a serverless data integration service that makes it easy to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue customers often need to meet strict security requirements, which sometimes involve locking down the network connectivity allowed to the job, or running inside a specific VPC to access another service. To run inside the VPC, the jobs need to be assigned to a single subnet, but the most suitable subnet can change over time (for instance, based on usage and availability), so you may prefer to make that decision at runtime, based on your own strategy.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is an AWS service to run managed Airflow workflows, which allow writing custom logic to coordinate how tasks such as AWS Glue jobs run.

In this post, we show how to run an AWS Glue job as part of an Airflow workflow, with dynamic configurable selection of the VPC subnet assigned to the job at runtime.

Solution overview

To run inside a VPC, an AWS Glue job needs to be assigned at least one connection that includes network configuration. Any connection allows specifying a VPC, subnet, and security group, but for simplicity, this post uses connections of type NETWORK, which just defines the network configuration and doesn't involve external systems.

If the job has a fixed subnet assigned by a single connection, in case of a service outage on the Availability Zone or if the subnet isn't available for other reasons, the job can't run. Furthermore, each node (driver or worker) in an AWS Glue job requires an IP address assigned from the subnet. When running many large jobs concurrently, this could lead to an IP address shortage and the job running with fewer nodes than intended or not running at all.
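To get an intuition for this constraint, a back-of-the-envelope calculation helps: AWS reserves five addresses in every subnet (network address, VPC router, DNS, one reserved for future use, and broadcast), so the usable pool is slightly smaller than the CIDR size suggests. The following is a simplified sketch of that arithmetic; in practice the available count is also reduced by any other resources running in the subnet, which is why the solution queries the live count instead.

```python
import ipaddress

# AWS reserves 5 addresses in every subnet: network address, VPC router,
# DNS, one reserved for future use, and the broadcast address.
AWS_RESERVED_PER_SUBNET = 5

def max_glue_nodes(cidr: str) -> int:
    """Upper bound on Glue nodes (driver + workers) a subnet can host,
    assuming one IP address per node and an otherwise empty subnet."""
    total = ipaddress.ip_network(cidr).num_addresses
    return max(total - AWS_RESERVED_PER_SUBNET, 0)

# A /24 subnet such as 10.192.20.0/24 offers at most 251 usable addresses
print(max_glue_nodes("10.192.20.0/24"))  # 251
# Two concurrent 150-worker jobs (plus drivers) would not fit in one /24
print(2 * (150 + 1) > max_glue_nodes("10.192.20.0/24"))  # True
```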

AWS Glue extract, transform, and load (ETL) jobs allow multiple connections to be specified with multiple network configurations. However, the job will always try to use the connections' network configuration in the order listed and pick the first one that passes the health checks and has at least two IP addresses to get the job started, which might not be the optimal option.

With this solution, you can enhance and customize that behavior by reordering the connections dynamically and defining the selection priority. If a retry is needed, the connections are reprioritized again based on the strategy, because the conditions might have changed since the last run.

As a result, it helps prevent the job from failing to run or running under capacity due to subnet IP address shortage or even an outage, while meeting the network security and connectivity requirements.

The following diagram illustrates the solution architecture.

Prerequisites

To follow the steps of the post, you need a user that can log in to the AWS Management Console and has permission to access Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), and AWS Glue. The AWS Region where you choose to deploy the solution needs the capacity to create a VPC and two elastic IP addresses. The default Regional quota for both types of resources is five, so you might need to request an increase via the console.

You also need an AWS Identity and Access Management (IAM) role suitable to run AWS Glue jobs if you don't have one already. For instructions, refer to Create an IAM role for AWS Glue.

Deploy an Airflow environment and VPC

First, you'll deploy a new Airflow environment, including the creation of a new VPC with two public subnets and two private ones. This is because Amazon MWAA requires Availability Zone failure tolerance, so it needs to run on two subnets in two different Availability Zones in the Region. The public subnets are used so the NAT gateway can provide internet access for the private subnets.

Complete the following steps:

  1. Create an AWS CloudFormation template on your computer by copying the template from the following quick start guide into a local text file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and choose the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE.

The resource that will take most of the time is the Airflow environment. While it's being created, you can continue with the following steps, until you are required to open the Airflow UI.

  1. On the stack's Resources tab, note the IDs for the VPC and two private subnets (PrivateSubnet1 and PrivateSubnet2), to use in the next step.

Create AWS Glue connections

The CloudFormation template deploys two private subnets. In this step, you create an AWS Glue connection to each one so AWS Glue jobs can run in them. Amazon MWAA recently added the capacity to run the Airflow cluster on shared VPCs, which reduces cost and simplifies network management. For more information, refer to Introducing shared VPC support on Amazon MWAA.

Complete the following steps to create the connections:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Choose Network as the data source.
  4. Choose the VPC and private subnet (PrivateSubnet1) created by the CloudFormation stack.
  5. Use the default security group.
  6. Choose Next.
  7. For the connection name, enter MWAA-Glue-Blog-Subnet1.
  8. Review the details and complete the creation.
  9. Repeat these steps using PrivateSubnet2 and name the connection MWAA-Glue-Blog-Subnet2.
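If you prefer to script this instead of using the console, the payload for a NETWORK connection can be assembled as shown below and passed to the Glue CreateConnection API. This is a sketch: the subnet ID, security group ID, and Availability Zone are placeholders to be replaced with the values you noted from the CloudFormation stack.

```python
def network_connection_input(name, subnet_id, security_group_id, availability_zone):
    """Build the ConnectionInput payload for a Glue NETWORK connection.
    A NETWORK connection carries no external-system properties, only the
    network placement (VPC subnet and security group)."""
    return {
        "Name": name,
        "ConnectionType": "NETWORK",
        "ConnectionProperties": {},
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": [security_group_id],
            "AvailabilityZone": availability_zone,
        },
    }

# Placeholder IDs: substitute the subnet and security group from your stack
conn_input = network_connection_input(
    "MWAA-Glue-Blog-Subnet1", "subnet-0123456789abcdef0",
    "sg-0123456789abcdef0", "us-east-1a")

# To create the connection (requires AWS credentials and the boto3 package):
# import boto3
# boto3.client("glue").create_connection(ConnectionInput=conn_input)
print(conn_input["ConnectionType"])  # NETWORK
```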

Create the AWS Glue job

Now you create the AWS Glue job that will be triggered later by the Airflow workflow. The job uses the connections created in the previous section, but instead of assigning them directly on the job, as you would normally do, in this scenario you leave the job connections list empty and let the workflow decide which one to use at runtime.

The job script in this case is not important and is just meant to demonstrate that the job ran in one of the subnets, depending on the connection.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose Script editor.
  2. Leave the default options (Spark engine and Start fresh) and choose Create script.
  3. Replace the placeholder script with the following Python code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Rename the job to AirflowBlogJob.
  5. On the Job details tab, for IAM Role, choose any role and enter 2 for the number of workers (just for frugality).
  6. Save these changes so the job is created.
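The membership check the job script performs can be tried locally with sample addresses (the IPs below are arbitrary examples inside and outside the two CIDR ranges):

```python
import ipaddress

# Same mapping the job script uses: subnet name -> CIDR block
subnets = {
    "PrivateSubnet1": "10.192.20.0/24",
    "PrivateSubnet2": "10.192.21.0/24",
}

def subnet_of(ip: str) -> str:
    """Return the name of the subnet whose CIDR contains the address."""
    for name, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            return name
    return "unknown"

print(subnet_of("10.192.21.34"))  # PrivateSubnet2
print(subnet_of("192.168.0.1"))   # unknown
```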

Grant AWS Glue permissions to the Airflow environment role

The role created for Airflow by the CloudFormation template provides the basic permissions to run workflows but not to interact with other services such as AWS Glue. In a production project, you would define your own templates with these additional permissions, but in this post, for simplicity, you add the additional permissions as an inline policy. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Locate the role created by the template; it will start with the name you assigned to the CloudFormation stack followed by -MwaaExecutionRole-.
  3. On the role details page, on the Add permissions menu, choose Create inline policy.
  4. Switch from Visual to JSON mode and enter the following JSON in the text box. It assumes that the AWS Glue role you have follows the convention of starting with AWSGlueServiceRole. For enhanced security, you can replace the wildcard resource on the ec2:DescribeSubnets permission with the ARNs of the two private subnets from the CloudFormation stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Choose Next.
  6. Enter GlueRelatedPermissions as the policy name and complete the creation.

In this example, we use an ETL script job; for a visual job, because it generates the script automatically on save, the Airflow role would need permission to write to the configured script path on Amazon Simple Storage Service (Amazon S3).

Create the Airflow DAG

An Airflow workflow is based on a Directed Acyclic Graph (DAG), which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named glue_job_dag.py using a text editor.

In each of the following steps, we provide a code snippet to enter into the file and an explanation of what it does.

  1. The following snippet adds the required Python module imports. The modules are already installed on Airflow; if that weren't the case, you would need to use a requirements.txt file to indicate to Airflow which modules to install. It also defines the Boto3 clients that the code will use later. By default, they will use the same role and Region as Airflow, which is why you set up the role with the additional permissions required beforehand.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. The following snippet adds three functions to implement the connection order strategy, which defines how to reorder the connections given, to establish their priority. This is just an example; you can build your own custom code to implement your own logic, as per your needs. The code first checks the IPs available on each connection subnet and separates the ones that have enough IPs available to run the job at full capacity from the ones that could still be used because they have at least two IPs available, which is the minimum a job needs to start. If the strategy is set to random, it will randomize the order within each of the connection groups previously described and add any other connections. If the strategy is capacity, it will order them from most IPs free to fewest.
    def get_available_ips_from_connection(glue_connection_name):
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []    
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))                
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")  
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else: 
            raise ValueError(f"Unknown strategy specified: {strategy}")    
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. The following code creates the DAG itself with the run job task, which updates the job with the connection order defined by the strategy, runs it, and waits for the results. The job name, connections, and strategy come from Airflow variables, so it can be easily configured and updated. It has two retries with exponential backoff configured, so if the task fails, it will repeat the full task, including the connection selection. Maybe by then the best choice is another connection, or the subnet previously picked randomly is in an Availability Zone that is currently suffering an outage, and by picking a different one, it can recover.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task", 
            retries=2,
            retry_delay=duration(seconds = 30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):    
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy = Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']            
            num_workers = job_props['NumberOfWorkers']
            
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)   
            
        run_job_task()
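You can exercise the ordering rules locally before deploying the DAG by stubbing out the AWS lookups. The following standalone sketch reimplements a simplified version of the prioritization with made-up free-IP counts, standing in for the glue:GetConnection and ec2:DescribeSubnets calls:

```python
from random import shuffle

# Stubbed free-IP counts per connection, standing in for the live
# AvailableIpAddressCount returned by ec2.describe_subnets
free_ips = {
    "MWAA-Glue-Blog-Subnet1": 120,
    "MWAA-Glue-Blog-Subnet2": 3,
}

def prioritize(connections, num_workers, strategy):
    """Simplified version of the DAG's prioritization logic."""
    # Connections that can hold the full cluster
    good = [(c, free_ips[c]) for c in connections if free_ips[c] >= num_workers]
    # Connections that can at least start a job (2 IPs minimum)
    usable = [(c, free_ips[c]) for c in connections
              if 2 <= free_ips[c] < num_workers]
    if strategy == "random":
        shuffle(good)
        shuffle(usable)
        ordered = good + usable  # good connections keep priority
    elif strategy == "capacity":
        ordered = sorted(good + usable, key=lambda x: -x[1])
    else:
        raise ValueError(f"Unknown strategy specified: {strategy}")
    return [c for c, _ in ordered]

# With 10 workers, only Subnet1 can hold the full cluster, so the capacity
# strategy puts it first; Subnet2 can still start a job (>= 2 free IPs)
print(prioritize(list(free_ips), 10, "capacity"))
# ['MWAA-Glue-Blog-Subnet1', 'MWAA-Glue-Blog-Subnet2']
```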
    

Create the Airflow workflow

Now you create a workflow that invokes the AWS Glue job you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack followed by -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file glue_job_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn't take longer than 30 minutes since you deployed the CloudFormation stack.

  1. Choose the environment link on the table to see the environment details.

It's configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  1. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to log you in.

If there's any issue with the DAG file you created, it will display an error on top of the page indicating the lines affected. In that case, review the steps and upload again. After a few seconds, it will parse it and update or remove the error banner.

  1. On the Admin menu, choose Variables.
  2. Add three variables with the following keys and values:
    1. Key glue_job_dag.glue_connections with value MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Key glue_job_dag.glue_job_name with value AirflowBlogJob.
    3. Key glue_job_dag.strategy with value capacity.

Run the job with a dynamic subnet assignment

Now you're ready to run the workflow and see the strategy dynamically reordering the connections.

  1. On the Airflow UI, choose DAGs, and on the row glue_job_dag, choose the play icon.
  2. On the Browse menu, choose Task Instances.
  3. On the instances table, scroll right to display the Log Url and choose the icon on it to open the log.

The log will update as the task runs; you can locate the line starting with "Running Glue job with the connection order:" and the previous lines showing details of the connection IPs and the category assigned. If an error occurs, you'll see the details in this log.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose the job AirflowBlogJob.
  2. On the Runs tab, choose the run instance, then the Output logs link, which will open a new tab.
  3. On the new tab, use the log stream link to open it.

It will display the IP that the driver was assigned and which subnet it belongs to, which should match the connection indicated by Airflow (if the log is not displayed, choose Resume so it gets updated as soon as it's available).

  1. On the Airflow UI, edit the Airflow variable glue_job_dag.strategy to set it to random.
  2. Run the DAG multiple times and see how the ordering changes.

Clean up

If you no longer need the deployment, delete the resources to avoid any further charges:

  1. Delete the Python script you uploaded, so the S3 bucket can be automatically deleted in the next step.
  2. Delete the CloudFormation stack.
  3. Delete the AWS Glue job.
  4. Delete the script that the job saved in Amazon S3.
  5. Delete the connections you created as part of this post.

Conclusion

In this post, we showed how AWS Glue and Amazon MWAA can work together to build more advanced custom workflows, while minimizing the operational and management overhead. This solution gives you more control over how your AWS Glue job runs to meet specific operational, network, or security requirements.

You can deploy your own Amazon MWAA environment in multiple ways, such as with the template used in this post, on the Amazon MWAA console, or using the AWS CLI. You can also implement your own strategies to orchestrate AWS Glue jobs, based on your network architecture and requirements (for instance, to run the job closer to the data when possible).


About the authors

Michael Greenshtein is an Analytics Specialist Solutions Architect for the Public Sector.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.
