Source code for bandersnatch.utils

import contextlib
import hashlib
import logging
import os
import os.path
import platform
import re
import shutil
import sys
import tempfile
from collections.abc import Generator
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import IO, Any
from urllib.parse import urlparse

import aiohttp
from packaging.tags import INTERPRETER_SHORT_NAMES

from . import __version__

logger = logging.getLogger(__name__)


[docs] def user_agent() -> str: template = "bandersnatch/{version} ({python}, {system})" template += f" (aiohttp {aiohttp.__version__})" version = __version__ python = sys.implementation.name python += " {}.{}.{}-{}{}".format(*sys.version_info) uname = platform.uname() system = " ".join([uname.system, uname.machine]) return template.format(**locals())
SAFE_NAME_REGEX = re.compile(r"[^A-Za-z0-9.]+") USER_AGENT = user_agent() WINDOWS = bool(platform.system() == "Windows")
[docs] class StrEnum(str, Enum): """Enumeration class where members can be treated as strings.""" value: str def __str__(self) -> str: return self.value
[docs] def make_time_stamp() -> str: """Helper function that returns a timestamp suitable for use in a filename on any OS""" return f"{datetime.utcnow().isoformat()}Z".replace(":", "")
[docs] def convert_url_to_path(url: str) -> str: return urlparse(url).path[1:]
[docs] def hash(path: Path, function: str = "sha256") -> str: h = getattr(hashlib, function) content = path.read_bytes() result = h(content).hexdigest() if isinstance(result, str): return result raise TypeError("hashlib did not return str")
[docs] def find(root: Path | str, dirs: bool = True) -> str: """A test helper simulating 'find'. Iterates over directories and filenames, given as relative paths to the root. """ # TODO: account for alternative backends if isinstance(root, str): root = Path(root) results: list[Path] = [] for dirpath, dirnames, filenames in os.walk(root): names = filenames if dirs: names += dirnames for name in names: results.append(Path(dirpath) / name) results.sort() return "\n".join(str(result.relative_to(root)) for result in results)
[docs] @contextlib.contextmanager def rewrite( filepath: str | Path, mode: str = "w", **kw: Any ) -> Generator[IO, None, None]: """Rewrite an existing file atomically to avoid programs running in parallel to have race conditions while reading.""" # TODO: Account for alternative backends if isinstance(filepath, str): base_dir = os.path.dirname(filepath) filename = os.path.basename(filepath) else: base_dir = str(filepath.parent) filename = filepath.name # Change naming format to be more friendly with distributed POSIX # filesystems like GlusterFS that hash based on filename # GlusterFS ignore '.' at the start of filenames and this avoid rehashing with tempfile.NamedTemporaryFile( mode=mode, prefix=f".{filename}.", delete=False, dir=base_dir, **kw ) as f: filepath_tmp = f.name yield f if not os.path.exists(filepath_tmp): # Allow our clients to remove the file in case it doesn't want it to be # put in place actually but also doesn't want to error out. return os.chmod(filepath_tmp, 0o100644) shutil.move(filepath_tmp, filepath)
[docs] def find_all_files(files: set[Path], base_dir: Path) -> None: for f in base_dir.rglob("*"): if not f.is_file(): continue if hasattr(f, "keep_file") and f.name == f.keep_file: continue files.add(f)
[docs] def bandersnatch_safe_name(name: str) -> str: """Convert an arbitrary string to a standard distribution name Any runs of non-alphanumeric/. characters are replaced with a single '-'. - This was copied from `pkg_resources` (part of `setuptools`) bandersnatch also lower cases the returned name """ return SAFE_NAME_REGEX.sub("-", name).lower()
# From https://peps.python.org/pep-0616/
[docs] def removeprefix(original: str, prefix: str) -> str: """Return a string with the given prefix string removed if present. If the string starts with the prefix string, return string[len(prefix):]. Otherwise, return the original string. Args: original (str): string to remove the prefix (e.g. 'py3.6') prefix (str): the prefix to remove (e.g. 'py') Returns: str: either the modified or the original string (e.g. '3.6') """ if original.startswith(prefix): prefix_len = len(prefix) mod_str = original[prefix_len:] return mod_str else: return original
# Python tags https://peps.python.org/pep-0425/#python-tag
[docs] def parse_version(version: str) -> list[str]: """Converts a version string to a list of strings to check the 1st part of build tags. See PEP 425 (https://peps.python.org/pep-0425/#python-tag) for details. Args: version (str): string in the form of '{major}.{minor}' e.g. '3.6' Returns: List[str]: list of 1st element strings from build tag tuples See https://peps.python.org/pep-0425/#python-tag for details. Some Windows binaries have only the 1st part before the file extension. e.g. ['-cp36-', '-pp36-', '-ip36-', '-jy36-', '-py3.6-', '-py3.6.'] """ _versions: list[str] = [] _version_with_dot = removeprefix(version.lower(), "py") _version_without_dot = _version_with_dot.replace(".", "") interpreters = list(INTERPRETER_SHORT_NAMES.values()) interpreters.remove("py") tag_separator1 = "-" tag_separator2 = "." _versions.extend( [ tag_separator1 + i + _version_without_dot + tag_separator1 for i in interpreters ] ) _versions.extend( [ tag_separator1 + INTERPRETER_SHORT_NAMES.get("python") + _version_with_dot + tag_separator1 ] ) _versions.extend( [ tag_separator1 + INTERPRETER_SHORT_NAMES.get("python") + _version_with_dot + tag_separator2 ] ) return _versions