import asyncio
import hashlib
import html
import logging
import os
import sys
from json import dump
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from urllib.parse import unquote, urlparse
from packaging.utils import canonicalize_name
from . import utils
from .master import PackageNotFound, StalePage
if TYPE_CHECKING: # pragma: no cover
from .mirror import Mirror
from .filter import Filter, LoadedFilters
# Module-level flag so the "no filters enabled" messages below are logged only once
display_filter_log = True
logger = logging.getLogger(__name__)


class Package:
def __init__(self, name: str, serial: Union[int, str], mirror: "Mirror") -> None:
self.name = canonicalize_name(name)
self.raw_name = name
self.serial = serial
self.mirror = mirror
self._metadata: Optional[Dict] = None

    @property
    def metadata(self) -> Dict[str, Any]:
        assert self._metadata is not None, "Must fetch metadata before accessing it"
        return self._metadata

    @property
    def info(self) -> Dict[str, Any]:
        return self.metadata["info"]  # type: ignore

    @property
    def last_serial(self) -> int:
        return self.metadata["last_serial"]  # type: ignore

    @property
    def releases(self) -> Dict[str, List]:
        return self.metadata["releases"]  # type: ignore

    @property
    def release_files(self) -> List:
        release_files: List[Dict] = []
        for release in self.releases.values():
            release_files.extend(release)
        return release_files

    @property
    def json_file(self) -> Path:
        return Path(self.mirror.webdir / "json" / self.name)

    @property
    def json_pypi_symlink(self) -> Path:
        return Path(self.mirror.webdir / "pypi" / self.name / "json")

    @property
    def simple_directory(self) -> Path:
        if self.mirror.hash_index:
            return Path(self.mirror.webdir / "simple" / self.name[0] / self.name)
        return Path(self.mirror.webdir / "simple" / self.name)

    async def sync(self, filters: "LoadedFilters", attempts: int = 3) -> None:
self.json_saved = False
try:
await self.update_metadata(attempts=attempts)
            # Skip the package entirely if any metadata filter rejects it.
if not self.metadata or not self._filter_metadata(
self.metadata, filters.filter_metadata_plugins()
):
return None
# save the metadata before filtering releases
if self.mirror.json_save and not self.json_saved:
loop = asyncio.get_event_loop()
self.json_saved = await loop.run_in_executor(
None, self.save_json_metadata, self.metadata
)
self._filter_all_releases_files(filters.filter_release_file_plugins())
self._filter_all_releases(filters.filter_release_plugins())
await self.sync_release_files()
self.sync_simple_page()
            # The XML-RPC PyPI endpoint stores raw_name, so we need to provide it
self.mirror.record_finished_package(self.raw_name)
except Exception:
logger.exception(f"Error syncing package: {self.name}@{self.serial}")
self.mirror.errors = True
if self.mirror.errors and self.mirror.stop_on_error:
logger.error("Exiting early after error.")
sys.exit(1)

    def _filter_metadata(
self, metadata: Dict, metadata_filters: List["Filter"]
) -> bool:
"""
Run the metadata filtering plugins
"""
global display_filter_log
if not metadata_filters:
if display_filter_log:
logger.info(
"No metadata filters are enabled. Skipping metadata filtering"
)
display_filter_log = False
return True
return all(plugin.filter(metadata) for plugin in metadata_filters)
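
    # For context (illustrative only, not a real bandersnatch plugin): a
    # metadata filter just needs a filter() method that returns True to keep
    # the package. A hypothetical allow-list plugin might look like:
    #
    #     class NameAllowListFilter:
    #         allow = {"requests", "flask"}
    #
    #         def filter(self, metadata: Dict) -> bool:
    #             return metadata["info"]["name"] in self.allow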

    def _filter_release(
self, release_data: Dict, release_filters: List["Filter"]
) -> bool:
"""
Run the release filtering plugins
"""
global display_filter_log
if not release_filters:
if display_filter_log:
logger.info(
"No release filters are enabled. Skipping release filtering"
)
display_filter_log = False
return True
return all(plugin.filter(release_data) for plugin in release_filters)

    def _filter_all_releases(self, release_filters: List["Filter"]) -> bool:
        """
        Filter releases, removing any that fail the filters
        """
releases = list(self.releases.keys())
for version in releases:
if not self._filter_release(
{"version": version, "releases": self.releases, "info": self.info},
release_filters,
):
del self.releases[version]
        # True as long as the package had releases to filter at all
        return bool(releases)

    def _filter_release_file(
self, metadata: Dict, release_file_filters: List["Filter"]
) -> bool:
"""
Run the release file filtering plugins
"""
global display_filter_log
if not release_file_filters:
if display_filter_log:
logger.info(
"No release file filters are enabled. Skipping release file filtering" # noqa: E501
)
display_filter_log = False
return True
return all(plugin.filter(metadata) for plugin in release_file_filters)

    def _filter_all_releases_files(self, release_file_filters: List["Filter"]) -> bool:
"""
Filter release files and remove empty releases after doing so.
"""
releases = list(self.releases.keys())
for version in releases:
release_files = list(self.releases[version])
for rfindex in reversed(range(len(release_files))):
if not self._filter_release_file(
{
"info": self.info,
"release": version,
"release_file": self.releases[version][rfindex],
},
release_file_filters,
):
del self.releases[version][rfindex]
if not self.releases[version]:
del self.releases[version]
        # True as long as the package had releases to filter at all
        return bool(releases)

    async def sync_release_files(self) -> None:
        """Download all release files, recording the relative paths of
        downloaded files in self.mirror.altered_packages. The first download
        error is re-raised only after all files have been attempted."""
downloaded_files = set()
deferred_exception = None
for release_file in self.release_files:
try:
downloaded_file = await self.download_file(
release_file["url"], release_file["digests"]["sha256"]
)
if downloaded_file:
downloaded_files.add(
str(downloaded_file.relative_to(self.mirror.homedir))
)
except Exception as e:
logger.exception(
"Continuing to next file after error downloading: "
f"{release_file['url']}"
)
if not deferred_exception: # keep first exception
deferred_exception = e
if deferred_exception:
raise deferred_exception # raise the exception after trying all files
self.mirror.altered_packages[self.name] = downloaded_files

    def gen_data_requires_python(self, release: Dict) -> str:
if "requires_python" in release and release["requires_python"] is not None:
return f' data-requires-python="{html.escape(release["requires_python"])}"'
return ""

    def generate_simple_page(self) -> str:
# Generate the header of our simple page.
simple_page_content = (
"<!DOCTYPE html>\n"
"<html>\n"
" <head>\n"
" <title>Links for {0}</title>\n"
" </head>\n"
" <body>\n"
" <h1>Links for {0}</h1>\n"
).format(self.raw_name)
        logger.debug(f"There are {len(self.releases)} releases for {self.name}")
release_files = self.release_files
        # Let's sort based on the filename rather than the whole URL
release_files.sort(key=lambda x: x["filename"])
digest_name = self.mirror.digest_name
simple_page_content += "\n".join(
[
' <a href="{}#{}={}"{}>{}</a><br/>'.format(
self._file_url_to_local_url(r["url"]),
digest_name,
r["digests"][digest_name],
self.gen_data_requires_python(r),
r["filename"],
)
for r in release_files
]
)
simple_page_content += f"\n </body>\n</html>\n<!--SERIAL {self.last_serial}-->"
return simple_page_content
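
    # For illustration (derived from the code above, not in the original): a
    # page generated for a hypothetical package "peace" with a single wheel
    # would look roughly like:
    #
    #     <!DOCTYPE html>
    #     <html>
    #       <head>
    #         <title>Links for peace</title>
    #       </head>
    #       <body>
    #         <h1>Links for peace</h1>
    #         <a href="../../packages/.../peace-1.0-py3-none-any.whl#sha256=..."
    #            data-requires-python="&gt;=3.6">peace-1.0-py3-none-any.whl</a><br/>
    #       </body>
    #     </html>
    #     <!--SERIAL 1234-->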

    def sync_simple_page(self) -> None:
logger.info(f"Storing index page: {self.name} - in {self.simple_directory}")
simple_page_content = self.generate_simple_page()
if not self.simple_directory.exists():
self.simple_directory.mkdir(parents=True)
if self.mirror.keep_index_versions > 0:
self._save_simple_page_version(simple_page_content)
else:
simple_page = self.simple_directory / "index.html"
with self.mirror.storage_backend.rewrite(
simple_page, "w", encoding="utf-8"
) as f:
f.write(simple_page_content)
self.mirror.diff_file_list.append(simple_page)

    def _save_simple_page_version(self, simple_page_content: str) -> None:
versions_path = self._prepare_versions_path()
timestamp = utils.make_time_stamp()
version_file_name = f"index_{self.serial}_{timestamp}.html"
full_version_path = versions_path / version_file_name
# TODO: Change based on storage backend
with self.mirror.storage_backend.rewrite(
full_version_path, "w", encoding="utf-8"
) as f:
f.write(simple_page_content)
self.mirror.diff_file_list.append(full_version_path)
symlink_path = self.simple_directory / "index.html"
if symlink_path.exists() or symlink_path.is_symlink():
symlink_path.unlink()
symlink_path.symlink_to(full_version_path)

    def _prepare_versions_path(self) -> Path:
versions_path = (
self.mirror.storage_backend.PATH_BACKEND(self.simple_directory) / "versions"
)
if not versions_path.exists():
versions_path.mkdir()
else:
            version_files = sorted(versions_path.iterdir())
version_files_to_remove = (
len(version_files) - self.mirror.keep_index_versions + 1
)
for i in range(version_files_to_remove):
version_files[i].unlink()
return versions_path

    def _file_url_to_local_url(self, url: str) -> str:
parsed = urlparse(url)
if not parsed.path.startswith("/packages"):
raise RuntimeError(f"Got invalid download URL: {url}")
prefix = self.mirror.root_uri if self.mirror.root_uri else "../.."
return prefix + parsed.path
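
    # For illustration (an added note, derived from the code above): a URL
    # like "https://files.pythonhosted.org/packages/aa/bb/cc/pkg-1.0.tar.gz"
    # maps to "../../packages/aa/bb/cc/pkg-1.0.tar.gz" relative to the simple
    # index page, or to "<root_uri>/packages/..." when root_uri is configured.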

    # TODO: This can also return SwiftPath instances now...
def _file_url_to_local_path(self, url: str) -> Path:
path = urlparse(url).path
path = unquote(path)
if not path.startswith("/packages"):
raise RuntimeError(f"Got invalid download URL: {url}")
path = path[1:]
return self.mirror.webdir / path

    # TODO: This can also return SwiftPath instances now...
    async def download_file(
self, url: str, sha256sum: str, chunk_size: int = 64 * 1024
) -> Optional[Path]:
path = self._file_url_to_local_path(url)
# Avoid downloading again if we have the file and it matches the hash.
if path.exists():
existing_hash = self.mirror.storage_backend.get_hash(str(path))
if existing_hash == sha256sum:
return None
else:
logger.info(
f"Checksum mismatch with local file {path}: expected {sha256sum} "
+ f"got {existing_hash}, will re-download."
)
path.unlink()
logger.info(f"Downloading: {url}")
dirname = path.parent
if not dirname.exists():
dirname.mkdir(parents=True)
# Even more special handling for the serial of package files here:
# We do not need to track a serial for package files
# as PyPI generally only allows a file to be uploaded once
# and then maybe deleted. Re-uploading (and thus changing the hash)
# is only allowed in extremely rare cases with intervention from the
# PyPI admins.
r_generator = self.mirror.master.get(url, required_serial=None)
response = await r_generator.asend(None)
checksum = hashlib.sha256()
with self.mirror.storage_backend.rewrite(path, "wb") as f:
while True:
chunk = await response.content.read(chunk_size)
if not chunk:
break
checksum.update(chunk)
f.write(chunk)
        actual_hash = checksum.hexdigest()
        if actual_hash != sha256sum:
            # Bad case: the downloaded file does not match the expected
            # checksum. Even in the rare case of a re-upload, this will
            # fix itself in a later run.
            raise ValueError(
                f"Inconsistent file. {url} has hash {actual_hash} "
                + f"instead of {sha256sum}."
            )
return path
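

# A minimal usage sketch (added for illustration; not part of the original
# module). It assumes `mirror` is a fully configured Mirror (see mirror.py)
# and that its loaded filter plugins are exposed as `mirror.filters`, a
# LoadedFilters instance; both of those names are assumptions.
#
#     async def sync_one_package(mirror: "Mirror", name: str, serial: int) -> None:
#         package = Package(name, serial, mirror)
#         await package.sync(filters=mirror.filters)
#
#     asyncio.run(sync_one_package(mirror, "requests", 1000))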