#!/usr/bin/python
#
# Copyright (c) 2011-2012 David Bremner
# License: Same as notmuch
# dependencies
#       - python 2.6 for json
#       - argparse; either python 2.7, or install separately

from __future__ import print_function
from __future__ import unicode_literals

import codecs
import collections
import datetime
import email.utils
try:  # Python 3
    from urllib.parse import quote
except ImportError:  # Python 2
    from urllib import quote
import json
import argparse
import os
import re
import sys
import subprocess
import xml.sax.saxutils


# Encoding used for git output, the config file and the generated page.
_ENCODING = 'UTF-8'
# Output format name ('text' / 'html') -> page renderer; filled in when
# the script runs.
_PAGES = {}


if not hasattr(collections, 'OrderedDict'):  # Python 2.6 or earlier
    class _OrderedDict (dict):
        "Just enough of a stub to get through Page._get_threads"
        def __init__(self, *args, **kwargs):
            super(_OrderedDict, self).__init__(*args, **kwargs)
            self._keys = []  # record key order

        def __setitem__(self, key, value):
            # Record a key only the first time it is assigned; otherwise
            # re-assigning a key would make values() yield it twice.
            if key not in self:
                self._keys.append(key)
            super(_OrderedDict, self).__setitem__(key, value)

        def values(self):
            for key in self._keys:
                yield self[key]

    collections.OrderedDict = _OrderedDict


def _git_output(git_dir, *args):
    """Run ``git --git-dir <git_dir> <args>`` and return its raw stdout.

    ``communicate()`` drains the pipe and waits for the child, so no
    file descriptor or zombie process is left behind.  The exit status
    is not checked; a failing command simply yields empty output.
    """
    process = subprocess.Popen(
        ['git', '--git-dir', git_dir] + list(args),
        stdout=subprocess.PIPE)
    stdout, _ = process.communicate()
    return stdout


def read_config(path=None, encoding=None):
    """Read config from json file.

    path     -- JSON file to load.  When empty, ``status-config.json``
                is read from the ``config`` ref of the git repository
                named by ``$NMBGIT`` (default ``~/.nmbug``).
    encoding -- text encoding of the config; defaults to ``_ENCODING``.

    Returns the decoded JSON document.
    """
    if not encoding:
        encoding = _ENCODING

    if path:
        # codecs.open() accepts an encoding on both Python 2 and 3, and
        # the context manager closes the file once it has been parsed.
        with codecs.open(path, encoding=encoding) as fp:
            return json.load(fp)

    nmbhome = os.getenv('NMBGIT', os.path.expanduser('~/.nmbug'))

    # use only the first line of the show-ref output
    ref_lines = _git_output(
        nmbhome, 'show-ref', '-s', 'config').splitlines()
    sha1_bytes = ref_lines[0] if ref_lines else b''
    sha1 = sha1_bytes.decode(encoding).rstrip()

    blob = _git_output(
        nmbhome, 'cat-file', 'blob', sha1 + ':status-config.json')
    return json.loads(blob.decode(encoding))


class Thread (list):
    """Display data for the messages of one thread, in arrival order.

    ``running_data`` holds the header values of the most recently added
    message, so that values repeated by the next message can be blanked.
    """
    def __init__(self):
        super(Thread, self).__init__()
        self.running_data = {}


class Page (object):
    """Plain-text renderer for a list of views; base class for HtmlPage."""

    def __init__(self, header=None, footer=None):
        # Optional strings written verbatim before and after all views.
        self.header = header
        self.footer = footer

    def write(self, database, views, stream=None):
        """Render every view in ``views`` to ``stream``.

        When ``stream`` is None, output goes to standard output encoded
        as ``_ENCODING``.
        """
        if stream is None:
            try:  # Python 3
                byte_stream = sys.stdout.buffer
            except AttributeError:  # Python 2
                byte_stream = sys.stdout
            stream = codecs.getwriter(encoding=_ENCODING)(stream=byte_stream)
        self._write_header(views=views, stream=stream)
        for view in views:
            self._write_view(database=database, view=view, stream=stream)
        self._write_footer(views=views, stream=stream)

    def _write_header(self, views, stream):
        if self.header:
            stream.write(self.header)

    def _write_footer(self, views, stream):
        if self.footer:
            stream.write(self.footer)

    def _write_view(self, database, view, stream):
        """Run the query of ``view`` against ``database`` and render it.

        A missing 'query-string' is derived by and-ing the terms listed
        in 'query', and is stored back into ``view``.
        """
        if 'query-string' not in view:
            query = view['query']
            view['query-string'] = ' and '.join(query)
        # The notmuch module is imported by the script entry point only
        # when a page is actually rendered.
        q = notmuch.Query(database, view['query-string'])
        q.set_sort(notmuch.Query.SORT.OLDEST_FIRST)
        threads = self._get_threads(messages=q.search_messages())
        self._write_view_header(view=view, stream=stream)
        self._write_threads(threads=threads, stream=stream)

    def _get_threads(self, messages):
        """Group ``messages`` into Thread lists, keeping first-seen order."""
        threads = collections.OrderedDict()
        for message in messages:
            thread_id = message.get_thread_id()
            if thread_id in threads:
                thread = threads[thread_id]
            else:
                thread = Thread()
                threads[thread_id] = thread
            thread.running_data, display_data = self._message_display_data(
                running_data=thread.running_data, message=message)
            thread.append(display_data)
        return list(threads.values())

    def _write_view_header(self, view, stream):
        pass

    def _write_threads(self, threads, stream):
        """Write each thread's messages, with a blank line between threads."""
        last_index = len(threads) - 1
        for index, thread in enumerate(threads):
            for message_display_data in thread:
                stream.write(
                    ('{date:10.10s} {from:20.20s} {subject:40.40s}\n'
                     '{message-id-term:>72}\n'
                     ).format(**message_display_data))
            # Compare positions rather than contents: Thread is a list,
            # so "thread != threads[-1]" is False for any earlier thread
            # that merely has equal contents, dropping its separator.
            if index != last_index:
                stream.write('\n')

    def _message_display_data(self, running_data, message):
        """Extract the displayed header values of ``message``.

        Returns ``(next_running_data, data)``.  ``next_running_data``
        holds the full values; in ``data`` every value equal to the
        corresponding entry of ``running_data`` is replaced by '',
        except for 'message-id' and 'subject'.
        """
        headers = ('thread-id', 'message-id', 'date', 'from', 'subject')
        data = {}
        for header in headers:
            if header == 'thread-id':
                value = message.get_thread_id()
            elif header == 'message-id':
                value = message.get_message_id()
                data['message-id-term'] = 'id:"{0}"'.format(value)
            elif header == 'date':
                value = str(datetime.datetime.utcfromtimestamp(
                    message.get_date()).date())
            else:
                value = message.get_header(header)
            if header == 'from':
                (value, addr) = email.utils.parseaddr(value)
                if not value:
                    # no display name: fall back to the local part
                    value = addr.split('@')[0]
            data[header] = value
        next_running_data = data.copy()
        for header, value in data.items():
            if header in ['message-id', 'subject']:
                continue
            if value == running_data.get(header, None):
                data[header] = ''
        return (next_running_data, data)


class HtmlPage (Page):
    """HTML renderer for a list of views.

    NOTE(review): the HTML markup inside this class's string literals
    (and the view-list loop in _write_header) was lost from the reviewed
    copy of the file and has been reconstructed from the surviving
    placeholders -- confirm against the upstream script.
    """
    # Raw string: '\W' is not a valid escape in a normal string literal.
    _slug_regexp = re.compile(r'\W+')

    def _view_id(self, view):
        "Return the anchor id of view, deriving it from the title if unset"
        if 'id' not in view:
            view['id'] = self._slug(view['title'])
        return view['id']

    def _write_header(self, views, stream):
        super(HtmlPage, self)._write_header(views=views, stream=stream)
        # NOTE(review): reconstructed table of contents -- confirm.
        stream.write('<ul>\n')
        for view in views:
            self._view_id(view)
            stream.write(
                '<li><a href="#{id}">{title}</a></li>\n'.format(**view))
        stream.write('</ul>\n')

    def _write_view_header(self, view, stream):
        self._view_id(view)
        stream.write('<h3 id="{id}">{title}</h3>\n'.format(**view))
        stream.write('<p>\n')
        if 'comment' in view:
            stream.write(view['comment'])
            stream.write('\n')
        for line in [
                'The view is generated from the following query:',
                '</p>',
                '<p>',
                '  <code>',
                view['query-string'],
                '  </code>',
                '</p>',
                ]:
            stream.write(line)
            stream.write('\n')

    def _write_threads(self, threads, stream):
        """Write the threads as one table, one <tbody> per thread."""
        if not threads:
            return
        stream.write('<table>\n')
        last_index = len(threads) - 1
        for index, thread in enumerate(threads):
            stream.write('  <tbody>\n')
            for message_display_data in thread:
                stream.write((
                    '    <tr class="message-first">\n'
                    '      <td>{date}</td>\n'
                    '      <td><code>{message-id-term}</code></td>\n'
                    '    </tr>\n'
                    '    <tr class="message-last">\n'
                    '      <td>{from}</td>\n'
                    '      <td>{subject}</td>\n'
                    '    </tr>\n'
                    ).format(**message_display_data))
            stream.write('  </tbody>\n')
            # Positional test; see Page._write_threads.
            if index != last_index:
                stream.write(
                    '  <tbody><tr><td colspan="2"><br /></td></tr></tbody>\n')
        stream.write('</table>\n')

    def _message_display_data(self, *args, **kwargs):
        """Like Page._message_display_data, with values made HTML-safe.

        The subject becomes a link built from the URL-quoted message id.
        """
        running_data, display_data = super(
            HtmlPage, self)._message_display_data(
                *args, **kwargs)
        if 'subject' in display_data and 'message-id' in display_data:
            d = {
                'message-id': quote(display_data['message-id']),
                'subject': xml.sax.saxutils.escape(display_data['subject']),
                }
            # NOTE(review): link target reconstructed -- confirm the
            # base URL against the upstream script.
            display_data['subject'] = (
                '<a href="http://mid.gmane.org/{message-id}">{subject}</a>'
                ).format(**d)
        # 'message-id-term' embeds the raw message id, which comes from
        # an untrusted mail header, so it is escaped as well.
        for key in ['message-id', 'message-id-term', 'from']:
            if key in display_data:
                display_data[key] = xml.sax.saxutils.escape(display_data[key])
        return (running_data, display_data)

    def _slug(self, string):
        "Replace each run of non-word characters in string with '-'"
        return self._slug_regexp.sub('-', string)


parser = argparse.ArgumentParser()
parser.add_argument('--text', help='output plain text format',
                    action='store_true')
parser.add_argument('--config', help='load config from given file',
                    metavar='PATH')
parser.add_argument('--list-views', help='list views',
                    action='store_true')
parser.add_argument('--get-query', help='get query for view',
                    metavar='VIEW')


# Guarded so that importing this file does not parse the command line,
# run git or open the notmuch database.
if __name__ == '__main__':
    args = parser.parse_args()

    config = read_config(path=args.config)

    _PAGES['text'] = Page()
    # NOTE(review): the markup and style sheet of this template were
    # lost from the reviewed copy; reconstructed from the surviving text
    # and the format() keywords -- confirm against the upstream script.
    _PAGES['html'] = HtmlPage(
        header='''<!DOCTYPE html>
<html lang="en">
<head>
  <meta http-equiv="Content-Type" content="text/html; charset={encoding}" />
  <title>{title}</title>
  <style media="screen" type="text/css">
    table {{
      border-spacing: 0;
    }}
    tr.message-first td {{
      padding-top: {inter_message_padding};
    }}
    tr.message-last td {{
      padding-bottom: {inter_message_padding};
    }}
    td {{
      padding-left: {border_radius};
      padding-right: {border_radius};
    }}
    tr:first-child td:first-child {{
      border-top-left-radius: {border_radius};
    }}
    tr:first-child td:last-child {{
      border-top-right-radius: {border_radius};
    }}
    tr:last-child td:first-child {{
      border-bottom-left-radius: {border_radius};
    }}
    tr:last-child td:last-child {{
      border-bottom-right-radius: {border_radius};
    }}
    tbody:nth-child(4n+1) tr td {{
      background-color: #ffd96e;
    }}
    tbody:nth-child(4n+3) tr td {{
      background-color: #bce;
    }}
  </style>
</head>
<body>
<h2>{title}</h2>
<p>
Generated: {date}<br />
{blurb}
</p>
<h3>Views</h3>
'''.format(date=datetime.datetime.utcnow().date(),
           title=config['meta']['title'],
           blurb=config['meta']['blurb'],
           encoding=_ENCODING,
           inter_message_padding='0.25em',
           border_radius='0.5em'),
        footer='</body>\n</html>\n',
        )

    if args.list_views:
        for view in config['views']:
            print(view['title'])
        sys.exit(0)
    elif args.get_query is not None:
        for view in config['views']:
            if args.get_query == view['title']:
                print(' and '.join(view['query']))
        sys.exit(0)
    else:
        # only import notmuch if needed
        import notmuch

    if args.text:
        page = _PAGES['text']
    else:
        page = _PAGES['html']

    db = notmuch.Database(mode=notmuch.Database.MODE.READ_ONLY)
    page.write(database=db, views=config['views'])