diff options
| author | David Bremner <bremner@debian.org> | 2023-12-01 07:51:09 -0400 |
|---|---|---|
| committer | David Bremner <bremner@debian.org> | 2023-12-01 07:51:09 -0400 |
| commit | 126347b6942dd4b0291beb67b119431ebd750a2a (patch) | |
| tree | 532c5163cb0972c8b9e6c8b4577b86afb9c6a6a2 /bindings/python-cffi/tests | |
Import notmuch_0.38.2.orig.tar.xz
[dgit import orig notmuch_0.38.2.orig.tar.xz]
Diffstat (limited to 'bindings/python-cffi/tests')
| -rw-r--r-- | bindings/python-cffi/tests/conftest.py | 149 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_base.py | 116 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_config.py | 60 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_database.py | 342 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_errors.py | 8 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_message.py | 229 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_tags.py | 242 | ||||
| -rw-r--r-- | bindings/python-cffi/tests/test_thread.py | 109 |
8 files changed, 1255 insertions, 0 deletions
diff --git a/bindings/python-cffi/tests/conftest.py b/bindings/python-cffi/tests/conftest.py new file mode 100644 index 00000000..fe90c787 --- /dev/null +++ b/bindings/python-cffi/tests/conftest.py @@ -0,0 +1,149 @@ +import email.message +import mailbox +import pathlib +import shutil +import socket +import subprocess +import textwrap +import time +import os + +import pytest + + +def pytest_report_header(): + which = shutil.which('notmuch') + vers = subprocess.run(['notmuch', '--version'], stdout=subprocess.PIPE) + return ['{} ({})'.format(vers.stdout.decode(errors='replace').strip(),which)] + + +@pytest.fixture(scope='function') +def tmppath(tmpdir): + """The tmpdir fixture wrapped in pathlib.Path.""" + return pathlib.Path(str(tmpdir)) + + +@pytest.fixture +def notmuch(maildir): + """Return a function which runs notmuch commands on our test maildir. + + This uses the notmuch-config file created by the ``maildir`` + fixture. + """ + def run(*args): + """Run a notmuch command. + + This function runs with a timeout error as many notmuch + commands may block if multiple processes are trying to open + the database in write-mode. It is all too easy to + accidentally do this in the unittests. + """ + cfg_fname = maildir.path / 'notmuch-config' + cmd = ['notmuch'] + list(args) + env = os.environ.copy() + env['NOTMUCH_CONFIG'] = str(cfg_fname) + proc = subprocess.run(cmd, + timeout=120, + env=env) + proc.check_returncode() + return run + + +@pytest.fixture +def maildir(tmppath): + """A basic test interface to a valid maildir directory. + + This creates a valid maildir and provides a simple mechanism to + deliver test emails to it. It also writes a notmuch-config file + in the top of the maildir. + """ + cur = tmppath / 'cur' + cur.mkdir() + new = tmppath / 'new' + new.mkdir() + tmp = tmppath / 'tmp' + tmp.mkdir() + cfg_fname = tmppath/'notmuch-config' + with cfg_fname.open('w') as fp: + fp.write(textwrap.dedent("""\ + [database] + path={tmppath!s} + [user] + name=Some Hacker + primary_email=dst@example.com + [new] + tags=unread;inbox; + ignore= + [search] + exclude_tags=deleted;spam; + [maildir] + synchronize_flags=true + """.format(tmppath=tmppath))) + return MailDir(tmppath) + + +class MailDir: + """An interface around a correct maildir.""" + + def __init__(self, path): + self._path = pathlib.Path(path) + self.mailbox = mailbox.Maildir(str(path)) + self._idcount = 0 + + @property + def path(self): + """The pathname of the maildir.""" + return self._path + + def _next_msgid(self): + """Return a new unique message ID.""" + msgid = '{}@{}'.format(self._idcount, socket.getfqdn()) + self._idcount += 1 + return msgid + + def deliver(self, + subject='Test mail', + body='This is a test mail', + to='dst@example.com', + frm='src@example.com', + headers=None, + new=False, # Move to new dir or cur dir? + keywords=None, # List of keywords or labels + seen=False, # Seen flag (cur dir only) + replied=False, # Replied flag (cur dir only) + flagged=False): # Flagged flag (cur dir only) + """Deliver a new mail message in the mbox. + + This does only adds the message to maildir, does not insert it + into the notmuch database. + + :returns: A tuple of (msgid, pathname). + """ + msgid = self._next_msgid() + when = time.time() + msg = email.message.EmailMessage() + msg.add_header('Received', 'by MailDir; {}'.format(time.ctime(when))) + msg.add_header('Message-ID', '<{}>'.format(msgid)) + msg.add_header('Date', time.ctime(when)) + msg.add_header('From', frm) + msg.add_header('To', to) + msg.add_header('Subject', subject) + if headers: + for h, v in headers: + msg.add_header(h, v) + msg.set_content(body) + mdmsg = mailbox.MaildirMessage(msg) + if not new: + mdmsg.set_subdir('cur') + if flagged: + mdmsg.add_flag('F') + if replied: + mdmsg.add_flag('R') + if seen: + mdmsg.add_flag('S') + boxid = self.mailbox.add(mdmsg) + basename = boxid + if mdmsg.get_info(): + basename += mailbox.Maildir.colon + mdmsg.get_info() + msgpath = self.path / mdmsg.get_subdir() / basename + return (msgid, msgpath) diff --git a/bindings/python-cffi/tests/test_base.py b/bindings/python-cffi/tests/test_base.py new file mode 100644 index 00000000..d3280a67 --- /dev/null +++ b/bindings/python-cffi/tests/test_base.py @@ -0,0 +1,116 @@ +import pytest + +from notmuch2 import _base as base +from notmuch2 import _errors as errors + + +class TestNotmuchObject: + + def test_no_impl_methods(self): + class Object(base.NotmuchObject): + pass + with pytest.raises(TypeError): + Object() + + def test_impl_methods(self): + + class Object(base.NotmuchObject): + + def __init__(self): + pass + + @property + def alive(self): + pass + + def _destroy(self, parent=False): + pass + + Object() + + def test_del(self): + destroyed = False + + class Object(base.NotmuchObject): + + def __init__(self): + pass + + @property + def alive(self): + pass + + def _destroy(self, parent=False): + nonlocal destroyed + destroyed = True + + o = Object() + o.__del__() + assert destroyed + + +class TestMemoryPointer: + + @pytest.fixture + def obj(self): + class Cls: + ptr = base.MemoryPointer() + return Cls() + + def test_unset(self, obj): + with pytest.raises(errors.ObjectDestroyedError): + obj.ptr + + def test_set(self, obj): + obj.ptr = 'some' + assert obj.ptr == 'some' + + def test_cleared(self, obj): + obj.ptr = 'some' + obj.ptr + obj.ptr = None + with pytest.raises(errors.ObjectDestroyedError): + obj.ptr + + def test_two_instances(self, obj): + obj2 = obj.__class__() + obj.ptr = 'foo' + obj2.ptr = 'bar' + assert obj.ptr != obj2.ptr + + +class TestBinString: + + def test_type(self): + s = base.BinString(b'foo') + assert isinstance(s, str) + + def test_init_bytes(self): + s = base.BinString(b'foo') + assert s == 'foo' + + def test_init_str(self): + s = base.BinString('foo') + assert s == 'foo' + + def test_bytes(self): + s = base.BinString(b'foo') + assert bytes(s) == b'foo' + + def test_invalid_utf8(self): + s = base.BinString(b'\x80foo') + assert s == 'foo' + assert bytes(s) == b'\x80foo' + + def test_errors(self): + s = base.BinString(b'\x80foo', errors='replace') + assert s == '�foo' + assert bytes(s) == b'\x80foo' + + def test_encoding(self): + # pound sign: '£' == '\u00a3' latin-1: b'\xa3', utf-8: b'\xc2\xa3' + with pytest.raises(UnicodeDecodeError): + base.BinString(b'\xa3', errors='strict') + s = base.BinString(b'\xa3', encoding='latin-1', errors='strict') + assert s == '£' + assert bytes(s) == b'\xa3' diff --git a/bindings/python-cffi/tests/test_config.py b/bindings/python-cffi/tests/test_config.py new file mode 100644 index 00000000..2a7f42f0 --- /dev/null +++ b/bindings/python-cffi/tests/test_config.py @@ -0,0 +1,60 @@ +import collections.abc + +import pytest + +import notmuch2._database as dbmod + +import notmuch2._config as config + + +class TestIter: + + @pytest.fixture + def db(self, maildir): + with dbmod.Database.create(maildir.path) as db: + yield db + + def test_type(self, db): + assert isinstance(db.config, collections.abc.MutableMapping) + assert isinstance(db.config, config.ConfigMapping) + + def test_alive(self, db): + assert db.config.alive + + def test_set_get(self, maildir): + # Ensure get-set works from different db objects + with dbmod.Database.create(maildir.path, config=dbmod.Database.CONFIG.EMPTY) as db0: + db0.config['spam'] = 'ham' + with dbmod.Database(maildir.path, config=dbmod.Database.CONFIG.EMPTY) as db1: + assert db1.config['spam'] == 'ham' + + def test_get_keyerror(self, db): + with pytest.raises(KeyError): + val = db.config['not-a-key'] + print(repr(val)) + + def test_iter(self, db): + def has_prefix(x): + return x.startswith('TEST.') + + assert [ x for x in db.config if has_prefix(x) ] == [] + db.config['TEST.spam'] = 'TEST.ham' + db.config['TEST.eggs'] = 'TEST.bacon' + assert { x for x in db.config if has_prefix(x) } == {'TEST.spam', 'TEST.eggs'} + assert { x for x in db.config.keys() if has_prefix(x) } == {'TEST.spam', 'TEST.eggs'} + assert { x for x in db.config.values() if has_prefix(x) } == {'TEST.ham', 'TEST.bacon'} + assert { (x, y) for (x,y) in db.config.items() if has_prefix(x) } == \ + {('TEST.spam', 'TEST.ham'), ('TEST.eggs', 'TEST.bacon')} + + def test_len(self, db): + defaults = len(db.config) + db.config['spam'] = 'ham' + assert len(db.config) == defaults + 1 + db.config['eggs'] = 'bacon' + assert len(db.config) == defaults + 2 + + def test_del(self, db): + db.config['spam'] = 'ham' + assert db.config.get('spam') == 'ham' + del db.config['spam'] + assert db.config.get('spam') is None diff --git a/bindings/python-cffi/tests/test_database.py b/bindings/python-cffi/tests/test_database.py new file mode 100644 index 00000000..f1d12ea6 --- /dev/null +++ b/bindings/python-cffi/tests/test_database.py @@ -0,0 +1,342 @@ +import collections +import configparser +import os +import pathlib + +import pytest + +import notmuch2 +import notmuch2._errors as errors +import notmuch2._database as dbmod +import notmuch2._message as message + + +@pytest.fixture +def db(maildir): + with dbmod.Database.create(maildir.path, config=notmuch2.Database.CONFIG.EMPTY) as db: + yield db + + +class TestDefaultDb: + """Tests for reading the default database. + + The error cases are fairly undefined, some relevant Python error + will come out if you give it a bad filename or if the file does + not parse correctly. So we're not testing this too deeply. + """ + + def test_config_pathname_default(self, monkeypatch): + monkeypatch.delenv('NOTMUCH_CONFIG', raising=False) + user = pathlib.Path('~/.notmuch-config').expanduser() + assert dbmod._config_pathname() == user + + def test_config_pathname_env(self, monkeypatch): + monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path') + assert dbmod._config_pathname() == pathlib.Path('/some/random/path') + + def test_default_path_nocfg(self, monkeypatch, tmppath): + monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath/'foo')) + with pytest.raises(FileNotFoundError): + dbmod.Database.default_path() + + def test_default_path_cfg_is_dir(self, monkeypatch, tmppath): + monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath)) + with pytest.raises(IsADirectoryError): + dbmod.Database.default_path() + + def test_default_path_parseerr(self, monkeypatch, tmppath): + cfg = tmppath / 'notmuch-config' + with cfg.open('w') as fp: + fp.write('invalid') + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg)) + with pytest.raises(configparser.Error): + dbmod.Database.default_path() + + def test_default_path_parse(self, monkeypatch, tmppath): + cfg = tmppath / 'notmuch-config' + with cfg.open('w') as fp: + fp.write('[database]\n') + fp.write('path={!s}'.format(tmppath)) + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg)) + assert dbmod.Database.default_path() == tmppath + + def test_default_path_param(self, monkeypatch, tmppath): + cfg_dummy = tmppath / 'dummy' + monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy)) + cfg_real = tmppath / 'notmuch_config' + with cfg_real.open('w') as fp: + fp.write('[database]\n') + fp.write('path={!s}'.format(cfg_real/'mail')) + assert dbmod.Database.default_path(cfg_real) == cfg_real/'mail' + + +class TestCreate: + + def test_create(self, tmppath, db): + assert tmppath.joinpath('.notmuch/xapian/').exists() + + def test_create_already_open(self, tmppath, db): + with pytest.raises(errors.NotmuchError): + db.create(tmppath) + + def test_create_existing(self, tmppath, db): + with pytest.raises(errors.DatabaseExistsError): + dbmod.Database.create(path=tmppath) + + def test_close(self, db): + db.close() + + def test_del_noclose(self, db): + del db + + def test_close_del(self, db): + db.close() + del db + + def test_closed_attr(self, db): + assert not db.closed + db.close() + assert db.closed + + def test_ctx(self, db): + with db as ctx: + assert ctx is db + assert not db.closed + assert db.closed + + def test_path(self, db, tmppath): + assert db.path == tmppath + + def test_version(self, db): + assert db.version > 0 + + def test_needs_upgrade(self, db): + assert db.needs_upgrade in (True, False) + + +class TestAtomic: + + def test_exit_early(self, db): + with pytest.raises(errors.UnbalancedAtomicError): + with db.atomic() as ctx: + ctx.force_end() + + def test_exit_late(self, db): + with db.atomic() as ctx: + pass + with pytest.raises(errors.UnbalancedAtomicError): + ctx.force_end() + + def test_abort(self, db): + with db.atomic() as txn: + txn.abort() + assert db.closed + + +class TestRevision: + + def test_single_rev(self, db): + r = db.revision() + assert isinstance(r, dbmod.DbRevision) + assert isinstance(r.rev, int) + assert isinstance(r.uuid, bytes) + assert r is r + assert r == r + assert r <= r + assert r >= r + assert not r < r + assert not r > r + + def test_diff_db(self, tmppath): + dbpath0 = tmppath.joinpath('db0') + dbpath0.mkdir() + dbpath1 = tmppath.joinpath('db1') + dbpath1.mkdir() + db0 = dbmod.Database.create(path=dbpath0) + db1 = dbmod.Database.create(path=dbpath1) + r_db0 = db0.revision() + r_db1 = db1.revision() + assert r_db0 != r_db1 + assert r_db0.uuid != r_db1.uuid + + def test_cmp(self, db, maildir): + rev0 = db.revision() + _, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + rev1 = db.revision() + assert rev0 < rev1 + assert rev0 <= rev1 + assert not rev0 > rev1 + assert not rev0 >= rev1 + assert not rev0 == rev1 + assert rev0 != rev1 + + # XXX add tests for revisions comparisons + +class TestMessages: + + def test_add_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert isinstance(msg, message.Message) + assert msg.path == pathname + assert msg.messageid == msgid + + def test_add_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(str(pathname), sync_flags=False) + + def test_add_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(os.fsencode(bytes(pathname)), sync_flags=False) + + def test_remove_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(pathname) + with pytest.raises(LookupError): + db.find(msgid) + + def test_remove_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(str(pathname)) + with pytest.raises(LookupError): + db.find(msgid) + + def test_remove_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + msg, dup = db.add(pathname, sync_flags=False) + assert db.find(msgid) + dup = db.remove(os.fsencode(bytes(pathname))) + with pytest.raises(LookupError): + db.find(msgid) + + def test_find_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg0, dup = db.add(pathname, sync_flags=False) + msg1 = db.find(msgid) + assert isinstance(msg1, message.Message) + assert msg1.messageid == msgid == msg0.messageid + assert msg1.path == pathname == msg0.path + + def test_find_message_notfound(self, db): + with pytest.raises(LookupError): + db.find('foo') + + def test_get_message(self, db, maildir): + msgid, pathname = maildir.deliver() + msg0, _ = db.add(pathname, sync_flags=False) + msg1 = db.get(pathname) + assert isinstance(msg1, message.Message) + assert msg1.messageid == msgid == msg0.messageid + assert msg1.path == pathname == msg0.path + + def test_get_message_str(self, db, maildir): + msgid, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + msg = db.get(str(pathname)) + assert msg.messageid == msgid + + def test_get_message_bytes(self, db, maildir): + msgid, pathname = maildir.deliver() + db.add(pathname, sync_flags=False) + msg = db.get(os.fsencode(bytes(pathname))) + assert msg.messageid == msgid + + +class TestTags: + # We just want to test this behaves like a set at a hight level. + # The set semantics are tested in detail in the test_tags module. + + def test_type(self, db): + assert isinstance(db.tags, collections.abc.Set) + + def test_none(self, db): + itags = iter(db.tags) + with pytest.raises(StopIteration): + next(itags) + assert len(db.tags) == 0 + assert not db.tags + + def test_some(self, db, maildir): + _, pathname = maildir.deliver() + msg, _ = db.add(pathname, sync_flags=False) + msg.tags.add('hello') + itags = iter(db.tags) + assert next(itags) == 'hello' + with pytest.raises(StopIteration): + next(itags) + assert 'hello' in msg.tags + + def test_cache(self, db): + assert db.tags is db.tags + + def test_iters(self, db): + i1 = iter(db.tags) + i2 = iter(db.tags) + assert i1 is not i2 + + +class TestQuery: + + @pytest.fixture + def db(self, maildir, notmuch): + """Return a read-only notmuch2.Database. + + The database will have 3 messages, 2 threads. + """ + msgid, _ = maildir.deliver(body='foo') + maildir.deliver(body='bar') + maildir.deliver(body='baz', + headers=[('In-Reply-To', '<{}>'.format(msgid))]) + notmuch('new') + with dbmod.Database(maildir.path, 'rw', config=notmuch2.Database.CONFIG.EMPTY) as db: + yield db + + def test_count_messages(self, db): + assert db.count_messages('*') == 3 + + def test_messages_type(self, db): + msgs = db.messages('*') + assert isinstance(msgs, collections.abc.Iterator) + + def test_message_no_results(self, db): + msgs = db.messages('not_a_matching_query') + with pytest.raises(StopIteration): + next(msgs) + + def test_message_match(self, db): + msgs = db.messages('*') + msg = next(msgs) + assert isinstance(msg, notmuch2.Message) + + def test_count_threads(self, db): + assert db.count_threads('*') == 2 + + def test_threads_type(self, db): + threads = db.threads('*') + assert isinstance(threads, collections.abc.Iterator) + + def test_threads_no_match(self, db): + threads = db.threads('not_a_matching_query') + with pytest.raises(StopIteration): + next(threads) + + def test_threads_match(self, db): + threads = db.threads('*') + thread = next(threads) + assert isinstance(thread, notmuch2.Thread) + + def test_use_threaded_message_twice(self, db): + thread = next(db.threads('*')) + for msg in thread.toplevel(): + assert isinstance(msg, notmuch2.Message) + assert msg.alive + del msg + for msg in thread: + assert isinstance(msg, notmuch2.Message) + assert msg.alive + del msg diff --git a/bindings/python-cffi/tests/test_errors.py b/bindings/python-cffi/tests/test_errors.py new file mode 100644 index 00000000..c2519f86 --- /dev/null +++ b/bindings/python-cffi/tests/test_errors.py @@ -0,0 +1,8 @@ +from notmuch2 import _capi as capi +from notmuch2 import _errors as errors + +def test_status_no_message(): + exc = errors.NotmuchError(capi.lib.NOTMUCH_STATUS_PATH_ERROR) + assert exc.status == capi.lib.NOTMUCH_STATUS_PATH_ERROR + assert exc.message is None + assert str(exc) == 'Path supplied is illegal for this function' diff --git a/bindings/python-cffi/tests/test_message.py b/bindings/python-cffi/tests/test_message.py new file mode 100644 index 00000000..56701d05 --- /dev/null +++ b/bindings/python-cffi/tests/test_message.py @@ -0,0 +1,229 @@ +import collections.abc +import time +import pathlib + +import pytest + +import notmuch2 + + +class TestMessage: + MaildirMsg = collections.namedtuple('MaildirMsg', ['msgid', 'path']) + + @pytest.fixture + def maildir_msg(self, maildir): + msgid, path = maildir.deliver() + return self.MaildirMsg(msgid, path) + + @pytest.fixture + def db(self, maildir): + with notmuch2.Database.create(maildir.path) as db: + yield db + + @pytest.fixture + def msg(self, db, maildir_msg): + msg, dup = db.add(maildir_msg.path, sync_flags=False) + yield msg + + def test_type(self, msg): + assert isinstance(msg, notmuch2.NotmuchObject) + assert isinstance(msg, notmuch2.Message) + + def test_alive(self, msg): + assert msg.alive + + def test_hash(self, msg): + assert hash(msg) + + def test_eq(self, db, msg): + copy = db.get(msg.path) + assert msg == copy + + def test_messageid_type(self, msg): + assert isinstance(msg.messageid, str) + assert isinstance(msg.messageid, notmuch2.BinString) + assert isinstance(bytes(msg.messageid), bytes) + + def test_messageid(self, msg, maildir_msg): + assert msg.messageid == maildir_msg.msgid + + def test_messageid_find(self, db, msg): + copy = db.find(msg.messageid) + assert msg.messageid == copy.messageid + + def test_threadid_type(self, msg): + assert isinstance(msg.threadid, str) + assert isinstance(msg.threadid, notmuch2.BinString) + assert isinstance(bytes(msg.threadid), bytes) + + def test_path_type(self, msg): + assert isinstance(msg.path, pathlib.Path) + + def test_path(self, msg, maildir_msg): + assert msg.path == maildir_msg.path + + def test_pathb_type(self, msg): + assert isinstance(msg.pathb, bytes) + + def test_pathb(self, msg, maildir_msg): + assert msg.path == maildir_msg.path + + def test_filenames_type(self, msg): + ifn = msg.filenames() + assert isinstance(ifn, collections.abc.Iterator) + + def test_filenames(self, msg): + ifn = msg.filenames() + fn = next(ifn) + assert fn == msg.path + assert isinstance(fn, pathlib.Path) + with pytest.raises(StopIteration): + next(ifn) + assert list(msg.filenames()) == [msg.path] + + def test_filenamesb_type(self, msg): + ifn = msg.filenamesb() + assert isinstance(ifn, collections.abc.Iterator) + + def test_filenamesb(self, msg): + ifn = msg.filenamesb() + fn = next(ifn) + assert fn == msg.pathb + assert isinstance(fn, bytes) + with pytest.raises(StopIteration): + next(ifn) + assert list(msg.filenamesb()) == [msg.pathb] + + def test_ghost_no(self, msg): + assert not msg.ghost + + def test_matched_no(self,msg): + assert not msg.matched + + def test_date(self, msg): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - msg.date) < 3600*24 + + def test_header(self, msg): + assert msg.header('from') == 'src@example.com' + + def test_header_not_present(self, msg): + with pytest.raises(LookupError): + msg.header('foo') + + def test_freeze(self, msg): + with msg.frozen(): + msg.tags.add('foo') + msg.tags.add('bar') + msg.tags.discard('foo') + assert 'foo' not in msg.tags + assert 'bar' in msg.tags + + def test_freeze_err(self, msg): + msg.tags.add('foo') + try: + with msg.frozen(): + msg.tags.clear() + raise Exception('oops') + except Exception: + assert 'foo' in msg.tags + else: + pytest.fail('Context manager did not raise') + + def test_replies_type(self, msg): + assert isinstance(msg.replies(), collections.abc.Iterator) + + def test_replies(self, msg): + with pytest.raises(StopIteration): + next(msg.replies()) + + +class TestProperties: + + @pytest.fixture + def props(self, maildir): + msgid, path = maildir.deliver() + with notmuch2.Database.create(maildir.path) as db: + msg, dup = db.add(path, sync_flags=False) + yield msg.properties + + def test_type(self, props): + assert isinstance(props, collections.abc.MutableMapping) + + def test_add_single(self, props): + props['foo'] = 'bar' + assert props['foo'] == 'bar' + props.add('bar', 'baz') + assert props['bar'] == 'baz' + + def test_add_dup(self, props): + props.add('foo', 'bar') + props.add('foo', 'baz') + assert props['foo'] == 'bar' + assert (set(props.getall('foo', exact=True)) + == {('foo', 'bar'), ('foo', 'baz')}) + + def test_len(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + props.add('bar', 'a') + assert len(props) == 3 + assert len(props.keys()) == 2 + assert len(props.values()) == 2 + assert len(props.items()) == 3 + + def test_del(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + del props['foo'] + with pytest.raises(KeyError): + props['foo'] + + def test_remove(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + props.remove('foo', 'a') + assert props['foo'] == 'b' + + def test_view_abcs(self, props): + assert isinstance(props.keys(), collections.abc.KeysView) + assert isinstance(props.values(), collections.abc.ValuesView) + assert isinstance(props.items(), collections.abc.ItemsView) + + def test_pop(self, props): + props.add('foo', 'a') + props.add('foo', 'b') + val = props.pop('foo') + assert val == 'a' + + def test_pop_default(self, props): + with pytest.raises(KeyError): + props.pop('foo') + assert props.pop('foo', 'default') == 'default' + + def test_popitem(self, props): + props.add('foo', 'a') + assert props.popitem() == ('foo', 'a') + with pytest.raises(KeyError): + props.popitem() + + def test_clear(self, props): + props.add('foo', 'a') + props.clear() + assert len(props) == 0 + + def test_getall(self, props): + props.add('foo', 'a') + assert set(props.getall('foo')) == {('foo', 'a')} + + def test_getall_prefix(self, props): + props.add('foo', 'a') + props.add('foobar', 'b') + assert set(props.getall('foo')) == {('foo', 'a'), ('foobar', 'b')} + + def test_getall_exact(self, props): + props.add('foo', 'a') + props.add('foobar', 'b') + assert set(props.getall('foo', exact=True)) == {('foo', 'a')} diff --git a/bindings/python-cffi/tests/test_tags.py b/bindings/python-cffi/tests/test_tags.py new file mode 100644 index 00000000..f2c6209d --- /dev/null +++ b/bindings/python-cffi/tests/test_tags.py @@ -0,0 +1,242 @@ +"""Tests for the behaviour of immutable and mutable tagsets. + +This module tests the Pythonic behaviour of the sets. +""" + +import collections +import subprocess +import textwrap + +import pytest + +from notmuch2 import _database as database +from notmuch2 import _tags as tags + + +class TestImmutable: + + @pytest.fixture + def tagset(self, maildir, notmuch): + """An non-empty immutable tagset. + + This will have the default new mail tags: inbox, unread. + """ + maildir.deliver() + notmuch('new') + with database.Database(maildir.path, config=database.Database.CONFIG.EMPTY) as db: + yield db.tags + + def test_type(self, tagset): + assert isinstance(tagset, tags.ImmutableTagSet) + assert isinstance(tagset, collections.abc.Set) + + def test_hash(self, tagset, maildir, notmuch): + h0 = hash(tagset) + notmuch('tag', '+foo', '*') + with database.Database(maildir.path, config=database.Database.CONFIG.EMPTY) as db: + h1 = hash(db.tags) + assert h0 != h1 + + def test_eq(self, tagset): + assert tagset == tagset + + def test_neq(self, tagset, maildir, notmuch): + notmuch('tag', '+foo', '*') + with database.Database(maildir.path, config=database.Database.CONFIG.EMPTY) as db: + assert tagset != db.tags + + def test_contains(self, tagset): + print(tuple(tagset)) + assert 'unread' in tagset + assert 'foo' not in tagset + + def test_isdisjoint(self, tagset): + assert tagset.isdisjoint(set(['spam', 'ham'])) + assert not tagset.isdisjoint(set(['inbox'])) + + def test_issubset(self, tagset): + assert {'inbox'} <= tagset + assert {'inbox'}.issubset(tagset) + assert tagset <= {'inbox', 'unread', 'spam'} + assert tagset.issubset({'inbox', 'unread', 'spam'}) + + def test_issuperset(self, tagset): + assert {'inbox', 'unread', 'spam'} >= tagset + assert {'inbox', 'unread', 'spam'}.issuperset(tagset) + assert tagset >= {'inbox'} + assert tagset.issuperset({'inbox'}) + + def test_iter(self, tagset): + expected = sorted(['unread', 'inbox']) + found = [] + for tag in tagset: + assert isinstance(tag, str) + found.append(tag) + assert expected == sorted(found) + + def test_special_iter(self, tagset): + expected = sorted([b'unread', b'inbox']) + found = [] + for tag in tagset.iter(): + assert isinstance(tag, bytes) + found.append(tag) + assert expected == sorted(found) + + def test_special_iter_codec(self, tagset): + for tag in tagset.iter(encoding='ascii', errors='surrogateescape'): + assert isinstance(tag, str) + + def test_len(self, tagset): + assert len(tagset) == 2 + + def test_and(self, tagset): + common = tagset & {'unread'} + assert isinstance(common, set) + assert isinstance(common, collections.abc.Set) + assert common == {'unread'} + common = tagset.intersection({'unread'}) + assert isinstance(common, set) + assert isinstance(common, collections.abc.Set) + assert common == {'unread'} + + def test_or(self, tagset): + res = tagset | {'foo'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'unread', 'inbox', 'foo'} + res = tagset.union({'foo'}) + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'unread', 'inbox', 'foo'} + + def test_sub(self, tagset): + res = tagset - {'unread'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox'} + res = tagset.difference({'unread'}) + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox'} + + def test_rsub(self, tagset): + res = {'foo', 'unread'} - tagset + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'foo'} + + def test_xor(self, tagset): + res = tagset ^ {'unread', 'foo'} + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'foo'} + res = tagset.symmetric_difference({'unread', 'foo'}) + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'foo'} + + def test_rxor(self, tagset): + res = {'unread', 'foo'} ^ tagset + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'foo'} + + def test_copy(self, tagset): + res = tagset.copy() + assert isinstance(res, set) + assert isinstance(res, collections.abc.Set) + assert res == {'inbox', 'unread'} + + +class TestMutableTagset: + + @pytest.fixture + def tagset(self, maildir, notmuch): + """An non-empty mutable tagset. + + This will have the default new mail tags: inbox, unread. + """ + _, pathname = maildir.deliver() + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE, + config=database.Database.CONFIG.EMPTY) as db: + msg = db.get(pathname) + yield msg.tags + + def test_type(self, tagset): + assert isinstance(tagset, collections.abc.MutableSet) + assert isinstance(tagset, tags.MutableTagSet) + + def test_hash(self, tagset): + assert not isinstance(tagset, collections.abc.Hashable) + with pytest.raises(TypeError): + hash(tagset) + + def test_add(self, tagset): + assert 'foo' not in tagset + tagset.add('foo') + assert 'foo' in tagset + + def test_discard(self, tagset): + assert 'inbox' in tagset + tagset.discard('inbox') + assert 'inbox' not in tagset + + def test_discard_not_present(self, tagset): + assert 'foo' not in tagset + tagset.discard('foo') + + def test_clear(self, tagset): + assert len(tagset) > 0 + tagset.clear() + assert len(tagset) == 0 + + def test_from_maildir_flags(self, maildir, notmuch): + _, pathname = maildir.deliver(flagged=True) + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE, + config=database.Database.CONFIG.EMPTY) as db: + msg = db.get(pathname) + msg.tags.discard('flagged') + msg.tags.from_maildir_flags() + assert 'flagged' in msg.tags + + def test_to_maildir_flags(self, maildir, notmuch): + _, pathname = maildir.deliver(flagged=True) + notmuch('new') + with database.Database(maildir.path, + mode=database.Mode.READ_WRITE, + config=database.Database.CONFIG.EMPTY) as db: + msg = db.get(pathname) + flags = msg.path.name.split(',')[-1] + assert 'F' in flags + msg.tags.discard('flagged') + msg.tags.to_maildir_flags() + flags = msg.path.name.split(',')[-1] + assert 'F' not in flags + + def test_isdisjoint(self, tagset): + assert tagset.isdisjoint(set(['spam', 'ham'])) + assert not tagset.isdisjoint(set(['inbox'])) + + def test_issubset(self, tagset): + assert {'inbox'} <= tagset + assert {'inbox'}.issubset(tagset) + assert not {'spam'} <= tagset + assert not {'spam'}.issubset(tagset) + assert tagset <= {'inbox', 'unread', 'spam'} + assert tagset.issubset({'inbox', 'unread', 'spam'}) + assert not {'inbox', 'unread', 'spam'} <= tagset + assert not {'inbox', 'unread', 'spam'}.issubset(tagset) + + def test_issuperset(self, tagset): + assert {'inbox', 'unread', 'spam'} >= tagset + assert {'inbox', 'unread', 'spam'}.issuperset(tagset) + assert tagset >= {'inbox'} + assert tagset.issuperset({'inbox'}) + + def test_union(self, tagset): + assert {'spam'}.union(tagset) == {'inbox', 'unread', 'spam'} + assert tagset.union({'spam'}) == {'inbox', 'unread', 'spam'} diff --git a/bindings/python-cffi/tests/test_thread.py b/bindings/python-cffi/tests/test_thread.py new file mode 100644 index 00000000..619d2aac --- /dev/null +++ b/bindings/python-cffi/tests/test_thread.py @@ -0,0 +1,109 @@ +import collections.abc +import time + +import pytest + +import notmuch2 + + +@pytest.fixture +def thread(maildir, notmuch): + """Return a single thread with one matched message.""" + msgid, _ = maildir.deliver(body='foo') + maildir.deliver(body='bar', + headers=[('In-Reply-To', '<{}>'.format(msgid))]) + notmuch('new') + with notmuch2.Database(maildir.path, config=notmuch2.Database.CONFIG.EMPTY) as db: + yield next(db.threads('foo')) + + +def test_type(thread): + assert isinstance(thread, notmuch2.Thread) + assert isinstance(thread, collections.abc.Iterable) + + +def test_threadid(thread): + assert isinstance(thread.threadid, notmuch2.BinString) + assert thread.threadid + + +def test_len(thread): + assert len(thread) == 2 + + +def test_toplevel_type(thread): + assert isinstance(thread.toplevel(), collections.abc.Iterator) + + +def test_toplevel(thread): + msgs = thread.toplevel() + assert isinstance(next(msgs), notmuch2.Message) + with pytest.raises(StopIteration): + next(msgs) + + +def test_toplevel_reply(thread): + msg = next(thread.toplevel()) + assert isinstance(next(msg.replies()), notmuch2.Message) + + +def test_iter(thread): + msgs = list(iter(thread)) + assert len(msgs) == len(thread) + for msg in msgs: + assert isinstance(msg, notmuch2.Message) + + +def test_matched(thread): + assert thread.matched == 1 + +def test_matched_iter(thread): + count = 0 + msgs = list(iter(thread)) + for msg in msgs: + if msg.matched: + count += 1 + assert count == thread.matched + +def test_authors_type(thread): + assert isinstance(thread.authors, notmuch2.BinString) + + +def test_authors(thread): + assert thread.authors == 'src@example.com' + + +def test_subject(thread): + assert thread.subject == 'Test mail' + + +def test_first(thread): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - thread.first) < 3600*24 + + +def test_last(thread): + # XXX Someone seems to treat things as local time instead of + # UTC or the other way around. + now = int(time.time()) + assert abs(now - thread.last) < 3600*24 + + +def test_first_last(thread): + # Sadly we only have second resolution so these will always be the + # same time in our tests. + assert thread.first <= thread.last + + +def test_tags_type(thread): + assert isinstance(thread.tags, notmuch2.ImmutableTagSet) + + +def test_tags_cache(thread): + assert thread.tags is thread.tags + + +def test_tags(thread): + assert 'inbox' in thread.tags |
