summaryrefslogtreecommitdiff
path: root/qutebrowser
diff options
context:
space:
mode:
authorFlorian Bruhin <me@the-compiler.org>2021-07-09 17:06:23 +0200
committerGitHub <noreply@github.com>2021-07-09 17:06:23 +0200
commitae6d9009716c85c679b490ab4df92e80b77b3fa5 (patch)
tree3c03eb22447c3233533dc91679979ee3de8f8991 /qutebrowser
parent8bdab79011680c13b3b2072a3b18a4471c168131 (diff)
parent71a7674a706b73dfaac5958e9c3bca414c4e8665 (diff)
downloadqutebrowser-ae6d9009716c85c679b490ab4df92e80b77b3fa5.tar.gz
qutebrowser-ae6d9009716c85c679b490ab4df92e80b77b3fa5.zip
Merge pull request #6567 from lufte/issue6039
Database class
Diffstat (limited to 'qutebrowser')
-rw-r--r--qutebrowser/app.py7
-rw-r--r--qutebrowser/browser/history.py96
-rw-r--r--qutebrowser/completion/models/histcategory.py7
-rw-r--r--qutebrowser/completion/models/urlmodel.py3
-rw-r--r--qutebrowser/misc/sql.py334
5 files changed, 265 insertions, 182 deletions
diff --git a/qutebrowser/app.py b/qutebrowser/app.py
index 2df0a82f6..1ab28e8d0 100644
--- a/qutebrowser/app.py
+++ b/qutebrowser/app.py
@@ -41,6 +41,7 @@ import os
import sys
import functools
import tempfile
+import pathlib
import datetime
import argparse
from typing import Iterable, Optional
@@ -479,11 +480,9 @@ def _init_modules(*, args):
with debug.log_time("init", "Initializing SQL/history"):
try:
- log.init.debug("Initializing SQL...")
- sql.init(os.path.join(standarddir.data(), 'history.sqlite'))
-
log.init.debug("Initializing web history...")
- history.init(objects.qapp)
+ history.init(db_path=pathlib.Path(standarddir.data()) / 'history.sqlite',
+ parent=objects.qapp)
except sql.KnownError as e:
error.handle_fatal_exc(e, 'Error initializing SQL',
pre_text='Error initializing SQL',
diff --git a/qutebrowser/browser/history.py b/qutebrowser/browser/history.py
index 773c6cc51..559992327 100644
--- a/qutebrowser/browser/history.py
+++ b/qutebrowser/browser/history.py
@@ -22,9 +22,10 @@
import os
import time
import contextlib
-from typing import cast, Mapping, MutableSequence
+import pathlib
+from typing import cast, Mapping, MutableSequence, Optional
-from PyQt5.QtCore import pyqtSlot, QUrl, pyqtSignal
+from PyQt5.QtCore import pyqtSlot, QUrl, QObject, pyqtSignal
from PyQt5.QtWidgets import QProgressDialog, QApplication
from qutebrowser.config import config
@@ -91,13 +92,14 @@ class CompletionMetaInfo(sql.SqlTable):
'force_rebuild': False,
}
- def __init__(self, parent=None):
+ def __init__(self, database: sql.Database,
+ parent: Optional[QObject] = None) -> None:
self._fields = ['key', 'value']
self._constraints = {'key': 'PRIMARY KEY'}
- super().__init__(
- "CompletionMetaInfo", self._fields, constraints=self._constraints)
+ super().__init__(database, "CompletionMetaInfo", self._fields,
+ constraints=self._constraints, parent=parent)
- if sql.user_version_changed():
+ if database.user_version_changed():
self._init_default_values()
def _check_key(self, key):
@@ -125,8 +127,8 @@ class CompletionMetaInfo(sql.SqlTable):
def __getitem__(self, key):
self._check_key(key)
- query = sql.Query('SELECT value FROM CompletionMetaInfo '
- 'WHERE key = :key')
+ query = self.database.query('SELECT value FROM CompletionMetaInfo '
+ 'WHERE key = :key')
return query.run(key=key).value()
def __setitem__(self, key, value):
@@ -138,8 +140,9 @@ class CompletionHistory(sql.SqlTable):
"""History which only has the newest entry for each URL."""
- def __init__(self, parent=None):
- super().__init__("CompletionHistory", ['url', 'title', 'last_atime'],
+ def __init__(self, database: sql.Database,
+ parent: Optional[QObject] = None) -> None:
+ super().__init__(database, "CompletionHistory", ['url', 'title', 'last_atime'],
constraints={'url': 'PRIMARY KEY',
'title': 'NOT NULL',
'last_atime': 'NOT NULL'},
@@ -162,8 +165,9 @@ class WebHistory(sql.SqlTable):
# one url cleared
url_cleared = pyqtSignal(QUrl)
- def __init__(self, progress, parent=None):
- super().__init__("History", ['url', 'title', 'atime', 'redirect'],
+ def __init__(self, database: sql.Database, progress: HistoryProgress,
+ parent: Optional[QObject] = None) -> None:
+ super().__init__(database, "History", ['url', 'title', 'atime', 'redirect'],
constraints={'url': 'NOT NULL',
'title': 'NOT NULL',
'atime': 'NOT NULL',
@@ -173,8 +177,8 @@ class WebHistory(sql.SqlTable):
# Store the last saved url to avoid duplicate immediate saves.
self._last_url = None
- self.completion = CompletionHistory(parent=self)
- self.metainfo = CompletionMetaInfo(parent=self)
+ self.completion = CompletionHistory(database, parent=self)
+ self.metainfo = CompletionMetaInfo(database, parent=self)
try:
rebuild_completion = self.metainfo['force_rebuild']
@@ -184,16 +188,18 @@ class WebHistory(sql.SqlTable):
self.metainfo.try_recover()
rebuild_completion = self.metainfo['force_rebuild']
- if sql.user_version_changed():
- # If the DB user version changed, run a full cleanup and rebuild the
- # completion history.
- #
- # In the future, this could be improved to only be done when actually needed
- # - but version changes happen very infrequently, rebuilding everything
- # gives us less corner-cases to deal with, and we can run a VACUUM to make
- # things smaller.
- self._cleanup_history()
- rebuild_completion = True
+ if self.database.user_version_changed():
+ with self.database.transaction():
+ # If the DB user version changed, run a full cleanup and rebuild the
+ # completion history.
+ #
+ # In the future, this could be improved to only be done when actually
+ # needed - but version changes happen very infrequently, rebuilding
+ # everything gives us less corner-cases to deal with, and we can run a
+ # VACUUM to make things smaller.
+ self._cleanup_history()
+ rebuild_completion = True
+ self.database.upgrade_user_version()
# Get a string of all patterns
patterns = config.instance.get_str('completion.web_history.exclude')
@@ -211,19 +217,19 @@ class WebHistory(sql.SqlTable):
self.create_index('HistoryIndex', 'url')
self.create_index('HistoryAtimeIndex', 'atime')
self._contains_query = self.contains_query('url')
- self._between_query = sql.Query('SELECT * FROM History '
- 'where not redirect '
- 'and not url like "qute://%" '
- 'and atime > :earliest '
- 'and atime <= :latest '
- 'ORDER BY atime desc')
-
- self._before_query = sql.Query('SELECT * FROM History '
- 'where not redirect '
- 'and not url like "qute://%" '
- 'and atime <= :latest '
- 'ORDER BY atime desc '
- 'limit :limit offset :offset')
+ self._between_query = self.database.query('SELECT * FROM History '
+ 'where not redirect '
+ 'and not url like "qute://%" '
+ 'and atime > :earliest '
+ 'and atime <= :latest '
+ 'ORDER BY atime desc')
+
+ self._before_query = self.database.query('SELECT * FROM History '
+ 'where not redirect '
+ 'and not url like "qute://%" '
+ 'and atime <= :latest '
+ 'ORDER BY atime desc '
+ 'limit :limit offset :offset')
def __repr__(self):
return utils.get_repr(self, length=len(self))
@@ -271,7 +277,7 @@ class WebHistory(sql.SqlTable):
'qute://pdfjs%',
]
where_clause = ' OR '.join(f"url LIKE '{term}'" for term in terms)
- q = sql.Query(f'DELETE FROM History WHERE {where_clause}')
+ q = self.database.query(f'DELETE FROM History WHERE {where_clause}')
entries = q.run()
log.sql.debug(f"Cleanup removed {entries.rows_affected()} items")
@@ -297,9 +303,9 @@ class WebHistory(sql.SqlTable):
QApplication.processEvents()
# Select the latest entry for each url
- q = sql.Query('SELECT url, title, max(atime) AS atime FROM History '
- 'WHERE NOT redirect '
- 'GROUP BY url ORDER BY atime asc')
+ q = self.database.query('SELECT url, title, max(atime) AS atime FROM History '
+ 'WHERE NOT redirect '
+ 'GROUP BY url ORDER BY atime asc')
result = q.run()
QApplication.processEvents()
entries = list(result)
@@ -319,7 +325,7 @@ class WebHistory(sql.SqlTable):
self._progress.set_maximum(0)
# We might have caused fragmentation - let's clean up.
- sql.Query('VACUUM').run()
+ self.database.query('VACUUM').run()
QApplication.processEvents()
self.completion.insert_batch(data, replace=True)
@@ -472,15 +478,17 @@ def debug_dump_history(dest):
raise cmdutils.CommandError(f'Could not write history: {e}')
-def init(parent=None):
+def init(db_path: pathlib.Path, parent: Optional[QObject] = None) -> None:
"""Initialize the web history.
Args:
+ db_path: The path for the SQLite database.
parent: The parent to use for WebHistory.
"""
global web_history
progress = HistoryProgress()
- web_history = WebHistory(progress=progress, parent=parent)
+ database = sql.Database(str(db_path))
+ web_history = WebHistory(database=database, progress=progress, parent=parent)
if objects.backend == usertypes.Backend.QtWebKit: # pragma: no cover
from qutebrowser.browser.webkit import webkithistory
diff --git a/qutebrowser/completion/models/histcategory.py b/qutebrowser/completion/models/histcategory.py
index bee2b43d9..8dd1be838 100644
--- a/qutebrowser/completion/models/histcategory.py
+++ b/qutebrowser/completion/models/histcategory.py
@@ -34,11 +34,12 @@ class HistoryCategory(QSqlQueryModel):
"""A completion category that queries the SQL history store."""
- def __init__(self, *,
+ def __init__(self, *, database: sql.Database,
delete_func: util.DeleteFuncType = None,
parent: QWidget = None) -> None:
"""Create a new History completion category."""
super().__init__(parent=parent)
+ self._database = database
self.name = "History"
self._query: Optional[sql.Query] = None
@@ -56,7 +57,7 @@ class HistoryCategory(QSqlQueryModel):
if max_items < 0:
return ''
- min_atime = sql.Query(' '.join([
+ min_atime = self._database.query(' '.join([
'SELECT min(last_atime) FROM',
'(SELECT last_atime FROM CompletionHistory',
'ORDER BY last_atime DESC LIMIT :limit)',
@@ -107,7 +108,7 @@ class HistoryCategory(QSqlQueryModel):
# if the number of words changed, we need to generate a new
# query otherwise, we can reuse the prepared query for
# performance
- self._query = sql.Query(' '.join([
+ self._query = self._database.query(' '.join([
"SELECT url, title, {}".format(timefmt),
"FROM CompletionHistory",
# the incoming pattern will have literal % and _ escaped we
diff --git a/qutebrowser/completion/models/urlmodel.py b/qutebrowser/completion/models/urlmodel.py
index 2152f60ec..56af1f7c7 100644
--- a/qutebrowser/completion/models/urlmodel.py
+++ b/qutebrowser/completion/models/urlmodel.py
@@ -90,7 +90,8 @@ def url(*, info):
history_disabled = info.config.get('completion.web_history.max_items') == 0
if not history_disabled and 'history' in categories:
- hist_cat = histcategory.HistoryCategory(delete_func=_delete_history)
+ hist_cat = histcategory.HistoryCategory(database=history.web_history.database,
+ delete_func=_delete_history)
models['history'] = hist_cat
if 'filesystem' in categories:
diff --git a/qutebrowser/misc/sql.py b/qutebrowser/misc/sql.py
index 68c0fd538..814eb2bb0 100644
--- a/qutebrowser/misc/sql.py
+++ b/qutebrowser/misc/sql.py
@@ -17,15 +17,19 @@
# You should have received a copy of the GNU General Public License
# along with qutebrowser. If not, see <https://www.gnu.org/licenses/>.
-"""Provides access to an in-memory sqlite database."""
+"""Provides access to sqlite databases."""
import collections
+import contextlib
import dataclasses
+import types
+from typing import Any, Dict, Iterator, List, Mapping, MutableSequence, Optional, Type
from PyQt5.QtCore import QObject, pyqtSignal
-from PyQt5.QtSql import QSqlDatabase, QSqlQuery, QSqlError
+from PyQt5.QtSql import QSqlDatabase, QSqlError, QSqlQuery
-from qutebrowser.utils import log, debug
+from qutebrowser.qt import sip
+from qutebrowser.utils import debug, log
@dataclasses.dataclass
@@ -48,32 +52,23 @@ class UserVersion:
minor: int
@classmethod
- def from_int(cls, num):
+ def from_int(cls, num: int) -> 'UserVersion':
"""Parse a number from sqlite into a major/minor user version."""
assert 0 <= num <= 0x7FFF_FFFF, num # signed integer, but shouldn't be negative
major = (num & 0x7FFF_0000) >> 16
minor = num & 0x0000_FFFF
return cls(major, minor)
- def to_int(self):
+ def to_int(self) -> int:
"""Get a sqlite integer from a major/minor user version."""
assert 0 <= self.major <= 0x7FFF # signed integer
assert 0 <= self.minor <= 0xFFFF
return self.major << 16 | self.minor
- def __str__(self):
+ def __str__(self) -> str:
return f'{self.major}.{self.minor}'
-_db_user_version = None # The user version we got from the database
-_USER_VERSION = UserVersion(0, 4) # The current / newest user version
-
-
-def user_version_changed():
- """Whether the version stored in the database is different from the current one."""
- return _db_user_version != _USER_VERSION
-
-
class SqliteErrorCode:
"""Error codes as used by sqlite.
@@ -98,11 +93,11 @@ class Error(Exception):
"""Base class for all SQL related errors."""
- def __init__(self, msg, error=None):
+ def __init__(self, msg: str, error: Optional[QSqlError] = None) -> None:
super().__init__(msg)
self.error = error
- def text(self):
+ def text(self) -> str:
"""Get a short text description of the error.
This is a string suitable to show to the user as error message.
@@ -130,18 +125,17 @@ class BugError(Error):
"""
-def raise_sqlite_error(msg, error):
+def raise_sqlite_error(msg: str, error: QSqlError) -> None:
"""Raise either a BugError or KnownError."""
error_code = error.nativeErrorCode()
database_text = error.databaseText()
driver_text = error.driverText()
log.sql.debug("SQL error:")
- log.sql.debug("type: {}".format(
- debug.qenum_key(QSqlError, error.type())))
- log.sql.debug("database text: {}".format(database_text))
- log.sql.debug("driver text: {}".format(driver_text))
- log.sql.debug("error code: {}".format(error_code))
+ log.sql.debug(f"type: {debug.qenum_key(QSqlError, error.type())}")
+ log.sql.debug(f"database text: {database_text}")
+ log.sql.debug(f"driver text: {driver_text}")
+ log.sql.debug(f"error code: {error_code}")
known_errors = [
SqliteErrorCode.BUSY,
@@ -168,82 +162,145 @@ def raise_sqlite_error(msg, error):
raise BugError(msg, error)
-def init(db_path):
- """Initialize the SQL database connection."""
- database = QSqlDatabase.addDatabase('QSQLITE')
- if not database.isValid():
- raise KnownError('Failed to add database. Are sqlite and Qt sqlite '
- 'support installed?')
- database.setDatabaseName(db_path)
- if not database.open():
- error = database.lastError()
- msg = "Failed to open sqlite database at {}: {}".format(db_path,
- error.text())
- raise_sqlite_error(msg, error)
-
- global _db_user_version
- version_int = Query('pragma user_version').run().value()
- _db_user_version = UserVersion.from_int(version_int)
-
- if _db_user_version.major > _USER_VERSION.major:
- raise KnownError(
- "Database is too new for this qutebrowser version (database version "
- f"{_db_user_version}, but {_USER_VERSION.major}.x is supported)")
-
- if user_version_changed():
- log.sql.debug(f"Migrating from version {_db_user_version} to {_USER_VERSION}")
- # Note we're *not* updating the _db_user_version global here. We still want
- # user_version_changed() to return True, as other modules (such as history.py)
- # use it to create the initial table structure.
- Query(f'PRAGMA user_version = {_USER_VERSION.to_int()}').run()
-
- # Enable write-ahead-logging and reduce disk write frequency
- # see https://sqlite.org/pragma.html and issues #2930 and #3507
- #
- # We might already have done this (without a migration) in earlier versions, but
- # as those are idempotent, let's make sure we run them once again.
- Query("PRAGMA journal_mode=WAL").run()
- Query("PRAGMA synchronous=NORMAL").run()
-
-
-def close():
- """Close the SQL connection."""
- QSqlDatabase.removeDatabase(QSqlDatabase.database().connectionName())
-
-
-def version():
- """Return the sqlite version string."""
- try:
- if not QSqlDatabase.database().isOpen():
- init(':memory:')
- ver = Query("select sqlite_version()").run().value()
- close()
- return ver
- return Query("select sqlite_version()").run().value()
- except KnownError as e:
- return 'UNAVAILABLE ({})'.format(e)
+class Database:
+
+ """A wrapper over a QSqlDatabase connection."""
+
+ _USER_VERSION = UserVersion(0, 4) # The current / newest user version
+
+ def __init__(self, path: str) -> None:
+ if QSqlDatabase.database(path).isValid():
+ raise BugError(f'A connection to the database at "{path}" already exists')
+
+ self._path = path
+ database = QSqlDatabase.addDatabase('QSQLITE', path)
+ if not database.isValid():
+ raise KnownError('Failed to add database. Are sqlite and Qt sqlite '
+ 'support installed?')
+ database.setDatabaseName(path)
+ if not database.open():
+ error = database.lastError()
+ msg = f"Failed to open sqlite database at {path}: {error.text()}"
+ raise_sqlite_error(msg, error)
+
+ version_int = self.query('pragma user_version').run().value()
+ self._user_version = UserVersion.from_int(version_int)
+
+ if self._user_version.major > self._USER_VERSION.major:
+ raise KnownError(
+ "Database is too new for this qutebrowser version (database version "
+ f"{self._user_version}, but {self._USER_VERSION.major}.x is supported)")
+
+ if self.user_version_changed():
+ # Enable write-ahead-logging and reduce disk write frequency
+ # see https://sqlite.org/pragma.html and issues #2930 and #3507
+ #
+ # We might already have done this (without a migration) in earlier versions,
+ # but as those are idempotent, let's make sure we run them once again.
+ self.query("PRAGMA journal_mode=WAL").run()
+ self.query("PRAGMA synchronous=NORMAL").run()
+
+ def qt_database(self) -> QSqlDatabase:
+ """Return the wrapped QSqlDatabase instance."""
+ database = QSqlDatabase.database(self._path, open=True)
+ if not database.isValid():
+ raise BugError('Failed to get connection. Did you close() this Database '
+ 'instance?')
+ return database
+
+ def query(self, querystr: str, forward_only: bool = True) -> 'Query':
+ """Return a Query instance linked to this Database."""
+ return Query(self, querystr, forward_only)
+
+ def table(self, name: str, fields: List[str],
+ constraints: Optional[Dict[str, str]] = None,
+ parent: Optional[QObject] = None) -> 'SqlTable':
+ """Return a SqlTable instance linked to this Database."""
+ return SqlTable(self, name, fields, constraints, parent)
+
+ def user_version_changed(self) -> bool:
+ """Whether the version stored in the database differs from the current one."""
+ return self._user_version != self._USER_VERSION
+
+ def upgrade_user_version(self) -> None:
+ """Upgrade the user version to the latest version.
+
+ This method should be called once all required operations to migrate from one
+ version to another have been run.
+ """
+ log.sql.debug(f"Migrating from version {self._user_version} "
+ f"to {self._USER_VERSION}")
+ self.query(f'PRAGMA user_version = {self._USER_VERSION.to_int()}').run()
+ self._user_version = self._USER_VERSION
+
+ def close(self) -> None:
+ """Close the SQL connection."""
+ database = self.qt_database()
+ database.close()
+ sip.delete(database)
+ QSqlDatabase.removeDatabase(self._path)
+
+ def transaction(self) -> 'Transaction':
+ """Return a Transaction object linked to this Database."""
+ return Transaction(self)
+
+
+class Transaction(contextlib.AbstractContextManager): # type: ignore[type-arg]
+
+ """A Database transaction that can be used as a context manager."""
+
+ def __init__(self, database: Database) -> None:
+ self._database = database
+
+ def __enter__(self) -> None:
+ log.sql.debug('Starting a transaction')
+ db = self._database.qt_database()
+ ok = db.transaction()
+ if not ok:
+ error = db.lastError()
+ msg = f'Failed to start a transaction: "{error.text()}"'
+ raise_sqlite_error(msg, error)
+
+ def __exit__(self,
+ _exc_type: Optional[Type[BaseException]],
+ exc_val: Optional[BaseException],
+ _exc_tb: Optional[types.TracebackType]) -> None:
+ db = self._database.qt_database()
+ if exc_val:
+ log.sql.debug('Rolling back a transaction')
+ db.rollback()
+ else:
+ log.sql.debug('Committing a transaction')
+ ok = db.commit()
+ if not ok:
+ error = db.lastError()
+ msg = f'Failed to commit a transaction: "{error.text()}"'
+ raise_sqlite_error(msg, error)
class Query:
"""A prepared SQL query."""
- def __init__(self, querystr, forward_only=True):
+ def __init__(self, database: Database, querystr: str,
+ forward_only: bool = True) -> None:
"""Prepare a new SQL query.
Args:
+ database: The Database object on which to operate.
querystr: String to prepare query from.
forward_only: Optimization for queries that will only step forward.
Must be false for completion queries.
"""
- self.query = QSqlQuery(QSqlDatabase.database())
+ self._database = database
+ self.query = QSqlQuery(database.qt_database())
log.sql.vdebug(f'Preparing: {querystr}') # type: ignore[attr-defined]
ok = self.query.prepare(querystr)
self._check_ok('prepare', ok)
self.query.setForwardOnly(forward_only)
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
if not self.query.isActive():
raise BugError("Cannot iterate inactive query")
rec = self.query.record()
@@ -255,17 +312,16 @@ class Query:
rec = self.query.record()
yield rowtype(*[rec.value(i) for i in range(rec.count())])
- def _check_ok(self, step, ok):
+ def _check_ok(self, step: str, ok: bool) -> None:
if not ok:
query = self.query.lastQuery()
error = self.query.lastError()
- msg = 'Failed to {} query "{}": "{}"'.format(step, query,
- error.text())
+ msg = f'Failed to {step} query "{query}": "{error.text()}"'
raise_sqlite_error(msg, error)
- def _bind_values(self, values):
+ def _bind_values(self, values: Mapping[str, Any]) -> Dict[str, Any]:
for key, val in values.items():
- self.query.bindValue(':{}'.format(key), val)
+ self.query.bindValue(f':{key}', val)
bound_values = self.bound_values()
if None in bound_values.values():
@@ -273,7 +329,7 @@ class Query:
return bound_values
- def run(self, **values):
+ def run(self, **values: Any) -> 'Query':
"""Execute the prepared query."""
log.sql.debug(self.query.lastQuery())
@@ -286,14 +342,13 @@ class Query:
return self
- def run_batch(self, values):
+ def run_batch(self, values: Mapping[str, MutableSequence[Any]]) -> None:
"""Execute the query in batch mode."""
- log.sql.debug('Running SQL query (batch): "{}"'.format(
- self.query.lastQuery()))
+ log.sql.debug(f'Running SQL query (batch): "{self.query.lastQuery()}"')
self._bind_values(values)
- db = QSqlDatabase.database()
+ db = self._database.qt_database()
ok = db.transaction()
self._check_ok('transaction', ok)
@@ -308,13 +363,13 @@ class Query:
ok = db.commit()
self._check_ok('commit', ok)
- def value(self):
+ def value(self) -> Any:
"""Return the result of a single-value query (e.g. an EXISTS)."""
if not self.query.next():
raise BugError("No result for single-result query")
return self.query.record().value(0)
- def rows_affected(self):
+ def rows_affected(self) -> int:
"""Return how many rows were affected by a non-SELECT query."""
assert not self.query.isSelect(), self
assert self.query.isActive(), self
@@ -322,7 +377,7 @@ class Query:
assert rows != -1
return rows
- def bound_values(self):
+ def bound_values(self) -> Dict[str, Any]:
return self.query.boundValues()
@@ -332,84 +387,93 @@ class SqlTable(QObject):
Attributes:
_name: Name of the SQL table this wraps.
+ database: The Database to which this table belongs.
Signals:
changed: Emitted when the table is modified.
"""
changed = pyqtSignal()
+ database: Database
- def __init__(self, name, fields, constraints=None, parent=None):
+ def __init__(self, database: Database, name: str, fields: List[str],
+ constraints: Optional[Dict[str, str]] = None,
+ parent: Optional[QObject] = None) -> None:
"""Wrapper over a table in the SQL database.
Args:
+ database: The Database to which this table belongs.
name: Name of the table.
fields: A list of field names.
constraints: A dict mapping field names to constraint strings.
"""
super().__init__(parent)
self._name = name
+ self.database = database
self._create_table(fields, constraints)
- def _create_table(self, fields, constraints, *, force=False):
+ def _create_table(self, fields: List[str], constraints: Optional[Dict[str, str]],
+ *, force: bool = False) -> None:
"""Create the table if the database is uninitialized.
If the table already exists, this does nothing (except with force=True), so it
can e.g. be called on every user_version change.
"""
- if not user_version_changed() and not force:
+ if not self.database.user_version_changed() and not force:
return
constraints = constraints or {}
- column_defs = ['{} {}'.format(field, constraints.get(field, ''))
+ column_defs = [f'{field} {constraints.get(field, "")}'
for field in fields]
- q = Query("CREATE TABLE IF NOT EXISTS {name} ({column_defs})"
- .format(name=self._name, column_defs=', '.join(column_defs)))
+ q = self.database.query(
+ f"CREATE TABLE IF NOT EXISTS {self._name} ({', '.join(column_defs)})"
+ )
q.run()
- def create_index(self, name, field):
+ def create_index(self, name: str, field: str) -> None:
"""Create an index over this table if the database is uninitialized.
Args:
name: Name of the index, should be unique.
field: Name of the field to index.
"""
- if not user_version_changed():
+ if not self.database.user_version_changed():
return
- q = Query("CREATE INDEX IF NOT EXISTS {name} ON {table} ({field})"
- .format(name=name, table=self._name, field=field))
+ q = self.database.query(
+ f"CREATE INDEX IF NOT EXISTS {name} ON {self._name} ({field})"
+ )
q.run()
- def __iter__(self):
+ def __iter__(self) -> Iterator[Any]:
"""Iterate rows in the table."""
- q = Query("SELECT * FROM {table}".format(table=self._name))
+ q = self.database.query(f"SELECT * FROM {self._name}")
q.run()
return iter(q)
- def contains_query(self, field):
+ def contains_query(self, field: str) -> Query:
"""Return a prepared query that checks for the existence of an item.
Args:
field: Field to match.
"""
- return Query(
- "SELECT EXISTS(SELECT * FROM {table} WHERE {field} = :val)"
- .format(table=self._name, field=field))
+ return self.database.query(
+ f"SELECT EXISTS(SELECT * FROM {self._name} WHERE {field} = :val)"
+ )
- def __len__(self):
+ def __len__(self) -> int:
"""Return the count of rows in the table."""
- q = Query("SELECT count(*) FROM {table}".format(table=self._name))
+ q = self.database.query(f"SELECT count(*) FROM {self._name}")
q.run()
return q.value()
- def __bool__(self):
+ def __bool__(self) -> bool:
"""Check whether there's any data in the table."""
- q = Query(f"SELECT 1 FROM {self._name} LIMIT 1")
+ q = self.database.query(f"SELECT 1 FROM {self._name} LIMIT 1")
q.run()
return q.query.next()
- def delete(self, field, value):
+ def delete(self, field: str, value: Any) -> None:
"""Remove all rows for which `field` equals `value`.
Args:
@@ -419,20 +483,21 @@ class SqlTable(QObject):
Return:
The number of rows deleted.
"""
- q = Query(f"DELETE FROM {self._name} where {field} = :val")
+ q = self.database.query(f"DELETE FROM {self._name} where {field} = :val")
q.run(val=value)
if not q.rows_affected():
- raise KeyError('No row with {} = "{}"'.format(field, value))
+ raise KeyError('No row with {field} = "{value}"')
self.changed.emit()
- def _insert_query(self, values, replace):
- params = ', '.join(':{}'.format(key) for key in values)
+ def _insert_query(self, values: Mapping[str, Any], replace: bool) -> Query:
+ params = ', '.join(f':{key}' for key in values)
+ columns = ', '.join(values)
verb = "REPLACE" if replace else "INSERT"
- return Query("{verb} INTO {table} ({columns}) values({params})".format(
- verb=verb, table=self._name, columns=', '.join(values),
- params=params))
+ return self.database.query(
+ f"{verb} INTO {self._name} ({columns}) values({params})"
+ )
- def insert(self, values, replace=False):
+ def insert(self, values: Mapping[str, Any], replace: bool = False) -> None:
"""Append a row to the table.
Args:
@@ -443,7 +508,8 @@ class SqlTable(QObject):
q.run(**values)
self.changed.emit()
- def insert_batch(self, values, replace=False):
+ def insert_batch(self, values: Mapping[str, MutableSequence[Any]],
+ replace: bool = False) -> None:
"""Performantly append multiple rows to the table.
Args:
@@ -454,12 +520,12 @@ class SqlTable(QObject):
q.run_batch(values)
self.changed.emit()
- def delete_all(self):
+ def delete_all(self) -> None:
"""Remove all rows from the table."""
- Query("DELETE FROM {table}".format(table=self._name)).run()
+ self.database.query(f"DELETE FROM {self._name}").run()
self.changed.emit()
- def select(self, sort_by, sort_order, limit=-1):
+ def select(self, sort_by: str, sort_order: str, limit: int = -1) -> Query:
"""Prepare, run, and return a select statement on this table.
Args:
@@ -469,9 +535,17 @@ class SqlTable(QObject):
Return: A prepared and executed select query.
"""
- q = Query("SELECT * FROM {table} ORDER BY {sort_by} {sort_order} "
- "LIMIT :limit"
- .format(table=self._name, sort_by=sort_by,
- sort_order=sort_order))
+ q = self.database.query(
+ f"SELECT * FROM {self._name} ORDER BY {sort_by} {sort_order} LIMIT :limit"
+ )
q.run(limit=limit)
return q
+
+
+def version() -> str:
+ """Return the sqlite version string."""
+ try:
+ with contextlib.closing(Database(':memory:')) as in_memory_db:
+ return in_memory_db.query("select sqlite_version()").run().value()
+ except KnownError as e:
+ return f'UNAVAILABLE ({e})'