Harbor::Processor
Parent
Methods
Included Modules
Attributes
-
worker_count [RW]
CONFIGURATION OPTIONS
- daemonize [RW] (Not documented)
- sleep_time [RW] (Not documented)
- log_level [RW] (Not documented)
- log_file [RW] (Not documented)
Public Class Methods
new(*args)
# File lib/harbor/processor.rb, line 6
#
# Builds a processor for a concrete subclass. Refuses to instantiate
# Harbor::Processor itself, or a subclass that does not define its own
# #reserve and #process. The default settings are assigned before the
# subclass's #initialize runs, so #initialize can replace any of them.
#
# args:: forwarded unchanged to #initialize.
#
# Raises RuntimeError when called on Harbor::Processor directly, or when the
# receiving class does not itself define #reserve or #process.
def self.new(*args)
  raise "You must subclass Harbor::Processor and implement #reserve and #process" if self == Harbor::Processor

  # Module#instance_methods returns Strings on Ruby 1.8 but Symbols on 1.9+.
  # Normalise to Strings so the check works on both instead of always failing.
  own_methods = instance_methods(false).map { |name| name.to_s }
  raise "You must implement #{self}#reserve" unless own_methods.include?("reserve")
  raise "You must implement #{self}#process" unless own_methods.include?("process")

  processor = allocate

  processor.worker_count = 2
  processor.daemonize = true
  processor.sleep_time = 60
  processor.log_level = :info
  processor.log_file = "log/processor.log"

  processor.send(:initialize, *args)
  processor
end
Public Instance Methods
handle_exception(task, exception)
Method which can be overridden to define special behavior for unhandled exceptions, such as marking a task as failed in the database.
# File lib/harbor/processor.rb, line 79
#
# Default hook for an exception that escaped #process: writes the exception
# message followed by its backtrace (one frame per line) to the logger at
# error level. Subclasses may override it.
#
# task::      the task that was being processed (unused here).
# exception:: the exception that was raised.
def handle_exception(task, exception)
  trace = exception.backtrace.join("\n")
  logger.error("#{exception}\n#{trace}")
end
handle_interrupt(task)
This method can be overridden to define special behavior for when a task is interrupted, such as updating a value in the database.
# File lib/harbor/processor.rb, line 72
#
# Hook for an interrupted task. The default implementation does nothing and
# returns nil; subclasses may override it.
#
# task:: the task that was interrupted (unused here).
def handle_interrupt(task)
  nil
end
interruptible()
Sometimes when running commands in a subshell (with backticks, system, etc.) interrupts will be sent, but the process will exit silently. Processor#interruptible swaps out the :INT handler to allow us to know when an action is interrupted, and optionally rescue.
# File lib/harbor/processor.rb, line 89
#
# Runs the given block with the :INT handler set to "DEFAULT", then puts the
# previously installed handler back, whether the block returns or raises.
#
# Returns the value of the block.
def interruptible
  previous_handler = Signal.trap(:INT, "DEFAULT")
  yield
ensure
  Signal.trap(:INT, previous_handler)
end
logger()
# File lib/harbor/processor.rb, line 146
#
# Lazily builds and memoizes the logger for this processor's class. The
# logger is non-additive, uses log_level, and writes through a single
# appender: log_file when daemonize is set, otherwise standard output.
#
# The memoized value is whatever add_appenders returns (presumably the
# logger itself -- confirm against the Logging gem).
def logger
  return @logger if @logger

  log = Logging::Logger[self.class]
  log.additive = false
  log.level = log_level
  layout = Logging::Layouts::Pattern.new(:pattern => "%-5l %d: %m\n")

  appender =
    if daemonize
      Logging::Appenders::File.new(log_file, :layout => layout)
    else
      Logging::Appenders::Stdout.new(:layout => layout)
    end

  @logger = log.add_appenders(appender)
end
options(optparse)
Accepts an instance of OptionParser for wiring up options, like:
processor = MyProcessor.new
OptionParser.new do |opts|
opts.banner = "Usage: my_processor [options]"
processor.options(opts)
end.parse!
# File lib/harbor/processor.rb, line 42
#
# Registers this processor's command-line switches on the given
# OptionParser. Each switch writes the parsed value to the matching
# attribute; the help text shows the attribute values current at the time
# this method is called.
#
# optparse:: the OptionParser to add the switches to.
#
# Returns the result of the final +on+ call.
def options(optparse)
  workers_help = "Number of workers to spawn (default: #{worker_count})"
  level_help = "Set log level (default: #{log_level})"
  file_help = "Log file (default: #{log_file})"
  sleep_help = "Sleep s seconds between runs (default: #{sleep_time})"

  optparse.on("-n", "--no-daemon", "Run in the foreground") do
    self.daemonize = false
  end
  optparse.on("-w", "--workers=COUNT", Integer, workers_help) do |count|
    self.worker_count = count
  end
  optparse.on("-l", "--log-level=LEVEL", [:debug, :info, :error], level_help) do |level|
    self.log_level = level
  end
  optparse.on("-L", "--log-file=FILE", file_help) do |path|
    self.log_file = path
  end
  optparse.on("-s", "--sleep=SECONDS", Integer, sleep_help) do |seconds|
    self.sleep_time = seconds
  end
end
process(task)
This method must be overridden in your implementation of Harbor::Processor. It will be called within the forked worker process, and accepts as its only argument a task returned by reserve.
# File lib/harbor/processor.rb, line 64
#
# Placeholder that subclasses must replace with the real work for one task.
#
# task:: a task as returned by #reserve.
#
# Always raises NotImplementedError, naming the receiver's class.
def process(task)
  raise NotImplementedError, "You must implement #{self.class}#process(task)"
end
reserve()
This method must be overridden in your implementation of Harbor::Processor, and should return a task to be performed inside of a forked worker process. It should return nil when there are no available tasks.
# File lib/harbor/processor.rb, line 55
#
# Placeholder that subclasses must replace with code that hands out the next
# task, or nil when there is nothing to do.
#
# Always raises NotImplementedError, naming the receiver's class.
def reserve
  raise NotImplementedError, "You must implement #{self.class}#reserve"
end
start()
# File lib/harbor/processor.rb, line 96
#
# Runs the supervisor loop. Detaches first when daemonize is set, installs
# the signal handlers, then keeps up to worker_count forked workers busy
# with tasks from #reserve until the processor is no longer alive.
#
# Each pass of the inner loop tops the pool up, waits for one worker to
# exit, and logs the outcome from its exit status (0 completed, 1 failed,
# 130 or killed by SIGINT interrupted). When there are no children left to
# wait for (Errno::ECHILD) the supervisor sleeps for sleep_time seconds; an
# Interrupt during that sleep stops the processor.
#
# Any StandardError escaping the loop is logged and swallowed.
def start
  detach if daemonize

  logger.info "running at %s" % [Process.pid]
  logger.info "workers = #{worker_count}"

  trap_signals

  @workers = {}

  while alive?
    begin
      loop do
        break if worker_count == 0 && @workers.empty?

        (worker_count - @workers.size).times do
          task = reserve
          break unless task
          @workers[spawn_worker(task)] = task
        end

        pid = Process.wait
        status = $?

        task = @workers.delete(pid)

        # A worker terminated by SIGINT has no exit status (exitstatus is
        # nil), so map that case onto the conventional 130 as well.
        killed_by_int = status.signaled? && status.termsig == Signal.list["INT"]
        code = killed_by_int ? 130 : status.exitstatus

        # Log the pid of the worker that exited, not the supervisor's own.
        case code
        when 0 # success
          logger.info "[worker#%-5s] %s: completed" % [pid, task.inspect]
        when 1 # exception
          logger.error "[worker#%-5s] %s: failed" % [pid, task.inspect]
        when 130 # interrupt
          logger.warn "[worker#%-5s] %s: interrupted" % [pid, task.inspect]
        end
      end
    rescue Errno::ECHILD
      # No children left to wait for; fall through to the idle sleep.
    end

    if alive?
      begin
        interruptible { sleep(sleep_time) }
      rescue Interrupt
        @alive = false
      end
    end
  end

  logger.info "shutting down"
rescue => e
  logger.error("#{e}\n#{e.backtrace.join("\n")}")
end
Private Instance Methods
alive?()
# File lib/harbor/processor.rb, line 163
#
# Reports whether the processor should keep running. @alive is initialised
# to true on first use; afterwards its current value is returned as-is.
def alive?
  @alive = true unless defined?(@alive)
  @alive
end
detach()
# File lib/harbor/processor.rb, line 183
#
# Detaches the processor from the launching shell: re-seeds the random
# number generator, forks and exits the parent, starts a new session in the
# child, makes sure the log file's directory exists, and redirects the
# standard streams via redirect_io.
def detach
  srand
  exit if fork # the parent leaves; only the child carries on
  Process.setsid # new session, so the launching shell can be closed

  directory = ::File.dirname(log_file)
  FileUtils.mkdir_p(directory) unless ::File.directory?(directory)

  redirect_io(log_file)
end
ignore_signals()
# File lib/harbor/processor.rb, line 231
#
# Sets the handler for QUIT, TERM, TTIN, TTOU and INT (in that order) to the
# empty string, which tells Ruby to ignore the signal.
#
# Returns the value of the last trap call, i.e. the previous INT handler.
def ignore_signals
  previous_handlers = [:QUIT, :TERM, :TTIN, :TTOU, :INT].map do |signal|
    trap(signal, "")
  end
  previous_handlers.last
end
redirect_io(file = nil)
# File lib/harbor/processor.rb, line 194
#
# Points the standard streams away from the terminal: STDIN reads from
# /dev/null, STDOUT goes to +file+ (or /dev/null when no file is given), and
# STDERR follows STDOUT. Both output streams are put in sync mode.
#
# file:: path of the file to receive STDOUT/STDERR, or nil for /dev/null.
def redirect_io(file = nil)
  STDIN.reopen "/dev/null"
  # Open in append mode explicitly so the file is created when missing and
  # existing log content is kept rather than overwritten.
  STDOUT.reopen(file || "/dev/null", "a")
  STDOUT.sync = true

  STDERR.reopen STDOUT
  STDERR.sync = true
end
spawn_worker(task)
# File lib/harbor/processor.rb, line 167
#
# Forks a worker process that runs #process for one task.
#
# Inside the child the process title is extended with the start time and the
# task, the random number generator is re-seeded, and signals are ignored
# via ignore_signals. Outcomes:
# * #process returns normally: the child exits when the block ends.
# * Interrupt: handle_interrupt is called and the child exits with status
#   130, which is the status #start reports as "interrupted".
# * any StandardError: handle_exception is called and the error is
#   re-raised, so the child terminates with the unhandled exception.
#
# task:: the task to hand to #process.
#
# Returns the value of fork in the parent (the child's pid).
def spawn_worker(task)
  fork do
    begin
      $0 = "#{$0}[#{Time.now.strftime("%H:%M:%S")}] #{task.inspect}"
      srand
      ignore_signals

      process(task)
    rescue Interrupt
      # Interrupt is not a StandardError, so the bare rescue below never
      # sees it; without this clause the handle_interrupt hook is never run.
      handle_interrupt(task)
      exit(130)
    rescue => e
      handle_exception(task, e)

      raise
    end
  end
end
trap_signals()
# File lib/harbor/processor.rb, line 203
#
# Installs the supervisor's signal handlers:
# * TTIN raises worker_count by one.
# * TTOU lowers worker_count by one, never below zero.
# * QUIT and TERM mark the processor as not alive and set worker_count to 0.
# * INT does the same, with a different log message.
#
# NOTE(review): these handlers log from trap context. On Ruby >= 2.0 a
# logger that takes a Mutex may raise ThreadError there -- confirm against
# the Logging gem version in use.
def trap_signals
  trap(:TTIN) do
    self.worker_count += 1
    logger.info "worker_count = #{worker_count}"
  end

  trap(:TTOU) do
    # A negative worker count is meaningless and would need extra TTIN
    # signals to undo, so stop at zero.
    self.worker_count = [worker_count - 1, 0].max
    logger.info "worker_count = #{worker_count}"
  end

  # Handlers are called with the signal number. A zero-arity lambda rejects
  # that argument on Ruby >= 1.9, so use procs, which ignore extra arguments.
  graceful_shutdown = proc do
    @alive = false
    self.worker_count = 0
    logger.info "gracefully shutting down."
  end

  trap(:QUIT, graceful_shutdown)
  trap(:TERM, graceful_shutdown)

  shutdown = proc do
    @alive = false
    self.worker_count = 0
    logger.info "stopping workers and shutting down."
  end
  trap(:INT, shutdown)
end