It adds several useful features to Python’s standard synchronization primitives, including lock protocols and enhanced lock implementations.
Install locklib with pip:
pip install locklib... or directly from the Git repository:
pip install git+https://github.com/mutating/locklib.gitYou can also use instld to quickly try out this package and others without installing them.
Protocols let you write type-annotated code without depending on concrete classes. The protocols in this library let you treat lock implementations from the standard library, third-party packages, and this library uniformly.
At a minimum, a lock object should provide two methods:
def acquire(self) -> None: ...
def release(self) -> None: ...All standard library locks conform to this, as do the locks provided by this library.
To check for compliance with this minimum standard, locklib contains the LockProtocol. You can verify that all of these locks satisfy it:
from multiprocessing import Lock as MLock
from threading import Lock as TLock, RLock as TRLock
from asyncio import Lock as ALock
from locklib import SmartLock, LockProtocol
print(isinstance(MLock(), LockProtocol)) # True
print(isinstance(TLock(), LockProtocol)) # True
print(isinstance(TRLock(), LockProtocol)) # True
print(isinstance(ALock(), LockProtocol)) # True
print(isinstance(SmartLock(), LockProtocol)) # TrueHowever, most idiomatic Python code uses locks as context managers. If your code does too, you can use one of the two protocols derived from the base LockProtocol: ContextLockProtocol or AsyncContextLockProtocol. Thus, the protocol hierarchy looks like this:
LockProtocol
├── ContextLockProtocol
└── AsyncContextLockProtocol
ContextLockProtocol describes objects that satisfy LockProtocol and also implement the context manager protocol. Similarly,AsyncContextLockProtocol describes objects that satisfy LockProtocol and implement the asynchronous context manager protocol.
Almost all standard library locks, as well as SmartLock, satisfy ContextLockProtocol:
from multiprocessing import Lock as MLock
from threading import Lock as TLock, RLock as TRLock
from locklib import SmartLock, ContextLockProtocol
print(isinstance(MLock(), ContextLockProtocol)) # True
print(isinstance(TLock(), ContextLockProtocol)) # True
print(isinstance(TRLock(), ContextLockProtocol)) # True
print(isinstance(SmartLock(), ContextLockProtocol)) # TrueHowever, the asyncio.Lock belongs to a separate category and AsyncContextLockProtocol is needed to describe it:
from asyncio import Lock
from locklib import AsyncContextLockProtocol
print(isinstance(Lock(), AsyncContextLockProtocol)) # TrueIf you use type hints and static verification tools like mypy, we highly recommend using the narrowest applicable protocol for your use case.
locklib includes a lock that prevents deadlocks — SmartLock, based on Wait-for Graph. You can use it like a regular Lock from the standard library. Let’s verify that it prevents race conditions in the same way:
from threading import Thread
from locklib import SmartLock
lock = SmartLock()
counter = 0
def function():
global counter
for _ in range(1000):
with lock:
counter += 1
thread_1 = Thread(target=function)
thread_2 = Thread(target=function)
thread_1.start()
thread_2.start()
assert counter == 2000As expected, this lock prevents race conditions just like the standard Lock. Now let’s deliberately trigger a deadlock and see what happens:
from threading import Thread
from locklib import SmartLock
lock_1 = SmartLock()
lock_2 = SmartLock()
def function_1():
while True:
with lock_1:
with lock_2:
pass
def function_2():
while True:
with lock_2:
with lock_1:
pass
thread_1 = Thread(target=function_1)
thread_2 = Thread(target=function_2)
thread_1.start()
thread_2.start()This raises an exception like the following:
...
locklib.errors.DeadLockError: A cycle between 1970256th and 1970257th threads has been detected.
So, with this lock, a deadlock results in an exception instead of blocking forever.
If you want to catch this exception, you can also import it from locklib:
from locklib import DeadLockErrorSometimes, when testing code, you may need to detect whether some action occurs while the lock is held. How can you do this with minimal boilerplate? Use LockTraceWrapper. It is a wrapper around a regular lock that records every acquisition and release. At the same time, it fully preserves the wrapped lock’s behavior.
Creating such a wrapper is easy. Just pass any lock to the constructor:
from threading import Lock
from locklib import LockTraceWrapper
lock = LockTraceWrapper(Lock())You can use it exactly like the wrapped lock:
with lock:
...Anywhere in your program, you can record that a specific event occurred:
lock.notify('event_name')You can then easily check whether an event with this identifier ever occurred outside the lock. To do this, use the was_event_locked method:
lock.was_event_locked('event_name')If the notify method was called with the same parameter only while the lock was held, it will return True. If not, that is, if there was at least one case when the notify method was called with that identifier without the lock being held, False will be returned.
How does it work? It uses a modified balanced-parentheses algorithm. For each thread for which any events were registered (taking the mutex, releasing the mutex, and also calling the notify method), the check takes place separately, that is, we determine that it was the same thread that held the mutex when notify was called, and not some other one.
⚠️ The thread id is used to identify the threads. A thread ID may be reused after a thread exits, which may in some cases cause the wrapper to incorrectly report that an operation was protected by the lock. Make sure this cannot happen during your tests.
If no event with the specified identifier was recorded in any thread, the ThereWasNoSuchEventError exception will be raised by default. If you want to disable this so that the method simply returns False in such situations, pass the keyword argument raise_exception=False to was_event_locked.