summaryrefslogtreecommitdiff
path: root/searx/engines/openstreetmap.py
diff options
context:
space:
mode:
authorAlexandre Flament <alex@al-f.net>2021-06-04 09:35:26 +0200
committerMarkus Heiser <markus.heiser@darmarit.de>2021-06-09 18:08:23 +0200
commitc75425655fdadf9554b97ae0309a6181acd34ce3 (patch)
tree7fee6892d2a64f2c44db8cb35b079bf823991b8e /searx/engines/openstreetmap.py
parent92c8a8829f2e68e7ceb3b4670ebea4c4e6541a7c (diff)
downloadsearxng-c75425655fdadf9554b97ae0309a6181acd34ce3.tar.gz
searxng-c75425655fdadf9554b97ae0309a6181acd34ce3.zip
[enh] openstreetmap / map template: improve results
implements ideas described in #69 * update the engine * use wikidata * update map.html template
Diffstat (limited to 'searx/engines/openstreetmap.py')
-rw-r--r--searx/engines/openstreetmap.py467
1 files changed, 403 insertions, 64 deletions
diff --git a/searx/engines/openstreetmap.py b/searx/engines/openstreetmap.py
index f11aa5f8c..819a2ea1d 100644
--- a/searx/engines/openstreetmap.py
+++ b/searx/engines/openstreetmap.py
@@ -2,11 +2,21 @@
"""
OpenStreetMap (Map)
"""
+# lint: pylint
+# pylint: disable=missing-function-docstring
import re
from json import loads
+from urllib.parse import urlencode
+from functools import partial
+
from flask_babel import gettext
+from searx.data import OSM_KEYS_TAGS, CURRENCIES
+from searx.utils import searx_useragent
+from searx.external_urls import get_external_url
+from searx.engines.wikidata import send_wikidata_query, sparql_string_escape
+
# about
about = {
"website": 'https://www.openstreetmap.org/',
@@ -23,89 +33,418 @@ paging = False
# search-url
base_url = 'https://nominatim.openstreetmap.org/'
-search_string = 'search/{query}?format=json&polygon_geojson=1&addressdetails=1'
-result_base_url = 'https://openstreetmap.org/{osm_type}/{osm_id}'
+search_string = 'search?{query}&polygon_geojson=1&format=jsonv2&addressdetails=1&extratags=1&dedupe=1'
+result_id_url = 'https://openstreetmap.org/{osm_type}/{osm_id}'
+result_lat_lon_url = 'https://www.openstreetmap.org/?mlat={lat}&mlon={lon}&zoom={zoom}&layers=M'
-route_url = 'https://graphhopper.com/maps/?point={}&point={}&locale=en-US&vehicle=car&weighting=fastest&turn_costs=true&use_miles=false&layer=Omniscale' # noqa
+route_url = 'https://graphhopper.com/maps/?point={}&point={}&locale=en-US&vehicle=car&weighting=fastest&turn_costs=true&use_miles=false&layer=Omniscale' # pylint: disable=line-too-long
route_re = re.compile('(?:from )?(.+) to (.+)')
+wikidata_image_sparql = """
+select ?item ?itemLabel ?image ?sign ?symbol ?website ?wikipediaName
+where {
+ values ?item { %WIKIDATA_IDS% }
+ OPTIONAL { ?item wdt:P18|wdt:P8517|wdt:P4291|wdt:P5252|wdt:P3451|wdt:P4640|wdt:P5775|wdt:P2716|wdt:P1801|wdt:P4896 ?image }
+ OPTIONAL { ?item wdt:P1766|wdt:P8505|wdt:P8667 ?sign }
+ OPTIONAL { ?item wdt:P41|wdt:P94|wdt:P154|wdt:P158|wdt:P2910|wdt:P4004|wdt:P5962|wdt:P8972 ?symbol }
+ OPTIONAL { ?item wdt:P856 ?website }
+ SERVICE wikibase:label {
+ bd:serviceParam wikibase:language "%LANGUAGE%,en".
+ ?item rdfs:label ?itemLabel .
+ }
+ OPTIONAL {
+ ?wikipediaUrl schema:about ?item;
+ schema:isPartOf/wikibase:wikiGroup "wikipedia";
+ schema:name ?wikipediaName;
+ schema:inLanguage "%LANGUAGE%" .
+ }
+}
+ORDER by ?item
+"""
+
-# do search-request
-def request(query, params):
+# key value that are link: mapping functions
+# 'mapillary': P1947
+# but https://github.com/kartaview/openstreetcam.org/issues/60
+# but https://taginfo.openstreetmap.org/keys/kartaview ...
+def value_to_https_link(value):
+ http = 'http://'
+ if value.startswith(http):
+ value = 'https://' + value[len(http) :]
+ return (value, value)
+
+
+def value_to_website_link(value):
+ value = value.split(';')[0]
+ return (value, value)
+
+
+def value_wikipedia_link(value):
+ value = value.split(':', 1)
+ return ('https://{0}.wikipedia.org/wiki/{1}'.format(*value), '{1} ({0})'.format(*value))
+
+
+def value_with_prefix(prefix, value):
+ return (prefix + value, value)
+
+
+VALUE_TO_LINK = {
+ 'website': value_to_website_link,
+ 'contact:website': value_to_website_link,
+ 'email': partial(value_with_prefix, 'mailto:'),
+ 'contact:email': partial(value_with_prefix, 'mailto:'),
+ 'contact:phone': partial(value_with_prefix, 'tel:'),
+ 'phone': partial(value_with_prefix, 'tel:'),
+ 'fax': partial(value_with_prefix, 'fax:'),
+ 'contact:fax': partial(value_with_prefix, 'fax:'),
+ 'contact:mastodon': value_to_https_link,
+ 'facebook': value_to_https_link,
+ 'contact:facebook': value_to_https_link,
+ 'contact:foursquare': value_to_https_link,
+ 'contact:instagram': value_to_https_link,
+ 'contact:linkedin': value_to_https_link,
+ 'contact:pinterest': value_to_https_link,
+ 'contact:telegram': value_to_https_link,
+ 'contact:tripadvisor': value_to_https_link,
+ 'contact:twitter': value_to_https_link,
+ 'contact:yelp': value_to_https_link,
+ 'contact:youtube': value_to_https_link,
+ 'contact:webcam': value_to_website_link,
+ 'wikipedia': value_wikipedia_link,
+ 'wikidata': partial(value_with_prefix, 'https://wikidata.org/wiki/'),
+ 'brand:wikidata': partial(value_with_prefix, 'https://wikidata.org/wiki/'),
+}
+KEY_ORDER = [
+ 'cuisine',
+ 'organic',
+ 'delivery',
+ 'delivery:covid19',
+ 'opening_hours',
+ 'opening_hours:covid19',
+ 'fee',
+ 'payment:*',
+ 'currency:*',
+ 'outdoor_seating',
+ 'bench',
+ 'wheelchair',
+ 'level',
+ 'building:levels',
+ 'bin',
+ 'public_transport',
+ 'internet_access:ssid',
+]
+KEY_RANKS = {k: i for i, k in enumerate(KEY_ORDER)}
- params['url'] = base_url + search_string.format(query=query)
- params['route'] = route_re.match(query)
+def request(query, params):
+ """do search-request"""
+ params['url'] = base_url + search_string.format(query=urlencode({'q': query}))
+ params['route'] = route_re.match(query)
+ params['headers']['User-Agent'] = searx_useragent()
return params
-# get response from search-request
def response(resp):
+ """get response from search-request"""
results = []
- json = loads(resp.text)
+ nominatim_json = loads(resp.text)
+ user_language = resp.search_params['language']
if resp.search_params['route']:
- results.append({
- 'answer': gettext('Get directions'),
- 'url': route_url.format(*resp.search_params['route'].groups()),
- })
+ results.append(
+ {
+ 'answer': gettext('Get directions'),
+ 'url': route_url.format(*resp.search_params['route'].groups()),
+ }
+ )
+
+ fetch_wikidata(nominatim_json, user_language)
# parse results
- for r in json:
- if 'display_name' not in r:
+ for result in nominatim_json:
+ title, address = get_title_address(result)
+
+ # ignore result without title
+ if not title:
continue
- title = r['display_name'] or ''
- osm_type = r.get('osm_type', r.get('type'))
- url = result_base_url.format(osm_type=osm_type,
- osm_id=r['osm_id'])
-
- osm = {'type': osm_type,
- 'id': r['osm_id']}
-
- geojson = r.get('geojson')
-
- # if no geojson is found and osm_type is a node, add geojson Point
- if not geojson and osm_type == 'node':
- geojson = {'type': 'Point', 'coordinates': [r['lon'], r['lat']]}
-
- address_raw = r.get('address')
- address = {}
-
- # get name
- if r['class'] == 'amenity' or\
- r['class'] == 'shop' or\
- r['class'] == 'tourism' or\
- r['class'] == 'leisure':
- if address_raw.get('address29'):
- address = {'name': address_raw.get('address29')}
- else:
- address = {'name': address_raw.get(r['type'])}
-
- # add rest of adressdata, if something is already found
- if address.get('name'):
- address.update({'house_number': address_raw.get('house_number'),
- 'road': address_raw.get('road'),
- 'locality': address_raw.get('city',
- address_raw.get('town', # noqa
- address_raw.get('village'))), # noqa
- 'postcode': address_raw.get('postcode'),
- 'country': address_raw.get('country'),
- 'country_code': address_raw.get('country_code')})
- else:
- address = None
+ url, osm, geojson = get_url_osm_geojson(result)
+ img_src = get_img_src(result)
+ links, link_keys = get_links(result, user_language)
+ data = get_data(result, user_language, link_keys)
# append result
- results.append({'template': 'map.html',
- 'title': title,
- 'content': '',
- 'longitude': r['lon'],
- 'latitude': r['lat'],
- 'boundingbox': r['boundingbox'],
- 'geojson': geojson,
- 'address': address,
- 'osm': osm,
- 'url': url})
+ results.append(
+ {
+ 'template': 'map.html',
+ 'title': title,
+ 'address': address,
+ 'address_label': get_key_label('addr', user_language),
+ 'url': url,
+ 'osm': osm,
+ 'geojson': geojson,
+ 'img_src': img_src,
+ 'links': links,
+ 'data': data,
+ 'type': get_tag_label(result.get('category'), result.get('type', ''), user_language),
+ 'type_icon': result.get('icon'),
+ 'content': '',
+ 'longitude': result['lon'],
+ 'latitude': result['lat'],
+ 'boundingbox': result['boundingbox'],
+ }
+ )
# return results
return results
+
+
+def get_wikipedia_image(raw_value):
+ if not raw_value:
+ return None
+ return get_external_url('wikimedia_image', raw_value)
+
+
+def fetch_wikidata(nominatim_json, user_langage):
+ """Update nominatim_json using the result of an unique to wikidata
+
+ For result in nominatim_json:
+ If result['extratags']['wikidata'] or r['extratags']['wikidata link']:
+ Set result['wikidata'] to { 'image': ..., 'image_sign':..., 'image_symbal':... }
+ Set result['extratags']['wikipedia'] if not defined
+ Set result['extratags']['contact:website'] if not defined
+ """
+ wikidata_ids = []
+ wd_to_results = {}
+ for result in nominatim_json:
+ e = result.get("extratags")
+ if e:
+ # ignore brand:wikidata
+ wd_id = e.get("wikidata", e.get("wikidata link"))
+ if wd_id and wd_id not in wikidata_ids:
+ wikidata_ids.append("wd:" + wd_id)
+ wd_to_results.setdefault(wd_id, []).append(result)
+
+ if wikidata_ids:
+ wikidata_ids_str = " ".join(wikidata_ids)
+ query = wikidata_image_sparql.replace('%WIKIDATA_IDS%', sparql_string_escape(wikidata_ids_str)).replace(
+ '%LANGUAGE%', sparql_string_escape(user_langage)
+ )
+ wikidata_json = send_wikidata_query(query)
+ for wd_result in wikidata_json.get('results', {}).get('bindings', {}):
+ wd_id = wd_result['item']['value'].replace('http://www.wikidata.org/entity/', '')
+ for result in wd_to_results.get(wd_id, []):
+ result['wikidata'] = {
+ 'itemLabel': wd_result['itemLabel']['value'],
+ 'image': get_wikipedia_image(wd_result.get('image', {}).get('value')),
+ 'image_sign': get_wikipedia_image(wd_result.get('sign', {}).get('value')),
+ 'image_symbol': get_wikipedia_image(wd_result.get('symbol', {}).get('value')),
+ }
+ # overwrite wikipedia link
+ wikipedia_name = wd_result.get('wikipediaName', {}).get('value')
+ if wikipedia_name:
+ result['extratags']['wikipedia'] = user_langage + ':' + wikipedia_name
+ # get website if not already defined
+ website = wd_result.get('website', {}).get('value')
+ if (
+ website
+ and not result['extratags'].get('contact:website')
+ and not result['extratags'].get('website')
+ ):
+ result['extratags']['contact:website'] = website
+
+
+def get_title_address(result):
+ """Return title and address
+
+ title may be None
+ """
+ address_raw = result.get('address')
+ address_name = None
+ address = {}
+
+ # get name
+ if (
+ result['category'] == 'amenity'
+ or result['category'] == 'shop'
+ or result['category'] == 'tourism'
+ or result['category'] == 'leisure'
+ ):
+ if address_raw.get('address29'):
+ # https://github.com/osm-search/Nominatim/issues/1662
+ address_name = address_raw.get('address29')
+ else:
+ address_name = address_raw.get(result['category'])
+ elif result['type'] in address_raw:
+ address_name = address_raw.get(result['type'])
+
+ # add rest of adressdata, if something is already found
+ if address_name:
+ title = address_name
+ address.update(
+ {
+ 'name': address_name,
+ 'house_number': address_raw.get('house_number'),
+ 'road': address_raw.get('road'),
+ 'locality': address_raw.get(
+ 'city', address_raw.get('town', address_raw.get('village')) # noqa
+ ), # noqa
+ 'postcode': address_raw.get('postcode'),
+ 'country': address_raw.get('country'),
+ 'country_code': address_raw.get('country_code'),
+ }
+ )
+ else:
+ title = result.get('display_name')
+
+ return title, address
+
+
+def get_url_osm_geojson(result):
+ """Get url, osm and geojson
+ """
+ osm_type = result.get('osm_type', result.get('type'))
+ if 'osm_id' not in result:
+ # see https://github.com/osm-search/Nominatim/issues/1521
+ # query example: "EC1M 5RF London"
+ url = result_lat_lon_url.format(lat=result['lat'], lon=result['lon'], zoom=12)
+ osm = {}
+ else:
+ url = result_id_url.format(osm_type=osm_type, osm_id=result['osm_id'])
+ osm = {'type': osm_type, 'id': result['osm_id']}
+
+ geojson = result.get('geojson')
+ # if no geojson is found and osm_type is a node, add geojson Point
+ if not geojson and osm_type == 'node':
+ geojson = {'type': 'Point', 'coordinates': [result['lon'], result['lat']]}
+
+ return url, osm, geojson
+
+
+def get_img_src(result):
+ """Get image URL from either wikidata or r['extratags']"""
+ # wikidata
+ img_src = None
+ if 'wikidata' in result:
+ img_src = result['wikidata']['image']
+ if not img_src:
+ img_src = result['wikidata']['image_symbol']
+ if not img_src:
+ img_src = result['wikidata']['image_sign']
+
+ # img_src
+ if not img_src and result.get('extratags', {}).get('image'):
+ img_src = result['extratags']['image']
+ del result['extratags']['image']
+ if not img_src and result.get('extratags', {}).get('wikimedia_commons'):
+ img_src = get_external_url('wikimedia_image', result['extratags']['wikimedia_commons'])
+ del result['extratags']['wikimedia_commons']
+
+ return img_src
+
+
+def get_links(result, user_language):
+ """Return links from result['extratags']"""
+ links = []
+ link_keys = set()
+ for k, mapping_function in VALUE_TO_LINK.items():
+ raw_value = result['extratags'].get(k)
+ if raw_value:
+ url, url_label = mapping_function(raw_value)
+ if url.startswith('https://wikidata.org'):
+ url_label = result.get('wikidata', {}).get('itemLabel') or url_label
+ links.append(
+ {
+ 'label': get_key_label(k, user_language),
+ 'url': url,
+ 'url_label': url_label,
+ }
+ )
+ link_keys.add(k)
+ return links, link_keys
+
+
+def get_data(result, user_language, ignore_keys):
+ """Return key, value of result['extratags']
+
+ Must be call after get_links
+
+ Note: the values are not translated
+ """
+ data = []
+ for k, v in result['extratags'].items():
+ if k in ignore_keys:
+ continue
+ if get_key_rank(k) is None:
+ continue
+ k_label = get_key_label(k, user_language)
+ if k_label:
+ data.append(
+ {
+ 'label': k_label,
+ 'key': k,
+ 'value': v,
+ }
+ )
+ data.sort(key=lambda entry: (get_key_rank(entry['key']), entry['label']))
+ return data
+
+
+def get_key_rank(k):
+ """Get OSM key rank
+
+ The rank defines in which order the key are displayed in the HTML result
+ """
+ key_rank = KEY_RANKS.get(k)
+ if key_rank is None:
+ # "payment:*" in KEY_ORDER matches "payment:cash", "payment:debit card", etc...
+ key_rank = KEY_RANKS.get(k.split(':')[0] + ':*')
+ return key_rank
+
+
+def get_label(labels, lang):
+ """Get label from labels in OSM_KEYS_TAGS
+
+ in OSM_KEYS_TAGS, labels have key == '*'
+ """
+ tag_label = labels.get(lang.lower())
+ if tag_label is None:
+ # example: if 'zh-hk' is not found, check 'zh'
+ tag_label = labels.get(lang.split('-')[0])
+ if tag_label is None and lang != 'en':
+ # example: if 'zh' is not found, check 'en'
+ tag_label = labels.get('en')
+ if tag_label is None and len(labels.values()) > 0:
+ # example: if still not found, use the first entry
+ tag_label = labels.values()[0]
+ return tag_label
+
+
+def get_tag_label(tag_category, tag_name, lang):
+ """Get tag label from OSM_KEYS_TAGS"""
+ tag_name = '' if tag_name is None else tag_name
+ tag_labels = OSM_KEYS_TAGS['tags'].get(tag_category, {}).get(tag_name, {})
+ return get_label(tag_labels, lang)
+
+
+def get_key_label(key_name, lang):
+ """Get key label from OSM_KEYS_TAGS"""
+ if key_name.startswith('currency:'):
+ # currency:EUR --> get the name from the CURRENCIES variable
+ # see https://wiki.openstreetmap.org/wiki/Key%3Acurrency
+ # and for exampe https://taginfo.openstreetmap.org/keys/currency:EUR#values
+ # but there is also currency=EUR (currently not handled)
+ # https://taginfo.openstreetmap.org/keys/currency#values
+ currency = key_name.split(':')
+ if len(currency) > 1:
+ o = CURRENCIES['iso4217'].get(currency)
+ if o:
+ return get_label(o, lang).lower()
+ return currency
+
+ labels = OSM_KEYS_TAGS['keys']
+ for k in key_name.split(':') + ['*']:
+ labels = labels.get(k)
+ if labels is None:
+ return None
+ return get_label(labels, lang)