From 906ab35e87c099d411ffb53ac7602e55c23b08ea Mon Sep 17 00:00:00 2001 From: William Morgan Date: Tue, 11 Aug 2009 15:34:50 -0400 Subject: [PATCH] refactor index access into three methods and rewrite PollManager#each_message_from Couple big changes in this commit, but they're all tied together. Index.sync_message is refactored into three separate methods: add_message, update_message and update_message_state. The intention is that add_message is called for new messages only, update_message is called for changing the message body on disk (e.g. when we see multiple copies of the same message, or by DraftManager when the text of a draft is changed), and update_message_state is called when the labels on a message change. So indexes that differentiate those operations can exhibit more natural performance characteristics. Also, PollManager.add_messages_from has been renamed to each_message_from and changed significantly. It now *only* yields successive messages; it does not load the index version of the message, and it does not auto-add the message to the index. (In fact, it ignores the result of the block.) There's also a new method called add_new_message that calls Index.add_message and then relays the update to other GUI elements. There was a lot of refactoring of sup-sync that was part of this. Probably not strictly necessary but it was too hard to untangle the changes. 
--- bin/sup-sync | 156 ++++++++++++++++------------- bin/sup-sync-back | 14 ++- bin/sup-tweak-labels | 2 +- lib/sup/draft.rb | 9 +- lib/sup/ferret_index.rb | 5 + lib/sup/index.rb | 9 +- lib/sup/message.rb | 4 +- lib/sup/modes/thread-index-mode.rb | 2 +- lib/sup/poll.rb | 69 +++++++------ lib/sup/sent.rb | 6 +- lib/sup/thread.rb | 2 +- lib/sup/xapian_index.rb | 5 + 12 files changed, 156 insertions(+), 127 deletions(-) diff --git a/bin/sup-sync b/bin/sup-sync index f233072..a8cb768 100755 --- a/bin/sup-sync +++ b/bin/sup-sync @@ -21,6 +21,10 @@ class Numeric end end +class Set + def to_s; to_a * ',' end +end + def time startt = Time.now yield @@ -54,7 +58,7 @@ by running "sup-add --help". Options controlling WHICH messages sup-sync operates on: EOS opt :new, "Operate on new messages only. Don't scan over the entire source. (Default.)", :short => :none - opt :changed, "Scan over the entire source for messages that have been deleted, altered, or moved from another source. (In the case of mbox sources, this includes all messages AFTER an altered message.)" + opt :changed, "Scan over the entire source for messages that have been deleted, altered, or moved from another source." opt :restored, "Operate only on those messages included in a dump file as specified by --restore which have changed state." opt :all, "Operate on all messages in the source, regardless of newness or changedness." opt :start_at, "For --changed, --restored and --all, start at a particular offset.", :type => :int @@ -68,7 +72,7 @@ EOS opt :discard, "Discard any message state in the index and use the default source state. Dangerous!", :short => :none opt :archive, "When using the default source state, mark messages as archived.", :short => "-x" opt :read, "When using the default source state, mark messages as read." - opt :extra_labels, "When using the default source state, also apply these user-defined labels. 
Should be a comma-separated list.", :type => String, :short => :none + opt :extra_labels, "When using the default source state, also apply these user-defined labels (a comma-separated list)", :default => "", :short => :none text < true do |m_old, m, offset| + Redwood::PollManager.each_message_from source do |m| num_scanned += 1 seen[m.id] = true - - if Time.now - last_info_time > PROGRESS_UPDATE_INTERVAL - last_info_time = Time.now - elapsed = last_info_time - start_time - start = opts[:start_at] || source.start_offset - pctdone = 100.0 * (source.cur_offset - start).to_f / (source.end_offset - start).to_f - remaining = (100.0 - pctdone) * (elapsed.to_f / pctdone) - $stderr.printf "## read %dm (about %.0f%%) @ %.1fm/s. %s elapsed, about %s remaining\n", num_scanned, pctdone, num_scanned / elapsed, elapsed.to_time_s, remaining.to_time_s + old_m = index.build_message m.id + + case target + when :changed + ## skip this message if we're operating only on changed messages, the + ## message is in the index, and it's unchanged from what the source is + ## reporting. + next if old_m && old_m.source.id == m.source.id && old_m.source_info == m.source_info + when :restored + ## skip if we're operating on restored messages, and this one + ## ain't (or we wouldn't be making a change) + next unless old_m && restored_state[m.id] && restored_state[m.id] != old_m.labels + when :new + ## nothing to do; we'll consider all messages starting at the start offset, which + ## hasn't been changed. + when :all + ## nothing to do; we'll consider all messages starting at the start offset, which + ## was reset to the beginning above. end - ## skip if we're operating only on changed messages, the message - ## is in the index, and it's unchanged from what the source is - ## reporting. 
- next if target == :changed && m_old && m_old.source.id == source.id && m_old.source_info == offset - - ## get the state currently in the index - index_state = m_old.labels.dup if m_old - - ## skip if we're operating on restored messages, and this one - ## ain't (or we wouldn't be making a change) - next if target == :restored && (!restored_state[m.id] || !index_state || restored_state[m.id] == index_state) - - ## m.labels is the default source labels. tweak these according - ## to default source state modification flags. + ## tweak source labels according to commandline arguments if necessary m.labels.delete :inbox if opts[:archive] m.labels.delete :unread if opts[:read] - m.labels += opts[:extra_labels].to_set_of_symbols(",") if opts[:extra_labels] - - ## assign message labels based on the operation we're performing - case op - when :asis + m.labels += opts[:extra_labels].to_set_of_symbols(",") + + ## decide what to do based on message labels and the operation we're performing + dothis, new_labels = case + when (op == :restore) && restored_state[m.id] && old_m && (old_m.labels != restored_state[m.id]) + [:update_message_state, restored_state[m.id]] + when op == :discard + if old_m && (old_m.labels != m.labels) + [:update_message_state, m.labels] + else + # don't do anything + end + else ## duplicate behavior of poll mode: if index_state is non-nil, this is a newer ## version of an older message, so merge in any new labels except :unread and ## :inbox. - m.labels = ((m.labels - [:unread, :inbox]) + index_state) if index_state - when :restore - ## if the entry exists on disk - if restored_state[m.id] - m.labels = restored_state[m.id] - num_restored += 1 - elsif index_state - m.labels = index_state + ## + ## TODO: refactor such that this isn't duplicated + if old_m + m.labels = old_m.labels + (m.labels - [:unread, :inbox]) + :update_message + else + :add_message end - when :discard - ## nothin! 
use default source labels + end + + ## now, actually do the operation + case dothis + when :add_message + $stderr.puts "Adding new message #{source}###{m.source_info} with labels #{m.labels}" if opts[:verbose] + index.add_message m unless opts[:dry_run] + num_added += 1 + when :update_message + $stderr.puts "Updating message #{source}###{m.source_info}; labels #{old_m.labels} => #{m.labels}; offset #{old_m.source_info} => #{m.source_info}" if opts[:verbose] + index.update_message m unless opts[:dry_run] + num_updated += 1 + when :update_message_state + $stderr.puts "Changing flags for #{source}##{m.source_info} from #{m.labels} to #{new_labels}" + m.labels = new_labels + index.update_message_state m unless opts[:dry_run] + num_updated += 1 end if Time.now - last_info_time > PROGRESS_UPDATE_INTERVAL @@ -194,17 +221,8 @@ begin remaining = (100.0 - pctdone) * (elapsed.to_f / pctdone) $stderr.printf "## read %dm (about %.0f%%) @ %.1fm/s. %s elapsed, about %s remaining\n", num_scanned, pctdone, num_scanned / elapsed, elapsed.to_time_s, remaining.to_time_s end - - if index_state.nil? - puts "Adding message #{source}##{offset} from #{m.from} with state {#{m.labels.to_a * ', '}}" if opts[:verbose] - num_added += 1 - else - puts "Updating message #{source}##{offset}, source #{m_old.source.id} => #{source.id}, offset #{m_old.source_info} => #{offset}, state {#{index_state.to_a * ', '}} => {#{m.labels.to_a * ', '}}" if opts[:verbose] - num_updated += 1 - end - - opts[:dry_run] ? nil : m end + $stderr.puts "Scanned #{num_scanned}, added #{num_added}, updated #{num_updated} messages from #{source}." $stderr.puts "Restored state on #{num_restored} (#{100.0 * num_restored / num_scanned}%) messages." 
if num_restored > 0 end diff --git a/bin/sup-sync-back b/bin/sup-sync-back index 56ac4eb..adadcb3 100755 --- a/bin/sup-sync-back +++ b/bin/sup-sync-back @@ -110,28 +110,28 @@ EOS num_dropped = num_moved = num_scanned = 0 out_fp = Tempfile.new "sup-sync-back-#{source.id}" - Redwood::PollManager.add_messages_from source do |m_old, m, offset| + Redwood::PollManager.each_message_from source do |m| num_scanned += 1 - if m_old + if(m_old = index.build_message(m.id)) labels = m_old.labels if labels.member? :deleted if opts[:drop_deleted] - puts "Dropping deleted message #{source}##{offset}" if opts[:verbose] + puts "Dropping deleted message #{source}##{m.source_info}" if opts[:verbose] num_dropped += 1 elsif opts[:move_deleted] && labels.member?(:deleted) - puts "Moving deleted message #{source}##{offset}" if opts[:verbose] + puts "Moving deleted message #{source}##{m.source_info}" if opts[:verbose] save m, deleted_fp unless opts[:dry_run] num_moved += 1 end elsif labels.member? :spam if opts[:drop_spam] - puts "Dropping spam message #{source}##{offset}" if opts[:verbose] + puts "Dropping spam message #{source}##{m.source_info}" if opts[:verbose] num_dropped += 1 elsif opts[:move_spam] && labels.member?(:spam) - puts "Moving spam message #{source}##{offset}" if opts[:verbose] + puts "Moving spam message #{source}##{m.source_info}" if opts[:verbose] save m, spam_fp unless opts[:dry_run] num_moved += 1 end @@ -141,8 +141,6 @@ EOS else save m, out_fp unless opts[:dry_run] end - - nil # don't actually add anything! end $stderr.puts "Scanned #{num_scanned}, dropped #{num_dropped}, moved #{num_moved} messages from #{source}." 
modified_sources << source if num_dropped > 0 || num_moved > 0 diff --git a/bin/sup-tweak-labels b/bin/sup-tweak-labels index 905aac2..b2c6b1d 100755 --- a/bin/sup-tweak-labels +++ b/bin/sup-tweak-labels @@ -105,7 +105,7 @@ begin puts "From #{m.from}, subject: #{m.subj}" if opts[:very_verbose] puts "#{m.id}: {#{old_labels.to_a.join ','}} => {#{m.labels.to_a.join ','}}" if opts[:verbose] puts if opts[:very_verbose] - index.sync_message m unless opts[:dry_run] + index.update_message_state m unless opts[:dry_run] end if Time.now - last_info_time > 60 diff --git a/lib/sup/draft.rb b/lib/sup/draft.rb index dd4574d..ce8f064 100644 --- a/lib/sup/draft.rb +++ b/lib/sup/draft.rb @@ -20,12 +20,9 @@ class DraftManager File.open(fn, "w") { |f| yield f } my_message = nil - @source.each do |thisoffset, theselabels| - m = Message.build_from_source @source, thisoffset - m.labels = theselabels - Index.sync_message m - UpdateManager.relay self, :added, m - my_message = m if thisoffset == offset + PollManager.each_message_from(@source) do |m| + PollManager.add_new_message m + my_message = m end my_message diff --git a/lib/sup/ferret_index.rb b/lib/sup/ferret_index.rb index 546faf8..3655d7a 100644 --- a/lib/sup/ferret_index.rb +++ b/lib/sup/ferret_index.rb @@ -45,6 +45,10 @@ class FerretIndex < BaseIndex end end + def add_message m; sync_message m end + def update_message m; sync_message m end + def update_message_state m; sync_message m end + def sync_message m, opts={} entry = @index[m.id] @@ -125,6 +129,7 @@ class FerretIndex < BaseIndex @index.add_document d end end + private :sync_message def save_index fn=File.join(@dir, "ferret") # don't have to do anything, apparently diff --git a/lib/sup/index.rb b/lib/sup/index.rb index fb46eb0..122026a 100644 --- a/lib/sup/index.rb +++ b/lib/sup/index.rb @@ -113,12 +113,9 @@ EOS unimplemented end - ## Syncs the message to the index, replacing any previous version. adding - ## either way. 
Index state will be determined by the message's #labels - ## accessor. - def sync_message m, opts={} - unimplemented - end + def add_message m; unimplemented end + def update_message m; unimplemented end + def update_message_state m; unimplemented end def save_index fn unimplemented diff --git a/lib/sup/message.rb b/lib/sup/message.rb index 3b10744..1e9c659 100644 --- a/lib/sup/message.rb +++ b/lib/sup/message.rb @@ -157,9 +157,9 @@ class Message ## don't tempt me. def sanitize_message_id mid; mid.gsub(/(\s|[^\000-\177])+/, "")[0..254] end - def save index + def save_state index return unless @dirty - index.sync_message self + index.update_message_state self @dirty = false true end diff --git a/lib/sup/modes/thread-index-mode.rb b/lib/sup/modes/thread-index-mode.rb index 905ad98..fb6b2ce 100644 --- a/lib/sup/modes/thread-index-mode.rb +++ b/lib/sup/modes/thread-index-mode.rb @@ -477,7 +477,7 @@ EOS BufferManager.say("Saving threads...") do |say_id| dirty_threads.each_with_index do |t, i| BufferManager.say "Saving modified thread #{i + 1} of #{dirty_threads.length}...", say_id - t.save Index + t.save_state Index end end end diff --git a/lib/sup/poll.rb b/lib/sup/poll.rb index 0c46d2f..0c8f51d 100644 --- a/lib/sup/poll.rb +++ b/lib/sup/poll.rb @@ -95,11 +95,24 @@ EOS num = 0 numi = 0 - add_messages_from source do |m_old, m, offset| - ## always preserve the labels on disk. - m.labels = (m.labels - [:unread, :inbox]) + m_old.labels if m_old - yield "Found message at #{offset} with labels {#{m.labels.to_a * ', '}}" - unless m_old + each_message_from source do |m| + yield "Found message at #{m.source_info} with labels {#{m.labels.to_a * ', '}}" + old_m = Index.build_message m.id + if old_m + if old_m.source.id != source.id || old_m.source_info != m.source_info + ## here we merge labels between new and old versions, but we don't let the new + ## message add :unread or :inbox labels. (they can exist in the old version, + ## just not be added.) 
+ new_labels = old_m.labels + (m.labels - [:unread, :inbox]) + yield "Message at #{m.source_info} is an updated of an old message. Updating labels from #{m.labels.to_a * ','} => #{new_labels.to_a * ','}" + m.labels = new_labels + Index.update_message m + else + yield "Skipping already-imported message at #{m.source_info}" + end + else + yield "Found new message at #{m.source_info} with labels #{m.labels.to_a * ','}" + Index.add_message m num += 1 from_and_subj << [m.from && m.from.longname, m.subj] if (m.labels & [:inbox, :spam, :deleted, :killed]) == Set.new([:inbox]) @@ -121,47 +134,43 @@ EOS [total_num, total_numi, from_and_subj, from_and_subj_inbox] end - ## this is the main mechanism for adding new messages to the - ## index. it's called both by sup-sync and by PollMode. - ## - ## for each message in the source, starting from the source's - ## starting offset, this methods yields the message, the source - ## offset, and the index entry on disk (if any). it expects the - ## yield to return the message (possibly altered in some way), and - ## then adds it (if new) or updates it (if previously seen). + ## like Source#each, but yields successive Message objects, which have their + ## labels and offsets set correctly. ## - ## the labels of the yielded message are the default source - ## labels. it is likely that callers will want to replace these with - ## the index labels, if they exist, so that state is not lost when - ## e.g. a new version of a message from a mailing list comes in. - def add_messages_from source, opts={} + ## this is the primary mechanism for iterating over messages from a source. + def each_message_from source, opts={} begin return if source.done? || source.has_errors? - source.each do |offset, default_labels| + source.each do |offset, source_labels| if source.has_errors? 
Redwood::log "error loading messages from #{source}: #{source.error.message}" return end - m_new = Message.build_from_source source, offset - m_old = Index.build_message m_new.id - - m_new.labels += default_labels + (source.archived? ? [] : [:inbox]) - m_new.labels << :sent if source.uri.eql?(SentManager.source_uri) - m_new.labels.delete :unread if m_new.source_marked_read? - m_new.labels.each { |l| LabelManager << l } + m = Message.build_from_source source, offset + m.labels += source_labels + (source.archived? ? [] : [:inbox]) + m.labels.delete :unread if m.source_marked_read? # preserve read status if possible + m.labels.each { |l| LabelManager << l } - HookManager.run "before-add-message", :message => m_new - m_ret = yield(m_old, m_new, offset) or next if block_given? - Index.sync_message m_ret, opts - UpdateManager.relay self, :added, m_ret unless m_old + HookManager.run "before-add-message", :message => m + yield m end rescue SourceError => e Redwood::log "problem getting messages from #{source}: #{e.message}" Redwood::report_broken_sources :force_to_top => true end end + + ## TODO: see if we can do this within PollMode rather than by calling this + ## method. + ## + ## a wrapper around Index.add_message that calls the proper hooks, + ## does the gui callback stuff, etc. 
+ def add_new_message m + Index.add_message m + UpdateManager.relay self, :added, m + end end end diff --git a/lib/sup/sent.rb b/lib/sup/sent.rb index b750d71..74fe1ae 100644 --- a/lib/sup/sent.rb +++ b/lib/sup/sent.rb @@ -30,9 +30,9 @@ class SentManager def write_sent_message date, from_email, &block @source.store_message date, from_email, &block - PollManager.add_messages_from(@source) do |m_old, m, offset| + PollManager.each_message_from(@source) do |m| m.remove_label :unread - m + PollManager.add_new_message m end end end @@ -52,7 +52,7 @@ class SentLoader < MBox::Loader def uri; 'sup://sent' end def id; 9998; end - def labels; [:inbox]; end + def labels; [:inbox, :sent]; end end end diff --git a/lib/sup/thread.rb b/lib/sup/thread.rb index 1474b6e..81ce7e8 100644 --- a/lib/sup/thread.rb +++ b/lib/sup/thread.rb @@ -113,7 +113,7 @@ class Thread def set_labels l; each { |m, *o| m && m.labels = l }; end def has_label? t; any? { |m, *o| m && m.has_label?(t) }; end - def save index; each { |m, *o| m && m.save(index) }; end + def save_state index; each { |m, *o| m && m.save_state(index) }; end def direct_participants map { |m, *o| [m.from] + m.to if m }.flatten.compact.uniq diff --git a/lib/sup/xapian_index.rb b/lib/sup/xapian_index.rb index 33f2204..f79b055 100644 --- a/lib/sup/xapian_index.rb +++ b/lib/sup/xapian_index.rb @@ -87,6 +87,10 @@ class XapianIndex < BaseIndex m end + def add_message m; sync_message m end + def update_message m; sync_message m end + def update_message_state m; sync_message m end + def sync_message m, opts={} entry = synchronize { @entries[m.id] } snippet = m.snippet @@ -119,6 +123,7 @@ class XapianIndex < BaseIndex end true end + private :sync_message def num_results_for query={} xapian_query = build_xapian_query query -- 2.45.2