diff options
-rw-r--r-- | qutebrowser/browser/webengine/webenginesettings.py | 5 | ||||
-rw-r--r-- | qutebrowser/misc/binparsing.py | 43 | ||||
-rw-r--r-- | qutebrowser/misc/elf.py | 78 | ||||
-rw-r--r-- | qutebrowser/misc/pakjoy.py | 218 | ||||
-rw-r--r-- | tests/unit/misc/test_pakjoy.py | 277 |
5 files changed, 566 insertions, 55 deletions
diff --git a/qutebrowser/browser/webengine/webenginesettings.py b/qutebrowser/browser/webengine/webenginesettings.py index 0a3b6b084..e1aa7c52e 100644 --- a/qutebrowser/browser/webengine/webenginesettings.py +++ b/qutebrowser/browser/webengine/webenginesettings.py @@ -24,6 +24,7 @@ from qutebrowser.browser.webengine import (spell, webenginequtescheme, cookies, webenginedownloads, notification) from qutebrowser.config import config, websettings from qutebrowser.config.websettings import AttributeInfo as Attr +from qutebrowser.misc import pakjoy from qutebrowser.utils import (standarddir, qtutils, message, log, urlmatch, usertypes, objreg, version) if TYPE_CHECKING: @@ -553,6 +554,10 @@ def init(): _global_settings = WebEngineSettings(_SettingsWrapper()) log.init.debug("Initializing profiles...") + + # Apply potential resource patches before initializing profiles. + pakjoy.patch() + _init_default_profile() init_private_profile() config.instance.changed.connect(_update_settings) diff --git a/qutebrowser/misc/binparsing.py b/qutebrowser/misc/binparsing.py new file mode 100644 index 000000000..81e2e6dbb --- /dev/null +++ b/qutebrowser/misc/binparsing.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org> +# +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Utilities for parsing binary files. + +Used by elf.py as well as pakjoy.py. +""" + +import struct +from typing import Any, IO, Tuple + + +class ParseError(Exception): + + """Raised when the file can't be parsed.""" + + +def unpack(fmt: str, fobj: IO[bytes]) -> Tuple[Any, ...]: + """Unpack the given struct format from the given file.""" + size = struct.calcsize(fmt) + data = safe_read(fobj, size) + + try: + return struct.unpack(fmt, data) + except struct.error as e: + raise ParseError(e) + + +def safe_read(fobj: IO[bytes], size: int) -> bytes: + """Read from a file, handling possible exceptions.""" + try: + return fobj.read(size) + except (OSError, OverflowError) as e: + raise ParseError(e) + + +def safe_seek(fobj: IO[bytes], pos: int) -> None: + """Seek in a file, handling possible exceptions.""" + try: + fobj.seek(pos) + except (OSError, OverflowError) as e: + raise ParseError(e) diff --git a/qutebrowser/misc/elf.py b/qutebrowser/misc/elf.py index aa717e790..35af5af28 100644 --- a/qutebrowser/misc/elf.py +++ b/qutebrowser/misc/elf.py @@ -44,21 +44,16 @@ This is a "best effort" parser. If it errors out, we instead end up relying on t PyQtWebEngine version, which is the next best thing. """ -import struct import enum import re import dataclasses import mmap import pathlib -from typing import Any, IO, ClassVar, Dict, Optional, Tuple, cast +from typing import IO, ClassVar, Dict, Optional, cast from qutebrowser.qt import machinery from qutebrowser.utils import log, version, qtutils - - -class ParseError(Exception): - - """Raised when the ELF file can't be parsed.""" +from qutebrowser.misc import binparsing class Bitness(enum.Enum): @@ -77,33 +72,6 @@ class Endianness(enum.Enum): big = 2 -def _unpack(fmt: str, fobj: IO[bytes]) -> Tuple[Any, ...]: - """Unpack the given struct format from the given file.""" - size = struct.calcsize(fmt) - data = _safe_read(fobj, size) - - try: - return struct.unpack(fmt, data) - except struct.error as e: - raise ParseError(e) - - -def _safe_read(fobj: IO[bytes], size: int) -> bytes: - """Read from a file, handling possible exceptions.""" - try: - return fobj.read(size) - except (OSError, OverflowError) as e: - raise ParseError(e) - - -def _safe_seek(fobj: IO[bytes], pos: int) -> None: - """Seek in a file, handling possible exceptions.""" - try: - fobj.seek(pos) - except (OSError, OverflowError) as e: - raise ParseError(e) - - @dataclasses.dataclass class Ident: @@ -125,17 +93,17 @@ class Ident: @classmethod def parse(cls, fobj: IO[bytes]) -> 'Ident': """Parse an ELF ident header from a file.""" - magic, klass, data, elfversion, osabi, abiversion = _unpack(cls._FORMAT, fobj) + magic, klass, data, elfversion, osabi, abiversion = binparsing.unpack(cls._FORMAT, fobj) try: bitness = Bitness(klass) except ValueError: - raise ParseError(f"Invalid bitness {klass}") + raise binparsing.ParseError(f"Invalid bitness {klass}") try: endianness = Endianness(data) except ValueError: - raise ParseError(f"Invalid endianness {data}") + raise binparsing.ParseError(f"Invalid endianness {data}") return cls(magic, bitness, endianness, elfversion, osabi, abiversion) @@ -172,7 +140,7 @@ class Header: def parse(cls, fobj: IO[bytes], bitness: Bitness) -> 'Header': """Parse an ELF header from a file.""" fmt = cls._FORMATS[bitness] - return cls(*_unpack(fmt, fobj)) + return cls(*binparsing.unpack(fmt, fobj)) @dataclasses.dataclass @@ -203,39 +171,39 @@ class SectionHeader: def parse(cls, fobj: IO[bytes], bitness: Bitness) -> 'SectionHeader': """Parse an ELF section header from a file.""" fmt = cls._FORMATS[bitness] - return cls(*_unpack(fmt, fobj)) + return cls(*binparsing.unpack(fmt, fobj)) def get_rodata_header(f: IO[bytes]) -> SectionHeader: """Parse an ELF file and find the .rodata section header.""" ident = Ident.parse(f) if ident.magic != b'\x7fELF': - raise ParseError(f"Invalid magic {ident.magic!r}") + raise binparsing.ParseError(f"Invalid magic {ident.magic!r}") if ident.data != Endianness.little: - raise ParseError("Big endian is unsupported") + raise binparsing.ParseError("Big endian is unsupported") if ident.version != 1: - raise ParseError(f"Only version 1 is supported, not {ident.version}") + raise binparsing.ParseError(f"Only version 1 is supported, not {ident.version}") header = Header.parse(f, bitness=ident.klass) # Read string table - _safe_seek(f, header.shoff + header.shstrndx * header.shentsize) + binparsing.safe_seek(f, header.shoff + header.shstrndx * header.shentsize) shstr = SectionHeader.parse(f, bitness=ident.klass) - _safe_seek(f, shstr.offset) - string_table = _safe_read(f, shstr.size) + binparsing.safe_seek(f, shstr.offset) + string_table = binparsing.safe_read(f, shstr.size) # Back to all sections for i in range(header.shnum): - _safe_seek(f, header.shoff + i * header.shentsize) + binparsing.safe_seek(f, header.shoff + i * header.shentsize) sh = SectionHeader.parse(f, bitness=ident.klass) name = string_table[sh.name:].split(b'\x00')[0] if name == b'.rodata': return sh - raise ParseError("No .rodata section found") + raise binparsing.ParseError("No .rodata section found") @dataclasses.dataclass @@ -262,7 +230,7 @@ def _find_versions(data: bytes) -> Versions: chromium=match.group(2).decode('ascii'), ) except UnicodeDecodeError as e: - raise ParseError(e) + raise binparsing.ParseError(e) # Here it gets even more crazy: Sometimes, we don't have the full UA in one piece # in the string table somehow (?!). However, Qt 6.2 added a separate @@ -273,20 +241,20 @@ def _find_versions(data: bytes) -> Versions: # We first get the partial Chromium version from the UA: match = re.search(pattern[:-4], data) # without trailing literal \x00 if match is None: - raise ParseError("No match in .rodata") + raise binparsing.ParseError("No match in .rodata") webengine_bytes = match.group(1) partial_chromium_bytes = match.group(2) if b"." not in partial_chromium_bytes or len(partial_chromium_bytes) < 6: # some sanity checking - raise ParseError("Inconclusive partial Chromium bytes") + raise binparsing.ParseError("Inconclusive partial Chromium bytes") # And then try to find the *full* string, stored separately, based on the # partial one we got above. pattern = br"\x00(" + re.escape(partial_chromium_bytes) + br"[0-9.]+)\x00" match = re.search(pattern, data) if match is None: - raise ParseError("No match in .rodata for full version") + raise binparsing.ParseError("No match in .rodata for full version") chromium_bytes = match.group(1) try: @@ -295,7 +263,7 @@ def _find_versions(data: bytes) -> Versions: chromium=chromium_bytes.decode('ascii'), ) except UnicodeDecodeError as e: - raise ParseError(e) + raise binparsing.ParseError(e) def _parse_from_file(f: IO[bytes]) -> Versions: @@ -316,8 +284,8 @@ def _parse_from_file(f: IO[bytes]) -> Versions: return _find_versions(cast(bytes, mmap_data)) except (OSError, OverflowError) as e: log.misc.debug(f"mmap failed ({e}), falling back to reading", exc_info=True) - _safe_seek(f, sh.offset) - data = _safe_read(f, sh.size) + binparsing.safe_seek(f, sh.offset) + data = binparsing.safe_read(f, sh.size) return _find_versions(data) @@ -344,6 +312,6 @@ def parse_webenginecore() -> Optional[Versions]: log.misc.debug(f"Got versions from ELF: {versions}") return versions - except ParseError as e: + except binparsing.ParseError as e: log.misc.debug(f"Failed to parse ELF: {e}", exc_info=True) return None diff --git a/qutebrowser/misc/pakjoy.py b/qutebrowser/misc/pakjoy.py new file mode 100644 index 000000000..12e0c8a3e --- /dev/null +++ b/qutebrowser/misc/pakjoy.py @@ -0,0 +1,218 @@ +# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org> +# +# SPDX-License-Identifier: GPL-3.0-or-later + +"""Chromium .pak repacking. + +This entire file is a great WORKAROUND for https://bugreports.qt.io/browse/QTBUG-118157 +and the fact we can't just simply disable the hangouts extension: +https://bugreports.qt.io/browse/QTBUG-118452 + +It's yet another big hack. If you think this is bad, look at elf.py instead. + +The name of this file might or might not be inspired by a certain vegetable, +as well as the "joy" this bug has caused me. + +Useful references: + +- https://sweetscape.com/010editor/repository/files/PAK.bt (010 editor <3) +- https://textslashplain.com/2022/05/03/chromium-internals-pak-files/ +- https://github.com/myfreeer/chrome-pak-customizer +- https://source.chromium.org/chromium/chromium/src/+/main:tools/grit/pak_util.py +- https://source.chromium.org/chromium/chromium/src/+/main:tools/grit/grit/format/data_pack.py + +This is a "best effort" parser. If it errors out, we don't apply the workaround +instead of crashing. +""" + +import os +import shutil +import pathlib +import dataclasses +from typing import ClassVar, IO, Optional, Dict, Tuple + +from qutebrowser.misc import binparsing +from qutebrowser.utils import qtutils, standarddir, version, utils, log + +HANGOUTS_MARKER = b"// Extension ID: nkeimhogjdpnpccoofpliimaahmaaome" +HANGOUTS_ID = 36197 # as found by toofar +PAK_VERSION = 5 + +TARGET_URL = b"https://*.google.com/*" +REPLACEMENT_URL = b"https://qute.invalid/*" +assert len(TARGET_URL) == len(REPLACEMENT_URL) + + +@dataclasses.dataclass +class PakHeader: + + """Chromium .pak header (version 5).""" + + encoding: int # uint32 + resource_count: int # uint16 + _alias_count: int # uint16 + + _FORMAT: ClassVar[str] = '<IHH' + + @classmethod + def parse(cls, fobj: IO[bytes]) -> 'PakHeader': + """Parse a PAK version 5 header from a file.""" + return cls(*binparsing.unpack(cls._FORMAT, fobj)) + + +@dataclasses.dataclass +class PakEntry: + + """Entry description in a .pak file.""" + + resource_id: int # uint16 + file_offset: int # uint32 + size: int = 0 # not in file + + _FORMAT: ClassVar[str] = '<HI' + + @classmethod + def parse(cls, fobj: IO[bytes]) -> 'PakEntry': + """Parse a PAK entry from a file.""" + return cls(*binparsing.unpack(cls._FORMAT, fobj)) + + +class PakParser: + """Parse webengine pak and find patch location to disable Google Meet extension.""" + + def __init__(self, fobj: IO[bytes]) -> None: + """Parse the .pak file from the given file object.""" + pak_version = binparsing.unpack("<I", fobj)[0] + if pak_version != PAK_VERSION: + raise binparsing.ParseError(f"Unsupported .pak version {pak_version}") + + self.fobj = fobj + entries = self._read_header() + self.manifest_entry, self.manifest = self._find_manifest(entries) + + def find_patch_offset(self) -> int: + """Return byte offset of TARGET_URL into the pak file.""" + try: + return self.manifest_entry.file_offset + self.manifest.index(TARGET_URL) + except ValueError: + raise binparsing.ParseError("Couldn't find URL in manifest") + + def _maybe_get_hangouts_manifest(self, entry: PakEntry) -> Optional[bytes]: + self.fobj.seek(entry.file_offset) + data = self.fobj.read(entry.size) + + if not data.startswith(b"{") or not data.rstrip(b"\n").endswith(b"}"): + # not JSON + return None + + if HANGOUTS_MARKER not in data: + return None + + return data + + def _read_header(self) -> Dict[int, PakEntry]: + """Read the header and entry index from the .pak file.""" + entries = [] + + header = PakHeader.parse(self.fobj) + for _ in range(header.resource_count + 1): # + 1 due to sentinel at end + entries.append(PakEntry.parse(self.fobj)) + + for entry, next_entry in zip(entries, entries[1:]): + if entry.resource_id == 0: + raise binparsing.ParseError("Unexpected sentinel entry") + entry.size = next_entry.file_offset - entry.file_offset + + if entries[-1].resource_id != 0: + raise binparsing.ParseError("Missing sentinel entry") + del entries[-1] + + return {entry.resource_id: entry for entry in entries} + + def _find_manifest(self, entries: Dict[int, PakEntry]) -> Tuple[PakEntry, bytes]: + if HANGOUTS_ID in entries: + suspected_entry = entries[HANGOUTS_ID] + manifest = self._maybe_get_hangouts_manifest(suspected_entry) + if manifest is not None: + return suspected_entry, manifest + + # didn't find it via the prevously known ID, let's search them all... + for entry in entries.values(): + manifest = self._maybe_get_hangouts_manifest(entry) + if manifest is not None: + return entry, manifest + + raise binparsing.ParseError("Couldn't find hangouts manifest") + + +def copy_webengine_resources() -> pathlib.Path: + """Copy qtwebengine resources to local dir for patching.""" + resources_dir = qtutils.library_path(qtutils.LibraryPath.data) + if utils.is_mac: + # I'm not sure how to arrive at this path without hardcoding it + # ourselves. importlib_resources("PyQt6.Qt6") can serve as a + # replacement for the qtutils bit but it doesn't seem to help find the + # actually Resources folder. + resources_dir /= pathlib.Path("lib", "QtWebEngineCore.framework", "Resources") + else: + resources_dir /= "resources" + work_dir = pathlib.Path(standarddir.cache()) / "webengine_resources_pak_quirk" + + log.misc.debug( + "Copying webengine resources for quirk patching: " + f"{resources_dir} -> {work_dir}" + ) + + if work_dir.exists(): + # TODO: make backup? + shutil.rmtree(work_dir) + + shutil.copytree(resources_dir, work_dir) + + os.environ["QTWEBENGINE_RESOURCES_PATH"] = str(work_dir) + + return work_dir + + +def patch(file_to_patch: pathlib.Path = None) -> None: + """Apply any patches to webengine resource pak files.""" + versions = version.qtwebengine_versions(avoid_init=True) + if versions.webengine != utils.VersionNumber(6, 6): + return + + if not file_to_patch: + try: + file_to_patch = copy_webengine_resources() / "qtwebengine_resources.pak" + except OSError: + log.misc.exception("Failed to copy webengine resources, not applying quirk") + return + + if not file_to_patch.exists(): + log.misc.error( + "Resource pak doesn't exist at expected location! " + f"Not applying quirks. Expected location: {file_to_patch}" + ) + return + + with open(file_to_patch, "r+b") as f: + try: + parser = PakParser(f) + log.misc.debug(f"Patching pak entry: {parser.manifest_entry}") + offset = parser.find_patch_offset() + binparsing.safe_seek(f, offset) + f.write(REPLACEMENT_URL) + except binparsing.ParseError: + log.misc.exception("Failed to apply quirk to resources pak.") + + +if __name__ == "__main__": + output_test_file = pathlib.Path("/tmp/test.pak") + #shutil.copy("/opt/google/chrome/resources.pak", output_test_file) + shutil.copy("/usr/share/qt6/resources/qtwebengine_resources.pak", output_test_file) + patch(output_test_file) + + with open(output_test_file, "rb") as fd: + reparsed = PakParser(fd) + + print(reparsed.manifest_entry) + print(reparsed.manifest) diff --git a/tests/unit/misc/test_pakjoy.py b/tests/unit/misc/test_pakjoy.py new file mode 100644 index 000000000..5c35d0111 --- /dev/null +++ b/tests/unit/misc/test_pakjoy.py @@ -0,0 +1,277 @@ +# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org> +# +# SPDX-License-Identifier: GPL-3.0-or-later + +import os +import io +import json +import struct +import pathlib +import logging + +import pytest + +from qutebrowser.misc import pakjoy, binparsing +from qutebrowser.utils import utils, version, standarddir + + +pytest.importorskip("qutebrowser.qt.webenginecore") + + +versions = version.qtwebengine_versions(avoid_init=True) + + +@pytest.fixture +def skipifneeded(): + """Used to skip happy path tests with the real resources file. + + Since we don't know how reliably the Google Meet hangouts extensions is + reliably in the resource files, and this quirk is only targeting 6.6 + anyway. + """ + if versions.webengine != utils.VersionNumber(6, 6): + raise pytest.skip("Code under test only runs on 6.6") + + +@pytest.fixture(autouse=True) +def clean_env(): + yield + if "QTWEBENGINE_RESOURCES_PATH" in os.environ: + del os.environ["QTWEBENGINE_RESOURCES_PATH"] + + +def patch_version(monkeypatch, *args): + monkeypatch.setattr( + pakjoy.version, + "qtwebengine_versions", + lambda **kwargs: version.WebEngineVersions( + webengine=utils.VersionNumber(*args), + chromium=None, + source="unittest", + ) + ) + + +@pytest.fixture +def unaffected_version(monkeypatch): + patch_version(monkeypatch, 6, 6, 1) + + +@pytest.fixture +def affected_version(monkeypatch): + patch_version(monkeypatch, 6, 6) + + +def test_version_gate(unaffected_version, mocker): + + fake_open = mocker.patch("qutebrowser.misc.pakjoy.open") + pakjoy.patch() + assert not fake_open.called + + +@pytest.fixture(autouse=True) +def tmp_cache(tmp_path, monkeypatch): + monkeypatch.setattr(pakjoy.standarddir, "cache", lambda: tmp_path) + return str(tmp_path) + + +def json_without_comments(bytestring): + str_without_comments = "\n".join( + [ + line + for line in + bytestring.decode("utf-8").split("\n") + if not line.strip().startswith("//") + ] + ) + return json.loads(str_without_comments) + + +@pytest.mark.usefixtures("affected_version") +class TestWithRealResourcesFile: + """Tests that use the real pak file form the Qt installation.""" + + def test_happy_path(self, skipifneeded): + # Go through the full patching processes with the real resources file from + # the current installation. Make sure our replacement string is in it + # afterwards. + pakjoy.patch() + + patched_resources = pathlib.Path(os.environ["QTWEBENGINE_RESOURCES_PATH"]) + + with open(patched_resources / "qtwebengine_resources.pak", "rb") as fd: + reparsed = pakjoy.PakParser(fd) + + json_manifest = json_without_comments(reparsed.manifest) + + assert pakjoy.REPLACEMENT_URL.decode("utf-8") in json_manifest[ + "externally_connectable" + ]["matches"] + + def test_copying_resources(self): + # Test we managed to copy some files over + work_dir = pakjoy.copy_webengine_resources() + + assert work_dir.exists() + assert work_dir == standarddir.cache() / "webengine_resources_pak_quirk" + assert (work_dir / "qtwebengine_resources.pak").exists() + assert len(list(work_dir.glob("*"))) > 1 + + def test_copying_resources_overwrites(self): + work_dir = pakjoy.copy_webengine_resources() + tmpfile = work_dir / "tmp.txt" + tmpfile.touch() + + pakjoy.copy_webengine_resources() + assert not tmpfile.exists() + + @pytest.mark.parametrize("osfunc", ["copytree", "rmtree"]) + def test_copying_resources_oserror(self, monkeypatch, caplog, osfunc): + # Test errors from the calls to shutil are handled + pakjoy.copy_webengine_resources() # run twice so we hit rmtree too + caplog.clear() + + def raiseme(err): + raise err + + monkeypatch.setattr(pakjoy.shutil, osfunc, lambda *_args: raiseme(PermissionError(osfunc))) + with caplog.at_level(logging.ERROR, "misc"): + pakjoy.patch() + assert caplog.messages == ["Failed to copy webengine resources, not applying quirk"] + + def test_expected_file_not_found(self, tmp_cache, monkeypatch, caplog): + with caplog.at_level(logging.ERROR, "misc"): + pakjoy.patch(pathlib.Path(tmp_cache) / "doesntexist") + assert caplog.messages[-1].startswith( + "Resource pak doesn't exist at expected location! " + "Not applying quirks. Expected location: " + ) + + +def json_manifest_factory(extension_id=pakjoy.HANGOUTS_MARKER, url=pakjoy.TARGET_URL): + assert isinstance(extension_id, bytes) + assert isinstance(url, bytes) + + return f""" + {{ + {extension_id.decode("utf-8")} + "key": "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAQt2ZDdPfoSe/JI6ID5bgLHRCnCu9T36aYczmhw/tnv6QZB2I6WnOCMZXJZlRdqWc7w9jo4BWhYS50Vb4weMfh/I0On7VcRwJUgfAxW2cHB+EkmtI1v4v/OU24OqIa1Nmv9uRVeX0GjhQukdLNhAE6ACWooaf5kqKlCeK+1GOkQIDAQAB", + "name": "Google Hangouts", + // Note: Always update the version number when this file is updated. Chrome + // triggers extension preferences update on the version increase. + "version": "1.3.21", + "manifest_version": 2, + "externally_connectable": {{ + "matches": [ + "{url.decode("utf-8")}", + "http://localhost:*/*" + ] + }} + }} + """.strip().encode("utf-8") + + +def pak_factory(version=5, entries=None, encoding=1, sentinel_position=-1): + if entries is None: + entries = [json_manifest_factory()] + + buffer = io.BytesIO() + buffer.write(struct.pack("<I", version)) + buffer.write(struct.pack(pakjoy.Pak5Header._FORMAT, encoding, len(entries), 0)) + + entry_headers_size = (len(entries) + 1) * 6 + start_of_data = buffer.tell() + entry_headers_size + + # Normally the sentinel sits between the headers and the data. But to get + # full coverage we want to insert it in other positions. + with_indices = list(enumerate(entries, 1)) + if sentinel_position == -1: + with_indices.append((0, b"")) + elif sentinel_position is not None: + with_indices.insert(sentinel_position, (0, b"")) + + accumulated_data_offset = start_of_data + for idx, entry in with_indices: + buffer.write(struct.pack(pakjoy.PakEntry._FORMAT, idx, accumulated_data_offset)) + accumulated_data_offset += len(entry) + + for entry in entries: + assert isinstance(entry, bytes) + buffer.write(entry) + + buffer.seek(0) + return buffer + + +@pytest.mark.usefixtures("affected_version") +class TestWithConstructedResourcesFile: + """Tests that use a constructed pak file to give us more control over it.""" + + def test_happy_path(self): + buffer = pak_factory() + + parser = pakjoy.PakParser(buffer) + + json_manifest = json_without_comments(parser.manifest) + + assert pakjoy.TARGET_URL.decode("utf-8") in json_manifest[ + "externally_connectable" + ]["matches"] + + def test_bad_version(self): + buffer = pak_factory(version=99) + + with pytest.raises( + binparsing.ParseError, + match="Unsupported .pak version 99", + ): + pakjoy.PakParser(buffer) + + @pytest.mark.parametrize("position, error", [ + (0, "Unexpected sentinel entry"), + (None, "Missing sentinel entry"), + ]) + def test_bad_sentinal_position(self, position, error): + buffer = pak_factory(sentinel_position=position) + + with pytest.raises(binparsing.ParseError): + pakjoy.PakParser(buffer) + + @pytest.mark.parametrize("entry", [ + b"{foo}", + b"V2VsbCBoZWxsbyB0aGVyZQo=", + ]) + def test_marker_not_found(self, entry): + buffer = pak_factory(entries=[entry]) + + with pytest.raises( + binparsing.ParseError, + match="Couldn't find hangouts manifest", + ): + pakjoy.PakParser(buffer) + + def test_url_not_found(self): + buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")]) + + parser = pakjoy.PakParser(buffer) + with pytest.raises( + binparsing.ParseError, + match="Couldn't find URL in manifest", + ): + parser.find_patch_offset() + + def test_url_not_found_high_level(self, tmp_cache, caplog, + affected_version): + buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")]) + + # Write bytes to file so we can test pakjoy.patch() + tmpfile = pathlib.Path(tmp_cache) / "bad.pak" + with open(tmpfile, "wb") as fd: + fd.write(buffer.read()) + + with caplog.at_level(logging.ERROR, "misc"): + pakjoy.patch(tmpfile) + + assert caplog.messages == [ + "Failed to apply quirk to resources pak." + ] |