Data pipelines are the backbone of data-driven organizations. Whether batch or streaming, well-designed pipelines ensure data quality, reliability, and timely insights. This guide covers modern patterns for building production data infrastructure.
Batch Pipeline with Airflow
# Airflow DAG for data pipeline
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
from datetime import datetime, timedelta
default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'daily_sales_pipeline',
    default_args=default_args,
    description='Daily sales data ETL',
    schedule_interval='0 6 * * *',  # 6 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['sales', 'etl'],
)
# Extract
def extract_sales_data(**context):
    execution_date = context['ds']
    # Extract from source systems (api_client is a pre-configured client for the sales API)
    sales_data = api_client.get_sales(date=execution_date)
    # Save raw records to the staging area of the data lake (s3 is a pre-configured helper)
    s3.upload_json(f's3://data-lake/staging/sales/{execution_date}.json', sales_data)
    return len(sales_data)

extract_task = PythonOperator(
    task_id='extract_sales',
    python_callable=extract_sales_data,
    dag=dag,
)
# Transform
def transform_sales_data(**context):
    execution_date = context['ds']
    # Load staging data written by the extract task
    raw_data = s3.download_json(f's3://data-lake/staging/sales/{execution_date}.json')
    # Flatten nested records into the warehouse schema
    transformed = []
    for record in raw_data:
        transformed.append({
            'order_id': record['id'],
            'customer_id': record['customer']['id'],
            'total_amount': float(record['total']),
            'currency': record.get('currency', 'USD'),
            'order_date': execution_date,
            'items_count': len(record['items']),
        })
    # Save transformed data as Parquet for the Redshift COPY step
    s3.upload_parquet(
        f's3://data-lake/processed/sales/{execution_date}.parquet',
        transformed,
    )

transform_task = PythonOperator(
    task_id='transform_sales',
    python_callable=transform_sales_data,
    dag=dag,
)
# Load
load_task = S3ToRedshiftOperator(
    task_id='load_to_redshift',
    schema='analytics',
    table='sales_facts',
    s3_bucket='data-lake',
    s3_key='processed/sales/{{ ds }}.parquet',
    copy_options=['PARQUET'],
    dag=dag,
)

extract_task >> transform_task >> load_task
Streaming with Kafka
// Kafka streaming pipeline
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'data-processor',
  brokers: [process.env.KAFKA_BROKER!],
});

const consumer = kafka.consumer({ groupId: 'sales-processor' });
const producer = kafka.producer();
async function processStream() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'raw-events', fromBeginning: false });

  await consumer.run({
    eachBatch: async ({ batch, heartbeat }) => {
      const transformedEvents: { key: string; value: string }[] = [];
      for (const message of batch.messages) {
        const event = JSON.parse(message.value!.toString());
        // Transform and enrich (enrichEvent is an application-specific helper)
        const transformed = {
          eventId: event.id,
          eventType: event.type,
          userId: event.user_id,
          timestamp: new Date(event.timestamp).toISOString(),
          data: enrichEvent(event),
        };
        transformedEvents.push({
          key: event.user_id,
          value: JSON.stringify(transformed),
        });
        // Keep the consumer session alive while working through large batches
        await heartbeat();
      }
      // Produce to processed topic
      await producer.send({
        topic: 'processed-events',
        messages: transformedEvents,
      });
    },
  });
}

processStream().catch(console.error);
Data Quality
Data Quality Best Practices
Validation:
- Schema validation at ingestion
- Business rule checks
- Null and duplicate detection
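As a minimal sketch of these checks (assuming a plain list of dicts shaped like the sales records above; the field names and rejection reasons are illustrative), validation at ingestion can be as simple as:
# Lightweight record validation at ingestion (illustrative field names)
REQUIRED_FIELDS = {'id', 'customer', 'total', 'items'}

def validate_records(records):
    valid, rejected, seen_ids = [], [], set()
    for record in records:
        missing = REQUIRED_FIELDS - record.keys()
        if missing:
            rejected.append({'record': record, 'reason': f'missing fields: {missing}'})
            continue
        if record['id'] in seen_ids:
            rejected.append({'record': record, 'reason': 'duplicate order id'})
            continue
        seen_ids.add(record['id'])
        valid.append(record)
    return valid, rejected
Rejected records are best written to a quarantine location or dead letter queue (see Recovery below) rather than silently dropped.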
Monitoring:
- Data freshness SLAs
- Row count anomaly detection
- Schema change alerts
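A hedged sketch of freshness and row-count checks that could run as a final task in the DAG above (the SLA and deviation threshold are assumptions, tune them to your data):
# Simple freshness and row-count anomaly checks (thresholds are illustrative)
from datetime import datetime, timezone, timedelta

def check_freshness(last_loaded_at, max_lag=timedelta(hours=2)):
    lag = datetime.now(timezone.utc) - last_loaded_at
    if lag > max_lag:
        raise ValueError(f'Data is stale: last load was {lag} ago, SLA is {max_lag}')

def check_row_count(today_count, trailing_counts, tolerance=0.5):
    # Flag counts that deviate more than `tolerance` from the trailing average
    if not trailing_counts:
        return
    baseline = sum(trailing_counts) / len(trailing_counts)
    if baseline and abs(today_count - baseline) / baseline > tolerance:
        raise ValueError(f'Row count {today_count} deviates from baseline {baseline:.0f}')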
Recovery:
- Idempotent processing
- Dead letter queues
- Point-in-time recovery
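For idempotent processing, one common pattern is delete-then-insert by partition, so re-running a day never duplicates rows. A minimal sketch against the sales_facts table from the batch example (the DB-API connection and its source are assumptions, not part of the DAG above):
# Idempotent load: replace the partition for the execution date inside one transaction
def load_sales_idempotently(conn, execution_date, rows):
    with conn:  # commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute(
                "DELETE FROM analytics.sales_facts WHERE order_date = %s",
                (execution_date,),
            )
            cur.executemany(
                "INSERT INTO analytics.sales_facts "
                "(order_id, customer_id, total_amount, currency, order_date, items_count) "
                "VALUES (%(order_id)s, %(customer_id)s, %(total_amount)s, "
                "%(currency)s, %(order_date)s, %(items_count)s)",
                rows,
            )
Because the whole partition is rewritten in a single transaction, retries and backfills are safe; records that still fail can be routed to a dead letter queue or quarantine table for later replay.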
Conclusion
Modern data pipelines combine batch and streaming approaches based on requirements. Focus on data quality, observability, and idempotent processing for reliable data infrastructure.
Need help building data pipelines? Contact Jishu Labs for expert data engineering consulting.
About David Kumar
David Kumar is the DevOps Lead at Jishu Labs with expertise in data engineering and pipeline architecture.