method
_run_parallel
v1_9_3_125 -
Show latest stable
- Class:
Test::Unit::Runner
_run_parallel(suites, type, result)public
No documentation available.
# File lib/test/unit.rb, line 386
# Runs the test files listed in @files on @options[:parallel] worker
# processes and folds what the workers report back into this runner.
#
# Workers are driven through a line-oriented protocol read from each
# worker's IO:
#   okay            - the worker started a task (status becomes :running)
#   ready           - the worker is idle; hand it the next file from @tasks
#   done <base64>   - Marshal'ed result of one test case
#   p <base64>      - output to print on behalf of the worker
#   after <base64>  - Marshal'ed warning, collected in @warnings
#   bye [<base64>]  - the worker is going away (optionally with a reason)
#
# suites - not referenced by this method.
# type   - passed through to Worker#run and _run_suite.
# result - Array; a two-element slice of every "done" payload is appended.
#
# Returns nil after a normal run (and when the -j value is invalid), or
# +result+ when the run was cut short by an Interrupt.
#
# NOTE(review): Marshal.load and eval below consume data read from the
# worker IOs. That presumably is safe because the workers are launched by
# this method, but confirm nothing else can write to those IOs.
def _run_parallel suites, type, result
  if @options[:parallel] < 1
    warn "Error: parameter of -j option should be greater than 0."
    return
  end

  # Require needed things for parallel running
  require 'thread'
  require 'timeout'

  # Everything the ensure clause touches is initialised before the begin
  # block, so cleanup still works when an exception (e.g. an Interrupt while
  # workers are being launched) arrives early.
  @tasks = @files.dup # Array of filenames.
  @need_quit = false
  @dead_workers = [] # Array of dead workers.
  @warnings = []
  @workers = [] # Array of workers.
  @workers_hash = {} # out-IO => worker
  @ios = [] # Array of worker IOs
  shutting_down = false
  watchdog = nil
  rep = [] # FIXME: more good naming

  # Decodes one "done" payload and records it in +result+, +rep+ and the
  # load path. Shared by the main loop and the post-interrupt drain loop.
  record_done = lambda do |worker, encoded|
    r = Marshal.load(encoded.unpack("m")[0])
    result << r[0..1] unless r[0..1] == [nil,nil]
    rep << {file: worker.real_file,
            report: r[2], result: r[3], testcase: r[5]}
    $:.push(*r[4]).uniq!
  end

  begin
    # Workers are appended one by one so that those already launched are
    # still told to quit if launching a later one is interrupted.
    @options[:parallel].times do
      worker = Worker.launch(@options[:ruby],@args)
      worker.hook(:dead) do |w,info|
        after_worker_quit w
        after_worker_down w, *info unless info.empty?
      end
      @workers << worker
    end

    # Thread: watchdog
    watchdog = Thread.new do
      while stat = Process.wait2
        break if @interrupt # Break when interrupt
        pid, stat = stat
        w = (@workers + @dead_workers).find{|x| pid == x.pid }
        # The reaped pid may not belong to any known worker; check before
        # dup'ing, because nil.dup raises on older Rubies.
        next unless w
        w = w.dup
        unless w.status == :quit
          # Worker down
          w.died(nil, !stat.signaled? && stat.exitstatus)
        end
      end
    end

    @workers_hash = Hash[@workers.map {|w| [w.io,w] }] # out-IO => worker
    @ios = @workers.map{|w| w.io } # Array of worker IOs

    while _io = IO.select(@ios)[0]
      # A `break` inside the block makes #each return nil, which in turn
      # terminates this outer while loop.
      break unless _io.each do |io|
        break if @need_quit
        worker = @workers_hash[io]
        case worker.read
        when /^okay$/
          worker.status = :running
          jobs_status
        when /^ready$/
          worker.status = :ready
          if @tasks.empty?
            # Nothing left to hand out: stop once no worker is running.
            break unless @workers.find{|x| x.status == :running }
          else
            worker.run(@tasks.shift, type)
          end
          jobs_status
        when /^done (.+?)$/
          record_done.call(worker, $1)
        when /^p (.+?)$/
          del_jobs_status
          print $1.unpack("m")[0]
          jobs_status if @options[:job_status] == :replace
        when /^after (.+?)$/
          @warnings << Marshal.load($1.unpack("m")[0])
        when /^bye (.+?)$/
          after_worker_down worker, Marshal.load($1.unpack("m")[0])
        when /^bye$/
          if shutting_down
            after_worker_quit worker
          else
            after_worker_down worker
          end
        end
        break if @need_quit
      end
    end
  rescue Interrupt => e
    @interrupt = e
    return result
  ensure
    shutting_down = true
    watchdog.kill if watchdog

    if @interrupt
      # Collect the results of workers that were in the middle of a task,
      # waiting at most 10 seconds for each round of output.
      @ios.select!{|x| @workers_hash[x].status == :running }
      while !@ios.empty? && (__io = IO.select(@ios,[],[],10))
        _io = __io[0]
        _io.each do |io|
          worker = @workers_hash[io]
          case worker.read
          when /^done (.+?)$/
            record_done.call(worker, $1)
            @ios.delete(io)
          end
        end
      end
    end

    # Ask every worker to quit; a worker that is already gone (EPIPE) or
    # does not accept the command within a second is simply closed.
    @workers.each do |worker|
      begin
        Timeout.timeout(1) do
          worker.puts "quit"
        end
      rescue Errno::EPIPE
      rescue Timeout::Error
      end
      worker.close
    end

    # Reap the children; kill whatever is still alive after the grace period.
    begin
      Timeout.timeout(0.2*@workers.size) do
        Process.waitall
      end
    rescue Timeout::Error
      @workers.each do |worker|
        begin
          Process.kill(:KILL,worker.pid)
        rescue Errno::ESRCH; end
      end
    end

    if @interrupt || @options[:no_retry] || @need_quit
      rep.each do |r|
        report.push(*r[:report])
      end
      # Seed the folds with 0: rep is empty when no worker reported a result.
      @errors   += rep.map{|x| x[:result][0] }.inject(0, :+)
      @failures += rep.map{|x| x[:result][1] }.inject(0, :+)
      @skips    += rep.map{|x| x[:result][2] }.inject(0, :+)
    else
      puts ""
      puts "Retrying..."
      puts ""
      rep.each do |r|
        if r[:testcase] && r[:file] && !r[:report].empty?
          # Test cases that produced a report are run again in this process.
          require r[:file]
          # NOTE(review): eval of a string received from a worker.
          _run_suite(eval(r[:testcase]),type)
        else
          report.push(*r[:report])
          @errors += r[:result][0]
          @failures += r[:result][1]
          @skips += r[:result][2]
        end
      end
    end

    # Print each distinct warning message once, framed by blank lines.
    unless @warnings.empty?
      warn ""
      @warnings.uniq! {|w| w[1].message }
      @warnings.each do |w|
        warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
      end
      warn ""
    end
  end
end