How to Avoid Synchronization Primitives in Python asyncio
I was debugging a race condition in my asyncio application when I realized something unsettling: the asyncio.Lock I added was making things worse, not better. The event loop was blocking, throughput tanked, and the bug still appeared intermittently.
After hours of frustration, I discovered the real problem wasn’t the lock—it was that I was writing asyncio code with a threading mental model. The synchronization primitive was a bandage covering a deeper design flaw.
Here’s what I learned about designing asyncio applications that don’t need synchronization primitives at all.
The Threading Mental Model Trap
When I first started with asyncio, I brought my threading habits with me:
import asyncio
class SharedCounter: def __init__(self): self._value = 0 self._lock = asyncio.Lock()
async def increment(self): async with self._lock: self._value += 1 return self._valueThis feels natural if you’re coming from threading. But asyncio runs on a single thread. In pure async code (no run_in_executor, no thread mixing), there’s no true parallelism—only cooperative concurrency.
The key insight: a coroutine only yields at explicit await points. Operations between awaits are atomic from the perspective of other coroutines.
So when I looked at my “race condition” more carefully:
counter = 0
async def increment_twice(): counter += 1 # Atomic - no await await asyncio.sleep(0) # YIELD POINT counter += 1 # Still atomic, but counter may have changed!The bug wasn’t a race condition in the threading sense. It was that I was modifying state across a yield point without accounting for what other coroutines might do during that yield.
The Real Purpose of asyncio Synchronization Primitives
After diving into discussions and documentation, I found an enlightening perspective:
“Those constructs exist because there is a lot of synchronous code that could benefit if migrated to asyncio. Rather than requiring it be redesigned to fit with asyncio oriented design these primitives allow it to be switched over in a piecemeal fashion.”
Synchronization primitives in asyncio are migration scaffolding, not architectural goals. They exist to help legacy code transition piecemeal without a complete redesign.
When should you actually use them?
- Thread mixing: When using
run_in_executorto run synchronous code - Gradual migration: Converting synchronous code to async incrementally
- External resources: Protecting non-asyncio-aware resources
For new async code? There’s a better path.
Pattern 1: Immutable Objects
The root cause of needing synchronization is shared mutable state. Remove the mutable part, and locks become unnecessary.
# BAD: Mutable shared state requires synchronizationclass ConnectionManager: state: str = "disconnected" # Shared mutable!
async def connect(self): self.state = "connecting" await self._establish_connection() self.state = "connected"
# GOOD: Immutable state, return new valuesasync def connect(current_state: str) -> str: if current_state != "disconnected": raise ValueError(f"Cannot connect from {current_state}") await establish_connection() return "connected" # Return new immutable valueThe state flows through the system rather than being stored in one place. Each function receives state and returns new state:
async def run_connection_lifecycle(): state = "disconnected" state = await connect(state) # "connected" state = await disconnect(state) # "disconnected"No locks needed because nothing is shared.
Pattern 2: Message Passing via Queues
Instead of sharing state between coroutines, have each own its state and communicate via messages.
import asynciofrom dataclasses import dataclassfrom typing import Any
@dataclassclass Message: type: str payload: Any reply_to: asyncio.Queue | None = None
class Actor: """Actor with isolated state, communicates via messages."""
def __init__(self, name: str): self.name = name self.queue: asyncio.Queue[Message] = asyncio.Queue() self._state: dict = {} # Private state, never shared
async def send(self, msg: Message): await self.queue.put(msg)
async def run(self): while True: msg = await self.queue.get() result = await self.handle(msg) if msg.reply_to: await msg.reply_to.put(result)
async def handle(self, msg: Message) -> Any: if msg.type == "increment": self._state["count"] = self._state.get("count", 0) + 1 return self._state["count"] return NoneEach actor owns its state exclusively. No coroutine can touch another’s state directly—they communicate through messages:
async def main(): counter = Actor("counter")
async with asyncio.TaskGroup() as tg: tg.create_task(counter.run())
# Request increment via message reply = asyncio.Queue() await counter.send(Message("increment", None, reply)) result = await reply.get() print(f"Count: {result}") # Count: 1This pattern—often called the Actor model—eliminates shared state by design.
Pattern 3: Structured Concurrency with TaskGroup
Python 3.11’s TaskGroup provides structured concurrency with automatic result collection.
# BAD: gather() with shared resultsresults = [] # Shared mutable listawait asyncio.gather( fetch_and_append(results, url1), fetch_and_append(results, url2),)
# GOOD: TaskGroup returns immutable resultsasync def fetch_all(urls): async with asyncio.TaskGroup() as tg: tasks = [tg.create_task(fetch(url)) for url in urls] # Results are collected automatically, no shared list return [task.result() for task in tasks]The difference:
gather() approach: - Create shared mutable container - Multiple tasks append to it - Need synchronization for safety
TaskGroup approach: - Each task produces its own result - Results collected after all complete - No shared state during executionPattern 4: Pass Values, Not References
This was the hardest habit to break. In Python, objects are passed by reference by default, which means shared references create hidden coupling.
# BAD: Shared reference creates hidden couplingclass Processor: data: dict = {} # Shared by reference!
async def process(self, key): self.data[key] = await fetch(key)
# GOOD: Pass copies/valuesasync def process_item(data: dict, key: str) -> dict: new_value = await fetch(key) return {**data, key: new_value} # Immutable updateThe hidden coupling problem:
With shared references: Coroutine A ─────┐ ├──> shared_dict (RACE CONDITION) Coroutine B ─────┘
With value passing: Coroutine A ───> receives dict_copy ───> returns new_dict Coroutine B ───> receives dict_copy ───> returns new_dict
No shared state, no race conditions.When You Actually Need Synchronization
I’m not saying synchronization primitives are useless. They’re essential for:
- Thread mixing - When
run_in_executorintroduces actual threads - Callback-based APIs - When external code modifies state unpredictably
- Gradual migration - Converting synchronous code incrementally
But for pure async code on a single event loop? Start with the assumption that you don’t need them.
Common Mistakes I Made
Mistake 1: Using threading primitives in asyncio
# WRONG: threading.Lock blocks the event loop!import threadinglock = threading.Lock()
async def unsafe(): with lock: # Blocks entire event loop! await slow_operation()Threading locks are synchronous—they’ll freeze your event loop.
Mistake 2: Over-engineering with Condition variables
state = "disconnected"condition = asyncio.Condition()
async def wait_for_state(target): async with condition: await condition.wait_for(lambda: state == target)
# The "lost update" problem:# If state changes from "closing" to "closed" before# wait_for_state checks, it never sees "closing"!State machines with conditions are fragile. Immutable state transitions are clearer:
async def wait_for_state(current: str, target: str) -> str: if current == target: return current await asyncio.sleep(0.1) return current # Caller decides what to doMistake 3: Queue overkill
# Sometimes a simple list works fineresults = []
async def fetch_sequential(urls): for url in urls: results.append(await fetch(url)) # Sequential, no concurrency return resultsIf there’s no concurrency, you don’t need a queue. Plain data structures work.
A Complete Example: Pipeline Without Synchronization
Here’s a data processing pipeline using message passing:
import asynciofrom dataclasses import dataclassfrom typing import Any
@dataclassclass Message: type: str payload: Any reply_to: asyncio.Queue | None = None
class Stage: """Processing stage with isolated state."""
def __init__(self, name: str, process_fn): self.name = name self.process_fn = process_fn self.queue: asyncio.Queue[Message] = asyncio.Queue() self._processed = 0 # Private state
async def send(self, msg: Message): await self.queue.put(msg)
async def run(self): while True: msg = await self.queue.get() result = await self.process_fn(msg.payload) self._processed += 1
if msg.reply_to: await msg.reply_to.put(result)
async def load_data(path: str) -> list: await asyncio.sleep(0.1) # Simulate I/O return [f"item_{i}" for i in range(5)]
async def process_item(item: str) -> str: await asyncio.sleep(0.05) return item.upper()
async def main(): # Create pipeline stages loader = Stage("loader", load_data) processor = Stage("processor", process_item)
async with asyncio.TaskGroup() as tg: tg.create_task(loader.run()) tg.create_task(processor.run())
# Send data through pipeline reply = asyncio.Queue() await loader.send(Message("load", "data.csv", reply)) items = await reply.get()
# Process each item for item in items: await processor.send(Message("process", item, reply)) result = await reply.get() print(f"Processed: {result}")
asyncio.run(main())Output:
Processed: ITEM_0Processed: ITEM_1Processed: ITEM_2Processed: ITEM_3Processed: ITEM_4No locks, no semaphores, no shared state. Each stage owns its data and communicates through messages.
Key Takeaways
After refactoring my application:
- Shared mutable state is the problem - Synchronization primitives treat symptoms, not the cause
- Single-threaded async changes the rules - Operations between awaits are atomic
- Design patterns matter more than primitives - Immutability, message passing, and structured concurrency eliminate race conditions by design
- Synchronization is for migration - New async code shouldn’t need it
The code became simpler, not more complex. I stopped fighting the event loop and started working with it.
Final Words + More Resources
My intention with this article was to help others share my knowledge and experience. If you want to contact me, you can contact by email: Email me
Here are also the most important links from this article along with some further resources that will help you in this scope:
Oh, and if you found these resources useful, don’t forget to support me by starring the repo on GitHub!
Comments