Batch processing runs on schedules. Streaming processes events as they happen. The shift from “nightly ETL” to “process in real-time” changes everything: your architecture, your failure modes, your data contracts, and your operational complexity. This guide covers the engineering decisions that separate demo-quality streaming from production systems processing millions of events per second.
## Streaming Architecture Patterns

### Event Streaming Pipeline
```
 Data Sources           Stream Processing            Consumers
┌──────────┐           ┌──────────────┐             ┌──────────┐
│ Web Apps ├──┐        │              │      ┌─────▶│ Real-time│
│ (clicks) │  │ Kafka  │   Apache     │      │      │ Dashboard│
├──────────┤  │ Topics │   Flink      │      │      ├──────────┤
│ IoT      ├──┼───────▶│              ├──────┤      │ Alerting │
│ Sensors  │  │        │  (transform, │      │      │ System   │
├──────────┤  │        │   aggregate, │      │      ├──────────┤
│ Database ├──┘        │   enrich)    │      └─────▶│ Data Lake│
│ CDC      │           │              │             │ (archive)│
└──────────┘           └──────────────┘             └──────────┘
```
## Apache Kafka Deep Dive

### Topic Design
| Pattern | Naming | Partitions | Retention |
|---|---|---|---|
| Event sourcing | domain.entity.event | By entity ID | Infinite (compacted) |
| Change data capture | cdc.database.table | By primary key | 7-30 days |
| Metrics | metrics.service.type | By service ID | 24-72 hours |
| Commands | commands.service.action | By correlation ID | Short (24 hours) |
| Dead letter | dlq.consumer-group.topic | Low (1-3) | 30 days |
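The retention column above maps to Kafka's `retention.ms` topic config. As a sketch (the topic names, partition counts, and helper functions here are illustrative, not prescriptive), topic specs can be built from the table and created with `confluent_kafka`'s admin API:

```python
def retention_ms(hours):
    """Convert a retention window in hours to Kafka's retention.ms config."""
    return {"retention.ms": str(hours * 60 * 60 * 1000)}

def topic_spec(name, partitions, retention_hours):
    """Describe a topic per the table above."""
    return {
        "name": name,
        "num_partitions": partitions,
        "config": retention_ms(retention_hours),
    }

# Example specs following the naming patterns in the table:
cdc_topic = topic_spec("cdc.shop.orders", partitions=12, retention_hours=7 * 24)
metrics_topic = topic_spec("metrics.checkout.latency", partitions=6, retention_hours=72)

# With confluent_kafka installed, create them via the admin client:
#   from confluent_kafka.admin import AdminClient, NewTopic
#   admin = AdminClient({"bootstrap.servers": "kafka-1:9092"})
#   admin.create_topics([NewTopic(cdc_topic["name"],
#                                 num_partitions=cdc_topic["num_partitions"],
#                                 config=cdc_topic["config"])])
```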
### Producer Configuration
```python
from confluent_kafka import Producer
import json
import logging

logger = logging.getLogger(__name__)

producer_config = {
    "bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "acks": "all",                    # Wait for all in-sync replicas
    "enable.idempotence": True,       # No duplicates from producer retries
    "max.in.flight.requests.per.connection": 5,  # Max allowed with idempotence
    "retries": 2147483647,            # Retry until delivery.timeout.ms expires
    "compression.type": "lz4",        # Balance speed vs. size
    "linger.ms": 5,                   # Wait up to 5 ms to fill a batch
    "batch.size": 32768,              # 32 KB batch size
}

producer = Producer(producer_config)

def delivery_report(err, msg):
    if err:
        logger.error(f"Delivery failed: {err}")
        # Push to a local retry queue for later redelivery
    else:
        logger.debug(f"Delivered to {msg.topic()} [{msg.partition()}]")

def produce_event(topic, key, event):
    """Produce an event asynchronously; delivery_report confirms the outcome."""
    producer.produce(
        topic=topic,
        key=json.dumps(key).encode("utf-8"),
        value=json.dumps(event).encode("utf-8"),
        callback=delivery_report,
    )
    producer.poll(0)  # Serve delivery callbacks

# On shutdown, call producer.flush() to drain in-flight messages
```
### Consumer Patterns
```python
from confluent_kafka import Consumer
import json

consumer_config = {
    "bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",  # Start from the beginning if no committed offset
    "enable.auto.commit": False,      # Commit manually, after processing succeeds
    "max.poll.interval.ms": 300000,   # 5 min max between polls before rebalance
    "session.timeout.ms": 45000,      # Detect dead consumers
}

consumer = Consumer(consumer_config)
consumer.subscribe(["orders.created"])

def process_events():
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            handle_error(msg.error())
            continue
        try:
            event = json.loads(msg.value())
            process_order(event)
            # Commit only after successful processing (at-least-once)
            consumer.commit(message=msg)
        except Exception as e:
            # Route the poison message to the dead letter queue, then move on
            produce_to_dlq(msg, str(e))
            consumer.commit(message=msg)  # Don't reprocess bad messages
```
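`produce_to_dlq` above is left abstract. One way to implement it, as a sketch (the `dlq.` prefix follows the topic-design table's convention; the header names and helper are assumptions), is to forward the original bytes untouched with failure metadata in Kafka headers:

```python
import time

def build_dlq_record(source_topic, partition, offset, value, error):
    """Wrap a failed message with enough metadata to debug and replay it."""
    return {
        "topic": f"dlq.order-processor.{source_topic}",
        "value": value,  # Forward the original bytes untouched
        "headers": [
            ("dlq.source.topic", source_topic.encode()),
            ("dlq.source.partition", str(partition).encode()),
            ("dlq.source.offset", str(offset).encode()),
            ("dlq.error", error.encode()),
            ("dlq.failed.at", str(int(time.time())).encode()),
        ],
    }

def produce_to_dlq(msg, error):
    # `producer` is the idempotent producer configured earlier
    record = build_dlq_record(msg.topic(), msg.partition(), msg.offset(),
                              msg.value(), error)
    producer.produce(record["topic"], value=record["value"],
                     headers=record["headers"])
    producer.poll(0)
```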
## Stream Processing with Flink
```python
# PyFlink: real-time order aggregation
from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
t_env = TableEnvironment.create(env_settings)

# Define the source (Kafka), with a 5-second watermark to tolerate late events
t_env.execute_sql("""
    CREATE TABLE orders (
        order_id STRING,
        customer_id STRING,
        amount DECIMAL(10, 2),
        category STRING,
        event_time TIMESTAMP(3),
        WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'orders.created',
        'properties.bootstrap.servers' = 'kafka:9092',
        'format' = 'json'
    )
""")

# Windowed aggregation: revenue per category per 5 minutes
result = t_env.execute_sql("""
    SELECT
        window_start,
        window_end,
        category,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM TABLE(
        TUMBLE(TABLE orders, DESCRIPTOR(event_time), INTERVAL '5' MINUTES)
    )
    GROUP BY window_start, window_end, category
""")
# result.print() streams the windowed rows to stdout
```
## Exactly-Once Processing
| Guarantee | Meaning | Implementation |
|---|---|---|
| At-most-once | May lose events | Auto-commit offsets before processing |
| At-least-once | May duplicate events | Commit after processing + idempotent consumers |
| Exactly-once | No loss, no duplication | Kafka transactions + idempotent sinks |
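Kafka transactions tie offset commits and output writes together atomically in a consume-transform-produce loop. A hedged sketch using `confluent_kafka`'s transaction API (the topic names, `orders.enriched` output, and `transactional_id` scheme are assumptions):

```python
def transactional_id(group_id, instance):
    """A stable transactional.id per consumer instance, so zombie
    producers from a crashed instance get fenced on restart."""
    return f"{group_id}-txn-{instance}"

def run_exactly_once_loop(instance="0"):
    from confluent_kafka import Consumer, Producer

    consumer = Consumer({
        "bootstrap.servers": "kafka-1:9092",
        "group.id": "order-processor",
        "enable.auto.commit": False,
        "isolation.level": "read_committed",  # Skip aborted transactions
    })
    consumer.subscribe(["orders.created"])

    producer = Producer({
        "bootstrap.servers": "kafka-1:9092",
        "transactional.id": transactional_id("order-processor", instance),
    })
    producer.init_transactions()

    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None or msg.error():
            continue
        producer.begin_transaction()
        try:
            producer.produce("orders.enriched", value=msg.value())
            # Offsets commit atomically with the produced output
            producer.send_offsets_to_transaction(
                consumer.position(consumer.assignment()),
                consumer.consumer_group_metadata(),
            )
            producer.commit_transaction()
        except Exception:
            producer.abort_transaction()
```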
### Idempotent Consumer Pattern
```python
def process_order_idempotent(event):
    """Process an order at most once per event_id within the dedup window."""
    # redis, database, create_order, and logger are defined elsewhere
    event_id = event["event_id"]

    # Skip events we have already processed
    if redis.sismember("processed_events", event_id):
        logger.info(f"Duplicate event {event_id}, skipping")
        return

    # Process within a database transaction
    with database.transaction():
        create_order(event)

    # Record the event as processed; note the TTL applies to the whole set,
    # so every call resets the 7-day dedup window for all members
    redis.sadd("processed_events", event_id)
    redis.expire("processed_events", 86400 * 7)
```
## Windowing Strategies
| Window Type | Use Case | Example |
|---|---|---|
| Tumbling | Fixed, non-overlapping intervals | "Orders per 5-minute window" |
| Sliding | Overlapping, moving windows | "Average over last 10 min, updated every 1 min" |
| Session | Activity-based, gap-triggered | "User session = events within 30 min of each other" |
| Global | Accumulate all events | "Running total since start" |
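In Flink SQL, the sliding row of this table maps to the `HOP` table-valued function, which takes a slide interval in addition to the window size. A sketch against the `orders` table and `t_env` defined in the Flink section (the query itself is illustrative):

```python
# Sliding window: 10-minute average order value, refreshed every minute
sliding = t_env.sql_query("""
    SELECT window_start, window_end, category, AVG(amount) AS avg_amount
    FROM TABLE(
        HOP(TABLE orders, DESCRIPTOR(event_time),
            INTERVAL '1' MINUTE, INTERVAL '10' MINUTES)
    )
    GROUP BY window_start, window_end, category
""")
```

Each event lands in ten overlapping windows here, so sliding windows multiply state and output volume relative to tumbling ones; keep the slide-to-size ratio in mind when sizing the job.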
## Anti-Patterns
| Anti-Pattern | Problem | Fix |
|---|---|---|
| Processing in the producer | Producer becomes bottleneck | Produce raw events, process in consumers |
| No schema registry | Breaking changes crash consumers | Avro/Protobuf + Schema Registry with compatibility |
| Auto-commit offsets | Data loss on consumer crash | Manual commit after successful processing |
| Single partition topics | No parallelism, max 1 consumer | Partition by key for parallel processing |
| No dead letter queue | Bad events block the entire pipeline | Route failed events to DLQ with metadata |
| No backpressure | Consumer overwhelmed, OOM crashes | Rate limiting, consumer lag monitoring |
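The backpressure row above hinges on watching consumer lag. A minimal sketch of the arithmetic (the helper names are assumptions; with `confluent_kafka`, the inputs come from `consumer.get_watermark_offsets()` and `consumer.committed()`):

```python
def partition_lag(high_watermark, committed_offset):
    """Lag = newest offset in the partition minus the consumer's committed
    position. None means no commit yet, i.e. the whole partition is unread."""
    if committed_offset is None:
        return high_watermark
    return max(0, high_watermark - committed_offset)

def total_lag(watermarks, committed):
    """Sum lag across partitions; alert when this grows without recovering."""
    return sum(partition_lag(watermarks[p], committed.get(p)) for p in watermarks)

# With a live consumer, per-partition inputs look like:
#   low, high = consumer.get_watermark_offsets(TopicPartition(topic, p))
#   tps = consumer.committed([TopicPartition(topic, p)])
#   (map negative offsets, meaning "no commit", to None before calling)
```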
:::note[Source]
This guide is derived from operational intelligence at Garnet Grid Consulting. For streaming architecture consulting, visit garnetgrid.com.
:::