Data Pipeline Patterns That Actually Scale

Building a network flow analytics pipeline with IPFIX, pmacct, Kafka, and ClickHouse that processes billions of flows and catches anomalies in real time.

Jeremy Rossi · 15 min read

Most pipeline advice is abstract. "Use event-driven orchestration." "Make stages idempotent." Good principles, but what does it actually look like?

Let's build something real: a network flow analytics pipeline that ingests billions of flows per day and answers questions like "show me every new SSH connection in the last 24 hours that we've never seen before in the past 3 months."

The Architecture

Each component has a specific job:

| Component | Role | Why This Choice |
| --- | --- | --- |
| IPFIX/NetFlow | Flow export from network devices | Industry standard, supported everywhere |
| pmacct | Flow collection and normalization | Lightweight, battle-tested, flexible output |
| Kafka | Message queue and buffer | Decouples ingestion from storage, handles backpressure |
| ClickHouse | Columnar analytics database | Sub-second queries on billions of rows |

The Critical Detail: 1:1 Unsampled Flows

Before we dive into the pipeline, there's one configuration detail that determines whether this entire system works or not: your network devices must export unsampled (1:1) flow data.

Most network equipment defaults to sampled IPFIX/NetFlow (1:1000 or 1:2048). Sampled flows are fine for capacity planning and bandwidth statistics. They're useless for security.

Why? With 1:1000 sampling, you're all but guaranteed to miss:

  • Short-lived SSH connections (a few packets)
  • Port scans (one packet per port)
  • Lateral movement (brief connections between hosts)
  • Data exfiltration in small bursts

If your goal is "find every new SSH connection," sampling gives you "find approximately 0.1% of new SSH connections, maybe." That's not actionable intelligence. That's noise.

The conversation with your network team matters. Getting 1:1 flow export configured correctly on routers and switches is often the hardest part of this entire pipeline. The software is easy. Convincing network engineers to change IPFIX settings on production gear requires trust, testing, and proving the CPU/memory impact is acceptable.

On modern hardware, 1:1 flow export at 10Gbps+ is routine. On older gear or edge routers, you may need to be selective about which interfaces export unsampled flows.

Step 1: IPFIX Collection with pmacct

Network devices export flow records via IPFIX (the standardized successor to NetFlow). pmacct's nfacctd daemon collects these flows and outputs them as JSON to Kafka.

nfacctd.conf

! pmacct configuration for IPFIX collection
! nfacctd is pmacct's IPFIX/NetFlow collector daemon (pmacctd is the pcap-based one)
daemonize: false

! IPFIX/NetFlow listener
nfacctd_port: 9995
nfacctd_ip: 0.0.0.0

! Output to Kafka
plugins: kafka

! Kafka configuration
kafka_output: json
kafka_topic: network-flows
kafka_broker_host: kafka-1:9092,kafka-2:9092,kafka-3:9092
kafka_partition_by: src_ip

! Fields to capture; byte and packet counters are accounted automatically
aggregate: src_host, dst_host, src_port, dst_port, proto, \
           tos, timestamp_start, timestamp_end

Each flow record becomes a JSON message. (Field names below are normalized for readability; pmacct's native JSON output uses keys like ip_src and port_src, so map them to your schema in the consumer.)

{
  "event_type": "flow",
  "src_ip": "10.1.50.23",
  "dst_ip": "192.168.1.100",
  "src_port": 52341,
  "dst_port": 22,
  "proto": "tcp",
  "bytes": 4523,
  "packets": 42,
  "timestamp_start": "2024-12-05T14:23:01Z",
  "timestamp_end": "2024-12-05T14:23:58Z"
}

Step 2: Kafka as the Backbone

Kafka sits between pmacct and ClickHouse for critical reasons:

  1. Backpressure handling: If ClickHouse slows down, Kafka buffers the data
  2. Replay capability: Need to reprocess? Replay from Kafka
  3. Multiple consumers: Feed the same flows to SIEM, alerting, analytics

Topic Configuration

kafka-topics.sh --create \
  --topic network-flows \
  --partitions 24 \
  --replication-factor 3 \
  --config retention.ms=21600000 \
  --config retention.bytes=107374182400

24 partitions with 6-hour retention and 100GB cap. With 1:1 unsampled flows, you're generating serious volume. Kafka's job here isn't long-term storage. It's a buffer for:

  • ClickHouse maintenance: A few hours to handle restarts, upgrades, or compaction
  • Consumer lag: Time to catch up if ingestion falls behind
  • Brief outages: Network blips, container restarts, failovers

If ClickHouse is down for more than 6 hours, you have bigger problems than lost flow data. Keep Kafka lean. ClickHouse is your durable store.
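
A crude but effective lag check from the ClickHouse side: compare the wall clock to the newest stored flow. If this delay trends toward your Kafka retention window, you're about to lose data. A minimal sketch against the network_flows table defined below:

SELECT dateDiff('second', max(timestamp), now()) AS ingest_delay_seconds
FROM network_flows;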

Step 3: ClickHouse for Analytics

ClickHouse is purpose-built for this workload: append-heavy writes, analytical queries across massive datasets, and columnar compression that turns terabytes into gigabytes.

Table Schema

CREATE TABLE network_flows
(
    timestamp DateTime64(3),
    src_ip IPv4,
    dst_ip IPv4,
    src_port UInt16,
    dst_port UInt16,
    proto Enum8('tcp' = 6, 'udp' = 17, 'icmp' = 1),
    bytes UInt64,
    packets UInt32,
 
    -- Materialized columns for faster queries
    date Date MATERIALIZED toDate(timestamp),
    hour UInt8 MATERIALIZED toHour(timestamp)
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (dst_port, src_ip, dst_ip, timestamp)
TTL timestamp + INTERVAL 6 MONTH
SETTINGS index_granularity = 8192;

Key design decisions:

  • Partitioned by month: Old data drops off automatically via TTL (see the partition check after this list)
  • Ordered by (dst_port, src_ip, dst_ip, timestamp): Optimizes queries filtering by destination port first
  • IPv4 type: 4 bytes instead of string, with native CIDR support
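
To confirm partitioning and the TTL are behaving, list active partitions straight from ClickHouse's system tables. A minimal sketch, assuming the network_flows table above:

SELECT
    partition,
    sum(rows) AS rows,
    formatReadableSize(sum(bytes_on_disk)) AS on_disk
FROM system.parts
WHERE table = 'network_flows' AND active
GROUP BY partition
ORDER BY partition;

As months age past the 6-month TTL, their partitions simply disappear from this list.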

Kafka Integration

ClickHouse natively consumes from Kafka:

CREATE TABLE network_flows_queue
(
    timestamp DateTime64(3),
    src_ip IPv4,
    dst_ip IPv4,
    src_port UInt16,
    dst_port UInt16,
    proto String,
    bytes UInt64,
    packets UInt32
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    kafka_topic_list = 'network-flows',
    kafka_group_name = 'clickhouse-flows',
    kafka_format = 'JSONEachRow',
    kafka_num_consumers = 4;
 
CREATE MATERIALIZED VIEW network_flows_mv TO network_flows AS
SELECT
    timestamp,
    src_ip,
    dst_ip,
    src_port,
    dst_port,
    CAST(proto AS Enum8('tcp' = 6, 'udp' = 17, 'icmp' = 1)) as proto,
    bytes,
    packets
FROM network_flows_queue;

Data flows automatically: Kafka topic to queue table to materialized view to final table.
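
A quick sanity check that the whole chain is moving; per-minute row counts should roughly track what pmacct is exporting:

SELECT
    toStartOfMinute(timestamp) AS minute,
    count() AS flows
FROM network_flows
WHERE timestamp >= now() - INTERVAL 10 MINUTE
GROUP BY minute
ORDER BY minute;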

The Payoff: Finding New SSH Connections

Now the powerful part. We want to find SSH connections in the last 24 hours that we've never seen before in 3 months.

This is a security use case: new SSH connections might indicate lateral movement, compromised credentials, or unauthorized access.

Step 1: Last 24 Hours of SSH Pairs

-- All unique src_ip/dst_ip pairs with SSH (port 22) in last 24 hours
SELECT DISTINCT
    src_ip,
    dst_ip
FROM network_flows
WHERE dst_port = 22
  AND timestamp >= now() - INTERVAL 24 HOUR

Step 2: Find First-Time Connections

WITH recent_ssh AS (
    -- SSH pairs from last 24 hours
    SELECT DISTINCT
        src_ip,
        dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 24 HOUR
),
historical_ssh AS (
    -- SSH pairs from 3 months ago to 24 hours ago
    SELECT DISTINCT
        src_ip,
        dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 3 MONTH
      AND timestamp < now() - INTERVAL 24 HOUR
)
SELECT
    r.src_ip,
    r.dst_ip,
    'NEW' as status
FROM recent_ssh r
LEFT JOIN historical_ssh h
    ON r.src_ip = h.src_ip AND r.dst_ip = h.dst_ip
WHERE h.src_ip IS NULL
ORDER BY r.src_ip, r.dst_ip
SETTINGS join_use_nulls = 1;  -- without this, unmatched rows get default values (0.0.0.0), not NULL

On a dataset with 90 days of flows (billions of records), this query runs in under 2 seconds.
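
If the join_use_nulls setting feels fragile, ClickHouse's native anti-join expresses "in recent but not in historical" directly, with no NULL handling at all:

-- Same result via LEFT ANTI JOIN: keeps only left rows with no match on the right
WITH recent_ssh AS (
    SELECT DISTINCT src_ip, dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 24 HOUR
),
historical_ssh AS (
    SELECT DISTINCT src_ip, dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 3 MONTH
      AND timestamp < now() - INTERVAL 24 HOUR
)
SELECT r.src_ip, r.dst_ip
FROM recent_ssh r
LEFT ANTI JOIN historical_ssh h
    ON r.src_ip = h.src_ip AND r.dst_ip = h.dst_ip
ORDER BY r.src_ip, r.dst_ip;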

Step 3: Add Context

For investigation, you want more than just the pair. When did it first appear? How much traffic?

WITH recent_ssh AS (
    SELECT DISTINCT
        src_ip,
        dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 24 HOUR
),
historical_ssh AS (
    SELECT DISTINCT
        src_ip,
        dst_ip
    FROM network_flows
    WHERE dst_port = 22
      AND timestamp >= now() - INTERVAL 3 MONTH
      AND timestamp < now() - INTERVAL 24 HOUR
),
new_pairs AS (
    SELECT r.src_ip, r.dst_ip
    FROM recent_ssh r
    LEFT JOIN historical_ssh h
        ON r.src_ip = h.src_ip AND r.dst_ip = h.dst_ip
    WHERE h.src_ip IS NULL
)
SELECT
    np.src_ip,
    np.dst_ip,
    min(nf.timestamp) as first_seen,
    max(nf.timestamp) as last_seen,
    count(*) as connection_count,
    sum(nf.bytes) as total_bytes,
    sum(nf.packets) as total_packets
FROM new_pairs np
JOIN network_flows nf
    ON np.src_ip = nf.src_ip
   AND np.dst_ip = nf.dst_ip
   AND nf.dst_port = 22
   AND nf.timestamp >= now() - INTERVAL 24 HOUR
GROUP BY np.src_ip, np.dst_ip
ORDER BY first_seen DESC
SETTINGS join_use_nulls = 1;  -- same NULL-handling requirement as above

Output:

| src_ip | dst_ip | first_seen | last_seen | connection_count | total_bytes | total_packets |
| --- | --- | --- | --- | --- | --- | --- |
| 10.1.50.23 | 192.168.1.100 | 2024-12-05 14:23:01 | 2024-12-05 14:58:32 | 7 | 45230 | 412 |
| 10.2.30.15 | 192.168.1.100 | 2024-12-05 09:15:44 | 2024-12-05 09:15:44 | 1 | 2104 | 18 |

Two new SSH relationships. The first one had 7 connections over 35 minutes with significant data transfer. The second was a single brief connection. Both warrant investigation.

Scaling Considerations

This architecture handles serious scale:

Ingestion Rate

  • pmacct: 100K+ flows/second per instance
  • Kafka: Millions of messages/second with proper partitioning
  • ClickHouse: 1M+ rows/second insert rate

Query Performance

The key is ClickHouse's columnar storage and the ORDER BY clause. Our query only reads:

  1. The dst_port column (to filter for port 22)
  2. The timestamp column (to filter time ranges)
  3. The src_ip and dst_ip columns (for the actual data)

Billions of rows, but only 4 columns touched. Columnar compression means those columns might be 10-50x smaller than row storage.
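
You can watch the pruning happen. EXPLAIN indexes = 1 shows which partitions and primary-key granules survive the filters (exact output varies by ClickHouse version):

EXPLAIN indexes = 1
SELECT count()
FROM network_flows
WHERE dst_port = 22
  AND timestamp >= now() - INTERVAL 24 HOUR;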

Storage

90 days of flow data from a mid-size enterprise network:

  • Raw JSON in Kafka: ~8 TB
  • Compressed in ClickHouse: ~400 GB

20:1 compression is typical for flow data with ClickHouse's LZ4 and delta encoding.
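
You don't have to take that ratio on faith; system.parts reports both sizes. A minimal sketch for the table above:

SELECT
    formatReadableSize(sum(data_uncompressed_bytes)) AS uncompressed,
    formatReadableSize(sum(data_compressed_bytes)) AS compressed,
    round(sum(data_uncompressed_bytes) / sum(data_compressed_bytes), 1) AS ratio
FROM system.parts
WHERE table = 'network_flows' AND active;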

Operationalizing It

Automated Alerting

Run the "new SSH pairs" query every hour and alert on results:

import clickhouse_connect

client = clickhouse_connect.get_client(host='clickhouse')

# Run the "new SSH pairs" query from above (CTEs elided for brevity)
new_ssh_pairs = client.query("""
    WITH recent_ssh AS (...),  -- query from above
    ...
""").result_rows

if new_ssh_pairs:
    # send_alert is your alerting hook (Slack webhook, PagerDuty, etc.)
    send_alert(
        channel="#security-alerts",
        message=f"Found {len(new_ssh_pairs)} new SSH relationships",
        details=new_ssh_pairs
    )

Dashboard

Build a Grafana dashboard showing:

  • New SSH pairs per hour (trend; query sketch after this list)
  • Top source IPs initiating new connections
  • Top destination IPs receiving new connections
  • Connection volume for flagged pairs
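
The first panel is worth sketching. One way to get "new pairs per hour" in a single query: compute each pair's first-ever appearance, then bucket the pairs whose debut falls in the last 24 hours. Note this scans all SSH history, so expect it to be heavier than the other panels:

SELECT
    toStartOfHour(first_seen) AS hour,
    count() AS new_pairs
FROM
(
    SELECT
        src_ip,
        dst_ip,
        min(timestamp) AS first_seen
    FROM network_flows
    WHERE dst_port = 22
    GROUP BY src_ip, dst_ip
)
WHERE first_seen >= now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour;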

Investigation Workflow

When a new pair is flagged, run through these checks; the drill-down query after the list pulls the raw flows to answer them:

  1. Check the source: Is this a known admin workstation? A server that shouldn't be SSH'ing anywhere?
  2. Check the destination: Is this a production server? A jump host? Something in a DMZ?
  3. Check the timing: Business hours? 3 AM on a Sunday?
  4. Check the volume: Brief connection or sustained session?
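
Each of those checks starts from the raw flows. A drill-down sketch using the first flagged pair from the output above:

-- All flows for the flagged pair in the last 24 hours
SELECT
    timestamp,
    src_port,
    bytes,
    packets
FROM network_flows
WHERE src_ip = toIPv4('10.1.50.23')
  AND dst_ip = toIPv4('192.168.1.100')
  AND dst_port = 22
  AND timestamp >= now() - INTERVAL 24 HOUR
ORDER BY timestamp;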

Beyond SSH

The same pattern works for any "new relationship" detection:

-- New database connections (port 3306/5432)
WHERE dst_port IN (3306, 5432)
 
-- New SMB connections (port 445)
WHERE dst_port = 445
 
-- New connections to external IPs (cast to string for isIPAddressInRange)
WHERE NOT isIPAddressInRange(toString(dst_ip), '10.0.0.0/8')
  AND NOT isIPAddressInRange(toString(dst_ip), '172.16.0.0/12')
  AND NOT isIPAddressInRange(toString(dst_ip), '192.168.0.0/16')
 
-- New high-volume relationships (data exfiltration?)
HAVING total_bytes > 100000000  -- 100MB+

The pipeline is the foundation. The queries are where the value lives.

More Detection Patterns

Here are three more high-impact queries. Each includes allowlists for legitimate activity because real networks have security scanners, backup systems, and monitoring tools that would otherwise flood you with false positives.

Data Exfiltration Detection

Find internal hosts sending unusually large amounts of data to external IPs. Exclude known backup servers and SFTP gateways.

-- Hosts allowed to transfer large amounts of data externally
WITH allowed_large_transfers AS (
    SELECT arrayJoin([
        toIPv4('10.1.1.50'),   -- Backup server
        toIPv4('10.1.1.51'),   -- SFTP gateway
        toIPv4('10.1.2.10'),   -- Data sync server
        toIPv4('10.1.5.100')   -- Cloud sync agent
    ]) as ip
)
SELECT
    src_ip,
    dst_ip,
    sum(bytes) as total_bytes,
    count(*) as connection_count,
    min(timestamp) as first_seen,
    max(timestamp) as last_seen
FROM network_flows
WHERE timestamp >= now() - INTERVAL 24 HOUR
  AND src_ip BETWEEN toIPv4('10.0.0.0') AND toIPv4('10.255.255.255')
  AND NOT (dst_ip BETWEEN toIPv4('10.0.0.0') AND toIPv4('10.255.255.255'))
  AND src_ip NOT IN (SELECT ip FROM allowed_large_transfers)
GROUP BY src_ip, dst_ip
HAVING total_bytes > 100000000  -- 100MB+
ORDER BY total_bytes DESC
LIMIT 50

Port Scan Detection

Find sources hitting 20+ unique ports on a single destination in an hour. Exclude vulnerability scanners and IT admin workstations.

-- Security scanners and tools allowed to perform port scans
WITH allowed_scanners AS (
    SELECT arrayJoin([
        toIPv4('10.100.1.10'),  -- Nessus scanner
        toIPv4('10.100.1.11'),  -- Qualys appliance
        toIPv4('10.100.1.20'),  -- IT admin workstation
        toIPv4('10.100.2.5')    -- Ansible/config mgmt
    ]) as ip
)
SELECT
    src_ip,
    dst_ip,
    count(DISTINCT dst_port) as unique_ports,
    min(timestamp) as first_seen,
    max(timestamp) as last_seen
FROM network_flows
WHERE timestamp >= now() - INTERVAL 1 HOUR
  AND src_ip NOT IN (SELECT ip FROM allowed_scanners)
GROUP BY src_ip, dst_ip
HAVING unique_ports >= 20
ORDER BY unique_ports DESC

Beaconing / C2 Detection

Find connections with suspiciously regular intervals. Malware check-ins often beacon every N minutes with very consistent timing. Exclude monitoring systems and known cloud health checks.

-- Monitoring systems that beacon legitimately
WITH allowed_beacons AS (
    SELECT t.1 AS ip, t.2 AS port
    FROM
    (
        SELECT arrayJoin([
            (toIPv4('10.50.1.10'), toUInt16(443)),   -- Nagios/Zabbix
            (toIPv4('10.50.1.11'), toUInt16(443)),   -- Prometheus
            (toIPv4('10.50.1.20'), toUInt16(80)),    -- Uptime monitor
            (toIPv4('10.50.2.5'),  toUInt16(8443))   -- Pingdom agent
        ]) AS t
    )
),
-- Known external services with regular check-ins
allowed_destinations AS (
    SELECT arrayJoin([
        toIPv4('13.107.4.50'),    -- Microsoft 365 health
        toIPv4('172.217.14.78'),  -- Google APIs
        toIPv4('52.94.236.0')     -- AWS health checks
    ]) as ip
),
connections AS (
    SELECT
        src_ip,
        dst_ip,
        dst_port,
        timestamp,
        dateDiff('second',
            -- toNullable: the first row in each partition yields NULL, not the epoch default
            lagInFrame(toNullable(timestamp)) OVER (
                PARTITION BY src_ip, dst_ip, dst_port
                ORDER BY timestamp
            ),
            timestamp
        ) as interval_seconds
    FROM network_flows
    WHERE timestamp >= now() - INTERVAL 24 HOUR
      AND (src_ip, dst_port) NOT IN (SELECT ip, port FROM allowed_beacons)
      AND dst_ip NOT IN (SELECT ip FROM allowed_destinations)
)
SELECT
    src_ip,
    dst_ip,
    dst_port,
    count(*) as beacon_count,
    round(avg(interval_seconds)) as avg_interval_sec,
    round(stddevPop(interval_seconds), 2) as interval_stddev
FROM connections
WHERE interval_seconds IS NOT NULL AND interval_seconds > 0
GROUP BY src_ip, dst_ip, dst_port
HAVING
    beacon_count >= 10                        -- At least 10 connections
    AND avg_interval_sec BETWEEN 60 AND 3600  -- 1min to 1hr intervals
    AND interval_stddev < avg_interval_sec * 0.1  -- Very consistent timing
ORDER BY beacon_count DESC
LIMIT 50

Managing Allowlists at Scale

In production, hardcoded allowlists become unmaintainable. Move them to a dedicated table:

CREATE TABLE flow_allowlists (
    list_type Enum8('scanner' = 1, 'backup' = 2, 'monitor' = 3),
    ip IPv4,
    port UInt16 DEFAULT 0,
    description String,
    added_by String,
    added_at DateTime DEFAULT now()
) ENGINE = MergeTree()
ORDER BY (list_type, ip);
 
-- Add entries
INSERT INTO flow_allowlists VALUES
    ('scanner', '10.100.1.10', 0, 'Nessus vulnerability scanner', 'security-team', now()),
    ('backup', '10.1.1.50', 0, 'Primary backup server', 'infra-team', now()),
    ('monitor', '10.50.1.10', 443, 'Nagios server', 'ops-team', now());
 
-- Use in queries
WITH allowed AS (
    SELECT ip FROM flow_allowlists WHERE list_type = 'scanner'
)
SELECT ... WHERE src_ip NOT IN (SELECT ip FROM allowed)

This gives you audit trails, easy updates, and the ability to query "what's in our allowlist and why?"
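
That last question is itself a one-liner against the same table:

-- What's allowlisted, and why?
SELECT list_type, ip, port, description, added_by, added_at
FROM flow_allowlists
ORDER BY list_type, added_at DESC;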


Building network analytics at scale? Let's talk about architectures that find the needle in the haystack.
