世界上最伟大的投资就是投资自己的教育

首页PostgreSQL
随风 · 练气

PostgreSQL 的 listen/notify 之 queue_classic (十四)

随风发布于4198 次阅读

1. 介绍

queue_classic是一个 ruby 的 gem,用来实现 PostgreSQL 的消息队列。它基于PostgreSQL 的 listen/notify,有很多接口,使用起来比较简单。

如果你有使用过sidekiqresquedelayed_job,就会发现基本每个这种消息队列的 gem 的使用方法都是类似的,里面的概念也是差不多,也就是说,学会了queue_classic就等于会其中三种,只要掌握思想就好了。

2. 安装

假如我们已经有一个 rails 项目了。

在 Gemfile 添加下面这行。

gem "queue_classic", "~> 3.0.0"

执行bundle install

正如我们上面所说的,任务是需要存储的。所以要创建相应的表来存。

# 创建queue_classic_jobs表
rails generate queue_classic:install
# username替换为你自己的数据库的用户名,password是数据库的密码,rails365_dev是数据库名
export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev" 
bundle exec rake db:migrate

3. 测试

我们先在rails console里测试。

前面说过,消息队列是跑在一个进程里的。所以要启动那个进程。

bundle exec rake qc:work

你会发现 delayed_job,sidekiq 也是差不多的启动方法。

还可以指定队列来启动。

QUEUES="priority_queue,secondary_queue" bundle exec rake qc:work

启动好后,我们进入rails console中,执行下面这行语句。

  rails365 git:(master)  rails c
Loading development environment (Rails 4.2.3)
2.2.2 :001 > QC.enqueue("Kernel.puts", "hello world")
nil

你会在进程里看到类似这样的输出,就说明成功了。

  rails365 git:(master)  bundle exec rake qc:work 
hello world

QC.enqueue 就是后面的命令加上参数作为任务 push 到队列中。

上面只是个最简单的例子,还有可以在指定时间,指定队列来执行,具体更为详细的命令要看官方的 readme 文档。

如果需要调试或查看日志,可以开启调试功能,有两种不同的日志,分别是:

export QC_MEASURE="true"

# or

export DEBUG="true"

在运行rake qc:work之前运行,具体的效果,尝试下就知道的。

4. 在 rails 中使用

本站为例,是一个放博客的网站,文章是存放在 articles 这张表,当时为了查看哪篇文章最受欢迎,就在 articles 存了一个字段叫 visit_count,但每次用户查看文章时,就会往这个字段加 1。这个动作是用 rails 的ActiveSupport::Notifications配合 sidekiq 的消息队列来做的,现在要改成用 queue_classic 来做。

我们来看下相关的代码。

# app/workers/update_article_visit_count_worker.rb
class UpdateArticleVisitCountWorker
  include Sidekiq::Worker
  def perform(article_id)
    logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    logger.info 'update article visit count end'
  end
end

在哪里调用呢,我们是结合 ActiveSupport::Notifications 来做的,这个先不管,你也可以在 articles_controller 的 show action 直接调用。

# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
  Rails.logger.info payload
  if payload[:controller] == "ArticlesController" && payload[:action] == "show"
    UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?
  end
end

上文提过,queue_classic 主要是利用QC.enqueue这条命令把任务 push 到队列中的。只需要把这行UpdateArticleVisitCountWorker.perform_async(payload[:params]["id"]) if payload[:params]["id"].present?改成我们需要的就可以了。

把增加 visit_count 的值的逻辑移动 model 中去,然后在ActiveSupport::Notifications.subscribeQC.enqueue中调用就好了。

改造之后是这样的。

# app/models/article.rb
class Article < ActiveRecord::Base
  def self.update_article_visit_count(article_id)
    article = Article.find(article_id)
    article.visit_count += 1
    article.save!(validate: false)
  end
end
# config/initializers/notification.rb
ActiveSupport::Notifications.subscribe "process_action.action_controller" do |name, started, finished, unique_id, payload|
  Rails.logger.info payload
  if payload[:controller] == "ArticlesController" && payload[:action] == "show"
    QC.enqueue "Article.update_article_visit_count",payload[:params]["id"] if payload[:params]["id"].present?
  end
end

现在需要重启下rails serverbundle exec rake qc:work

重启rails server运行好export QC_DATABASE_URL="postgres://username:password@localhost/rails365_dev"这个命令。

为了让rake qc:work更能明显地看到日志信息,在运行rake qc:work前先执行export QC_MEASURE="true"

现在可以去页面上测试的。

5. 注意事项

第一点是关于环境变量,也就是QC_DATABASE_URLQC_MEASUREDEBUG这三个,当部署到线上环境时,就要把这三个变量写进 shell 的配置文件,如比 ubuntu 系统,就写进~/.bashrc_profile 就好了。

第二点是关于错误的任务,任务也是有可能会报错的,但是我们不知道哪个任务报错了,所以很不方便,其实官方提供了接口的,你自己可以捕获那个错误信息,捕获后就可以进行自己想要的处理了。其实错误的任务都会一直存在表 queue_classic_jobs 中,这样查看就好,至于那个接口,就是worker.rb中的 handle_failure 方法,官方 readme 文档也有示例,这里不再深究。

完结。

本站文章均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
相关小书
postgresql教程

postgresql教程

postgresql 最全面,最细致的特性介绍与应用教程

发表于

喜欢
我的微信官网服务号精品文章订阅号微信视频号
程序员随风
统计信息
    学员: 22224
    视频数量: 1463
    文章数量: 460

© 汕尾市求知科技有限公司 | 专业版网站 | 关于我们 | 在线学员:1147

粤公网安备 44152102000088号粤公网安备 44152102000088号 | 粤ICP备19038915号

Top