Inspired by "Command-line Tools can be 235x Faster than your Hadoop Cluster" and years of personal experience with quick data scans.
Modern server hardware excels at parallel data processing across all cores. 48–96 cores are not uncommon nowadays, and with 10–40 Gb/s networking, a single box can process up to 5 GB/s—roughly 1 million 5 KB messages per second.
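As a quick sanity check on those numbers: 40 Gb/s of network bandwidth is 5 GB/s, and 5 GB/s divided by 5 KB per message is 1,000,000 messages per second.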
Python's multiprocessing module works but adds significant overhead. A lighter alternative is to use mature Unix tools like grep and jq.
Key principle: avoid JSON parsing unless essential, as it is notoriously CPU-intensive.
One effective technique is prefiltering with grep to reduce load on jq:
kcat -C -t events | grep 386dd3999e55 | jq -c 'select(.user_id == "386dd3999e55")'
For parallelism, xargs -P N launches N independent processes. It works well for embarrassingly parallel workloads (e.g., processing separate files or Kafka partitions), but not directly for a single streaming pipeline—each process would need its own input source and output.
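As a sketch of the per-file pattern (the paths here are hypothetical), each file gets its own grep | jq process writing to its own output file:

ls data/events/*.log | xargs -P 8 -I{} sh -c 'grep 386dd3999e55 {} | jq -c "select(.user_id == \"386dd3999e55\")" > {}.out'

The per-file outputs can then be joined with cat data/events/*.out.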
The helper below distributes the input across 48 cores with minimal overhead.
import subprocess
import tempfile


def run(command):
    output = subprocess.check_output(command, shell=True, text=True)
    return output.strip()


def make_parallel(command: str, n: int):
    # each worker writes its output to its own part_$1.out file
    temp_dir = tempfile.mkdtemp(prefix="parallel_shell.")
    command_file = f"{temp_dir}/command.sh"
    with open(command_file, "w") as fh:
        fh.write(f"({command}) > {temp_dir}/part_$1.out")
    # run n copies via xargs -P, then cat part_*.out to join output from multiple commands
    new_cmd = f"seq {n} | xargs -P {n} -I{{}} sh {command_file} {{}} _ ; cat {temp_dir}/part_*.out"
    return new_cmd


command = """kcat -C -G test_20260102_152742 -c 1000000 -o end -t events | grep 386dd3999e55 | jq -c 'select(.user_id == "386dd3999e55")'"""

# process 48M messages in parallel
parallel_command = make_parallel(command, 48)
out = run(parallel_command)
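For illustration, the command string returned by make_parallel(command, 2) looks roughly like this (the temp directory name is random; the one shown here is made up):

seq 2 | xargs -P 2 -I{} sh /tmp/parallel_shell.xyz/command.sh {} _ ; cat /tmp/parallel_shell.xyz/part_*.out

Each of the two workers runs command.sh with its own index, writes to its own part_N.out, and the final cat joins the results on stdout.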
Bonus point: the whole pipeline can be tested against other sources, e.g. local files read with cat data/events/*.log:
command = """cat data/events/*.log | grep 386dd3999e55 | jq -c 'select(.user_id == "386dd3999e55")'""" parallel_command = make_parallel(command, 1) # run only once out = run(parallel_command)
The best case is when the output is significantly smaller than the input, e.g. 48M messages in -> 1K matches out.
Disclaimer
Full streaming platforms (Kafka Streams, Spark, Databricks) exist but add orders of magnitude more cost and overhead compared to direct multicore Unix tools on bare metal.