Skip to content

When Do I Need to Use Locks in Python asyncio?

I kept adding asyncio.Lock() everywhere in my async code. Every shared variable got a lock. Every method got wrapped in async with self._lock:. I was terrified of race conditions.

Then I saw this comment on Reddit:

“a lot of async code doesn’t need any synchronization primitives. Plain lists can often be used instead of queues, plain bools/ints can often be used instead of events/semaphores”

Wait, what? I had locks protecting simple dictionary lookups. Was I doing it wrong?

The Problem: Overthinking Synchronization

I was building a simple task registry:

task_registry_wrong.py
import asyncio
class TaskRegistry:
def __init__(self):
self._tasks: dict[str, asyncio.Task] = {}
self._lock = asyncio.Lock() # Do I need this?
async def add_task(self, name: str, task: asyncio.Task):
async with self._lock: # Overkill?
self._tasks[name] = task
async def remove_task(self, name: str):
async with self._lock: # Really necessary?
self._tasks.pop(name, None)
async def get_task(self, name: str) -> asyncio.Task | None:
async with self._lock: # For a simple read?
return self._tasks.get(name)

Every operation wrapped in a lock. Felt “safe.” But was it?

Then I remembered something critical about asyncio that changed everything.

The Key Insight: Asyncio Is NOT Threading

In threading, any shared mutable state needs protection because threads can interrupt at any CPU instruction. Your thread could be paused mid-operation at any time.

But asyncio is fundamentally different:

┌─────────────────────────────────────────────────────────────┐
│ THREADING MODEL │
├─────────────────────────────────────────────────────────────┤
│ Thread A: ──┬──[read]──┬──[modify]──┬──[write]──┬──→ │
│ │ │ │ │ │
│ Thread B: └──[can interrupt at ANY point!]──┘→ │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ ASYNCIO MODEL │
├─────────────────────────────────────────────────────────────┤
│ Coroutine A: ──[read]──[modify]──[write]──→ │
│ │ │
│ Switch only │ ┌─ await point ─┐ │
│ happens at: └─┤ ├─→ │
└─────────────────────────────────────────────────────────────┘

Asyncio runs in a single thread with cooperative multitasking.

Coroutines only switch at explicit await points. This means:

  • Synchronous code blocks (no await) run atomically
  • Another coroutine cannot interrupt mid-operation
  • The “race condition” mental model from threading is often wrong for asyncio

The Rule: When Locks Are Actually Needed

After reading the Reddit discussion and testing, I found a simple rule:

You only need an asyncio.Lock when shared state crosses an await boundary.

If your code:

  1. Reads shared state
  2. Awaits (giving control to other coroutines)
  3. Then writes to that same shared state based on the earlier read

…you need a lock.

If there’s no await between read and write? No lock needed.

Experiment 1: Simple Dictionary Operations

Let me test my task registry without locks:

test_no_lock.py
import asyncio
class TaskRegistry:
def __init__(self):
self._tasks: dict[str, asyncio.Task] = {}
# No lock!
def add_task(self, name: str, task: asyncio.Task):
self._tasks[name] = task # Atomic - no await
def remove_task(self, name: str):
self._tasks.pop(name, None) # Atomic - no await
def get_task(self, name: str) -> asyncio.Task | None:
return self._tasks.get(name) # Atomic - no await
async def worker(name: str, registry: TaskRegistry):
for i in range(100):
task = asyncio.create_task(asyncio.sleep(0.01))
registry.add_task(f"{name}_{i}", task)
print(f"{name}: Added 100 tasks")
async def main():
registry = TaskRegistry()
# Multiple coroutines modifying the same dict
await asyncio.gather(
worker("A", registry),
worker("B", registry),
worker("C", registry),
)
print(f"Total tasks: {len(registry._tasks)}") # Should be 300
asyncio.run(main())

Output:

Output
A: Added 100 tasks
B: Added 100 tasks
C: Added 100 tasks
Total tasks: 300

No race conditions. No corrupted state. The dictionary operations are atomic because no await happens during them.

Experiment 2: When I Actually Need a Lock

Now let me create a scenario where a lock IS needed:

rate_limiter_with_lock.py
import asyncio
class AsyncRateLimiter:
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window_seconds = window_seconds
self._requests: list[float] = []
self._lock = asyncio.Lock()
async def acquire(self) -> bool:
async with self._lock:
now = asyncio.get_event_loop().time()
# Step 1: Read and filter state
cutoff = now - self.window_seconds
self._requests = [t for t in self._requests if t > cutoff]
# Step 2: Check condition
if len(self._requests) >= self.max_requests:
return False
# Step 3: CRITICAL - await allows other coroutines to run!
# Without lock, another coroutine could:
# - Also pass the len() check above
# - Also append to _requests
# - Result: more than max_requests allowed!
await asyncio.sleep(0.01) # Simulate some async work
# Step 4: Write state
self._requests.append(now)
return True
async def try_acquire(limiter: AsyncRateLimiter, name: str):
result = await limiter.acquire()
print(f"{name}: {'granted' if result else 'denied'}")
async def main():
limiter = AsyncRateLimiter(max_requests=2, window_seconds=1.0)
# Try to acquire 5 times concurrently
await asyncio.gather(
try_acquire(limiter, "A"),
try_acquire(limiter, "B"),
try_acquire(limiter, "C"),
try_acquire(limiter, "D"),
try_acquire(limiter, "E"),
)
asyncio.run(main())

Without the lock, I could get this:

Output (without lock - WRONG)
A: granted
B: granted
C: granted # Should be denied!
D: denied
E: denied

With the lock, I get the correct behavior:

Output (with lock - CORRECT)
A: granted
B: granted
C: denied # Correct!
D: denied
E: denied

The await in the middle of the check-modify-write sequence is what creates the race condition.

The Classic Check-Then-Act Pattern

This is where I see the most bugs. Pattern:

check_then_act_wrong.py
async def update_cache(self, key: str):
# Check-then-act with await in between
if key not in self._cache:
# PROBLEM: Other coroutine could also enter here!
await self._fetch_from_db(key)
# By now, _cache might have the key from another coroutine
self._cache[key] = ... # Overwrite or duplicate!

Correct with lock:

check_then_act_correct.py
async def get(self, key: str) -> str:
# Fast path: already cached (atomic, no lock needed)
if key in self._cache:
return self._cache[key]
# Slow path: need to load - this requires synchronization
async with self._global_lock:
# Double-check after acquiring lock
if key in self._cache:
return self._cache[key]
# Fetch crosses await - other coroutines could run
value = await self._fetch_from_source(key)
# Update cache
self._cache[key] = value
return value

The double-check pattern (check before lock, check after lock) avoids unnecessary lock contention for the fast path while still protecting the slow path.

Common Mistakes I Was Making

Mistake 1: Locking Atomic Operations

mistake_locking_atomic.py
# WRONG - unnecessary lock
async def get_config(self):
async with self._lock: # This lock does nothing useful
return self._config # No await, no modification

A simple read with no await doesn’t need a lock.

Mistake 2: Using Threading.Lock with Async

mistake_threading_lock.py
import threading # WRONG for asyncio!
class AsyncService:
def __init__(self):
self._lock = threading.Lock() # WRONG!
async def operation(self):
with self._lock: # Blocks the entire thread!
await some_async_op() # Defeats the purpose of async

threading.Lock blocks the thread, which blocks all coroutines. Always use asyncio.Lock for async code.

Mistake 3: Locking a Simple Counter

mistake_counter.py
class Counter:
def __init__(self):
self._value = 0
self._lock = asyncio.Lock() # Often unnecessary!
async def increment(self):
async with self._lock: # Lock acquired...
self._value += 1 # This is atomic! No await here!
# Lock released

If increment doesn’t await, the lock is pointless:

counter_better.py
class SimpleCounter:
def __init__(self):
self._value = 0
def increment(self) -> int: # Note: NOT async!
self._value += 1 # Atomic - no await, no interruption
return self._value

Visual Decision Guide

┌─────────────────────────────────────────────────────────────┐
│ DO I NEED A LOCK? │
├─────────────────────────────────────────────────────────────┤
│ │
│ Does your code share mutable state? │
│ │ │
│ ▼ │
│ ┌─────┴─────┐ │
│ │ NO │──→ No lock needed │
│ └─────┬─────┘ │
│ │ YES │
│ ▼ │
│ Is there an await between read and write? │
│ │ │
│ ┌──────────┼──────────┐ │
│ ▼ ▼ │
│ ┌─────┴─────┐ ┌─────┴─────┐ │
│ │ NO │ │ YES │ │
│ └─────┬─────┘ └─────┬─────┘ │
│ │ │ │
│ ▼ ▼ │
│ No lock needed Lock IS needed │
│ (atomic in asyncio) (state crosses │
│ await boundary) │
│ │
└─────────────────────────────────────────────────────────────┘

What I Changed in My Code

After this realization, I simplified my task registry:

task_registry_final.py
import asyncio
class TaskRegistry:
"""No locks needed - all operations are atomic."""
def __init__(self):
self._tasks: dict[str, asyncio.Task] = {}
def add_task(self, name: str, task: asyncio.Task):
self._tasks[name] = task
def remove_task(self, name: str):
self._tasks.pop(name, None)
def get_task(self, name: str) -> asyncio.Task | None:
return self._tasks.get(name)
def list_tasks(self) -> list[str]:
return list(self._tasks.keys())

Cleaner. Simpler. No locks.

And for my rate limiter, I kept the lock because the check-modify-write crosses an await:

rate_limiter_final.py
class AsyncRateLimiter:
"""Lock needed - check-modify-write crosses await boundary."""
def __init__(self, max_requests: int, window_seconds: float):
self.max_requests = max_requests
self.window_seconds = window_seconds
self._requests: list[float] = []
self._lock = asyncio.Lock()
async def acquire(self) -> bool:
async with self._lock:
now = asyncio.get_event_loop().time()
cutoff = now - self.window_seconds
self._requests = [t for t in self._requests if t > cutoff]
if len(self._requests) >= self.max_requests:
return False
# This await is why we need the lock!
await asyncio.sleep(0.01)
self._requests.append(now)
return True

The Mental Model Shift

I had to unlearn threading instincts:

THREADING MENTAL MODEL (wrong for asyncio):
Shared state = needs lock
Every method = potential race condition
ASYNCIO MENTAL MODEL (correct):
Shared state + await = needs lock
No await = atomic

This non-interruption property is what makes writing async code so much simpler than multithreaded code. I just had to trust it.

Final Words + More Resources

My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me

Here are also the most important links from this article along with some further resources that will help you in this scope:

Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!

Comments