Created
April 28, 2023 18:01
-
-
Save jcantrill/4dafdf19ef3acea1e716fb4fdb787e9d to your computer and use it in GitHub Desktop.
fluentd rotation test
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'file-tail' | |
source_dir = ARGV.length > 0 ? ARGV[0] : '/tmp/loopfs/test' | |
no_of_sources = ARGV.length > 1 ? ARGV[1].to_i : 1 | |
msg_size = ARGV.length > 2 ? ARGV[2].to_i : 1 | |
pos_file = "#{source_dir}/my.pos" | |
running = true | |
unwatched = "".rjust(16,'f') | |
#reader | |
no_of_sources.times do |i| | |
Thread.start do | |
sleep(10) | |
next_line = 0 | |
mutex = Mutex.new | |
begin | |
filename = "/tmp/out/loopfs.in.#{i}.log.log" | |
File.open(filename) do |log| | |
log.extend(File::Tail) | |
# log.interval # 10 | |
# log.backward(10) | |
log.tail do |line| | |
mutex.synchronize do | |
# puts "#{line.strip().split(' ')[0].to_i} #{line} #{line.strip().split(' ')}" | |
line = line.strip().split(' ')[0].to_i | |
# exit 1 | |
if line == next_line | |
next_line += 1 | |
else | |
if line <= next_line | |
# puts "Read thread-#{i} seems to have been reset. next_line: #{next_line} entry: #{line}" | |
next_line = line + 1 | |
else | |
puts "Read thread-#{i}: Missed a block. exp. next_line: #{next_line} entry: #{line}" | |
next_line = line + 1 | |
end | |
end | |
end | |
end | |
end | |
rescue=>e | |
puts "exiting file reader thread #{i}: #{e}" | |
end | |
end | |
end | |
#writer | |
msg = "0" * msg_size | |
no_of_sources.times do |i| | |
Thread.start do | |
begin | |
counter = 0 | |
while running do | |
source_file = "#{source_dir}/#{i}.log" | |
`echo #{counter},#{msg} >> #{source_file}` | |
counter += 1 | |
end | |
Thread.pass | |
rescue=>e | |
puts "Terminating file write thread: #{e}" | |
running = false | |
ensure | |
puts "exiting file write thread" | |
end | |
end | |
end | |
#rotater | |
Thread.start do | |
sleep(10) | |
begin | |
while running do | |
sleep(15) | |
no_of_sources.times do |i| | |
source_file = "#{source_dir}/#{i}.log" | |
target_file = "#{source_file}.#{Time.now.to_i}" | |
`mv #{source_file} #{target_file}` | |
end | |
end | |
rescue=>e | |
puts "Terminating rotation thread: #{e}" | |
running = false | |
ensure | |
puts "exiting rotation thread" | |
end | |
end | |
#cleaner | |
Thread.start do | |
begin | |
while running do | |
sleep(15) | |
now = Time.now.to_i | |
pattern = "#{source_dir}/*.log.**" | |
Dir[pattern].each do |f| | |
# puts "checking #{f}" | |
moved_on = f.split('.').last.to_i | |
if now - moved_on > 8 #sec | |
# puts "removing #{f}" | |
`rm #{f}` | |
end | |
end | |
end | |
rescue=>e | |
puts "Terminating cleanup thread: #{e}" | |
running = false | |
ensure | |
puts "exiting cleanup thread" | |
end | |
end | |
#watch checker | |
Thread.start do | |
sleep(5) | |
begin | |
watches = {} | |
ruby_pid=`ps -ef | grep ruby | grep posi|grep -m 1 -oE '[0-9]*' | head -1`.split("\n")[0] | |
while running | |
sleep(3) | |
now = Time.now() | |
cmd = "lsof -nPw -p #{ruby_pid} | grep -o \"#{source_dir}/.*$\"" | |
lsof_out = `#{cmd}` | |
lsof_out.split("\n").sort.each do |file| | |
watches[file] = now #updates the ones we are watching | |
end | |
no_of_sources.times do |i| | |
source_file = "#{source_dir}/#{i}.log" | |
value = watches[source_file] | |
if value.nil? | |
puts "watch tread. no watch for #{source_file}" | |
elsif now.to_i > value.to_i + 5 #if our entry is stale then means no watch for a given file | |
puts "[WARN] watch tread. no watch for: #{now.to_i - value.to_i}s. file: #{source_file} lastwatch: #{value}" | |
end | |
end | |
end | |
rescue=>e | |
puts "Terminating lsof thread: #{e.backtrace.join("\n")}" | |
running = false | |
ensure | |
puts "exiting lsof file thread" | |
end | |
end | |
while running do | |
sleep(5) | |
end | |
exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment