
How Do I Ensure Safe File Writes in Python Data Pipelines?

My ETL pipeline crashed at 3 AM. When I checked the logs the next morning, the pipeline had stopped halfway through processing a large JSON file. I restarted it, but it immediately skipped the file because “it already existed.” The downstream processor then failed with a JSON parse error.

I opened the file and found this:

corrupted_output.json
{
"records": [
{"id": 1, "name": "Alice"},
{"id": 2, "name": "Bob"},
{"id": 3, "name": "Char

The file was incomplete. The crash had left a corrupted partial file, and my pipeline’s existence check treated it as a successful output. This pattern repeated multiple times, each requiring manual cleanup and investigation.

Why Standard File Writes Are Dangerous in Pipelines

The problematic pattern in my pipeline was simple but dangerous:

unsafe_pipeline.py
import json
import os


def run_pipeline(input_file, output_file):
    # Check if already processed
    if os.path.exists(output_file):
        print(f"Skipping {output_file} - already exists")
        return

    # Process and write
    data = transform_data(input_file)
    with open(output_file, 'w') as f:
        json.dump(data, f)  # Crash here = corrupted file

This assumes file existence equals successful completion. But when the process crashes during the json.dump() call, the file exists but contains incomplete data. The next run sees the file, skips processing, and passes corrupted data downstream.

Crash Impact Diagram
NORMAL EXECUTION:
Check exists? NO → Process → Write complete → File valid → Done
CRASH DURING WRITE:
Check exists? NO → Process → Write starts → CRASH → File partial
Next run: Check exists? YES → Skip → Corrupted file passed downstream → FAILURE
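
To see the failure mode concretely, here is a small sketch that simulates a crash partway through a write. The `write_with_crash` helper and the simulated `RuntimeError` are illustrative assumptions, not code from my pipeline; a real crash (OOM kill, power loss) leaves the file in the same kind of state.

```python
import json
import os
import tempfile


def write_with_crash(path, records):
    """Write records one by one with plain open(), then fail partway through."""
    with open(path, "w") as f:
        f.write('{"records": [')
        for i, record in enumerate(records):
            if i == 2:
                # Simulated crash: stands in for an OOM kill or a power loss.
                raise RuntimeError("simulated crash during write")
            if i:
                f.write(", ")
            f.write(json.dumps(record))
        f.write("]}")


def is_valid_json(path):
    """Return True if the file at path parses as JSON."""
    try:
        with open(path) as f:
            json.load(f)
        return True
    except json.JSONDecodeError:
        return False


records = [
    {"id": 1, "name": "Alice"},
    {"id": 2, "name": "Bob"},
    {"id": 3, "name": "Charlie"},
]
output_path = os.path.join(tempfile.mkdtemp(), "output.json")

try:
    write_with_crash(output_path, records)
except RuntimeError:
    pass

file_exists = os.path.exists(output_path)   # the existence check passes...
file_is_valid = is_valid_json(output_path)  # ...but the content does not parse
print(file_exists, file_is_valid)
```

The file is there, so an existence check would skip it, yet it cannot be parsed.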

I tried several workarounds before finding the right solution.

My Failed Attempts

Attempt 1: File Size Checks

I added a size check to detect partial files:

attempt_size_check.py
import json
import os


def run_pipeline(input_file, output_file):
    # Treat the output as done only if it exists and is "big enough"
    if os.path.exists(output_file) and os.path.getsize(output_file) > 1000:
        print(f"Skipping {output_file}")
        return

    data = transform_data(input_file)
    with open(output_file, 'w') as f:
        json.dump(data, f)

This failed because partial files can have arbitrary sizes. A crash at different points produces different partial sizes. I couldn’t predict the minimum valid size for every output file.
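
A quick sketch shows why no threshold works. It truncates a valid JSON document at a few cut points to imitate crashes at different moments. The sample payload is made up for illustration, and the 1000-byte threshold mirrors the check above.

```python
import json

SIZE_THRESHOLD = 1000  # same threshold as the size check above

# A valid payload comfortably larger than the threshold (illustrative data).
payload = json.dumps(
    {"records": [{"id": i, "name": f"user{i}"} for i in range(200)]}
)


def parses(text):
    """Return True if text is a complete JSON document."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Simulate crashes at different points by truncating the payload.
cut_points = [len(payload) // 2, len(payload) - 10, len(payload) - 1]

# Collect the cuts that pass the size check but do not parse.
fooled = [
    cut for cut in cut_points
    if cut > SIZE_THRESHOLD and not parses(payload[:cut])
]

print(len(fooled), "of", len(cut_points), "partial files pass the size check")
```

Even a file missing only its final byte is above the threshold and still invalid, so size alone cannot separate complete outputs from partial ones.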

Attempt 2: Manual Temp File Management

I tried writing to a temp file first, then renaming:

attempt_temp_file.py
import os
import json


def run_pipeline(input_file, output_file):
    if os.path.exists(output_file):
        return

    temp_file = output_file + '.tmp'
    data = transform_data(input_file)
    try:
        with open(temp_file, 'w') as f:
            json.dump(data, f)
        os.rename(temp_file, output_file)
    except Exception:
        if os.path.exists(temp_file):
            os.remove(temp_file)
        raise

This approach had problems:

  1. I forgot the cleanup in some exception paths
  2. Race conditions appeared when multiple processes ran
  3. The temp file logic had to be repeated in every write operation
  4. The extra code was verbose and error-prone

I was implementing the same pattern repeatedly, inconsistently, across dozens of write operations.
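
One source of the race conditions is that every process derives the same temp path, `output_file + '.tmp'`. A possible way around that, sketched here with the standard library's `tempfile.mkstemp`, is to let the OS pick a unique name in the target's directory. The helper names `fixed_temp_path` and `unique_temp_path` are mine, for illustration only.

```python
import os
import tempfile


def fixed_temp_path(output_file):
    """Temp path from my attempt: identical for every concurrent writer."""
    return output_file + ".tmp"


def unique_temp_path(output_file):
    """Create a uniquely named temp file next to the target and return its path."""
    directory = os.path.dirname(os.path.abspath(output_file))
    fd, path = tempfile.mkstemp(
        dir=directory,
        prefix=os.path.basename(output_file) + ".",
        suffix=".tmp",
    )
    os.close(fd)  # the caller reopens the path; only the reserved name is needed
    return path


target = os.path.join(tempfile.mkdtemp(), "output.json")

# Two "writers" using the fixed name end up on the same temp file.
fixed_a, fixed_b = fixed_temp_path(target), fixed_temp_path(target)

# Two writers using mkstemp each get their own file in the same directory.
unique_a, unique_b = unique_temp_path(target), unique_temp_path(target)

print(fixed_a == fixed_b)    # True: both writers share one temp file
print(unique_a == unique_b)  # False: no collision
```

Keeping the temp file in the target's directory matters because a rename is only atomic within a single filesystem.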

The Solution: Atomic Writes with safer

A Reddit discussion pointed me to the safer library. It packages the all-or-nothing write pattern behind a drop-in replacement for open(), so I no longer had to hand-roll the temp file logic:

safe_pipeline.py
import json
import os

import safer


def run_pipeline(input_file, output_file):
    if os.path.exists(output_file):
        print(f"Skipping {output_file} - already exists")
        return

    data = transform_data(input_file)
    # All-or-nothing write: the target is only written if the block succeeds
    with safer.open(output_file, 'w') as f:
        json.dump(data, f)  # Crash here = no file created

The difference is subtle but critical. With safer, the output file either:

  • Exists and is complete/valid (write succeeded)
  • Doesn’t exist at all (write failed or crashed)

No gray areas. No partial files. No corrupted state.

safer Behavior Diagram
SUCCESSFUL WRITE:
safer.open() → Creates temp file → Write data → Close → Rename to target → Target exists (complete)
CRASH DURING WRITE:
safer.open() → Creates temp file → Write starts → CRASH → Temp file orphaned
Next run: Check target exists? NO → Process again → Correct output
The target file never exists in partial state.
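
The orphaned temp file in the crash path does not affect the existence check on the target, but after a hard kill it can linger on disk. If that happens in your setup, a periodic housekeeping sweep along these lines could remove old leftovers. This is a sketch under assumptions: the `.tmp` suffix and the one-hour age limit are placeholders I chose, not names safer is known to use, so check what your temp files are actually called before deleting anything.

```python
import os
import tempfile
import time


def remove_stale_temp_files(directory, suffix=".tmp", max_age_seconds=3600):
    """Delete files ending in suffix that were last modified too long ago."""
    now = time.time()
    with os.scandir(directory) as it:
        entries = list(it)  # snapshot first, then delete

    removed = []
    for entry in entries:
        if not entry.is_file() or not entry.name.endswith(suffix):
            continue
        # The age limit protects temp files that a running writer still uses.
        if now - entry.stat().st_mtime > max_age_seconds:
            os.remove(entry.path)
            removed.append(entry.name)
    return sorted(removed)


# Demo: one finished output, one old leftover, one temp file still in use.
work_dir = tempfile.mkdtemp()
for name in ("output.json", "old.tmp", "fresh.tmp"):
    with open(os.path.join(work_dir, name), "w") as f:
        f.write("{}")

# Backdate one temp file by two hours to imitate a leftover from an old crash.
two_hours_ago = time.time() - 2 * 3600
os.utime(os.path.join(work_dir, "old.tmp"), (two_hours_ago, two_hours_ago))

removed = remove_stale_temp_files(work_dir)
remaining = sorted(os.listdir(work_dir))
print(removed, remaining)
```

This is disk housekeeping only. Whether a file has been processed is still decided by the target file alone.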

How safer Works Internally

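The library's temp file mode uses this pattern:

safer Internal Logic
1. Create temporary file in same directory as target
2. Write all data to temporary file
3. On successful close: atomically rename temp to target
4. On exception: temp file discarded, target unchanged
5. On hard crash (kill -9, power loss): temp file never renamed, target unchanged, orphaned temp file may remain

One caveat: as far as I can tell from the safer README, safer.open() buffers writes in memory by default and only writes the target once the block exits cleanly. The temp-file-and-rename path above is what you get by passing temp_file=True, which is also the better fit for outputs too large to hold in memory. Check the documentation for your installed version.

The rename is atomic on POSIX filesystems as long as the temp file and the target live on the same filesystem, which is why the temp file is created next to the target. Either the rename completes or it doesn’t - no intermediate state. Write-then-rename is a widely used technique for crash-safe file updates.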
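
The steps above can be sketched with the standard library alone. This is my own minimal illustration of write-to-temp-then-rename, not safer's actual implementation; the `atomic_write` name and the `fsync` call before the rename are choices I made for the sketch.

```python
import contextlib
import json
import os
import tempfile


@contextlib.contextmanager
def atomic_write(target, mode="w"):
    """Yield a temp file; rename it over target only if the block succeeds."""
    directory = os.path.dirname(os.path.abspath(target))
    # Step 1: temp file in the same directory, so the rename stays on one filesystem.
    fd, temp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, mode) as f:
            yield f  # Step 2: the caller writes all data here
            f.flush()
            os.fsync(f.fileno())  # push the data to disk before the rename
        # Step 3: atomic rename; os.replace also overwrites an existing target.
        os.replace(temp_path, target)
    except BaseException:
        # Step 4: on any Python-level failure, discard the temp file.
        with contextlib.suppress(FileNotFoundError):
            os.remove(temp_path)
        raise


out_dir = tempfile.mkdtemp()
good = os.path.join(out_dir, "good.json")
bad = os.path.join(out_dir, "bad.json")

# Successful write: the target appears, complete.
with atomic_write(good) as f:
    json.dump({"records": [1, 2, 3]}, f)

# Failed write: the target never appears and the temp file is removed.
try:
    with atomic_write(bad) as f:
        f.write('{"records": [')
        raise RuntimeError("simulated crash")
except RuntimeError:
    pass

print(os.path.exists(good), os.path.exists(bad))
print(os.listdir(out_dir))
```

A hard kill skips the `except` block, so the temp file would stay behind in that case, but the target is still either complete or absent.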

Using safer with Different File Types

The library works with both text and binary write modes:

safer_examples.py
import json
import pickle

import pandas as pd  # df below is a pandas DataFrame
import safer

# data, model, df, and files_dict are assumed to be defined elsewhere

# Text files
with safer.open('output.json', 'w') as f:
    json.dump(data, f)

# Binary files
with safer.open('model.pkl', 'wb') as f:
    pickle.dump(model, f)

# CSV with pandas
with safer.open('data.csv', 'w') as f:
    df.to_csv(f, index=False)

# Multiple related files
for name, content in files_dict.items():
    with safer.open(f'{name}.txt', 'w') as f:
        f.write(content)

Each write is protected independently. If one fails, others that succeeded remain valid.

Why This Changed My Pipeline Reliability

After switching to safer, my pipeline behavior changed:

Before vs After Comparison
BEFORE (with open()):
Crash rate: 2-3 per week
Corrupted files: ~5 per month
Manual cleanup: 30 minutes per incident
False "file exists" skips: Common
AFTER (with safer.open()):
Crash rate: Same (infrastructure issue)
Corrupted files: Zero
Manual cleanup: Zero
False skips: Zero
Crashes still happen, but they leave no artifacts.

The key benefit: I can now trust file existence checks. When os.path.exists() returns True, the file is valid and complete. When it returns False, the file needs processing. No validation needed, no cleanup required.

Common Mistakes to Avoid

Even with safer, some patterns remain dangerous:

common_mistakes.py
import json

import safer

# data, critical_data, and log_data are assumed to be defined elsewhere

# MISTAKE 1: Writing outside safer context
with safer.open('output.json', 'w') as f:
    json.dump(data, f)
# Don't do additional writes after the context closes
with open('output.json', 'a') as f2:  # Bypasses safer protection
    f2.write('\n')

# MISTAKE 2: Using safer only for some writes
with safer.open('critical.json', 'w') as f:  # Protected
    json.dump(critical_data, f)
with open('logs.txt', 'w') as f:  # Not protected - can corrupt
    f.write(log_data)

# MISTAKE 3: Checking temp files manually
# Don't check for .tmp files - safer handles cleanup
# Just check the target file

Apply safer consistently to all writes where corruption matters. Logs that can be regenerated might not need it. Critical data outputs always need it.

Installation and Integration

installation.sh
pip install safer

No configuration needed. The API mirrors the standard open() function, making migration straightforward:

Migration Pattern
Find: with open('file.json', 'w') as f:
Replace: with safer.open('file.json', 'w') as f:
Find: with open('file.bin', 'wb') as f:
Replace: with safer.open('file.bin', 'wb') as f:

The change is minimal but the impact on reliability is significant.

Summary

My pipeline corruption problem came from assuming file existence means successful completion. The safer library solves this by guaranteeing atomic writes: files either exist completely or don’t exist at all.

Key points:

  1. Standard open() can leave partial files after crashes
  2. Existence checks become unreliable with partial files
  3. safer writes to temp file, renames only on success
  4. Crash recovery is automatic - just rerun the pipeline
  5. No manual cleanup, no validation logic needed

After implementing safer, my pipeline handles crashes gracefully. The 3 AM failures no longer create morning cleanup tasks. Files are either complete or nonexistent - the state is always clean.

Final Words + More Resources

My intention with this article was to help others by sharing my knowledge and experience. If you want to get in touch, you can contact me by email.
