"""
Storage management
"""
import asyncio
import configparser
import contextlib
import datetime
import hashlib
import logging
import pathlib
from collections import defaultdict
from collections.abc import Generator, Iterable, Sequence
from concurrent.futures import ThreadPoolExecutor
from typing import IO, Any, Protocol
import filelock
import pkg_resources
from packaging.utils import canonicalize_name
from .configuration import BandersnatchConfig
PATH_TYPES = pathlib.Path | str
# The API_REVISION is incremented if the plugin class is modified in a
# backwards incompatible way. In order to prevent loading older
# broken plugins that may be installed and will break due to changes to
# the methods of the classes.
PLUGIN_API_REVISION = 1
STORAGE_PLUGIN_RESOURCE = f"bandersnatch_storage_plugins.v{PLUGIN_API_REVISION}.backend"
loaded_storage_plugins: dict[str, list["Storage"]] = defaultdict(list)
logger = logging.getLogger("bandersnatch")
[docs]
class StorageDirEntry(Protocol):
@property
def name(self) -> str | bytes: ...
@property
def path(self) -> str | bytes: ...
[docs]
def is_dir(self) -> bool: ...
[docs]
def is_file(self) -> bool: ...
[docs]
def is_symlink(self) -> bool: ...
class Storage:
    """
    Base Storage class

    Declares the abstract filesystem primitives (``open_file``,
    ``write_file``, ``delete_file``, ...) that concrete storage backends
    must implement, and provides the shared configuration handling plus
    the standard mirror directory layout (``web``, ``web/json``,
    ``web/pypi``, ``web/simple``).
    """

    # Backend identifier; compared against the "storage-backend" config value.
    name = "storage"
    # pathlib flavour used to build mirror paths; backends may override this
    # with a Path subclass that targets their storage medium.
    PATH_BACKEND: type[pathlib.Path] = pathlib.Path

    def __init__(
        self,
        *args: Any,
        config: configparser.ConfigParser | None = None,
        **kwargs: Any,
    ) -> None:
        self.flock_path: PATH_TYPES = ".lock"
        if config is not None:
            if isinstance(config, BandersnatchConfig):
                # Accept a BandersnatchConfig wrapper and unwrap the
                # underlying ConfigParser.
                config = config.config
            self.configuration = config
        else:
            self.configuration = BandersnatchConfig().config
        try:
            storage_backend = self.configuration["mirror"]["storage-backend"]
        except (KeyError, TypeError):
            storage_backend = "filesystem"
        if storage_backend != self.name:
            # This plugin is not the configured backend: skip the potentially
            # expensive initialization below (thread pool, path setup, ...).
            return
        # register relevant path backends etc
        self.initialize_plugin()
        try:
            self.mirror_base_path = self.PATH_BACKEND(
                self.configuration.get("mirror", "directory")
            )
        except (configparser.NoOptionError, configparser.NoSectionError):
            self.mirror_base_path = self.PATH_BACKEND(".")
        # Standard PyPI mirror layout under the mirror root.
        self.web_base_path = self.mirror_base_path / "web"
        self.json_base_path = self.web_base_path / "json"
        self.pypi_base_path = self.web_base_path / "pypi"
        self.simple_base_path = self.web_base_path / "simple"
        self.executor = ThreadPoolExecutor(
            max_workers=self.configuration.getint("mirror", "workers")
        )
        self.loop = asyncio.get_event_loop()

    def __str__(self) -> str:
        return (
            f"{self.__class__.__name__}(name={self.name}, "
            f"mirror_base_path={self.mirror_base_path!s})"
        )

    def __repr__(self) -> str:
        return (
            f"<{self.__class__.__name__} object: {self.name} @ "
            f"{self.mirror_base_path!s}>"
        )

    def __hash__(self) -> int:
        # Hash on the identifying triple so plugin instances de-duplicate
        # correctly when collected into sets (see load_storage_plugins).
        return hash((self.name, str(self.directory), str(self.flock_path)))

    @property
    def directory(self) -> str:
        """The configured mirror directory, defaulting to ``/srv/pypi``."""
        try:
            return self.configuration.get("mirror", "directory")
        except (configparser.NoOptionError, configparser.NoSectionError):
            return "/srv/pypi"

    @staticmethod
    def canonicalize_package(name: str) -> str:
        """Return the PEP 503 normalized form of a package name."""
        return str(canonicalize_name(name))

    def get_lock(self, path: str) -> filelock.BaseFileLock:
        """
        Retrieve the appropriate `FileLock` backend for this storage plugin

        :param str path: The path to use for locking
        :return: A `FileLock` backend for obtaining locks
        :rtype: filelock.BaseFileLock
        """
        raise NotImplementedError

    def get_json_paths(self, name: str) -> Sequence[PATH_TYPES]:
        """Return the JSON metadata paths for **name** (canonical and, when
        it differs, the originally supplied spelling)."""
        canonicalized_name = self.canonicalize_package(name)
        paths = [
            self.json_base_path / canonicalized_name,
            self.pypi_base_path / canonicalized_name,
        ]
        if canonicalized_name != name:
            # Keep a legacy path for the non-canonical spelling as well.
            paths.append(self.json_base_path / name)
        return paths

    def initialize_plugin(self) -> None:
        """
        Code to initialize the plugin
        """
        # The initialize_plugin method is run once to initialize the plugin. This should
        # contain all code to set up the plugin.
        # This method is not run in the fast path and should be used to do things like
        # indexing filter databases, etc that will speed the operation of the filter
        # and check_match methods that are called in the fast path.
        pass

    def hash_file(self, path: PATH_TYPES, function: str = "sha256") -> str:
        """Return the hex digest of **path** computed with the named hashlib
        **function** (default ``sha256``), reading the file in 8 KiB chunks."""
        h = getattr(hashlib, function)()
        with self.open_file(path, text=False) as f:
            while chunk := f.read(8192):
                h.update(chunk)
        return str(h.hexdigest())

    def iter_dir(self, path: PATH_TYPES) -> Generator[PATH_TYPES, None, None]:
        """Iterate over the path, returning the sub-paths"""
        if not issubclass(type(path), pathlib.Path):
            # Coerce plain strings through the backend's Path flavour.
            path = self.PATH_BACKEND(str(path))
        assert isinstance(path, pathlib.Path)
        yield from path.iterdir()

    @contextlib.contextmanager
    def rewrite(
        self, filepath: PATH_TYPES, mode: str = "w", **kw: Any
    ) -> Generator[IO, None, None]:
        """Rewrite an existing file atomically to avoid programs running in
        parallel to have race conditions while reading."""
        raise NotImplementedError

    @contextlib.contextmanager
    def update_safe(self, filename: PATH_TYPES, **kw: Any) -> Generator[IO, None, None]:
        """Rewrite a file atomically.

        Clients are allowed to delete the tmpfile to signal that they don't
        want to have it updated.
        """
        raise NotImplementedError

    def find(self, root: PATH_TYPES, dirs: bool = True) -> str:
        """A test helper simulating 'find'.

        Iterates over directories and filenames, given as relative paths to the
        root.
        """
        raise NotImplementedError

    def compare_files(self, file1: PATH_TYPES, file2: PATH_TYPES) -> bool:
        """
        Compare two files and determine whether they contain the same data. Return
        True if they match
        """
        raise NotImplementedError

    def write_file(self, path: PATH_TYPES, contents: str | bytes) -> None:
        """Write data to the provided path. If **contents** is a string, the file will
        be opened and written in "w" + "utf-8" mode, if bytes are supplied it will be
        accessed using "wb" mode (i.e. binary write)."""
        raise NotImplementedError

    @contextlib.contextmanager
    def open_file(
        self, path: PATH_TYPES, text: bool = True
    ) -> Generator[IO, None, None]:
        """Yield a file context to iterate over. If **text** is true the file is
        opened in text mode, otherwise in binary ('rb') mode."""
        raise NotImplementedError

    def read_file(
        self,
        path: PATH_TYPES,
        text: bool = True,
        encoding: str = "utf-8",
        errors: str | None = None,
    ) -> str | bytes:
        """Return the contents of the file at **path**. If **text** is true the
        file is read in text mode using **encoding**, otherwise raw bytes are
        returned."""
        raise NotImplementedError

    def delete(self, path: PATH_TYPES, dry_run: bool = False) -> int:
        """Delete the provided path, dispatching to ``delete_file`` for files
        and ``rmdir(force=True)`` for directories. Returns a status code (0 on
        success / skip)."""
        if not isinstance(path, pathlib.Path):
            path = pathlib.Path(path)
        log_prefix = "[DRY RUN] " if dry_run else ""
        logger.info(f"{log_prefix}Deleting path: {path!s}")
        if not dry_run:
            if not self.exists(path):
                logger.debug(f"{path!s} does not exist. Skipping")
                return 0
            if self.is_file(path):
                return self.delete_file(path, dry_run=dry_run)
            else:
                return self.rmdir(path, dry_run=dry_run, force=True)
        return 0

    def delete_file(self, path: PATH_TYPES, dry_run: bool = False) -> int:
        """Delete the provided path, recursively if necessary."""
        raise NotImplementedError

    def copy_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None:
        """Copy a file from **source** to **dest**"""
        raise NotImplementedError

    def move_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None:
        """Move a file from **source** to **dest**"""
        raise NotImplementedError

    def mkdir(
        self, path: PATH_TYPES, exist_ok: bool = False, parents: bool = False
    ) -> None:
        """Create the provided directory"""
        raise NotImplementedError

    def scandir(self, path: PATH_TYPES) -> Generator[StorageDirEntry, None, None]:
        """Read entries from the provided directory"""
        raise NotImplementedError

    def rmdir(
        self,
        path: PATH_TYPES,
        recurse: bool = False,
        force: bool = False,
        ignore_errors: bool = False,
        dry_run: bool = False,
    ) -> int:
        """Remove the directory. If recurse is True, allow removing empty children.
        If force is true, remove contents destructively."""
        raise NotImplementedError

    def exists(self, path: PATH_TYPES) -> bool:
        """Check whether the provided path exists"""
        raise NotImplementedError

    def is_dir(self, path: PATH_TYPES) -> bool:
        """Check whether the provided path is a directory."""
        raise NotImplementedError

    def is_file(self, path: PATH_TYPES) -> bool:
        """Check whether the provided path is a file."""
        raise NotImplementedError

    def symlink(self, source: PATH_TYPES, dest: PATH_TYPES) -> None:
        """Create a symlink at **dest** that points back at **source**"""
        if not issubclass(type(dest), pathlib.Path):
            dest = self.PATH_BACKEND(dest)
        assert isinstance(dest, pathlib.Path)
        dest.symlink_to(source)

    def get_hash(self, path: PATH_TYPES, function: str = "sha256") -> str:
        """Get the digest of a given **path** using hash **function**
        (default ``sha256``)."""
        raise NotImplementedError

    def get_file_size(self, path: PATH_TYPES) -> int:
        """Get the size of a given **path** in bytes"""
        raise NotImplementedError

    def get_upload_time(self, path: PATH_TYPES) -> datetime.datetime:
        """Get the upload time of a given **path**"""
        raise NotImplementedError

    def set_upload_time(self, path: PATH_TYPES, time: datetime.datetime) -> None:
        """Set the upload time of a given **path**"""
        raise NotImplementedError
class StoragePlugin(Storage):
    """
    Plugin that provides a storage backend for bandersnatch
    """

    # Placeholder identifier; concrete plugins override this with the
    # backend name matched against the "storage-backend" config value.
    name = "storage_plugin"
def load_storage_plugins(
    entrypoint_group: str,
    enabled_plugin: str | None = None,
    config: configparser.ConfigParser | None = None,
    clear_cache: bool = False,
) -> set[Storage]:
    """
    Load all storage plugins that are registered with pkg_resources

    Parameters
    ==========
    entrypoint_group: str
        The entrypoint group name to load plugins from
    enabled_plugin: str
        The optional enabled storage plugin to search for
    config: configparser.ConfigParser
        The optional configparser instance to pass in
    clear_cache: bool
        Whether to clear the plugin cache

    Returns
    =======
    List of Storage:
        A list of objects derived from the Storage class
    """
    global loaded_storage_plugins
    if config is None:
        config = BandersnatchConfig().config
    if not enabled_plugin:
        # No explicit backend requested: fall back to the configured one,
        # defaulting to "filesystem" when the option (or the whole [mirror]
        # section) is missing.
        try:
            enabled_plugin = config["mirror"]["storage-backend"]
            logger.info(f"Loading storage plugin: {enabled_plugin}")
        except KeyError:
            enabled_plugin = "filesystem"
            logger.info(
                "Failed to find configured storage backend, using default: "
                f"{enabled_plugin}"
            )
    if clear_cache:
        loaded_storage_plugins = defaultdict(list)
    # If the plugins for the entrypoint_group have been loaded return them
    cached_plugins = loaded_storage_plugins.get(entrypoint_group)
    if cached_plugins:
        return set(cached_plugins)
    plugins = set()
    for entry_point in pkg_resources.iter_entry_points(group=entrypoint_group):
        # Entry points are registered as "<backend>_plugin"; only instantiate
        # the one matching the enabled backend.
        if entry_point.name == enabled_plugin + "_plugin":
            try:
                plugin_class = entry_point.load()
                plugin_instance = plugin_class(config=config)
                plugins.add(plugin_instance)
            except ModuleNotFoundError as me:
                # Best effort: log and continue so one broken plugin does not
                # prevent others from loading.
                logger.error(f"Unable to load entry point {entry_point}: {me}")
    loaded_storage_plugins[entrypoint_group] = list(plugins)
    return plugins
def storage_backend_plugins(
    backend: str | None = "filesystem",
    config: configparser.ConfigParser | None = None,
    clear_cache: bool = False,
) -> Iterable[Storage]:
    """
    Load and return the enabled storage backend plugin objects

    Parameters
    ==========
    backend: str
        The optional enabled storage plugin to search for
    config: configparser.ConfigParser
        The optional configparser instance to pass in
    clear_cache: bool
        Whether to clear the plugin cache

    Returns
    -------
    list of bandersnatch.storage.Storage:
        List of objects derived from the bandersnatch.storage.Storage class
    """
    # Thin wrapper around the generic loader, pinned to the versioned
    # storage-backend entrypoint group.
    return load_storage_plugins(
        STORAGE_PLUGIN_RESOURCE,
        enabled_plugin=backend,
        config=config,
        clear_cache=clear_cache,
    )