aboutsummaryrefslogtreecommitdiff
path: root/contrib/carddav-query
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/carddav-query')
-rwxr-xr-xcontrib/carddav-query268
1 files changed, 268 insertions, 0 deletions
diff --git a/contrib/carddav-query b/contrib/carddav-query
new file mode 100755
index 00000000..f7eaa793
--- /dev/null
+++ b/contrib/carddav-query
@@ -0,0 +1,268 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: MIT
+# Copyright (c) 2023 Robin Jarry
+
+"""
+Query a CardDAV server for contact names and emails.
+"""
+
+import argparse
+import base64
+import configparser
+import os
+import re
+import subprocess
+import sys
+import xml.etree.ElementTree as xml
+from urllib import error, parse, request
+
+
+def main():
+ try:
+ args = parse_args()
+
+ C = "urn:ietf:params:xml:ns:carddav"
+ D = "DAV:"
+ xml.register_namespace("C", C)
+ xml.register_namespace("D", D)
+
+ # perform the actual address book query
+ query = xml.Element(f"{{{C}}}addressbook-query")
+ prop = xml.SubElement(query, f"{{{D}}}prop")
+ xml.SubElement(prop, f"{{{D}}}getetag")
+ data = xml.SubElement(prop, f"{{{C}}}address-data")
+ xml.SubElement(data, f"{{{C}}}prop", name="FN")
+ xml.SubElement(data, f"{{{C}}}prop", name="EMAIL")
+ limit = xml.SubElement(query, f"{{{C}}}limit")
+ xml.SubElement(limit, f"{{{C}}}nresults").text = str(args.limit)
+ filtre = xml.SubElement(query, f"{{{C}}}filter", test="anyof")
+ for term in args.terms:
+ for attr in "FN", "EMAIL", "NICKNAME", "ORG", "TITLE":
+ prop = xml.SubElement(filtre, f"{{{C}}}prop-filter", name=attr)
+ match = xml.SubElement(
+ prop, f"{{{C}}}text-match", {"match-type": "contains"}
+ )
+ match.text = term
+ data = http_request_xml(
+ "REPORT",
+ args.server_url,
+ query,
+ username=args.username,
+ password=args.password,
+ debug=args.verbose,
+ Depth="1",
+ )
+ for vcard in data.iterfind(f".//{{{C}}}address-data"):
+ for name, email in parse_vcard(vcard.text.strip()):
+ print(f"{email}\t{name}")
+
+ except Exception as e:
+ if isinstance(e, error.HTTPError):
+ if args.verbose:
+ debug_response(e.fp)
+ e = e.fp.read().decode()
+ print(f"error: {e}", file=sys.stderr)
+ sys.exit(1)
+
+
+def http_request_xml(
+ method: str,
+ url: str,
+ data: xml.Element,
+ username: str = None,
+ password: str = None,
+ debug: bool = False,
+ **headers,
+) -> xml.Element:
+ req = request.Request(
+ url=url,
+ method=method,
+ headers={
+ "Content-Type": 'text/xml; charset="utf-8"',
+ **headers,
+ },
+ data=xml.tostring(data, encoding="utf-8", xml_declaration=True),
+ )
+ if username is not None and password is not None:
+ auth = f"{username}:{password}"
+ auth = base64.standard_b64encode(auth.encode("utf-8")).decode("ascii")
+ req.add_header("Authorization", f"Basic {auth}")
+
+ if debug:
+ uri = parse.urlparse(req.full_url)
+ print(f"> {req.method} {uri.path} HTTP/1.1", file=sys.stderr)
+ print(f"> Host: {uri.hostname}", file=sys.stderr)
+ for name, value in req.headers.items():
+ print(f"> {name}: {value}", file=sys.stderr)
+ print(f"{req.data.decode('utf-8')}\n", file=sys.stderr)
+
+ with request.urlopen(req) as resp:
+ data = resp.read().decode("utf-8")
+ if debug:
+ debug_response(resp)
+ print(f"{data}", file=sys.stderr)
+
+ return xml.fromstring(data)
+
+
+def debug_response(resp):
+ print(f"< HTTP/1.1 {resp.code}", file=sys.stderr)
+ for name, value in resp.headers.items():
+ print(f"< {name}: {value}", file=sys.stderr)
+
+
+def parse_vcard(txt):
+ lines = txt.splitlines()
+ if len(lines) < 4 or lines[0] != "BEGIN:VCARD" or lines[-1] != "END:VCARD":
+ return
+ name = None
+ emails = []
+ for line in lines[1:-1]:
+ if line.startswith("FN:"):
+ name = line[len("FN:") :].replace("\\,", ",")
+ continue
+ match = re.match(r"^(?:ITEM\d+\.)?EMAIL(?:;[\w-]+=[^;:]+)*:(.+@.+)$", line)
+ if match:
+ email = match.group(1).lower().replace("\\,", ",")
+ if email not in emails:
+ if "TYPE=pref" in line or "PREF=1" in line:
+ emails.insert(0, email)
+ else:
+ emails.append(email)
+ if name is not None:
+ for e in emails:
+ yield name, e
+
+
+def parse_args():
+ parser = argparse.ArgumentParser(description=__doc__)
+ parser.add_argument(
+ "-l",
+ "--limit",
+ default=10,
+ type=int,
+ help="""
+ Maximum number of results returned by the server (default: 10).
+ If the server does not support limiting, this will be disregarded.
+ """,
+ )
+ parser.add_argument(
+ "-v",
+ "--verbose",
+ action="store_true",
+ help="""
+ Print debug info on stderr.
+ """,
+ )
+ parser.add_argument(
+ "-c",
+ "--config-file",
+ metavar="FILE",
+ default=os.path.expanduser("~/.config/aerc/accounts.conf"),
+ help="""
+ INI configuration file from which to read the CardDAV URL endpoint
+ (default: ~/.config/aerc/accounts.conf).
+ """,
+ )
+ parser.add_argument(
+ "-S",
+ "--config-section",
+ metavar="SECTION",
+ help="""
+ INI configuration section where to find CONFIG_KEY. By default the
+ first section where CONFIG_KEY is found will be used.
+ """,
+ )
+ parser.add_argument(
+ "-k",
+ "--config-key-source",
+ metavar="KEY_SOURCE",
+ default="carddav-source",
+ help="""
+ INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE.
+ The value must respect the following format:
+ https?://USERNAME[:PASSWORD]@HOSTNAME/PATH/TO/ADDRESSBOOK.
+ Both USERNAME and PASSWORD must be percent encoded.
+ """,
+ )
+ parser.add_argument(
+ "-C",
+ "--config-key-cred-cmd",
+ metavar="KEY_CRED_CMD",
+ default="carddav-source-cred-cmd",
+ help="""
+ INI configuration key to lookup in CONFIG_SECTION from CONFIG_FILE. The
+ value is a command that will be used to determine PASSWORD if it is not
+ present in CONFIG_KEY_SOURCE.
+ """,
+ )
+ parser.add_argument(
+ "-s",
+ "--server-url",
+ help="""
+ CardDAV server URL endpoint. Overrides configuration file.
+ """,
+ )
+ parser.add_argument(
+ "-u",
+ "--username",
+ help="""
+ Username to authenticate on the server. Overrides configuration file.
+ """,
+ )
+ parser.add_argument(
+ "-p",
+ "--password",
+ help="""
+ Password for the specified user. Overrides configuration file.
+ """,
+ )
+ parser.add_argument(
+ "terms",
+ nargs="+",
+ metavar="TERM",
+ help="""
+ Search term. Will be used to search contacts from their FN (formatted
+ name), EMAIL, NICKNAME, ORG (company) and TITLE fields.
+ """,
+ )
+ args = parser.parse_args()
+
+ cfg = configparser.RawConfigParser(strict=False)
+ cfg.read([args.config_file])
+ source = cred_cmd = None
+ if args.config_section:
+ source = cfg.get(args.config_section, args.config_key_source, fallback=None)
+ cred_cmd = cfg.get(args.config_section, args.config_key_cred_cmd, fallback=None)
+ else:
+ for sec in cfg.sections():
+ source = cfg.get(sec, args.config_key_source, fallback=None)
+ if source is not None:
+ cred_cmd = cfg.get(sec, args.config_key_cred_cmd, fallback=None)
+ break
+ if source is not None:
+ try:
+ u = parse.urlparse(source)
+ if args.username is None:
+ args.username = u.username
+ if args.password is None:
+ args.password = u.password
+ if not args.password and cred_cmd is not None:
+ args.password = subprocess.check_output(
+ cred_cmd, shell=True, text=True, encoding="utf-8"
+ ).strip()
+ if args.server_url is None:
+ args.server_url = f"{u.scheme}://{u.hostname}"
+ if u.port is not None:
+ args.server_url += f":{u.port}"
+ args.server_url += u.path
+ except ValueError as e:
+ parser.error(f"{args.config_file}: {e}")
+ if args.server_url is None:
+ parser.error("SERVER_URL is required")
+
+ return args
+
+
+if __name__ == "__main__":
+ main()