首页ruby
海外散仙厉飞雨 · 真仙

Ruby 的三种线程同步控制方式

海外散仙厉飞雨发布于

1. 互斥锁Mutex

Mutex是最常见的一种线程同步的控制方式,其他各种语言都有这种方式。

它是这样的原理:线程要访问一段代码前,先获得一把锁,其他的线程就不能在锁被释放前访问这段代码,只能等锁释放了,其他线程才能进来。rails之所以是线程安全的原因就在于使用了这种机制,每个请求(action),最终就是会进入这样一个互斥锁的控制。

rails中相关的源码是这样的:

# https://github.com/rack/rack/blob/master/lib/rack/lock.rb#L16
require 'thread'
require 'rack/body_proxy'

module Rack
  # Rack::Lock locks every request inside a mutex, so that every request
  # will effectively be executed synchronously.
  class Lock
    def initialize(app, mutex = Mutex.new)
      @app, @mutex = app, mutex
    end

    def call(env)
      @mutex.lock
      begin
        response = @app.call(env.merge(RACK_MULTITHREAD => false))
        returned = response << BodyProxy.new(response.pop) { @mutex.unlock }
      ensure
        @mutex.unlock unless returned
      end
    end
  end
end

ruby标准库Mutex中定义了mutex的方法。

下面是一个Mutex的例子。

class Item
  class << self; attr_accessor :price end
  @price = 0
end

mutex = Mutex.new

threads = (1..10).map do |i|
  Thread.new(i) do |i|
    mutex.synchronize do 
      item_price = Item.price # Reading value
      sleep(rand(0..2))
      item_price += 10        # Updating value
      sleep(rand(0..2))
      Item.price = item_price # Writing value
    end
  end
end

threads.each {|t| t.join}

puts "Item.price = #{Item.price}"

2. 线程安全的数据类型Queue

ruby语言只有一个数据类型是线程安全的,那就是Queue(队列),其他的,比如Array,Hash等都不是线程安全。

require 'open-uri'
require 'nokogiri'
require 'thread'

work_q = Queue.new

(1..50).each{|page| work_q << page}

workers = (0...10).map do
  Thread.new do
    begin
      while page = work_q.pop(true)      
        begin
          puts "page - #{page}"
          doc = Nokogiri::HTML(open("https://ruby-china.org/topics?page=#{page}"))
          doc.css("div.topic").each do |node|
            puts "#{node.css('> div.infos > div.title > a').text} #{node.css("> div.count > a").text}"
          end
        rescue => e
          puts "problem on page #{page}"
          puts e.inspect
        end
      end # while
      puts ""
    rescue ThreadError
    end
  end
end
workers.map(&:join)

3. 条件变量ConditonVariable

使用ConditonVariable进行同步控制,能够在一些致命的资源竞争部分挂起线程直到有可用的资源为止。它能够根据状态的变化,让线程挂起,直到条件达到时,重新唤醒线程。

require 'thread'
require 'net/http'

mutex    = Mutex.new
condvar  = ConditionVariable.new
results  = Array.new

Thread.new do
  10.times do
    response = Net::HTTP.get_response('dynamic.xkcd.com', '/random/comic/')
    random_comic_url = response['Location']

    mutex.synchronize do
      results << random_comic_url
      condvar.signal                     # Signal the ConditionVariable
    end
  end
end

comics_received = 0

until comics_received >= 10
  mutex.synchronize do
    while results.empty?
      condvar.wait(mutex)
    end

    url = results.shift
    puts "You should check out #{url}"
  end

  comics_received += 1
end

完结。

本站帖子均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
喜欢

© 汕尾市求知科技有限公司 | 粤ICP备19038915号 | 在线学员:91

Top