Source code for bandersnatch.delete
#!/usr/bin/env python3
import asyncio
import concurrent.futures
import logging
from argparse import Namespace
from collections.abc import Awaitable
from configparser import ConfigParser
from functools import partial
from json import JSONDecodeError, load
from pathlib import Path
from urllib.parse import urlparse
from packaging.utils import canonicalize_name
from .master import Master
from .storage import storage_backend_plugins
from .verify import get_latest_json
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
[docs]
async def delete_path(blob_path: Path, dry_run: bool = False) -> int:
    """Remove one path through the first available storage backend.

    The existence check and the deletion both run on the backend's own
    executor via its event loop.

    Args:
        blob_path: Path to remove.
        dry_run: When true, only log what would be removed.

    Returns:
        ``0`` for a dry run, for a path that does not exist, and for a
        successful (or already-raced) deletion; ``1`` when the backend raises
        an ``OSError`` other than ``FileNotFoundError``.
    """
    backend = next(iter(storage_backend_plugins()))

    if dry_run:
        logger.info(f" rm {blob_path}")
        return 0

    present = await backend.loop.run_in_executor(
        backend.executor, backend.exists, blob_path
    )
    if not present:
        logger.debug(f"{blob_path} does not exist. Skipping")
        return 0

    remove = partial(backend.delete, blob_path, dry_run=dry_run)
    status = 0
    try:
        await backend.loop.run_in_executor(backend.executor, remove)
    except FileNotFoundError:
        # Deletions run on executor threads, so another task may have removed
        # the same path first (e.g. when the canonical name equals the name
        # that was passed in). That is not a failure.
        pass
    except OSError:
        logger.exception(f"Unable to delete {blob_path}")
        status = 1
    return status
[docs]
async def delete_simple_page(
    simple_base_path: Path, package: str, hash_index: bool = False, dry_run: bool = True
) -> int:
    """Remove the simple-index page and folder(s) belonging to *package*.

    The flat folder ``<simple_base_path>/<package>`` is always cleaned. When
    *hash_index* is true the hashed folder
    ``<simple_base_path>/<first letter>/<package>`` is cleaned as well.

    Args:
        simple_base_path: Root directory of the simple index.
        package: Package name used as the folder name.
        hash_index: Whether the hashed index layout is in use.
        dry_run: When true (the default), only log and touch nothing.

    Returns:
        Always ``0``.
    """
    if dry_run:
        logger.info(f"[dry run]rm simple page of {package}")
        return 0

    flat_dir = simple_base_path / package
    sharded_dir = simple_base_path / package[0] / package

    # Only the index page that matches the configured layout is unlinked up
    # front; anything else inside the folders goes in the sweep below.
    index_page = (sharded_dir if hash_index else flat_dir) / "index.html"
    if index_page.exists():
        index_page.unlink()

    targets = [flat_dir, sharded_dir] if hash_index else [flat_dir]
    for root in targets:
        # Three passes: files, then sub-directories, then the folder itself.
        # On backends such as S3 a folder disappears on its own once empty,
        # hence the existence check before every pass.
        if root.exists():
            entries = list(root.rglob("*"))
            entries.reverse()
            for entry in entries:
                if entry.is_file() or entry.is_symlink():
                    entry.unlink()
        if root.exists():
            entries = list(root.rglob("*"))
            entries.reverse()
            for entry in entries:
                if entry.is_dir():
                    entry.rmdir()
        if root.exists() and root.is_dir():
            root.rmdir()
    return 0
[docs]
async def delete_packages(config: ConfigParser, args: Namespace, master: Master) -> int:
    """Delete everything the mirror holds for the requested packages.

    For each name in ``args.pypi_packages`` this queues removal of:

    * every release file listed under ``releases`` in the package's JSON
      metadata (when the metadata is missing it is first requested through
      ``get_latest_json``, unless this is a dry run, in which case the
      package is skipped),
    * the simple index page, for the name as given and additionally for the
      canonical name when the two differ,
    * the JSON metadata file(s) and the legacy JSON path under the backend's
      ``pypi_base_path``.

    All queued deletions are then awaited together.

    Args:
        config: Parsed configuration; ``workers``, ``storage-backend`` and
            ``hash-index`` are read from the ``mirror`` section.
        args: Parsed command-line arguments; ``workers``, ``pypi_packages``
            and ``dry_run`` are read.
        master: Passed to ``get_latest_json`` when metadata has to be fetched.

    Returns:
        The sum of the status codes returned by the queued deletions, or
        ``0`` when nothing was queued.
    """
    workers = args.workers or config.getint("mirror", "workers")
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=workers)
    delete_coros: list[Awaitable] = []
    try:
        storage_backend = next(
            iter(
                storage_backend_plugins(
                    backend=config.get("mirror", "storage-backend"),
                    config=config,
                    clear_cache=True,
                )
            )
        )
        web_base_path = storage_backend.web_base_path
        json_base_path = storage_backend.json_base_path
        pypi_base_path = storage_backend.pypi_base_path
        simple_base_path = storage_backend.simple_base_path

        for package in args.pypi_packages:
            canon_name = canonicalize_name(package)
            # "nc" = non-canonical: only needed when the given name differs
            # from its canonical form.
            need_nc_paths = canon_name != package
            json_full_path = json_base_path / canon_name
            json_full_path_nc = json_base_path / package if need_nc_paths else None
            legacy_json_path = pypi_base_path / canon_name
            logger.debug(f"Looking up {canon_name} metadata @ {json_full_path}")

            if not storage_backend.exists(json_full_path):
                if args.dry_run:
                    logger.error(
                        f"Skipping {json_full_path} as dry run and no JSON file exists"
                    )
                    continue

                logger.error(f"{json_full_path} does not exist. Pulling from PyPI")
                await get_latest_json(master, json_full_path, executor, False)

            if not json_full_path.exists():
                logger.info(
                    f"No json file for {package} found, skipping blob file cleaning"
                )
            else:
                with storage_backend.open_file(json_full_path, text=True) as jfp:
                    try:
                        package_data = load(jfp)
                    except JSONDecodeError:
                        logger.exception(f"Skipping {canon_name} @ {json_full_path}")
                        continue

                for _release, blobs in package_data["releases"].items():
                    for blob in blobs:
                        url_parts = urlparse(blob["url"])
                        # Drop the leading "/" so the URL path joins below
                        # web_base_path instead of replacing it.
                        blob_path = web_base_path / url_parts.path[1:]
                        delete_coros.append(delete_path(blob_path, args.dry_run))

            # Attempt to delete json, normal simple path + hash simple path
            hash_index_enabled = config.getboolean("mirror", "hash-index")
            if need_nc_paths:
                delete_coros.append(
                    delete_simple_page(
                        simple_base_path,
                        canon_name,
                        hash_index=hash_index_enabled,
                        dry_run=args.dry_run,
                    )
                )
            delete_coros.append(
                delete_simple_page(
                    simple_base_path,
                    package,
                    hash_index=hash_index_enabled,
                    dry_run=args.dry_run,
                )
            )
            for package_path in (
                json_full_path,
                legacy_json_path,
                json_full_path_nc,
            ):
                if not package_path:
                    continue
                delete_coros.append(delete_path(package_path, args.dry_run))
    finally:
        # This pool is only handed to get_latest_json above; release its
        # worker threads once the loop is done (or has failed) instead of
        # leaving them to linger. wait=False keeps the event loop unblocked.
        executor.shutdown(wait=False)

    if args.dry_run:
        logger.info("-- bandersnatch delete DRY RUN --")
    if delete_coros:
        logger.info(f"Attempting to remove {len(delete_coros)} files")
        return sum(await asyncio.gather(*delete_coros))
    return 0