import contextlib import inspect import logging import os import time import warnings from abc import ABCMeta, abstractmethod from dataclasses import dataclass from threading import local from typing import TYPE_CHECKING, Any, cast from weakref import WeakValueDictionary
from ._error import Timeout
if TYPE_CHECKING: import sys from types import TracebackType
if sys.version_info >= (3, 11): # pragma: no cover (py311+) from typing import Self else: # pragma: no cover (<py311) from typing_extensions import Self
_LOGGER = logging.getLogger("filelock")
# This is a helper class which is returned by :meth:`BaseFileLock.acquire` and wraps the lock to make sure __enter__ # is not called twice when entering the with statement. If we would simply return *self*, the lock would be acquired # again in the *__enter__* method of the BaseFileLock, but not released again automatically. issue #37 (memory leak) class AcquireReturnProxy: """A context-aware object that will release the lock file when exiting."""
@dataclass class FileLockContext: """A dataclass which holds the context for a ``BaseFileLock`` object."""
# The context is held in a separate class to allow optional use of thread local storage via the # ThreadLocalFileContext class.
#: The path to the lock file.
lock_file: str
#: The default timeout value.
timeout: float
#: The mode for the lock files
mode: int
#: Whether the lock should be blocking or not
blocking: bool
#: The file descriptor for the *_lock_file* as it is returned by the os.open() function, not None when lock held
lock_file_fd: int | None = None
#: The lock counter is used for implementing the nested locking mechanism.
lock_counter: int = 0# When the lock is acquired is increased and the lock is only released, when this value is 0
class ThreadLocalFileContext(FileLockContext, local): """A thread local version of the ``FileLockContext`` class."""
non_matching_params = {
name: (passed_param, set_param) for name, (passed_param, set_param) in params_to_check.items() if passed_param != set_param
} ifnot non_matching_params: return cast(BaseFileLock, instance)
# parameters do not match; raise error
msg = "Singleton lock instances cannot be initialized with differing arguments"
msg += "\nNon-matching arguments: " for param_name, (passed_param, set_param) in non_matching_params.items():
msg += f"\n\t{param_name} (existing lock has {set_param} but {passed_param} was passed)" raise ValueError(msg)
# Workaround to make `__init__`'s params optional in subclasses # E.g. virtualenv changes the signature of the `__init__` method in the `BaseFileLock` class descendant # (https://github.com/tox-dev/filelock/pull/340)
present_params = inspect.signature(cls.__init__).parameters # type: ignore[misc]
init_params = {key: value for key, value in all_params.items() if key in present_params}
:param lock_file: path to the file
:param timeout: default timeout when acquiring the lock, in seconds. It will be used as fallback value in \
the acquire method, if no timeout value (``None``) is given. If you want to disable the timeout, set it \
to a negative value. A timeout of 0 means that there is exactly one attempt to acquire the file lock.
:param mode: file permissions for the lockfile
:param thread_local: Whether this object's internal context should be thread local or not. If this is set to \
``False`` then the lock will be reentrant across threads.
:param blocking: whether the lock should be blocking ornot
:param is_singleton: If this is set to ``True`` then only one instance of this class will be created \
per lock file. This is useful if you want to use the lock object for reentrant locking without needing \
to pass the same object around.
# Create the context. Note that external code should not work with the context directly and should instead use # properties of this class.
kwargs: dict[str, Any] = { "lock_file": os.fspath(lock_file), "timeout": timeout, "mode": mode, "blocking": blocking,
}
self._context: FileLockContext = (ThreadLocalFileContext if thread_local else FileLockContext)(**kwargs)
def is_thread_local(self) -> bool: """:return: a flag indicating if this lock is thread local or not""" return self._is_thread_local
@property def is_singleton(self) -> bool: """:return: a flag indicating if this lock is singleton or not""" return self._is_singleton
@property def lock_file(self) -> str: """:return: path to the lock file""" return self._context.lock_file
@property def timeout(self) -> float: """
:return: the default timeout value, in seconds
@property def mode(self) -> int: """:return: the file permissions for the lockfile""" return self._context.mode
@abstractmethod def _acquire(self) -> None: """If the file lock could be acquired, self._context.lock_file_fd holds the file descriptor of the lock file.""" raise NotImplementedError
@abstractmethod def _release(self) -> None: """Releases the lock and sets self._context.lock_file_fd to None.""" raise NotImplementedError
@property def is_locked(self) -> bool: """
:return: A boolean indicating if the lock file is holding the lock currently.
.. versionchanged:: 2.0.0
This was previously a method andis now a property. """ return self._context.lock_file_fd isnotNone
@property def lock_counter(self) -> int: """:return: The number of times this lock has been acquired (but not yet released).""" return self._context.lock_counter
:param timeout: maximum wait time for acquiring the lock, ``None`` means use the default :attr:`~timeout` isand if ``timeout < 0``, there is no timeout and this method will block until the lock could be acquired
:param poll_interval: interval of trying to acquire the lock file
:param poll_intervall: deprecated, kept for backwards compatibility, use ``poll_interval`` instead
:param blocking: defaults to True. IfFalse, function will return immediately if it cannot obtain a lock on the
first attempt. Otherwise, this method will block until the timeout expires or the lock is acquired.
:raises Timeout: if fails to acquire lock within the timeout period
:return: a context object that will unlock the file when the context is exited
.. code-block:: python
# You can use this method in the context manager (recommended) with lock.acquire(): pass
# Or use an equivalent try-finally construct:
lock.acquire() try: pass finally:
lock.release()
.. versionchanged:: 2.0.0
This method returns now a *proxy* object instead of *self*,
so that it can be used in a with statement without side effects.
""" # Use the default timeout, if no timeout is provided. if timeout isNone:
timeout = self._context.timeout
if blocking isNone:
blocking = self._context.blocking
if poll_intervall isnotNone:
msg = "use poll_interval instead of poll_intervall"
warnings.warn(msg, DeprecationWarning, stacklevel=2)
poll_interval = poll_intervall
# Increment the number right at the beginning. We can still undo it, if something fails.
self._context.lock_counter += 1
lock_id = id(self)
lock_filename = self.lock_file
start_time = time.perf_counter() try: whileTrue: ifnot self.is_locked:
_LOGGER.debug("Attempting to acquire lock %s on %s", lock_id, lock_filename)
self._acquire() if self.is_locked:
_LOGGER.debug("Lock %s acquired on %s", lock_id, lock_filename) break if blocking isFalse:
_LOGGER.debug("Failed to immediately acquire lock %s on %s", lock_id, lock_filename) raise Timeout(lock_filename) # noqa: TRY301 if0 <= timeout < time.perf_counter() - start_time:
_LOGGER.debug("Timeout on acquiring lock %s on %s", lock_id, lock_filename) raise Timeout(lock_filename) # noqa: TRY301
msg = "Lock %s not acquired on %s, waiting %s seconds ..."
_LOGGER.debug(msg, lock_id, lock_filename, poll_interval)
time.sleep(poll_interval) except BaseException: # Something did go wrong, so decrement the counter.
self._context.lock_counter = max(0, self._context.lock_counter - 1) raise return AcquireReturnProxy(lock=self)
def release(self, force: bool = False) -> None: # noqa: FBT001, FBT002 """
Releases the file lock. Please note, that the lock is only completely released, if the lock counter is0.
Also note, that the lock file itself isnot automatically deleted.
:param force: Iftrue, the lock counter is ignored and the lock is released in every case/
""" if self.is_locked:
self._context.lock_counter -= 1
if self._context.lock_counter == 0or force:
lock_id, lock_filename = id(self), self.lock_file
_LOGGER.debug("Attempting to release lock %s on %s", lock_id, lock_filename)
self._release()
self._context.lock_counter = 0
_LOGGER.debug("Lock %s released on %s", lock_id, lock_filename)
def __enter__(self) -> Self: """
Acquire the lock.
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit,
noch Qualität der bereit gestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.