Source code for bandersnatch.utils

import contextlib
import hashlib
import logging
import os
import os.path
import platform
import re
import shutil
import sys
import tempfile
from datetime import datetime
from pathlib import Path
from typing import IO, Any, Generator, List, Set, Union
from urllib.parse import urlparse

import aiohttp

from . import __version__

logger = logging.getLogger(__name__)


def user_agent() -> str:
    """Build the User-Agent string identifying this bandersnatch process.

    The string contains the bandersnatch version, the Python implementation
    name and version, the operating system name and machine type, and the
    aiohttp version.
    """
    uname = platform.uname()
    # e.g. "cpython 3.8.2-final0": implementation name plus the five
    # fields of sys.version_info.
    python = sys.implementation.name + " {}.{}.{}-{}{}".format(*sys.version_info)

    template = "bandersnatch/{version} ({python}, {system})"
    template += f" (aiohttp {aiohttp.__version__})"
    return template.format(
        version=__version__,
        python=python,
        system=" ".join([uname.system, uname.machine]),
    )
# Matches every run of characters that is not an ASCII letter, a digit or ".".
SAFE_NAME_REGEX = re.compile(r"[^A-Za-z0-9.]+")

# User-Agent string, computed once when the module is imported.
USER_AGENT = user_agent()

# True when the interpreter reports that it is running on Windows.
WINDOWS = platform.system() == "Windows"
def make_time_stamp() -> str:
    """Return the current UTC time as a filename-safe timestamp.

    The value is the ISO 8601 representation of the current UTC time with a
    trailing "Z" and with every ":" removed, so it can be used in a filename
    on any OS.
    """
    # Local import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated; take an aware UTC "now" and drop
    # the tzinfo so that isoformat() does not append a "+00:00" offset.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    return f"{now.isoformat()}Z".replace(":", "")
def convert_url_to_path(url: str) -> str:
    """Return the path component of *url* without its first character.

    For an absolute URL the dropped character is the leading "/", so the
    result is a relative path.
    """
    url_path = urlparse(url).path
    return url_path[1:]
def hash(path: Path, function: str = "sha256") -> str:
    """Return the hex digest of the file at *path*.

    *function* is the name of a hashlib constructor (for example "sha256").
    The file is read in 128 KiB chunks so it is never loaded into memory
    as a whole.
    """
    digest = getattr(hashlib, function)()
    with open(path, "rb") as stream:
        # iter() with a sentinel stops at the first empty read (end of file).
        for block in iter(lambda: stream.read(128 * 1024), b""):
            digest.update(block)
    return str(digest.hexdigest())
def find(root: Union[Path, str], dirs: bool = True) -> str:
    """A test helper simulating 'find'.

    Walks *root* and returns every file name (and, when *dirs* is true,
    every directory name) as a path relative to *root*. The entries are
    sorted and joined with newlines.
    """
    # TODO: account for alternative backends
    root_path = Path(root) if isinstance(root, str) else root

    entries: List[Path] = []
    for dirpath, dirnames, filenames in os.walk(root_path):
        here = Path(dirpath)
        entries.extend(here / name for name in filenames)
        if dirs:
            entries.extend(here / name for name in dirnames)

    return "\n".join(
        str(entry.relative_to(root_path)) for entry in sorted(entries)
    )
@contextlib.contextmanager
def rewrite(
    filepath: Union[str, Path], mode: str = "w", **kw: Any
) -> Generator[IO, None, None]:
    """Rewrite an existing file atomically to avoid programs running in
    parallel to have race conditions while reading.

    A temporary file is created in the same directory as *filepath* and
    yielded to the caller, opened with *mode*; any extra keyword arguments
    are passed on to ``tempfile.NamedTemporaryFile``. When the ``with`` body
    finishes normally the temporary file is chmod'ed to ``0o100644`` and
    moved over *filepath*.

    If the caller removes the temporary file inside the ``with`` body,
    nothing is put in place and no error is raised. If the ``with`` body
    raises, the temporary file is removed, *filepath* is left untouched and
    the exception propagates.
    """
    # TODO: Account for alternative backends
    if isinstance(filepath, str):
        base_dir = os.path.dirname(filepath)
        filename = os.path.basename(filepath)
    else:
        base_dir = str(filepath.parent)
        filename = filepath.name

    filepath_tmp = None
    try:
        # Change naming format to be more friendly with distributed POSIX
        # filesystems like GlusterFS that hash based on filename
        # GlusterFS ignore '.' at the start of filenames and this avoid rehashing
        with tempfile.NamedTemporaryFile(
            mode=mode, prefix=f".{filename}.", delete=False, dir=base_dir, **kw
        ) as f:
            filepath_tmp = f.name
            yield f
    except BaseException:
        # The temporary file is created with delete=False, so without this
        # it would be left behind next to the target whenever the caller
        # fails. Removal is best-effort: the caller's exception must not be
        # masked by a cleanup failure.
        if filepath_tmp is not None:
            with contextlib.suppress(OSError):
                os.unlink(filepath_tmp)
        raise

    if not os.path.exists(filepath_tmp):
        # Allow our clients to remove the file in case it doesn't want it to be
        # put in place actually but also doesn't want to error out.
        return
    os.chmod(filepath_tmp, 0o100644)
    shutil.move(filepath_tmp, filepath)
def recursive_find_files(files: Set[Path], base_dir: Path) -> None:
    """Add every regular file found under *base_dir* to *files*.

    *files* is updated in place and nothing is returned. Sub-directories are
    descended into at any depth. Entries that are neither a directory nor a
    regular file are ignored.
    """
    # An explicit stack replaces recursion so that very deep trees cannot hit
    # the interpreter's recursion limit, and each directory is listed only
    # once instead of once for directories and once for files.
    pending = [base_dir]
    while pending:
        current = pending.pop()
        for entry in current.iterdir():
            if entry.is_dir():
                pending.append(entry)
            elif entry.is_file():
                files.add(entry)
def bandersnatch_safe_name(name: str) -> str:
    """Convert an arbitrary string to a standard distribution name

    Any runs of non-alphanumeric/. characters are replaced with a single '-'.

    - This was copied from `pkg_resources` (part of `setuptools`)

    bandersnatch also lower cases the returned name
    """
    dashed = SAFE_NAME_REGEX.sub("-", name)
    return dashed.lower()