You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
13 KiB
Python

#!/usr/bin/env python3
import argparse
import hashlib
import json
import os.path
import pathlib
import shutil
import sys
import urllib.error
import urllib.parse
import urllib.request
from collections import OrderedDict
# Official Raspberry Pi Imager OS list (v4 manifest schema).
DEFAULT_REPO_URL = "https://downloads.raspberrypi.com/os_list_imagingutility_v4.json"
# Name of the local manifest file this script writes by default.
DEFAULT_OUTPUT_JSON_FILE = "os_list_local.rpi-imager-manifest"
# Downloaded repository JSON and icons are cached under these directories.
CACHE_DIR = "cache"
ICONS_DIR = os.path.join(CACHE_DIR, "icons")
CHUNK_SIZE = 1024 * 1024 # read files in 1MB chunks
# Valid OS capabilities (see doc/json-schema/os-list-schema.json)
VALID_OS_CAPABILITIES = {
    "i2c": "Enable I2C interface option",
    "onewire": "Enable 1-Wire interface option",
    "passwordless_sudo": "Enable passwordless sudo option in user setup",
    "rpi_connect": "Enable Raspberry Pi Connect setup",
    "secure_boot": "Enable secure boot signing",
    "serial": "Enable serial interface option",
    "spi": "Enable SPI interface option",
    "usb_otg": "Enable USB Gadget mode option",
}
# Valid device (hardware) capabilities
VALID_DEVICE_CAPABILITIES = {
    "i2c": "Device supports I2C interface",
    "onewire": "Device supports 1-Wire interface",
    "serial": "Device supports serial interface",
    "serial_on_console_only": "Serial is console-only mode",
    "spi": "Device supports SPI interface",
    "usb_otg": "Device supports USB Gadget mode",
}
def fatal_error(reason):
    """Print an error message and terminate the script with exit status 1.

    Diagnostics go to stderr so they don't pollute stdout or any
    redirected output.
    """
    print(f"Error: {reason}", file=sys.stderr)
    sys.exit(1)
def display_warning(reason):
    """Print a non-fatal warning to stderr (processing continues)."""
    print(f"Warning: {reason}", file=sys.stderr)
def scan_os_list(os_list, os_entries=None, duplicates=None):
    """Recursively collect OS entries from an os_list tree, keyed by image filename.

    Walks "subitems" recursively; any entry with a "url" is indexed by the
    basename of that URL. A filename seen more than once is removed from the
    results and recorded in `duplicates`, so ambiguous matches are excluded.

    Returns a tuple (os_entries, duplicates): an OrderedDict of
    filename -> OS entry (in first-seen order) and a set of duplicate filenames.

    Note: the original defaults were `os_entries=OrderedDict(), duplicates=set()`,
    a mutable-default bug that leaked state between independent top-level calls;
    `None` sentinels fix that while keeping recursion working.
    """
    if os_entries is None:
        os_entries = OrderedDict()
    if duplicates is None:
        duplicates = set()
    for os_entry in os_list:
        if "subitems" in os_entry:
            # Categories nest their OSes under "subitems"; recurse with the
            # same accumulators so duplicates are detected across the tree.
            scan_os_list(os_entry["subitems"], os_entries, duplicates)
        elif "url" in os_entry:
            image_filename = os.path.basename(os_entry["url"])
            if image_filename not in duplicates:
                if image_filename in os_entries:
                    # Second sighting: drop the earlier entry and blacklist
                    # the filename so later sightings are skipped too.
                    duplicates.add(image_filename)
                    del os_entries[image_filename]
                else:
                    os_entries[image_filename] = os_entry
    return os_entries, duplicates
def download_file(url, filename):
    """Download `url` to `filename`, streaming in CHUNK_SIZE blocks.

    Returns True on success, False on failure. This is deliberately
    best-effort: callers decide whether a failed download is fatal.
    """
    try:
        with urllib.request.urlopen(url) as response:
            with open(filename, "wb") as fh:
                shutil.copyfileobj(response, fh, CHUNK_SIZE)
        return True
    except (OSError, ValueError):
        # URLError/HTTPError are OSError subclasses, and ValueError covers
        # malformed URLs; anything else is a programming error and should
        # propagate rather than be silently reported as "download failed".
        return False
def download_icon(icon_url):
    """Return the local cache path for `icon_url`, fetching it if necessary.

    Prints a warning and returns None when the download fails.
    """
    local_path = os.path.join(ICONS_DIR, os.path.basename(icon_url))
    if not os.path.exists(local_path):
        if not download_file(icon_url, local_path):
            display_warning(f"Couldn't download {icon_url}")
            return None
    return local_path
def calculate_checksum(filename):
    """Return the SHA-256 hex digest of the file's contents."""
    block_size = 1024 * 1024  # 1 MiB reads, same value as module CHUNK_SIZE
    with open(filename, "rb") as fh:
        if hasattr(hashlib, "file_digest"):
            # hashlib.file_digest (Python 3.11+) hashes the stream in C.
            return hashlib.file_digest(fh, "sha256").hexdigest()
        # Fallback for older interpreters: hash block by block.
        digest = hashlib.sha256()
        for block in iter(lambda: fh.read(block_size), b""):
            digest.update(block)
        return digest.hexdigest()
if __name__ == "__main__":
    # Build the --help epilog listing every recognised capability name.
    epilog = "available OS capabilities for --capabilities:\n"
    epilog += "\n".join(f" {cap:12} {desc}" for cap, desc in VALID_OS_CAPABILITIES.items())
    epilog += "\n\navailable device capabilities for --device-capabilities:\n"
    epilog += "\n".join(f" {cap:20} {desc}" for cap, desc in VALID_DEVICE_CAPABILITIES.items())
    parser = argparse.ArgumentParser(
        # RawDescriptionHelpFormatter preserves the epilog's manual layout.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=epilog)
    parser.add_argument("--repo", default=DEFAULT_REPO_URL, help="custom repository URL")
    parser.add_argument("--search-dir", default=".", help="directory to search for downloaded images")
    parser.add_argument("--output-json", default=DEFAULT_OUTPUT_JSON_FILE, help=f"manifest file to create (defaults to {DEFAULT_OUTPUT_JSON_FILE})")
    parser.add_argument("--online", action="store_true", help="use online URLs from the repository instead of searching for local files")
    # --dry-run and --download-icons are mutually exclusive.
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--dry-run", action="store_true", help="dry run only, don't create manifest file")
    group.add_argument("--download-icons", action="store_true", help="make a local copy of the device and OS icons")
    parser.add_argument("--verify-checksums", action="store_true", help="verify the checksums of the downloaded images (ignored with --online)")
    parser.add_argument("--capabilities", nargs="+", metavar="CAP",
                        help="add OS capabilities to enable features in the customization wizard (see below)")
    parser.add_argument("--device-capabilities", nargs="+", metavar="CAP",
                        help="add device (hardware) capabilities to all devices (see below)")
    args = parser.parse_args()
    # Validate OS capabilities if provided
    if args.capabilities:
        invalid_caps = set(args.capabilities) - VALID_OS_CAPABILITIES.keys()
        if invalid_caps:
            fatal_error(f"Invalid OS capabilities: {', '.join(sorted(invalid_caps))}. Valid capabilities are: {', '.join(VALID_OS_CAPABILITIES.keys())}")
    # Validate device capabilities if provided
    if args.device_capabilities:
        invalid_caps = set(args.device_capabilities) - VALID_DEVICE_CAPABILITIES.keys()
        if invalid_caps:
            fatal_error(f"Invalid device capabilities: {', '.join(sorted(invalid_caps))}. Valid capabilities are: {', '.join(VALID_DEVICE_CAPABILITIES.keys())}")
    # NOTE(review): urllib.parse is reachable here only because importing
    # urllib.request pulls it in as a side effect in CPython; an explicit
    # "import urllib.parse" at the top of the file would be safer.
    repo_urlparts = urllib.parse.urlparse(args.repo)
    if not repo_urlparts.netloc or repo_urlparts.scheme not in ("http", "https"):
        fatal_error("Expected repo to be a http:// or https:// URL")
    # Prefer a copy of the repo JSON in the current directory; otherwise
    # cache a downloaded copy under CACHE_DIR.
    repo_filename = os.path.basename(repo_urlparts.path)
    if os.path.exists(repo_filename):
        online_json_file = repo_filename
    else:
        os.makedirs(CACHE_DIR, exist_ok=True)
        online_json_file = os.path.join(CACHE_DIR, repo_filename)
    if os.path.exists(online_json_file):
        print(f"Info: {online_json_file} already exists, so using local file")
    else:
        if not download_file(args.repo, online_json_file):
            fatal_error(f"Couldn't download {args.repo}")
    with open(online_json_file) as online_fh:
        online_data = json.load(online_fh)
    # Find image filenames in the online JSON (discarding duplicates)
    online_os_entries, duplicate_online_filenames = scan_os_list(online_data["os_list"])
    if not online_os_entries:
        fatal_error(f"No matching OSes found in {online_json_file}")
    if args.download_icons:
        os.makedirs(ICONS_DIR, exist_ok=True)
    # local_os_entries: filename -> OS entry selected for the output manifest.
    # local_device_tags: device tags referenced by any selected OS entry.
    local_os_entries = dict()
    local_device_tags = set()
    if args.online:
        # Use all OS entries from online manifest with their original URLs
        for name, os_entry in online_os_entries.items():
            if args.dry_run:
                print(f"Including {os_entry['name']}")
            local_os_entries[name] = os_entry
            if "devices" in os_entry:
                for tag in os_entry["devices"]:
                    local_device_tags.add(tag)
            if args.download_icons and "icon" in os_entry:
                local_icon_filename = download_icon(os_entry["icon"])
                if local_icon_filename:
                    # Rewrite the icon URL to point at the cached copy.
                    os_entry["icon"] = pathlib.Path(os.path.abspath(local_icon_filename)).as_uri()
    else:
        # Recursively search for any matching local filenames
        duplicate_local_filenames = set()
        # +1 skips the path separator after the search dir prefix.
        abs_search_dir_len = len(os.path.abspath(args.search_dir)) + 1
        for root, dirs, files in os.walk(args.search_dir):
            for name in files:
                abs_local_image_filename = os.path.abspath(os.path.join(root, name))
                # remove leading search_dir prefix
                display_filename = abs_local_image_filename[abs_search_dir_len:]
                if name in duplicate_online_filenames:
                    display_warning(f"Ignoring {display_filename} as it matched multiple filenames in {online_json_file}")
                elif name in duplicate_local_filenames:
                    # Already reported as a local duplicate; stay silent.
                    pass
                elif name in local_os_entries:
                    # Second local copy of the same image: drop the first and
                    # blacklist the name for the rest of the walk.
                    display_warning(f"Ignoring {display_filename} as there are multiple local copies")
                    del local_os_entries[name]
                    duplicate_local_filenames.add(name)
                elif name in online_os_entries:
                    os_entry = online_os_entries[name]
                    # Cheap size check first; full checksum only on request.
                    if "image_download_size" in os_entry and os.path.getsize(abs_local_image_filename) != os_entry["image_download_size"]:
                        display_warning(f"Ignoring {display_filename} as its size doesn't match with {online_json_file}")
                    elif args.verify_checksums and "image_download_sha256" in os_entry and calculate_checksum(abs_local_image_filename) != os_entry["image_download_sha256"]:
                        display_warning(f"Ignoring {display_filename} as its checksum doesn't match with {online_json_file}")
                    else:
                        if args.dry_run:
                            print(f"Found {display_filename} ({os_entry['name']})")
                        # point at our local file instead of the online URL
                        os_entry["url"] = pathlib.Path(abs_local_image_filename).as_uri()
                        # Auto-detect bmap sidecar file (e.g., image.img.xz.bmap or image.img.bmap)
                        for bmap_suffix in [abs_local_image_filename + ".bmap",
                                            os.path.splitext(abs_local_image_filename)[0] + ".bmap"]:
                            if os.path.exists(bmap_suffix):
                                os_entry["bmap_url"] = pathlib.Path(bmap_suffix).as_uri()
                                if args.dry_run:
                                    print(f" Found bmap sidecar: {bmap_suffix[abs_search_dir_len:]}")
                                break
                        local_os_entries[name] = os_entry
                        if "devices" in os_entry:
                            for tag in os_entry["devices"]:
                                local_device_tags.add(tag)
                        if args.download_icons and "icon" in os_entry:
                            local_icon_filename = download_icon(os_entry["icon"])
                            if local_icon_filename:
                                os_entry["icon"] = pathlib.Path(os.path.abspath(local_icon_filename)).as_uri()
    num_images = len(local_os_entries)
    if num_images < 1:
        if args.dry_run:
            fatal_error(f"No matching image files found")
        else:
            fatal_error(f"No matching image files found, so not creating {args.output_json}")
    # Create the output data structure
    local_data = dict()
    if "imager" in online_data and "devices" in online_data["imager"]:
        local_data["imager"] = dict()
        local_data["imager"]["devices"] = list()
        for device in online_data["imager"]["devices"]:
            # Keep devices with no tags (generic) or any tag used by a
            # selected OS entry.
            if "tags" not in device or not device["tags"] or any(tag in local_device_tags for tag in device["tags"]):
                local_data["imager"]["devices"].append(device)
                if args.download_icons and "icon" in device:
                    local_icon_filename = download_icon(device["icon"])
                    if local_icon_filename:
                        device["icon"] = pathlib.Path(os.path.abspath(local_icon_filename)).as_uri()
                # Add device capabilities if specified
                if args.device_capabilities:
                    # Merge with whatever capabilities the device already lists.
                    existing_caps = set(device.get("capabilities", []))
                    merged_caps = existing_caps | set(args.device_capabilities)
                    device["capabilities"] = sorted(merged_caps)
    # Sort the output OSes into the same order as the input OSes
    os_order = list(online_os_entries.keys())
    local_data["os_list"] = sorted(local_os_entries.values(), key=lambda x: os_order.index(os.path.basename(x["url"])))
    # Add capabilities to OS entries if specified
    if args.capabilities:
        for os_entry in local_data["os_list"]:
            # Merge with existing capabilities if present
            existing_caps = set(os_entry.get("capabilities", []))
            merged_caps = existing_caps | set(args.capabilities)
            os_entry["capabilities"] = sorted(merged_caps)
    # And finally write the local JSON file
    if args.dry_run:
        print(f"Would write {num_images} image{'s' if num_images > 1 else ''} to {args.output_json}")
    else:
        with open(args.output_json, "w") as local_fh:
            json.dump(local_data, local_fh, indent=2)
        print(f"Wrote {num_images} image{'s' if num_images > 1 else ''} to {args.output_json}")