Source code for bandersnatch_filter_plugins.blocklist_name
import logging
from typing import Any
from packaging.requirements import Requirement
from packaging.utils import canonicalize_name
from packaging.version import InvalidVersion, Version
from bandersnatch.filter import FilterProjectPlugin, FilterReleasePlugin
logger = logging.getLogger("bandersnatch")
[docs]
class BlockListProject(FilterProjectPlugin):
    """
    Project-level filter that rejects packages listed by bare name in the
    ``packages`` entry of ``self.blocklist``.
    """

    name = "blocklist_project"
    # Requires iterable default
    blocklist_package_names: list[str] = []

    def initialize_plugin(self) -> None:
        """
        Build the list of blocklisted package names once and keep it on the
        instance, so the per-package checks never have to parse the
        configuration again.
        """
        if self.blocklist_package_names:
            # Already populated; nothing to do.
            return
        self.blocklist_package_names = self._determine_filtered_package_names()
        logger.info(
            f"Initialized project plugin {self.name}, filtering "
            + f"{self.blocklist_package_names}"
        )

    def _determine_filtered_package_names(self) -> list[str]:
        """
        Collect the canonical names of every package that is blocklisted as a
        whole project.

        Lines carrying a version specifier are skipped here because they are
        handled by the blocklist release filter; blank lines and ``#``
        comments are ignored. A missing ``packages`` entry yields an empty
        list.
        """
        try:
            raw_entries = self.blocklist["packages"].split("\n")
        except KeyError:
            raw_entries = []

        canonical_names: set[str] = set()
        for raw_entry in raw_entries:
            entry = raw_entry.strip()
            if not entry:
                continue
            if entry.startswith("#"):
                continue

            parsed = Requirement(entry)
            if parsed.specifier:
                # Version-specific entry: not this plugin's concern.
                continue

            if parsed.name == entry:
                canonical_names.add(canonicalize_name(parsed.name))
            else:
                # The line holds more than a bare project name, so it is not
                # treated as a whole-project block.
                logger.debug(
                    "Package line %r does not match requirement name %r",
                    entry,
                    parsed.name,
                )

        blocked = list(canonical_names)
        logger.debug("Project blocklist is %r", blocked)
        return blocked

    def filter(self, metadata: dict) -> bool:
        """
        Return True when the project named in ``metadata["info"]["name"]`` is
        not blocklisted, False when it is.
        """
        project_name = metadata["info"]["name"]
        is_blocked = self.check_match(name=project_name)
        return not is_blocked

    def check_match(self, **kwargs: Any) -> bool:
        """
        Tell whether a package name is on the project blocklist.

        Parameters
        ==========
        name: str
            Name of the package/project to look up; it is canonicalized
            before being compared with the blocklist.

        Returns
        =======
        bool:
            True if the name is blocklisted, False otherwise (including when
            no name, or an empty one, is supplied).
        """
        name = kwargs.get("name")
        if name and canonicalize_name(name) in self.blocklist_package_names:
            logger.info(f"Package {name!r} is blocklisted")
            return True
        return False
[docs]
class BlockListRelease(FilterReleasePlugin):
    """
    Release-level filter that rejects individual versions matching a
    requirement listed in the ``packages`` entry of ``self.blocklist``.
    """

    name = "blocklist_release"
    # Requires iterable default
    # Retained for backward compatibility; this class does not read it.
    blocklist_package_names: list[Requirement] = []
    # Parsed blocklist requirements. The class-level default keeps
    # ``_check_match`` usable (matching nothing) even if it runs before
    # ``initialize_plugin`` has populated the instance attribute.
    blocklist_release_requirements: list[Requirement] = []

    def initialize_plugin(self) -> None:
        """
        Initialize the plugin
        """
        # Parse the blocklisted requirements from the configuration once and
        # store them in self.blocklist_release_requirements so this
        # operation doesn't end up in the fastpath. The guard tests the
        # attribute that is actually populated, so a repeated call on an
        # already-initialized instance does not re-parse the configuration.
        if not self.blocklist_release_requirements:
            self.blocklist_release_requirements = (
                self._determine_filtered_package_requirements()
            )
            logger.info(
                f"Initialized release plugin {self.name}, filtering "
                + f"{self.blocklist_release_requirements}"
            )

    def _determine_filtered_package_requirements(self) -> list[Requirement]:
        """
        Parse the configuration file for [blocklist]packages

        Blank lines and ``#`` comments are skipped. Every remaining line is
        parsed as a requirement, its name is canonicalized, and its specifier
        is set to accept pre-releases so that pre-release versions can match.
        A missing ``packages`` entry yields an empty list.

        Returns
        -------
        list of packaging.requirements.Requirement
            One entry per distinct requirement found in the configuration
        """
        filtered_requirements: set[Requirement] = set()
        try:
            lines = self.blocklist["packages"]
            package_lines = lines.split("\n")
        except KeyError:
            package_lines = []
        for package_line in package_lines:
            package_line = package_line.strip()
            if not package_line or package_line.startswith("#"):
                continue
            requirement = Requirement(package_line)
            # Canonicalize so comparison with the canonical name passed by
            # ``filter`` is a plain string equality test.
            requirement.name = canonicalize_name(requirement.name)
            requirement.specifier.prereleases = True
            filtered_requirements.add(requirement)
        return list(filtered_requirements)

    def filter(self, metadata: dict) -> bool:
        """
        Returns False if version fails the filter,
        i.e. matches a blocklist version specifier

        The package name is read from ``metadata["info"]["name"]`` and the
        version from ``metadata["version"]``.
        """
        name = metadata["info"]["name"]
        version = metadata["version"]
        return not self._check_match(canonicalize_name(name), version)

    def _check_match(self, name: str, version_string: str) -> bool:
        """
        Check if the package name and version matches against a blocklisted
        package version specifier.

        Parameters
        ==========
        name: str
            Package name; compared verbatim with the canonicalized
            requirement names, so callers should pass a canonical name
        version_string: str
            Package version

        Returns
        =======
        bool:
            True if it matches, False otherwise. An empty name or version,
            or a version string that cannot be parsed, never matches.
        """
        if not name or not version_string:
            return False
        try:
            version = Version(version_string)
        except InvalidVersion:
            logger.debug(f"Package {name}=={version_string} has an invalid version")
            return False
        for requirement in self.blocklist_release_requirements:
            if name != requirement.name:
                continue
            if version in requirement.specifier:
                logger.debug(
                    f"MATCH: Release {name}=={version} matches specifier "
                    f"{requirement.specifier}"
                )
                return True
        return False