Data Pipeline with Apache Airflow and AWS

Think Different - Dhiraj Patra
6 min read · Dec 30, 2023

This post explains what a data pipeline is and why it matters, then walks through a weather data pipeline built with Apache Airflow, the OpenWeather API, AWS Lambda, FastAPI, and S3.

Data Pipeline:

Definition:

A data pipeline is a set of processes and technologies used to ingest, process, transform, and move data from one or more sources to a destination, typically a storage or analytics platform. It provides a structured way to automate the flow of data, enabling efficient data processing and analysis.

Why Data Pipeline?

1. Data Integration:

- Challenge: Data often resides in various sources and formats.

- Solution: Data pipelines integrate data from diverse sources into a unified format, facilitating analysis.

2. Automation:

- Challenge: Manual data movement and transformation can be time-consuming and error-prone.

- Solution: Data pipelines automate these tasks, reducing manual effort and minimizing errors.

3. Scalability:

- Challenge: As data volume grows, manual processing becomes impractical.

- Solution: Data pipelines are scalable, handling large volumes of data efficiently.

4. Consistency:

- Challenge: Inconsistent data formats and structures.

- Solution: Data pipelines enforce consistency, ensuring data quality and reliability.

5. Real-time Processing:

- Challenge: Timely availability of data for analysis.

- Solution: Advanced data pipelines support real-time or near-real-time processing for timely insights.

6. Dependency Management:

- Challenge: Managing dependencies between different data processing tasks.

- Solution: Data pipelines define dependencies, orchestrating tasks in a logical order.

In the Weather Pipeline Scenario:

1. Extract (OpenWeather API):

- Data is extracted from the OpenWeather API, fetching weather data.

2. Transform (FastAPI and Lambda):

- FastAPI transforms the raw weather data into a desired format.

- AWS Lambda triggers the FastAPI endpoint and performs additional transformations.

3. Load (S3 Bucket):

- The transformed data is loaded into an S3 bucket, acting as a data lake.
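
The three stages can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the pipeline's actual code: `SAMPLE_RESPONSE` is a hand-written payload shaped like an OpenWeather current-weather response, `transform_weather` and `load` are hypothetical helper names, and a dictionary stands in for the S3 bucket.

```python
import json

# Hand-written sample shaped like an OpenWeather current-weather response
# (temperatures are in Kelvin by default). The values are illustrative only.
SAMPLE_RESPONSE = {
    "name": "London",
    "dt": 1703934000,
    "main": {"temp": 283.15, "humidity": 81},
    "weather": [{"description": "light rain"}],
}


def transform_weather(raw: dict) -> dict:
    """Flatten the nested payload and convert Kelvin to Celsius."""
    return {
        "city": raw["name"],
        "observed_at": raw["dt"],
        "temp_c": round(raw["main"]["temp"] - 273.15, 2),
        "humidity_pct": raw["main"]["humidity"],
        "description": raw["weather"][0]["description"],
    }


def load(record: dict, store: dict, key: str) -> None:
    """Serialize the record and write it under `key` (the dict stands in for S3)."""
    store[key] = json.dumps(record)


fake_bucket = {}
record = transform_weather(SAMPLE_RESPONSE)
load(record, fake_bucket, "weather_data/london.json")
```

Keeping the transform a pure function of the raw payload makes it easy to test without calling the API or touching S3.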

Key Components:

1. Source Systems:

- OpenWeather API serves as the source of raw weather data.

2. Processing Components:

- FastAPI: Transforms the data.

- AWS Lambda: Triggers FastAPI and performs additional transformations.

3. Data Storage:

- S3 Bucket: Acts as a data lake for storing the processed weather data.

4. Orchestration Tool:

- Apache Airflow orchestrates the entire process, scheduling and coordinating tasks.

Benefits of Data Pipeline:

1. Efficiency:

- Automation reduces manual effort, increasing efficiency.

2. Reliability:

- Automated processes minimize the risk of errors and inconsistencies.

3. Scalability:

- Scales to handle growing volumes of data.

4. Consistency:

- Enforces consistent data processing and storage practices.

5. Real-time Insights:

- Supports real-time or near-real-time data processing for timely insights.

End-to-End Code and Steps:

Let’s break down the context, tools, and steps involved in building an end-to-end data pipeline using Apache Airflow, OpenWeather API, AWS Lambda, FastAPI, and S3.

Context:

1. Apache Airflow:

- Open-source platform for orchestrating complex workflows.

- Allows you to define, schedule, and monitor workflows as Directed Acyclic Graphs (DAGs).

2. OpenWeather API:

- Provides weather data through an API.

- Requires an API key for authentication.

3. AWS Lambda:

- Serverless computing service for running code without provisioning servers.

- Can be triggered by events, such as an HTTP request.

4. FastAPI:

- Modern, fast web framework for building APIs with Python 3.7+ based on standard Python type hints.

- Used for extracting and transforming weather data.

5. S3 (Amazon Simple Storage Service):

- Object storage service by AWS for storing and retrieving any amount of data.

- Acts as the data lake.
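
As a small illustration of the API-key requirement, the sketch below builds a request URL for OpenWeather's current-weather endpoint. The helper name `build_weather_url` and the environment variable `OPENWEATHER_API_KEY` are assumptions made for this example; the sketch only constructs the URL and does not send a request.

```python
import os
from urllib.parse import urlencode

OPENWEATHER_URL = "https://api.openweathermap.org/data/2.5/weather"


def build_weather_url(city: str, api_key: str, units: str = "metric") -> str:
    """Return the request URL for one city's current weather."""
    query = urlencode({"q": city, "appid": api_key, "units": units})
    return f"{OPENWEATHER_URL}?{query}"


# Read the key from the environment rather than hard-coding it in source.
# OPENWEATHER_API_KEY is a hypothetical variable name for this sketch.
api_key = os.environ.get("OPENWEATHER_API_KEY", "YOUR_API_KEY")
url = build_weather_url("London", api_key)
```

Reading the key from the environment keeps it out of version control; in AWS, a Lambda environment variable or a secrets store could play the same role.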

Let’s dive into the concepts of Directed Acyclic Graphs (DAGs), operators, and tasks in the context of Apache Airflow:

Directed Acyclic Graph (DAG):

- Definition:

- A Directed Acyclic Graph (DAG) is a collection of tasks with defined relationships, where each task represents a unit of work.

- The “directed” part signifies the flow of data or dependencies between tasks.

- The “acyclic” part ensures that there are no cycles or loops in the graph, meaning tasks can’t depend on themselves or create circular dependencies.

- Why DAGs in Apache Airflow:

- DAGs in Apache Airflow define the workflow for a data pipeline.

- Tasks within a DAG are orchestrated based on dependencies, ensuring a logical and ordered execution.
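
To make "directed" and "acyclic" concrete, here is a small sketch using Python's standard `graphlib` module (Python 3.9+). It is not how Airflow schedules tasks internally; it only shows that a dependency graph without cycles has a valid execution order, and that a cycle makes ordering impossible. The helper `has_cycle` is a hypothetical name.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on.
dependencies = {
    "trigger_lambda": set(),
    "store_in_s3": {"trigger_lambda"},
}

# A valid execution order: every task appears after its dependencies.
order = list(TopologicalSorter(dependencies).static_order())


def has_cycle(graph: dict) -> bool:
    """Return True if the dependency graph contains a cycle."""
    try:
        TopologicalSorter(graph).prepare()
    except CycleError:
        return True
    return False
```

With the two tasks from this article, `order` lists `trigger_lambda` before `store_in_s3`, while a graph in which two tasks depend on each other is rejected.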

Operator:

- Definition:

- An operator defines a single, atomic task in Apache Airflow.

- Operators determine what actually gets done in each task.

- Types of Operators:

1. Action Operators:

- Perform an action, such as running a Python function, executing a SQL query, or triggering an external system.

2. Transfer Operators:

- Move data between systems, for example, copying files, uploading to S3, or transferring data between databases.

3. Sensor Operators:

- Wait for a certain condition to be met before allowing the DAG to proceed. For example, wait until a file is available in a directory.
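
The idea behind a sensor can be sketched as a simple polling loop. This is an illustration of the concept, not Airflow's implementation; `wait_for` is a hypothetical helper, and its `poke_interval` and `timeout` parameters are named after the similarly named sensor settings in Airflow.

```python
import time
from typing import Callable


def wait_for(condition: Callable[[], bool], poke_interval: float = 1.0,
             timeout: float = 10.0) -> bool:
    """Poll `condition` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True  # condition met, downstream work may proceed
        if time.monotonic() >= deadline:
            return False  # gave up waiting
        time.sleep(poke_interval)
```

A call such as `wait_for(lambda: os.path.exists(path))` would block until the file appears or the timeout is reached.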

Task:

- Definition:

- A task is an instance of an operator that represents a single occurrence of a unit of work within a DAG.

- Tasks are the building blocks of DAGs.

- Key Characteristics:

- Idempotent:

- Tasks should be idempotent, meaning running them multiple times has the same effect as running them once.

- Atomic:

- Tasks are designed to be atomic, representing a single unit of work.
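
One common way to make a storage task idempotent is to derive the output key from the run date, so that a retry overwrites the same object instead of adding a duplicate. The sketch below assumes a hypothetical `store_daily_weather` helper and uses a dictionary in place of an S3 bucket.

```python
import json


def store_daily_weather(store: dict, run_date: str, record: dict) -> str:
    """Write the record under a key derived from the run date.

    Re-running for the same date overwrites the same key, so the end
    state is identical after one run or many.
    """
    key = f"weather_data/{run_date}.json"
    store[key] = json.dumps(record, sort_keys=True)
    return key


bucket = {}
record = {"city": "London", "temp_c": 10.0}
store_daily_weather(bucket, "2023-12-30", record)
snapshot = dict(bucket)
store_daily_weather(bucket, "2023-12-30", record)  # simulated retry
```

After the simulated retry, `bucket` still holds exactly one object with the same content as `snapshot`.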

DAG, Operator, and Task in the Context of the Example:

- DAG (`weather_data_pipeline.py`):

- Represents the entire workflow.

- Orchestrates the execution of tasks based on dependencies.

- Ensures a logical and ordered execution of the data pipeline.

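- Operator (`PythonOperator`, `S3CopyObjectOperator`):

- `PythonOperator`: Executes a Python function (e.g., triggering Lambda).

- `S3CopyObjectOperator`: Copies an object from one S3 bucket to another. The Amazon provider package does not include an operator named `S3ToS3Operator`; `S3CopyObjectOperator` is the class that performs bucket-to-bucket copies.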

- Task (`trigger_lambda_task`, `store_in_s3_task`):

- `trigger_lambda_task`: Represents the task of triggering the Lambda function.

- `store_in_s3_task`: Represents the task of storing data in S3.

DAG Structure:

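```python
# Example DAG structure (written against Airflow 2.x)
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# The Amazon provider has no `S3ToS3Operator`; `S3CopyObjectOperator`
# copies a single object from one bucket to another.
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='End-to-end weather data pipeline',
    schedule_interval=timedelta(days=1),
)


def trigger_lambda_function(**kwargs):
    # Placeholder; the callable belongs to the full DAG file further down.
    pass


# Airflow 2 passes the task context to the callable automatically,
# so `provide_context=True` is no longer needed.
trigger_lambda_task = PythonOperator(
    task_id='trigger_lambda',
    python_callable=trigger_lambda_function,
    dag=dag,
)

# SOURCE_KEY is a placeholder for the key of the object to copy.
store_in_s3_task = S3CopyObjectOperator(
    task_id='store_in_s3',
    source_bucket_name='SOURCE_BUCKET',
    source_bucket_key='SOURCE_KEY',
    dest_bucket_name='DEST_BUCKET',
    dest_bucket_key='weather_data/SOURCE_KEY',
    aws_conn_id='aws_default',
    dag=dag,
)

# Run the Lambda trigger first, then the S3 copy.
trigger_lambda_task >> store_in_s3_task
```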

In the example DAG, `trigger_lambda_task` and `store_in_s3_task` are tasks created from the `PythonOperator` and the S3 copy operator, respectively. The `>>` syntax declares the dependency between them: the task on the left must finish before the task on the right starts.

This DAG ensures that the Lambda function is triggered before storing data in S3, defining a clear execution flow. This structure adheres to the principles of Directed Acyclic Graphs, where tasks are executed in a logical sequence based on dependencies.
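
The `>>` syntax is ordinary Python operator overloading. The toy class below is a sketch of the idea, not Airflow's actual implementation: a hypothetical `Task` class defines `__rshift__` so that `a >> b` records `b` as downstream of `a`.

```python
class Task:
    """Tiny stand-in for an Airflow task, used only to illustrate `>>`."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # `a >> b` records b as downstream of a and returns b,
        # which lets chains such as `a >> b >> c` read left to right.
        self.downstream.append(other)
        return other


extract = Task("extract")
transform = Task("transform")
load = Task("load")
extract >> transform >> load
```

Returning the right-hand task is what makes chaining work: `extract >> transform` evaluates to `transform`, which then receives `>> load`.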

Steps:

1. Set Up OpenWeather API Key:

- Obtain an API key from the OpenWeather website.

2. Create AWS S3 Bucket:

- Create an S3 bucket to store the weather data.

3. Develop FastAPI Application:

- Create a FastAPI application in Python to extract and transform weather data.

- Expose an endpoint for Lambda to trigger.

4. Develop AWS Lambda Function:

- Create a Lambda function that triggers the FastAPI endpoint.

- Use the OpenWeather API to fetch weather data.

- Transform the data as needed.

5. Configure Apache Airflow:

- Install and configure Apache Airflow.

- Define a DAG that orchestrates the entire workflow.

6. Define Apache Airflow Tasks:

- Define tasks in the DAG to call the Lambda function and store the data in S3.

- Specify dependencies between tasks.

7. Run Apache Airflow Workflow:

- Trigger the Apache Airflow DAG to execute the defined tasks.

End-to-End Code:

Here’s a simplified example of how your code might look for the FastAPI application, Lambda function, and Apache Airflow DAG. Note that this is a basic illustration, and you may need to adapt it based on your specific requirements.

FastAPI Application (`fastapi_app.py`):

```python
from fastapi import FastAPI

app = FastAPI()


@app.get("/weather")
def get_weather():
    # Call OpenWeather API and perform transformations
    # Return transformed weather data
    return {"message": "Weather data transformed"}
```

AWS Lambda Function (`lambda_function.py`):

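```python
# `requests` is a third-party package; it may need to be bundled with
# the deployment package or provided through a Lambda layer.
import requests


def lambda_handler(event, context):
    # Trigger FastAPI endpoint (FASTAPI_ENDPOINT is a placeholder base URL)
    response = requests.get("FASTAPI_ENDPOINT/weather", timeout=10)
    response.raise_for_status()  # fail fast on a non-2xx response
    weather_data = response.json()

    # Perform additional processing
    # …

    # Store data in S3
    # …

    return {"statusCode": 200, "body": "Data processed and stored in S3"}
```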
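
The "Store data in S3" step is left open in the handler. One possible shape for it is sketched below, assuming a hypothetical helper `store_weather` that receives an S3 client. In Lambda that client would typically be `boto3.client("s3")`, whose `put_object` call takes the `Bucket`, `Key`, `Body`, and `ContentType` arguments used here. `FakeS3Client` is an in-memory stand-in so the sketch runs without AWS credentials.

```python
import json


def store_weather(s3_client, bucket: str, key: str, weather_data: dict) -> None:
    """Upload the weather record as a JSON object."""
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=json.dumps(weather_data).encode("utf-8"),
        ContentType="application/json",
    )


class FakeS3Client:
    """In-memory stand-in for an S3 client (illustration only)."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body, ContentType=None):
        self.objects[(Bucket, Key)] = Body


client = FakeS3Client()
store_weather(client, "DEST_BUCKET", "weather_data/latest.json", {"temp_c": 10.0})
```

Passing the client in as an argument keeps the helper testable and lets the handler create the real client once, outside the function body.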

Apache Airflow DAG (`weather_data_pipeline.py`):

```python
# Written against Airflow 2.x
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
# The Amazon provider has no `S3ToS3Operator`; `S3CopyObjectOperator`
# copies a single object from one bucket to another.
from airflow.providers.amazon.aws.operators.s3 import S3CopyObjectOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='End-to-end weather data pipeline',
    schedule_interval=timedelta(days=1),
)


def trigger_lambda_function(**kwargs):
    # Trigger Lambda function
    # …
    pass  # keeps the function valid until the invocation is filled in


# Airflow 2 passes the task context to the callable automatically,
# so `provide_context=True` is no longer needed.
trigger_lambda_task = PythonOperator(
    task_id='trigger_lambda',
    python_callable=trigger_lambda_function,
    dag=dag,
)

# SOURCE_KEY is a placeholder for the key of the object to copy.
store_in_s3_task = S3CopyObjectOperator(
    task_id='store_in_s3',
    source_bucket_name='SOURCE_BUCKET',
    source_bucket_key='SOURCE_KEY',
    dest_bucket_name='DEST_BUCKET',
    dest_bucket_key='weather_data/SOURCE_KEY',
    aws_conn_id='aws_default',
    dag=dag,
)

# Run the Lambda trigger first, then the S3 copy.
trigger_lambda_task >> store_in_s3_task
```

Please replace placeholders such as `FASTAPI_ENDPOINT`, `SOURCE_BUCKET`, and `DEST_BUCKET` with your actual values.

Remember that this is a simplified example, and you may need to adapt it based on your specific use case, error handling, and additional requirements.
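
The body of `trigger_lambda_function` is left open in the DAG file. One way it might be filled in is sketched below, assuming a hypothetical helper `invoke_lambda` and a placeholder function name `WEATHER_LAMBDA_NAME`. With boto3, the client would be `boto3.client("lambda")`, whose `invoke` call accepts `FunctionName`, `InvocationType`, and `Payload` and returns a response whose `Payload` is a readable stream. `FakeLambdaClient` is a stand-in so the sketch runs without AWS credentials.

```python
import io
import json


def invoke_lambda(lambda_client, function_name: str, payload: dict) -> dict:
    """Invoke a Lambda function synchronously and return its decoded result."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the function to finish
        Payload=json.dumps(payload).encode("utf-8"),
    )
    if response.get("FunctionError"):
        raise RuntimeError(f"Lambda {function_name} reported an error")
    return json.loads(response["Payload"].read())


class FakeLambdaClient:
    """Stand-in that mimics the shape of an invoke response (illustration only)."""

    def invoke(self, FunctionName, InvocationType, Payload):
        body = json.dumps({"statusCode": 200}).encode("utf-8")
        return {"StatusCode": 200, "Payload": io.BytesIO(body)}


# WEATHER_LAMBDA_NAME is a placeholder for the deployed function's name.
result = invoke_lambda(FakeLambdaClient(), "WEATHER_LAMBDA_NAME", {})
```

Raising on `FunctionError` lets Airflow mark the task as failed and apply the retry settings from `default_args`.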

Think Different - Dhiraj Patra

I am a Software architect for AI, ML, IoT microservices cloud applications. Love to learn and share. https://dhirajpatra.github.io