| author | Adam Tauber <asciimoo@gmail.com> | 2015-10-03 17:26:07 +0200 |
|---|---|---|
| committer | Adam Tauber <asciimoo@gmail.com> | 2015-10-03 17:26:07 +0200 |
| commit | b6c3cb0bdd020a459d0ef5c21d1303ed0148cc0c (patch) | |
| tree | 4b0c0ae035b7ff467c8ef7d729aa589de38e9f62 /searx/search.py | |
| parent | 0ad272c5cb81a9c69008aa86a1f29cd642ddf4ff (diff) | |
| download | searxng-b6c3cb0bdd020a459d0ef5c21d1303ed0148cc0c.tar.gz searxng-b6c3cb0bdd020a459d0ef5c21d1303ed0148cc0c.zip | |
[enh][mod] result handling refactor
Several changes have been made:
- Parallel result merge
- Scoring algorithm slightly changed (see result_score())
- Proper Thread locking on global data manipulation
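The locking change is visible in the hunks below: every update to the shared `engines[...].stats` counters now happens inside a `with threading.RLock():` block. As a rough, self-contained sketch of the intent (a hypothetical `stats` dict and a single shared lock standing in for searx's engine stats, not the searx code itself):

```python
import threading

# Hypothetical stand-in for the shared engines[engine_name].stats dict in searx.
stats = {'errors': 0, 'page_load_time': 0.0}

# One lock shared by all worker threads that touch the counters above.
stats_lock = threading.RLock()


def record_error():
    # Serialize the read-modify-write so concurrent engine threads
    # cannot lose updates to the shared counter.
    with stats_lock:
        stats['errors'] += 1


def record_page_load(duration):
    with stats_lock:
        stats['page_load_time'] += duration


if __name__ == '__main__':
    threads = [threading.Thread(target=record_error) for _ in range(8)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(stats['errors'])  # -> 8
```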
Diffstat (limited to 'searx/search.py')
-rw-r--r-- | searx/search.py | 273 |
1 file changed, 16 insertions, 257 deletions
```diff
diff --git a/searx/search.py b/searx/search.py
index f2b5235b8..02676a149 100644
--- a/searx/search.py
+++ b/searx/search.py
@@ -16,13 +16,8 @@ along with searx. If not, see < http://www.gnu.org/licenses/ >.
 '''
 
 import threading
-import re
 import searx.poolrequests as requests_lib
-from itertools import izip_longest, chain
-from operator import itemgetter
-from Queue import Queue
 from time import time
-from urlparse import urlparse, unquote
 from searx import settings
 from searx.engines import (
     categories, engines
@@ -30,6 +25,7 @@ from searx.engines import (
 from searx.languages import language_codes
 from searx.utils import gen_useragent, get_blocked_engines
 from searx.query import Query
+from searx.results import ResultContainer
 from searx import logger
 
 logger = logger.getChild('search')
@@ -42,7 +38,8 @@ def search_request_wrapper(fn, url, engine_name, **kwargs):
         return fn(url, **kwargs)
     except:
         # increase errors stats
-        engines[engine_name].stats['errors'] += 1
+        with threading.RLock():
+            engines[engine_name].stats['errors'] += 1
 
         # print engine name and specific error message
         logger.exception('engine crash: {0}'.format(engine_name))
@@ -84,7 +81,7 @@ def default_request_params():
 
 
 # create a callback wrapper for the search engine results
-def make_callback(engine_name, results_queue, callback, params):
+def make_callback(engine_name, callback, params, result_container):
 
     # creating a callback wrapper for the search engine results
     def process_callback(response, **kwargs):
@@ -96,12 +93,17 @@ def make_callback(engine_name, results_queue, callback, params):
 
         response.search_params = params
 
-        timeout_overhead = 0.2  # seconds
         search_duration = time() - params['started']
+        # update stats with current page-load-time
+        with threading.RLock():
+            engines[engine_name].stats['page_load_time'] += search_duration
+
+        timeout_overhead = 0.2  # seconds
         timeout_limit = engines[engine_name].timeout + timeout_overhead
+
        if search_duration > timeout_limit:
-            engines[engine_name].stats['page_load_time'] += timeout_limit
-            engines[engine_name].stats['errors'] += 1
+            with threading.RLock():
+                engines[engine_name].stats['errors'] += 1
             return
 
         # callback
@@ -111,212 +113,11 @@ def make_callback(engine_name, results_queue, callback, params):
         for result in search_results:
             result['engine'] = engine_name
 
-        results_queue.put_nowait((engine_name, search_results))
-
-        # update stats with current page-load-time
-        engines[engine_name].stats['page_load_time'] += search_duration
+        result_container.extend(engine_name, search_results)
 
     return process_callback
 
 
-# return the meaningful length of the content for a result
-def content_result_len(content):
-    if isinstance(content, basestring):
-        content = re.sub('[,;:!?\./\\\\ ()-_]', '', content)
-        return len(content)
-    else:
-        return 0
-
-
-# score results and remove duplications
-def score_results(results):
-    # calculate scoring parameters
-    flat_res = filter(
-        None, chain.from_iterable(izip_longest(*results.values())))
-    flat_len = len(flat_res)
-    engines_len = len(results)
-
-    results = []
-
-    # pass 1: deduplication + scoring
-    for i, res in enumerate(flat_res):
-
-        res['parsed_url'] = urlparse(res['url'])
-
-        # if the result has no scheme, use http as default
-        if not res['parsed_url'].scheme:
-            res['parsed_url'] = res['parsed_url']._replace(scheme="http")
-
-        res['host'] = res['parsed_url'].netloc
-
-        if res['host'].startswith('www.'):
-            res['host'] = res['host'].replace('www.', '', 1)
-
-        res['engines'] = [res['engine']]
-
-        weight = 1.0
-
-        # strip multiple spaces and cariage returns from content
-        if res.get('content'):
-            res['content'] = re.sub(' +', ' ',
-                                    res['content'].strip().replace('\n', ''))
-
-        # get weight of this engine if possible
-        if hasattr(engines[res['engine']], 'weight'):
-            weight = float(engines[res['engine']].weight)
-
-        # calculate score for that engine
-        score = int((flat_len - i) / engines_len) * weight + 1
-
-        # check for duplicates
-        duplicated = False
-        for new_res in results:
-            # remove / from the end of the url if required
-            p1 = res['parsed_url'].path[:-1]\
-                if res['parsed_url'].path.endswith('/')\
-                else res['parsed_url'].path
-            p2 = new_res['parsed_url'].path[:-1]\
-                if new_res['parsed_url'].path.endswith('/')\
-                else new_res['parsed_url'].path
-
-            # check if that result is a duplicate
-            if res['host'] == new_res['host'] and\
-               unquote(p1) == unquote(p2) and\
-               res['parsed_url'].query == new_res['parsed_url'].query and\
-               res.get('template') == new_res.get('template'):
-                duplicated = new_res
-                break
-
-        # merge duplicates together
-        if duplicated:
-            # using content with more text
-            if content_result_len(res.get('content', '')) >\
-                    content_result_len(duplicated.get('content', '')):
-                duplicated['content'] = res['content']
-
-            # increase result-score
-            duplicated['score'] += score
-
-            # add engine to list of result-engines
-            duplicated['engines'].append(res['engine'])
-
-            # using https if possible
-            if duplicated['parsed_url'].scheme == 'https':
-                continue
-            elif res['parsed_url'].scheme == 'https':
-                duplicated['url'] = res['parsed_url'].geturl()
-                duplicated['parsed_url'] = res['parsed_url']
-
-        # if there is no duplicate found, append result
-        else:
-            res['score'] = score
-
-            results.append(res)
-
-    results = sorted(results, key=itemgetter('score'), reverse=True)
-
-    # pass 2 : group results by category and template
-    gresults = []
-    categoryPositions = {}
-
-    for i, res in enumerate(results):
-        # FIXME : handle more than one category per engine
-        category = engines[res['engine']].categories[0] + ':' + ''\
-            if 'template' not in res\
-            else res['template']
-
-        current = None if category not in categoryPositions\
-            else categoryPositions[category]
-
-        # group with previous results using the same category
-        # if the group can accept more result and is not too far
-        # from the current position
-        if current is not None and (current['count'] > 0)\
-                and (len(gresults) - current['index'] < 20):
-            # group with the previous results using
-            # the same category with this one
-            index = current['index']
-            gresults.insert(index, res)
-
-            # update every index after the current one
-            # (including the current one)
-            for k in categoryPositions:
-                v = categoryPositions[k]['index']
-                if v >= index:
-                    categoryPositions[k]['index'] = v + 1
-
-            # update this category
-            current['count'] -= 1
-
-        else:
-            # same category
-            gresults.append(res)
-
-            # update categoryIndex
-            categoryPositions[category] = {'index': len(gresults), 'count': 8}
-
-    # return gresults
-    return gresults
-
-
-def merge_two_infoboxes(infobox1, infobox2):
-    if 'urls' in infobox2:
-        urls1 = infobox1.get('urls', None)
-        if urls1 is None:
-            urls1 = []
-            infobox1.set('urls', urls1)
-
-        urlSet = set()
-        for url in infobox1.get('urls', []):
-            urlSet.add(url.get('url', None))
-
-        for url in infobox2.get('urls', []):
-            if url.get('url', None) not in urlSet:
-                urls1.append(url)
-
-    if 'attributes' in infobox2:
-        attributes1 = infobox1.get('attributes', None)
-        if attributes1 is None:
-            attributes1 = []
-            infobox1.set('attributes', attributes1)
-
-        attributeSet = set()
-        for attribute in infobox1.get('attributes', []):
-            if attribute.get('label', None) not in attributeSet:
-                attributeSet.add(attribute.get('label', None))
-
-        for attribute in infobox2.get('attributes', []):
-            attributes1.append(attribute)
-
-    if 'content' in infobox2:
-        content1 = infobox1.get('content', None)
-        content2 = infobox2.get('content', '')
-        if content1 is not None:
-            if content_result_len(content2) > content_result_len(content1):
-                infobox1['content'] = content2
-        else:
-            infobox1.set('content', content2)
-
-
-def merge_infoboxes(infoboxes):
-    results = []
-    infoboxes_id = {}
-    for infobox in infoboxes:
-        add_infobox = True
-        infobox_id = infobox.get('id', None)
-        if infobox_id is not None:
-            existingIndex = infoboxes_id.get(infobox_id, None)
-            if existingIndex is not None:
-                merge_two_infoboxes(results[existingIndex], infobox)
-                add_infobox = False
-
-        if add_infobox:
-            results.append(infobox)
-            infoboxes_id[infobox_id] = len(results) - 1
-
-    return results
-
-
 class Search(object):
 
     """Search information container"""
@@ -334,10 +135,7 @@ class Search(object):
         # set blocked engines
         self.blocked_engines = get_blocked_engines(engines, request.cookies)
 
-        self.results = []
-        self.suggestions = set()
-        self.answers = set()
-        self.infoboxes = []
+        self.result_container = ResultContainer()
         self.request_data = {}
 
         # set specific language if set
@@ -449,8 +247,6 @@ class Search(object):
 
         # init vars
        requests = []
-        results_queue = Queue()
-        results = {}
 
         # increase number of searches
         number_of_searches += 1
@@ -504,9 +300,9 @@ class Search(object):
             # create a callback wrapper for the search engine results
             callback = make_callback(
                 selected_engine['name'],
-                results_queue,
                 engine.response,
-                request_params)
+                request_params,
+                self.result_container)
 
             # create dictionary which contain all
             # informations about the request
@@ -539,42 +335,5 @@ class Search(object):
         # send all search-request
         threaded_requests(requests)
 
-        while not results_queue.empty():
-            engine_name, engine_results = results_queue.get_nowait()
-
-            # TODO type checks
-            [self.suggestions.add(x['suggestion'])
-             for x in list(engine_results)
-             if 'suggestion' in x
-             and engine_results.remove(x) is None]
-
-            [self.answers.add(x['answer'])
-             for x in list(engine_results)
-             if 'answer' in x
-             and engine_results.remove(x) is None]
-
-            self.infoboxes.extend(x for x in list(engine_results)
-                                  if 'infobox' in x
-                                  and engine_results.remove(x) is None)
-
-            results[engine_name] = engine_results
-
-        # update engine-specific stats
-        for engine_name, engine_results in results.items():
-            engines[engine_name].stats['search_count'] += 1
-            engines[engine_name].stats['result_count'] += len(engine_results)
-
-        # score results and remove duplications
-        self.results = score_results(results)
-
-        # merge infoboxes according to their ids
-        self.infoboxes = merge_infoboxes(self.infoboxes)
-
-        # update engine stats, using calculated score
-        for result in self.results:
-            for res_engine in result['engines']:
-                engines[result['engine']]\
-                    .stats['score_count'] += result['score']
-
         # return results, suggestions, answers and infoboxes
         return self
```
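With this refactor, each engine callback hands its parsed results directly to a shared container (`result_container.extend(engine_name, search_results)`) as soon as the engine thread finishes, instead of queueing them for the `Search` object to drain, score, and merge afterwards; that merging logic now lives in the new `searx/results.py`, which is outside this diff. A rough sketch of the new wiring, using a hypothetical stand-in container (only the `extend()` call is taken from the diff; the real `ResultContainer` internals are not shown here):

```python
import threading


class StubResultContainer(object):
    """Hypothetical stand-in for searx.results.ResultContainer."""

    def __init__(self):
        self._lock = threading.RLock()
        self.results = []

    def extend(self, engine_name, engine_results):
        # Merge results as they arrive, from whichever engine thread finishes first.
        with self._lock:
            self.results.extend(engine_results)


def make_callback(engine_name, result_container):
    # Mirrors the shape of the refactored make_callback(): the callback closes
    # over the shared container instead of a results_queue.
    def process_callback(search_results):
        for result in search_results:
            result['engine'] = engine_name
        result_container.extend(engine_name, search_results)
    return process_callback


container = StubResultContainer()
callback = make_callback('example-engine', container)
callback([{'url': 'https://example.org/', 'title': 'Example'}])
print(len(container.results))  # -> 1
```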