Source code for command_line_assistant.utils.files

"""Utilitary module to handle file operations"""

import errno
import fcntl
import logging
import mimetypes
import os
import stat
import tempfile
from io import TextIOWrapper
from pathlib import Path
from typing import Optional, Union

from command_line_assistant.utils.environment import get_xdg_state_path

logger = logging.getLogger(__name__)


[docs] def guess_mimetype(attachment: Optional[TextIOWrapper]) -> str: """Guess the mimetype of a given attachment. Arguments: attachment (Optional[TextIOWrapper]): The attachment to be checked for mimetype. Returns: str: The guessed mimetype or "unknown/unknown" if not found. """ unknown_mimetype = "unknown/unknown" if not attachment: return unknown_mimetype path = Path(attachment.name) result = mimetypes.guess_type(path) mimetype = result[0] if not mimetype: return unknown_mimetype return mimetype
[docs] def create_folder(path: Path, parents: bool = False, mode: int = 0o700) -> None: """Try to create a new folder under the specified directory. Notes: If the folder already exists, we will skip the directory creation and log an info message. That "check" is done by catching the `py:FileExistsError` and `py:FileNotFoundError` exceptions. Arguments: path (Path): The path of the folder that needs to be created. parents (bool): If it should create the parents folder from the given path. mode (int): The permissions of the given folder. Defaults to 0700. """ try: if not path.exists(): logger.debug( "Directory %s does not exist. Creating it with permissions %s", path, mode, ) path.mkdir(mode=mode, parents=parents) except (FileExistsError, FileNotFoundError) as e: logger.info( "Skipping directory creation at '%s' as we found an exception %s. It's possible that the folder already exists or is a race condition.", path, str(e), )
[docs] def write_file(contents: Union[str, bytes], path: Path, mode: int = 0o600) -> None: """Try to write contents to a given file Notes: If the folder already exists, we will skip the file creation and log an info message. That "check" is done by catching the `py:FileExistsError` and `py:FileNotFoundError` exceptions. Arguments: contents (Union[str, bytes]): The contents that will be written to the file. Either bytes or str are expected. path (Path): The path of the file that needs to be created. mode (int): The permissions of the given file. Defaults to 0600. """ try: logger.debug( "File %s does not exist. Creating it with permissions %s", path, mode ) if isinstance(contents, bytes): path.write_bytes(contents) else: path.write_text(contents) path.chmod(mode) except (FileExistsError, FileNotFoundError) as e: logger.info( "Skipping file creation at '%s' as we found an exception %s. It's possible that the file already exists or is a race condition.", path, str(e), )
[docs] class NamedFileLock: """File-based lock mechanism for general locking purposes. This class provides a way to track state across processes using a lock file. The lock file contains the PID of the capturing process. Usage: with NamedFileLock(name="my_file") as lock: # Execute your operations # Lock is automatically released """ def __init__(self, name: str) -> None: """Initialize the named file lock mechanism.""" self._pid: int = os.getpid() self._name: str = name.replace("-", "_") self._lock_file: Path = Path(get_xdg_state_path(), name + ".lock") @property def pid(self) -> int: """Return the parent pid.""" return self._pid @pid.setter def pid(self, value: int) -> None: """Set the value for the internal _pid property.""" self._pid = value @property def is_locked(self) -> bool: """Check if a lock is currently active. This checks if the lock file exists and if the PID in it is still running. Returns: bool: True if a lock is active, False otherwise """ if not self._lock_file.exists(): return False try: pid = int(self._lock_file.read_text().strip()) # Check if process is still running. Sending signal 0 to a process # will raise an OSError if no process with that pid is running. os.kill(pid, 0) return True except (ValueError, FileNotFoundError): # Clean up stale lock file self._lock_file.unlink(missing_ok=True) return False
[docs] def acquire(self) -> None: """Acquire the lock. Raises: RuntimeError: If a lock is already active """ if self.is_locked: raise RuntimeError( "A lock is already active in another process. " "Please, remove the lock before trying to acquire again." ) create_folder(self._lock_file.parent, parents=True) with tempfile.NamedTemporaryFile( mode="w", suffix=".pid", prefix=self._name, dir=self._lock_file.parent ) as handler: # Write current process ID handler.write(str(os.getpid())) handler.flush() # stat.S_IRUSR = Owner has read permission # stat.S_IWUSR = Owner has write permission os.chmod(handler.name, stat.S_IRUSR | stat.S_IWUSR) # Use fcntl for file locking fcntl.flock(handler.file.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) try: os.link(handler.name, self._lock_file) except FileExistsError as e: if e.errno != errno.EEXIST: raise
[docs] def release(self) -> None: """Release the terminal lock.""" self._lock_file.unlink(missing_ok=True)
def __enter__(self) -> "NamedFileLock": """Context manager entry. Returns: NamedFileLock: Instance of itself """ self.acquire() return self def __exit__(self, exc_type, exc_val, exc_tb) -> None: """Context manager exit.""" self.release()