summaryrefslogtreecommitdiff
path: root/searx/utils.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2020-11-26 15:12:11 +0100
committerAlexandre Flament <alex@al-f.net>2020-12-03 10:22:48 +0100
commit1d0c368746e0ae28ea042edaf4c75ee3a2b738c2 (patch)
tree8a277759920f97677510e0e72cc0f16d84817f11 /searx/utils.py
parent6b5a57882242f24f867b6aa14b79b514720c6d83 (diff)
downloadsearxng-1d0c368746e0ae28ea042edaf4c75ee3a2b738c2.tar.gz
searxng-1d0c368746e0ae28ea042edaf4c75ee3a2b738c2.zip
[enh] record details exception per engine
add an new API /stats/errors
Diffstat (limited to 'searx/utils.py')
-rw-r--r--searx/utils.py148
1 files changed, 125 insertions, 23 deletions
diff --git a/searx/utils.py b/searx/utils.py
index 738f2c4d5..80cb556fd 100644
--- a/searx/utils.py
+++ b/searx/utils.py
@@ -10,7 +10,7 @@ from html.parser import HTMLParser
from urllib.parse import urljoin, urlparse
from lxml import html
-from lxml.etree import XPath, _ElementStringResult, _ElementUnicodeResult
+from lxml.etree import ElementBase, XPath, XPathError, XPathSyntaxError, _ElementStringResult, _ElementUnicodeResult
from babel.core import get_global
@@ -18,6 +18,7 @@ from searx import settings
from searx.data import USER_AGENTS
from searx.version import VERSION_STRING
from searx.languages import language_codes
+from searx.exceptions import SearxXPathSyntaxException, SearxEngineXPathException
from searx import logger
@@ -33,6 +34,13 @@ xpath_cache = dict()
lang_to_lc_cache = dict()
+class NotSetClass:
+ pass
+
+
+NOTSET = NotSetClass()
+
+
def searx_useragent():
"""Return the searx User Agent"""
return 'searx/{searx_version} {suffix}'.format(
@@ -125,7 +133,7 @@ def html_to_text(html_str):
return s.get_text()
-def extract_text(xpath_results):
+def extract_text(xpath_results, allow_none=False):
"""Extract text from a lxml result
* if xpath_results is list, extract the text from each result and concat the list
@@ -133,22 +141,27 @@ def extract_text(xpath_results):
( text_content() method from lxml )
* if xpath_results is a string element, then it's already done
"""
- if type(xpath_results) == list:
+ if isinstance(xpath_results, list):
# it's list of result : concat everything using recursive call
result = ''
for e in xpath_results:
result = result + extract_text(e)
return result.strip()
- elif type(xpath_results) in [_ElementStringResult, _ElementUnicodeResult]:
- # it's a string
- return ''.join(xpath_results)
- else:
+ elif isinstance(xpath_results, ElementBase):
# it's a element
text = html.tostring(
xpath_results, encoding='unicode', method='text', with_tail=False
)
text = text.strip().replace('\n', ' ')
return ' '.join(text.split())
+ elif isinstance(xpath_results, (_ElementStringResult, _ElementUnicodeResult, str, Number, bool)):
+ return str(xpath_results)
+ elif xpath_results is None and allow_none:
+ return None
+ elif xpath_results is None and not allow_none:
+ raise ValueError('extract_text(None, allow_none=False)')
+ else:
+ raise ValueError('unsupported type')
def normalize_url(url, base_url):
@@ -170,7 +183,7 @@ def normalize_url(url, base_url):
>>> normalize_url('', 'https://example.com')
'https://example.com/'
>>> normalize_url('/test', '/path')
- raise Exception
+ raise ValueError
Raises:
* lxml.etree.ParserError
@@ -194,7 +207,7 @@ def normalize_url(url, base_url):
# add a / at this end of the url if there is no path
if not parsed_url.netloc:
- raise Exception('Cannot parse url')
+ raise ValueError('Cannot parse url')
if not parsed_url.path:
url += '/'
@@ -224,17 +237,17 @@ def extract_url(xpath_results, base_url):
>>> f('', 'https://example.com')
raise lxml.etree.ParserError
>>> searx.utils.extract_url([], 'https://example.com')
- raise Exception
+ raise ValueError
Raises:
- * Exception
+ * ValueError
* lxml.etree.ParserError
Returns:
* str: normalized URL
"""
if xpath_results == []:
- raise Exception('Empty url resultset')
+ raise ValueError('Empty url resultset')
url = extract_text(xpath_results)
return normalize_url(url, base_url)
@@ -258,7 +271,6 @@ def dict_subset(d, properties):
def list_get(a_list, index, default=None):
"""Get element in list or default value
-
Examples:
>>> list_get(['A', 'B', 'C'], 0)
'A'
@@ -310,7 +322,7 @@ def get_torrent_size(filesize, filesize_multiplier):
filesize = int(filesize * 1000 * 1000)
elif filesize_multiplier == 'KiB':
filesize = int(filesize * 1000)
- except:
+ except ValueError:
filesize = None
return filesize
@@ -506,20 +518,110 @@ def get_engine_from_settings(name):
return {}
-def get_xpath(xpath_str):
+def get_xpath(xpath_spec):
"""Return cached compiled XPath
There is no thread lock.
Worst case scenario, xpath_str is compiled more than one time.
+
+ Args:
+ * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
+
+ Returns:
+ * result (bool, float, list, str): Results.
+
+ Raises:
+ * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
+ * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
+ """
+ if isinstance(xpath_spec, str):
+ result = xpath_cache.get(xpath_spec, None)
+ if result is None:
+ try:
+ result = XPath(xpath_spec)
+ except XPathSyntaxError as e:
+ raise SearxXPathSyntaxException(xpath_spec, str(e.msg))
+ xpath_cache[xpath_spec] = result
+ return result
+
+ if isinstance(xpath_spec, XPath):
+ return xpath_spec
+
+ raise TypeError('xpath_spec must be either a str or a lxml.etree.XPath')
+
+
+def eval_xpath(element, xpath_spec):
+ """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all.
+ See https://lxml.de/xpathxslt.html#xpath-return-values
+
+ Args:
+ * element (ElementBase): [description]
+ * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
+
+ Returns:
+ * result (bool, float, list, str): Results.
+
+ Raises:
+ * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
+ * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
+ * SearxEngineXPathException: Raise when the XPath can't be evaluated.
"""
- result = xpath_cache.get(xpath_str, None)
- if result is None:
- result = XPath(xpath_str)
- xpath_cache[xpath_str] = result
+ xpath = get_xpath(xpath_spec)
+ try:
+ return xpath(element)
+ except XPathError as e:
+ arg = ' '.join([str(i) for i in e.args])
+ raise SearxEngineXPathException(xpath_spec, arg)
+
+
+def eval_xpath_list(element, xpath_spec, min_len=None):
+ """Same as eval_xpath, check if the result is a list
+
+ Args:
+ * element (ElementBase): [description]
+ * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath
+ * min_len (int, optional): [description]. Defaults to None.
+
+ Raises:
+ * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
+ * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
+ * SearxEngineXPathException: raise if the result is not a list
+
+ Returns:
+ * result (bool, float, list, str): Results.
+ """
+ result = eval_xpath(element, xpath_spec)
+ if not isinstance(result, list):
+ raise SearxEngineXPathException(xpath_spec, 'the result is not a list')
+ if min_len is not None and min_len > len(result):
+ raise SearxEngineXPathException(xpath_spec, 'len(xpath_str) < ' + str(min_len))
return result
-def eval_xpath(element, xpath_str):
- """Equivalent of element.xpath(xpath_str) but compile xpath_str once for all."""
- xpath = get_xpath(xpath_str)
- return xpath(element)
+def eval_xpath_getindex(elements, xpath_spec, index, default=NOTSET):
+ """Call eval_xpath_list then get one element using the index parameter.
+ If the index does not exist, either aise an exception is default is not set,
+ other return the default value (can be None).
+
+ Args:
+ * elements (ElementBase): lxml element to apply the xpath.
+ * xpath_spec (str|lxml.etree.XPath): XPath as a str or lxml.etree.XPath.
+ * index (int): index to get
+ * default (Object, optional): Defaults if index doesn't exist.
+
+ Raises:
+ * TypeError: Raise when xpath_spec is neither a str nor a lxml.etree.XPath
+ * SearxXPathSyntaxException: Raise when there is a syntax error in the XPath
+ * SearxEngineXPathException: if the index is not found. Also see eval_xpath.
+
+ Returns:
+ * result (bool, float, list, str): Results.
+ """
+ result = eval_xpath_list(elements, xpath_spec)
+ if index >= -len(result) and index < len(result):
+ return result[index]
+ if default == NOTSET:
+ # raise an SearxEngineXPathException instead of IndexError
+ # to record xpath_spec
+ raise SearxEngineXPathException(xpath_spec, 'index ' + str(index) + ' not found')
+ return default