diff options
| author | Floris Bruynooghe <flub@google.com> | 2019-10-08 23:03:12 +0200 |
|---|---|---|
| committer | David Bremner <david@tethera.net> | 2019-12-03 08:12:30 -0400 |
| commit | 83c2d158983875bf77a9b7662894df585b61741c (patch) | |
| tree | 8443e3ab530a9cbf00b17c395f03e19138d3bae0 /bindings/python-cffi/tests | |
| parent | 5f9ea4d2908a597acaf0b809b6f27fa74b70520b (diff) | |
Introduce CFFI-based python bindings
This introduces CFFI-based Python3-only bindings.
The bindings aim at:
- Better performance on pypy
- Easier to use Python-C interface
- More "pythonic"
- The API should not allow invalid operations
- Use native object protocol where possible
- Memory safety; whatever you do from python, it should not coredump.
Diffstat (limited to 'bindings/python-cffi/tests')
| -rw-r--r-- | bindings/python-cffi/tests/conftest.py | 142 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_base.py | 116 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_database.py | 326 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_message.py | 226 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_tags.py | 177 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_thread.py | 102 |
6 files changed, 1089 insertions, 0 deletions
diff --git a/bindings/python-cffi/tests/conftest.py b/bindings/python-cffi/tests/conftest.py new file mode 100644 index 00000000..1b7bbc35 --- /dev/null +++ b/bindings/python-cffi/tests/conftest.py @@ -0,0 +1,142 @@ +import email.message +import mailbox +import pathlib +import socket +import subprocess +import textwrap +import time + +import pytest + + +@pytest.fixture(scope='function') +def tmppath(tmpdir): + """The tmpdir fixture wrapped in pathlib.Path.""" + return pathlib.Path(str(tmpdir)) + + +@pytest.fixture +def notmuch(maildir): + """Return a function which runs notmuch commands on our test maildir. + + This uses the notmuch-config file created by the ``maildir`` + fixture. + """ + def run(*args): + """Run a notmuch comand. + + This function runs with a timeout error as many notmuch + commands may block if multiple processes are trying to open + the database in write-mode. It is all too easy to + accidentally do this in the unittests. + """ + cfg_fname = maildir.path / 'notmuch-config' + cmd = ['notmuch'] + list(args) + print('Invoking: {}'.format(' '.join(cmd))) + proc = subprocess.run(cmd, + timeout=5, + env={'NOTMUCH_CONFIG': str(cfg_fname)}) + proc.check_returncode() + return run + + +@pytest.fixture +def maildir(tmppath): + """A basic test interface to a valid maildir directory. + + This creates a valid maildir and provides a simple mechanism to + deliver test emails to it. It also writes a notmuch-config file + in the top of the maildir. + """ + cur = tmppath / 'cur' + cur.mkdir() + new = tmppath / 'new' + new.mkdir() + tmp = tmppath / 'tmp' + tmp.mkdir() + cfg_fname = tmppath/'notmuch-config' + with cfg_fname.open('w') as fp: + fp.write(textwrap.dedent("""\ + [database] + path={tmppath!s} + [user] + name=Some Hacker + primary_email=dst@example.com + [new] + tags=unread;inbox; + ignore= + [search] + exclude_tags=deleted;spam; + [maildir] + synchronize_flags=true + [crypto] + gpg_path=gpg + """.format(tmppath=tmppath))) + return MailDir(tmppath) + + +class MailDir: + """An interface around a correct maildir.""" + + def __init__(self, path): + self._path = pathlib.Path(path) + self.mailbox = mailbox.Maildir(str(path)) + self._idcount = 0 + + @property + def path(self): + """The pathname of the maildir.""" + return self._path + + def _next_msgid(self): + """Return a new unique message ID.""" + msgid = '{}@{}'.format(self._idcount, socket.getfqdn()) + self._idcount += 1 + return msgid + + def deliver(self, + subject='Test mail', + body='This is a test mail', + to='dst@example.com', + frm='src@example.com', + headers=None, + new=False, # Move to new dir or cur dir? + keywords=None, # List of keywords or labels + seen=False, # Seen flag (cur dir only) + replied=False, # Replied flag (cur dir only) + flagged=False): # Flagged flag (cur dir only) + """Deliver a new mail message in the mbox. + + This does only adds the message to maildir, does not insert it + into the notmuch database. + + :returns: A tuple of (msgid, pathname). + """ + msgid = self._next_msgid() + when = time.time() + msg = email.message.EmailMessage() + msg.add_header('Received', 'by MailDir; {}'.format(time.ctime(when))) + msg.add_header('Message-ID', '<{}>'.format(msgid)) + msg.add_header('Date', time.ctime(when)) + msg.add_header('From', frm) + msg.add_header('To', to) + msg.add_header('Subject', subject) + if headers: + for h, v in headers: + msg.add_header(h, v) + msg.set_content(body) + mdmsg = mailbox.MaildirMessage(msg) + if not new: + mdmsg.set_subdir('cur') + if flagged: + mdmsg.add_flag('F') + if replied: + mdmsg.add_flag('R') + if seen: + mdmsg.add_flag('S') + boxid = self.mailbox.add(mdmsg) + basename = boxid + if mdmsg.get_info(): + basename += mailbox.Maildir.colon + mdmsg.get_info() + msgpath = self.path / mdmsg.get_subdir() / basename + return (msgid, msgpath) diff --git a/bindings/python-cffi/tests/test_base.py b/bindings/python-cffi/tests/test_base.py new file mode 100644 index 00000000..b6d3d62c --- /dev/null +++ b/bindings/python-cffi/tests/test_base.py @@ -0,0 +1,116 @@ +import pytest + +from notdb import _base as base +from notdb import _errors as errors + + +class TestNotmuchObject: + + def test_no_impl_methods(self): + class Object(base.NotmuchObject): + pass + with pytest.raises(TypeError): + Object() + + def test_impl_methods(self): + + class Object(base.NotmuchObject): + + def __init__(self): + pass + + @property + def alive(self): + pass + + def _destroy(self, parent=False): + pass + + Object() + + def test_del(self): + destroyed = False + + class Object(base.NotmuchObject): + + def __init__(self): + pass + + @property + def alive(self): + pass + + def _destroy(self, parent=False): + nonlocal destroyed + destroyed = True + + o = Object() + o.__del__() + assert destroyed + + +class TestMemoryPointer: + + @pytest.fixture + def obj(self): + class Cls: + ptr = base.MemoryPointer() + return Cls() + + def test_unset(self, obj): + with pytest.raises(errors.ObjectDestroyedError): + obj.ptr + + def test_set(self, obj): + obj.ptr = 'some' + assert obj.ptr == 'some' + + def test_cleared(self, obj): + obj.ptr = 'some' + obj.ptr + obj.ptr = None + with pytest.raises(errors.ObjectDestroyedError): + obj.ptr + + def test_two_instances(self, obj): + obj2 = obj.__class__() + obj.ptr = 'foo' + obj2.ptr = 'bar' + assert obj.ptr != obj2.ptr + + +class TestBinString: + + def test_type(self): + s = base.BinString(b'foo') + assert isinstance(s, str) + + def test_init_bytes(self): + s = base.BinString(b'foo') + assert s == 'foo' + + def test_init_str(self): + s = base.BinString('foo') + assert s == 'foo' + + def test_bytes(self): + s = base.BinString(b'foo') + assert bytes(s) == b'foo' + + def test_invalid_utf8(self): + s = base.BinString(b'\x80foo') + assert s == 'foo' + assert bytes(s) == b'\x80foo' + + def test_errors(self): + s = base.BinString(b'\x80foo', errors='replace') + assert s == '�foo' + assert bytes(s) == b'\x80foo' + + def test_encoding(self): + # pound sign: '£' == '\u00a3' latin-1: b'\xa3', utf-8: b'\xc2\xa3' + with pytest.raises(UnicodeDecodeError): + base.BinString(b'\xa3', errors='strict') + s = base.BinString(b'\xa3', encoding='latin-1', errors='strict') + assert s == '£' + assert bytes(s) == b'\xa3' diff --git a/bindings/python-cffi/tests/test_database.py b/bindings/python-cffi/tests/test_database.py new file mode 100644 index 00000000..02de0f41 --- /dev/null +++ b/bindings/python-cffi/tests/test_database.py @@ -0,0 +1,326 @@ +import collections +import configparser +import os +import pathlib + +import pytest + +import notdb +import notdb._errors as errors +import notdb._database as dbmod +import notdb._message as message + + +@pytest.fixture +def db(maildir): + with dbmod.Database.create(maildir.path) as db: + yield db + + +class TestDefaultDb: + """Tests for reading the default database. + + The error cases are fairly undefined, some relevant Python error + will come out if you give it a bad filename or if the file does + not parse correctly. So we're not testing this too deeply. + """ + + def test_config_pathname_default(self, monkeypatch): + monkeypatch.delenv('NOTMUCH_CONFIG', raising=False) + user = pathlib.Path('~/.notmuch-config').expanduser() + assert dbmod._config_pathname() == user + + def test_config_pathname_env(self, monkeypatch): + monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path') + assert dbmod._config_pathname() == pathlib.Path('/some/random/path') + + def test_default_path_nocfg(self, monkeypatch, tmppath): + monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath/'foo')) + with pytest.raises(FileNotFoundError): + dbmod.Database.default_path() + + def test_default_path_cfg_is_dir(self, monkeypatch, tmppath): + monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath)) + with pytest.raises(IsADirectoryError): + dbmod.Database.default_path() + + def test_default_path_parseerr(self, monkeypatch, tmppath): + cfg = tmppath / 'notmuch-config' + with cfg.open('w') as fp: + fp.write('invalid') + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg)) + with pytest.raises(configparser.Error): + dbmod.Database.default_path() + + def test_default_path_parse(self, monkeypatch, tmppath): + cfg = tmppath / 'notmuch-config' + with cfg.open('w') as fp: + fp.write('[database]\n') + fp.write('path={!s}'.format(tmppath)) + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg)) + assert dbmod.Database.default_path() == tmppath + + def test_default_path_param(self, monkeypatch, tmppath): + cfg_dummy = tmppath / 'dummy' + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy)) + cfg_real = tmppath / 'notmuch_config' + with cfg_real.open('w') as fp: + fp.write('[database]\n') + fp.write('path={!s}'.format(cfg_real/'mail')) + assert dbmod.Database.default_path(cfg_real) == cfg_real/'mail' + + +class TestCreate: + + def test_create(self, tmppath, db): + assert tmppath.joinpath('.notmuch/xapian/').exists() + + def test_create_already_open(self, tmppath, db): + with pytest.raises(errors.NotmuchError): + db.create(tmppath) + + def test_create_existing(self, tmppath, db): + with pytest.raises(errors.FileError): + dbmod.Database.create(path=tmppath) + + def test_close(self, db): + db.close() + + def test_del_noclose(self, db): + del db + + def test_close_del(self, db): + db.close() + del db + + def test_closed_attr(self, db): + assert not db.closed + db.close() + assert db.closed + + def test_ctx(self, db): + with db as ctx: + assert ctx is db + assert not db.closed + assert db.closed + + def test_path(self, db, tmppath): + assert db.path == tmppath + + def test_version(self, db): + assert db.version > 0 + + def test_needs_upgrade(self, db): + assert db.needs_upgrade in (True, False) + + +class TestAtomic: + + def test_exit_early(self, db): + with pytest.raises(errors.UnbalancedAtomicError): + with db.atomic() as ctx: + ctx.force_end() + + def test_exit_late(self, db): + with db.atomic() as ctx: + pass + with pytest.raises(errors.UnbalancedAtomicError): + ctx.force_end() + + +class TestRevision: + + def test_single_rev(self, db): + r = db.revision() + assert isinstance(r, dbmod.DbRevision) + assert isinstance(r.rev, int) + assert isinstance(r.uuid, bytes) + assert r is r + assert r == r + assert r <= r + assert r >= r + assert not r < r + assert not r > r + + def test_diff_db(self, tmppath): + dbpath0 = tmppath.joinpath('db0') + dbpath0.mkdir() + dbpath1 = tmppath.joinpath('db1') + dbpath1.mkdir() + db0 = dbmod.Database.create(path=dbpath0) + db1 = dbmod.Database.create(path=dbpath1) + r_db0 = db0.revision() + r_db1 = db1.revision() + assert r_db0 != r_db1 + assert r_db0.uuid != r_db1.uuid + + def test_cmp(self, db, maildir): + rev0 = db.revision() + _, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + rev1 = db.revision() + assert rev0 < rev1 + assert rev0 <= rev1 + assert not rev0 > rev1 + assert not rev0 >= rev1 + assert not rev0 == rev1 + assert rev0 != rev1 + + # XXX add tests for revisions comparisons + +class TestMessages: + + def test_add_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert isinstance(msg, message.Message) + assert msg.path == pathname + assert msg.messageid == msgid + + def test_add_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(str(pathname), sync_flags=False) + + def test_add_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(os.fsencode(bytes(pathname)), sync_flags=False) + + def test_remove_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(pathname) + with pytest.raises(LookupError): + db.find(msgid) + + def test_remove_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(str(pathname)) + with pytest.raises(LookupError): + db.find(msgid) + + def test_remove_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(os.fsencode(bytes(pathname))) + with pytest.raises(LookupError): + db.find(msgid) + + def test_find_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg0, dup = db.add(pathname, sync_flags=False) + msg1 = db.find(msgid) + assert isinstance(msg1, message.Message) + assert msg1.messageid == msgid == msg0.messageid + assert msg1.path == pathname == msg0.path + + def test_find_message_notfound(self, db): + with pytest.raises(LookupError): + db.find('foo') + + def test_get_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg0, _ = db.add(pathname, sync_flags=False) + msg1 = db.get(pathname) + assert isinstance(msg1, message.Message) + assert msg1.messageid == msgid == msg0.messageid + assert msg1.path == pathname == msg0.path + + def test_get_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + msg = db.get(str(pathname)) + assert msg.messageid == msgid + + def test_get_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + msg = db.get(os.fsencode(bytes(pathname))) + assert msg.messageid == msgid + + +class TestTags: + # We just want to test this behaves like a set at a hight level. + # The set semantics are tested in detail in the test_tags module. + + def test_type(self, db): + assert isinstance(db.tags, collections.abc.Set) + + def test_none(self, db): + itags = iter(db.tags) + with pytest.raises(StopIteration): + next(itags) + assert len(db.tags) == 0 + assert not db.tags + + def test_some(self, db, maildir): + _, pathname = maildir.deliver() + msg, _ = db.add(pathname, sync_flags=False) + msg.tags.add('hello') + itags = iter(db.tags) + assert next(itags) == 'hello' + with pytest.raises(StopIteration): + next(itags) + assert 'hello' in msg.tags + + def test_cache(self, db): + assert db.tags is db.tags + + def test_iters(self, db): + i1 = iter(db.tags) + i2 = iter(db.tags) + assert i1 is not i2 + + +class TestQuery: + + @pytest.fixture + def db(self, maildir, notmuch): + """Return a read-only notdb.Database. + + The database will have 3 messages, 2 threads. + """ + msgid, _ = maildir.deliver(body='foo') + maildir.deliver(body='bar') + maildir.deliver(body='baz', + headers=[('In-Reply-To', '<{}>'.format(msgid))]) + notmuch('new') + with dbmod.Database(maildir.path, 'rw') as db: + yield db + + def test_count_messages(self, db): + assert db.count_messages('*') == 3 + + def test_messages_type(self, db): + msgs = db.messages('*') + assert isinstance(msgs, collections.abc.Iterator) + + def test_message_no_results(self, db): + msgs = db.messages('not_a_matching_query') + with pytest.raises(StopIteration): + next(msgs) + + def test_message_match(self, db): + msgs = db.messages('*') + msg = next(msgs) + assert isinstance(msg, notdb.Message) + + def test_count_threads(self, db): + assert db.count_threads('*') == 2 + + def test_threads_type(self, db): + threads = db.threads('*') + assert isinstance(threads, collections.abc.Iterator) + + def test_threads_no_match(self, db): + threads = db.threads('not_a_matching_query') + with pytest.raises(StopIteration): + next(threads) + + def test_threads_match(self, db): + threads = db.threads('*') + thread = next(threads) + assert isinstance(thread, notdb.Thread) diff --git a/bindings/python-cffi/tests/test_message.py b/bindings/python-cffi/tests/test_message.py new file mode 100644 index 00000000..56d06f34 --- /dev/null +++ b/bindings/python-cffi/tests/test_message.py @@ -0,0 +1,226 @@ +import collections.abc +import time +import pathlib + +import pytest + +import notdb + + +class TestMessage: + MaildirMsg = collections.namedtuple('MaildirMsg', ['msgid', 'path']) + + @pytest.fixture + def maildir_msg(self, maildir): + msgid, path = maildir.deliver() + return self.MaildirMsg(msgid, path) + + @pytest.fixture + def db(self, maildir): + with notdb.Database.create(maildir.path) as db: + yield db + + @pytest.fixture + def msg(self, db, maildir_msg): + msg, dup = db.add(maildir_msg.path, sync_flags=False) + yield msg + + def test_type(self, msg): + assert isinstance(msg, notdb.NotmuchObject) + assert isinstance(msg, notdb.Message) + + def test_alive(self, msg): + assert msg.alive + + def test_hash(self, msg): + assert hash(msg) + + def test_eq(self, db, msg): + copy = db.get(msg.path) + assert msg == copy + + def test_messageid_type(self, msg): + assert isinstance(msg.messageid, str) + assert isinstance(msg.messageid, notdb.BinString) + assert isinstance(bytes(msg.messageid), bytes) + + def test_messageid(self, msg, maildir_msg): + assert msg.messageid == maildir_msg.msgid + + def test_messageid_find(self, db, msg): + copy = db.find(msg.messageid) + assert msg.messageid == copy.messageid + + def test_threadid_type(self, msg): + assert isinstance(msg.threadid, str) + assert isinstance(msg.threadid, notdb.BinString) + assert isinstance(bytes(msg.threadid), bytes) + + def test_path_type(self, msg): + assert isinstance(msg.path, pathlib.Path) + + def test_path(self, msg, maildir_msg): + assert msg.path == maildir_msg.path + + def test_pathb_type(self, msg): + assert isinstance(msg.pathb, bytes) + + def test_pathb(self, msg, maildir_msg): + assert msg.path == maildir_msg.path + + def test_filenames_type(self, msg): + ifn = msg.filenames() + assert isinstance(ifn, collections.abc.Iterator) + + def test_filenames(self, msg): + ifn = msg.filenames() + fn = next(ifn) + assert fn == msg.path + assert isinstance(fn, pathlib.Path) + with pytest.raises(StopIteration): + next(ifn) + assert list(msg.filenames()) == [msg.path] + + def test_filenamesb_type(self, msg): + ifn = msg.filenamesb() + assert isinstance(ifn, collections.abc.Iterator) + + def test_filenamesb(self, msg): + ifn = msg.filenamesb() + fn = next(ifn) + assert fn == msg.pathb + assert isinstance(fn, bytes) + with pytest.raises(StopIteration): + next(ifn) + assert list(msg.filenamesb()) == [msg.pathb] + + def test_ghost_no(self, msg): + assert not msg.ghost + + def test_date(self, msg): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - msg.date) < 3600*24 + + def test_header(self, msg): + assert msg.header('from') == 'src@example.com' + + def test_header_not_present(self, msg): + with pytest.raises(LookupError): + msg.header('foo') + + def test_freeze(self, msg): + with msg.frozen(): + msg.tags.add('foo') + msg.tags.add('bar') + msg.tags.discard('foo') + assert 'foo' not in msg.tags + assert 'bar' in msg.tags + + def test_freeze_err(self, msg): + msg.tags.add('foo') + try: + with msg.frozen(): + msg.tags.clear() + raise Exception('oops') + except Exception: + assert 'foo' in msg.tags + else: + pytest.fail('Context manager did not raise') + + def test_replies_type(self, msg): + assert isinstance(msg.replies(), collections.abc.Iterator) + + def test_replies(self, msg): + with pytest.raises(StopIteration): + next(msg.replies()) + + +class TestProperties: + + @pytest.fixture + def props(self, maildir): + msgid, path = maildir.deliver() + with notdb.Database.create(maildir.path) as db: + msg, dup = db.add(path, sync_flags=False) + yield msg.properties + + def test_type(self, props): + assert isinstance(props, collections.abc.MutableMapping) + + def test_add_single(self, props): + props['foo'] = 'bar' + assert props['foo'] == 'bar' + props.add('bar', 'baz') + assert props['bar'] == 'baz' + + def test_add_dup(self, props): + props.add('foo', 'bar') + props.add('foo', 'baz') + assert props['foo'] == 'bar' + assert (set(props.getall('foo', exact=True)) + == {('foo', 'bar'), ('foo', 'baz')}) + + def test_len(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + props.add('bar', 'a') + assert len(props) == 3 + assert len(props.keys()) == 2 + assert len(props.values()) == 2 + assert len(props.items()) == 3 + + def test_del(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + del props['foo'] + with pytest.raises(KeyError): + props['foo'] + + def test_remove(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + props.remove('foo', 'a') + assert props['foo'] == 'b' + + def test_view_abcs(self, props): + assert isinstance(props.keys(), collections.abc.KeysView) + assert isinstance(props.values(), collections.abc.ValuesView) + assert isinstance(props.items(), collections.abc.ItemsView) + + def test_pop(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + val = props.pop('foo') + assert val == 'a' + + def test_pop_default(self, props): + with pytest.raises(KeyError): + props.pop('foo') + assert props.pop('foo', 'default') == 'default' + + def test_popitem(self, props): + props.add('foo', 'a') + assert props.popitem() == ('foo', 'a') + with pytest.raises(KeyError): + props.popitem() + + def test_clear(self, props): + props.add('foo', 'a') + props.clear() + assert len(props) == 0 + + def test_getall(self, props): + props.add('foo', 'a') + assert set(props.getall('foo')) == {('foo', 'a')} + + def test_getall_prefix(self, props): + props.add('foo', 'a') + props.add('foobar', 'b') + assert set(props.getall('foo')) == {('foo', 'a'), ('foobar', 'b')} + + def test_getall_exact(self, props): + props.add('foo', 'a') + props.add('foobar', 'b') + assert set(props.getall('foo', exact=True)) == {('foo', 'a')} diff --git a/bindings/python-cffi/tests/test_tags.py b/bindings/python-cffi/tests/test_tags.py new file mode 100644 index 00000000..0cb42d89 --- /dev/null +++ b/bindings/python-cffi/tests/test_tags.py @@ -0,0 +1,177 @@ +"""Tests for the behaviour of immutable and mutable tagsets. + +This module tests the Pythonic behaviour of the sets. +""" + +import collections +import subprocess +import textwrap + +import pytest + +from notdb import _database as database +from notdb import _tags as tags + + +class TestImmutable: + + @pytest.fixture + def tagset(self, maildir, notmuch): + """An non-empty immutable tagset. + + This will have the default new mail tags: inbox, unread. + """ + maildir.deliver() + notmuch('new') + with database.Database(maildir.path) as db: + yield db.tags + + def test_type(self, tagset): + assert isinstance(tagset, tags.ImmutableTagSet) + assert isinstance(tagset, collections.abc.Set) + + def test_hash(self, tagset, maildir, notmuch): + h0 = hash(tagset) + notmuch('tag', '+foo', '*') + with database.Database(maildir.path) as db: + h1 = hash(db.tags) + assert h0 != h1 + + def test_eq(self, tagset): + assert tagset == tagset + + def test_neq(self, tagset, maildir, notmuch): + notmuch('tag', '+foo', '*') + with database.Database(maildir.path) as db: + assert tagset != db.tags + + def test_contains(self, tagset): + print(tuple(tagset)) + assert 'unread' in tagset + assert 'foo' not in tagset + + def test_iter(self, tagset): + expected = sorted(['unread', 'inbox']) + found = [] + for tag in tagset: + assert isinstance(tag, str) + found.append(tag) + assert expected == sorted(found) + + def test_special_iter(self, tagset): + expected = sorted([b'unread', b'inbox']) + found = [] + for tag in tagset.iter(): + assert isinstance(tag, bytes) + found.append(tag) + assert expected == sorted(found) + + def test_special_iter_codec(self, tagset): + for tag in tagset.iter(encoding='ascii', errors='surrogateescape'): + assert isinstance(tag, str) + + def test_len(self, tagset): + assert len(tagset) == 2 + + def test_and(self, tagset): + common = tagset & {'unread'} + assert isinstance(common, set) + assert isinstance(common, collections.abc.Set) + assert common == {'unread'} + + def test_or(self, tagset): + res = tagset | {'foo'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'unread', 'inbox', 'foo'} + + def test_sub(self, tagset): + res = tagset - {'unread'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox'} + + def test_rsub(self, tagset): + res = {'foo', 'unread'} - tagset + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'foo'} + + def test_xor(self, tagset): + res = tagset ^ {'unread', 'foo'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'foo'} + + def test_rxor(self, tagset): + res = {'unread', 'foo'} ^ tagset + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'foo'} + + +class TestMutableTagset: + + @pytest.fixture + def tagset(self, maildir, notmuch): + """An non-empty mutable tagset. + + This will have the default new mail tags: inbox, unread. + """ + _, pathname = maildir.deliver() + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE) as db: + msg = db.get(pathname) + yield msg.tags + + def test_type(self, tagset): + assert isinstance(tagset, collections.abc.MutableSet) + assert isinstance(tagset, tags.MutableTagSet) + + def test_hash(self, tagset): + assert not isinstance(tagset, collections.abc.Hashable) + with pytest.raises(TypeError): + hash(tagset) + + def test_add(self, tagset): + assert 'foo' not in tagset + tagset.add('foo') + assert 'foo' in tagset + + def test_discard(self, tagset): + assert 'inbox' in tagset + tagset.discard('inbox') + assert 'inbox' not in tagset + + def test_discard_not_present(self, tagset): + assert 'foo' not in tagset + tagset.discard('foo') + + def test_clear(self, tagset): + assert len(tagset) > 0 + tagset.clear() + assert len(tagset) == 0 + + def test_from_maildir_flags(self, maildir, notmuch): + _, pathname = maildir.deliver(flagged=True) + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE) as db: + msg = db.get(pathname) + msg.tags.discard('flagged') + msg.tags.from_maildir_flags() + assert 'flagged' in msg.tags + + def test_to_maildir_flags(self, maildir, notmuch): + _, pathname = maildir.deliver(flagged=True) + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE) as db: + msg = db.get(pathname) + flags = msg.path.name.split(',')[-1] + assert 'F' in flags + msg.tags.discard('flagged') + msg.tags.to_maildir_flags() + flags = msg.path.name.split(',')[-1] + assert 'F' not in flags diff --git a/bindings/python-cffi/tests/test_thread.py b/bindings/python-cffi/tests/test_thread.py new file mode 100644 index 00000000..366bd8a5 --- /dev/null +++ b/bindings/python-cffi/tests/test_thread.py @@ -0,0 +1,102 @@ +import collections.abc +import time + +import pytest + +import notdb + + +@pytest.fixture +def thread(maildir, notmuch): + """Return a single thread with one matched message.""" + msgid, _ = maildir.deliver(body='foo') + maildir.deliver(body='bar', + headers=[('In-Reply-To', '<{}>'.format(msgid))]) + notmuch('new') + with notdb.Database(maildir.path) as db: + yield next(db.threads('foo')) + + +def test_type(thread): + assert isinstance(thread, notdb.Thread) + assert isinstance(thread, collections.abc.Iterable) + + +def test_threadid(thread): + assert isinstance(thread.threadid, notdb.BinString) + assert thread.threadid + + +def test_len(thread): + assert len(thread) == 2 + + +def test_toplevel_type(thread): + assert isinstance(thread.toplevel(), collections.abc.Iterator) + + +def test_toplevel(thread): + msgs = thread.toplevel() + assert isinstance(next(msgs), notdb.Message) + with pytest.raises(StopIteration): + next(msgs) + + +def test_toplevel_reply(thread): + msg = next(thread.toplevel()) + assert isinstance(next(msg.replies()), notdb.Message) + + +def test_iter(thread): + msgs = list(iter(thread)) + assert len(msgs) == len(thread) + for msg in msgs: + assert isinstance(msg, notdb.Message) + + +def test_matched(thread): + assert thread.matched == 1 + + +def test_authors_type(thread): + assert isinstance(thread.authors, notdb.BinString) + + +def test_authors(thread): + assert thread.authors == 'src@example.com' + + +def test_subject(thread): + assert thread.subject == 'Test mail' + + +def test_first(thread): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - thread.first) < 3600*24 + + +def test_last(thread): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - thread.last) < 3600*24 + + +def test_first_last(thread): + # Sadly we only have second resolution so these will always be the + # same time in our tests. + assert thread.first <= thread.last + + +def test_tags_type(thread): + assert isinstance(thread.tags, notdb.ImmutableTagSet) + + +def test_tags_cache(thread): + assert thread.tags is thread.tags + + +def test_tags(thread): + assert 'inbox' in thread.tags |
