Source code for bandersnatch_storage_plugins.swift

import atexit
import base64
import configparser
import contextlib
import datetime
import hashlib
import io
import logging
import os
import pathlib
import re
import shutil
import sys
import tempfile
from collections.abc import Generator, Sequence
from typing import IO, Any, NoReturn, Optional

import filelock
import keystoneauth1
import keystoneauth1.exceptions.catalog
import keystoneauth1.identity
import swiftclient.client
import swiftclient.exceptions

from bandersnatch.storage import PATH_TYPES, StoragePlugin

logger = logging.getLogger("bandersnatch")


# See https://stackoverflow.com/a/8571649 for explanation
BASE64_RE = re.compile(b"^([A-Za-z0-9+/]{4})*([A-Za-z0-9+/]{3}=|[A-Za-z0-9+/]{2}==)?$")


[docs] class SwiftFileLock(filelock.BaseFileLock): """ Simply watches the existence of the lock file. """ def __init__( self, lock_file: str, timeout: int = -1, backend: Optional["SwiftStorage"] = None, ) -> None: # The path to the lock file. self.backend: Optional["SwiftStorage"] = backend self._lock_file_fd: Optional["SwiftPath"] super().__init__(lock_file, timeout=timeout) @property def path_backend(self) -> type["SwiftPath"]: if self.backend is not None: return self.backend.PATH_BACKEND raise RuntimeError("Failed to retrieve swift backend") def _acquire(self) -> None: try: logger.info("Attempting to acquire lock") fd: "SwiftPath" = self.path_backend(self.lock_file) fd.write_bytes(b"") except OSError as exc: logger.error("Failed to acquire lock...") logger.exception("Exception: ", exc) pass else: logger.info(f"Acquired lock: {self.lock_file}") self._lock_file_fd = fd return None def _release(self) -> None: self._lock_file_fd = None try: logger.info(f"Removing lock: {self.lock_file}") self.path_backend(self.lock_file).unlink() except OSError as exc: logger.error("Failed to remove lockfile") logger.exception("Exception: ", exc) pass else: logger.info("Successfully cleaned up lock") return None @property def is_locked(self) -> bool: return self.path_backend(self.lock_file).exists()
[docs] class SwiftDirEntry: def __init__(self, entry: dict): if "subdir" in entry: self.path = str(entry["subdir"]) else: self.path = str(entry["name"]) self.name = self.path.rstrip("/").split("/")[-1] self._entry = entry
[docs] def is_dir(self) -> bool: return "subdir" in self._entry
[docs] def is_file(self) -> bool: return not self.is_dir()
# TODO: Refactor this out into reusable base class? class _SwiftAccessor: BACKEND: "SwiftStorage" @classmethod def register_backend(cls, backend: "SwiftStorage") -> None: cls.BACKEND = backend return @staticmethod def stat(target: str) -> NoReturn: raise NotImplementedError("stat() not available on this system") @staticmethod def lstat(target: str) -> NoReturn: raise NotImplementedError("lstat() not available on this system") @staticmethod def open(*args: Any, **kwargs: Any) -> IO: context: contextlib.ExitStack = contextlib.ExitStack() fh: IO = context.enter_context( _SwiftAccessor.BACKEND.open_file(*args, **kwargs) ) atexit.register(context.close) return fh @staticmethod def listdir(target: str) -> list[str]: results: list[str] = [] if not target.endswith("/"): target = f"{target}/" with _SwiftAccessor.BACKEND.connection() as conn: paths = conn.get_container( _SwiftAccessor.BACKEND.default_container, prefix=target, delimiter="/" ) for p in paths: if "subdir" in p: result = p["subdir"] if not str(result).endswith("/"): result = type(result)(f"{p['subdir']!s}/") else: result = p["name"] results.append(result) return results @staticmethod def scandir(target: str) -> Generator[SwiftDirEntry, None, None]: if not target.endswith("/"): target = f"{target}/" with _SwiftAccessor.BACKEND.connection() as conn: paths = conn.get_container( _SwiftAccessor.BACKEND.default_container, prefix=target, delimiter="/" ) for p in paths: yield SwiftDirEntry(p) @staticmethod def chmod(target: str) -> NoReturn: raise NotImplementedError("chmod() is not available on this platform") def lchmod(self, pathobj: str, mode: int) -> NoReturn: raise NotImplementedError("lchmod() not available on this system") @staticmethod def mkdir(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.mkdir(*args, **kwargs) @staticmethod def unlink(*args: Any, **kwargs: Any) -> None: missing_ok = kwargs.pop("missing_ok", False) try: _SwiftAccessor.BACKEND.delete_file(*args, **kwargs) except OSError as exc: if not missing_ok: logger.exception("Failed to delete non-existent file...", exc) pass return None @staticmethod def link(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod def rmdir(*args: Any, **kwargs: Any) -> None: try: _SwiftAccessor.BACKEND.rmdir(*args, **kwargs) except OSError: pass return None @staticmethod def rename(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod def replace(*args: Any, **kwargs: Any) -> None: return _SwiftAccessor.BACKEND.copy_file(*args, **kwargs) @staticmethod def symlink( a: PATH_TYPES, b: PATH_TYPES, target_is_directory: bool = False, src_container: str | None = None, src_account: str | None = None, ) -> None: return _SwiftAccessor.BACKEND.symlink( a, b, src_container=src_container, src_account=src_account ) @staticmethod def utime(target: str) -> None: return _SwiftAccessor.BACKEND.update_timestamp(target) # Helper for resolve() def readlink(self, path: str) -> str: return str(path) _swift_accessor: type[_SwiftAccessor]
[docs] class SwiftPath(pathlib.Path): _flavour = getattr(pathlib, "_posix_flavour") # noqa BACKEND: "SwiftStorage" __slots__ = ( "_drv", "_root", "_parts", "_str", "_hash", "_pparts", "_cached_cparts", "_accessor", "_closed", ) def __new__(cls, *args: Any) -> "SwiftPath": self = cls._from_parts(args, init=False) self._init() return self def _init(self) -> None: self._accessor = _SwiftAccessor def __str__(self) -> str: """Return the string representation of the path, suitable for passing to system calls.""" try: return self._str # type: ignore except AttributeError: self._str = ( self._format_parsed_parts( # type: ignore self._drv, # type: ignore self._root, # type: ignore self._parts, # type: ignore ) or "." ) return self._str def __fspath__(self) -> str: return str(self) def __bytes__(self) -> bytes: """Return the bytes representation of the path. This is only recommended to use under Unix.""" return os.fsencode(str(self)) def __repr__(self) -> str: return f"{self.__class__.__name__}({self.as_posix()!r})" def _make_child_relpath(self, part: str) -> "SwiftPath": # This is an optimization used for dir walking. `part` must be # a single part relative to this path. parts = self._parts + [os.path.relpath(part, start=str(self))] # type: ignore return self._from_parsed_parts(self._drv, self._root, parts) # type: ignore @classmethod def _parse_args(cls, args: Sequence[str]) -> tuple[str | None, str, list[str]]: # This is useful when you don't want to create an instance, just # canonicalize some constructor arguments. parts: list[str] = [] for a in args: a = os.fspath(a) if isinstance(a, str): # Force-cast str subclasses to str (issue #21127) parts.append(str(a)) else: raise TypeError( "argument should be a str object or an os.PathLike " f"object returning str, not {type(a)}" ) # Modification to prevent us from starting swift paths with "/" if parts[0].startswith("/"): parts[0] = parts[0].lstrip("/") return cls._flavour.parse_parts(parts) # type: ignore @classmethod def _from_parts(cls, args: Sequence[str], init: bool = True) -> "SwiftPath": # We need to call _parse_args on the instance, so as to get the # right flavour. self = object.__new__(cls) drv, root, parts = self._parse_args(args) self._drv = drv # type: ignore self._root = root # type: ignore self._parts = parts # type: ignore if init: self._init() return self @classmethod def _from_parsed_parts( cls, drv: str | None, root: str, parts: list[str], init: bool = True ) -> "SwiftPath": self = object.__new__(cls) self._drv = drv # type: ignore self._root = root # type: ignore self._parts = parts # type: ignore if init: self._init() return self
[docs] @classmethod def register_backend(cls, backend: "SwiftStorage") -> None: cls.BACKEND = backend return
@property def backend(self) -> "SwiftStorage": assert self.BACKEND is not None return self.BACKEND
[docs] def absolute(self) -> "SwiftPath": return self
[docs] def touch(self) -> None: # type: ignore[override] self.write_bytes(b"")
[docs] def is_dir(self) -> bool: target_path = str(self) if ( target_path and target_path != "." and not target_path.endswith(self._flavour.sep) ): target_path = f"{target_path}{self._flavour.sep}" files: list[str] = [] with self.backend.connection() as conn: try: files = conn.get_container( self.backend.default_container, prefix=target_path ) except swiftclient.exceptions.ClientException: return False return bool(files)
[docs] def is_file(self) -> bool: return self.backend.is_file(str(self))
[docs] def exists(self) -> bool: return self.backend.exists(str(self))
[docs] def mkdir( self, mode: int = 511, parents: bool = False, exist_ok: bool = False ) -> None: return self.backend.mkdir(str(self), exist_ok=exist_ok, parents=parents)
[docs] def read_text(self, encoding: str | None = None, errors: str | None = None) -> str: kwargs = {} if encoding is not None: kwargs["encoding"] = encoding if errors is not None: kwargs["errors"] = errors result = self.backend.read_file(str(self), text=True, **kwargs) assert isinstance(result, str) return result
[docs] def write_text( self, data: str, encoding: str | None = "utf-8", errors: str | None = None, newline: str | None = None, ) -> int: self.backend.write_file( str(self), contents=data, encoding=encoding, errors=errors ) return 0
# Different signature than Path.symlink_to # Different signature than Path.write_bytes
[docs] def write_bytes( # type: ignore self, contents: bytes, encoding: str | None = "utf-8", errors: str | None = None, ) -> int: self.backend.write_file( str(self), contents=contents, encoding=encoding, errors=errors ) return 0
[docs] def read_bytes(self) -> bytes: result = self.backend.read_file(str(self), text=False) assert isinstance(result, bytes) return result
[docs] def iterdir( self, conn: swiftclient.client.Connection | None = None, recurse: bool = False, include_swiftkeep: bool = False, ) -> Generator["SwiftPath", None, None]: """Iterate over the files in this directory. Does not yield any result for the special paths '.' and '..'. """ for name in self._accessor.listdir(str(self)): if name in {".", ".."} or name == ".swiftkeep" and not include_swiftkeep: # Yielding a path object for these makes little sense continue path = self._make_child_relpath(name) if not recurse or not str(name).endswith("/") or not path.is_dir(): yield path else: yield path yield from path.iterdir(conn=conn, recurse=recurse)
[docs] class SwiftStorage(StoragePlugin): name = "swift" PATH_BACKEND = SwiftPath @property def directory(self) -> str: try: return self.configuration.get("mirror", "directory") except configparser.NoOptionError: return "srv/pypi"
[docs] def get_config_value( self, config_key: str, *env_keys: Any, default: str | None = None ) -> str | None: value = None try: value = self.configuration["swift"][config_key] except KeyError: os_key = next(iter(k for k in env_keys if k in os.environ), None) if os_key is not None: value = os.environ[os_key] if value is None: value = default return value
[docs] def initialize_plugin(self) -> None: """ Code to initialize the plugin """ swift_credentials = { "user_domain_name": self.get_config_value( "user_domain_name", "OS_USER_DOMAIN_NAME", default="default" ), "project_domain_name": self.get_config_value( "project_domain_name", "OS_PROJECT_DOMAIN_NAME", default="default" ), "password": self.get_config_value("password", "OS_PASSWORD"), } os_options: dict[str, Any] = {} user_id = self.get_config_value("username", "OS_USER_ID", "OS_USERNAME") project = self.get_config_value( "project_name", "OS_PROJECT_NAME", "OS_TENANT_NAME" ) auth_url = self.get_config_value( "auth_url", "OS_AUTH_URL", "OS_AUTHENTICATION_URL" ) object_storage_url = self.get_config_value( "object_storage_url", "OS_STORAGE_URL" ) region = self.get_config_value("region", "OS_REGION_NAME") project_id = self.get_config_value("project_id", "OS_PROJECT_ID") if user_id: swift_credentials["username"] = user_id if project: swift_credentials["project_name"] = project os_options["project_name"] = project if object_storage_url: os_options["object_storage_url"] = object_storage_url if region: os_options["region_name"] = region if project_id: os_options["PROJECT_ID"] = project_id if auth_url: swift_credentials["auth_url"] = auth_url self.os_options = os_options self.auth = keystoneauth1.identity.v3.Password(**swift_credentials) self._test_connection() SwiftPath.register_backend(self) _SwiftAccessor.register_backend(self) global _swift_accessor _swift_accessor = _SwiftAccessor
[docs] def get_lock(self, path: str | None = None) -> SwiftFileLock: """ Retrieve the appropriate `FileLock` backend for this storage plugin :param str path: The path to use for locking :return: A `FileLock` backend for obtaining locks :rtype: SwiftFileLock """ if path is None: path = str(self.mirror_base_path / ".lock") return SwiftFileLock(path, backend=self)
def _test_connection(self) -> None: with self.connection() as conn: try: resp_headers, containers = conn.get_account() except keystoneauth1.exceptions.catalog.EndpointNotFound as exc: logger.exception("Failed authenticating to swift.", exc_info=exc) else: logger.info( "Validated swift credentials, successfully connected to swift!" ) return def _get_session(self) -> keystoneauth1.session.Session: return keystoneauth1.session.Session(auth=self.auth) @property def default_container(self) -> str: try: return self.configuration["swift"]["default_container"] except KeyError: return "bandersnatch"
[docs] @contextlib.contextmanager def connection(self) -> Generator[swiftclient.client.Connection, None, None]: with contextlib.closing( swiftclient.client.Connection( session=self._get_session(), os_options=self.os_options ) ) as swift_conn: yield swift_conn
[docs] def get_container(self, container: str | None = None) -> list[dict[str, str]]: """ Given the name of a container, return its contents. :param str container: The name of the desired container, defaults to :attr:`~SwiftStorage.default_container` :return: A list of objects in the container if it exists :rtype: List[Dict[str, str]] Example: >>> plugin.get_container("bandersnatch") [{ 'bytes': 1101, 'last_modified': '2020-02-27T19:10:17.922970', 'hash': 'a76b4c69bfcf82313bbdc0393b04438a', 'name': 'packages/pyyaml/PyYAML-5.3/LICENSE', 'content_type': 'application/octet-stream' }, { 'bytes': 1779, 'last_modified': '2020-02-27T19:10:17.845520', 'hash': 'c60081e1ad65830b098a7f21a8a8c90e', 'name': 'packages/pyyaml/PyYAML-5.3/PKG-INFO', 'content_type': 'application/octet-stream' }, { 'bytes': 1548, 'last_modified': '2020-02-27T19:10:17.730490', 'hash': '9a8bdf19e93d4b007598b5eb97b461eb', 'name': 'packages/pyyaml/PyYAML-5.3/README', 'content_type': 'application/octet-stream' }, ... ] """ if not container: container = self.default_container with self.connection() as conn: return conn.get_container(container) # type: ignore
[docs] def get_object(self, container_name: str, file_path: str) -> bytes: """Retrieve an object from swift, base64 decoding the contents.""" with self.connection() as conn: try: _, file_contents = conn.get_object(container_name, file_path) except swiftclient.exceptions.ClientException: raise FileNotFoundError(file_path) else: if len(file_contents) % 4 == 0 and BASE64_RE.fullmatch(file_contents): return base64.b64decode(file_contents) return bytes(file_contents)
[docs] def walk( self, root: PATH_TYPES, dirs: bool = True, conn: swiftclient.client.Connection | None = None, ) -> list[SwiftPath]: results: list[SwiftPath] = [] with contextlib.ExitStack() as stack: if conn is None: conn = stack.enter_context(self.connection()) paths = conn.get_container(self.default_container, prefix=str(root)) for p in paths: if "subdir" in p and dirs: results.append(self.PATH_BACKEND(p["subdir"])) else: results.append(self.PATH_BACKEND(p["name"])) return results
[docs] def find(self, root: PATH_TYPES, dirs: bool = True) -> str: """A test helper simulating 'find'. Iterates over directories and filenames, given as relative paths to the root. """ results = self.walk(root, dirs=dirs) results.sort() return "\n".join(str(result.relative_to(root)) for result in results)
[docs] def compare_files(self, file1: PATH_TYPES, file2: PATH_TYPES) -> bool: """Compare two files, returning true if they are the same and False if not.""" file1_contents = self.read_file(file1, text=False) file2_contents = self.read_file(file2, text=False) assert isinstance(file1_contents, bytes) assert isinstance(file2_contents, bytes) file1_hash = hashlib.sha256(file1_contents).hexdigest() file2_hash = hashlib.sha256(file2_contents).hexdigest() return file1_hash == file2_hash
[docs] @contextlib.contextmanager def rewrite( self, filepath: PATH_TYPES, mode: str = "w", **kw: Any ) -> Generator[IO, None, None]: """Rewrite an existing file atomically to avoid programs running in parallel to have race conditions while reading.""" # TODO: Account for alternative backends if isinstance(filepath, str): filename = os.path.basename(filepath) else: filename = filepath.name # Change naming format to be more friendly with distributed POSIX # filesystems like GlusterFS that hash based on filename # GlusterFS ignore '.' at the start of filenames and this avoid rehashing with tempfile.NamedTemporaryFile( mode=mode, prefix=f".{filename}.", delete=False, **kw ) as f: filepath_tmp = f.name yield f if not os.path.exists(filepath_tmp): # Allow our clients to remove the file in case it doesn't want it to be # put in place actually but also doesn't want to error out. return os.chmod(filepath_tmp, 0o100644) shutil.move(filepath_tmp, filepath)
[docs] @contextlib.contextmanager def update_safe(self, filename: PATH_TYPES, **kw: Any) -> Generator[IO, None, None]: """Rewrite a file atomically. Clients are allowed to delete the tmpfile to signal that they don't want to have it updated. """ with tempfile.NamedTemporaryFile( delete=False, prefix=f"{os.path.basename(filename)}.", **kw, ) as tf: tf.has_changed = False # type: ignore yield tf if not os.path.exists(tf.name): return local_filename_tmp = pathlib.Path(tf.name) filename_tmp = SwiftPath(f"{os.path.dirname(filename)}/{tf.name}") self.copy_local_file(str(local_filename_tmp), str(filename_tmp)) local_filename_tmp.unlink() if self.exists(filename) and self.compare_files( str(filename_tmp), str(filename) ): self.delete_file(filename_tmp) else: self.move_file(filename_tmp, filename) tf.has_changed = True # type: ignore
[docs] def copy_local_file(self, source: PATH_TYPES, dest: PATH_TYPES) -> None: """Copy the contents of a local file to a destination in swift""" with open(source, "rb") as fh: self.write_file(str(dest), fh.read()) return
[docs] def copy_file( self, source: PATH_TYPES, dest: PATH_TYPES, dest_container: str | None = None ) -> None: """Copy a file from **source** to **dest**""" if dest_container is None: dest_container = self.default_container dest = f"{dest_container}/{dest}" with self.connection() as conn: conn.copy_object(self.default_container, str(source), dest) return
[docs] def move_file( self, source: PATH_TYPES, dest: PATH_TYPES, dest_container: str | None = None ) -> None: """Move a file from **source** to **dest**""" if dest_container is None: dest_container = self.default_container dest = f"{dest_container}/{dest}" with self.connection() as conn: conn.copy_object(self.default_container, str(source), dest) try: conn.delete_object(self.default_container, str(source)) except swiftclient.exceptions.ClientException: raise FileNotFoundError(str(source)) return
[docs] def write_file( self, path: PATH_TYPES, contents: str | bytes | IO, encoding: str | None = None, errors: str | None = None, ) -> None: """Write data to the provided path. If **contents** is a string, the file will be opened and written in "r" + "utf-8" mode, if bytes are supplied it will be accessed using "rb" mode (i.e. binary write).""" if encoding is not None: if errors is None: try: errors = sys.getfilesystemencodeerrors() except AttributeError: errors = "surrogateescape" if isinstance(contents, str): contents = contents.encode(encoding=encoding, errors=errors) elif isinstance(contents, bytes): contents = contents.decode(encoding=encoding, errors=errors) with self.connection() as conn: conn.put_object(self.default_container, str(path), contents) return
[docs] @contextlib.contextmanager def open_file( self, path: PATH_TYPES, text: bool = True ) -> Generator[IO, None, None]: """Yield a file context to iterate over. If text is false, open the file with 'rb' mode specified.""" wrapper = io.StringIO if text else io.BytesIO content: IO = wrapper(self.read_file(path, text=text)) yield content
[docs] def read_file( self, path: PATH_TYPES, text: bool = True, encoding: str = "utf-8", errors: str | None = None, ) -> str | bytes: """Return the contents of the requested file, either a a bytestring or a unicode string depending on whether **text** is True""" content: str | bytes if not errors: try: errors = sys.getfilesystemencodeerrors() except AttributeError: errors = "surrogateescape" kwargs: dict[str, Any] = {} if errors: kwargs["errors"] = errors content = self.get_object(self.default_container, str(path)) if text and isinstance(content, bytes): content = content.decode(encoding=encoding, **kwargs) return content
[docs] def delete_file(self, path: PATH_TYPES, dry_run: bool = False) -> int: """Delete the provided path, recursively if necessary.""" if not isinstance(path, pathlib.Path): path = pathlib.Path(path) log_prefix = "[DRY RUN] " if dry_run else "" with self.connection() as conn: logger.info(f"{log_prefix}Deleting item from object storage: {path}") if not dry_run: try: conn.delete_object(self.default_container, path.as_posix()) except swiftclient.exceptions.ClientException: raise FileNotFoundError(path.as_posix()) return 0
[docs] def mkdir( self, path: PATH_TYPES, exist_ok: bool = False, parents: bool = False ) -> None: """ Create the provided directory This operation is a no-op on swift. """ logger.warning( f"Creating directory in object storage: {path} with .swiftkeep file" ) if not isinstance(path, self.PATH_BACKEND): path = self.PATH_BACKEND(path) path.joinpath(".swiftkeep").touch()
[docs] def scandir(self, path: PATH_TYPES) -> Generator[SwiftDirEntry, None, None]: """Read entries from the provided directory""" if not isinstance(path, self.PATH_BACKEND): path = self.PATH_BACKEND(path) for p in path._accessor.scandir(str(path)): if p.name == ".swiftkeep": continue yield p
[docs] def rmdir( self, path: PATH_TYPES, recurse: bool = False, force: bool = False, ignore_errors: bool = False, dry_run: bool = False, ) -> int: """ Remove the directory. If recurse is True, allow removing empty children. If force is true, remove contents destructively. """ if not force: if not isinstance(path, self.PATH_BACKEND): path = self.PATH_BACKEND(path) contents = list(path.iterdir(include_swiftkeep=True, recurse=True)) if contents and all(p.name == ".swiftkeep" for p in contents): for p in contents: if p.name == ".swiftkeep": p.unlink() raise OSError( "Object container directories are auto-destroyed when they are emptied" ) target_path = str(path) if target_path == ".": target_path = "" log_prefix = "[DRY RUN] " if dry_run else "" with self.connection() as conn: for item in self.walk(root=target_path): logger.info(f"{log_prefix}Deleting item from object storage: {item}") if not dry_run: conn.delete_object(self.default_container, str(item)) return 0
[docs] def exists(self, path: PATH_TYPES) -> bool: """Check whether the provided path exists""" if not isinstance(path, SwiftPath): path = SwiftPath(str(path)) target_path = str(path) if target_path == ".": target_path = "" return any([self.is_dir(path), self.is_file(path)])
[docs] def is_dir(self, path: PATH_TYPES) -> bool: """Check whether the provided path is a directory.""" if not isinstance(path, SwiftPath): path = SwiftPath(str(path)) target_path = str(path) if target_path == ".": target_path = "" if target_path and not target_path.endswith("/"): target_path = f"{target_path}/" files: list[str] = [] with self.connection() as conn: try: files = conn.get_container(self.default_container, prefix=target_path) except swiftclient.exceptions.ClientException: return False return bool(files)
[docs] def is_file(self, path: PATH_TYPES) -> bool: """Check whether the provided path is a file.""" if not isinstance(path, SwiftPath): path = SwiftPath(path) target_path = str(path) if target_path == ".": return False with self.connection() as conn: try: conn.head_object(self.default_container, target_path) except swiftclient.exceptions.ClientException: return False else: return True
[docs] def update_timestamp(self, path: PATH_TYPES) -> None: with self.connection() as conn: conn.post_object( self.default_container, str(path), {"x-timestamp": str(datetime.datetime.now().timestamp())}, )
[docs] def get_hash(self, path: PATH_TYPES, function: str = "sha256") -> str: h = getattr(hashlib, function)(self.read_file(path, text=False)) return str(h.hexdigest())
[docs] def get_file_size(self, path: PATH_TYPES) -> int: with self.connection() as conn: headers = conn.head_object( self.default_container, str(path), ) return int(headers.get("content-length", "0"))
[docs] def get_upload_time(self, path: PATH_TYPES) -> datetime.datetime: with self.connection() as conn: headers = conn.head_object( self.default_container, str(path), ) ts = int(headers.get("x-object-meta-upload", "0")) return datetime.datetime.fromtimestamp(ts, datetime.timezone.utc)
[docs] def set_upload_time(self, path: PATH_TYPES, time: datetime.datetime) -> None: """Set the upload time of a given **path**""" with self.connection() as conn: conn.post_object( self.default_container, str(path), {"x-object-meta-upload": str(time.timestamp())}, )