世界上最伟大的投资就是投资自己的教育

首页Ruby
随风 · 练气

Sidekiq 实战进阶

随风发布于2147 次阅读

1. 存储机制

这一节我们来分析一下 sidekiq 更为进阶的内容,先从 sidekiq 的存储开始。

sidekiq 是使用 redis 作为存储机制的。目前有一个 job 是下面这样的:

class UpdateArticleVisitCountJob < ActiveJob::Base
  queue_as :default

  def perform(article_id)
    Sidekiq.logger.info 'update article visit count begin'
    @article = Article.find(article_id)
    @article.visit_count += 1
    @article.save!(validate: false)
    Sidekiq.logger.info 'update article visit count end'
  end
end

先不开启 sidekiq。先在网页上或 console 上激发这个 job。

# rails console
article = Article.first
UpdateArticleVisitCountJob.perform_later article.id

再来查看 redis 的存储情况。

127.0.0.1:6379> keys sidekiq:*
1) "sidekiq:queue:default"
2) "sidekiq:queues"

分别来看一下这两个 key 存的是何值。

127.0.0.1:6379> type sidekiq:queue:default
list
127.0.0.1:6379> lrange sidekiq:queue:default 0 -1
1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"UpdateArticleVisitCountJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"UpdateArticleVisitCountJob\",\"job_id\":\"d8c1e0c3-76bc-4bfe-b7a6-3ae816b7edb1\",\"queue_name\":\"default\",\"arguments\":[\"2015-10-27-redis-de-tu-xing-hua-gong-ju-si\"]}],\"retry\":true,\"jid\":\"373bc9016faa7bef4dc13280\",\"created_at\":1452411730.167636,\"enqueued_at\":1452411730.167695}"

127.0.0.1:6379> type sidekiq:queues
set
127.0.0.1:6379> smembers sidekiq:queues
1) "default"

由此可见,sidekiq:queues 存的是队列的集合,因为只有一个队列,所以只有default,而sidekiq:queue:default存的是default这个队列的 job,它是一个 list,每次有新的 job 过来,就会 push 进这个 list。这些 job 都有各自的 class,arguments 等参数,格式是一个 json,这样就能找到执行的 class 和代码,加上传递过来的参数就可以执行了。

我把上面的 job 的 queue 从default换成myqueue

127.0.0.1:6379> smembers sidekiq:queues
1) "myqueue"
2) "default"

127.0.0.1:6379> keys sidekiq:*
1) "sidekiq:queues"
2) "sidekiq:queue:default"
3) "sidekiq:queue:myqueue"

在开启sidekiq进程之前,我们先在redis-cli打开monitor

127.0.0.1:6379> monitor
OK

再来开启sidekiq。运行bundle exec sidekiq即可。

会发现monitor中有类似下面这样的输出:

1452413290.988732 [0 127.0.0.1:53645] "brpop" "sidekiq:queue:default" "2"
1452413290.991246 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:processed" "0"
1452413291.064356 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:processed:2016-01-10" "0"
1452413291.074665 [0 127.0.0.1:53645] "brpop" "sidekiq:queue:default" "2"
...
1452413291.085721 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:failed" "0"
1452413291.085772 [0 127.0.0.1:53646] "incrby" "sidekiq:stat:failed:2016-01-10" "0"
1452413291.085813 [0 127.0.0.1:53646] "del" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6:workers"
...
1452413291.108264 [0 127.0.0.1:53646] "sadd" "sidekiq:processes" "MacintoshdeMacBook-Air.local:21359:aff51da8b2a6"
1452413291.115825 [0 127.0.0.1:53666] "brpop" "sidekiq:queue:default" "2"
1452413291.117003 [0 127.0.0.1:53667] "brpop" "sidekiq:queue:default" "2"
1452413291.117066 [0 127.0.0.1:53646] "hmset" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6" "info" "{\"hostname\":\"MacintoshdeMacBook-Air.local\",\"started_at\":1452413290.988961,\"pid\":21359,\"tag\":\"rails365\",\"concurrency\":25,\"queues\":[\"default\"],\"labels\":[],\"identity\":\"MacintoshdeMacBook-Air.local:21359:aff51da8b2a6\"}" "busy" "1" "beat" "1452413291.1081119"
1452413291.118247 [0 127.0.0.1:53668] "brpop" "sidekiq:queue:default" "2"
1452413291.124678 [0 127.0.0.1:53646] "expire" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6" "60"
1452413291.124745 [0 127.0.0.1:53646] "rpop" "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6-signals"

上面列出的只是一部分输出,从输出你也可以看到sidekiq会开启多个线程不断地执行brpop指令。

brpop是 redis 中一个阻塞式的 pop 指令,它能够弹出 list 的一个元素。

下面来看下 redis 中存了哪些内容。

1) "sidekiq:stat:failed:2016-01-10"
2) "sidekiq:MacintoshdeMacBook-Air.local:21359:aff51da8b2a6"
3) "sidekiq:stat:processed"
4) "sidekiq:queues"
5) "sidekiq:stat:processed:2016-01-10"
6) "sidekiq:processes"
7) "sidekiq:stat:failed"
8) "sidekiq:queue:myqueue"

也就是说,它会把处理过的,失败的 job 的一些信息都记录下来,这些信息也是 sidekiq 中的 web ui 利用的,它就是利用这些信息来查询一共处理了多少 job,失败的有多少等等。

2. 配置文件

sidekiq是一个程序,它能够带参数来运行,下面是它的所有参数。

$ sidekiq --help
2016-01-10T08:21:42.317Z 22167 TID-ouj4fd3hk INFO: sidekiq [options]
    -c, --concurrency INT            processor threads to use
    -d, --daemon                     Daemonize process
    -e, --environment ENV            Application environment
    -g, --tag TAG                    Process tag for procline
    -i, --index INT                  unique process index on this machine
    -q, --queue QUEUE[,WEIGHT]       Queues to process with optional weights
    -r, --require [PATH|DIR]         Location of Rails application with workers or file to require
    -t, --timeout NUM                Shutdown timeout
    -v, --verbose                    Print more verbose output
    -C, --config PATH                path to YAML config file
    -L, --logfile PATH               path to writable logfile
    -P, --pidfile PATH               path to pidfile
    -V, --version                    Print version and exit
    -h, --help                       Show help

比如指定并发数,队列,超时时间等,还有作为进程一些必须的参数,例如指定 pid 文件,日志文件等。

然而这些参数也可以以配置文件的形式存在。

比如:

# config/sidekiq.yml
---
:concurrency: 5
:pidfile: tmp/pids/sidekiq.pid
:logfile: ./log/sidekiq.log
staging:
  :concurrency: 10
production:
  :concurrency: 20
:queues:
  - default
  - [myqueue, 2]

只要运行的时候指定好-c参数就行的,例如:sidekiq -c config/sidekiq.yml

3. 部署问题

sidekiq部署到线上环境,首先需要让sidekiq以 deamon 的形式运行,就是以后台程序的形式运行。

sidekiq有个参数-d可以做到这个。

还需要指定-e参数,一般是production

为了能让进程被友好地 kill 掉,还需要 pidfile 文件,也就是-P参数。

除此之外,还有查错使用的日志文件,也即-L参数。

不过,我们都会使用capistranomina来自动化部署我们的项目,它们对应使用的插件分别是下面两个:

具体的可以研究这两个插件的源码,来看它是如何启动,停止sidekiq进程的。

然而,官方并不推荐这种做法,原因是不能保证可用性,比如 sidekiq 挂掉,它不能自动重启。

所以官方的推荐作法是用systemd或者upstart,它们比较有可靠性。

不过我推荐monit来管理sidekiq

关于这几个官方都有类似的 example:

4. 并发控制

上面的配置文件有一个参数 concurrency 就是来控制并发数的,这个并发数指的是线程的数量,也就是同一时刻能处理的并发量。

关于此点,必须来说说sidekiqdelayed_jobresque的并发机制的区别。

delayed_job是使用 mysql,postgresql,mongodb 等作为存储介质的,而sidekiqresque是使用速度更快的内存系统 redis 作为存储介质。delayed_jobresque是使用单线程的进程机制,也就是说,它们同一时刻能处理的 job 是一个。假如要实现并发,同时处理多个 job,就得开多个进程,且它们都有这种支持。众所周知,进程开销是很大,也很占用内存。而sidekiq是使用线程机制的,它默认可以开 25 个线程,也就是 25 个并发,当然,它也可以开多个进程,如果要实现高并发,就可以开 N 个进程,每个进程又开 N 个线程。那并发数就是 NxN 的数量。

sidekiq是使用线程连接池来管理线程,所以不用太担心线程的开销问题。

还有一个问题就是参数 concurrency 的值可以等于数据库的线程连接池 pool 的数量,这个数据库的线程连接池 pool 的数量,默认是 5,可以调成跟 concurrency 的值一样。

production:
  adapter: mysql2
  database: foo_production
  pool: 25

5. 异常捕获

在跑sidekiq的 job 的时候,也是有可能出现异常的,如果这时候我们要查看异常,只能去看 log。不然的话就使用一些通知机制,比如 slack,邮件等来主动通知我们。

还有一种方式就是把这个异常捕获起来存放到数据库,然后再去查看。

我是用了一张表来存放这些异常。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.error_handlers << Proc.new do |ex,ctx_hash|
    Admin::SidekiqException.create!(ex: ex, ctx_hash: ctx_hash)
  end
end

6. 其他

sidekiqresque等相比,它默认有个 retry 机制,这个可以确保 job 失败的时候去重新触发,job 失败会抛出异常,抛出异常的原因可以有很多种,可能本来程序就有错,这种情况下可以等程序恢复正常之后,job 能正常运行,确保不丢失原来的 job。还有一种情况,就是可能这个 job 需要连接数据库,或其他网络,如果当时不通,job 就会失败,可以不断地 retry,等网络通了之后 job 就可以成功运行了。所以这是一个很好的机制。

sidekiq还可以指定 retry 的次数,也可以在 job 中关闭 retry 这个功能。

还可以以中间件的形式关闭掉这个功能。

# config/initializers/sidekiq.rb
Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.remove Sidekiq::Middleware::Server::RetryJobs
  end
end

如果用 activejob 来包装sidekiq,默认就是retry的,也不能指定retry的选项了。

sidekiq没有 job 的callback功能,而resque有,如果使用activejob,也有这个功能。

完结。

本站文章均为原创内容,如需转载请注明出处,谢谢。

0 条回复
暂无回复~~
喜欢
我的微信官网服务号精品文章订阅号微信视频号
程序员随风
统计信息
    学员: 22744
    视频数量: 1492
    文章数量: 466

© 汕尾市求知科技有限公司 | 专业版网站 | 关于我们 | 在线学员:1151

粤公网安备 44152102000088号粤公网安备 44152102000088号 | 粤ICP备19038915号

Top