Flowdock
method

_run_parallel

Importance_0
v1_9_3_392 - Show latest stable - 0 notes - Class: Runner
_run_parallel(suites, type, result) public

No documentation

This method has no description. You can help the Ruby community by adding new notes.

Hide source
# File lib/test/unit.rb, line 386
      # Runs the files listed in @files on @options[:parallel] worker
      # processes and folds what the workers report back into this runner.
      #
      # Each worker is started with Worker.launch and talks to this process
      # over its IO using line-based messages. The messages handled here are:
      #
      #   okay            worker accepted a job; its status becomes :running
      #   ready           worker is idle; it gets the next file from @tasks
      #   done <data>     a finished job (base64-encoded Marshal dump)
      #   p <data>        base64-encoded text to print on behalf of the worker
      #   after <data>    a warning record, collected in @warnings
      #   bye [<data>]    worker is going away, optionally with a payload
      #
      # suites:: not referenced by this method.
      # type::   passed through to Worker#run and to _run_suite on retry.
      # result:: array that receives the first two elements of every "done"
      #          payload (skipped when both are nil).
      #
      # Returns +result+ if the run was interrupted. Returns nil when the -j
      # value is below 1 or when the dispatch loop ends normally.
      def _run_parallel suites, type, result
        if @options[:parallel] < 1
          warn "Error: parameter of -j option should be greater than 0."
          return
        end

        begin
          # Require needed things for parallel running
          require 'thread'
          require 'timeout'
          @tasks = @files.dup # Array of filenames.
          @need_quit = false
          @dead_workers = []  # Array of dead workers.
          @warnings = []
          shutting_down = false
          rep = [] # One hash per "done" message. FIXME: find a better name

          # Array of workers.
          @workers = @options[:parallel].times.map {
            worker = Worker.launch(@options[:ruby],@args)
            worker.hook(:dead) do |w,info|
              after_worker_quit w
              after_worker_down w, *info unless info.empty?
            end
            worker
          }

          # Thread: watchdog
          # Reaps child processes and reports any worker that exited without
          # having reached the :quit status.
          watchdog = Thread.new do
            while stat = Process.wait2
              break if @interrupt # Break when interrupt
              pid, stat = stat
              w = (@workers + @dead_workers).find{|x| pid == x.pid }
              # Check for nil before duplicating: the reaped pid may not
              # belong to a known worker, and nil.dup raises on older Rubies.
              next unless w
              w = w.dup
              unless w.status == :quit
                # Worker down
                w.died(nil, !stat.signaled? && stat.exitstatus)
              end
            end
          end

          @workers_hash = Hash[@workers.map {|w| [w.io,w] }] # out-IO => worker
          @ios = @workers.map{|w| w.io } # Array of worker IOs

          # Dispatch loop: wait for readable worker IOs and handle one
          # message per readable IO. A `break` inside the inner block makes
          # `each` return nil, which in turn ends the outer loop.
          while _io = IO.select(@ios)[0]
            break unless _io.each do |io|
              break if @need_quit
              worker = @workers_hash[io]
              case worker.read
              when /^okay$/
                worker.status = :running
                jobs_status
              when /^ready$/
                worker.status = :ready
                if @tasks.empty?
                  # Nothing left to hand out; stop once no worker is running.
                  break unless @workers.find{|x| x.status == :running }
                else
                  worker.run(@tasks.shift, type)
                end

                jobs_status
              when /^done (.+?)$/
                r = Marshal.load($1.unpack("m")[0])
                result << r[0..1] unless r[0..1] == [nil,nil]
                rep    << {file: worker.real_file,
                           report: r[2], result: r[3], testcase: r[5]}
                # r[4] is merged into the load path of this process.
                $:.push(*r[4]).uniq!
              when /^p (.+?)$/
                del_jobs_status
                print $1.unpack("m")[0]
                jobs_status if @options[:job_status] == :replace
              when /^after (.+?)$/
                @warnings << Marshal.load($1.unpack("m")[0])
              when /^bye (.+?)$/
                after_worker_down worker, Marshal.load($1.unpack("m")[0])
              when /^bye$/
                if shutting_down
                  after_worker_quit worker
                else
                  after_worker_down worker
                end
              end
              break if @need_quit
            end
          end
        rescue Interrupt => e
          @interrupt = e
          return result
        ensure
          shutting_down = true

          watchdog.kill if watchdog
          if @interrupt
            # Collect the pending "done" message of every worker that was
            # still running; give up when no IO becomes readable for 10s.
            @ios.select!{|x| @workers_hash[x].status == :running }
            while !@ios.empty? && (__io = IO.select(@ios,[],[],10))
                _io = __io[0]
                _io.each do |io|
                  worker = @workers_hash[io]
                  case worker.read
                  when /^done (.+?)$/
                    r = Marshal.load($1.unpack("m")[0])
                    result << r[0..1] unless r[0..1] == [nil,nil]
                    rep    << {file: worker.real_file,
                               report: r[2], result: r[3], testcase: r[5]}
                    $:.push(*r[4]).uniq!
                    @ios.delete(io)
                  end
                end
            end
          end
          # Ask every worker to quit. Timeout.timeout is called explicitly
          # because the bare Object#timeout is deprecated and missing on
          # current Rubies.
          @workers.each do |worker|
            begin
              Timeout.timeout(1) do
                worker.puts "quit"
              end
            rescue Errno::EPIPE
            rescue Timeout::Error
            end
            worker.close
          end
          # Wait for the children to exit; kill whatever is left afterwards.
          begin
            Timeout.timeout(0.2*@workers.size) do
              Process.waitall
            end
          rescue Timeout::Error
            @workers.each do |worker|
              begin
                Process.kill(:KILL,worker.pid)
              rescue Errno::ESRCH; end
            end
          end

          if @interrupt || @options[:no_retry] || @need_quit
            rep.each do |r|
              report.push(*r[:report])
            end
            # Seed the sums with 0: rep is empty when no worker reported a
            # finished job, and inject(:+) on an empty array returns nil.
            @errors   += rep.map{|x| x[:result][0] }.inject(0, :+)
            @failures += rep.map{|x| x[:result][1] }.inject(0, :+)
            @skips    += rep.map{|x| x[:result][2] }.inject(0, :+)
          else
            puts ""
            puts "Retrying..."
            puts ""
            rep.each do |r|
              if r[:testcase] && r[:file] && !r[:report].empty?
                # A job that produced report entries is run again in this
                # process instead of being counted.
                require r[:file]
                # NOTE(review): eval of a string taken from a worker message;
                # acceptable only while workers are trusted child processes.
                _run_suite(eval(r[:testcase]),type)
              else
                report.push(*r[:report])
                @errors += r[:result][0]
                @failures += r[:result][1]
                @skips += r[:result][2]
              end
            end
          end
          if @warnings
            warn ""
            # Keep only the first warning for each distinct message.
            ary = []
            @warnings.reject! do |w|
              r = ary.include?(w[1].message)
              ary << w[1].message
              r
            end
            @warnings.each do |w|
              warn "#{w[0]}: #{w[1].message} (#{w[1].class})"
            end
            warn ""
          end
        end
      end
Register or log in to add new notes.