author      Florian Bruhin <me@the-compiler.org>    2021-07-09 17:06:23 +0200
committer   GitHub <noreply@github.com>             2021-07-09 17:06:23 +0200
commit      ae6d9009716c85c679b490ab4df92e80b77b3fa5 (patch)
tree        3c03eb22447c3233533dc91679979ee3de8f8991
parent      8bdab79011680c13b3b2072a3b18a4471c168131 (diff)
parent      71a7674a706b73dfaac5958e9c3bca414c4e8665 (diff)
download    qutebrowser-ae6d9009716c85c679b490ab4df92e80b77b3fa5.tar.gz
            qutebrowser-ae6d9009716c85c679b490ab4df92e80b77b3fa5.zip
Merge pull request #6567 from lufte/issue6039
Database class
-rw-r--r--  qutebrowser/app.py                                 7
-rw-r--r--  qutebrowser/browser/history.py                    96
-rw-r--r--  qutebrowser/completion/models/histcategory.py      7
-rw-r--r--  qutebrowser/completion/models/urlmodel.py          3
-rw-r--r--  qutebrowser/misc/sql.py                          334
-rw-r--r--  tests/helpers/fixtures.py                         21
-rw-r--r--  tests/unit/browser/test_history.py                97
-rw-r--r--  tests/unit/completion/test_histcategory.py        28
-rw-r--r--  tests/unit/misc/test_sql.py                      156
9 files changed, 431 insertions(+), 318 deletions(-)
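
For orientation: this merge replaces the module-level sql.init()/sql.close() helpers with a per-connection Database class. Below is a minimal sketch of how the new API fits together, pieced together from the qutebrowser/misc/sql.py changes in this diff; the path and table/field names are illustrative and not taken from the commit.

from PyQt5.QtCore import QCoreApplication
from qutebrowser.misc import sql

app = QCoreApplication([])          # Qt's sql driver plugins need a QCoreApplication

# One Database object per sqlite file (replaces the old implicit global connection).
database = sql.Database('/tmp/example.sqlite')

# Tables and queries are now bound to a Database instance.
table = database.table('Example', ['url', 'title'])
query = database.query('SELECT title FROM Example WHERE url = :url')

table.insert({'url': 'https://example.com', 'title': 'Example'})
print(query.run(url='https://example.com').value())

# Grouped writes can use the new context-managed transaction.
with database.transaction():
    table.insert({'url': 'https://example.org', 'title': 'Example 2'})

database.close()
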
diff --git a/qutebrowser/app.py b/qutebrowser/app.py
index 2df0a82f6..1ab28e8d0 100644
--- a/qutebrowser/app.py
+++ b/qutebrowser/app.py
@@ -41,6 +41,7 @@ import os
import sys
import functools
import tempfile
+import pathlib
import datetime
import argparse
from typing import Iterable, Optional
@@ -479,11 +480,9 @@ def _init_modules(*, args):
with debug.log_time("init", "Initializing SQL/history"):
try:
- log.init.debug("Initializing SQL...")
- sql.init(os.path.join(standarddir.data(), 'history.sqlite'))
-
log.init.debug("Initializing web history...")
- history.init(objects.qapp)
+ history.init(db_path=pathlib.Path(standarddir.data()) / 'history.sqlite',
+ parent=objects.qapp)
except sql.KnownError as e:
error.handle_fatal_exc(e, 'Error initializing SQL',
pre_text='Error initializing SQL',
diff --git a/qutebrowser/browser/history.py b/qutebrowser/browser/history.py
index 773c6cc51..559992327 100644
--- a/qutebrowser/browser/history.py
+++ b/qutebrowser/browser/history.py
@@ -22,9 +22,10 @@
import os
import time
import contextlib
-from typing import cast, Mapping, MutableSequence
+import pathlib
+from typing import cast, Mapping, MutableSequence, Optional
-from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal
+from PyQt5.QtCore import pyqtSlot, QUrl, QObject, pyqtSignal
from PyQt5.QtWidgets import QProgressDialog, QApplication
from qutebrowser.config import config
@@ -91,13 +92,14 @@ class CompletionMetaInfo(sql.SqlTable):
'force_rebuild': False,
}
- def __init__(self, parent=None):
+ def __init__(self, database: sql.Database,
+ parent: Optional[QObject] = None) -> None:
self._fields = ['key', 'value']
self._constraints = {'key': 'PRIMARY KEY'}
- super().__init__(
- "CompletionMetaInfo", self._fields, constraints=self._constraints)
+ super().__init__(database, "CompletionMetaInfo", self._fields,
+ constraints=self._constraints, parent=parent)
- if sql.user_version_changed():
+ if database.user_version_changed():
self._init_default_values()
def _check_key(self, key):
@@ -125,8 +127,8 @@ class CompletionMetaInfo(sql.SqlTable):
def __getitem__(self, key):
self._check_key(key)
- query = sql.Query('SELECT value FROM CompletionMetaInfo '
- 'WHERE key = :key')
+ query = self.database.query('SELECT value FROM CompletionMetaInfo '
+ 'WHERE key = :key')
return query.run(key=key).value()
def __setitem__(self, key, value):
@@ -138,8 +140,9 @@ class CompletionHistory(sql.SqlTable):
"""History which only has the newest entry for each URL."""
- def __init__(self, parent=None):
- super().__init__("CompletionHistory", ['url', 'title', 'last_atime'],
+ def __init__(self, database: sql.Database,
+ parent: Optional[QObject] = None) -> None:
+ super().__init__(database, "CompletionHistory", ['url', 'title', 'last_atime'],
constraints={'url': 'PRIMARY KEY',
'title': 'NOT NULL',
'last_atime': 'NOT NULL'},
@@ -162,8 +165,9 @@ class WebHistory(sql.SqlTable):
# one url cleared
url_cleared = pyqtSignal(QUrl)
- def __init__(self, progress, parent=None):
- super().__init__("History", ['url', 'title', 'atime', 'redirect'],
+ def __init__(self, database: sql.Database, progress: HistoryProgress,
+ parent: Optional[QObject] = None) -> None:
+ super().__init__(database, "History", ['url', 'title', 'atime', 'redirect'],
constraints={'url': 'NOT NULL',
'title': 'NOT NULL',
'atime': 'NOT NULL',
@@ -173,8 +177,8 @@ class WebHistory(sql.SqlTable):
# Store the last saved url to avoid duplicate immediate saves.
self._last_url = None
- self.completion = CompletionHistory(parent=self)
- self.metainfo = CompletionMetaInfo(parent=self)
+ self.completion = CompletionHistory(database, parent=self)
+ self.metainfo = CompletionMetaInfo(database, parent=self)
try:
rebuild_completion = self.metainfo['force_rebuild']
@@ -184,16 +188,18 @@ class WebHistory(sql.SqlTable):
self.metainfo.try_recover()
rebuild_completion = self.metainfo['force_rebuild']
- if sql.user_version_changed():
- # If the DB user version changed, run a full cleanup and rebuild the
- # completion history.
- #
- # In the future, this could be improved to only be done when actually needed
- # - but version changes happen very infrequently, rebuilding everything
- # gives us less corner-cases to deal with, and we can run a VACUUM to make
- # things smaller.
- self._cleanup_history()
- rebuild_completion = True
+ if self.database.user_version_changed():
+ with self.database.transaction():
+ # If the DB user version changed, run a full cleanup and rebuild the
+ # completion history.
+ #
+ # In the future, this could be improved to only be done when actually
+ # needed - but version changes happen very infrequently, rebuilding
+ # everything gives us less corner-cases to deal with, and we can run a
+ # VACUUM to make things smaller.
+ self._cleanup_history()
+ rebuild_completion = True
+ self.database.upgrade_user_version()
# Get a string of all patterns
patterns = config.instance.get_str('completion.web_history.exclude')
@@ -211,19 +217,19 @@ class WebHistory(sql.SqlTable):
self.create_index('HistoryIndex', 'url')
self.create_index('HistoryAtimeIndex', 'atime')
self._contains_query = self.contains_query('url')
- self._between_query = sql.Query('SELECT * FROM History '
- 'where not redirect '
- 'and not url like "qute://%" '
- 'and atime > :earliest '
- 'and atime <= :latest '
- 'ORDER BY atime desc')
-
- self._before_query = sql.Query('SELECT * FROM History '
- 'where not redirect '
- 'and not url like "qute://%" '
- 'and atime <= :latest '
- 'ORDER BY atime desc '
- 'limit :limit offset :offset')
+ self._between_query = self.database.query('SELECT * FROM History '
+ 'where not redirect '
+ 'and not url like "qute://%" '
+ 'and atime > :earliest '
+ 'and atime <= :latest '
+ 'ORDER BY atime desc')
+
+ self._before_query = self.database.query('SELECT * FROM History '
+ 'where not redirect '
+ 'and not url like "qute://%" '
+ 'and atime <= :latest '
+ 'ORDER BY atime desc '
+ 'limit :limit offset :offset')
def __repr__(self):
return utils.get_repr(self, length=len(self))
@@ -271,7 +277,7 @@ class WebHistory(sql.SqlTable):
'qute://pdfjs%',
]
where_clause = ' OR '.join(f"url LIKE '{term}'" for term in terms)
- q = sql.Query(f'DELETE FROM History WHERE {where_clause}')
+ q = self.database.query(f'DELETE FROM History WHERE {where_clause}')
entries = q.run()
log.sql.debug(f"Cleanup removed {entries.rows_affected()} items")
@@ -297,9 +303,9 @@ class WebHistory(sql.SqlTable):
QApplication.processEvents()
# Select the latest entry for each url
- q = sql.Query('SELECT url, title, max(atime) AS atime FROM History '
- 'WHERE NOT redirect '
- 'GROUP BY url ORDER BY atime asc')
+ q = self.database.query('SELECT url, title, max(atime) AS atime FROM History '
+ 'WHERE NOT redirect '
+ 'GROUP BY url ORDER BY atime asc')
result = q.run()
QApplication.processEvents()
entries = list(result)
@@ -319,7 +325,7 @@ class WebHistory(sql.SqlTable):
self._progress.set_maximum(0)
# We might have caused fragmentation - let's clean up.
- sql.Query('VACUUM').run()
+ self.database.query('VACUUM').run()
QApplication.processEvents()
self.completion.insert_batch(data, replace=True)
@@ -472,15 +478,17 @@ def debug_dump_history(dest):
raise cmdutils.CommandError(f'Could not write history: {e}')
-def init(parent=None):
+def init(db_path: pathlib.Path, parent: Optional[QObject] = None) -> None:
"""Initialize the web history.
Args:
+ db_path: The path for the SQLite database.
parent: The parent to use for WebHistory.
"""
global web_history
progress = HistoryProgress()
- web_history = WebHistory(progress=progress, parent=parent)
+ database = sql.Database(str(db_path))
+ web_history = WebHistory(database=database, progress=progress, parent=parent)
if objects.backend == usertypes.Backend.QtWebKit: # pragma: no cover
from qutebrowser.browser.webkit import webkithistory
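
The history.py hunk above runs the cleanup inside a transaction and only bumps the stored schema version once it succeeded. Pulled out of WebHistory, the pattern looks roughly like this (a hedged sketch: cleanup() is a stand-in for WebHistory._cleanup_history(), and the in-memory database is illustrative).

from PyQt5.QtCore import QCoreApplication
from qutebrowser.misc import sql

app = QCoreApplication([])
db = sql.Database(':memory:')                       # throwaway database for illustration
history_table = db.table('History', ['url', 'title', 'atime', 'redirect'])

def cleanup(database: sql.Database) -> None:
    """Stand-in for WebHistory._cleanup_history()."""
    database.query("DELETE FROM History WHERE url LIKE 'qute://%'").run()

if db.user_version_changed():                       # True here: a fresh database starts at 0.0
    with db.transaction():                          # rolled back if cleanup() raises
        cleanup(db)
    db.upgrade_user_version()                       # persist the new version only afterwards

db.close()
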
diff --git a/qutebrowser/completion/models/histcategory.py b/qutebrowser/completion/models/histcategory.py
index bee2b43d9..8dd1be838 100644
--- a/qutebrowser/completion/models/histcategory.py
+++ b/qutebrowser/completion/models/histcategory.py
@@ -34,11 +34,12 @@ class HistoryCategory(QSqlQueryModel):
"""A completion category that queries the SQL history store."""
- def __init__(self, *,
+ def __init__(self, *, database: sql.Database,
delete_func: util.DeleteFuncType = None,
parent: QWidget = None) -> None:
"""Create a new History completion category."""
super().__init__(parent=parent)
+ self._database = database
self.name = "History"
self._query: Optional[sql.Query] = None
@@ -56,7 +57,7 @@ class HistoryCategory(QSqlQueryModel):
if max_items < 0:
return ''
- min_atime = sql.Query(' '.join([
+ min_atime = self._database.query(' '.join([
'SELECT min(last_atime) FROM',
'(SELECT last_atime FROM CompletionHistory',
'ORDER BY last_atime DESC LIMIT :limit)',
@@ -107,7 +108,7 @@ class HistoryCategory(QSqlQueryModel):
# if the number of words changed, we need to generate a new
# query otherwise, we can reuse the prepared query for
# performance
- self._query = sql.Query(' '.join([
+ self._query = self._database.query(' '.join([
"SELECT url, title, {}".format(timefmt),
"FROM CompletionHistory",
# the incoming pattern will have literal % and _ escaped we
diff --git a/qutebrowser/completion/models/urlmodel.py b/qutebrowser/completion/models/urlmodel.py
index 2152f60ec..56af1f7c7 100644
--- a/qutebrowser/completion/models/urlmodel.py
+++ b/qutebrowser/completion/models/urlmodel.py
@@ -90,7 +90,8 @@ def url(*, info):
history_disabled = info.config.get('completion.web_history.max_items') == 0
if not history_disabled and 'history' in categories:
- hist_cat = histcategory.HistoryCategory(delete_func=_delete_history)
+ hist_cat = histcategory.HistoryCategory(database=history.web_history.database,
+ delete_func=_delete_history)
models['history'] = hist_cat
if 'filesystem' in categories:
diff --git a/qutebrowser/misc/sql.py b/qutebrowser/misc/sql.py
index 68c0fd538..814eb2bb0 100644
--- a/qutebrowser/misc/sql.py
+++ b/qutebrowser/misc/sql.py
@@ -17,15 +17,19 @@
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
-"""Provides access to an in-memory sqlite database."""
+"""Provides access to sqlite databases."""
import collections
+import contextlib
import dataclasses
+import types
+from typing import Any, Dict, Iterator, List, Mapping, MutableSequence, Optional, Type
from PyQt5.QtCore import QObject, pyqtSignal
-from PyQt5.QtSql import QSqlDatabase, QSqlQuery, QSqlError
+from PyQt5.QtSql import QSqlDatabase, QSqlError, QSqlQuery
-from qutebrowser.utils import log, debug
+from qutebrowser.qt import sip
+from qutebrowser.utils import debug, log
@dataclasses.dataclass
@@ -48,32 +52,23 @@ class UserVersion:
minor: int
@classmethod
- def from_int(cls, num):
+ def from_int(cls, num: int) -> 'UserVersion':
"""Parse a number from sqlite into a major/minor user version."""
assert 0 <= num <= 0x7FFF_FFFF, num # signed integer, but shouldn't be negative
major = (num & 0x7FFF_0000) >> 16
minor = num & 0x0000_FFFF
return cls(major, minor)
- def to_int(self):
+ def to_int(self) -> int:
"""Get a sqlite integer from a major/minor user version."""
assert 0 <= self.major <= 0x7FFF # signed integer
assert 0 <= self.minor <= 0xFFFF
return self.major << 16 | self.minor
- def __str__(self):
+ def __str__(self) -> str:
return f'{self.major}.{self.minor}'
-_db_user_version = None # The user version we got from the database
-_USER_VERSION = UserVersion(0, 4) # The current / newest user version
-
-
-def user_version_changed():
- """Whether the version stored in the database is different from the current one."""
- return _db_user_version != _USER_VERSION
-
-
class SqliteErrorCode:
"""Error codes as used by sqlite.
@@ -98,11 +93,11 @@ class Error(Exception):
"""Base class for all SQL related errors."""
- def __init__(self, msg, error=None):
+ def __init__(self, msg: str, error: Optional[QSqlError] = None) -> None:
super().__init__(msg)
self.error = error
- def text(self):
+ def text(self) -> str:
"""Get a short text description of the error.
This is a string suitable to show to the user as error message.
@@ -130,18 +125,17 @@ class BugError(Error):
"""
-def raise_sqlite_error(msg, error):
+def raise_sqlite_error(msg: str, error: QSqlError) -> None:
"""Raise either a BugError or KnownError."""
error_code = error.nativeErrorCode()
database_text = error.databaseText()
driver_text = error.driverText()
log.sql.debug("SQL error:")
- log.sql.debug("type: {}".format(
- debug.qenum_key(QSqlError, error.type())))
- log.sql.debug("database text: {}".format(database_text))
- log.sql.debug("driver text: {}".format(driver_text))
- log.sql.debug("error code: {}".format(error_code))
+ log.sql.debug(f"type: {debug.qenum_key(QSqlError, error.type())}")
+ log.sql.debug(f"database text: {database_text}")
+ log.sql.debug(f"driver text: {driver_text}")
+ log.sql.debug(f"error code: {error_code}")
known_errors = [
SqliteErrorCode.BUSY,
@@ -168,82 +162,145 @@ def raise_sqlite_error(msg, error):
raise BugError(msg, error)
-def init(db_path):
- """Initialize the SQL database connection."""
- database = QSqlDatabase.addDatabase('QSQLITE')
- if not database.isValid():
- raise KnownError('Failed to add database. Are sqlite and Qt sqlite '
- 'support installed?')
- database.setDatabaseName(db_path)
- if not database.open():
- error = database.lastError()
- msg = "Failed to open sqlite database at {}: {}".format(db_path,
- error.text())
- raise_sqlite_error(msg, error)
-
- global _db_user_version
- version_int = Query('pragma user_version').run().value()
- _db_user_version = UserVersion.from_int(version_int)
-
- if _db_user_version.major > _USER_VERSION.major:
- raise KnownError(
- "Database is too new for this qutebrowser version (database version "
- f"{_db_user_version}, but {_USER_VERSION.major}.x is supported)")
-
- if user_version_changed():
- log.sql.debug(f"Migrating from version {_db_user_version} to {_USER_VERSION}")
- # Note we're *not* updating the _db_user_version global here. We still want
- # user_version_changed() to return True, as other modules (such as history.py)
- # use it to create the initial table structure.
- Query(f'PRAGMA user_version = {_USER_VERSION.to_int()}').run()
-
- # Enable write-ahead-logging and reduce disk write frequency
- # see https://sqlite.org/pragma.html and issues #2930 and #3507
- #
- # We might already have done this (without a migration) in earlier versions, but
- # as those are idempotent, let's make sure we run them once again.
- Query("PRAGMA journal_mode=WAL").run()
- Query("PRAGMA synchronous=NORMAL").run()
-
-
-def close():
- """Close the SQL connection."""
- QSqlDatabase.removeDatabase(QSqlDatabase.database().connectionName())
-
-
-def version():
- """Return the sqlite version string."""
- try:
- if not QSqlDatabase.database().isOpen():
- init(':memory:')
- ver = Query("select sqlite_version()").run().value()
- close()
- return ver
- return Query("select sqlite_version()").run().value()
- except KnownError as e:
- return 'UNAVAILABLE ({})'.format(e)
+class Database:
+
+ """A wrapper over a QSqlDatabase connection."""
+
+ _USER_VERSION = UserVersion(0, 4) # The current / newest user version
+
+ def __init__(self, path: str) -> None:
+ if QSqlDatabase.database(path).isValid():
+ raise BugError(f'A connection to the database at "{path}" already exists')
+
+ self._path = path
+ database = QSqlDatabase.addDatabase('QSQLITE', path)
+ if not database.isValid():
+ raise KnownError('Failed to add database. Are sqlite and Qt sqlite '
+ 'support installed?')
+ database.setDatabaseName(path)
+ if not database.open():
+ error = database.lastError()
+ msg = f"Failed to open sqlite database at {path}: {error.text()}"
+ raise_sqlite_error(msg, error)
+
+ version_int = self.query('pragma user_version').run().value()
+ self._user_version = UserVersion.from_int(version_int)
+
+ if self._user_version.major > self._USER_VERSION.major:
+ raise KnownError(
+ "Database is too new for this qutebrowser version (database version "
+ f"{self._user_version}, but {self._USER_VERSION.major}.x is supported)")
+
+ if self.user_version_changed():
+ # Enable write-ahead-logging and reduce disk write frequency
+ # see https://sqlite.org/pragma.html and issues #2930 and #3507
+ #
+ # We might already have done this (without a migration) in earlier versions,
+ # but as those are idempotent, let's make sure we run them once again.
+ self.query("PRAGMA journal_mode=WAL").run()
+ self.query("PRAGMA synchronous=NORMAL").run()
+
+ def qt_database(self) -> QSqlDatabase:
+ """Return the wrapped QSqlDatabase instance."""
+ database = QSqlDatabase.database(self._path, open=True)
+ if not database.isValid():
+ raise BugError('Failed to get connection. Did you close() this Database '
+ 'instance?')
+ return database
+
+ def query(self, querystr: str, forward_only: bool = True) -> 'Query':
+ """Return a Query instance linked to this Database."""
+ return Query(self, querystr, forward_only)
+
+ def table(self, name: str, fields: List[str],
+ constraints: Optional[Dict[str, str]] = None,
+ parent: Optional[QObject] = None) -> 'SqlTable':
+ """Return a SqlTable instance linked to this Database."""
+ return SqlTable(self, name, fields, constraints, parent)
+
+ def user_version_changed(self) -> bool:
+ """Whether the version stored in the database differs from the current one."""
+ return self._user_version != self._USER_VERSION
+
+ def upgrade_user_version(self) -> None:
+ """Upgrade the user version to the latest version.
+
+ This method should be called once all required operations to migrate from one
+ version to another have been run.
+ """
+ log.sql.debug(f"Migrating from version {self._user_version} "
+ f"to {self._USER_VERSION}")
+ self.query(f'PRAGMA user_version = {self._USER_VERSION.to_int()}').run()
+ self._user_version = self._USER_VERSION
+
+ def close(self) -> None:
+ """Close the SQL connection."""
+ database = self.qt_database()
+ database.close()
+ sip.delete(database)
+ QSqlDatabase.removeDatabase(self._path)
+
+ def transaction(self) -> 'Transaction':
+ """Return a Transaction object linked to this Database."""
+ return Transaction(self)
+
+
+class Transaction(contextlib.AbstractContextManager): # type: ignore[type-arg]
+
+ """A Database transaction that can be used as a context manager."""
+
+ def __init__(self, database: Database) -> None:
+ self._database = database
+
+ def __enter__(self) -> None:
+ log.sql.debug('Starting a transaction')
+ db = self._database.qt_database()
+ ok = db.transaction()
+ if not ok:
+ error = db.lastError()
+ msg = f'Failed to start a transaction: "{error.text()}"'
+ raise_sqlite_error(msg, error)
+
+ def __exit__(self,
+ _exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ _exc_tb: Optional[types.TracebackType]) -> None:
+ db = self._database.qt_database()
+ if exc_val:
+ log.sql.debug('Rolling back a transaction')
+ db.rollback()
+ else:
+ log.sql.debug('Committing a transaction')
+ ok = db.commit()
+ if not ok:
+ error = db.lastError()
+ msg = f'Failed to commit a transaction: "{error.text()}"'
+ raise_sqlite_error(msg, error)
class Query:
"""A prepared SQL query."""
- def __init__(self, querystr, forward_only=True):
+ def __init__(self, database: Database, querystr: str,
+ forward_only: bool = True) -> None:
"""Prepare a new SQL query.
Args:
+ database: The Database object on which to operate.
querystr: String to prepare query from.
forward_only: Optimization for queries that will only step forward.
Must be false for completion queries.
"""
- self.query = QSqlQuery(QSqlDatabase.database())
+ self._database = database
+ self.query = QSqlQuery(database.qt_database())
log.sql.vdebug(f'Preparing: {querystr}') # type: ignore[attr-defined]
ok = self.query.prepare(querystr)
self._check_ok('prepare', ok)
self.query.setForwardOnly(forward_only)
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
if not self.query.isActive():
raise BugError("Cannot iterate inactive query")
rec = self.query.record()
@@ -255,17 +312,16 @@ class Query:
rec = self.query.record()
yield rowtype(*[rec.value(i) for i in range(rec.count())])
- def _check_ok(self, step, ok):
+ def _check_ok(self, step: str, ok: bool) -> None:
if not ok:
query = self.query.lastQuery()
error = self.query.lastError()
- msg = 'Failed to {} query "{}": "{}"'.format(step, query,
- error.text())
+ msg = f'Failed to {step} query "{query}": "{error.text()}"'
raise_sqlite_error(msg, error)
- def _bind_values(self, values):
+ def _bind_values(self, values: Mapping[str, Any]) -> Dict[str, Any]:
for key, val in values.items():
- self.query.bindValue(':{}'.format(key), val)
+ self.query.bindValue(f':{key}', val)
bound_values = self.bound_values()
if None in bound_values.values():
@@ -273,7 +329,7 @@ class Query:
return bound_values
- def run(self, **values):
+ def run(self, **values: Any) -> 'Query':
"""Execute the prepared query."""
log.sql.debug(self.query.lastQuery())
@@ -286,14 +342,13 @@ class Query:
return self
- def run_batch(self, values):
+ def run_batch(self, values: Mapping[str, MutableSequence[Any]]) -> None:
"""Execute the query in batch mode."""
- log.sql.debug('Running SQL query (batch): "{}"'.format(
- self.query.lastQuery()))
+ log.sql.debug(f'Running SQL query (batch): "{self.query.lastQuery()}"')
self._bind_values(values)
- db = QSqlDatabase.database()
+ db = self._database.qt_database()
ok = db.transaction()
self._check_ok('transaction', ok)
@@ -308,13 +363,13 @@ class Query:
ok = db.commit()
self._check_ok('commit', ok)
- def value(self):
+ def value(self) -> Any:
"""Return the result of a single-value query (e.g. an EXISTS)."""
if not self.query.next():
raise BugError("No result for single-result query")
return self.query.record().value(0)
- def rows_affected(self):
+ def rows_affected(self) -> int:
"""Return how many rows were affected by a non-SELECT query."""
assert not self.query.isSelect(), self
assert self.query.isActive(), self
@@ -322,7 +377,7 @@ class Query:
assert rows != -1
return rows
- def bound_values(self):
+ def bound_values(self) -> Dict[str, Any]:
return self.query.boundValues()
@@ -332,84 +387,93 @@ class SqlTable(QObject):
Attributes:
_name: Name of the SQL table this wraps.
+ database: The Database to which this table belongs.
Signals:
changed: Emitted when the table is modified.
"""
changed = pyqtSignal()
+ database: Database
- def __init__(self, name, fields, constraints=None, parent=None):
+ def __init__(self, database: Database, name: str, fields: List[str],
+ constraints: Optional[Dict[str, str]] = None,
+ parent: Optional[QObject] = None) -> None:
"""Wrapper over a table in the SQL database.
Args:
+ database: The Database to which this table belongs.
name: Name of the table.
fields: A list of field names.
constraints: A dict mapping field names to constraint strings.
"""
super().__init__(parent)
self._name = name
+ self.database = database
self._create_table(fields, constraints)
- def _create_table(self, fields, constraints, *, force=False):
+ def _create_table(self, fields: List[str], constraints: Optional[Dict[str, str]],
+ *, force: bool = False) -> None:
"""Create the table if the database is uninitialized.
If the table already exists, this does nothing (except with force=True), so it
can e.g. be called on every user_version change.
"""
- if not user_version_changed() and not force:
+ if not self.database.user_version_changed() and not force:
return
constraints = constraints or {}
- column_defs = ['{} {}'.format(field, constraints.get(field, ''))
+ column_defs = [f'{field} {constraints.get(field, "")}'
for field in fields]
- q = Query("CREATE TABLE IF NOT EXISTS {name} ({column_defs})"
- .format(name=self._name, column_defs=', '.join(column_defs)))
+ q = self.database.query(
+ f"CREATE TABLE IF NOT EXISTS {self._name} ({', '.join(column_defs)})"
+ )
q.run()
- def create_index(self, name, field):
+ def create_index(self, name: str, field: str) -> None:
"""Create an index over this table if the database is uninitialized.
Args:
name: Name of the index, should be unique.
field: Name of the field to index.
"""
- if not user_version_changed():
+ if not self.database.user_version_changed():
return
- q = Query("CREATE INDEX IF NOT EXISTS {name} ON {table} ({field})"
- .format(name=name, table=self._name, field=field))
+ q = self.database.query(
+ f"CREATE INDEX IF NOT EXISTS {name} ON {self._name} ({field})"
+ )
q.run()
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
"""Iterate rows in the table."""
- q = Query("SELECT * FROM {table}".format(table=self._name))
+ q = self.database.query(f"SELECT * FROM {self._name}")
q.run()
return iter(q)
- def contains_query(self, field):
+ def contains_query(self, field: str) -> Query:
"""Return a prepared query that checks for the existence of an item.
Args:
field: Field to match.
"""
- return Query(
- "SELECT EXISTS(SELECT * FROM {table} WHERE {field} = :val)"
- .format(table=self._name, field=field))
+ return self.database.query(
+ f"SELECT EXISTS(SELECT * FROM {self._name} WHERE {field} = :val)"
+ )
- def __len__(self):
+ def __len__(self) -> int:
"""Return the count of rows in the table."""
- q = Query("SELECT count(*) FROM {table}".format(table=self._name))
+ q = self.database.query(f"SELECT count(*) FROM {self._name}")
q.run()
return q.value()
- def __bool__(self):
+ def __bool__(self) -> bool:
"""Check whether there's any data in the table."""
- q = Query(f"SELECT 1 FROM {self._name} LIMIT 1")
+ q = self.database.query(f"SELECT 1 FROM {self._name} LIMIT 1")
q.run()
return q.query.next()
- def delete(self, field, value):
+ def delete(self, field: str, value: Any) -> None:
"""Remove all rows for which `field` equals `value`.
Args:
@@ -419,20 +483,21 @@ class SqlTable(QObject):
Return:
The number of rows deleted.
"""
- q = Query(f"DELETE FROM {self._name} where {field} = :val")
+ q = self.database.query(f"DELETE FROM {self._name} where {field} = :val")
q.run(val=value)
if not q.rows_affected():
- raise KeyError('No row with {} = "{}"'.format(field, value))
+            raise KeyError(f'No row with {field} = "{value}"')
self.changed.emit()
- def _insert_query(self, values, replace):
- params = ', '.join(':{}'.format(key) for key in values)
+ def _insert_query(self, values: Mapping[str, Any], replace: bool) -> Query:
+ params = ', '.join(f':{key}' for key in values)
+ columns = ', '.join(values)
verb = "REPLACE" if replace else "INSERT"
- return Query("{verb} INTO {table} ({columns}) values({params})".format(
- verb=verb, table=self._name, columns=', '.join(values),
- params=params))
+ return self.database.query(
+ f"{verb} INTO {self._name} ({columns}) values({params})"
+ )
- def insert(self, values, replace=False):
+ def insert(self, values: Mapping[str, Any], replace: bool = False) -> None:
"""Append a row to the table.
Args:
@@ -443,7 +508,8 @@ class SqlTable(QObject):
q.run(**values)
self.changed.emit()
- def insert_batch(self, values, replace=False):
+ def insert_batch(self, values: Mapping[str, MutableSequence[Any]],
+ replace: bool = False) -> None:
"""Performantly append multiple rows to the table.
Args:
@@ -454,12 +520,12 @@ class SqlTable(QObject):
q.run_batch(values)
self.changed.emit()
- def delete_all(self):
+ def delete_all(self) -> None:
"""Remove all rows from the table."""
- Query("DELETE FROM {table}".format(table=self._name)).run()
+ self.database.query(f"DELETE FROM {self._name}").run()
self.changed.emit()
- def select(self, sort_by, sort_order, limit=-1):
+ def select(self, sort_by: str, sort_order: str, limit: int = -1) -> Query:
"""Prepare, run, and return a select statement on this table.
Args:
@@ -469,9 +535,17 @@ class SqlTable(QObject):
Return: A prepared and executed select query.
"""
- q = Query("SELECT * FROM {table} ORDER BY {sort_by} {sort_order} "
- "LIMIT :limit"
- .format(table=self._name, sort_by=sort_by,
- sort_order=sort_order))
+ q = self.database.query(
+ f"SELECT * FROM {self._name} ORDER BY {sort_by} {sort_order} LIMIT :limit"
+ )
q.run(limit=limit)
return q
+
+
+def version() -> str:
+ """Return the sqlite version string."""
+ try:
+ with contextlib.closing(Database(':memory:')) as in_memory_db:
+ return in_memory_db.query("select sqlite_version()").run().value()
+ except KnownError as e:
+ return f'UNAVAILABLE ({e})'
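
Of the pieces added above, Transaction has the least obvious control flow: leaving the with-block normally commits, while an exception rolls the transaction back before propagating. A small sketch of that behaviour (table and values are illustrative; the unit tests at the bottom of this diff exercise the same semantics).

from PyQt5.QtCore import QCoreApplication
from qutebrowser.misc import sql

app = QCoreApplication([])
db = sql.Database(':memory:')
table = db.table('Example', ['name'])

# Normal exit: both inserts are committed together.
with db.transaction():
    table.insert({'name': 'one'})
    table.insert({'name': 'two'})
assert len(table) == 2

# An exception inside the block rolls everything back and is re-raised.
try:
    with db.transaction():
        table.insert({'name': 'three'})
        raise RuntimeError('abort')
except RuntimeError:
    pass
assert len(table) == 2          # 'three' was rolled back

db.close()
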
diff --git a/tests/helpers/fixtures.py b/tests/helpers/fixtures.py
index 7106698be..cd3778b8a 100644
--- a/tests/helpers/fixtures.py
+++ b/tests/helpers/fixtures.py
@@ -639,15 +639,6 @@ def short_tmpdir():
yield py.path.local(tdir) # pylint: disable=no-member
-@pytest.fixture
-def init_sql(data_tmpdir):
- """Initialize the SQL module, and shut it down after the test."""
- path = str(data_tmpdir / 'test.db')
- sql.init(path)
- yield
- sql.close()
-
-
class ModelValidator:
"""Validates completion models."""
@@ -682,12 +673,20 @@ def download_stub(win_registry, tmpdir, stubs):
@pytest.fixture
-def web_history(fake_save_manager, tmpdir, init_sql, config_stub, stubs,
+def database(data_tmpdir):
+ """Create a Database object."""
+ db = sql.Database(str(data_tmpdir / 'test.db'))
+ yield db
+ db.close()
+
+
+@pytest.fixture
+def web_history(fake_save_manager, tmpdir, database, config_stub, stubs,
monkeypatch):
"""Create a WebHistory object."""
config_stub.val.completion.timestamp_format = '%Y-%m-%d'
config_stub.val.completion.web_history.max_items = -1
- web_history = history.WebHistory(stubs.FakeHistoryProgress())
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
monkeypatch.setattr(history, 'web_history', web_history)
return web_history
diff --git a/tests/unit/browser/test_history.py b/tests/unit/browser/test_history.py
index 1a46c5be0..7906d385c 100644
--- a/tests/unit/browser/test_history.py
+++ b/tests/unit/browser/test_history.py
@@ -31,7 +31,7 @@ from qutebrowser.misc import sql, objects
@pytest.fixture(autouse=True)
-def prerequisites(config_stub, fake_save_manager, init_sql, fake_args):
+def prerequisites(config_stub, fake_save_manager, fake_args):
"""Make sure everything is ready to initialize a WebHistory."""
config_stub.data = {'general': {'private-browsing': False}}
@@ -311,14 +311,14 @@ class TestInit:
@pytest.mark.parametrize('backend', [usertypes.Backend.QtWebEngine,
usertypes.Backend.QtWebKit])
- def test_init(self, backend, qapp, tmpdir, monkeypatch, cleanup_init):
+ def test_init(self, backend, qapp, tmpdir, data_tmpdir, monkeypatch, cleanup_init):
if backend == usertypes.Backend.QtWebKit:
pytest.importorskip('PyQt5.QtWebKitWidgets')
else:
assert backend == usertypes.Backend.QtWebEngine
monkeypatch.setattr(history.objects, 'backend', backend)
- history.init(qapp)
+ history.init(data_tmpdir / f'test_init_{backend}', qapp)
assert history.web_history.parent() is qapp
try:
@@ -368,44 +368,40 @@ class TestDump:
class TestRebuild:
- # FIXME: Some of those tests might be a bit misleading, as creating a new
- # history.WebHistory will regenerate the completion either way with the SQL changes
- # in v2.0.0 (because the user version changed from 0 -> 3).
- #
- # They should be revisited once we can actually create two independent sqlite
- # databases and copy the data over, for a "real" test.
-
- def test_user_version(self, web_history, stubs, monkeypatch):
+ def test_user_version(self, database, stubs, monkeypatch):
"""Ensure that completion is regenerated if user_version changes."""
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
web_history.add_url(QUrl('example.com/1'), redirect=False, atime=1)
web_history.add_url(QUrl('example.com/2'), redirect=False, atime=2)
web_history.completion.delete('url', 'example.com/2')
- # User version always changes, so this won't work
- # hist2 = history.WebHistory(progress=stubs.FakeHistoryProgress())
- # assert list(hist2.completion) == [('example.com/1', '', 1)]
+ hist2 = history.WebHistory(database, progress=stubs.FakeHistoryProgress())
+ assert list(hist2.completion) == [('example.com/1', '', 1)]
- monkeypatch.setattr(sql, 'user_version_changed', lambda: True)
+ monkeypatch.setattr(web_history.database, 'user_version_changed', lambda: True)
- hist3 = history.WebHistory(progress=stubs.FakeHistoryProgress())
+ hist3 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
assert list(hist3.completion) == [
('example.com/1', '', 1),
('example.com/2', '', 2),
]
assert not hist3.metainfo['force_rebuild']
- def test_force_rebuild(self, web_history, stubs):
+ def test_force_rebuild(self, database, stubs):
"""Ensure that completion is regenerated if we force a rebuild."""
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
web_history.add_url(QUrl('example.com/1'), redirect=False, atime=1)
web_history.add_url(QUrl('example.com/2'), redirect=False, atime=2)
web_history.completion.delete('url', 'example.com/2')
- hist2 = history.WebHistory(progress=stubs.FakeHistoryProgress())
- # User version always changes, so this won't work
- # assert list(hist2.completion) == [('example.com/1', '', 1)]
+ hist2 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
+ assert list(hist2.completion) == [('example.com/1', '', 1)]
hist2.metainfo['force_rebuild'] = True
- hist3 = history.WebHistory(progress=stubs.FakeHistoryProgress())
+ hist3 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
assert list(hist3.completion) == [
('example.com/1', '', 1),
('example.com/2', '', 2),
@@ -424,7 +420,8 @@ class TestRebuild:
web_history.add_url(QUrl('http://example.org'),
redirect=False, atime=2)
- hist2 = history.WebHistory(progress=stubs.FakeHistoryProgress())
+ hist2 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
assert list(hist2.completion) == [('http://example.com', '', 1)]
def test_pattern_change_rebuild(self, config_stub, web_history, stubs):
@@ -436,14 +433,16 @@ class TestRebuild:
web_history.add_url(QUrl('http://example.org'),
redirect=False, atime=2)
- hist2 = history.WebHistory(progress=stubs.FakeHistoryProgress())
+ hist2 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
assert list(hist2.completion) == [
('http://example.com', '', 1),
]
config_stub.val.completion.web_history.exclude = []
- hist3 = history.WebHistory(progress=stubs.FakeHistoryProgress())
+ hist3 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
assert list(hist3.completion) == [
('http://example.com', '', 1),
('http://example.org', '', 2)
@@ -454,37 +453,39 @@ class TestRebuild:
web_history.add_url(QUrl('example.com/2'), redirect=False, atime=2)
# Trigger a completion rebuild
- monkeypatch.setattr(sql, 'user_version_changed', lambda: True)
+ monkeypatch.setattr(web_history.database, 'user_version_changed', lambda: True)
progress = stubs.FakeHistoryProgress()
- history.WebHistory(progress=progress)
+ history.WebHistory(web_history.database, progress=progress)
assert progress._value == 2
assert progress._started
assert progress._finished
- def test_interrupted(self, stubs, web_history, monkeypatch):
+ def test_interrupted(self, stubs, database, monkeypatch):
"""If we interrupt the rebuilding process, force_rebuild should still be set."""
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
web_history.add_url(QUrl('example.com/1'), redirect=False, atime=1)
+ web_history.completion.delete('url', 'example.com/1')
progress = stubs.FakeHistoryProgress(raise_on_tick=True)
# Trigger a completion rebuild
- monkeypatch.setattr(sql, 'user_version_changed', lambda: True)
+ monkeypatch.setattr(web_history.database, 'user_version_changed', lambda: True)
with pytest.raises(Exception, match='tick-tock'):
- history.WebHistory(progress=progress)
+ history.WebHistory(web_history.database, progress=progress)
assert web_history.metainfo['force_rebuild']
- # If we now try again, we should get another rebuild. But due to user_version
- # always changing, we can't test this at the moment (see the FIXME in the
- # docstring for details)
+ hist2 = history.WebHistory(web_history.database,
+ progress=stubs.FakeHistoryProgress())
+ assert list(hist2.completion) == [('example.com/1', '', 1)]
class TestCompletionMetaInfo:
@pytest.fixture
- def metainfo(self):
- return history.CompletionMetaInfo()
+ def metainfo(self, database):
+ return history.CompletionMetaInfo(database)
def test_contains_keyerror(self, metainfo):
with pytest.raises(KeyError):
@@ -507,27 +508,27 @@ class TestCompletionMetaInfo:
metainfo['excluded_patterns'] = value
assert metainfo['excluded_patterns'] == value
- # FIXME: It'd be good to test those two things via WebHistory (and not just
- # CompletionMetaInfo in isolation), but we can't do that right now - see the
- # docstring of TestRebuild for details.
-
- def test_recovery_no_key(self, metainfo):
- metainfo.delete('key', 'force_rebuild')
+ def test_recovery_no_key(self, caplog, database, stubs):
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
+ web_history.metainfo.delete('key', 'force_rebuild')
with pytest.raises(sql.BugError, match='No result for single-result query'):
- metainfo['force_rebuild']
+ web_history.metainfo['force_rebuild']
- metainfo.try_recover()
- assert not metainfo['force_rebuild']
+ with caplog.at_level(logging.WARNING):
+ web_history2 = history.WebHistory(database, stubs.FakeHistoryProgress())
+ assert not web_history2.metainfo['force_rebuild']
- def test_recovery_no_table(self, metainfo):
- sql.Query("DROP TABLE CompletionMetaInfo").run()
+ def test_recovery_no_table(self, caplog, database, stubs):
+ web_history = history.WebHistory(database, stubs.FakeHistoryProgress())
+ web_history.metainfo.database.query("DROP TABLE CompletionMetaInfo").run()
with pytest.raises(sql.BugError, match='no such table: CompletionMetaInfo'):
- metainfo['force_rebuild']
+ web_history.metainfo['force_rebuild']
- metainfo.try_recover()
- assert not metainfo['force_rebuild']
+ with caplog.at_level(logging.WARNING):
+ web_history2 = history.WebHistory(database, stubs.FakeHistoryProgress())
+ assert not web_history2.metainfo['force_rebuild']
class TestHistoryProgress:
diff --git a/tests/unit/completion/test_histcategory.py b/tests/unit/completion/test_histcategory.py
index e0a12943b..cb37fb784 100644
--- a/tests/unit/completion/test_histcategory.py
+++ b/tests/unit/completion/test_histcategory.py
@@ -32,10 +32,11 @@ from qutebrowser.utils import usertypes
@pytest.fixture
-def hist(init_sql, config_stub):
+def hist(data_tmpdir, config_stub):
+ db = sql.Database(str(data_tmpdir / 'test_histcategory.db'))
config_stub.val.completion.timestamp_format = '%Y-%m-%d'
config_stub.val.completion.web_history.max_items = -1
- return sql.SqlTable('CompletionHistory', ['url', 'title', 'last_atime'])
+ return sql.SqlTable(db, 'CompletionHistory', ['url', 'title', 'last_atime'])
@pytest.mark.parametrize('pattern, before, after', [
@@ -99,7 +100,7 @@ def test_set_pattern(pattern, before, after, model_validator, hist):
"""Validate the filtering and sorting results of set_pattern."""
for row in before:
hist.insert({'url': row[0], 'title': row[1], 'last_atime': 1})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
model_validator.set_model(cat)
cat.set_pattern(pattern)
model_validator.validate(after)
@@ -110,7 +111,7 @@ def test_set_pattern_repeated(model_validator, hist):
hist.insert({'url': 'example.com/foo', 'title': 'title1', 'last_atime': 1})
hist.insert({'url': 'example.com/bar', 'title': 'title2', 'last_atime': 1})
hist.insert({'url': 'example.com/baz', 'title': 'title3', 'last_atime': 1})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
model_validator.set_model(cat)
cat.set_pattern('b')
@@ -143,7 +144,7 @@ def test_set_pattern_repeated(model_validator, hist):
], ids=['numbers', 'characters'])
def test_set_pattern_long(hist, message_mock, caplog, pattern):
hist.insert({'url': 'example.com/foo', 'title': 'title1', 'last_atime': 1})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
with caplog.at_level(logging.ERROR):
cat.set_pattern(pattern)
msg = message_mock.getmsg(usertypes.MessageLevel.error)
@@ -153,7 +154,7 @@ def test_set_pattern_long(hist, message_mock, caplog, pattern):
@hypothesis.given(pat=strategies.text())
def test_set_pattern_hypothesis(hist, pat, caplog):
hist.insert({'url': 'example.com/foo', 'title': 'title1', 'last_atime': 1})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
with caplog.at_level(logging.ERROR):
cat.set_pattern(pat)
@@ -202,7 +203,7 @@ def test_sorting(max_items, before, after, model_validator, hist, config_stub):
for url, title, atime in before:
timestamp = datetime.datetime.strptime(atime, '%Y-%m-%d').timestamp()
hist.insert({'url': url, 'title': title, 'last_atime': timestamp})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
model_validator.set_model(cat)
cat.set_pattern('')
model_validator.validate(after)
@@ -211,7 +212,7 @@ def test_sorting(max_items, before, after, model_validator, hist, config_stub):
def test_remove_rows(hist, model_validator):
hist.insert({'url': 'foo', 'title': 'Foo', 'last_atime': 0})
hist.insert({'url': 'bar', 'title': 'Bar', 'last_atime': 0})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
model_validator.set_model(cat)
cat.set_pattern('')
hist.delete('url', 'foo')
@@ -227,7 +228,7 @@ def test_remove_rows_fetch(hist):
'title': [str(i) for i in range(300)],
'last_atime': [0] * 300,
})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
cat.set_pattern('')
# sanity check that we didn't fetch everything up front
@@ -245,20 +246,21 @@ def test_remove_rows_fetch(hist):
('%m/%d/%Y %H:%M', '02/27/2018 08:30'),
('', ''),
])
-def test_timestamp_fmt(fmt, expected, model_validator, config_stub, init_sql):
+def test_timestamp_fmt(fmt, expected, model_validator, config_stub, data_tmpdir):
"""Validate the filtering and sorting results of set_pattern."""
config_stub.val.completion.timestamp_format = fmt
- hist = sql.SqlTable('CompletionHistory', ['url', 'title', 'last_atime'])
+ db = sql.Database(str(data_tmpdir / 'test_timestamp_fmt.db'))
+ hist = sql.SqlTable(db, 'CompletionHistory', ['url', 'title', 'last_atime'])
atime = datetime.datetime(2018, 2, 27, 8, 30)
hist.insert({'url': 'foo', 'title': '', 'last_atime': atime.timestamp()})
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
model_validator.set_model(cat)
cat.set_pattern('')
model_validator.validate([('foo', '', expected)])
def test_skip_duplicate_set(message_mock, caplog, hist):
- cat = histcategory.HistoryCategory()
+ cat = histcategory.HistoryCategory(database=hist.database)
cat.set_pattern('foo')
cat.set_pattern('foobarbaz')
msg = caplog.messages[-1]
diff --git a/tests/unit/misc/test_sql.py b/tests/unit/misc/test_sql.py
index f6fa68869..80ab7513c 100644
--- a/tests/unit/misc/test_sql.py
+++ b/tests/unit/misc/test_sql.py
@@ -23,12 +23,12 @@ import pytest
import hypothesis
from hypothesis import strategies
-from PyQt5.QtSql import QSqlError
+from PyQt5.QtSql import QSqlDatabase, QSqlError, QSqlQuery
from qutebrowser.misc import sql
-pytestmark = pytest.mark.usefixtures('init_sql')
+pytestmark = pytest.mark.usefixtures('data_tmpdir')
class TestUserVersion:
@@ -120,23 +120,23 @@ class TestSqlError:
assert err.text() == "db text"
-def test_init():
- sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_init_table(database):
+ database.table('Foo', ['name', 'val', 'lucky'])
# should not error if table already exists
- sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+ database.table('Foo', ['name', 'val', 'lucky'])
-def test_insert(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_insert(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
with qtbot.wait_signal(table.changed):
table.insert({'name': 'one', 'val': 1, 'lucky': False})
with qtbot.wait_signal(table.changed):
table.insert({'name': 'wan', 'val': 1, 'lucky': False})
-def test_insert_replace(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'],
- constraints={'name': 'PRIMARY KEY'})
+def test_insert_replace(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'],
+ constraints={'name': 'PRIMARY KEY'})
with qtbot.wait_signal(table.changed):
table.insert({'name': 'one', 'val': 1, 'lucky': False}, replace=True)
with qtbot.wait_signal(table.changed):
@@ -147,8 +147,8 @@ def test_insert_replace(qtbot):
table.insert({'name': 'one', 'val': 11, 'lucky': True}, replace=False)
-def test_insert_batch(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_insert_batch(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
with qtbot.wait_signal(table.changed):
table.insert_batch({'name': ['one', 'nine', 'thirteen'],
@@ -160,9 +160,9 @@ def test_insert_batch(qtbot):
('thirteen', 13, True)]
-def test_insert_batch_replace(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'],
- constraints={'name': 'PRIMARY KEY'})
+def test_insert_batch_replace(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'],
+ constraints={'name': 'PRIMARY KEY'})
with qtbot.wait_signal(table.changed):
table.insert_batch({'name': ['one', 'nine', 'thirteen'],
@@ -185,8 +185,8 @@ def test_insert_batch_replace(qtbot):
'lucky': [True, True]})
-def test_iter():
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_iter(database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
table.insert({'name': 'one', 'val': 1, 'lucky': False})
table.insert({'name': 'nine', 'val': 9, 'lucky': False})
table.insert({'name': 'thirteen', 'val': 13, 'lucky': True})
@@ -205,15 +205,15 @@ def test_iter():
([{"a": 2, "b": 5}, {"a": 1, "b": 6}, {"a": 3, "b": 4}], 'a', 'asc', -1,
[(1, 6), (2, 5), (3, 4)]),
])
-def test_select(rows, sort_by, sort_order, limit, result):
- table = sql.SqlTable('Foo', ['a', 'b'])
+def test_select(rows, sort_by, sort_order, limit, result, database):
+ table = database.table('Foo', ['a', 'b'])
for row in rows:
table.insert(row)
assert list(table.select(sort_by, sort_order, limit)) == result
-def test_delete(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_delete(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
table.insert({'name': 'one', 'val': 1, 'lucky': False})
table.insert({'name': 'nine', 'val': 9, 'lucky': False})
table.insert({'name': 'thirteen', 'val': 13, 'lucky': True})
@@ -227,8 +227,8 @@ def test_delete(qtbot):
assert not list(table)
-def test_len():
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_len(database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
assert len(table) == 0
table.insert({'name': 'one', 'val': 1, 'lucky': False})
assert len(table) == 1
@@ -238,15 +238,15 @@ def test_len():
assert len(table) == 3
-def test_bool():
- table = sql.SqlTable('Foo', ['name'])
+def test_bool(database):
+ table = database.table('Foo', ['name'])
assert not table
table.insert({'name': 'one'})
assert table
-def test_bool_benchmark(benchmark):
- table = sql.SqlTable('Foo', ['number'])
+def test_bool_benchmark(benchmark, database):
+ table = database.table('Foo', ['number'])
# Simulate a history table
table.create_index('NumberIndex', 'number')
@@ -258,8 +258,8 @@ def test_bool_benchmark(benchmark):
benchmark(run)
-def test_contains():
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_contains(database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
table.insert({'name': 'one', 'val': 1, 'lucky': False})
table.insert({'name': 'nine', 'val': 9, 'lucky': False})
table.insert({'name': 'thirteen', 'val': 13, 'lucky': True})
@@ -279,8 +279,8 @@ def test_contains():
assert not val_query.run(val=10).value()
-def test_delete_all(qtbot):
- table = sql.SqlTable('Foo', ['name', 'val', 'lucky'])
+def test_delete_all(qtbot, database):
+ table = database.table('Foo', ['name', 'val', 'lucky'])
table.insert({'name': 'one', 'val': 1, 'lucky': False})
table.insert({'name': 'nine', 'val': 9, 'lucky': False})
table.insert({'name': 'thirteen', 'val': 13, 'lucky': True})
@@ -295,90 +295,118 @@ def test_version():
class TestSqlQuery:
- def test_prepare_error(self):
+ def test_prepare_error(self, database):
with pytest.raises(sql.BugError) as excinfo:
- sql.Query('invalid')
+ database.query('invalid')
expected = ('Failed to prepare query "invalid": "near "invalid": '
'syntax error Unable to execute statement"')
assert str(excinfo.value) == expected
@pytest.mark.parametrize('forward_only', [True, False])
- def test_forward_only(self, forward_only):
- q = sql.Query('SELECT 0 WHERE 0', forward_only=forward_only)
+ def test_forward_only(self, forward_only, database):
+ q = database.query('SELECT 0 WHERE 0', forward_only=forward_only)
assert q.query.isForwardOnly() == forward_only
- def test_iter_inactive(self):
- q = sql.Query('SELECT 0')
+ def test_iter_inactive(self, database):
+ q = database.query('SELECT 0')
with pytest.raises(sql.BugError,
match='Cannot iterate inactive query'):
next(iter(q))
- def test_iter_empty(self):
- q = sql.Query('SELECT 0 AS col WHERE 0')
+ def test_iter_empty(self, database):
+ q = database.query('SELECT 0 AS col WHERE 0')
q.run()
with pytest.raises(StopIteration):
next(iter(q))
- def test_iter(self):
- q = sql.Query('SELECT 0 AS col')
+ def test_iter(self, database):
+ q = database.query('SELECT 0 AS col')
q.run()
result = next(iter(q))
assert result.col == 0
- def test_iter_multiple(self):
- q = sql.Query('VALUES (1), (2), (3);')
+ def test_iter_multiple(self, database):
+ q = database.query('VALUES (1), (2), (3);')
res = list(q.run())
assert len(res) == 3
assert res[0].column1 == 1
- def test_run_binding(self):
- q = sql.Query('SELECT :answer')
+ def test_run_binding(self, database):
+ q = database.query('SELECT :answer')
q.run(answer=42)
assert q.value() == 42
- def test_run_missing_binding(self):
- q = sql.Query('SELECT :answer')
+ def test_run_missing_binding(self, database):
+ q = database.query('SELECT :answer')
with pytest.raises(sql.BugError, match='Missing bound values!'):
q.run()
- def test_run_batch(self):
- q = sql.Query('SELECT :answer')
+ def test_run_batch(self, database):
+ q = database.query('SELECT :answer')
q.run_batch(values={'answer': [42]})
assert q.value() == 42
- def test_run_batch_missing_binding(self):
- q = sql.Query('SELECT :answer')
+ def test_run_batch_missing_binding(self, database):
+ q = database.query('SELECT :answer')
with pytest.raises(sql.BugError, match='Missing bound values!'):
q.run_batch(values={})
- def test_value_missing(self):
- q = sql.Query('SELECT 0 WHERE 0')
+ def test_value_missing(self, database):
+ q = database.query('SELECT 0 WHERE 0')
q.run()
- with pytest.raises(sql.BugError,
- match='No result for single-result query'):
+ with pytest.raises(sql.BugError, match='No result for single-result query'):
q.value()
- def test_num_rows_affected_not_active(self):
+ def test_num_rows_affected_not_active(self, database):
with pytest.raises(AssertionError):
- q = sql.Query('SELECT 0')
+ q = database.query('SELECT 0')
q.rows_affected()
- def test_num_rows_affected_select(self):
+ def test_num_rows_affected_select(self, database):
with pytest.raises(AssertionError):
- q = sql.Query('SELECT 0')
+ q = database.query('SELECT 0')
q.run()
q.rows_affected()
@pytest.mark.parametrize('condition', [0, 1])
- def test_num_rows_affected(self, condition):
- table = sql.SqlTable('Foo', ['name'])
+ def test_num_rows_affected(self, condition, database):
+ table = database.table('Foo', ['name'])
table.insert({'name': 'helloworld'})
- q = sql.Query(f'DELETE FROM Foo WHERE {condition}')
+ q = database.query(f'DELETE FROM Foo WHERE {condition}')
q.run()
assert q.rows_affected() == condition
- def test_bound_values(self):
- q = sql.Query('SELECT :answer')
+ def test_bound_values(self, database):
+ q = database.query('SELECT :answer')
q.run(answer=42)
assert q.bound_values() == {':answer': 42}
+
+
+class TestTransaction:
+
+ def test_successful_transaction(self, database):
+ my_table = database.table('my_table', ['column'])
+ with database.transaction():
+ my_table.insert({'column': 1})
+ my_table.insert({'column': 2})
+
+ db2 = QSqlDatabase.addDatabase('QSQLITE', 'db2')
+ db2.setDatabaseName(database.qt_database().databaseName())
+ db2.open()
+ query = QSqlQuery(db2)
+ query.exec('select count(*) from my_table')
+ query.next()
+ assert query.record().value(0) == 0
+ assert database.query('select count(*) from my_table').run().value() == 2
+
+ def test_failed_transaction(self, database):
+ my_table = database.table('my_table', ['column'])
+ try:
+ with database.transaction():
+ my_table.insert({'column': 1})
+ my_table.insert({'column': 2})
+ raise Exception('something went horribly wrong')
+ except Exception:
+ pass
+ assert database.query('select count(*) from my_table').run().value() == 0