Data Pipeline Patterns That Actually Scale
Building a network flow analytics pipeline with IPFIX, pmacct, Kafka, and ClickHouse that processes billions of flows and catches anomalies in real time.
Most pipeline advice is abstract. "Use event-driven orchestration." "Make stages idempotent." Good principles, but what does it actually look like?
Let's build something real: a network flow analytics pipeline that ingests billions of flows per day and answers questions like "show me every new SSH connection in the last 24 hours that we've never seen before in the past 3 months."
The Architecture
Each component has a specific job:
| Component | Role | Why This Choice |
|---|---|---|
| IPFIX/NetFlow | Flow export from network devices | Industry standard, supported everywhere |
| pmacct | Flow collection and normalization | Lightweight, battle-tested, flexible output |
| Kafka | Message queue and buffer | Decouples ingestion from storage, handles backpressure |
| ClickHouse | Columnar analytics database | Sub-second queries on billions of rows |
The Critical Detail: 1:1 Unsampled Flows
Before we dive into the pipeline, there's one configuration detail that determines whether this entire system works or not: your network devices must export unsampled (1:1) flow data.
Most network equipment defaults to sampled IPFIX/NetFlow (1:1000 or 1:2048). Sampled flows are fine for capacity planning and bandwidth statistics. They're useless for security.
Why? With 1:1000 sampling, you're statistically guaranteed to miss:
- Short-lived SSH connections (a few packets)
- Port scans (one packet per port)
- Lateral movement (brief connections between hosts)
- Data exfiltration in small bursts
If your goal is "find every new SSH connection," sampling gives you "find approximately 0.1% of new SSH connections, maybe." That's not actionable intelligence. That's noise.
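To put a number on it: assume 1:1000 packet sampling and, purely for illustration, that a short SSH login generates about 5 packets. The probability that every packet of a connection goes unsampled is (1 - 1/1000)^n, which you can compute directly in clickhouse-client:
-- Illustrative only: chance a connection is completely invisible under 1:1000 packet sampling
-- (packet counts are assumptions, not measurements)
SELECT
    round(pow(1 - 1/1000, 1), 4)   AS p_missed_1_packet_probe,   -- ~0.999
    round(pow(1 - 1/1000, 5), 4)   AS p_missed_5_packet_ssh,     -- ~0.995
    round(pow(1 - 1/1000, 100), 4) AS p_missed_100_packet_flow;  -- ~0.905
Even a 100-packet flow is roughly 90% likely to leave no trace at all.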
The conversation with your network team matters. Getting 1:1 flow export configured correctly on routers and switches is often the hardest part of this entire pipeline. The software is easy. Convincing network engineers to change IPFIX settings on production gear requires trust, testing, and proving the CPU/memory impact is acceptable.
On modern hardware, 1:1 flow export at 10Gbps+ is routine. On older gear or edge routers, you may need to be selective about which interfaces export unsampled flows.
Step 1: IPFIX Collection with pmacct
Network devices export flow records via IPFIX (the standardized successor to NetFlow). pmacct collects these flows and outputs them as JSON to Kafka.
nfacctd.conf
! pmacct (nfacctd) configuration for IPFIX collection
daemonize: false
! IPFIX/NetFlow listener
nfacctd_port: 9995
nfacctd_ip: 0.0.0.0
! Output to Kafka
plugins: kafka
! Kafka configuration
kafka_output: json
kafka_topic: network-flows
kafka_broker_host: kafka-1:9092,kafka-2:9092,kafka-3:9092
kafka_partition_by: src_ip
! Fields to capture
aggregate: src_host, dst_host, src_port, dst_port, proto, \
bytes, packets, tos, timestamp_start, timestamp_end
Each flow record becomes a JSON message:
{
"event_type": "flow",
"src_ip": "10.1.50.23",
"dst_ip": "192.168.1.100",
"src_port": 52341,
"dst_port": 22,
"proto": "tcp",
"bytes": 4523,
"packets": 42,
"timestamp_start": "2024-12-05T14:23:01Z",
"timestamp_end": "2024-12-05T14:23:58Z"
}
Step 2: Kafka as the Backbone
Kafka sits between pmacct and ClickHouse for critical reasons:
- Backpressure handling: If ClickHouse slows down, Kafka buffers the data
- Replay capability: Need to reprocess? Replay from Kafka
- Multiple consumers: Feed the same flows to SIEM, alerting, analytics
Topic Configuration
kafka-topics.sh --create \
--topic network-flows \
--partitions 24 \
--replication-factor 3 \
--config retention.ms=21600000 \
--config retention.bytes=107374182400
24 partitions with 6-hour retention and a 100GB per-partition size cap. With 1:1 unsampled flows, you're generating serious volume. Kafka's job here isn't long-term storage. It's a buffer for:
- ClickHouse maintenance: A few hours to handle restarts, upgrades, or compaction
- Consumer lag: Time to catch up if ingestion falls behind
- Brief outages: Network blips, container restarts, failovers
If ClickHouse is down for more than 6 hours, you have bigger problems than lost flow data. Keep Kafka lean. ClickHouse is your durable store.
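A quick back-of-envelope check keeps those caps honest. Assuming roughly 100K flows/second and ~300 bytes of JSON per flow (both numbers are assumptions; plug in your own), six hours of buffer spread across 24 partitions sits comfortably under the 100 GB per-partition limit:
-- Back-of-envelope Kafka sizing; every input here is an assumption to adjust
WITH
    100000   AS flows_per_sec,
    300      AS bytes_per_flow,   -- average JSON message size
    6 * 3600 AS retention_sec,
    24       AS partitions
SELECT
    formatReadableSize(flows_per_sec * bytes_per_flow * retention_sec)              AS six_hours_total,
    formatReadableSize(flows_per_sec * bytes_per_flow * retention_sec / partitions) AS per_partition;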
Step 3: ClickHouse for Analytics
ClickHouse is purpose-built for this workload: append-heavy writes, analytical queries across massive datasets, and columnar compression that turns terabytes into gigabytes.
Table Schema
CREATE TABLE network_flows
(
timestamp DateTime64(3),
src_ip IPv4,
dst_ip IPv4,
src_port UInt16,
dst_port UInt16,
proto Enum8('tcp' = 6, 'udp' = 17, 'icmp' = 1),
bytes UInt64,
packets UInt32,
-- Materialized columns for faster queries
date Date MATERIALIZED toDate(timestamp),
hour UInt8 MATERIALIZED toHour(timestamp)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (dst_port, src_ip, dst_ip, timestamp)
TTL timestamp + INTERVAL 6 MONTH
SETTINGS index_granularity = 8192;
Key design decisions:
- Partitioned by month: Old data drops off automatically via TTL
- Ordered by (dst_port, src_ip, dst_ip, timestamp): Optimizes queries filtering by destination port first
- IPv4 type: 4 bytes instead of string, with native CIDR support (see the range-filter sketch below)
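The IPv4 type is what keeps subnet filters cheap. A minimal sketch, assuming you want to count yesterday's flows into a 10.1.0.0/16 management subnet (the subnet is just an example):
-- IPv4CIDRToRange turns a CIDR into an inclusive (start, end) pair,
-- so the filter stays a simple range comparison on the 4-byte column
WITH IPv4CIDRToRange(toIPv4('10.1.0.0'), 16) AS subnet
SELECT count() AS flows_into_subnet
FROM network_flows
WHERE dst_ip BETWEEN subnet.1 AND subnet.2
  AND timestamp >= now() - INTERVAL 1 DAY;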
Kafka Integration
ClickHouse natively consumes from Kafka:
CREATE TABLE network_flows_queue
(
timestamp DateTime64(3),
src_ip IPv4,
dst_ip IPv4,
src_port UInt16,
dst_port UInt16,
proto String,
bytes UInt64,
packets UInt32
)
ENGINE = Kafka()
SETTINGS
kafka_broker_list = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
kafka_topic_list = 'network-flows',
kafka_group_name = 'clickhouse-flows',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE MATERIALIZED VIEW network_flows_mv TO network_flows AS
SELECT
timestamp,
src_ip,
dst_ip,
src_port,
dst_port,
CAST(proto AS Enum8('tcp' = 6, 'udp' = 17, 'icmp' = 1)) as proto,
bytes,
packets
FROM network_flows_queue;
Data flows automatically: Kafka topic to queue table to materialized view to final table.
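Once the materialized view is attached, a quick sanity check confirms flows are actually landing (the 5-minute window is arbitrary):
-- Expect a non-zero count and a recent max timestamp
SELECT
    count()        AS flows_last_5_min,
    max(timestamp) AS newest_flow
FROM network_flows
WHERE timestamp >= now() - INTERVAL 5 MINUTE;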
The Payoff: Finding New SSH Connections
Now the powerful part. We want to find SSH connections in the last 24 hours that we've never seen before in 3 months.
This is a security use case: new SSH connections might indicate lateral movement, compromised credentials, or unauthorized access.
Step 1: Last 24 Hours of SSH Pairs
-- All unique src_ip/dst_ip pairs with SSH (port 22) in last 24 hours
SELECT DISTINCT
src_ip,
dst_ip
FROM network_flows
WHERE dst_port = 22
AND timestamp >= now() - INTERVAL 24 HOUR
Step 2: Find First-Time Connections
WITH recent_ssh AS (
-- SSH pairs from last 24 hours
SELECT DISTINCT
src_ip,
dst_ip
FROM network_flows
WHERE dst_port = 22
AND timestamp >= now() - INTERVAL 24 HOUR
),
historical_ssh AS (
-- SSH pairs from 3 months ago to 24 hours ago
SELECT DISTINCT
src_ip,
dst_ip
FROM network_flows
WHERE dst_port = 22
AND timestamp >= now() - INTERVAL 3 MONTH
AND timestamp < now() - INTERVAL 24 HOUR
)
SELECT
r.src_ip,
r.dst_ip,
'NEW' as status
FROM recent_ssh r
LEFT JOIN historical_ssh h
ON r.src_ip = h.src_ip AND r.dst_ip = h.dst_ip
WHERE h.src_ip IS NULL
ORDER BY r.src_ip, r.dst_ip
SETTINGS join_use_nulls = 1;  -- without this, ClickHouse fills non-matches with 0.0.0.0 instead of NULL
On a dataset with 90 days of flows (billions of records), this query runs in under 2 seconds.
Step 3: Add Context
For investigation, you want more than just the pair. When did it first appear? How much traffic?
WITH recent_ssh AS (
SELECT DISTINCT
src_ip,
dst_ip
FROM network_flows
WHERE dst_port = 22
AND timestamp >= now() - INTERVAL 24 HOUR
),
historical_ssh AS (
SELECT DISTINCT
src_ip,
dst_ip
FROM network_flows
WHERE dst_port = 22
AND timestamp >= now() - INTERVAL 3 MONTH
AND timestamp < now() - INTERVAL 24 HOUR
),
new_pairs AS (
SELECT r.src_ip, r.dst_ip
FROM recent_ssh r
LEFT JOIN historical_ssh h
ON r.src_ip = h.src_ip AND r.dst_ip = h.dst_ip
WHERE h.src_ip IS NULL
)
SELECT
np.src_ip,
np.dst_ip,
min(nf.timestamp) as first_seen,
max(nf.timestamp) as last_seen,
count(*) as connection_count,
sum(nf.bytes) as total_bytes,
sum(nf.packets) as total_packets
FROM new_pairs np
JOIN network_flows nf
ON np.src_ip = nf.src_ip
AND np.dst_ip = nf.dst_ip
AND nf.dst_port = 22
AND nf.timestamp >= now() - INTERVAL 24 HOUR
GROUP BY np.src_ip, np.dst_ip
ORDER BY first_seen DESC
SETTINGS join_use_nulls = 1;
Output:
| src_ip | dst_ip | first_seen | last_seen | connection_count | total_bytes | total_packets |
|---|---|---|---|---|---|---|
| 10.1.50.23 | 192.168.1.100 | 2024-12-05 14:23:01 | 2024-12-05 14:58:32 | 7 | 45230 | 412 |
| 10.2.30.15 | 192.168.1.100 | 2024-12-05 09:15:44 | 2024-12-05 09:15:44 | 1 | 2104 | 18 |
Two new SSH relationships. The first one had 7 connections over 35 minutes with significant data transfer. The second was a single brief connection. Both warrant investigation.
Scaling Considerations
This architecture handles serious scale:
Ingestion Rate
- pmacct: 100K+ flows/second per instance
- Kafka: Millions of messages/second with proper partitioning
- ClickHouse: 1M+ rows/second insert rate
Query Performance
The key is ClickHouse's columnar storage and the ORDER BY clause. Our query only reads:
- The `dst_port` column (to filter for port 22)
- The `timestamp` column (to filter time ranges)
- The `src_ip` and `dst_ip` columns (for the actual data)
Billions of rows, but only 4 columns touched. Columnar compression means those columns might be 10-50x smaller than row storage.
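If you'd rather verify the pruning than take it on faith, EXPLAIN ESTIMATE (available in recent ClickHouse releases) shows roughly how many parts, marks, and rows a query would scan:
-- Rough read estimate for the 24-hour SSH filter
EXPLAIN ESTIMATE
SELECT DISTINCT src_ip, dst_ip
FROM network_flows
WHERE dst_port = 22
  AND timestamp >= now() - INTERVAL 24 HOUR;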
Storage
90 days of flow data from a mid-size enterprise network:
- Raw JSON in Kafka: ~8 TB
- Compressed in ClickHouse: ~400 GB
20:1 compression is typical for flow data with ClickHouse's LZ4 and delta encoding.
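You can measure the actual ratio on your own data instead of trusting the rule of thumb:
-- Observed compression ratio for the flows table (active parts only)
SELECT
    formatReadableSize(sum(data_compressed_bytes))   AS on_disk,
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 1) AS ratio
FROM system.parts
WHERE table = 'network_flows' AND active;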
Operationalizing It
Automated Alerting
Run the "new SSH pairs" query every hour and alert on results:
import clickhouse_connect
client = clickhouse_connect.get_client(host='clickhouse')
new_ssh_pairs = client.query("""
WITH recent_ssh AS (...), -- query from above
...
""").result_rows
if new_ssh_pairs:
    # send_alert is a placeholder for your notification hook (Slack webhook, PagerDuty, email, ...)
    send_alert(
        channel="#security-alerts",
        message=f"Found {len(new_ssh_pairs)} new SSH relationships",
        details=new_ssh_pairs,
    )
Dashboard
Build a Grafana dashboard showing:
- New SSH pairs per hour (trend; an example panel query follows this list)
- Top source IPs initiating new connections
- Top destination IPs receiving new connections
- Connection volume for flagged pairs
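As a starting point for the trend panel, here's a sketch of an hourly SSH-activity query; the full "new pairs per hour" panel would wrap the new_pairs CTE from earlier, and the 7-day window is arbitrary:
-- Hourly SSH trend: distinct src/dst pairs and total flows per hour
SELECT
    toStartOfHour(timestamp)  AS hour,
    uniqExact(src_ip, dst_ip) AS ssh_pairs,
    count()                   AS ssh_flows
FROM network_flows
WHERE dst_port = 22
  AND timestamp >= now() - INTERVAL 7 DAY
GROUP BY hour
ORDER BY hour;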
Investigation Workflow
When a new pair is flagged, walk through this checklist (a context query for the pair follows):
- Check the source: Is this a known admin workstation? A server that shouldn't be SSH'ing anywhere?
- Check the destination: Is this a production server? A jump host? Something in a DMZ?
- Check the timing: Business hours? 3 AM on a Sunday?
- Check the volume: Brief connection or sustained session?
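A sketch of that context pull, using the example pair from the output table above (swap in the flagged IPs):
-- Everything these two hosts have done recently, grouped by service
SELECT
    dst_port,
    proto,
    count()        AS flows,
    sum(bytes)     AS total_bytes,
    min(timestamp) AS first_seen,
    max(timestamp) AS last_seen
FROM network_flows
WHERE src_ip = toIPv4('10.1.50.23')
  AND dst_ip = toIPv4('192.168.1.100')
  AND timestamp >= now() - INTERVAL 7 DAY
GROUP BY dst_port, proto
ORDER BY total_bytes DESC;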
Beyond SSH
The same pattern works for any "new relationship" detection:
-- New database connections (port 3306/5432)
WHERE dst_port IN (3306, 5432)
-- New SMB connections (port 445)
WHERE dst_port = 445
-- New connections to external IPs (cast to string for isIPAddressInRange)
WHERE NOT isIPAddressInRange(toString(dst_ip), '10.0.0.0/8')
AND NOT isIPAddressInRange(toString(dst_ip), '172.16.0.0/12')
AND NOT isIPAddressInRange(toString(dst_ip), '192.168.0.0/16')
-- New high-volume relationships (data exfiltration?)
HAVING total_bytes > 100000000 -- 100MB+
The pipeline is the foundation. The queries are where the value lives.
More Detection Patterns
Here are three more high-impact queries. Each includes allowlists for legitimate activity because real networks have security scanners, backup systems, and monitoring tools that would otherwise flood you with false positives.
Data Exfiltration Detection
Find internal hosts sending unusually large amounts of data to external IPs. Exclude known backup servers and SFTP gateways.
-- Hosts allowed to transfer large amounts of data externally
WITH allowed_large_transfers AS (
SELECT arrayJoin([
toIPv4('10.1.1.50'), -- Backup server
toIPv4('10.1.1.51'), -- SFTP gateway
toIPv4('10.1.2.10'), -- Data sync server
toIPv4('10.1.5.100') -- Cloud sync agent
]) as ip
)
SELECT
src_ip,
dst_ip,
sum(bytes) as total_bytes,
count(*) as connection_count,
min(timestamp) as first_seen,
max(timestamp) as last_seen
FROM network_flows
WHERE timestamp >= now() - INTERVAL 24 HOUR
AND src_ip BETWEEN toIPv4('10.0.0.0') AND toIPv4('10.255.255.255')
AND NOT (dst_ip BETWEEN toIPv4('10.0.0.0') AND toIPv4('10.255.255.255'))
AND src_ip NOT IN (SELECT ip FROM allowed_large_transfers)
GROUP BY src_ip, dst_ip
HAVING total_bytes > 100000000 -- 100MB+
ORDER BY total_bytes DESC
LIMIT 50
Port Scan Detection
Find sources hitting 20+ unique ports on a single destination in an hour. Exclude vulnerability scanners and IT admin workstations.
-- Security scanners and tools allowed to perform port scans
WITH allowed_scanners AS (
SELECT arrayJoin([
toIPv4('10.100.1.10'), -- Nessus scanner
toIPv4('10.100.1.11'), -- Qualys appliance
toIPv4('10.100.1.20'), -- IT admin workstation
toIPv4('10.100.2.5') -- Ansible/config mgmt
]) as ip
)
SELECT
src_ip,
dst_ip,
count(DISTINCT dst_port) as unique_ports,
min(timestamp) as first_seen,
max(timestamp) as last_seen
FROM network_flows
WHERE timestamp >= now() - INTERVAL 1 HOUR
AND src_ip NOT IN (SELECT ip FROM allowed_scanners)
GROUP BY src_ip, dst_ip
HAVING unique_ports >= 20
ORDER BY unique_ports DESC
Beaconing / C2 Detection
Find connections with suspiciously regular intervals. Malware check-ins often beacon every N minutes with very consistent timing. Exclude monitoring systems and known cloud health checks.
-- Monitoring systems that beacon legitimately
WITH allowed_beacons AS (
SELECT t.1 as ip, t.2 as port FROM (
SELECT arrayJoin([
(toIPv4('10.50.1.10'), toUInt16(443)), -- Nagios/Zabbix
(toIPv4('10.50.1.11'), toUInt16(443)), -- Prometheus
(toIPv4('10.50.1.20'), toUInt16(80)), -- Uptime monitor
(toIPv4('10.50.2.5'), toUInt16(8443)) -- Pingdom agent
]) as t
)
),
-- Known external services with regular check-ins
allowed_destinations AS (
SELECT arrayJoin([
toIPv4('13.107.4.50'), -- Microsoft 365 health
toIPv4('172.217.14.78'), -- Google APIs
toIPv4('52.94.236.0') -- AWS health checks
]) as ip
),
connections AS (
SELECT
src_ip,
dst_ip,
dst_port,
timestamp,
dateDiff('second',
lagInFrame(timestamp) OVER (
PARTITION BY src_ip, dst_ip, dst_port
ORDER BY timestamp
),
timestamp
) as interval_seconds
FROM network_flows
WHERE timestamp >= now() - INTERVAL 24 HOUR
AND src_ip NOT IN (SELECT ip FROM allowed_beacons)
AND dst_ip NOT IN (SELECT ip FROM allowed_destinations)
)
SELECT
src_ip,
dst_ip,
dst_port,
count(*) as beacon_count,
round(avg(interval_seconds)) as avg_interval_sec,
round(stddevPop(interval_seconds), 2) as interval_stddev
FROM connections
WHERE interval_seconds IS NOT NULL AND interval_seconds > 0
GROUP BY src_ip, dst_ip, dst_port
HAVING
beacon_count >= 10 -- At least 10 connections
AND avg_interval_sec BETWEEN 60 AND 3600 -- 1min to 1hr intervals
AND interval_stddev < avg_interval_sec * 0.1 -- Very consistent timing
ORDER BY beacon_count DESC
LIMIT 50
Managing Allowlists at Scale
In production, hardcoded allowlists become unmaintainable. Move them to a dedicated table:
CREATE TABLE flow_allowlists (
list_type Enum8('scanner' = 1, 'backup' = 2, 'monitor' = 3),
ip IPv4,
port UInt16 DEFAULT 0,
description String,
added_by String,
added_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (list_type, ip);
-- Add entries
INSERT INTO flow_allowlists VALUES
('scanner', '10.100.1.10', 0, 'Nessus vulnerability scanner', 'security-team', now()),
('backup', '10.1.1.50', 0, 'Primary backup server', 'infra-team', now()),
('monitor', '10.50.1.10', 443, 'Nagios server', 'ops-team', now());
-- Use in queries
WITH allowed AS (
SELECT ip FROM flow_allowlists WHERE list_type = 'scanner'
)
SELECT ... WHERE src_ip NOT IN (SELECT ip FROM allowed)
This gives you audit trails, easy updates, and the ability to query "what's in our allowlist and why?"
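For example, the port scan detection from earlier, rewritten against the allowlist table (a sketch; same thresholds as before):
-- Port scan detection driven by flow_allowlists instead of a hardcoded list
WITH allowed AS (
    SELECT ip FROM flow_allowlists WHERE list_type = 'scanner'
)
SELECT
    src_ip,
    dst_ip,
    count(DISTINCT dst_port) AS unique_ports,
    min(timestamp)           AS first_seen,
    max(timestamp)           AS last_seen
FROM network_flows
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND src_ip NOT IN (SELECT ip FROM allowed)
GROUP BY src_ip, dst_ip
HAVING unique_ports >= 20
ORDER BY unique_ports DESC;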
Building network analytics at scale? Let's talk about architectures that find the needle in the haystack.