Source code for bandersnatch_storage_plugins.filesystem

import contextlib
import datetime
import filecmp
import hashlib
import logging
import os
import pathlib
import shutil
import tempfile
from collections.abc import Generator
from typing import IO, Any

import filelock

from bandersnatch.storage import PATH_TYPES, StoragePlugin

logger = logging.getLogger("bandersnatch")


[docs] class FilesystemStorage(StoragePlugin): name = "filesystem" PATH_BACKEND: type[pathlib.Path] = pathlib.Path def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs)
[docs] def get_lock(self, path: str | None = None) -> filelock.FileLock: """ Retrieve the appropriate `FileLock` backend for this storage plugin :param str path: The path to use for locking :return: A `FileLock` backend for obtaining locks :rtype: SwiftFileLock """ if path is None: path = self.mirror_base_path.joinpath(self.flock_path).as_posix() logger.debug(f"Retrieving FileLock instance @ {path}") return filelock.FileLock(path)
[docs] def walk(self, root: PATH_TYPES, dirs: bool = True) -> list[pathlib.Path]: if not isinstance(root, pathlib.Path): root = pathlib.Path(str(root)) results: list[pathlib.Path] = [] for pth in root.iterdir(): if pth.is_dir(): if dirs: results.append(pth) for subpath in self.walk(pth, dirs=dirs): results.append(pth / subpath) else: results.append(pth) return results
[docs] def find(self, root: PATH_TYPES, dirs: bool = True) -> str: """A test helper simulating 'find'. Iterates over directories and filenames, given as relative paths to the root. """ results = self.walk(root, dirs=dirs) results.sort() return "\n".join(str(result.relative_to(root)) for result in results)
[docs] @contextlib.contextmanager def rewrite( self, filepath: PATH_TYPES, mode: str = "w", **kw: Any ) -> Generator[IO, None, None]: """Rewrite an existing file atomically to avoid programs running in parallel to have race conditions while reading.""" # TODO: Account for alternative backends if isinstance(filepath, str): base_dir = os.path.dirname(filepath) filename = os.path.basename(filepath) else: base_dir = str(filepath.parent) filename = filepath.name # Change naming format to be more friendly with distributed POSIX # filesystems like GlusterFS that hash based on filename # GlusterFS ignore '.' at the start of filenames and this avoid rehashing with tempfile.NamedTemporaryFile( mode=mode, prefix=f".{filename}.", delete=False, dir=base_dir, **kw ) as f: filepath_tmp = f.name yield f if not self.exists(filepath_tmp): # Allow our clients to remove the file in case it doesn't want it to be # put in place actually but also doesn't want to error out. return os.chmod(filepath_tmp, 0o100644) logger.debug( f"Writing temporary file {filepath_tmp} to target destination: {filepath!s}" ) self.move_file(filepath_tmp, filepath)
[docs] @contextlib.contextmanager def update_safe(self, filename: PATH_TYPES, **kw: Any) -> Generator[IO, None, None]: """Rewrite a file atomically. Clients are allowed to delete the tmpfile to signal that they don't want to have it updated. """ with tempfile.NamedTemporaryFile( dir=os.path.dirname(filename), delete=False, prefix=f"{os.path.basename(filename)}.", **kw, ) as tf: if self.exists(filename): os.chmod(tf.name, os.stat(filename).st_mode & 0o7777) yield tf if not self.exists(tf.name): return filename_tmp = tf.name if self.exists(filename) and self.compare_files(filename, filename_tmp): logger.debug(f"File not changed...deleting temporary file: {filename_tmp}") os.unlink(filename_tmp) else: logger.debug(f"Modifying destination: {filename!s} with: {filename_tmp}") self.move_file(filename_tmp, filename)
[docs] def compare_files(self, file1: PATH_TYPES, file2: PATH_TYPES) -> bool: """Compare two files, returning true if they are the same and False if not.""" return filecmp.cmp(str(file1), str(file2), shallow=False)
[docs] def copy_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None: """Copy a file from **source** to **dest**""" if not self.exists(source): raise FileNotFoundError(source) shutil.copy(source, dest) return
[docs] def move_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None: """Move a file from **source** to **dest**""" if not self.exists(source): raise FileNotFoundError(source) shutil.move(str(source), dest) return
[docs] def write_file(self, path: PATH_TYPES, contents: str | bytes) -> None: """Write data to the provided path. If **contents** is a string, the file will be opened and written in "r" + "utf-8" mode, if bytes are supplied it will be accessed using "rb" mode (i.e. binary write).""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) if isinstance(contents, str): path.write_text(contents) else: path.write_bytes(contents)
[docs] @contextlib.contextmanager def open_file( # noqa self, path: PATH_TYPES, text: bool = True, encoding: str = "utf-8" ) -> Generator[IO, None, None]: """Yield a file context to iterate over. If text is true, open the file with 'rb' mode specified.""" mode = "r" if text else "rb" file_encoding = None if text: file_encoding = encoding if not isinstance(path, pathlib.Path): path = pathlib.Path(path) with path.open(mode=mode, encoding=file_encoding) as fh: yield fh
[docs] def read_file( self, path: PATH_TYPES, text: bool = True, encoding: str = "utf-8", errors: str | None = None, ) -> str | bytes: """Return the contents of the requested file, either a bytestring or a unicode string depending on whether **text** is True""" with self.open_file(path, text=text, encoding=encoding) as fh: contents: str | bytes = fh.read() return contents
[docs] def delete_file(self, path: PATH_TYPES, dry_run: bool = False) -> int: """Delete the provided path, recursively if necessary.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) log_prefix = "[DRY RUN] " if dry_run else "" logger.info(f"{log_prefix}Removing file: {path!s}") if not dry_run: path.unlink() return 0
[docs] def mkdir( self, path: PATH_TYPES, exist_ok: bool = False, parents: bool = False ) -> None: """Create the provided directory""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return path.mkdir(exist_ok=exist_ok, parents=parents)
[docs] def scandir(self, path: PATH_TYPES) -> Generator[os.DirEntry, None, None]: """Read entries from the provided directory""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) yield from os.scandir(path)
[docs] def rmdir( self, path: PATH_TYPES, recurse: bool = False, force: bool = False, ignore_errors: bool = False, dry_run: bool = False, ) -> int: """Remove the directory. If recurse is True, allow removing empty children. If force is true, remove contents destructively.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) log_prefix = "[DRY RUN] " if dry_run else "" if force: logger.info(f"{log_prefix}Forcing removal of files under {path!s}") if not dry_run: shutil.rmtree(path, ignore_errors=ignore_errors) return 0 if recurse: for subdir in path.iterdir(): if not subdir.is_dir(): continue logger.info(f"{log_prefix}Removing directory: {subdir!s}") if not dry_run: rc = self.rmdir( subdir, recurse=recurse, force=force, ignore_errors=ignore_errors, ) if rc != 0: return rc logger.info(f"{log_prefix}Removing directory: {path!s}") if not dry_run: path.rmdir() return 0
[docs] def exists(self, path: PATH_TYPES) -> bool: """Check whether the provided path exists""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return path.exists()
[docs] def is_dir(self, path: PATH_TYPES) -> bool: """Check whether the provided path is a directory.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return path.is_dir()
[docs] def is_file(self, path: PATH_TYPES) -> bool: """Check whether the provided path is a file.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return path.is_file()
[docs] def get_hash(self, path: PATH_TYPES, function: str = "sha256") -> str: h = getattr(hashlib, function)() if not isinstance(path, pathlib.Path): path = pathlib.Path(path) logger.debug( f"Opening {path.as_posix()} in binary mode for hash calculation..." ) with open(path.absolute().as_posix(), "rb") as f: for chunk in iter(lambda: f.read(128 * 1024), b""): logger.debug(f"Read chunk: {chunk!s}") h.update(chunk) digest = h.hexdigest() logger.debug(f"Calculated digest: {digest!s}") return str(h.hexdigest())
[docs] def get_file_size(self, path: PATH_TYPES) -> int: """Return the file size of provided path.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return path.stat().st_size
[docs] def get_upload_time(self, path: PATH_TYPES) -> datetime.datetime: if not isinstance(path, pathlib.Path): path = pathlib.Path(path) return datetime.datetime.fromtimestamp( path.stat().st_mtime, datetime.timezone.utc )
[docs] def set_upload_time(self, path: PATH_TYPES, time: datetime.datetime) -> None: """Set the upload time of a given **path**""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) ts = time.timestamp() os.utime(path, (ts, ts))