aboutsummaryrefslogtreecommitdiff
path: root/bindings/python-cffi/tests
diff options
context:
space:
mode:
authorFloris Bruynooghe <flub@google.com>2019-10-08 23:03:12 +0200
committerDavid Bremner <david@tethera.net>2019-12-03 08:12:30 -0400
commit83c2d158983875bf77a9b7662894df585b61741c (patch)
tree8443e3ab530a9cbf00b17c395f03e19138d3bae0 /bindings/python-cffi/tests
parent5f9ea4d2908a597acaf0b809b6f27fa74b70520b (diff)
Introduce CFFI-based python bindings
This introduces CFFI-based Python3-only bindings. The bindings aim at: - Better performance on pypy - Easier to use Python-C interface - More "pythonic" - The API should not allow invalid operations - Use native object protocol where possible - Memory safety; whatever you do from python, it should not coredump.
Diffstat (limited to 'bindings/python-cffi/tests')
-rw-r--r--bindings/python-cffi/tests/conftest.py142
-rw-r--r--bindings/python-cffi/tests/test_base.py116
-rw-r--r--bindings/python-cffi/tests/test_database.py326
-rw-r--r--bindings/python-cffi/tests/test_message.py226
-rw-r--r--bindings/python-cffi/tests/test_tags.py177
-rw-r--r--bindings/python-cffi/tests/test_thread.py102
6 files changed, 1089 insertions, 0 deletions
diff --git a/bindings/python-cffi/tests/conftest.py b/bindings/python-cffi/tests/conftest.py
new file mode 100644
index 00000000..1b7bbc35
--- /dev/null
+++ b/bindings/python-cffi/tests/conftest.py
@@ -0,0 +1,142 @@
+import email.message
+import mailbox
+import pathlib
+import socket
+import subprocess
+import textwrap
+import time
+
+import pytest
+
+
+@pytest.fixture(scope='function')
+def tmppath(tmpdir):
+ """The tmpdir fixture wrapped in pathlib.Path."""
+ return pathlib.Path(str(tmpdir))
+
+
+@pytest.fixture
+def notmuch(maildir):
+ """Return a function which runs notmuch commands on our test maildir.
+
+ This uses the notmuch-config file created by the ``maildir``
+ fixture.
+ """
+ def run(*args):
+ """Run a notmuch comand.
+
+ This function runs with a timeout error as many notmuch
+ commands may block if multiple processes are trying to open
+ the database in write-mode. It is all too easy to
+ accidentally do this in the unittests.
+ """
+ cfg_fname = maildir.path / 'notmuch-config'
+ cmd = ['notmuch'] + list(args)
+ print('Invoking: {}'.format(' '.join(cmd)))
+ proc = subprocess.run(cmd,
+ timeout=5,
+ env={'NOTMUCH_CONFIG': str(cfg_fname)})
+ proc.check_returncode()
+ return run
+
+
+@pytest.fixture
+def maildir(tmppath):
+ """A basic test interface to a valid maildir directory.
+
+ This creates a valid maildir and provides a simple mechanism to
+ deliver test emails to it. It also writes a notmuch-config file
+ in the top of the maildir.
+ """
+ cur = tmppath / 'cur'
+ cur.mkdir()
+ new = tmppath / 'new'
+ new.mkdir()
+ tmp = tmppath / 'tmp'
+ tmp.mkdir()
+ cfg_fname = tmppath/'notmuch-config'
+ with cfg_fname.open('w') as fp:
+ fp.write(textwrap.dedent("""\
+ [database]
+ path={tmppath!s}
+ [user]
+ name=Some Hacker
+ primary_email=dst@example.com
+ [new]
+ tags=unread;inbox;
+ ignore=
+ [search]
+ exclude_tags=deleted;spam;
+ [maildir]
+ synchronize_flags=true
+ [crypto]
+ gpg_path=gpg
+ """.format(tmppath=tmppath)))
+ return MailDir(tmppath)
+
+
+class MailDir:
+ """An interface around a correct maildir."""
+
+ def __init__(self, path):
+ self._path = pathlib.Path(path)
+ self.mailbox = mailbox.Maildir(str(path))
+ self._idcount = 0
+
+ @property
+ def path(self):
+ """The pathname of the maildir."""
+ return self._path
+
+ def _next_msgid(self):
+ """Return a new unique message ID."""
+ msgid = '{}@{}'.format(self._idcount, socket.getfqdn())
+ self._idcount += 1
+ return msgid
+
+ def deliver(self,
+ subject='Test mail',
+ body='This is a test mail',
+ to='dst@example.com',
+ frm='src@example.com',
+ headers=None,
+ new=False, # Move to new dir or cur dir?
+ keywords=None, # List of keywords or labels
+ seen=False, # Seen flag (cur dir only)
+ replied=False, # Replied flag (cur dir only)
+ flagged=False): # Flagged flag (cur dir only)
+ """Deliver a new mail message in the mbox.
+
+ This does only adds the message to maildir, does not insert it
+ into the notmuch database.
+
+ :returns: A tuple of (msgid, pathname).
+ """
+ msgid = self._next_msgid()
+ when = time.time()
+ msg = email.message.EmailMessage()
+ msg.add_header('Received', 'by MailDir; {}'.format(time.ctime(when)))
+ msg.add_header('Message-ID', '<{}>'.format(msgid))
+ msg.add_header('Date', time.ctime(when))
+ msg.add_header('From', frm)
+ msg.add_header('To', to)
+ msg.add_header('Subject', subject)
+ if headers:
+ for h, v in headers:
+ msg.add_header(h, v)
+ msg.set_content(body)
+ mdmsg = mailbox.MaildirMessage(msg)
+ if not new:
+ mdmsg.set_subdir('cur')
+ if flagged:
+ mdmsg.add_flag('F')
+ if replied:
+ mdmsg.add_flag('R')
+ if seen:
+ mdmsg.add_flag('S')
+ boxid = self.mailbox.add(mdmsg)
+ basename = boxid
+ if mdmsg.get_info():
+ basename += mailbox.Maildir.colon + mdmsg.get_info()
+ msgpath = self.path / mdmsg.get_subdir() / basename
+ return (msgid, msgpath)
diff --git a/bindings/python-cffi/tests/test_base.py b/bindings/python-cffi/tests/test_base.py
new file mode 100644
index 00000000..b6d3d62c
--- /dev/null
+++ b/bindings/python-cffi/tests/test_base.py
@@ -0,0 +1,116 @@
+import pytest
+
+from notdb import _base as base
+from notdb import _errors as errors
+
+
+class TestNotmuchObject:
+
+ def test_no_impl_methods(self):
+ class Object(base.NotmuchObject):
+ pass
+ with pytest.raises(TypeError):
+ Object()
+
+ def test_impl_methods(self):
+
+ class Object(base.NotmuchObject):
+
+ def __init__(self):
+ pass
+
+ @property
+ def alive(self):
+ pass
+
+ def _destroy(self, parent=False):
+ pass
+
+ Object()
+
+ def test_del(self):
+ destroyed = False
+
+ class Object(base.NotmuchObject):
+
+ def __init__(self):
+ pass
+
+ @property
+ def alive(self):
+ pass
+
+ def _destroy(self, parent=False):
+ nonlocal destroyed
+ destroyed = True
+
+ o = Object()
+ o.__del__()
+ assert destroyed
+
+
+class TestMemoryPointer:
+
+ @pytest.fixture
+ def obj(self):
+ class Cls:
+ ptr = base.MemoryPointer()
+ return Cls()
+
+ def test_unset(self, obj):
+ with pytest.raises(errors.ObjectDestroyedError):
+ obj.ptr
+
+ def test_set(self, obj):
+ obj.ptr = 'some'
+ assert obj.ptr == 'some'
+
+ def test_cleared(self, obj):
+ obj.ptr = 'some'
+ obj.ptr
+ obj.ptr = None
+ with pytest.raises(errors.ObjectDestroyedError):
+ obj.ptr
+
+ def test_two_instances(self, obj):
+ obj2 = obj.__class__()
+ obj.ptr = 'foo'
+ obj2.ptr = 'bar'
+ assert obj.ptr != obj2.ptr
+
+
+class TestBinString:
+
+ def test_type(self):
+ s = base.BinString(b'foo')
+ assert isinstance(s, str)
+
+ def test_init_bytes(self):
+ s = base.BinString(b'foo')
+ assert s == 'foo'
+
+ def test_init_str(self):
+ s = base.BinString('foo')
+ assert s == 'foo'
+
+ def test_bytes(self):
+ s = base.BinString(b'foo')
+ assert bytes(s) == b'foo'
+
+ def test_invalid_utf8(self):
+ s = base.BinString(b'\x80foo')
+ assert s == 'foo'
+ assert bytes(s) == b'\x80foo'
+
+ def test_errors(self):
+ s = base.BinString(b'\x80foo', errors='replace')
+ assert s == '�foo'
+ assert bytes(s) == b'\x80foo'
+
+ def test_encoding(self):
+ # pound sign: '£' == '\u00a3' latin-1: b'\xa3', utf-8: b'\xc2\xa3'
+ with pytest.raises(UnicodeDecodeError):
+ base.BinString(b'\xa3', errors='strict')
+ s = base.BinString(b'\xa3', encoding='latin-1', errors='strict')
+ assert s == '£'
+ assert bytes(s) == b'\xa3'
diff --git a/bindings/python-cffi/tests/test_database.py b/bindings/python-cffi/tests/test_database.py
new file mode 100644
index 00000000..02de0f41
--- /dev/null
+++ b/bindings/python-cffi/tests/test_database.py
@@ -0,0 +1,326 @@
+import collections
+import configparser
+import os
+import pathlib
+
+import pytest
+
+import notdb
+import notdb._errors as errors
+import notdb._database as dbmod
+import notdb._message as message
+
+
+@pytest.fixture
+def db(maildir):
+ with dbmod.Database.create(maildir.path) as db:
+ yield db
+
+
+class TestDefaultDb:
+ """Tests for reading the default database.
+
+ The error cases are fairly undefined, some relevant Python error
+ will come out if you give it a bad filename or if the file does
+ not parse correctly. So we're not testing this too deeply.
+ """
+
+ def test_config_pathname_default(self, monkeypatch):
+ monkeypatch.delenv('NOTMUCH_CONFIG', raising=False)
+ user = pathlib.Path('~/.notmuch-config').expanduser()
+ assert dbmod._config_pathname() == user
+
+ def test_config_pathname_env(self, monkeypatch):
+ monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path')
+ assert dbmod._config_pathname() == pathlib.Path('/some/random/path')
+
+ def test_default_path_nocfg(self, monkeypatch, tmppath):
+ monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath/'foo'))
+ with pytest.raises(FileNotFoundError):
+ dbmod.Database.default_path()
+
+ def test_default_path_cfg_is_dir(self, monkeypatch, tmppath):
+ monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath))
+ with pytest.raises(IsADirectoryError):
+ dbmod.Database.default_path()
+
+ def test_default_path_parseerr(self, monkeypatch, tmppath):
+ cfg = tmppath / 'notmuch-config'
+ with cfg.open('w') as fp:
+ fp.write('invalid')
+ monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
+ with pytest.raises(configparser.Error):
+ dbmod.Database.default_path()
+
+ def test_default_path_parse(self, monkeypatch, tmppath):
+ cfg = tmppath / 'notmuch-config'
+ with cfg.open('w') as fp:
+ fp.write('[database]\n')
+ fp.write('path={!s}'.format(tmppath))
+ monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
+ assert dbmod.Database.default_path() == tmppath
+
+ def test_default_path_param(self, monkeypatch, tmppath):
+ cfg_dummy = tmppath / 'dummy'
+ monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy))
+ cfg_real = tmppath / 'notmuch_config'
+ with cfg_real.open('w') as fp:
+ fp.write('[database]\n')
+ fp.write('path={!s}'.format(cfg_real/'mail'))
+ assert dbmod.Database.default_path(cfg_real) == cfg_real/'mail'
+
+
+class TestCreate:
+
+ def test_create(self, tmppath, db):
+ assert tmppath.joinpath('.notmuch/xapian/').exists()
+
+ def test_create_already_open(self, tmppath, db):
+ with pytest.raises(errors.NotmuchError):
+ db.create(tmppath)
+
+ def test_create_existing(self, tmppath, db):
+ with pytest.raises(errors.FileError):
+ dbmod.Database.create(path=tmppath)
+
+ def test_close(self, db):
+ db.close()
+
+ def test_del_noclose(self, db):
+ del db
+
+ def test_close_del(self, db):
+ db.close()
+ del db
+
+ def test_closed_attr(self, db):
+ assert not db.closed
+ db.close()
+ assert db.closed
+
+ def test_ctx(self, db):
+ with db as ctx:
+ assert ctx is db
+ assert not db.closed
+ assert db.closed
+
+ def test_path(self, db, tmppath):
+ assert db.path == tmppath
+
+ def test_version(self, db):
+ assert db.version > 0
+
+ def test_needs_upgrade(self, db):
+ assert db.needs_upgrade in (True, False)
+
+
+class TestAtomic:
+
+ def test_exit_early(self, db):
+ with pytest.raises(errors.UnbalancedAtomicError):
+ with db.atomic() as ctx:
+ ctx.force_end()
+
+ def test_exit_late(self, db):
+ with db.atomic() as ctx:
+ pass
+ with pytest.raises(errors.UnbalancedAtomicError):
+ ctx.force_end()
+
+
+class TestRevision:
+
+ def test_single_rev(self, db):
+ r = db.revision()
+ assert isinstance(r, dbmod.DbRevision)
+ assert isinstance(r.rev, int)
+ assert isinstance(r.uuid, bytes)
+ assert r is r
+ assert r == r
+ assert r <= r
+ assert r >= r
+ assert not r < r
+ assert not r > r
+
+ def test_diff_db(self, tmppath):
+ dbpath0 = tmppath.joinpath('db0')
+ dbpath0.mkdir()
+ dbpath1 = tmppath.joinpath('db1')
+ dbpath1.mkdir()
+ db0 = dbmod.Database.create(path=dbpath0)
+ db1 = dbmod.Database.create(path=dbpath1)
+ r_db0 = db0.revision()
+ r_db1 = db1.revision()
+ assert r_db0 != r_db1
+ assert r_db0.uuid != r_db1.uuid
+
+ def test_cmp(self, db, maildir):
+ rev0 = db.revision()
+ _, pathname = maildir.deliver()
+ db.add(pathname, sync_flags=False)
+ rev1 = db.revision()
+ assert rev0 < rev1
+ assert rev0 <= rev1
+ assert not rev0 > rev1
+ assert not rev0 >= rev1
+ assert not rev0 == rev1
+ assert rev0 != rev1
+
+ # XXX add tests for revisions comparisons
+
+class TestMessages:
+
+ def test_add_message(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(pathname, sync_flags=False)
+ assert isinstance(msg, message.Message)
+ assert msg.path == pathname
+ assert msg.messageid == msgid
+
+ def test_add_message_str(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(str(pathname), sync_flags=False)
+
+ def test_add_message_bytes(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(os.fsencode(bytes(pathname)), sync_flags=False)
+
+ def test_remove_message(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(pathname, sync_flags=False)
+ assert db.find(msgid)
+ dup = db.remove(pathname)
+ with pytest.raises(LookupError):
+ db.find(msgid)
+
+ def test_remove_message_str(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(pathname, sync_flags=False)
+ assert db.find(msgid)
+ dup = db.remove(str(pathname))
+ with pytest.raises(LookupError):
+ db.find(msgid)
+
+ def test_remove_message_bytes(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg, dup = db.add(pathname, sync_flags=False)
+ assert db.find(msgid)
+ dup = db.remove(os.fsencode(bytes(pathname)))
+ with pytest.raises(LookupError):
+ db.find(msgid)
+
+ def test_find_message(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg0, dup = db.add(pathname, sync_flags=False)
+ msg1 = db.find(msgid)
+ assert isinstance(msg1, message.Message)
+ assert msg1.messageid == msgid == msg0.messageid
+ assert msg1.path == pathname == msg0.path
+
+ def test_find_message_notfound(self, db):
+ with pytest.raises(LookupError):
+ db.find('foo')
+
+ def test_get_message(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ msg0, _ = db.add(pathname, sync_flags=False)
+ msg1 = db.get(pathname)
+ assert isinstance(msg1, message.Message)
+ assert msg1.messageid == msgid == msg0.messageid
+ assert msg1.path == pathname == msg0.path
+
+ def test_get_message_str(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ db.add(pathname, sync_flags=False)
+ msg = db.get(str(pathname))
+ assert msg.messageid == msgid
+
+ def test_get_message_bytes(self, db, maildir):
+ msgid, pathname = maildir.deliver()
+ db.add(pathname, sync_flags=False)
+ msg = db.get(os.fsencode(bytes(pathname)))
+ assert msg.messageid == msgid
+
+
+class TestTags:
+ # We just want to test this behaves like a set at a hight level.
+ # The set semantics are tested in detail in the test_tags module.
+
+ def test_type(self, db):
+ assert isinstance(db.tags, collections.abc.Set)
+
+ def test_none(self, db):
+ itags = iter(db.tags)
+ with pytest.raises(StopIteration):
+ next(itags)
+ assert len(db.tags) == 0
+ assert not db.tags
+
+ def test_some(self, db, maildir):
+ _, pathname = maildir.deliver()
+ msg, _ = db.add(pathname, sync_flags=False)
+ msg.tags.add('hello')
+ itags = iter(db.tags)
+ assert next(itags) == 'hello'
+ with pytest.raises(StopIteration):
+ next(itags)
+ assert 'hello' in msg.tags
+
+ def test_cache(self, db):
+ assert db.tags is db.tags
+
+ def test_iters(self, db):
+ i1 = iter(db.tags)
+ i2 = iter(db.tags)
+ assert i1 is not i2
+
+
+class TestQuery:
+
+ @pytest.fixture
+ def db(self, maildir, notmuch):
+ """Return a read-only notdb.Database.
+
+ The database will have 3 messages, 2 threads.
+ """
+ msgid, _ = maildir.deliver(body='foo')
+ maildir.deliver(body='bar')
+ maildir.deliver(body='baz',
+ headers=[('In-Reply-To', '<{}>'.format(msgid))])
+ notmuch('new')
+ with dbmod.Database(maildir.path, 'rw') as db:
+ yield db
+
+ def test_count_messages(self, db):
+ assert db.count_messages('*') == 3
+
+ def test_messages_type(self, db):
+ msgs = db.messages('*')
+ assert isinstance(msgs, collections.abc.Iterator)
+
+ def test_message_no_results(self, db):
+ msgs = db.messages('not_a_matching_query')
+ with pytest.raises(StopIteration):
+ next(msgs)
+
+ def test_message_match(self, db):
+ msgs = db.messages('*')
+ msg = next(msgs)
+ assert isinstance(msg, notdb.Message)
+
+ def test_count_threads(self, db):
+ assert db.count_threads('*') == 2
+
+ def test_threads_type(self, db):
+ threads = db.threads('*')
+ assert isinstance(threads, collections.abc.Iterator)
+
+ def test_threads_no_match(self, db):
+ threads = db.threads('not_a_matching_query')
+ with pytest.raises(StopIteration):
+ next(threads)
+
+ def test_threads_match(self, db):
+ threads = db.threads('*')
+ thread = next(threads)
+ assert isinstance(thread, notdb.Thread)
diff --git a/bindings/python-cffi/tests/test_message.py b/bindings/python-cffi/tests/test_message.py
new file mode 100644
index 00000000..56d06f34
--- /dev/null
+++ b/bindings/python-cffi/tests/test_message.py
@@ -0,0 +1,226 @@
+import collections.abc
+import time
+import pathlib
+
+import pytest
+
+import notdb
+
+
+class TestMessage:
+ MaildirMsg = collections.namedtuple('MaildirMsg', ['msgid', 'path'])
+
+ @pytest.fixture
+ def maildir_msg(self, maildir):
+ msgid, path = maildir.deliver()
+ return self.MaildirMsg(msgid, path)
+
+ @pytest.fixture
+ def db(self, maildir):
+ with notdb.Database.create(maildir.path) as db:
+ yield db
+
+ @pytest.fixture
+ def msg(self, db, maildir_msg):
+ msg, dup = db.add(maildir_msg.path, sync_flags=False)
+ yield msg
+
+ def test_type(self, msg):
+ assert isinstance(msg, notdb.NotmuchObject)
+ assert isinstance(msg, notdb.Message)
+
+ def test_alive(self, msg):
+ assert msg.alive
+
+ def test_hash(self, msg):
+ assert hash(msg)
+
+ def test_eq(self, db, msg):
+ copy = db.get(msg.path)
+ assert msg == copy
+
+ def test_messageid_type(self, msg):
+ assert isinstance(msg.messageid, str)
+ assert isinstance(msg.messageid, notdb.BinString)
+ assert isinstance(bytes(msg.messageid), bytes)
+
+ def test_messageid(self, msg, maildir_msg):
+ assert msg.messageid == maildir_msg.msgid
+
+ def test_messageid_find(self, db, msg):
+ copy = db.find(msg.messageid)
+ assert msg.messageid == copy.messageid
+
+ def test_threadid_type(self, msg):
+ assert isinstance(msg.threadid, str)
+ assert isinstance(msg.threadid, notdb.BinString)
+ assert isinstance(bytes(msg.threadid), bytes)
+
+ def test_path_type(self, msg):
+ assert isinstance(msg.path, pathlib.Path)
+
+ def test_path(self, msg, maildir_msg):
+ assert msg.path == maildir_msg.path
+
+ def test_pathb_type(self, msg):
+ assert isinstance(msg.pathb, bytes)
+
+ def test_pathb(self, msg, maildir_msg):
+ assert msg.path == maildir_msg.path
+
+ def test_filenames_type(self, msg):
+ ifn = msg.filenames()
+ assert isinstance(ifn, collections.abc.Iterator)
+
+ def test_filenames(self, msg):
+ ifn = msg.filenames()
+ fn = next(ifn)
+ assert fn == msg.path
+ assert isinstance(fn, pathlib.Path)
+ with pytest.raises(StopIteration):
+ next(ifn)
+ assert list(msg.filenames()) == [msg.path]
+
+ def test_filenamesb_type(self, msg):
+ ifn = msg.filenamesb()
+ assert isinstance(ifn, collections.abc.Iterator)
+
+ def test_filenamesb(self, msg):
+ ifn = msg.filenamesb()
+ fn = next(ifn)
+ assert fn == msg.pathb
+ assert isinstance(fn, bytes)
+ with pytest.raises(StopIteration):
+ next(ifn)
+ assert list(msg.filenamesb()) == [msg.pathb]
+
+ def test_ghost_no(self, msg):
+ assert not msg.ghost
+
+ def test_date(self, msg):
+ # XXX Someone seems to treat things as local time instead of
+ # UTC or the other way around.
+ now = int(time.time())
+ assert abs(now - msg.date) < 3600*24
+
+ def test_header(self, msg):
+ assert msg.header('from') == 'src@example.com'
+
+ def test_header_not_present(self, msg):
+ with pytest.raises(LookupError):
+ msg.header('foo')
+
+ def test_freeze(self, msg):
+ with msg.frozen():
+ msg.tags.add('foo')
+ msg.tags.add('bar')
+ msg.tags.discard('foo')
+ assert 'foo' not in msg.tags
+ assert 'bar' in msg.tags
+
+ def test_freeze_err(self, msg):
+ msg.tags.add('foo')
+ try:
+ with msg.frozen():
+ msg.tags.clear()
+ raise Exception('oops')
+ except Exception:
+ assert 'foo' in msg.tags
+ else:
+ pytest.fail('Context manager did not raise')
+
+ def test_replies_type(self, msg):
+ assert isinstance(msg.replies(), collections.abc.Iterator)
+
+ def test_replies(self, msg):
+ with pytest.raises(StopIteration):
+ next(msg.replies())
+
+
+class TestProperties:
+
+ @pytest.fixture
+ def props(self, maildir):
+ msgid, path = maildir.deliver()
+ with notdb.Database.create(maildir.path) as db:
+ msg, dup = db.add(path, sync_flags=False)
+ yield msg.properties
+
+ def test_type(self, props):
+ assert isinstance(props, collections.abc.MutableMapping)
+
+ def test_add_single(self, props):
+ props['foo'] = 'bar'
+ assert props['foo'] == 'bar'
+ props.add('bar', 'baz')
+ assert props['bar'] == 'baz'
+
+ def test_add_dup(self, props):
+ props.add('foo', 'bar')
+ props.add('foo', 'baz')
+ assert props['foo'] == 'bar'
+ assert (set(props.getall('foo', exact=True))
+ == {('foo', 'bar'), ('foo', 'baz')})
+
+ def test_len(self, props):
+ props.add('foo', 'a')
+ props.add('foo', 'b')
+ props.add('bar', 'a')
+ assert len(props) == 3
+ assert len(props.keys()) == 2
+ assert len(props.values()) == 2
+ assert len(props.items()) == 3
+
+ def test_del(self, props):
+ props.add('foo', 'a')
+ props.add('foo', 'b')
+ del props['foo']
+ with pytest.raises(KeyError):
+ props['foo']
+
+ def test_remove(self, props):
+ props.add('foo', 'a')
+ props.add('foo', 'b')
+ props.remove('foo', 'a')
+ assert props['foo'] == 'b'
+
+ def test_view_abcs(self, props):
+ assert isinstance(props.keys(), collections.abc.KeysView)
+ assert isinstance(props.values(), collections.abc.ValuesView)
+ assert isinstance(props.items(), collections.abc.ItemsView)
+
+ def test_pop(self, props):
+ props.add('foo', 'a')
+ props.add('foo', 'b')
+ val = props.pop('foo')
+ assert val == 'a'
+
+ def test_pop_default(self, props):
+ with pytest.raises(KeyError):
+ props.pop('foo')
+ assert props.pop('foo', 'default') == 'default'
+
+ def test_popitem(self, props):
+ props.add('foo', 'a')
+ assert props.popitem() == ('foo', 'a')
+ with pytest.raises(KeyError):
+ props.popitem()
+
+ def test_clear(self, props):
+ props.add('foo', 'a')
+ props.clear()
+ assert len(props) == 0
+
+ def test_getall(self, props):
+ props.add('foo', 'a')
+ assert set(props.getall('foo')) == {('foo', 'a')}
+
+ def test_getall_prefix(self, props):
+ props.add('foo', 'a')
+ props.add('foobar', 'b')
+ assert set(props.getall('foo')) == {('foo', 'a'), ('foobar', 'b')}
+
+ def test_getall_exact(self, props):
+ props.add('foo', 'a')
+ props.add('foobar', 'b')
+ assert set(props.getall('foo', exact=True)) == {('foo', 'a')}
diff --git a/bindings/python-cffi/tests/test_tags.py b/bindings/python-cffi/tests/test_tags.py
new file mode 100644
index 00000000..0cb42d89
--- /dev/null
+++ b/bindings/python-cffi/tests/test_tags.py
@@ -0,0 +1,177 @@
+"""Tests for the behaviour of immutable and mutable tagsets.
+
+This module tests the Pythonic behaviour of the sets.
+"""
+
+import collections
+import subprocess
+import textwrap
+
+import pytest
+
+from notdb import _database as database
+from notdb import _tags as tags
+
+
+class TestImmutable:
+
+ @pytest.fixture
+ def tagset(self, maildir, notmuch):
+ """An non-empty immutable tagset.
+
+ This will have the default new mail tags: inbox, unread.
+ """
+ maildir.deliver()
+ notmuch('new')
+ with database.Database(maildir.path) as db:
+ yield db.tags
+
+ def test_type(self, tagset):
+ assert isinstance(tagset, tags.ImmutableTagSet)
+ assert isinstance(tagset, collections.abc.Set)
+
+ def test_hash(self, tagset, maildir, notmuch):
+ h0 = hash(tagset)
+ notmuch('tag', '+foo', '*')
+ with database.Database(maildir.path) as db:
+ h1 = hash(db.tags)
+ assert h0 != h1
+
+ def test_eq(self, tagset):
+ assert tagset == tagset
+
+ def test_neq(self, tagset, maildir, notmuch):
+ notmuch('tag', '+foo', '*')
+ with database.Database(maildir.path) as db:
+ assert tagset != db.tags
+
+ def test_contains(self, tagset):
+ print(tuple(tagset))
+ assert 'unread' in tagset
+ assert 'foo' not in tagset
+
+ def test_iter(self, tagset):
+ expected = sorted(['unread', 'inbox'])
+ found = []
+ for tag in tagset:
+ assert isinstance(tag, str)
+ found.append(tag)
+ assert expected == sorted(found)
+
+ def test_special_iter(self, tagset):
+ expected = sorted([b'unread', b'inbox'])
+ found = []
+ for tag in tagset.iter():
+ assert isinstance(tag, bytes)
+ found.append(tag)
+ assert expected == sorted(found)
+
+ def test_special_iter_codec(self, tagset):
+ for tag in tagset.iter(encoding='ascii', errors='surrogateescape'):
+ assert isinstance(tag, str)
+
+ def test_len(self, tagset):
+ assert len(tagset) == 2
+
+ def test_and(self, tagset):
+ common = tagset & {'unread'}
+ assert isinstance(common, set)
+ assert isinstance(common, collections.abc.Set)
+ assert common == {'unread'}
+
+ def test_or(self, tagset):
+ res = tagset | {'foo'}
+ assert isinstance(res, set)
+ assert isinstance(res, collections.abc.Set)
+ assert res == {'unread', 'inbox', 'foo'}
+
+ def test_sub(self, tagset):
+ res = tagset - {'unread'}
+ assert isinstance(res, set)
+ assert isinstance(res, collections.abc.Set)
+ assert res == {'inbox'}
+
+ def test_rsub(self, tagset):
+ res = {'foo', 'unread'} - tagset
+ assert isinstance(res, set)
+ assert isinstance(res, collections.abc.Set)
+ assert res == {'foo'}
+
+ def test_xor(self, tagset):
+ res = tagset ^ {'unread', 'foo'}
+ assert isinstance(res, set)
+ assert isinstance(res, collections.abc.Set)
+ assert res == {'inbox', 'foo'}
+
+ def test_rxor(self, tagset):
+ res = {'unread', 'foo'} ^ tagset
+ assert isinstance(res, set)
+ assert isinstance(res, collections.abc.Set)
+ assert res == {'inbox', 'foo'}
+
+
+class TestMutableTagset:
+
+ @pytest.fixture
+ def tagset(self, maildir, notmuch):
+ """An non-empty mutable tagset.
+
+ This will have the default new mail tags: inbox, unread.
+ """
+ _, pathname = maildir.deliver()
+ notmuch('new')
+ with database.Database(maildir.path,
+ mode=database.Mode.READ_WRITE) as db:
+ msg = db.get(pathname)
+ yield msg.tags
+
+ def test_type(self, tagset):
+ assert isinstance(tagset, collections.abc.MutableSet)
+ assert isinstance(tagset, tags.MutableTagSet)
+
+ def test_hash(self, tagset):
+ assert not isinstance(tagset, collections.abc.Hashable)
+ with pytest.raises(TypeError):
+ hash(tagset)
+
+ def test_add(self, tagset):
+ assert 'foo' not in tagset
+ tagset.add('foo')
+ assert 'foo' in tagset
+
+ def test_discard(self, tagset):
+ assert 'inbox' in tagset
+ tagset.discard('inbox')
+ assert 'inbox' not in tagset
+
+ def test_discard_not_present(self, tagset):
+ assert 'foo' not in tagset
+ tagset.discard('foo')
+
+ def test_clear(self, tagset):
+ assert len(tagset) > 0
+ tagset.clear()
+ assert len(tagset) == 0
+
+ def test_from_maildir_flags(self, maildir, notmuch):
+ _, pathname = maildir.deliver(flagged=True)
+ notmuch('new')
+ with database.Database(maildir.path,
+ mode=database.Mode.READ_WRITE) as db:
+ msg = db.get(pathname)
+ msg.tags.discard('flagged')
+ msg.tags.from_maildir_flags()
+ assert 'flagged' in msg.tags
+
+ def test_to_maildir_flags(self, maildir, notmuch):
+ _, pathname = maildir.deliver(flagged=True)
+ notmuch('new')
+ with database.Database(maildir.path,
+ mode=database.Mode.READ_WRITE) as db:
+ msg = db.get(pathname)
+ flags = msg.path.name.split(',')[-1]
+ assert 'F' in flags
+ msg.tags.discard('flagged')
+ msg.tags.to_maildir_flags()
+ flags = msg.path.name.split(',')[-1]
+ assert 'F' not in flags
diff --git a/bindings/python-cffi/tests/test_thread.py b/bindings/python-cffi/tests/test_thread.py
new file mode 100644
index 00000000..366bd8a5
--- /dev/null
+++ b/bindings/python-cffi/tests/test_thread.py
@@ -0,0 +1,102 @@
+import collections.abc
+import time
+
+import pytest
+
+import notdb
+
+
+@pytest.fixture
+def thread(maildir, notmuch):
+ """Return a single thread with one matched message."""
+ msgid, _ = maildir.deliver(body='foo')
+ maildir.deliver(body='bar',
+ headers=[('In-Reply-To', '<{}>'.format(msgid))])
+ notmuch('new')
+ with notdb.Database(maildir.path) as db:
+ yield next(db.threads('foo'))
+
+
+def test_type(thread):
+ assert isinstance(thread, notdb.Thread)
+ assert isinstance(thread, collections.abc.Iterable)
+
+
+def test_threadid(thread):
+ assert isinstance(thread.threadid, notdb.BinString)
+ assert thread.threadid
+
+
+def test_len(thread):
+ assert len(thread) == 2
+
+
+def test_toplevel_type(thread):
+ assert isinstance(thread.toplevel(), collections.abc.Iterator)
+
+
+def test_toplevel(thread):
+ msgs = thread.toplevel()
+ assert isinstance(next(msgs), notdb.Message)
+ with pytest.raises(StopIteration):
+ next(msgs)
+
+
+def test_toplevel_reply(thread):
+ msg = next(thread.toplevel())
+ assert isinstance(next(msg.replies()), notdb.Message)
+
+
+def test_iter(thread):
+ msgs = list(iter(thread))
+ assert len(msgs) == len(thread)
+ for msg in msgs:
+ assert isinstance(msg, notdb.Message)
+
+
+def test_matched(thread):
+ assert thread.matched == 1
+
+
+def test_authors_type(thread):
+ assert isinstance(thread.authors, notdb.BinString)
+
+
+def test_authors(thread):
+ assert thread.authors == 'src@example.com'
+
+
+def test_subject(thread):
+ assert thread.subject == 'Test mail'
+
+
+def test_first(thread):
+ # XXX Someone seems to treat things as local time instead of
+ # UTC or the other way around.
+ now = int(time.time())
+ assert abs(now - thread.first) < 3600*24
+
+
+def test_last(thread):
+ # XXX Someone seems to treat things as local time instead of
+ # UTC or the other way around.
+ now = int(time.time())
+ assert abs(now - thread.last) < 3600*24
+
+
+def test_first_last(thread):
+ # Sadly we only have second resolution so these will always be the
+ # same time in our tests.
+ assert thread.first <= thread.last
+
+
+def test_tags_type(thread):
+ assert isinstance(thread.tags, notdb.ImmutableTagSet)
+
+
+def test_tags_cache(thread):
+ assert thread.tags is thread.tags
+
+
+def test_tags(thread):
+ assert 'inbox' in thread.tags