import pytest
-import notdb
-import notdb._errors as errors
-import notdb._database as dbmod
-import notdb._message as message
+import notmuch2
+import notmuch2._errors as errors
+import notmuch2._database as dbmod
+import notmuch2._message as message
@pytest.fixture
def db(maildir):
- with dbmod.Database.create(maildir.path) as db:
+ with dbmod.Database.create(maildir.path, config=notmuch2.Database.CONFIG.EMPTY) as db:
yield db
db.create(tmppath)
def test_create_existing(self, tmppath, db):
- with pytest.raises(errors.FileError):
+ with pytest.raises(errors.DatabaseExistsError):
dbmod.Database.create(path=tmppath)
def test_close(self, db):
with pytest.raises(errors.UnbalancedAtomicError):
ctx.force_end()
+ def test_abort(self, db):
+ with db.atomic() as txn:
+ txn.abort()
+ assert db.closed
+
class TestRevision:
@pytest.fixture
def db(self, maildir, notmuch):
- """Return a read-only notdb.Database.
+ """Return a read-only notmuch2.Database.
The database will have 3 messages, 2 threads.
"""
maildir.deliver(body='baz',
headers=[('In-Reply-To', '<{}>'.format(msgid))])
notmuch('new')
- with dbmod.Database(maildir.path, 'rw') as db:
+ with dbmod.Database(maildir.path, 'rw', config=notmuch2.Database.CONFIG.EMPTY) as db:
yield db
def test_count_messages(self, db):
msgs = db.messages('*')
assert isinstance(msgs, collections.abc.Iterator)
+ def test_messages_iterator(self, db):
+ for msg in db.messages('*'):
+ assert isinstance(msg, notmuch2.Message)
+ assert isinstance(msg.messageid, str)
+
+ def test_messages_iterator_list(self, db):
+ msgs = list(db.messages('*'))
+ assert len(msgs) == 3
+ for msg in msgs:
+ assert isinstance(msg, notmuch2.Message)
+ assert isinstance(msg.messageid, str)
+
def test_message_no_results(self, db):
msgs = db.messages('not_a_matching_query')
with pytest.raises(StopIteration):
def test_message_match(self, db):
msgs = db.messages('*')
msg = next(msgs)
- assert isinstance(msg, notdb.Message)
+ assert isinstance(msg, notmuch2.Message)
def test_count_threads(self, db):
assert db.count_threads('*') == 2
threads = db.threads('*')
assert isinstance(threads, collections.abc.Iterator)
+ def test_threads_iterator(self, db):
+ for t in db.threads('*'):
+ assert isinstance(t, notmuch2.Thread)
+ assert isinstance(t.threadid, str)
+ for msg in t:
+ assert isinstance(msg, notmuch2.Message)
+ assert isinstance(msg.messageid, str)
+
+ def test_threads_iterator_list(self, db):
+ threads = list(db.threads('*'))
+ assert len(threads) == 2
+ for t in threads:
+ assert isinstance(t, notmuch2.Thread)
+ assert isinstance(t.threadid, str)
+ msgs = list(t)
+ for msg in msgs:
+ assert isinstance(msg, notmuch2.Message)
+ assert isinstance(msg.messageid, str)
+
def test_threads_no_match(self, db):
threads = db.threads('not_a_matching_query')
with pytest.raises(StopIteration):
def test_threads_match(self, db):
threads = db.threads('*')
thread = next(threads)
- assert isinstance(thread, notdb.Thread)
+ assert isinstance(thread, notmuch2.Thread)
+
+ def test_use_threaded_message_twice(self, db):
+ thread = next(db.threads('*'))
+ for msg in thread.toplevel():
+ assert isinstance(msg, notmuch2.Message)
+ assert msg.alive
+ del msg
+ for msg in thread:
+ assert isinstance(msg, notmuch2.Message)
+ assert msg.alive
+ del msg