method
_run_parallel
v1_9_3_392 -
Show latest stable
-
0 notes -
Class: Runner
- 1_8_6_287
- 1_8_7_72
- 1_8_7_330
- 1_9_1_378
- 1_9_2_180
- 1_9_3_125 (0)
- 1_9_3_392 (0)
- 2_1_10 (0)
- 2_2_9
- 2_4_6
- 2_5_5
- 2_6_3
- What's this?
_run_parallel(suites, type, result)
public
Hide source
# File lib/test/unit.rb, line 386 def _run_parallel suites, type, result if @options[:parallel] < 1 warn "Error: parameter of -j option should be greater than 0." return end begin # Require needed things for parallel running require 'thread' require 'timeout' @tasks = @files.dup # Array of filenames. @need_quit = false @dead_workers = [] # Array of dead workers. @warnings = [] shutting_down = false rep = [] # FIXME: more good naming # Array of workers. @workers = @options[:parallel].times.map { worker = Worker.launch(@options[:ruby],@args) worker.hook(:dead) do |w,info| after_worker_quit w after_worker_down w, *info unless info.empty? end worker } # Thread: watchdog watchdog = Thread.new do while stat = Process.wait2 break if @interrupt # Break when interrupt pid, stat = stat w = (@workers + @dead_workers).find{|x| pid == x.pid }.dup next unless w unless w.status == :quit # Worker down w.died(nil, !stat.signaled? && stat.exitstatus) end end end @workers_hash = Hash[@workers.map {|w| [w.io,w] }] # out-IO => worker @ios = @workers.map{|w| w.io } # Array of worker IOs while _io = IO.select(@ios)[0] break unless _io.each do |io| break if @need_quit worker = @workers_hash[io] case worker.read when /^okay$/ worker.status = :running jobs_status when /^ready$/ worker.status = :ready if @tasks.empty? break unless @workers.find{|x| x.status == :running } else worker.run(@tasks.shift, type) end jobs_status when /^done (.+?)$/ r = Marshal.load($1.unpack("m")[0]) result << r[0..1] unless r[0..1] == [nil,nil] rep << {file: worker.real_file, report: r[2], result: r[3], testcase: r[5]} $:.push(*r[4]).uniq! when /^p (.+?)$/ del_jobs_status print $1.unpack("m")[0] jobs_status if @options[:job_status] == :replace when /^after (.+?)$/ @warnings << Marshal.load($1.unpack("m")[0]) when /^bye (.+?)$/ after_worker_down worker, Marshal.load($1.unpack("m")[0]) when /^bye$/ if shutting_down after_worker_quit worker else after_worker_down worker end end break if @need_quit end end rescue Interrupt => e @interrupt = e return result ensure shutting_down = true watchdog.kill if watchdog if @interrupt @ios.select!{|x| @workers_hash[x].status == :running } while !@ios.empty? && (__io = IO.select(@ios,[],[],10)) _io = __io[0] _io.each do |io| worker = @workers_hash[io] case worker.read when /^done (.+?)$/ r = Marshal.load($1.unpack("m")[0]) result << r[0..1] unless r[0..1] == [nil,nil] rep << {file: worker.real_file, report: r[2], result: r[3], testcase: r[5]} $:.push(*r[4]).uniq! @ios.delete(io) end end end end @workers.each do |worker| begin timeout(1) do worker.puts "quit" end rescue Errno::EPIPE rescue Timeout::Error end worker.close end begin timeout(0.2*@workers.size) do Process.waitall end rescue Timeout::Error @workers.each do |worker| begin Process.kill(:KILL,worker.pid) rescue Errno::ESRCH; end end end if @interrupt || @options[:no_retry] || @need_quit rep.each do |r| report.push(*r[:report]) end @errors += rep.map{|x| x[:result][0] }.inject(:+) @failures += rep.map{|x| x[:result][1] }.inject(:+) @skips += rep.map{|x| x[:result][2] }.inject(:+) else puts "" puts "Retrying..." puts "" rep.each do |r| if r[:testcase] && r[:file] && !r[:report].empty? require r[:file] _run_suite(eval(r[:testcase]),type) else report.push(*r[:report]) @errors += r[:result][0] @failures += r[:result][1] @skips += r[:result][2] end end end if @warnings warn "" ary = [] @warnings.reject! do |w| r = ary.include?(w[1].message) ary << w[1].message r end @warnings.each do |w| warn "#{w[0]}: #{w[1].message} (#{w[1].class})" end warn "" end end end