#!/usr/bin/env python3
'''
Serve a cached audiobook library as per-book podcast RSS feeds.

The library metadata is read from a JSON cache (cache/audiobooks.json next
to this file). A single route, '/', either lists the books (behind HTTP
basic auth), returns the RSS feed of one book, or streams one audio file.
'''
import hmac
import json
import mimetypes
import os
import re
import sys
# xml.etree.cElementTree was removed in Python 3.9; ElementTree has used the
# C accelerator automatically since 3.3, so this is a drop-in replacement.
import xml.etree.ElementTree as ET
from collections import OrderedDict
from datetime import date, datetime, timedelta, timezone
from email.utils import format_datetime
from operator import getitem
from xml.dom import minidom

from flask import Flask, request, Response, render_template, send_file

abs_path = os.path.dirname(os.path.abspath(__file__))
app = Flask(__name__)
app.config.from_pyfile(os.path.join(abs_path, 'app.cfg'))
cache_path = os.path.join(abs_path, 'cache')
json_path = os.path.join(cache_path, 'audiobooks.json')

# we only make use of the itunes ns, others provided for posterity
FEED_NAMESPACES = {
    'itunes': 'http://www.itunes.com/dtds/podcast-1.0.dtd',
    'googleplay': 'http://www.google.com/schemas/play-podcasts/1.0',
    'atom': 'http://www.w3.org/2005/Atom',
    'media': 'http://search.yahoo.com/mrss/',
    'content': 'http://purl.org/rss/1.0/modules/content/',
}

# pubDate of the first item in every feed; each following item is dated one
# day earlier so that clients ordering by date keep the track order
FEED_FIRST_PUB_DATE = datetime(2000, 12, 31, tzinfo=timezone.utc)

# splits a string into alternating non-digit / digit runs (natural sort)
_DIGIT_RUN_RE = re.compile(r'(\d+)')


def _build_illegal_xml_chars_re():
    '''
    Compile a character class matching the code points stripped by escape()
    '''
    # https://stackoverflow.com/a/22273639
    illegal_unichrs = [
        (0x00, 0x08), (0x0B, 0x0C), (0x0E, 0x1F), (0x7F, 0x84),
        (0x86, 0x9F), (0xFDD0, 0xFDDF), (0xFFFE, 0xFFFF), (0xA9, 0xA9)
    ]
    if sys.maxunicode >= 0x10000:
        illegal_unichrs.extend(
            [(0x1FFFE, 0x1FFFF), (0x2FFFE, 0x2FFFF), (0x3FFFE, 0x3FFFF),
             (0x4FFFE, 0x4FFFF), (0x5FFFE, 0x5FFFF), (0x6FFFE, 0x6FFFF),
             (0x7FFFE, 0x7FFFF), (0x8FFFE, 0x8FFFF), (0x9FFFE, 0x9FFFF),
             (0xAFFFE, 0xAFFFF), (0xBFFFE, 0xBFFFF), (0xCFFFE, 0xCFFFF),
             (0xDFFFE, 0xDFFFF), (0xEFFFE, 0xEFFFF), (0xFFFFE, 0xFFFFF),
             (0x10FFFE, 0x10FFFF)]
        )
    illegal_ranges = ["%s-%s" % (chr(low), chr(high))
                      for (low, high) in illegal_unichrs]
    return re.compile(u'[%s]' % u''.join(illegal_ranges))


# compiled once at import time instead of on every escape() call
_ILLEGAL_XML_CHARS_RE = _build_illegal_xml_chars_re()


def read_cache(json_path):
    '''
    Populate books dict from cache at :json_path:

    Returns an OrderedDict of the cached entries sorted by their 'title'.
    Raises ValueError if the cache file is missing or cannot be loaded and
    sorted; the underlying error is chained as the cause.
    '''
    if not os.path.exists(json_path):
        raise ValueError('cache not found, run rebuild.py')

    try:
        with open(json_path, 'r') as cache:
            books = json.load(cache)
        return OrderedDict(sorted(
            books.items(),
            key=lambda x: x[1]['title']
        ))
    except Exception as err:
        raise ValueError('error loading JSON cache') from err


def _constant_time_equal(supplied, expected):
    '''
    Compare two strings without leaking the position of the first mismatch
    '''
    if not isinstance(supplied, str) or not isinstance(expected, str):
        return False
    # compare_digest only accepts non-ASCII text as bytes
    return hmac.compare_digest(
        supplied.encode('utf-8'), expected.encode('utf-8')
    )


def check_auth(username, password):
    '''
    Authenticate against configured user/pass

    Returns True only if both values match app.config['USERNAME'] and
    app.config['PASSWORD']; missing (None) credentials never match.
    '''
    # evaluate both comparisons so a wrong username does not return early
    user_ok = _constant_time_equal(username, app.config['USERNAME'])
    pass_ok = _constant_time_equal(password, app.config['PASSWORD'])
    return user_ok and pass_ok


def escape(s):
    '''
    Ensure XML-safety of text values

    Replaces single quotes with double quotes and removes the code points
    matched by _ILLEGAL_XML_CHARS_RE. Markup characters (&, <, >) are left
    untouched here: ElementTree escapes them itself when serializing.
    '''
    # NOTE(review): turning apostrophes into double quotes alters titles
    # such as "Ender's Game"; kept as-is to preserve existing feed output
    s = s.replace('\'', '"')
    return _ILLEGAL_XML_CHARS_RE.sub('', s)


def prettify(elem):
    '''
    Make our RSS feed picturesque :)

    Serializes :elem: and re-parses it with minidom to get indented output.
    '''
    xml_str = ET.tostring(elem, encoding='utf8', method='xml')
    xml_dom = minidom.parseString(xml_str)
    return xml_dom.toprettyxml(indent='  ')


def _natural_key(s):
    '''
    Sort key that orders digit runs numerically and text case-insensitively
    '''
    # isdecimal() matches what \d splits on and what int() accepts;
    # isdigit() would also accept characters like superscripts that int()
    # rejects
    return [int(part) if part.isdecimal() else part.lower()
            for part in _DIGIT_RUN_RE.split(s)]


def _file_sort_key(book):
    '''
    Pick the key function used to order the file hashes of :book:

    Filenames (natural sort) are used when an 'ignore_tracknum' file exists
    in the book directory, or when any track value is missing or duplicated;
    otherwise files are ordered by their track value.
    '''
    files = book['files']

    def by_filename(file_hash):
        return _natural_key(files[file_hash]['filename'])

    # use filename sort if ignore_tracknum file present in book dir
    if os.path.exists(os.path.join(book['path'], 'ignore_tracknum')):
        return by_filename

    seen_tracks = []  # account for duplicates
    for meta in files.values():
        track = meta['track']
        if not track or track in seen_tracks:
            return by_filename
        seen_tracks.append(track)

    # we have populated and unique track values, use those
    return lambda file_hash: files[file_hash]['track']


def _build_feed(book, book_hash, base_url):
    '''
    Build the <rss> element for :book:, one <item> per audio file

    :book_hash: and :base_url: are used to form each enclosure URL.
    '''
    rss = ET.Element('rss')
    for prefix, uri in FEED_NAMESPACES.items():
        rss.set('xmlns:%s' % prefix, uri)
    rss.set('version', '2.0')

    channel = ET.SubElement(rss, 'channel')
    ET.SubElement(channel, 'title').text = escape(book['title'])

    files = book['files']
    ordered_hashes = sorted(files, key=_file_sort_key(book))

    # populate XML attribute values required by Apple podcasts
    for idx, file_hash in enumerate(ordered_hashes):
        meta = files[file_hash]
        item = ET.SubElement(channel, 'item')

        ET.SubElement(item, 'title').text = escape(meta['title'])
        ET.SubElement(item, 'itunes:author').text = escape(meta['author'])
        ET.SubElement(item, 'itunes:category').text = 'Book'
        ET.SubElement(item, 'itunes:explicit').text = 'no'
        ET.SubElement(item, 'itunes:summary').text = (
            'Audiobook served by audiobook-rss'
        )
        ET.SubElement(item, 'description').text = (
            'Audiobook served by audiobook-rss'
        )
        ET.SubElement(item, 'itunes:duration').text = str(
            meta['duration_str']
        )

        guid = ET.SubElement(item, 'guid', isPermaLink='false')
        guid.text = file_hash

        # pubDate descending, day decremented w/ each iteration; RSS 2.0
        # requires RFC 822 dates, which ctime() does not produce
        pub_date = ET.SubElement(item, 'pubDate')
        pub_date.text = format_datetime(
            FEED_FIRST_PUB_DATE - timedelta(days=idx), usegmt=True
        )

        enc_attr = {
            'url': '{}?a={}&f={}'.format(base_url, book_hash, file_hash),
            'length': str(meta['size_bytes']),
            'type': 'audio/mpeg'
        }
        ET.SubElement(item, 'enclosure', enc_attr)

    return rss


@app.route('/')
def list_books():
    '''
    Book listing and audiobook RSS/file download

    :a: audiobook hash; if provided without :f: (file) return RSS
    :f: file hash; requires associated audiobook (:a:) to download

    Listing of audiobooks returned if no params provided
    '''
    books = read_cache(json_path)

    a = request.args.get('a')  # audiobook hash
    f = request.args.get('f')  # file hash

    # audiobook and file parameters provided: serve up file
    if a and f:
        book = books.get(a)
        if not book or not book['files'].get(f):
            return 'book or file not found', 404
        return send_file(book['files'][f]['path'], conditional=True)

    # serve up audiobook RSS feed; only audiobook hash provided
    if a:
        book = books.get(a)
        if not book:
            return 'book not found', 404
        rss = _build_feed(book, a, request.base_url)
        return Response(prettify(rss), mimetype='text/xml')

    # no audiobook requested: the listing requires basic auth
    auth = request.authorization
    if not auth or not check_auth(auth.username, auth.password):
        form = {'WWW-Authenticate': 'Basic realm="o/"'}
        return Response('unauthorized', 401, form)
    return render_template('index.html', books=books)


if __name__ == '__main__':
    app.run(host='127.0.0.1', port=8085, threaded=True)