世界上最伟大的投资就是投资自己的教育

首页Ruby
随风 · 元婴

消息队列之 active_job 结合 Sidekiq (一)

随风发布于1282 次阅读

1. 简介

sidekiq是用 ruby 实现的消息列队的 gem,相关的 gem 还有resquedelayed_jobqueue_classic等。关于消息队列的解释可以查阅这篇文章PostgreSQL 的 listen/notify 之消息队列 (一),这里不再详述。每个 gem 的存储介质和原理都不太一样,比如 queue_classic 只用 PostgreSQL 来存,delayed_job 可以用 Mysql,PostgreSQL,Sqlite 等,而 sidekiq 用 redis。原理也不一样,比如 queue_classic 是基于 PostgreSQL 的 listen/notify,sidekiq 是基于 redis 的 pub/sub 模式。

有这么多的消息队列,每个都要用,都要去学,是一件很耗精力的事情。而且,这些 gem 的使用方法都差不多,很多使用上的概念很类似,比如指定队列,优先级,执行时间等都很相似。何不用一种接口或方法来统一呢,就像 activerecord 一样,假如要换数据库,那只是换个 gem 加一个配置文件而已,因为他们的使用方法都差不多。

只要把相同的使用方法抽出来,形成一个接口,而这些 gem 就成了适配器 (adapter)。这个就像汽车的引擎或发动机一样,可以随时更换。

所以有了active_job,这个原本是一个 gem,不过从 4.2 开始被合作到 rails,成为 rails 的一部分了。

2. sidekiq

现在我们来单独使用 sidekiq,也就是不用 active_job 的情况下。

2.1 安装

在 Gemfile 文件添加下面这行。

gem 'sidekiq', '~> 3.5.1'
# 因为依赖于redis
sudo apt-get install redis-server

由于消息队列是开了另一个独立于 web 的进程,在那个进程中,是在时刻接收进来的任务,然后排队处理,所以需要在 controller 或 model 把那个任务丢给消息队列的进程就好了。详情可参阅PostgreSQL 的 listen/notify 之消息队列 (一)的详述。

而 controller 或 model 是通过什么和那进程连接的呢?就是通过 worker,一个 worker 是暴露在外面的类,可以在 controller 或 model 中全局调用。

首先来定义一个 worker。

# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
  include Sidekiq::Worker
  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

现在就可以把 sidekiq 的进程启动起来,准备测试。

bundle exec sidekiq

进入rails console中。

article = Article.find(:first)
UpdateArticleVisitCountWorker.perform_async(article.id)

如果没有报错且数据库被作了相应的改变,说明是有效果的。

这里有个要注意的地方,在上面的例子中,不传 article 整个对象,而是传 article 的 id,是有原因的,主要是基于两点。

第一,无论 article 还是 article_id 是作为参数传给 sidekiq 进程的,你传对象不好处理的,那是 ruby 的对象,你传到另一个进程需要反序列化之类的,而传 id,只是一个数字就好传多了,也好处理。

第二,worker 中是放耗时的动作的,那个 find 可以比较耗时,这取决于数据库的记录,所以尽量将这种需要时间的动作放到 worker 中的 perform。

整个 sidekiq 的使用很简单,就是建立一个 worker 的类,然后把逻辑写到 perform 方法,在需要的地方用 perform_async 调用就好了。

关于 sidekiq 的线上部署,如果你是使用 mina 的话,那就简单了,使用mina-sidekiq,也就几行代码搞定。

sidekiq 默认会使用 redis 的默认 ip 地址,端口等,不过这些都是可以改变的。

sidekiq 远不止这么简单,它是有强大的功能却易用,关于 sidekiq 的更加详细的信息,比如错误处理,日志记录,Batches,监控,高级选项等可以参考sidekiq-wiki。还有,sidekiq 还有很多插件,你往 github 搜一下可能就可以找到一些。

3. active_job

把上面的例子改成用 active_job 来实现。

生成类似 worker 的文件。

rails generate job update_article_visit_count

生成了下面这个文件。

# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(*args)
    # Do something later
  end
end

UpdateArticleVisitCountJob类继承自ActiveJob::Base

现在把上面的那段逻辑放进去。

# app/jobs/update_article_visit_count_job.rb
class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

指定用 sidekiq 作为 adapter。

# config/application.rb
module YourApp
  class Application < Rails::Application
    config.active_job.queue_adapter = :sidekiq
  end
end

进入rails console中测试。

article = Article.first
UpdateArticleVisitCountJob.perform_later article.id

跟上面的调用也是类似。

这是基本的功能,active_job 还支持指定队列,回调函数,异常捕获等。具体地参考官方文档

下一篇:消息队列之 RabbitMQ 结合 sneakers(二)
完结。

本站文章均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
喜欢
我的微信官网服务号精品文章订阅号微信视频号
程序员随风
统计信息
    学员: 20595
    视频数量: 1260
    文章数量: 447

© 汕尾市求知科技有限公司 | 专业版网站 | 在线学员:1133

粤公网安备 44152102000088号粤公网安备 44152102000088号 | 粤ICP备19038915号

Top