summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qutebrowser/browser/webengine/webenginesettings.py5
-rw-r--r--qutebrowser/misc/binparsing.py43
-rw-r--r--qutebrowser/misc/elf.py78
-rw-r--r--qutebrowser/misc/pakjoy.py218
-rw-r--r--tests/unit/misc/test_pakjoy.py277
5 files changed, 566 insertions, 55 deletions
diff --git a/qutebrowser/browser/webengine/webenginesettings.py b/qutebrowser/browser/webengine/webenginesettings.py
index 0a3b6b084..e1aa7c52e 100644
--- a/qutebrowser/browser/webengine/webenginesettings.py
+++ b/qutebrowser/browser/webengine/webenginesettings.py
@@ -24,6 +24,7 @@ from qutebrowser.browser.webengine import (spell, webenginequtescheme, cookies,
webenginedownloads, notification)
from qutebrowser.config import config, websettings
from qutebrowser.config.websettings import AttributeInfo as Attr
+from qutebrowser.misc import pakjoy
from qutebrowser.utils import (standarddir, qtutils, message, log,
urlmatch, usertypes, objreg, version)
if TYPE_CHECKING:
@@ -553,6 +554,10 @@ def init():
_global_settings = WebEngineSettings(_SettingsWrapper())
log.init.debug("Initializing profiles...")
+
+ # Apply potential resource patches before initializing profiles.
+ pakjoy.patch()
+
_init_default_profile()
init_private_profile()
config.instance.changed.connect(_update_settings)
diff --git a/qutebrowser/misc/binparsing.py b/qutebrowser/misc/binparsing.py
new file mode 100644
index 000000000..81e2e6dbb
--- /dev/null
+++ b/qutebrowser/misc/binparsing.py
@@ -0,0 +1,43 @@
+# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""Utilities for parsing binary files.
+
+Used by elf.py as well as pakjoy.py.
+"""
+
+import struct
+from typing import Any, IO, Tuple
+
+
+class ParseError(Exception):
+
+ """Raised when the file can't be parsed."""
+
+
+def unpack(fmt: str, fobj: IO[bytes]) -> Tuple[Any, ...]:
+ """Unpack the given struct format from the given file."""
+ size = struct.calcsize(fmt)
+ data = safe_read(fobj, size)
+
+ try:
+ return struct.unpack(fmt, data)
+ except struct.error as e:
+ raise ParseError(e)
+
+
+def safe_read(fobj: IO[bytes], size: int) -> bytes:
+ """Read from a file, handling possible exceptions."""
+ try:
+ return fobj.read(size)
+ except (OSError, OverflowError) as e:
+ raise ParseError(e)
+
+
+def safe_seek(fobj: IO[bytes], pos: int) -> None:
+ """Seek in a file, handling possible exceptions."""
+ try:
+ fobj.seek(pos)
+ except (OSError, OverflowError) as e:
+ raise ParseError(e)
diff --git a/qutebrowser/misc/elf.py b/qutebrowser/misc/elf.py
index aa717e790..35af5af28 100644
--- a/qutebrowser/misc/elf.py
+++ b/qutebrowser/misc/elf.py
@@ -44,21 +44,16 @@ This is a "best effort" parser. If it errors out, we instead end up relying on t
PyQtWebEngine version, which is the next best thing.
"""
-import struct
import enum
import re
import dataclasses
import mmap
import pathlib
-from typing import Any, IO, ClassVar, Dict, Optional, Tuple, cast
+from typing import IO, ClassVar, Dict, Optional, cast
from qutebrowser.qt import machinery
from qutebrowser.utils import log, version, qtutils
-
-
-class ParseError(Exception):
-
- """Raised when the ELF file can't be parsed."""
+from qutebrowser.misc import binparsing
class Bitness(enum.Enum):
@@ -77,33 +72,6 @@ class Endianness(enum.Enum):
big = 2
-def _unpack(fmt: str, fobj: IO[bytes]) -> Tuple[Any, ...]:
- """Unpack the given struct format from the given file."""
- size = struct.calcsize(fmt)
- data = _safe_read(fobj, size)
-
- try:
- return struct.unpack(fmt, data)
- except struct.error as e:
- raise ParseError(e)
-
-
-def _safe_read(fobj: IO[bytes], size: int) -> bytes:
- """Read from a file, handling possible exceptions."""
- try:
- return fobj.read(size)
- except (OSError, OverflowError) as e:
- raise ParseError(e)
-
-
-def _safe_seek(fobj: IO[bytes], pos: int) -> None:
- """Seek in a file, handling possible exceptions."""
- try:
- fobj.seek(pos)
- except (OSError, OverflowError) as e:
- raise ParseError(e)
-
-
@dataclasses.dataclass
class Ident:
@@ -125,17 +93,17 @@ class Ident:
@classmethod
def parse(cls, fobj: IO[bytes]) -> 'Ident':
"""Parse an ELF ident header from a file."""
- magic, klass, data, elfversion, osabi, abiversion = _unpack(cls._FORMAT, fobj)
+ magic, klass, data, elfversion, osabi, abiversion = binparsing.unpack(cls._FORMAT, fobj)
try:
bitness = Bitness(klass)
except ValueError:
- raise ParseError(f"Invalid bitness {klass}")
+ raise binparsing.ParseError(f"Invalid bitness {klass}")
try:
endianness = Endianness(data)
except ValueError:
- raise ParseError(f"Invalid endianness {data}")
+ raise binparsing.ParseError(f"Invalid endianness {data}")
return cls(magic, bitness, endianness, elfversion, osabi, abiversion)
@@ -172,7 +140,7 @@ class Header:
def parse(cls, fobj: IO[bytes], bitness: Bitness) -> 'Header':
"""Parse an ELF header from a file."""
fmt = cls._FORMATS[bitness]
- return cls(*_unpack(fmt, fobj))
+ return cls(*binparsing.unpack(fmt, fobj))
@dataclasses.dataclass
@@ -203,39 +171,39 @@ class SectionHeader:
def parse(cls, fobj: IO[bytes], bitness: Bitness) -> 'SectionHeader':
"""Parse an ELF section header from a file."""
fmt = cls._FORMATS[bitness]
- return cls(*_unpack(fmt, fobj))
+ return cls(*binparsing.unpack(fmt, fobj))
def get_rodata_header(f: IO[bytes]) -> SectionHeader:
"""Parse an ELF file and find the .rodata section header."""
ident = Ident.parse(f)
if ident.magic != b'\x7fELF':
- raise ParseError(f"Invalid magic {ident.magic!r}")
+ raise binparsing.ParseError(f"Invalid magic {ident.magic!r}")
if ident.data != Endianness.little:
- raise ParseError("Big endian is unsupported")
+ raise binparsing.ParseError("Big endian is unsupported")
if ident.version != 1:
- raise ParseError(f"Only version 1 is supported, not {ident.version}")
+ raise binparsing.ParseError(f"Only version 1 is supported, not {ident.version}")
header = Header.parse(f, bitness=ident.klass)
# Read string table
- _safe_seek(f, header.shoff + header.shstrndx * header.shentsize)
+ binparsing.safe_seek(f, header.shoff + header.shstrndx * header.shentsize)
shstr = SectionHeader.parse(f, bitness=ident.klass)
- _safe_seek(f, shstr.offset)
- string_table = _safe_read(f, shstr.size)
+ binparsing.safe_seek(f, shstr.offset)
+ string_table = binparsing.safe_read(f, shstr.size)
# Back to all sections
for i in range(header.shnum):
- _safe_seek(f, header.shoff + i * header.shentsize)
+ binparsing.safe_seek(f, header.shoff + i * header.shentsize)
sh = SectionHeader.parse(f, bitness=ident.klass)
name = string_table[sh.name:].split(b'\x00')[0]
if name == b'.rodata':
return sh
- raise ParseError("No .rodata section found")
+ raise binparsing.ParseError("No .rodata section found")
@dataclasses.dataclass
@@ -262,7 +230,7 @@ def _find_versions(data: bytes) -> Versions:
chromium=match.group(2).decode('ascii'),
)
except UnicodeDecodeError as e:
- raise ParseError(e)
+ raise binparsing.ParseError(e)
# Here it gets even more crazy: Sometimes, we don't have the full UA in one piece
# in the string table somehow (?!). However, Qt 6.2 added a separate
@@ -273,20 +241,20 @@ def _find_versions(data: bytes) -> Versions:
# We first get the partial Chromium version from the UA:
match = re.search(pattern[:-4], data) # without trailing literal \x00
if match is None:
- raise ParseError("No match in .rodata")
+ raise binparsing.ParseError("No match in .rodata")
webengine_bytes = match.group(1)
partial_chromium_bytes = match.group(2)
if b"." not in partial_chromium_bytes or len(partial_chromium_bytes) < 6:
# some sanity checking
- raise ParseError("Inconclusive partial Chromium bytes")
+ raise binparsing.ParseError("Inconclusive partial Chromium bytes")
# And then try to find the *full* string, stored separately, based on the
# partial one we got above.
pattern = br"\x00(" + re.escape(partial_chromium_bytes) + br"[0-9.]+)\x00"
match = re.search(pattern, data)
if match is None:
- raise ParseError("No match in .rodata for full version")
+ raise binparsing.ParseError("No match in .rodata for full version")
chromium_bytes = match.group(1)
try:
@@ -295,7 +263,7 @@ def _find_versions(data: bytes) -> Versions:
chromium=chromium_bytes.decode('ascii'),
)
except UnicodeDecodeError as e:
- raise ParseError(e)
+ raise binparsing.ParseError(e)
def _parse_from_file(f: IO[bytes]) -> Versions:
@@ -316,8 +284,8 @@ def _parse_from_file(f: IO[bytes]) -> Versions:
return _find_versions(cast(bytes, mmap_data))
except (OSError, OverflowError) as e:
log.misc.debug(f"mmap failed ({e}), falling back to reading", exc_info=True)
- _safe_seek(f, sh.offset)
- data = _safe_read(f, sh.size)
+ binparsing.safe_seek(f, sh.offset)
+ data = binparsing.safe_read(f, sh.size)
return _find_versions(data)
@@ -344,6 +312,6 @@ def parse_webenginecore() -> Optional[Versions]:
log.misc.debug(f"Got versions from ELF: {versions}")
return versions
- except ParseError as e:
+ except binparsing.ParseError as e:
log.misc.debug(f"Failed to parse ELF: {e}", exc_info=True)
return None
diff --git a/qutebrowser/misc/pakjoy.py b/qutebrowser/misc/pakjoy.py
new file mode 100644
index 000000000..12e0c8a3e
--- /dev/null
+++ b/qutebrowser/misc/pakjoy.py
@@ -0,0 +1,218 @@
+# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+"""Chromium .pak repacking.
+
+This entire file is a great WORKAROUND for https://bugreports.qt.io/browse/QTBUG-118157
+and the fact we can't just simply disable the hangouts extension:
+https://bugreports.qt.io/browse/QTBUG-118452
+
+It's yet another big hack. If you think this is bad, look at elf.py instead.
+
+The name of this file might or might not be inspired by a certain vegetable,
+as well as the "joy" this bug has caused me.
+
+Useful references:
+
+- https://sweetscape.com/010editor/repository/files/PAK.bt (010 editor <3)
+- https://textslashplain.com/2022/05/03/chromium-internals-pak-files/
+- https://github.com/myfreeer/chrome-pak-customizer
+- https://source.chromium.org/chromium/chromium/src/+/main:tools/grit/pak_util.py
+- https://source.chromium.org/chromium/chromium/src/+/main:tools/grit/grit/format/data_pack.py
+
+This is a "best effort" parser. If it errors out, we don't apply the workaround
+instead of crashing.
+"""
+
+import os
+import shutil
+import pathlib
+import dataclasses
+from typing import ClassVar, IO, Optional, Dict, Tuple
+
+from qutebrowser.misc import binparsing
+from qutebrowser.utils import qtutils, standarddir, version, utils, log
+
+HANGOUTS_MARKER = b"// Extension ID: nkeimhogjdpnpccoofpliimaahmaaome"
+HANGOUTS_ID = 36197 # as found by toofar
+PAK_VERSION = 5
+
+TARGET_URL = b"https://*.google.com/*"
+REPLACEMENT_URL = b"https://qute.invalid/*"
+assert len(TARGET_URL) == len(REPLACEMENT_URL)
+
+
+@dataclasses.dataclass
+class PakHeader:
+
+ """Chromium .pak header (version 5)."""
+
+ encoding: int # uint32
+ resource_count: int # uint16
+ _alias_count: int # uint16
+
+ _FORMAT: ClassVar[str] = '<IHH'
+
+ @classmethod
+ def parse(cls, fobj: IO[bytes]) -> 'PakHeader':
+ """Parse a PAK version 5 header from a file."""
+ return cls(*binparsing.unpack(cls._FORMAT, fobj))
+
+
+@dataclasses.dataclass
+class PakEntry:
+
+ """Entry description in a .pak file."""
+
+ resource_id: int # uint16
+ file_offset: int # uint32
+ size: int = 0 # not in file
+
+ _FORMAT: ClassVar[str] = '<HI'
+
+ @classmethod
+ def parse(cls, fobj: IO[bytes]) -> 'PakEntry':
+ """Parse a PAK entry from a file."""
+ return cls(*binparsing.unpack(cls._FORMAT, fobj))
+
+
+class PakParser:
+ """Parse webengine pak and find patch location to disable Google Meet extension."""
+
+ def __init__(self, fobj: IO[bytes]) -> None:
+ """Parse the .pak file from the given file object."""
+ pak_version = binparsing.unpack("<I", fobj)[0]
+ if pak_version != PAK_VERSION:
+ raise binparsing.ParseError(f"Unsupported .pak version {pak_version}")
+
+ self.fobj = fobj
+ entries = self._read_header()
+ self.manifest_entry, self.manifest = self._find_manifest(entries)
+
+ def find_patch_offset(self) -> int:
+ """Return byte offset of TARGET_URL into the pak file."""
+ try:
+ return self.manifest_entry.file_offset + self.manifest.index(TARGET_URL)
+ except ValueError:
+ raise binparsing.ParseError("Couldn't find URL in manifest")
+
+ def _maybe_get_hangouts_manifest(self, entry: PakEntry) -> Optional[bytes]:
+ self.fobj.seek(entry.file_offset)
+ data = self.fobj.read(entry.size)
+
+ if not data.startswith(b"{") or not data.rstrip(b"\n").endswith(b"}"):
+ # not JSON
+ return None
+
+ if HANGOUTS_MARKER not in data:
+ return None
+
+ return data
+
+ def _read_header(self) -> Dict[int, PakEntry]:
+ """Read the header and entry index from the .pak file."""
+ entries = []
+
+ header = PakHeader.parse(self.fobj)
+ for _ in range(header.resource_count + 1): # + 1 due to sentinel at end
+ entries.append(PakEntry.parse(self.fobj))
+
+ for entry, next_entry in zip(entries, entries[1:]):
+ if entry.resource_id == 0:
+ raise binparsing.ParseError("Unexpected sentinel entry")
+ entry.size = next_entry.file_offset - entry.file_offset
+
+ if entries[-1].resource_id != 0:
+ raise binparsing.ParseError("Missing sentinel entry")
+ del entries[-1]
+
+ return {entry.resource_id: entry for entry in entries}
+
+ def _find_manifest(self, entries: Dict[int, PakEntry]) -> Tuple[PakEntry, bytes]:
+ if HANGOUTS_ID in entries:
+ suspected_entry = entries[HANGOUTS_ID]
+ manifest = self._maybe_get_hangouts_manifest(suspected_entry)
+ if manifest is not None:
+ return suspected_entry, manifest
+
+ # didn't find it via the prevously known ID, let's search them all...
+ for entry in entries.values():
+ manifest = self._maybe_get_hangouts_manifest(entry)
+ if manifest is not None:
+ return entry, manifest
+
+ raise binparsing.ParseError("Couldn't find hangouts manifest")
+
+
+def copy_webengine_resources() -> pathlib.Path:
+ """Copy qtwebengine resources to local dir for patching."""
+ resources_dir = qtutils.library_path(qtutils.LibraryPath.data)
+ if utils.is_mac:
+ # I'm not sure how to arrive at this path without hardcoding it
+ # ourselves. importlib_resources("PyQt6.Qt6") can serve as a
+ # replacement for the qtutils bit but it doesn't seem to help find the
+ # actually Resources folder.
+ resources_dir /= pathlib.Path("lib", "QtWebEngineCore.framework", "Resources")
+ else:
+ resources_dir /= "resources"
+ work_dir = pathlib.Path(standarddir.cache()) / "webengine_resources_pak_quirk"
+
+ log.misc.debug(
+ "Copying webengine resources for quirk patching: "
+ f"{resources_dir} -> {work_dir}"
+ )
+
+ if work_dir.exists():
+ # TODO: make backup?
+ shutil.rmtree(work_dir)
+
+ shutil.copytree(resources_dir, work_dir)
+
+ os.environ["QTWEBENGINE_RESOURCES_PATH"] = str(work_dir)
+
+ return work_dir
+
+
+def patch(file_to_patch: pathlib.Path = None) -> None:
+ """Apply any patches to webengine resource pak files."""
+ versions = version.qtwebengine_versions(avoid_init=True)
+ if versions.webengine != utils.VersionNumber(6, 6):
+ return
+
+ if not file_to_patch:
+ try:
+ file_to_patch = copy_webengine_resources() / "qtwebengine_resources.pak"
+ except OSError:
+ log.misc.exception("Failed to copy webengine resources, not applying quirk")
+ return
+
+ if not file_to_patch.exists():
+ log.misc.error(
+ "Resource pak doesn't exist at expected location! "
+ f"Not applying quirks. Expected location: {file_to_patch}"
+ )
+ return
+
+ with open(file_to_patch, "r+b") as f:
+ try:
+ parser = PakParser(f)
+ log.misc.debug(f"Patching pak entry: {parser.manifest_entry}")
+ offset = parser.find_patch_offset()
+ binparsing.safe_seek(f, offset)
+ f.write(REPLACEMENT_URL)
+ except binparsing.ParseError:
+ log.misc.exception("Failed to apply quirk to resources pak.")
+
+
+if __name__ == "__main__":
+ output_test_file = pathlib.Path("/tmp/test.pak")
+ #shutil.copy("/opt/google/chrome/resources.pak", output_test_file)
+ shutil.copy("/usr/share/qt6/resources/qtwebengine_resources.pak", output_test_file)
+ patch(output_test_file)
+
+ with open(output_test_file, "rb") as fd:
+ reparsed = PakParser(fd)
+
+ print(reparsed.manifest_entry)
+ print(reparsed.manifest)
diff --git a/tests/unit/misc/test_pakjoy.py b/tests/unit/misc/test_pakjoy.py
new file mode 100644
index 000000000..5c35d0111
--- /dev/null
+++ b/tests/unit/misc/test_pakjoy.py
@@ -0,0 +1,277 @@
+# SPDX-FileCopyrightText: Florian Bruhin (The-Compiler) <mail@qutebrowser.org>
+#
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+import os
+import io
+import json
+import struct
+import pathlib
+import logging
+
+import pytest
+
+from qutebrowser.misc import pakjoy, binparsing
+from qutebrowser.utils import utils, version, standarddir
+
+
+pytest.importorskip("qutebrowser.qt.webenginecore")
+
+
+versions = version.qtwebengine_versions(avoid_init=True)
+
+
+@pytest.fixture
+def skipifneeded():
+ """Used to skip happy path tests with the real resources file.
+
+ Since we don't know how reliably the Google Meet hangouts extensions is
+ reliably in the resource files, and this quirk is only targeting 6.6
+ anyway.
+ """
+ if versions.webengine != utils.VersionNumber(6, 6):
+ raise pytest.skip("Code under test only runs on 6.6")
+
+
+@pytest.fixture(autouse=True)
+def clean_env():
+ yield
+ if "QTWEBENGINE_RESOURCES_PATH" in os.environ:
+ del os.environ["QTWEBENGINE_RESOURCES_PATH"]
+
+
+def patch_version(monkeypatch, *args):
+ monkeypatch.setattr(
+ pakjoy.version,
+ "qtwebengine_versions",
+ lambda **kwargs: version.WebEngineVersions(
+ webengine=utils.VersionNumber(*args),
+ chromium=None,
+ source="unittest",
+ )
+ )
+
+
+@pytest.fixture
+def unaffected_version(monkeypatch):
+ patch_version(monkeypatch, 6, 6, 1)
+
+
+@pytest.fixture
+def affected_version(monkeypatch):
+ patch_version(monkeypatch, 6, 6)
+
+
+def test_version_gate(unaffected_version, mocker):
+
+ fake_open = mocker.patch("qutebrowser.misc.pakjoy.open")
+ pakjoy.patch()
+ assert not fake_open.called
+
+
+@pytest.fixture(autouse=True)
+def tmp_cache(tmp_path, monkeypatch):
+ monkeypatch.setattr(pakjoy.standarddir, "cache", lambda: tmp_path)
+ return str(tmp_path)
+
+
+def json_without_comments(bytestring):
+ str_without_comments = "\n".join(
+ [
+ line
+ for line in
+ bytestring.decode("utf-8").split("\n")
+ if not line.strip().startswith("//")
+ ]
+ )
+ return json.loads(str_without_comments)
+
+
+@pytest.mark.usefixtures("affected_version")
+class TestWithRealResourcesFile:
+ """Tests that use the real pak file form the Qt installation."""
+
+ def test_happy_path(self, skipifneeded):
+ # Go through the full patching processes with the real resources file from
+ # the current installation. Make sure our replacement string is in it
+ # afterwards.
+ pakjoy.patch()
+
+ patched_resources = pathlib.Path(os.environ["QTWEBENGINE_RESOURCES_PATH"])
+
+ with open(patched_resources / "qtwebengine_resources.pak", "rb") as fd:
+ reparsed = pakjoy.PakParser(fd)
+
+ json_manifest = json_without_comments(reparsed.manifest)
+
+ assert pakjoy.REPLACEMENT_URL.decode("utf-8") in json_manifest[
+ "externally_connectable"
+ ]["matches"]
+
+ def test_copying_resources(self):
+ # Test we managed to copy some files over
+ work_dir = pakjoy.copy_webengine_resources()
+
+ assert work_dir.exists()
+ assert work_dir == standarddir.cache() / "webengine_resources_pak_quirk"
+ assert (work_dir / "qtwebengine_resources.pak").exists()
+ assert len(list(work_dir.glob("*"))) > 1
+
+ def test_copying_resources_overwrites(self):
+ work_dir = pakjoy.copy_webengine_resources()
+ tmpfile = work_dir / "tmp.txt"
+ tmpfile.touch()
+
+ pakjoy.copy_webengine_resources()
+ assert not tmpfile.exists()
+
+ @pytest.mark.parametrize("osfunc", ["copytree", "rmtree"])
+ def test_copying_resources_oserror(self, monkeypatch, caplog, osfunc):
+ # Test errors from the calls to shutil are handled
+ pakjoy.copy_webengine_resources() # run twice so we hit rmtree too
+ caplog.clear()
+
+ def raiseme(err):
+ raise err
+
+ monkeypatch.setattr(pakjoy.shutil, osfunc, lambda *_args: raiseme(PermissionError(osfunc)))
+ with caplog.at_level(logging.ERROR, "misc"):
+ pakjoy.patch()
+ assert caplog.messages == ["Failed to copy webengine resources, not applying quirk"]
+
+ def test_expected_file_not_found(self, tmp_cache, monkeypatch, caplog):
+ with caplog.at_level(logging.ERROR, "misc"):
+ pakjoy.patch(pathlib.Path(tmp_cache) / "doesntexist")
+ assert caplog.messages[-1].startswith(
+ "Resource pak doesn't exist at expected location! "
+ "Not applying quirks. Expected location: "
+ )
+
+
+def json_manifest_factory(extension_id=pakjoy.HANGOUTS_MARKER, url=pakjoy.TARGET_URL):
+ assert isinstance(extension_id, bytes)
+ assert isinstance(url, bytes)
+
+ return f"""
+ {{
+ {extension_id.decode("utf-8")}
+ "key": "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDAQt2ZDdPfoSe/JI6ID5bgLHRCnCu9T36aYczmhw/tnv6QZB2I6WnOCMZXJZlRdqWc7w9jo4BWhYS50Vb4weMfh/I0On7VcRwJUgfAxW2cHB+EkmtI1v4v/OU24OqIa1Nmv9uRVeX0GjhQukdLNhAE6ACWooaf5kqKlCeK+1GOkQIDAQAB",
+ "name": "Google Hangouts",
+ // Note: Always update the version number when this file is updated. Chrome
+ // triggers extension preferences update on the version increase.
+ "version": "1.3.21",
+ "manifest_version": 2,
+ "externally_connectable": {{
+ "matches": [
+ "{url.decode("utf-8")}",
+ "http://localhost:*/*"
+ ]
+ }}
+ }}
+ """.strip().encode("utf-8")
+
+
+def pak_factory(version=5, entries=None, encoding=1, sentinel_position=-1):
+ if entries is None:
+ entries = [json_manifest_factory()]
+
+ buffer = io.BytesIO()
+ buffer.write(struct.pack("<I", version))
+ buffer.write(struct.pack(pakjoy.Pak5Header._FORMAT, encoding, len(entries), 0))
+
+ entry_headers_size = (len(entries) + 1) * 6
+ start_of_data = buffer.tell() + entry_headers_size
+
+ # Normally the sentinel sits between the headers and the data. But to get
+ # full coverage we want to insert it in other positions.
+ with_indices = list(enumerate(entries, 1))
+ if sentinel_position == -1:
+ with_indices.append((0, b""))
+ elif sentinel_position is not None:
+ with_indices.insert(sentinel_position, (0, b""))
+
+ accumulated_data_offset = start_of_data
+ for idx, entry in with_indices:
+ buffer.write(struct.pack(pakjoy.PakEntry._FORMAT, idx, accumulated_data_offset))
+ accumulated_data_offset += len(entry)
+
+ for entry in entries:
+ assert isinstance(entry, bytes)
+ buffer.write(entry)
+
+ buffer.seek(0)
+ return buffer
+
+
+@pytest.mark.usefixtures("affected_version")
+class TestWithConstructedResourcesFile:
+ """Tests that use a constructed pak file to give us more control over it."""
+
+ def test_happy_path(self):
+ buffer = pak_factory()
+
+ parser = pakjoy.PakParser(buffer)
+
+ json_manifest = json_without_comments(parser.manifest)
+
+ assert pakjoy.TARGET_URL.decode("utf-8") in json_manifest[
+ "externally_connectable"
+ ]["matches"]
+
+ def test_bad_version(self):
+ buffer = pak_factory(version=99)
+
+ with pytest.raises(
+ binparsing.ParseError,
+ match="Unsupported .pak version 99",
+ ):
+ pakjoy.PakParser(buffer)
+
+ @pytest.mark.parametrize("position, error", [
+ (0, "Unexpected sentinel entry"),
+ (None, "Missing sentinel entry"),
+ ])
+ def test_bad_sentinal_position(self, position, error):
+ buffer = pak_factory(sentinel_position=position)
+
+ with pytest.raises(binparsing.ParseError):
+ pakjoy.PakParser(buffer)
+
+ @pytest.mark.parametrize("entry", [
+ b"{foo}",
+ b"V2VsbCBoZWxsbyB0aGVyZQo=",
+ ])
+ def test_marker_not_found(self, entry):
+ buffer = pak_factory(entries=[entry])
+
+ with pytest.raises(
+ binparsing.ParseError,
+ match="Couldn't find hangouts manifest",
+ ):
+ pakjoy.PakParser(buffer)
+
+ def test_url_not_found(self):
+ buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")])
+
+ parser = pakjoy.PakParser(buffer)
+ with pytest.raises(
+ binparsing.ParseError,
+ match="Couldn't find URL in manifest",
+ ):
+ parser.find_patch_offset()
+
+ def test_url_not_found_high_level(self, tmp_cache, caplog,
+ affected_version):
+ buffer = pak_factory(entries=[json_manifest_factory(url=b"example.com")])
+
+ # Write bytes to file so we can test pakjoy.patch()
+ tmpfile = pathlib.Path(tmp_cache) / "bad.pak"
+ with open(tmpfile, "wb") as fd:
+ fd.write(buffer.read())
+
+ with caplog.at_level(logging.ERROR, "misc"):
+ pakjoy.patch(tmpfile)
+
+ assert caplog.messages == [
+ "Failed to apply quirk to resources pak."
+ ]