]> git.notmuchmail.org Git - notmuch/blobdiff - bindings/python-cffi/tests/test_database.py
Introduce CFFI-based python bindings
[notmuch] / bindings / python-cffi / tests / test_database.py
diff --git a/bindings/python-cffi/tests/test_database.py b/bindings/python-cffi/tests/test_database.py
new file mode 100644 (file)
index 0000000..02de0f4
--- /dev/null
@@ -0,0 +1,326 @@
+import collections
+import configparser
+import os
+import pathlib
+
+import pytest
+
+import notdb
+import notdb._errors as errors
+import notdb._database as dbmod
+import notdb._message as message
+
+
+@pytest.fixture
+def db(maildir):
+    with dbmod.Database.create(maildir.path) as db:
+        yield db
+
+
+class TestDefaultDb:
+    """Tests for reading the default database.
+
+    The error cases are fairly undefined, some relevant Python error
+    will come out if you give it a bad filename or if the file does
+    not parse correctly.  So we're not testing this too deeply.
+    """
+
+    def test_config_pathname_default(self, monkeypatch):
+        monkeypatch.delenv('NOTMUCH_CONFIG', raising=False)
+        user = pathlib.Path('~/.notmuch-config').expanduser()
+        assert dbmod._config_pathname() == user
+
+    def test_config_pathname_env(self, monkeypatch):
+        monkeypatch.setenv('NOTMUCH_CONFIG', '/some/random/path')
+        assert dbmod._config_pathname() == pathlib.Path('/some/random/path')
+
+    def test_default_path_nocfg(self, monkeypatch, tmppath):
+        monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath/'foo'))
+        with pytest.raises(FileNotFoundError):
+            dbmod.Database.default_path()
+
+    def test_default_path_cfg_is_dir(self, monkeypatch, tmppath):
+        monkeypatch.setenv('NOTMUCH_CONFIG', str(tmppath))
+        with pytest.raises(IsADirectoryError):
+            dbmod.Database.default_path()
+
+    def test_default_path_parseerr(self, monkeypatch, tmppath):
+        cfg = tmppath / 'notmuch-config'
+        with cfg.open('w') as fp:
+            fp.write('invalid')
+        monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
+        with pytest.raises(configparser.Error):
+            dbmod.Database.default_path()
+
+    def test_default_path_parse(self, monkeypatch, tmppath):
+        cfg = tmppath / 'notmuch-config'
+        with cfg.open('w') as fp:
+            fp.write('[database]\n')
+            fp.write('path={!s}'.format(tmppath))
+        monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg))
+        assert dbmod.Database.default_path() == tmppath
+
+    def test_default_path_param(self, monkeypatch, tmppath):
+        cfg_dummy = tmppath / 'dummy'
+        monkeypatch.setenv('NOTMUCH_CONFIG', str(cfg_dummy))
+        cfg_real = tmppath / 'notmuch_config'
+        with cfg_real.open('w') as fp:
+            fp.write('[database]\n')
+            fp.write('path={!s}'.format(cfg_real/'mail'))
+        assert dbmod.Database.default_path(cfg_real) == cfg_real/'mail'
+
+
+class TestCreate:
+
+    def test_create(self, tmppath, db):
+        assert tmppath.joinpath('.notmuch/xapian/').exists()
+
+    def test_create_already_open(self, tmppath, db):
+        with pytest.raises(errors.NotmuchError):
+            db.create(tmppath)
+
+    def test_create_existing(self, tmppath, db):
+        with pytest.raises(errors.FileError):
+            dbmod.Database.create(path=tmppath)
+
+    def test_close(self, db):
+        db.close()
+
+    def test_del_noclose(self, db):
+        del db
+
+    def test_close_del(self, db):
+        db.close()
+        del db
+
+    def test_closed_attr(self, db):
+        assert not db.closed
+        db.close()
+        assert db.closed
+
+    def test_ctx(self, db):
+        with db as ctx:
+            assert ctx is db
+            assert not db.closed
+        assert db.closed
+
+    def test_path(self, db, tmppath):
+        assert db.path == tmppath
+
+    def test_version(self, db):
+        assert db.version > 0
+
+    def test_needs_upgrade(self, db):
+        assert db.needs_upgrade in (True, False)
+
+
+class TestAtomic:
+
+    def test_exit_early(self, db):
+        with pytest.raises(errors.UnbalancedAtomicError):
+            with db.atomic() as ctx:
+                ctx.force_end()
+
+    def test_exit_late(self, db):
+        with db.atomic() as ctx:
+            pass
+        with pytest.raises(errors.UnbalancedAtomicError):
+            ctx.force_end()
+
+
+class TestRevision:
+
+    def test_single_rev(self, db):
+        r = db.revision()
+        assert isinstance(r, dbmod.DbRevision)
+        assert isinstance(r.rev, int)
+        assert isinstance(r.uuid, bytes)
+        assert r is r
+        assert r == r
+        assert r <= r
+        assert r >= r
+        assert not r < r
+        assert not r > r
+
+    def test_diff_db(self, tmppath):
+        dbpath0 = tmppath.joinpath('db0')
+        dbpath0.mkdir()
+        dbpath1 = tmppath.joinpath('db1')
+        dbpath1.mkdir()
+        db0 = dbmod.Database.create(path=dbpath0)
+        db1 = dbmod.Database.create(path=dbpath1)
+        r_db0 = db0.revision()
+        r_db1 = db1.revision()
+        assert r_db0 != r_db1
+        assert r_db0.uuid != r_db1.uuid
+
+    def test_cmp(self, db, maildir):
+        rev0 = db.revision()
+        _, pathname = maildir.deliver()
+        db.add(pathname, sync_flags=False)
+        rev1 = db.revision()
+        assert rev0 < rev1
+        assert rev0 <= rev1
+        assert not rev0 > rev1
+        assert not rev0 >= rev1
+        assert not rev0 == rev1
+        assert rev0 != rev1
+
+    # XXX add tests for revisions comparisons
+
+class TestMessages:
+
+    def test_add_message(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(pathname, sync_flags=False)
+        assert isinstance(msg, message.Message)
+        assert msg.path == pathname
+        assert msg.messageid == msgid
+
+    def test_add_message_str(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(str(pathname), sync_flags=False)
+
+    def test_add_message_bytes(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(os.fsencode(bytes(pathname)), sync_flags=False)
+
+    def test_remove_message(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(pathname, sync_flags=False)
+        assert db.find(msgid)
+        dup = db.remove(pathname)
+        with pytest.raises(LookupError):
+            db.find(msgid)
+
+    def test_remove_message_str(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(pathname, sync_flags=False)
+        assert db.find(msgid)
+        dup = db.remove(str(pathname))
+        with pytest.raises(LookupError):
+            db.find(msgid)
+
+    def test_remove_message_bytes(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg, dup = db.add(pathname, sync_flags=False)
+        assert db.find(msgid)
+        dup = db.remove(os.fsencode(bytes(pathname)))
+        with pytest.raises(LookupError):
+            db.find(msgid)
+
+    def test_find_message(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg0, dup = db.add(pathname, sync_flags=False)
+        msg1 = db.find(msgid)
+        assert isinstance(msg1, message.Message)
+        assert msg1.messageid == msgid == msg0.messageid
+        assert msg1.path == pathname == msg0.path
+
+    def test_find_message_notfound(self, db):
+        with pytest.raises(LookupError):
+            db.find('foo')
+
+    def test_get_message(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        msg0, _ = db.add(pathname, sync_flags=False)
+        msg1 = db.get(pathname)
+        assert isinstance(msg1, message.Message)
+        assert msg1.messageid == msgid == msg0.messageid
+        assert msg1.path == pathname == msg0.path
+
+    def test_get_message_str(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        db.add(pathname, sync_flags=False)
+        msg = db.get(str(pathname))
+        assert msg.messageid == msgid
+
+    def test_get_message_bytes(self, db, maildir):
+        msgid, pathname = maildir.deliver()
+        db.add(pathname, sync_flags=False)
+        msg = db.get(os.fsencode(bytes(pathname)))
+        assert msg.messageid == msgid
+
+
+class TestTags:
+    # We just want to test this behaves like a set at a hight level.
+    # The set semantics are tested in detail in the test_tags module.
+
+    def test_type(self, db):
+        assert isinstance(db.tags, collections.abc.Set)
+
+    def test_none(self, db):
+        itags = iter(db.tags)
+        with pytest.raises(StopIteration):
+            next(itags)
+        assert len(db.tags) == 0
+        assert not db.tags
+
+    def test_some(self, db, maildir):
+        _, pathname = maildir.deliver()
+        msg, _ = db.add(pathname, sync_flags=False)
+        msg.tags.add('hello')
+        itags = iter(db.tags)
+        assert next(itags) == 'hello'
+        with pytest.raises(StopIteration):
+            next(itags)
+        assert 'hello' in msg.tags
+
+    def test_cache(self, db):
+        assert db.tags is db.tags
+
+    def test_iters(self, db):
+        i1 = iter(db.tags)
+        i2 = iter(db.tags)
+        assert i1 is not i2
+
+
+class TestQuery:
+
+    @pytest.fixture
+    def db(self, maildir, notmuch):
+        """Return a read-only notdb.Database.
+
+        The database will have 3 messages, 2 threads.
+        """
+        msgid, _ = maildir.deliver(body='foo')
+        maildir.deliver(body='bar')
+        maildir.deliver(body='baz',
+                        headers=[('In-Reply-To', '<{}>'.format(msgid))])
+        notmuch('new')
+        with dbmod.Database(maildir.path, 'rw') as db:
+            yield db
+
+    def test_count_messages(self, db):
+        assert db.count_messages('*') == 3
+
+    def test_messages_type(self, db):
+        msgs = db.messages('*')
+        assert isinstance(msgs, collections.abc.Iterator)
+
+    def test_message_no_results(self, db):
+        msgs = db.messages('not_a_matching_query')
+        with pytest.raises(StopIteration):
+            next(msgs)
+
+    def test_message_match(self, db):
+        msgs = db.messages('*')
+        msg = next(msgs)
+        assert isinstance(msg, notdb.Message)
+
+    def test_count_threads(self, db):
+        assert db.count_threads('*') == 2
+
+    def test_threads_type(self, db):
+        threads = db.threads('*')
+        assert isinstance(threads, collections.abc.Iterator)
+
+    def test_threads_no_match(self, db):
+        threads = db.threads('not_a_matching_query')
+        with pytest.raises(StopIteration):
+            next(threads)
+
+    def test_threads_match(self, db):
+        threads = db.threads('*')
+        thread = next(threads)
+        assert isinstance(thread, notdb.Thread)