Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python


The amount of data generated worldwide continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant influx of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application's needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let's imagine a scenario. You're responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality, which has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data as Apache Kafka topics to Amazon MSK. For your application's low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use the managed serverless connector service Amazon Data Firehose to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let's follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine. If you prefer the AWS SDK, see the boto3 sketch that follows.
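The following is a minimal boto3 sketch of creating an MSK Serverless cluster with IAM authentication enabled. The Region, subnet IDs, and security group ID are placeholders you would replace with your own values.

import boto3

# Sketch: create an MSK Serverless cluster with IAM authentication enabled.
# The Region, subnet IDs, and security group ID below are placeholders.
kafka = boto3.client('kafka', region_name='<UPDATE_AWS_REGION_NAME_HERE>')

response = kafka.create_cluster_v2(
    ClusterName='myCluster',
    Serverless={
        'VpcConfigs': [
            {
                'SubnetIds': ['<UPDATE_SUBNET_ID_1>', '<UPDATE_SUBNET_ID_2>'],
                'SecurityGroupIds': ['<UPDATE_SECURITY_GROUP_ID>']
            }
        ],
        'ClientAuthentication': {'Sasl': {'Iam': {'Enabled': True}}}
    }
)
print(response['ClusterArn'])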

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS Region where the MSK cluster is located
region = '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide the MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of the MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with the required configuration
admin_client = KafkaAdminClient(
    bootstrap_servers="<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>",
    security_protocol="SASL_SSL",
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# Create the topic if it doesn't already exist
topic_name = "mytopic"
topic_list = [NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if topic_name not in existing_topics:
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("Topic already exists! List of topics: " + str(existing_topics))

  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
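python createTopic.py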

Produce records using Python

Let's generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime

topicname = "mytopic"

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region = '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide the MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol="SASL_SSL",
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Return a random modem model name
def getModel():
    products = ["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    return products[random.randint(0, 4)]

# Return a random interface status (weighted toward "connected")
def getInterfaceStatus():
    status = ["connected"] * 12 + ["down"] * 2
    return status[random.randint(0, 13)]

# Return a random CPU usage value
def getCPU():
    return str(random.randint(50, 100))

# Return a random memory usage value
def getMemory():
    return str(random.randint(1000, 1500))

# Generate one sample modem telemetry record
def generateData():
    model = getModel()
    deviceid = 'dvc' + str(random.randint(1000, 10000))
    interface = "eth4.1"
    interfacestatus = getInterfaceStatus()
    cpuusage = getCPU()
    memoryusage = getMemory()
    event_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    modem_data = {
        "model": model,
        "deviceid": deviceid,
        "interface": interface,
        "interfacestatus": interfacestatus,
        "cpuusage": cpuusage,
        "memoryusage": memoryusage,
        "event_time": event_time,
    }
    return modem_data

# Continuously generate and send data
while True:
    data = generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        record_metadata = future.get(timeout=10)
    except Exception as e:
        print(e)

  • Run kafkaDataGen.py to continuously generate random data and publish it to the specified Kafka topic:
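python kafkaDataGen.py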

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, choose Browse and choose your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3
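The console workflow above can also be scripted. The following boto3 sketch assumes you already have an IAM role that Firehose can assume with access to both the MSK cluster and the S3 bucket; the stream name, ARNs, and Region are placeholders, not values from this walkthrough.

import boto3

# Sketch: create a Firehose stream that reads from the MSK topic and delivers to S3.
# All ARNs and the Region below are placeholders for your own resources.
firehose = boto3.client('firehose', region_name='<UPDATE_AWS_REGION_NAME_HERE>')

firehose.create_delivery_stream(
    DeliveryStreamName='msk-to-s3-stream',
    DeliveryStreamType='MSKAsSource',
    MSKSourceConfiguration={
        'MSKClusterARN': '<UPDATE_MSK_CLUSTER_ARN_HERE>',
        'TopicName': 'mytopic',
        'AuthenticationConfiguration': {
            'RoleARN': '<UPDATE_FIREHOSE_MSK_ROLE_ARN_HERE>',
            'Connectivity': 'PRIVATE'
        }
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': '<UPDATE_FIREHOSE_S3_ROLE_ARN_HERE>',
        'BucketARN': '<UPDATE_S3_BUCKET_ARN_HERE>',
        'Prefix': 'streamingDataLake/',
        'ErrorOutputPrefix': 'streamingDataLakeErr/'
    }
)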

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
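The event payload from an MSK trigger is keyed by topic and partition, with each record's value base64-encoded. A simplified, illustrative example (the field values here are made up, and the real payload contains additional metadata) looks like the following, which is why the Lambda handler later in this section decodes event['records']['mytopic-0']:

# Simplified, illustrative shape of the event payload from an MSK trigger.
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1716800000000,
                "timestampType": "CREATE_TIME",
                "value": "eyJkZXZpY2VpZCI6ICJkdmMxMjM0In0="  # base64-encoded JSON record
            }
        ]
    }
}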

Let's first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3

region = '<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)

table_name = "device_status"
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]

# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode='PAY_PER_REQUEST'
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Run the createTable.py script to create a table called device_status in DynamoDB:
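python createTable.py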

Now let's configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven't configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import boto3
import json

# Helper to parse a JSON string, returning 'err' if it is not valid JSON
def convertjson(payload):
    try:
        return json.loads(payload)
    except:
        return 'err'

def lambda_handler(event, context):
    # Records arrive keyed by topic-partition; values are base64-encoded
    base64records = event['records']['mytopic-0']

    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]

    dynamodb = boto3.client('dynamodb')
    table_name = "device_status"

    for record in raw_records:
        data = json.loads(record)
        deviceid = data['deviceid']
        interface = data['interface']
        interfacestatus = data['interfacestatus']
        cpuusage = data['cpuusage']
        memoryusage = data['memoryusage']
        event_time = data['event_time']

        item = {
            'deviceid': {'S': deviceid},
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }

        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )

        print("Item written to DynamoDB")

  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the device_status table.

You will see that Lambda is writing events generated in the Kafka topic to DynamoDB.

ddb table
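Because the latest status for each modem is stored in DynamoDB, a client application can look up any device with a simple key read. The following sketch fetches one device's record; the device ID shown is just an illustrative value.

import boto3

# Sketch: fetch the latest status for a single modem from the device_status table.
# 'dvc1234' is an illustrative device ID.
dynamodb = boto3.client('dynamodb', region_name='<UPDATE_AWS_REGION_NAME_HERE>')

response = dynamodb.get_item(
    TableName='device_status',
    Key={'deviceid': {'S': 'dvc1234'}}
)
print(response.get('Item'))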

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let's build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He is passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.
