asyncio.Lock vs threading.Lock: When to Use Each for Python Shared Resources

Problem

When I built an async Python web server, multiple requests needed to update a shared counter. Without proper synchronization, the counter value was wrong after concurrent updates. I tried using threading.Lock to protect the shared resource, but my async server became slow and unresponsive.

Here’s what happened:

Console output showing race condition
Expected counter: 100
Actual counter: 87 # Some updates lost!

Environment

  • Python 3.11
  • asyncio for async server
  • Ubuntu 22.04

What happened?

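I wrote code to update a shared counter from multiple concurrent tasks. The await between the read and the write stands in for real async work such as a database call. A coroutine can only be interrupted at an await, so without one there would be no race at all:

race_condition_example.py
import asyncio

class Counter:
    def __init__(self):
        self.value = 0

async def update_counter(counter):
    # Race condition: every task reads the value, then yields before writing
    current = counter.value
    await asyncio.sleep(0)       # other tasks run here and read the same value
    counter.value = current + 1  # Lost updates!

async def test():
    counter = Counter()
    tasks = [update_counter(counter) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Expected: 100, Actual: {counter.value}")

asyncio.run(test())

I ran this test:

Run race condition test
python race_condition_example.py

Output:

Race condition output
Expected: 100, Actual: 1

99 updates were lost. Every task read 0 before any task wrote its result back, so all of them wrote 1. In a real server the awaited work takes a varying amount of time, so only some of the reads overlap, which is why the server lost a smaller share of updates (87 out of 100 survived).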

First attempt: threading.Lock

I added threading.Lock to protect the counter:

threading_lock_example.py
import asyncio
import threading

lock = threading.Lock()

class Counter:
    def __init__(self):
        self.value = 0

async def update_counter(counter):
    with lock:  # threading.Lock
        current = counter.value
        counter.value = current + 1

async def test():
    counter = Counter()
    tasks = [update_counter(counter) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Expected: 100, Actual: {counter.value}")

asyncio.run(test())

Result:

Output showing correct count but slow performance
Expected: 100, Actual: 100

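The counter was correct, but only because nothing inside the with block awaits, so no task ever has to wait for the lock. As soon as the lock is contended (another thread holds it, or a task holds it across an await), acquire() blocks the whole thread, and the event loop can't run any other task during the wait. In the second case the program deadlocks, because the task that holds the lock never gets to run again and release it.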
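To make the blocking visible, here is a minimal sketch (not my server code) in which a background thread holds a threading.Lock while a coroutine tries to take it. The names hold_lock_in_thread, heartbeat and measure_stall are made up for this illustration, and the 0.3 second hold time is an arbitrary choice. The heartbeat task records how long the event loop goes between wake-ups.

```python
import asyncio
import threading
import time

lock = threading.Lock()

def hold_lock_in_thread(seconds: float, ready: threading.Event) -> None:
    """Worker thread that takes the lock and keeps it for a while."""
    with lock:
        ready.set()          # tell the main thread the lock is now held
        time.sleep(seconds)

async def heartbeat(gaps: list[float], stop: asyncio.Event) -> None:
    """Record the time between consecutive wake-ups of this task."""
    last = time.perf_counter()
    while not stop.is_set():
        await asyncio.sleep(0.01)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now

async def measure_stall() -> float:
    gaps: list[float] = []
    stop = asyncio.Event()
    ready = threading.Event()
    worker = threading.Thread(target=hold_lock_in_thread, args=(0.3, ready))
    worker.start()
    ready.wait()                  # short blocking wait until the worker owns the lock
    beat = asyncio.create_task(heartbeat(gaps, stop))
    await asyncio.sleep(0.05)     # let the heartbeat tick a few times
    with lock:                    # blocks the whole thread, event loop included
        pass
    await asyncio.sleep(0.05)
    stop.set()
    await beat
    worker.join()
    return max(gaps)              # longest pause seen by the heartbeat

if __name__ == "__main__":
    print(f"Longest event-loop stall: {asyncio.run(measure_stall()):.2f}s")
```

The heartbeat asks to wake up every 10 ms, yet its longest gap should come out close to the time the worker thread kept the lock, because nothing on the event loop can run while the with statement is waiting.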

How to solve it?

I switched to asyncio.Lock with async with:

asyncio_lock_example.py
import asyncio

lock = asyncio.Lock()

class Counter:
    def __init__(self):
        self.value = 0

async def update_counter(counter):
    async with lock:  # asyncio.Lock - non-blocking!
        current = counter.value
        await asyncio.sleep(0)  # awaiting inside the lock is safe
        counter.value = current + 1

async def test():
    counter = Counter()
    tasks = [update_counter(counter) for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"Expected: 100, Actual: {counter.value}")

asyncio.run(test())

Result:

Output showing correct count with async performance
Expected: 100, Actual: 100

The counter is correct, and the lock doesn’t block the event loop. Tasks waiting for the lock yield control to other tasks.

The reason

I think the key difference is:

  1. threading.Lock blocks the thread - When a task waits for a threading.Lock, the entire thread (including the event loop) stops. No other async tasks can run.

  2. asyncio.Lock yields to the event loop - When a coroutine waits for an asyncio.Lock, it yields control back to the event loop. Other tasks can run while waiting.

Lock behavior comparison
threading.Lock (BAD in async):
┌──────────────────────────────┐
│ Task 1 holds lock            │
│ Event loop BLOCKED           │
│ Task 2, 3, 4... all frozen   │
└──────────────────────────────┘
asyncio.Lock (GOOD in async):
┌────────────┐
│ Task 1     │ ──→ Lock acquired
│ in lock    │
└────────────┘
┌────────────┐
│ Task 2     │ ──→ Waiting for lock
│ yields     │     Event loop runs other tasks
│ control    │
└────────────┘

Use asyncio.Lock for async code. Use threading.Lock only for multi-threaded code.
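The asyncio.Lock side of the comparison can be sketched as a small event log. This is an illustration under assumptions: holder, waiter and unrelated are hypothetical task names, and the sleep durations are arbitrary. One task holds the lock across an await, a second waits for it, and a third never touches the lock.

```python
import asyncio

async def holder(lock: asyncio.Lock, log: list[str]) -> None:
    async with lock:
        log.append("holder: acquired")
        await asyncio.sleep(0.2)      # keep the lock across an await
        log.append("holder: releasing")

async def waiter(lock: asyncio.Lock, log: list[str]) -> None:
    log.append("waiter: waiting")
    async with lock:                  # suspends this task only, not the loop
        log.append("waiter: acquired")

async def unrelated(log: list[str]) -> None:
    for i in range(3):
        await asyncio.sleep(0.01)
        log.append(f"unrelated: tick {i}")

async def main() -> list[str]:
    lock = asyncio.Lock()
    log: list[str] = []
    await asyncio.gather(holder(lock, log), waiter(lock, log), unrelated(log))
    return log

if __name__ == "__main__":
    for line in asyncio.run(main()):
        print(line)
```

The ticks from the unrelated task should land between "holder: acquired" and "holder: releasing", which shows the loop kept running while the waiter was parked on the lock.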

When to use each lock

Lock Type            | Use Case                | Blocks Event Loop?
---------------------|-------------------------|-------------------
asyncio.Lock         | Async code (coroutines) | No
threading.Lock       | Multi-threaded code     | Yes
multiprocessing.Lock | Multi-process code      | N/A

For async web servers handling concurrent requests, always use asyncio.Lock:

lock_usage.py
import asyncio

# Create lock at module/class level (not inside function!)
lock = asyncio.Lock()

async def safe_update(shared_resource):
    async with lock:
        # Safe access - only one coroutine at a time
        shared_resource.value += 1

Common mistakes

1. Creating lock inside function:

wrong_lock_location.py
import asyncio

# BAD: New lock every call (doesn't protect anything!)
async def update(counter):
    lock = asyncio.Lock()  # Each call creates new lock
    async with lock:
        counter.value += 1  # Not protected!

Create the lock once at module or class level, not inside functions.
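One way to get "once, at class level" is to let the shared object own its lock. The sketch below assumes a hypothetical SafeCounter class. The asyncio.sleep(0) inside the critical section is only a stand-in for real awaited work, so that the lock has something to protect.

```python
import asyncio

class SafeCounter:
    def __init__(self) -> None:
        self.value = 0
        self._lock = asyncio.Lock()   # created once per counter, shared by all callers

    async def increment(self) -> None:
        async with self._lock:
            current = self.value
            await asyncio.sleep(0)    # stand-in for real awaited work
            self.value = current + 1

async def main() -> int:
    counter = SafeCounter()
    await asyncio.gather(*(counter.increment() for _ in range(100)))
    return counter.value

if __name__ == "__main__":
    print(f"Expected: 100, Actual: {asyncio.run(main())}")
```

Because every call to increment() goes through the same self._lock, the read, the await and the write run as one unit per task.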

2. Using blocking code inside async lock:

blocking_in_lock.py
import asyncio
import time

lock = asyncio.Lock()

# BAD: Blocking code inside async lock
async def update(counter):
    async with lock:
        time.sleep(1)  # Blocks event loop even with async lock!
        counter.value += 1

Even with asyncio.Lock, blocking operations like time.sleep() freeze the event loop.
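A possible fix, sketched under assumptions: if the blocking call can't be replaced by an awaitable one, asyncio.to_thread (Python 3.9+) can run it in a worker thread while the coroutine keeps holding the asyncio.Lock. slow_blocking_call and ticker are hypothetical helpers for this example, and the shared state is a plain dict to keep it short.

```python
import asyncio
import time

def slow_blocking_call() -> int:
    """Hypothetical blocking work (sync client, file I/O, ...)."""
    time.sleep(0.2)
    return 1

async def update(counter: dict, lock: asyncio.Lock) -> None:
    async with lock:
        # The blocking call runs in a worker thread; the event loop stays free.
        increment = await asyncio.to_thread(slow_blocking_call)
        counter["value"] += increment   # mutated only on the event-loop thread

async def ticker() -> float:
    """Five short sleeps; returns how long they took in total."""
    start = time.perf_counter()
    for _ in range(5):
        await asyncio.sleep(0.01)
    return time.perf_counter() - start

async def main() -> tuple[int, float]:
    lock = asyncio.Lock()
    counter = {"value": 0}
    _, _, ticker_elapsed = await asyncio.gather(
        update(counter, lock), update(counter, lock), ticker()
    )
    return counter["value"], ticker_elapsed

if __name__ == "__main__":
    value, elapsed = asyncio.run(main())
    print(f"counter={value}, ticker finished after {elapsed:.2f}s")
```

The two updates still run one after the other because of the lock, but the ticker finishes long before they do. With time.sleep() called directly inside the lock, the ticker would be stuck until the blocking calls returned.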

3. Mixing lock types:

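mixed_locks.py
import asyncio
import threading

threading_lock = threading.Lock()

# BAD: threading.Lock in async code
async def update():
    with threading_lock:        # A second task blocks the whole thread here...
        await asyncio.sleep(1)  # ...so the first task never resumes to release it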

Never use threading.Lock inside async functions.
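If the shared data really is touched by other threads as well, one option is to keep the threading.Lock but only acquire it inside a worker thread. This is a sketch under assumptions, not something from my server: locked_increment is a hypothetical helper, and asyncio.to_thread requires Python 3.9+.

```python
import asyncio
import threading

def locked_increment(shared: dict, thread_lock: threading.Lock) -> None:
    # Runs in a worker thread, so waiting for the lock does not block the event loop.
    with thread_lock:
        shared["value"] += 1

async def update(shared: dict, thread_lock: threading.Lock) -> None:
    # The coroutine itself never touches the threading.Lock.
    await asyncio.to_thread(locked_increment, shared, thread_lock)

async def main() -> int:
    thread_lock = threading.Lock()
    shared = {"value": 0}
    await asyncio.gather(*(update(shared, thread_lock) for _ in range(50)))
    return shared["value"]

if __name__ == "__main__":
    print(f"Expected: 50, Actual: {asyncio.run(main())}")
```

The threading.Lock still does its job between threads, and the only thing the event loop ever waits on is an awaitable.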

Summary

In this post, I explained when to use asyncio.Lock vs threading.Lock. The key point is: use asyncio.Lock for async code because it yields control to the event loop. threading.Lock blocks the event loop and should only be used in multi-threaded code.

Final Words + More Resources

My intention with this article was to help others by sharing my knowledge and experience. If you want to contact me, you can reach me by email.
