diff --git a/src/ukify/ukify.py b/src/ukify/ukify.py new file mode 100755 index 00000000000..4041cefc9d0 --- /dev/null +++ b/src/ukify/ukify.py @@ -0,0 +1,576 @@ +#!/usr/bin/env python3 +# SPDX-License-Identifier: LGPL-2.1+ + +# pylint: disable=missing-docstring,invalid-name,import-outside-toplevel +# pylint: disable=consider-using-with,unspecified-encoding,line-too-long +# pylint: disable=too-many-locals,too-many-statements,too-many-return-statements +# pylint: disable=too-many-branches + +import argparse +import collections +import dataclasses +import fnmatch +import itertools +import json +import os +import pathlib +import re +import shlex +import subprocess +import tempfile +import typing + +import pefile + +EFI_ARCH_MAP = { + # host_arch glob : [efi_arch, 32_bit_efi_arch if mixed mode is supported] + 'x86_64' : ['x64', 'ia32'], + 'i[3456]86' : ['ia32'], + 'aarch64' : ['aa64'], + 'arm[45678]*l' : ['arm'], + 'riscv64' : ['riscv64'], +} +EFI_ARCHES: list[str] = sum(EFI_ARCH_MAP.values(), []) + +def guess_efi_arch(): + arch = os.uname().machine + + for glob, mapping in EFI_ARCH_MAP.items(): + if fnmatch.fnmatch(arch, glob): + efi_arch, *fallback = mapping + break + else: + raise ValueError(f'Unsupported architecture {arch}') + + # This makes sense only on some architectures, but it also probably doesn't + # hurt on others, so let's just apply the check everywhere. + if fallback: + fw_platform_size = pathlib.Path('/sys/firmware/efi/fw_platform_size') + try: + size = fw_platform_size.read_text().strip() + except FileNotFoundError: + pass + else: + if int(size) == 32: + efi_arch = fallback[0] + + print(f'Host arch {arch!r}, EFI arch {efi_arch!r}') + return efi_arch + + +def shell_join(cmd): + # TODO: drop in favour of shlex.join once shlex.join supports pathlib.Path. + return ' '.join(shlex.quote(str(x)) for x in cmd) + + +def pe_executable_size(filename): + pe = pefile.PE(filename) + section = pe.sections[-1] + return section.VirtualAddress + section.Misc_VirtualSize + + +def round_up(x, blocksize=4096): + return (x + blocksize - 1) // blocksize * blocksize + + +@dataclasses.dataclass +class Section: + name: str + content: pathlib.Path + tmpfile: typing.IO | None = None + flags: list[str] | None = dataclasses.field(default=None) + offset: int | None = None + measure: bool = False + + @classmethod + def create(cls, name, contents, flags=None, measure=False): + if isinstance(contents, str): + tmp = tempfile.NamedTemporaryFile(mode='wt', prefix=f'tmp{name}') + tmp.write(contents) + tmp.flush() + contents = pathlib.Path(tmp.name) + else: + tmp = None + + return cls(name, contents, tmpfile=tmp, flags=flags, measure=measure) + + @classmethod + def parse_arg(cls, s): + try: + name, contents, *rest = s.split(':') + except ValueError as e: + raise ValueError(f'Cannot parse section spec (name or contents missing): {s!r}') from e + if rest: + raise ValueError(f'Cannot parse section spec (extraneous parameters): {s!r}') + + if contents.startswith('@'): + contents = pathlib.Path(contents[1:]) + + return cls.create(name, contents) + + def size(self): + return self.content.stat().st_size + + def check_name(self): + # PE section names with more than 8 characters are legal, but our stub does + # not support them. + if not self.name.isascii() or not self.name.isprintable(): + raise ValueError(f'Bad section name: {self.name!r}') + if len(self.name) > 8: + raise ValueError(f'Section name too long: {self.name!r}') + + +@dataclasses.dataclass +class UKI: + executable: list[pathlib.Path|str] + sections: list[Section] = dataclasses.field(default_factory=list, init=False) + offset: int | None = dataclasses.field(default=None, init=False) + + def __post_init__(self): + self.offset = round_up(pe_executable_size(self.executable)) + + def add_section(self, section): + assert self.offset + assert section.offset is None + + if section.name in [s.name for s in self.sections]: + raise ValueError(f'Duplicate section {section.name}') + + section.offset = self.offset + self.offset += round_up(section.size()) + self.sections += [section] + + +def parse_banks(s): + banks = re.split(r',|\s+', s) + # TODO: do some sanity checking here + return banks + + +KNOWN_PHASES = ( + 'enter-initrd', + 'leave-initrd', + 'sysinit', + 'ready', + 'shutdown', + 'final', +) + +def parse_phase_paths(s): + # Split on commas or whitespace here. Commas might be hard to parse visually. + paths = re.split(r',|\s+', s) + + for path in paths: + for phase in path.split(':'): + if phase not in KNOWN_PHASES: + raise argparse.ArgumentTypeError(f'Unknown boot phase {phase!r} ({path=})') + + return paths + + +def check_splash(filename): + if filename is None: + return + + # import is delayed, to avoid import when the splash image is not used + try: + from PIL import Image + except ImportError: + return + + img = Image.open(filename, formats=['BMP']) + print(f'Splash image {filename} is {img.width}×{img.height} pixels') + + +def check_inputs(opts): + for name, value in vars(opts).items(): + if name in {'output', 'tools'}: + continue + + if not isinstance(value, pathlib.Path): + continue + + # Open file to check that we can read it, or generate an exception + value.open().close() + + check_splash(opts.splash) + + +def find_tool(name, fallback=None, opts=None): + if opts and opts.tools: + tool = opts.tools / name + if tool.exists(): + return tool + + return fallback or name + + +def combine_signatures(pcrsigs): + combined = collections.defaultdict(list) + for pcrsig in pcrsigs: + for bank, sigs in pcrsig.items(): + for sig in sigs: + if sig not in combined[bank]: + combined[bank] += [sig] + return json.dumps(combined) + + +def call_systemd_measure(uki, linux, opts): + measure_tool = find_tool('systemd-measure', + '/usr/lib/systemd/systemd-measure', + opts=opts) + + banks = opts.pcr_banks or () + + # PCR measurement + + if opts.measure: + pp_groups = opts.phase_path_groups or [] + + cmd = [ + measure_tool, + 'calculate', + f'--linux={linux}', + *(f"--{s.name.removeprefix('.')}={s.content}" + for s in uki.sections + if s.measure), + *(f'--bank={bank}' + for bank in banks), + # For measurement, the keys are not relevant, so we can lump all the phase paths + # into one call to systemd-measure calculate. + *(f'--phase={phase_path}' + for phase_path in itertools.chain.from_iterable(pp_groups)), + ] + + print('+', shell_join(cmd)) + subprocess.check_call(cmd) + + # PCR signing + + if opts.pcr_private_keys: + n_priv = len(opts.pcr_private_keys or ()) + pp_groups = opts.phase_path_groups or [None] * n_priv + pub_keys = opts.pcr_public_keys or [None] * n_priv + + pcrsigs = [] + + cmd = [ + measure_tool, + 'sign', + f'--linux={linux}', + *(f"--{s.name.removeprefix('.')}={s.content}" + for s in uki.sections + if s.measure), + *(f'--bank={bank}' + for bank in banks), + ] + + for priv_key, pub_key, group in zip(opts.pcr_private_keys, + pub_keys, + pp_groups): + extra = [f'--private-key={priv_key}'] + if pub_key: + extra += [f'--public-key={pub_key}'] + extra += [f'--phase={phase_path}' for phase_path in group or ()] + + print('+', shell_join(cmd + extra)) + pcrsig = subprocess.check_output(cmd + extra, text=True) + pcrsig = json.loads(pcrsig) + pcrsigs += [pcrsig] + + combined = combine_signatures(pcrsigs) + uki.add_section(Section.create('.pcrsig', combined)) + + +def make_uki(opts): + # kernel payload signing + + sbsign_tool = find_tool('sbsign', opts=opts) + sbsign_invocation = [ + sbsign_tool, + '--key', opts.sb_key, + '--cert', opts.sb_cert, + ] + + if opts.signing_engine is not None: + sbsign_invocation += ['--engine', opts.signing_engine] + + sign_kernel = opts.sign_kernel + if sign_kernel is None and opts.sb_key: + # figure out if we should sign the kernel + sbverify_tool = find_tool('sbverify', opts=opts) + + cmd = [ + sbverify_tool, + '--list', + opts.linux, + ] + + print('+', shell_join(cmd)) + info = subprocess.check_output(cmd, text=True) + + # sbverify has wonderful API + if 'No signature table present' in info: + sign_kernel = True + + if sign_kernel: + linux_signed = tempfile.NamedTemporaryFile(prefix='linux-signed') + linux = linux_signed.name + + cmd = [ + *sbsign_invocation, + opts.linux, + '--output', linux, + ] + + print('+', shell_join(cmd)) + subprocess.check_call(cmd) + else: + linux = opts.linux + + uki = UKI(opts.stub) + + # TODO: derive public key from from opts.pcr_private_keys? + pcrpkey = opts.pcrpkey + if pcrpkey is None: + if opts.pcr_public_keys and len(opts.pcr_public_keys) == 1: + pcrpkey = opts.pcr_public_keys[0] + + sections = [ + # name, content, measure? + ('.osrel', opts.os_release, True ), + ('.cmdline', opts.cmdline, True ), + ('.dtb', opts.devicetree, True ), + ('.splash', opts.splash, True ), + ('.pcrpkey', pcrpkey, True ), + ('.initrd', opts.initrd, True ), + ('.uname', opts.uname, False), + + # linux shall be last to leave breathing room for decompression. + # We'll add it later. + ] + + for name, content, measure in sections: + if content: + uki.add_section(Section.create(name, content, measure=measure)) + + # systemd-measure doesn't know about those extra sections + for section in opts.sections: + uki.add_section(section) + + # PCR measurement and signing + + call_systemd_measure(uki, linux, opts=opts) + + # UKI creation + + uki.add_section( + Section.create('.linux', linux, measure=True, + flags=['code', 'readonly'])) + + if opts.sb_key: + unsigned = tempfile.NamedTemporaryFile(prefix='uki') + output = unsigned.name + else: + output = opts.output + + objcopy_tool = find_tool('objcopy', opts=opts) + + cmd = [ + objcopy_tool, + opts.stub, + *itertools.chain.from_iterable( + ('--add-section', f'{s.name}={s.content}', + '--change-section-vma', f'{s.name}=0x{s.offset:x}') + for s in uki.sections), + *itertools.chain.from_iterable( + ('--set-section-flags', f"{s.name}={','.join(s.flags)}") + for s in uki.sections + if s.flags is not None), + output, + ] + print('+', shell_join(cmd)) + subprocess.check_call(cmd) + + # UKI signing + + if opts.sb_key: + cmd = [ + *sbsign_invocation, + unsigned.name, + '--output', opts.output, + ] + print('+', shell_join(cmd)) + subprocess.check_call(cmd) + + # We end up with no executable bits, let's reapply them + os.umask(umask := os.umask(0)) + os.chmod(opts.output, 0o777 & ~umask) + + print(f"Wrote {'signed' if opts.sb_key else 'unsigned'} {opts.output}") + + +def parse_args(args=None): + p = argparse.ArgumentParser( + description='Build and sign Unified Kernel Images', + allow_abbrev=False, + usage='''\ +usage: ukify [options…] linux initrd + ukify -h | --help +''') + + # Suppress printing of usage synopsis on errors + p.error = lambda message: p.exit(2, f'{p.prog}: error: {message}\n') + + p.add_argument('linux', + type=pathlib.Path, + help='vmlinuz file [.linux section]') + p.add_argument('initrd', + type=pathlib.Path, + help='initrd file [.initrd section]') + + p.add_argument('--cmdline', + metavar='TEXT|@PATH', + help='kernel command line [.cmdline section]') + + p.add_argument('--os-release', + metavar='TEXT|@PATH', + help='path to os-release file [.osrel section]') + + p.add_argument('--devicetree', + metavar='PATH', + type=pathlib.Path, + help='Device Tree file [.dtb section]') + p.add_argument('--splash', + metavar='BMP', + type=pathlib.Path, + help='splash image bitmap file [.splash section]') + p.add_argument('--pcrpkey', + metavar='KEY', + type=pathlib.Path, + help='embedded public key to seal secrets to [.pcrpkey section]') + p.add_argument('--uname', + metavar='VERSION', + help='"uname -r" information [.uname section]') + + p.add_argument('--efi-arch', + metavar='ARCH', + choices=('ia32', 'x64', 'arm', 'aa64', 'riscv64'), + help='target EFI architecture') + + p.add_argument('--stub', + type=pathlib.Path, + help='path the the sd-stub file [.text,.data,… sections]') + + p.add_argument('--section', + dest='sections', + metavar='NAME:TEXT|@PATH', + type=Section.parse_arg, + action='append', + default=[], + help='additional section as name and contents [NAME section]') + + p.add_argument('--pcr-private-key', + dest='pcr_private_keys', + metavar='PATH', + type=pathlib.Path, + action='append', + help='private part of the keypair for signing PCR signatures') + p.add_argument('--pcr-public-key', + dest='pcr_public_keys', + metavar='PATH', + type=pathlib.Path, + action='append', + help='public part of the keypair for signing PCR signatures') + p.add_argument('--phases', + dest='phase_path_groups', + metavar='PHASE-PATH…', + type=parse_phase_paths, + action='append', + help='phase-paths to create signatures for') + + p.add_argument('--pcr-banks', + metavar='BANK…', + type=parse_banks) + + p.add_argument('--signing-engine', + metavar='ENGINE', + help='OpenSSL engine to use for signing') + p.add_argument('--secureboot-private-key', + dest='sb_key', + help='path to key file or engine-specific designation for SB signing') + p.add_argument('--secureboot-certificate', + dest='sb_cert', + help='path to certificate file or engine-specific designation for SB signing') + + p.add_argument('--sign-kernel', + action=argparse.BooleanOptionalAction, + help='Sign the embedded kernel') + + p.add_argument('--tools', + type=pathlib.Path, + help='a directory with systemd-measure and other tools') + + p.add_argument('--output', '-o', + type=pathlib.Path, + help='output file path') + + p.add_argument('--measure', + action=argparse.BooleanOptionalAction, + help='print systemd-measure output for the UKI') + + opts = p.parse_args(args) + + if opts.cmdline and opts.cmdline.startswith('@'): + opts.cmdline = pathlib.Path(opts.cmdline[1:]) + + if opts.os_release is not None and opts.os_release.startswith('@'): + opts.os_release = pathlib.Path(opts.os_release[1:]) + elif opts.os_release is None: + p = pathlib.Path('/etc/os-release') + if not p.exists(): + p = pathlib.Path('/usr/lib/os-release') + opts.os_release = p + + if opts.efi_arch is None: + opts.efi_arch = guess_efi_arch() + + if opts.stub is None: + opts.stub = f'/usr/lib/systemd/boot/efi/linux{opts.efi_arch}.efi.stub' + + if opts.signing_engine is None: + opts.sb_key = pathlib.Path(opts.sb_key) if opts.sb_key else None + opts.sb_cert = pathlib.Path(opts.sb_cert) if opts.sb_cert else None + + if bool(opts.sb_key) ^ bool(opts.sb_cert): + raise ValueError('--secureboot-private-key= and --secureboot-certificate= must be specified together') + + if opts.sign_kernel and not opts.sb_key: + raise ValueError('--sign-kernel requires --secureboot-private-key= and --secureboot-certificate= to be specified') + + n_pcr_pub = None if opts.pcr_public_keys is None else len(opts.pcr_public_keys) + n_pcr_priv = None if opts.pcr_private_keys is None else len(opts.pcr_private_keys) + n_phase_path_groups = None if opts.phase_path_groups is None else len(opts.phase_path_groups) + if n_pcr_pub is not None and n_pcr_pub != n_pcr_priv: + raise ValueError('--pcr-public-key= specifications must match --pcr-private-key=') + if n_phase_path_groups is not None and n_phase_path_groups != n_pcr_priv: + raise ValueError('--phases= specifications must match --pcr-private-key=') + + if opts.output is None: + suffix = '.efi' if opts.sb_key else '.unsigned.efi' + opts.output = opts.linux.name + suffix + + for section in opts.sections: + section.check_name() + + return opts + + +def main(): + opts = parse_args() + check_inputs(opts) + make_uki(opts) + + +if __name__ == '__main__': + main()