When Do I Need to Use Locks in Python asyncio?
I kept adding asyncio.Lock() everywhere in my async code. Every shared variable got a lock. Every method got wrapped in async with self._lock:. I was terrified of race conditions.
Then I saw this comment on Reddit:
“a lot of async code doesn’t need any synchronization primitives. Plain lists can often be used instead of queues, plain bools/ints can often be used instead of events/semaphores”
Wait, what? I had locks protecting simple dictionary lookups. Was I doing it wrong?
The Problem: Overthinking Synchronization
I was building a simple task registry:
import asyncio
class TaskRegistry: def __init__(self): self._tasks: dict[str, asyncio.Task] = {} self._lock = asyncio.Lock() # Do I need this?
async def add_task(self, name: str, task: asyncio.Task): async with self._lock: # Overkill? self._tasks[name] = task
async def remove_task(self, name: str): async with self._lock: # Really necessary? self._tasks.pop(name, None)
async def get_task(self, name: str) -> asyncio.Task | None: async with self._lock: # For a simple read? return self._tasks.get(name)Every operation wrapped in a lock. Felt “safe.” But was it?
Then I remembered something critical about asyncio that changed everything.
The Key Insight: Asyncio Is NOT Threading
In threading, any shared mutable state needs protection because threads can interrupt at any CPU instruction. Your thread could be paused mid-operation at any time.
But asyncio is fundamentally different:
┌─────────────────────────────────────────────────────────────┐│ THREADING MODEL │├─────────────────────────────────────────────────────────────┤│ Thread A: ──┬──[read]──┬──[modify]──┬──[write]──┬──→ ││ │ │ │ │ ││ Thread B: └──[can interrupt at ANY point!]──┘→ │└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐│ ASYNCIO MODEL │├─────────────────────────────────────────────────────────────┤│ Coroutine A: ──[read]──[modify]──[write]──→ ││ │ ││ Switch only │ ┌─ await point ─┐ ││ happens at: └─┤ ├─→ │└─────────────────────────────────────────────────────────────┘Asyncio runs in a single thread with cooperative multitasking.
Coroutines only switch at explicit await points. This means:
- Synchronous code blocks (no await) run atomically
- Another coroutine cannot interrupt mid-operation
- The “race condition” mental model from threading is often wrong for asyncio
The Rule: When Locks Are Actually Needed
After reading the Reddit discussion and testing, I found a simple rule:
You only need an asyncio.Lock when shared state crosses an await boundary.
If your code:
- Reads shared state
- Awaits (giving control to other coroutines)
- Then writes to that same shared state based on the earlier read
…you need a lock.
If there’s no await between read and write? No lock needed.
Experiment 1: Simple Dictionary Operations
Let me test my task registry without locks:
import asyncio
class TaskRegistry: def __init__(self): self._tasks: dict[str, asyncio.Task] = {} # No lock!
def add_task(self, name: str, task: asyncio.Task): self._tasks[name] = task # Atomic - no await
def remove_task(self, name: str): self._tasks.pop(name, None) # Atomic - no await
def get_task(self, name: str) -> asyncio.Task | None: return self._tasks.get(name) # Atomic - no await
async def worker(name: str, registry: TaskRegistry): for i in range(100): task = asyncio.create_task(asyncio.sleep(0.01)) registry.add_task(f"{name}_{i}", task) print(f"{name}: Added 100 tasks")
async def main(): registry = TaskRegistry()
# Multiple coroutines modifying the same dict await asyncio.gather( worker("A", registry), worker("B", registry), worker("C", registry), )
print(f"Total tasks: {len(registry._tasks)}") # Should be 300
asyncio.run(main())Output:
A: Added 100 tasksB: Added 100 tasksC: Added 100 tasksTotal tasks: 300No race conditions. No corrupted state. The dictionary operations are atomic because no await happens during them.
Experiment 2: When I Actually Need a Lock
Now let me create a scenario where a lock IS needed:
import asyncio
class AsyncRateLimiter: def __init__(self, max_requests: int, window_seconds: float): self.max_requests = max_requests self.window_seconds = window_seconds self._requests: list[float] = [] self._lock = asyncio.Lock()
async def acquire(self) -> bool: async with self._lock: now = asyncio.get_event_loop().time()
# Step 1: Read and filter state cutoff = now - self.window_seconds self._requests = [t for t in self._requests if t > cutoff]
# Step 2: Check condition if len(self._requests) >= self.max_requests: return False
# Step 3: CRITICAL - await allows other coroutines to run! # Without lock, another coroutine could: # - Also pass the len() check above # - Also append to _requests # - Result: more than max_requests allowed! await asyncio.sleep(0.01) # Simulate some async work
# Step 4: Write state self._requests.append(now) return True
async def try_acquire(limiter: AsyncRateLimiter, name: str): result = await limiter.acquire() print(f"{name}: {'granted' if result else 'denied'}")
async def main(): limiter = AsyncRateLimiter(max_requests=2, window_seconds=1.0)
# Try to acquire 5 times concurrently await asyncio.gather( try_acquire(limiter, "A"), try_acquire(limiter, "B"), try_acquire(limiter, "C"), try_acquire(limiter, "D"), try_acquire(limiter, "E"), )
asyncio.run(main())Without the lock, I could get this:
A: grantedB: grantedC: granted # Should be denied!D: deniedE: deniedWith the lock, I get the correct behavior:
A: grantedB: grantedC: denied # Correct!D: deniedE: deniedThe await in the middle of the check-modify-write sequence is what creates the race condition.
The Classic Check-Then-Act Pattern
This is where I see the most bugs. Pattern:
async def update_cache(self, key: str): # Check-then-act with await in between if key not in self._cache: # PROBLEM: Other coroutine could also enter here! await self._fetch_from_db(key) # By now, _cache might have the key from another coroutine self._cache[key] = ... # Overwrite or duplicate!Correct with lock:
async def get(self, key: str) -> str: # Fast path: already cached (atomic, no lock needed) if key in self._cache: return self._cache[key]
# Slow path: need to load - this requires synchronization async with self._global_lock: # Double-check after acquiring lock if key in self._cache: return self._cache[key]
# Fetch crosses await - other coroutines could run value = await self._fetch_from_source(key)
# Update cache self._cache[key] = value return valueThe double-check pattern (check before lock, check after lock) avoids unnecessary lock contention for the fast path while still protecting the slow path.
Common Mistakes I Was Making
Mistake 1: Locking Atomic Operations
# WRONG - unnecessary lockasync def get_config(self): async with self._lock: # This lock does nothing useful return self._config # No await, no modificationA simple read with no await doesn’t need a lock.
Mistake 2: Using Threading.Lock with Async
import threading # WRONG for asyncio!
class AsyncService: def __init__(self): self._lock = threading.Lock() # WRONG!
async def operation(self): with self._lock: # Blocks the entire thread! await some_async_op() # Defeats the purpose of asyncthreading.Lock blocks the thread, which blocks all coroutines. Always use asyncio.Lock for async code.
Mistake 3: Locking a Simple Counter
class Counter: def __init__(self): self._value = 0 self._lock = asyncio.Lock() # Often unnecessary!
async def increment(self): async with self._lock: # Lock acquired... self._value += 1 # This is atomic! No await here! # Lock releasedIf increment doesn’t await, the lock is pointless:
class SimpleCounter: def __init__(self): self._value = 0
def increment(self) -> int: # Note: NOT async! self._value += 1 # Atomic - no await, no interruption return self._valueVisual Decision Guide
┌─────────────────────────────────────────────────────────────┐│ DO I NEED A LOCK? │├─────────────────────────────────────────────────────────────┤│ ││ Does your code share mutable state? ││ │ ││ ▼ ││ ┌─────┴─────┐ ││ │ NO │──→ No lock needed ││ └─────┬─────┘ ││ │ YES ││ ▼ ││ Is there an await between read and write? ││ │ ││ ┌──────────┼──────────┐ ││ ▼ ▼ ││ ┌─────┴─────┐ ┌─────┴─────┐ ││ │ NO │ │ YES │ ││ └─────┬─────┘ └─────┬─────┘ ││ │ │ ││ ▼ ▼ ││ No lock needed Lock IS needed ││ (atomic in asyncio) (state crosses ││ await boundary) ││ │└─────────────────────────────────────────────────────────────┘What I Changed in My Code
After this realization, I simplified my task registry:
import asyncio
class TaskRegistry: """No locks needed - all operations are atomic.""" def __init__(self): self._tasks: dict[str, asyncio.Task] = {}
def add_task(self, name: str, task: asyncio.Task): self._tasks[name] = task
def remove_task(self, name: str): self._tasks.pop(name, None)
def get_task(self, name: str) -> asyncio.Task | None: return self._tasks.get(name)
def list_tasks(self) -> list[str]: return list(self._tasks.keys())Cleaner. Simpler. No locks.
And for my rate limiter, I kept the lock because the check-modify-write crosses an await:
class AsyncRateLimiter: """Lock needed - check-modify-write crosses await boundary.""" def __init__(self, max_requests: int, window_seconds: float): self.max_requests = max_requests self.window_seconds = window_seconds self._requests: list[float] = [] self._lock = asyncio.Lock()
async def acquire(self) -> bool: async with self._lock: now = asyncio.get_event_loop().time() cutoff = now - self.window_seconds self._requests = [t for t in self._requests if t > cutoff]
if len(self._requests) >= self.max_requests: return False
# This await is why we need the lock! await asyncio.sleep(0.01)
self._requests.append(now) return TrueThe Mental Model Shift
I had to unlearn threading instincts:
THREADING MENTAL MODEL (wrong for asyncio): Shared state = needs lock Every method = potential race condition
ASYNCIO MENTAL MODEL (correct): Shared state + await = needs lock No await = atomicThis non-interruption property is what makes writing async code so much simpler than multithreaded code. I just had to trust it.
Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
Comments