Advanced Lesson
Working With Files, Streams & Large Datasets
Master efficient file handling, streaming patterns, and processing massive datasets that don't fit in memory.
What You'll Learn
This lesson teaches professional data handling techniques used in:
- Machine learning systems & data pipelines
- Analytics & ETL jobs
- Log processing & monitoring systems
- Web scrapers & API integrations
- Large-scale data transformations
You'll learn how to:
- Stream files without loading them into memory
- Process multi-gigabyte datasets efficiently
- Build composable data pipelines
- Handle compressed and binary data
- Implement parallel file processing
Python Download & Setup
Download Python from: python.org/downloads
Latest version recommended (3.11+)
Part 1: File I/O Fundamentals & Streaming Basics
1. Python's File I/O Model (Text vs Binary, Buffered vs Unbuffered)
- Raw I/O = Receiving individual items one by one (slow)
- Buffered I/O = Receiving full pallets at a time (efficient)
- Text I/O = Unpacking and translating foreign labels into English
When you open a file, Python uses a layered I/O system:
| Layer | What It Does | Speed |
|---|---|---|
| Raw I/O | Direct interaction with OS file descriptors | Slowest |
| Buffered I/O | Uses memory buffers (reads/writes in chunks) | Fast |
| Text I/O | Decodes bytes → Unicode strings | Automatic encoding |
Basic File Opening
Open and read a file with encoding
with open("data.txt", "r", encoding="utf-8") as f:
    content = f.read()
    print(content)

Modes:
- "r" → read
- "w" → write (overwrite)
- "a" → append
- "b" → binary mode
- "rb" → read binary
- "wb" → write binary
Buffering matters: Python reads data in chunks internally to reduce syscalls, making it ideal for large datasets.
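You can see all three layers on a real file handle. A minimal, self-contained sketch (the temp file stands in for any text file you open):

```python
import os
import tempfile

# Create a small sample file to inspect
path = tempfile.mkstemp()[1]
with open(path, "w", encoding="utf-8") as f:
    f.write("hello\n")

# open() in text mode stacks all three layers for you
with open(path, "r", encoding="utf-8") as f:
    print(type(f).__name__)             # TextIOWrapper  (Text I/O)
    print(type(f.buffer).__name__)      # BufferedReader (Buffered I/O)
    print(type(f.buffer.raw).__name__)  # FileIO         (Raw I/O)

os.remove(path)
```

Each layer wraps the one below it, which is why a text-mode handle exposes `.buffer` and `.buffer.raw`.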
2. Reading Files the Right Way (Avoid .read() on Large Files)
Calling .read() on a large file is like trying to eat an entire buffet in one bite: you'll choke! Streaming line-by-line is like eating one plate at a time: sustainable and won't overwhelm you. Beginners often do:
Bad Practice
Loading entire file into memory
data = open("log.txt").read()
print(data[:100])

This loads the entire file into memory, which is disastrous for large logs.
Instead, stream line-by-line:
Good Practice
Stream file line by line
with open("log.txt") as f:
    for line in f:
        # Process each line without loading full file
        print(line.strip())

| Approach | Memory Usage | For 10GB File |
|---|---|---|
| f.read() | Entire file in RAM | 10GB+ RAM needed |
| for line in f: | One line at a time | ~KB of RAM ✅ |
Advantages:
- ✅ No RAM explosion
- ✅ Pause/resume possible
- ✅ Efficient buffering
- ✅ Ideal for multi-GB files
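The same loop works unchanged no matter how large the file grows. A self-contained version (a temp file replaces the hypothetical log.txt so it actually runs):

```python
import os
import tempfile

# Write a sample "log" with 1000 lines
path = tempfile.mkstemp(suffix=".log")[1]
with open(path, "w", encoding="utf-8") as f:
    for i in range(1000):
        f.write(f"request {i} ok\n")

# Stream it: only one line is held in memory at a time
count = 0
with open(path, encoding="utf-8") as f:
    for line in f:
        count += 1
print(f"Streamed {count} lines")

os.remove(path)
```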
3. Using Buffered Streams Explicitly
For very large binary files (videos, models, audio):
Buffered Reader
Read large binary files with buffering
from io import FileIO, BufferedReader

# open("video.mp4", "rb") already returns a BufferedReader;
# building the layers explicitly lets you choose the buffer size
raw = FileIO("video.mp4", "rb")
stream = BufferedReader(raw, buffer_size=65536)
chunk = stream.read(4096)
print(f"Read {len(chunk)} bytes")
stream.close()

4. Writing Files Efficiently (Avoid Tiny Writes)
Frequent small writes slow everything down.
Efficient Writing
Buffer writes for efficiency
from io import StringIO
items = ["apple", "banana", "cherry", "date"]
buffer = StringIO()
for item in items:
    buffer.write(item + "\n")

with open("output.txt", "w") as f:
    f.write(buffer.getvalue())
print("Written to file!")

5. Working With CSV Files at Scale
Python's csv module supports streaming:
CSV Streaming
Stream CSV files row by row
import csv
with open("data.csv") as f:
    reader = csv.reader(f)
    for row in reader:
        # Process each row
        print(row)

Never convert a large CSV into a list:
❌ rows = list(reader) → loads everything into RAM

6. Handling JSON Without Crashing RAM
Streaming JSON
Process large JSON files incrementally
import json
# For line-delimited JSON (NDJSON)
# Each line is a separate JSON object
sample_ndjson = '''{"name": "Alice", "age": 30}
{"name": "Bob", "age": 25}
{"name": "Charlie", "age": 35}'''
for line in sample_ndjson.strip().split("\n"):
    item = json.loads(line)
    print(f"Processing: {item['name']}")

7. Working With Very Large Binary Files
Chunk Reading
Read large binary files in chunks
# Simulate chunked reading
data = b"X" * 5000 # 5KB of data
chunk_size = 1024
offset = 0
while offset < len(data):
    chunk = data[offset:offset + chunk_size]
    print(f"Processing chunk at offset {offset}, size {len(chunk)}")
    offset += chunk_size

8. Memory Mapping (mmap): Fast Random Access to Huge Files
Memory Mapping
Access file bytes without loading entire file
import mmap
# Simulated memory-mapped access pattern
# In real use: mm = mmap.mmap(f.fileno(), 0)
print("Memory mapping allows:")
print("- Instant random access to any position")
print("- No RAM explosion for huge files")
print("- Extremely fast reads")
print("- Ideal for ML datasets and databases")
# Example pattern:
# with open("huge.dat", "rb") as f:
# mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
#     print(mm[1000:1010])  # read 10 bytes at offset 1000

| Method | Random Access Speed | Memory Usage |
|---|---|---|
| f.seek() + f.read() | Medium | Low (read size) |
| f.read() entire file | Fast (after load) | File size in RAM |
| mmap | Instant | Near zero |
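The commented pattern above runs exactly as written once there is a real file behind it. A runnable sketch using a small temp file in place of "huge.dat":

```python
import mmap
import os
import tempfile

# Build a sample binary file: 1000 filler bytes, 10 target bytes, more filler
path = tempfile.mkstemp()[1]
with open(path, "wb") as f:
    f.write(b"A" * 1000 + b"0123456789" + b"B" * 1000)

# Map the file and jump straight to offset 1000; nothing else is read
with open(path, "rb") as f:
    mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
    print(mm[1000:1010])  # b'0123456789'
    mm.close()

os.remove(path)
```

The slice is served from the OS page cache, so random access stays fast even when the file is many gigabytes.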
9. Chunk Processing for Massive Datasets
Chunk Generator
Yield file chunks using a generator
def read_in_chunks(data, size=1024):
    """Generator that yields chunks of data."""
    offset = 0
    while offset < len(data):
        yield data[offset:offset + size]
        offset += size

# Demo with sample data
sample = "X" * 5000
for i, chunk in enumerate(read_in_chunks(sample, 1000)):
    print(f"Chunk {i}: {len(chunk)} chars")

10. Using Iterators & Generators in Data Pipelines
Generator Pipeline
Build streaming data pipelines
def read_lines(lines):
    for line in lines:
        yield line

def filter_errors(lines):
    for line in lines:
        if "ERROR" in line:
            yield line

# Sample log data
log_lines = [
    "INFO: Starting service",
    "ERROR: Connection failed",
    "INFO: Retrying...",
    "ERROR: Timeout occurred",
    "INFO: Service stopped"
]

for line in filter_errors(read_lines(log_lines)):
    print(line)

11. Working With Compressed Data (ZIP, GZIP)
Compressed Files
Read compressed files directly
import gzip
import io
# Create sample gzipped data
sample_text = "Hello, compressed world!\nLine 2\nLine 3"
compressed = gzip.compress(sample_text.encode())
# Read it back (streaming pattern)
with gzip.open(io.BytesIO(compressed), "rt") as f:
    for line in f:
        print(line.strip())

12. Handling Data That Doesn't Fit In RAM
Count Lines Efficiently
Process huge files with zero memory growth
# Pattern for counting lines in a 10GB file
# Memory usage stays near 0MB!
lines = [f"Line {i}" for i in range(100000)]
total = 0
for _ in lines:
    total += 1
print(f"Total lines: {total:,}")

13. Avoiding the Most Common File-I/O Mistakes
- ❌ Reading entire large files into memory
- ❌ Using .read() instead of iteration
- ❌ Building huge lists from file rows
- ❌ Tiny writes inside loops
- ❌ Forgetting to close files
- ❌ Failing to stream compressed data
- ❌ Loading massive JSON using json.load()
- ❌ Storing temporary files unnecessarily
14. Best Practices Recap
- ✅ Use with for everything
- ✅ Stream line-by-line
- ✅ Use chunked reads for large binaries
- ✅ Use ijson for large JSON
- ✅ Use csv reader instead of list(reader)
- ✅ Use memory mapping for random access
- ✅ Use generators to build pipelines
- ✅ Use compression modules without extracting
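The "use ijson" tip refers to a third-party streaming parser for huge single JSON documents. As a stdlib-only sketch of the same idea, json.JSONDecoder.raw_decode can consume a buffer of concatenated JSON objects one object at a time instead of parsing everything at once:

```python
import json

# A buffer holding several back-to-back JSON objects
stream = '{"id": 1}{"id": 2}{"id": 3}'
decoder = json.JSONDecoder()

pos = 0
while pos < len(stream):
    # Parse exactly one object, then advance past it
    obj, pos = decoder.raw_decode(stream, pos)
    print(f"Parsed object with id {obj['id']}")
```

For genuinely huge files you would refill the buffer from disk between iterations; ijson handles that bookkeeping for you.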
Part 2: Advanced Streaming & Dataset Engineering
1. Understanding Buffered I/O Depth
Buffered I/O
Low-level and high-level buffered reads
from io import FileIO, BufferedReader
# Python's I/O layers:
# 1. Raw I/O (FileIO) - OS file descriptors
# 2. Buffered I/O - memory buffers
# 3. Text I/O - decoding/encoding
print("I/O Layer Architecture:")
print("  Raw I/O  → direct OS interaction")
print("  Buffered → efficient chunking")
print("  Text I/O → Unicode handling")
# Larger buffer sizes improve throughput:
# with open("large.bin", "rb", buffering=1024*1024) as f:
#     chunk = f.read(4096)

2. Random Access With Seek
File Seek
Jump to specific file positions
import io
# Create sample file-like object
data = b"HEADER_DATA_HERE" + b"X" * 100 + b"TARGET"
file = io.BytesIO(data)
# Jump to position
file.seek(116) # Skip to "TARGET"
print(f"Read at position 116: {file.read(6)}")
# Go back to start
file.seek(0)
print(f"Header: {file.read(16)}")

3. Streaming API Data
API Streaming
Stream data from HTTP responses
# Pattern for streaming API data
# Prevents loading giant payloads into RAM
# import requests
# with requests.get(url, stream=True) as r:
# for chunk in r.iter_content(1024):
# process(chunk)
print("Streaming API pattern:")
print("1. Set stream=True in request")
print("2. Use iter_content() or iter_lines()")
print("3. Process chunks incrementally")
print("")
print("Use cases:")
print(" - Live logs")
print(" - Large JSON exports")
print(" - Video streaming")
print("  - Binary downloads")
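The chunked-consumption loop can be exercised locally without a network. In this sketch io.BytesIO stands in for a response body, but the read pattern is the same one iter_content() applies to a streamed HTTP response:

```python
import io

# Stand-in for a network response body (2500 bytes of payload)
payload = io.BytesIO(b"X" * 2500)

# Read fixed-size chunks until the stream is exhausted
total = 0
for chunk in iter(lambda: payload.read(1024), b""):
    total += len(chunk)  # a real pipeline would process(chunk) here
print(f"Consumed {total} bytes in chunks")
```

iter() with a sentinel keeps calling the lambda until it returns b"", so memory use is bounded by the chunk size, never the payload size.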
4. Chunked CSV with Pandas
Pandas Chunking
Process large CSVs in chunks
# Pattern for large CSVs with pandas
# import pandas as pd
# for chunk in pd.read_csv("big.csv", chunksize=100_000):
# process(chunk)
print("Pandas chunked reading:")
print(" - Only small chunks in memory")
print(" - Fast vectorized operations")
print(" - Ideal for multi-GB datasets")
print("")
print("Parameters:")
print(" chunksize=100000 # rows per chunk")
print(" usecols=[...] # only needed columns")
print("  dtype={...}       # optimize memory")

5. Producer/Consumer with Backpressure
Queue Backpressure
Control memory with bounded queues
from queue import Queue
from threading import Thread
import time
q = Queue(maxsize=5) # Limit queue size
def producer():
    for i in range(10):
        print(f"Producing {i}")
        q.put(i)  # blocks when the queue is full: backpressure
        time.sleep(0.1)
    q.put(None)  # Signal done

def consumer():
    while True:
        item = q.get()
        if item is None:
            break
        print(f"  Consuming {item}")
        time.sleep(0.2)
        q.task_done()

# Run pipeline
t1 = Thread(target=producer)
t2 = Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

6. Live Log Tailing
Log Tailing
Watch files for new content
import time
def follow(lines, delay=0.1):
    """Simulate tail -f behavior."""
    for line in lines:
        yield line
        time.sleep(delay)
# Demo with sample data
log_entries = [
"[INFO] Server started",
"[DEBUG] Connection opened",
"[ERROR] Timeout on request",
"[INFO] Retry successful",
]
print("Tailing log file...")
for line in follow(log_entries):
    print(line)

7. Temporary Files
Temporary Files
Create auto-cleaned temp files
import tempfile
import os
# Create temporary file
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as tmp:
    tmp.write("Temporary data")
    tmp_path = tmp.name
print(f"Created: {tmp_path}")

# Read it back
with open(tmp_path) as f:
    print(f"Content: {f.read()}")

# Clean up
os.unlink(tmp_path)
print("Cleaned up!")

8. Multiprocessing for Parallel Files
Parallel Processing
Process multiple files in parallel
from multiprocessing import Pool
def process_file(filename):
    """Process a single file."""
    # Simulate processing
    return f"Processed: {filename}"

# Files to process
files = ["data1.csv", "data2.csv", "data3.csv"]

# Process in parallel (the __main__ guard is required on
# platforms that spawn worker processes, e.g. Windows/macOS)
if __name__ == "__main__":
    with Pool(processes=3) as pool:
        results = pool.map(process_file, files)
    for result in results:
        print(result)

9. Binary Struct Parsing
Struct Parsing
Parse binary records efficiently
import struct
# Define record format: int, float, float
fmt = "I f f" # unsigned int, 2 floats
size = struct.calcsize(fmt)
# Create sample binary data
records = [
struct.pack(fmt, 1, 23.5, 65.2),
struct.pack(fmt, 2, 24.1, 68.0),
struct.pack(fmt, 3, 22.8, 70.5),
]
data = b"".join(records)
# Parse records
print(f"Record size: {size} bytes")
for i in range(0, len(data), size):
    sensor_id, temp, humidity = struct.unpack(fmt, data[i:i+size])
    print(f"Sensor {sensor_id}: temp={temp}, humidity={humidity}")

10. Composable Pipeline Functions
Pipeline Composition
Build UNIX-style data pipelines
def read_data(items):
    for item in items:
        yield item

def strip_whitespace(items):
    for item in items:
        yield item.strip()

def filter_empty(items):
    for item in items:
        if item:
            yield item

def uppercase(items):
    for item in items:
        yield item.upper()

# Compose pipeline
data = [" hello ", "", " world ", " python ", ""]
pipeline = uppercase(filter_empty(strip_whitespace(read_data(data))))
print("Pipeline output:")
for item in pipeline:
    print(item)

11. Avoiding Common Streaming Pitfalls
- ❌ Forgetting .close() outside a with
- ❌ Inefficient small reads
- ❌ Building giant Python lists
- ❌ Using pandas on files too large
- ❌ Decompressing entire files unnecessarily
- ❌ Excessive text splits / regex in hot loops
- ❌ Ignoring buffer sizes
- ❌ Reading binary as text
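The "inefficient small reads" pitfall is easy to measure. A rough micro-benchmark sketch: reading the same 1 MB buffer one byte at a time versus in 64 KB blocks (absolute timings vary by machine, but the gap is always large):

```python
import io
import time

data = b"X" * 1_000_000

# 1,000,000 one-byte reads
src = io.BytesIO(data)
t0 = time.perf_counter()
while src.read(1):
    pass
tiny = time.perf_counter() - t0

# 16 reads of 64 KB
src = io.BytesIO(data)
t0 = time.perf_counter()
while src.read(65536):
    pass
big = time.perf_counter() - t0

print(f"1-byte reads: {tiny:.4f}s")
print(f"64 KB reads:  {big:.4f}s  (far fewer read calls)")
```

With a real file the gap widens further, because each tiny read can also mean a syscall.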
Part 3: High-Volume File & Data Processing
1. Streaming Compressed Files
GZIP Streaming
Read compressed files without full decompression
import gzip
import io
# Create compressed data
original = "\n".join([f"Log line {i}" for i in range(100)])
compressed = gzip.compress(original.encode())
print(f"Original: {len(original)} bytes")
print(f"Compressed: {len(compressed)} bytes")
# Stream it
with gzip.open(io.BytesIO(compressed), "rt") as f:
    count = 0
    for line in f:
        count += 1
print(f"Streamed {count} lines")

2. Zero-Copy with Memoryview
Memoryview
Slice bytes without copying
# Memoryview enables zero-copy slicing
data = bytearray(b"HEADER" + b"X" * 1000 + b"FOOTER")
# Create memoryview (no copy!)
mv = memoryview(data)
# Slice without allocating new memory
header = mv[:6]
footer = mv[-6:]
print(f"Header: {bytes(header)}")
print(f"Footer: {bytes(footer)}")
print(f"Total bytes: {len(data)}")
print("No memory was copied!")

3. Async File I/O
Async Files
Async file operations with aiofiles
import asyncio
async def process_data():
    """Simulate async file processing."""
    # With aiofiles:
    # async with aiofiles.open("file.txt") as f:
    #     async for line in f:
    #         await process(line)
    items = ["item1", "item2", "item3"]
    for item in items:
        await asyncio.sleep(0.1)
        print(f"Processed: {item}")

asyncio.run(process_data())
print("Async processing complete!")

4. Multiprocessing with imap
imap Processing
Stream results from parallel workers
from multiprocessing import Pool
def transform(x):
    """CPU-intensive transformation."""
    return x ** 2

data = range(10)

# imap_unordered streams results as they complete
# (guard required on spawn-based platforms)
if __name__ == "__main__":
    with Pool(4) as p:
        results = list(p.imap_unordered(transform, data, chunksize=2))
    print("Results:", sorted(results))

5. Rolling Window Processing
Rolling Window
Process data with sliding windows
from collections import deque
def rolling_average(data, window_size):
    """Calculate rolling average."""
    window = deque(maxlen=window_size)
    for value in data:
        window.append(value)
        if len(window) == window_size:
            avg = sum(window) / window_size
            yield avg

# Demo
prices = [100, 102, 98, 103, 105, 101, 99, 104]
print("5-period rolling average:")
for avg in rolling_average(prices, 5):
    print(f"  {avg:.2f}")

6. Endless Streaming Pipelines
Streaming Pipeline
Build memory-efficient data flows
def generate():
    """Generate endless data."""
    for i in range(1000):
        yield f"record_{i}"

def clean(records):
    for r in records:
        yield r.strip()

def enrich(records):
    for r in records:
        yield {"id": r, "processed": True}

def take(records, n):
    for i, r in enumerate(records):
        if i >= n:
            break
        yield r

# Build pipeline (no memory growth!)
pipeline = take(enrich(clean(generate())), 5)
print("First 5 records:")
for record in pipeline:
    print(record)

7. Atomic File Writes
Atomic Writes
Safe file writing that prevents corruption
import tempfile
import os
def atomic_write(path, data):
    """Write atomically to prevent corruption."""
    # Write to temp file first
    dir_name = os.path.dirname(path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=dir_name)
    try:
        with os.fdopen(fd, 'w') as f:
            f.write(data)
        # Atomic rename
        os.replace(tmp_path, path)
        print(f"Atomically wrote to {path}")
    except BaseException:
        os.unlink(tmp_path)
        raise

# Demo (filename is arbitrary)
atomic_write("/tmp/test_atomic.txt", "important data\n")

8. Common Pitfalls in Large-Dataset Work
- ❌ Naïve .read() on giant files
- ❌ Unbounded queues
- ❌ Excessive process spawning
- ❌ Forgetting to chunk operations
- ❌ Mixing CPU & I/O tasks in same thread
- ❌ Converting everything to pandas DataFrame
- ❌ Relying on recursion for streaming
Final Summary
You've now mastered professional file handling and large-scale data processing in Python.
You can now:
- Stream massive files without loading them into memory
- Build efficient data pipelines with generators
- Process compressed data on the fly
- Handle binary formats and structured records
- Use memory mapping for fast random access
- Implement multiprocessing for parallel file processing
- Build real-time log monitoring systems
- Handle terabyte-scale datasets efficiently
Quick Reference: Files & Streams
| Syntax | What it does |
|---|---|
| with open(path, 'rb') as f: | Open file in binary read mode |
| pathlib.Path(path) | Modern path manipulation |
| for chunk in iter(lambda: f.read(8192), b''): | Read a large file in fixed-size chunks |
| csv.DictReader(f) | Stream CSV rows as dicts |
| io.StringIO(data) | In-memory file-like object |
Great work! You've completed this lesson.
You can now handle files of any size efficiently, parse CSV/JSON/binary formats, and stream large datasets without memory issues.
Up next: Advanced Collections, covering Counter, deque, ChainMap, itertools, and functools.