class PollManager
include Singleton
+ HookManager.register "before-add-message", <<EOS
+Executes immediately before a message is added to the index.
+Variables:
+ message: the new message
+EOS
+
+ HookManager.register "before-poll", <<EOS
+Executes immediately before a poll for new messages commences.
+No variables.
+EOS
+
+ HookManager.register "after-poll", <<EOS
+Executes immediately after a poll for new messages completes.
+Variables:
+ num: the total number of new messages added in this poll
+ num_inbox: the number of new messages added in this poll which
+ appear in the inbox (i.e. were not auto-archived).
+num_inbox_total_unread: the total number of unread messages in the inbox
+ from_and_subj: an array of (from email address, subject) pairs
+ from_and_subj_inbox: an array of (from email address, subject) pairs for
+ only those messages appearing in the inbox
+EOS
+
DELAY = 300
def initialize
- @polling = false
+ @mutex = Mutex.new
+ @thread = nil
@last_poll = nil
+ @polling = false
self.class.i_am_the_instance self
-
- Redwood::reporting_thread do
- while true
- sleep DELAY / 2
- poll if @last_poll.nil? || (Time.now - @last_poll) >= DELAY
- end
- end
end
def buffer
- BufferManager.spawn_unless_exists("<poll for new messages>", :hidden => true) do
- PollMode.new
- end
+ b, new = BufferManager.spawn_unless_exists("poll for new messages", :hidden => true, :system => true) { PollMode.new }
+ b
end
def poll
+ return if @polling
+ @polling = true
+ HookManager.run "before-poll"
+
BufferManager.flash "Polling for new messages..."
- num, numi = buffer.mode.poll
+ num, numi, from_and_subj, from_and_subj_inbox = buffer.mode.poll
if num > 0
- BufferManager.flash "Loaded #{num} new messages, #{numi} to inbox."
+ BufferManager.flash "Loaded #{num.pluralize 'new message'}, #{numi} to inbox."
else
- BufferManager.flash "No new messages."
+ BufferManager.flash "No new messages."
end
+
+ HookManager.run "after-poll", :num => num, :num_inbox => numi, :from_and_subj => from_and_subj, :from_and_subj_inbox => from_and_subj_inbox, :num_inbox_total_unread => lambda { Index.num_results_for :labels => [:inbox, :unread] }
+
+ @polling = false
[num, numi]
end
- def do_poll
- return [0, 0] if @polling
- @polling = true
- found = {}
- total_num = 0
- total_numi = 0
+ def start
+ @thread = Redwood::reporting_thread("periodic poll") do
+ while true
+ sleep DELAY / 2
+ poll if @last_poll.nil? || (Time.now - @last_poll) >= DELAY
+ end
+ end
+ end
- Index.usual_sources.each do |source|
- next if source.done?
- yield "Loading from #{source}... "
+ def stop
+ @thread.kill if @thread
+ @thread = nil
+ end
- start_offset = nil
- num = 0
- num_inbox = 0
- source.each do |offset, labels|
- start_offset ||= offset
- yield "Found message at #{offset} with labels #{labels * ', '}"
+ def do_poll
+ total_num = total_numi = 0
+ from_and_subj = []
+ from_and_subj_inbox = []
+
+ @mutex.synchronize do
+ Index.usual_sources.each do |source|
+# yield "source #{source} is done? #{source.done?} (cur_offset #{source.cur_offset} >= #{source.end_offset})"
begin
- m = Redwood::Message.new :source => source, :source_info => offset,
- :labels => labels
- if found[m.id]
- yield "Skipping duplicate message #{m.id}"
- next
- else
- found[m.id] = true
- end
-
- if Index.add_message m
- UpdateManager.relay :add, m
+ yield "Loading from #{source}... " unless source.done? || (source.respond_to?(:has_errors?) && source.has_errors?)
+ rescue SourceError => e
+ Redwood::log "problem getting messages from #{source}: #{e.message}"
+ Redwood::report_broken_sources :force_to_top => true
+ next
+ end
+
+ num = 0
+ numi = 0
+ add_messages_from source do |m, offset, entry|
+ ## always preserve the labels on disk.
+ m.labels = ((m.labels - [:unread, :inbox]) + entry[:label].symbolistize).uniq if entry
+ yield "Found message at #{offset} with labels {#{m.labels * ', '}}"
+ unless entry
num += 1
- total_num += 1
- total_numi += 1 if m.labels.include? :inbox
+ from_and_subj << [m.from && m.from.longname, m.subj]
+ if m.has_label?(:inbox) && ([:spam, :deleted, :killed] & m.labels).empty?
+ from_and_subj_inbox << [m.from && m.from.longname, m.subj]
+ numi += 1
+ end
end
- rescue SourceError, MessageFormatError => e
- yield "Ignoring erroneous message at #{source}##{offset}: #{e.message}"
+ m
+ end
+ yield "Found #{num} messages, #{numi} to inbox." unless num == 0
+ total_num += num
+ total_numi += numi
+ end
+
+ yield "Done polling; loaded #{total_num} new messages total"
+ @last_poll = Time.now
+ @polling = false
+ end
+ [total_num, total_numi, from_and_subj, from_and_subj_inbox]
+ end
+
+ ## this is the main mechanism for adding new messages to the
+ ## index. it's called both by sup-sync and by PollMode.
+ ##
+ ## for each message in the source, starting from the source's
+ ## starting offset, this methods yields the message, the source
+ ## offset, and the index entry on disk (if any). it expects the
+ ## yield to return the message (possibly altered in some way), and
+ ## then adds it (if new) or updates it (if previously seen).
+ ##
+ ## the labels of the yielded message are the default source
+ ## labels. it is likely that callers will want to replace these with
+ ## the index labels, if they exist, so that state is not lost when
+ ## e.g. a new version of a message from a mailing list comes in.
+ def add_messages_from source, opts={}
+ begin
+ return if source.done? || source.has_errors?
+
+ source.each do |offset, labels|
+ if source.has_errors?
+ Redwood::log "error loading messages from #{source}: #{source.error.message}"
+ return
end
- if num % 1000 == 0 && num > 0
- elapsed = Time.now - start
- pctdone = (offset.to_f - start_offset) / (source.total.to_f - start_offset)
- remaining = (source.end_offset.to_f - offset.to_f) * (elapsed.to_f / (offset.to_f - start_offset))
- yield "## #{num} (#{(pctdone * 100.0)}% done) read; #{elapsed.to_time_s} elapsed; est. #{remaining.to_time_s} remaining"
+ labels << :sent if source.uri.eql?(SentManager.source_uri)
+ labels.each { |l| LabelManager << l }
+ labels = labels + (source.archived? ? [] : [:inbox])
+
+ m = Message.new :source => source, :source_info => offset, :labels => labels
+ m.load_from_source!
+
+ if m.source_marked_read?
+ m.remove_label :unread
+ labels.delete :unread
end
+
+ docid, entry = Index.load_entry_for_id m.id
+ HookManager.run "before-add-message", :message => m
+ m = yield(m, offset, entry) or next if block_given?
+ times = Index.sync_message m, false, docid, entry, opts
+ UpdateManager.relay self, :added, m unless entry
end
- yield "Found #{num} messages" unless num == 0
+ rescue SourceError => e
+ Redwood::log "problem getting messages from #{source}: #{e.message}"
+ Redwood::report_broken_sources :force_to_top => true
end
- yield "Done polling; loaded #{total_num} new messages total"
- @last_poll = Time.now
- @polling = false
- [total_num, total_numi]
end
end