6 class SSHFileError < StandardError; end
8 ## this is a file-like interface to a file that actually lives on the
9 ## other end of an ssh connection. it works by using wc, head and tail
10 ## to simulate (buffered) random access. on a fast connection, this
11 ## can have a good bandwidth, but the latency is pretty terrible:
12 ## about 1 second (!) per request. luckily, we're either just reading
13 ## straight through the mbox (an import) or we're reading a few
14 ## messages at a time (viewing messages) so the latency is not a problem.
16 ## all of the methods here can throw SSHFileErrors, SocketErrors,
17 ## Net::SSH::Exceptions and Errno::ENOENTs.
19 ## debugging TODO: remove me
23 module_function :debug
25 ## a simple buffer of contiguous data
36 def empty?; @start.nil?; end
37 def start; @start; end
38 def endd; @start + @buf.length; end
40 def add data, offset=endd
41 #MBox::debug "+ adding #{data.length} bytes; size will be #{size + data.length}; limit #{SSHFile::MAX_BUF_SIZE}"
49 raise "non-continguous data added to buffer (data #{offset}:#{offset + data.length}, buf range #{start}:#{endd})" if offset + data.length < start || offset > endd
52 @buf = data[0 ... (start - offset)] + @buf
55 return if offset + data.length < endd
56 @buf += data[(endd - offset) .. -1]
61 raise "only ranges supported due to programmer's laziness" unless o.is_a? Range
62 @buf[Range.new(o.first - @start, o.last - @start, o.exclude_end?)]
65 def index what, start=0
66 x = @buf.index(what, start - @start)
67 x.nil? ? nil : x + @start
70 def rindex what, start=0
71 x = @buf.rindex(what, start - @start)
72 x.nil? ? nil : x + @start
75 def size; empty? ? 0 : @buf.size; end
76 def to_s; empty? ? "<empty>" : "[#{start}, #{endd})"; end # for debugging
79 ## sharing a ssh connection to one machines between sources seems to
80 ## create lots of broken situations: commands returning bizarre (large
81 ## positive integer) return codes despite working; commands
82 ## occasionally not working, etc. i suspect this is because of the
83 ## fragile nature of the ssh syncshell.
85 ## at any rate, we now open up one ssh connection per file, which is
86 ## probably silly in the extreme case.
88 ## the file-like interface to a remote file
90 MAX_BUF_SIZE = 1024 * 1024 # bytes
91 MAX_TRANSFER_SIZE = 1024 * 128
92 REASONABLE_TRANSFER_SIZE = 1024 * 32
93 SIZE_CHECK_INTERVAL = 60 * 1 # seconds
95 ## upon these errors we'll try to rereconnect a few times
96 RECOVERABLE_ERRORS = [ Errno::EPIPE, Errno::ETIMEDOUT ]
99 @@shells_mutex = Mutex.new
101 def initialize host, fn, ssh_opts={}
112 @buf_mutex = Mutex.new
115 def to_s; "mbox+ssh://#@host/#@fn"; end ## TODO: remove this EVILness
116 def broken?; !@broken_msg.nil?; end
122 def eof?; @offset >= size; end
123 def eof; eof?; end # lame but IO's method is named this and rmail calls that
124 def seek loc; @offset = loc; end
125 def tell; @offset; end
129 if @file_size.nil? || (Time.now - @last_size_check) > SIZE_CHECK_INTERVAL
130 @last_size_check = Time.now
131 @file_size = do_remote("wc -c #@fn").split.first.to_i
138 @buf_mutex.synchronize do
139 make_buf_include @offset
140 expand_buf_forward while @buf.index("\n", @offset).nil? && @buf.endd < size
141 returning(@buf[@offset .. (@buf.index("\n", @offset) || -1)]) { |line| @offset += line.length }
147 @buf_mutex.synchronize do
148 make_buf_include @offset, n
149 @buf[@offset ... (@offset += n)]
155 ## TODO: share this code with imap
157 @say_id = BufferManager.say s, @say_id if BufferManager.instantiated?
162 BufferManager.clear @say_id if BufferManager.instantiated? && @say_id
167 raise SSHFileError, @broken_msg if broken?
170 @key = [@host, @ssh_opts[:username]]
172 @shell, @shell_mutex = @@shells_mutex.synchronize do
173 unless @@shells.member? @key
174 say "Opening SSH connection to #{@host} for #@fn..."
175 #raise SSHFileError, "simulated SSH file error"
176 session = Net::SSH.start @host, @ssh_opts
177 say "Starting SSH shell..."
178 @@shells[@key] = [session.shell.sync, Mutex.new]
183 say "Checking for #@fn..."
184 @shell_mutex.synchronize { raise Errno::ENOENT, @fn unless @shell.test("-e #@fn").status == 0 }
190 def do_remote cmd, expected_size=0
197 # MBox::debug "sending command: #{cmd.inspect}"
198 result = @shell_mutex.synchronize { x = @shell.send_command cmd; sleep 0.25; x }
199 raise SSHFileError, "Failure during remote command #{cmd.inspect}: #{(result.stderr || result.stdout || "")[0 .. 100]}" unless result.status == 0
202 ## Net::SSH::Exceptions seem to happen every once in a while for
204 rescue Net::SSH::Exception, *RECOVERABLE_ERRORS
205 if (retries += 1) <= 3
206 @@shells_mutex.synchronize do
214 rescue Net::SSH::Exception, SSHFileError, SystemCallError => e
215 @broken_msg = e.message
222 def get_bytes offset, size
223 do_remote "tail -c +#{offset + 1} #@fn | head -c #{size}", size
226 def expand_buf_forward n=REASONABLE_TRANSFER_SIZE
227 @buf.add get_bytes(@buf.endd, n)
230 ## try our best to transfer somewhere between
231 ## REASONABLE_TRANSFER_SIZE and MAX_TRANSFER_SIZE bytes
232 def make_buf_include offset, size=0
233 good_size = [size, REASONABLE_TRANSFER_SIZE].max
235 trans_start, trans_size =
238 elsif offset < @buf.start
239 if @buf.start - offset <= good_size
240 start = [@buf.start - good_size, 0].max
241 [start, @buf.start - start]
242 elsif @buf.start - offset < MAX_TRANSFER_SIZE
243 [offset, @buf.start - offset]
245 MBox::debug "clearing SSH buffer because buf.start #{@buf.start} - offset #{offset} >= #{MAX_TRANSFER_SIZE}"
250 return if [offset + size, self.size].min <= @buf.endd # whoohoo!
251 if offset - @buf.endd <= good_size
252 [@buf.endd, good_size]
253 elsif offset - @buf.endd < MAX_TRANSFER_SIZE
254 [@buf.endd, offset - @buf.endd]
256 MBox::debug "clearing SSH buffer because offset #{offset} - buf.end #{@buf.endd} >= #{MAX_TRANSFER_SIZE}"
262 @buf.clear! if @buf.size > MAX_BUF_SIZE
263 @buf.add get_bytes(trans_start, trans_size), trans_start