summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2020-09-19 18:25:24 +0200
committerAlexandre Flament <alex@al-f.net>2020-09-22 11:57:06 +0200
commitad0758e52a900186f203c61373b6ef3c63240065 (patch)
treeb1603ebec0f86d7f678fdbddbd2e45a981e91f22
parentf9664037a6424bffc6a7bb7af9dcccc07e7c3d84 (diff)
downloadsearxng-ad0758e52a900186f203c61373b6ef3c63240065.tar.gz
searxng-ad0758e52a900186f203c61373b6ef3c63240065.zip
[mod] add searx/webutils.py
contains utility functions and classes used only by webapp.py
-rw-r--r--searx/utils.py139
-rwxr-xr-xsearx/webapp.py10
-rw-r--r--searx/webutils.py127
-rw-r--r--tests/unit/test_utils.py59
-rw-r--r--tests/unit/test_webutils.py65
5 files changed, 198 insertions, 202 deletions
diff --git a/searx/utils.py b/searx/utils.py
index 0eb9f6a34..f74f2ac88 100644
--- a/searx/utils.py
+++ b/searx/utils.py
@@ -1,27 +1,21 @@
# -*- coding: utf-8 -*-
import os
import sys
-import csv
-import hashlib
-import hmac
import re
import json
-from codecs import getincrementalencoder
from imp import load_source
from numbers import Number
from os.path import splitext, join
-from io import open, StringIO
+from io import open
from random import choice
from html.parser import HTMLParser
from lxml.etree import XPath
from babel.core import get_global
-from babel.dates import format_date
from searx import settings
from searx.version import VERSION_STRING
from searx.languages import language_codes
-from searx import settings
from searx import logger
@@ -50,33 +44,6 @@ def gen_useragent(os=None):
return str(useragents['ua'].format(os=os or choice(useragents['os']), version=choice(useragents['versions'])))
-def highlight_content(content, query):
-
- if not content:
- return None
- # ignoring html contents
- # TODO better html content detection
- if content.find('<') != -1:
- return content
-
- if content.lower().find(query.lower()) > -1:
- query_regex = '({0})'.format(re.escape(query))
- content = re.sub(query_regex, '<span class="highlight">\\1</span>',
- content, flags=re.I | re.U)
- else:
- regex_parts = []
- for chunk in query.split():
- if len(chunk) == 1:
- regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk)))
- else:
- regex_parts.append('{0}'.format(re.escape(chunk)))
- query_regex = '({0})'.format('|'.join(regex_parts))
- content = re.sub(query_regex, '<span class="highlight">\\1</span>',
- content, flags=re.I | re.U)
-
- return content
-
-
class HTMLTextExtractorException(Exception):
pass
@@ -139,91 +106,6 @@ def html_to_text(html):
return s.get_text()
-class UnicodeWriter:
- """
- A CSV writer which will write rows to CSV file "f",
- which is encoded in the given encoding.
- """
-
- def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
- # Redirect output to a queue
- self.queue = StringIO()
- self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
- self.stream = f
- self.encoder = getincrementalencoder(encoding)()
-
- def writerow(self, row):
- self.writer.writerow(row)
- # Fetch UTF-8 output from the queue ...
- data = self.queue.getvalue()
- data = data.strip('\x00')
- # ... and reencode it into the target encoding
- data = self.encoder.encode(data)
- # write to the target stream
- self.stream.write(data.decode())
- # empty queue
- self.queue.truncate(0)
-
- def writerows(self, rows):
- for row in rows:
- self.writerow(row)
-
-
-def get_resources_directory(searx_directory, subdirectory, resources_directory):
- if not resources_directory:
- resources_directory = os.path.join(searx_directory, subdirectory)
- if not os.path.isdir(resources_directory):
- raise Exception(resources_directory + " is not a directory")
- return resources_directory
-
-
-def get_themes(templates_path):
- """Returns available themes list."""
- themes = os.listdir(templates_path)
- if '__common__' in themes:
- themes.remove('__common__')
- return themes
-
-
-def get_static_files(static_path):
- static_files = set()
- static_path_length = len(static_path) + 1
- for directory, _, files in os.walk(static_path):
- for filename in files:
- f = os.path.join(directory[static_path_length:], filename)
- static_files.add(f)
- return static_files
-
-
-def get_result_templates(templates_path):
- result_templates = set()
- templates_path_length = len(templates_path) + 1
- for directory, _, files in os.walk(templates_path):
- if directory.endswith('result_templates'):
- for filename in files:
- f = os.path.join(directory[templates_path_length:], filename)
- result_templates.add(f)
- return result_templates
-
-
-def format_date_by_locale(date, locale_string):
- # strftime works only on dates after 1900
-
- if date.year <= 1900:
- return date.isoformat().split('T')[0]
-
- if locale_string == 'all':
- locale_string = settings['ui']['default_locale'] or 'en_US'
-
- # to avoid crashing if locale is not supported by babel
- try:
- formatted_date = format_date(date, locale=locale_string)
- except:
- formatted_date = format_date(date, "YYYY-MM-dd")
-
- return formatted_date
-
-
def dict_subset(d, properties):
result = {}
for k in properties:
@@ -232,14 +114,6 @@ def dict_subset(d, properties):
return result
-def prettify_url(url, max_length=74):
- if len(url) > max_length:
- chunk_len = int(max_length / 2 + 1)
- return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
- else:
- return url
-
-
# get element in list or default value
def list_get(a_list, index, default=None):
if len(a_list) > index:
@@ -383,17 +257,6 @@ def load_module(filename, module_dir):
return module
-def new_hmac(secret_key, url):
- try:
- secret_key_bytes = bytes(secret_key, 'utf-8')
- except TypeError as err:
- if isinstance(secret_key, bytes):
- secret_key_bytes = secret_key
- else:
- raise err
- return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest()
-
-
def to_string(obj):
if isinstance(obj, str):
return obj
diff --git a/searx/webapp.py b/searx/webapp.py
index f2ef8b209..90bc8fc6e 100755
--- a/searx/webapp.py
+++ b/searx/webapp.py
@@ -62,11 +62,12 @@ from searx.exceptions import SearxParameterException
from searx.engines import (
categories, engines, engine_shortcuts, get_engines_stats, initialize_engines
)
-from searx.utils import (
- UnicodeWriter, highlight_content, html_to_text, get_resources_directory,
- get_static_files, get_result_templates, get_themes, gen_useragent,
- dict_subset, prettify_url, match_language
+from searx.webutils import (
+ UnicodeWriter, highlight_content, get_resources_directory,
+ get_static_files, get_result_templates, get_themes,
+ prettify_url, new_hmac
)
+from searx.utils import html_to_text, gen_useragent, dict_subset, match_language
from searx.version import VERSION_STRING
from searx.languages import language_codes as languages
from searx.search import SearchWithPlugins, get_search_query_from_webapp
@@ -76,7 +77,6 @@ from searx.plugins import plugins
from searx.plugins.oa_doi_rewrite import get_doi_resolver
from searx.preferences import Preferences, ValidationException, LANGUAGE_CODES
from searx.answerers import answerers
-from searx.utils import new_hmac
# check if the pyopenssl package is installed.
# It is needed for SSL connection without trouble, see #298
diff --git a/searx/webutils.py b/searx/webutils.py
new file mode 100644
index 000000000..2900c0edd
--- /dev/null
+++ b/searx/webutils.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+import os
+import csv
+import hashlib
+import hmac
+import re
+
+from io import StringIO
+from codecs import getincrementalencoder
+
+from searx import logger
+
+
+logger = logger.getChild('webutils')
+
+
+class UnicodeWriter:
+ """
+ A CSV writer which will write rows to CSV file "f",
+ which is encoded in the given encoding.
+ """
+
+ def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
+ # Redirect output to a queue
+ self.queue = StringIO()
+ self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
+ self.stream = f
+ self.encoder = getincrementalencoder(encoding)()
+
+ def writerow(self, row):
+ self.writer.writerow(row)
+ # Fetch UTF-8 output from the queue ...
+ data = self.queue.getvalue()
+ data = data.strip('\x00')
+ # ... and reencode it into the target encoding
+ data = self.encoder.encode(data)
+ # write to the target stream
+ self.stream.write(data.decode())
+ # empty queue
+ self.queue.truncate(0)
+
+ def writerows(self, rows):
+ for row in rows:
+ self.writerow(row)
+
+
+def get_resources_directory(searx_directory, subdirectory, resources_directory):
+ if not resources_directory:
+ resources_directory = os.path.join(searx_directory, subdirectory)
+ if not os.path.isdir(resources_directory):
+ raise Exception(resources_directory + " is not a directory")
+ return resources_directory
+
+
+def get_themes(templates_path):
+ """Returns available themes list."""
+ themes = os.listdir(templates_path)
+ if '__common__' in themes:
+ themes.remove('__common__')
+ return themes
+
+
+def get_static_files(static_path):
+ static_files = set()
+ static_path_length = len(static_path) + 1
+ for directory, _, files in os.walk(static_path):
+ for filename in files:
+ f = os.path.join(directory[static_path_length:], filename)
+ static_files.add(f)
+ return static_files
+
+
+def get_result_templates(templates_path):
+ result_templates = set()
+ templates_path_length = len(templates_path) + 1
+ for directory, _, files in os.walk(templates_path):
+ if directory.endswith('result_templates'):
+ for filename in files:
+ f = os.path.join(directory[templates_path_length:], filename)
+ result_templates.add(f)
+ return result_templates
+
+
+def new_hmac(secret_key, url):
+ try:
+ secret_key_bytes = bytes(secret_key, 'utf-8')
+ except TypeError as err:
+ if isinstance(secret_key, bytes):
+ secret_key_bytes = secret_key
+ else:
+ raise err
+ return hmac.new(secret_key_bytes, url, hashlib.sha256).hexdigest()
+
+
+def prettify_url(url, max_length=74):
+ if len(url) > max_length:
+ chunk_len = int(max_length / 2 + 1)
+ return '{0}[...]{1}'.format(url[:chunk_len], url[-chunk_len:])
+ else:
+ return url
+
+
+def highlight_content(content, query):
+
+ if not content:
+ return None
+ # ignoring html contents
+ # TODO better html content detection
+ if content.find('<') != -1:
+ return content
+
+ if content.lower().find(query.lower()) > -1:
+ query_regex = '({0})'.format(re.escape(query))
+ content = re.sub(query_regex, '<span class="highlight">\\1</span>',
+ content, flags=re.I | re.U)
+ else:
+ regex_parts = []
+ for chunk in query.split():
+ if len(chunk) == 1:
+ regex_parts.append('\\W+{0}\\W+'.format(re.escape(chunk)))
+ else:
+ regex_parts.append('{0}'.format(re.escape(chunk)))
+ query_regex = '({0})'.format('|'.join(regex_parts))
+ content = re.sub(query_regex, '<span class="highlight">\\1</span>',
+ content, flags=re.I | re.U)
+
+ return content
diff --git a/tests/unit/test_utils.py b/tests/unit/test_utils.py
index 08b759542..69f5ef92a 100644
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
-import mock
from searx.testing import SearxTestCase
from searx import utils
@@ -16,25 +15,6 @@ class TestUtils(SearxTestCase):
self.assertIsNotNone(utils.searx_useragent())
self.assertTrue(utils.searx_useragent().startswith('searx'))
- def test_highlight_content(self):
- self.assertEqual(utils.highlight_content(0, None), None)
- self.assertEqual(utils.highlight_content(None, None), None)
- self.assertEqual(utils.highlight_content('', None), None)
- self.assertEqual(utils.highlight_content(False, None), None)
-
- contents = [
- '<html></html>'
- 'not<'
- ]
- for content in contents:
- self.assertEqual(utils.highlight_content(content, None), content)
-
- content = 'a'
- query = 'test'
- self.assertEqual(utils.highlight_content(content, query), content)
- query = 'a test'
- self.assertEqual(utils.highlight_content(content, query), content)
-
def test_html_to_text(self):
html = """
<a href="/testlink" class="link_access_account">
@@ -56,15 +36,6 @@ class TestUtils(SearxTestCase):
html = '<p><b>Lorem ipsum</i>dolor sit amet</p>'
self.assertEqual(utils.html_to_text(html), "Lorem ipsum")
- def test_prettify_url(self):
- data = (('https://searx.me/', 'https://searx.me/'),
- ('https://searx.me/ű', 'https://searx.me/ű'),
- ('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'),
- ('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű'))
-
- for test_url, expected in data:
- self.assertEqual(utils.prettify_url(test_url, max_length=32), expected)
-
def test_match_language(self):
self.assertEqual(utils.match_language('es', ['es']), 'es')
self.assertEqual(utils.match_language('es', [], fallback='fallback'), 'fallback')
@@ -124,33 +95,3 @@ class TestHTMLTextExtractor(SearxTestCase):
text = '<p><b>Lorem ipsum</i>dolor sit amet</p>'
with self.assertRaises(utils.HTMLTextExtractorException):
self.html_text_extractor.feed(text)
-
-
-class TestUnicodeWriter(SearxTestCase):
-
- def setUp(self):
- self.unicode_writer = utils.UnicodeWriter(mock.MagicMock())
-
- def test_write_row(self):
- row = [1, 2, 3]
- self.assertEqual(self.unicode_writer.writerow(row), None)
-
- def test_write_rows(self):
- self.unicode_writer.writerow = mock.MagicMock()
- rows = [1, 2, 3]
- self.unicode_writer.writerows(rows)
- self.assertEqual(self.unicode_writer.writerow.call_count, len(rows))
-
-
-class TestNewHmac(SearxTestCase):
-
- def test_bytes(self):
- for secret_key in ['secret', b'secret', 1]:
- if secret_key == 1:
- with self.assertRaises(TypeError):
- utils.new_hmac(secret_key, b'http://example.com')
- continue
- res = utils.new_hmac(secret_key, b'http://example.com')
- self.assertEqual(
- res,
- '23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819')
diff --git a/tests/unit/test_webutils.py b/tests/unit/test_webutils.py
new file mode 100644
index 000000000..aa464688b
--- /dev/null
+++ b/tests/unit/test_webutils.py
@@ -0,0 +1,65 @@
+# -*- coding: utf-8 -*-
+import mock
+from searx.testing import SearxTestCase
+from searx import webutils
+
+
+class TestWebUtils(SearxTestCase):
+
+ def test_prettify_url(self):
+ data = (('https://searx.me/', 'https://searx.me/'),
+ ('https://searx.me/ű', 'https://searx.me/ű'),
+ ('https://searx.me/' + (100 * 'a'), 'https://searx.me/[...]aaaaaaaaaaaaaaaaa'),
+ ('https://searx.me/' + (100 * 'ű'), 'https://searx.me/[...]űűűűűűűűűűűűűűűűű'))
+
+ for test_url, expected in data:
+ self.assertEqual(webutils.prettify_url(test_url, max_length=32), expected)
+
+ def test_highlight_content(self):
+ self.assertEqual(webutils.highlight_content(0, None), None)
+ self.assertEqual(webutils.highlight_content(None, None), None)
+ self.assertEqual(webutils.highlight_content('', None), None)
+ self.assertEqual(webutils.highlight_content(False, None), None)
+
+ contents = [
+ '<html></html>'
+ 'not<'
+ ]
+ for content in contents:
+ self.assertEqual(webutils.highlight_content(content, None), content)
+
+ content = 'a'
+ query = 'test'
+ self.assertEqual(webutils.highlight_content(content, query), content)
+ query = 'a test'
+ self.assertEqual(webutils.highlight_content(content, query), content)
+
+
+class TestUnicodeWriter(SearxTestCase):
+
+ def setUp(self):
+ self.unicode_writer = webutils.UnicodeWriter(mock.MagicMock())
+
+ def test_write_row(self):
+ row = [1, 2, 3]
+ self.assertEqual(self.unicode_writer.writerow(row), None)
+
+ def test_write_rows(self):
+ self.unicode_writer.writerow = mock.MagicMock()
+ rows = [1, 2, 3]
+ self.unicode_writer.writerows(rows)
+ self.assertEqual(self.unicode_writer.writerow.call_count, len(rows))
+
+
+class TestNewHmac(SearxTestCase):
+
+ def test_bytes(self):
+ for secret_key in ['secret', b'secret', 1]:
+ if secret_key == 1:
+ with self.assertRaises(TypeError):
+ webutils.new_hmac(secret_key, b'http://example.com')
+ continue
+ res = webutils.new_hmac(secret_key, b'http://example.com')
+ self.assertEqual(
+ res,
+ '23e2baa2404012a5cc8e4a18b4aabf0dde4cb9b56f679ddc0fd6d7c24339d819')