Skip to content

Instantly share code, notes, and snippets.

@jcantrill
Created April 28, 2023 18:01
Show Gist options
  • Save jcantrill/4dafdf19ef3acea1e716fb4fdb787e9d to your computer and use it in GitHub Desktop.
Save jcantrill/4dafdf19ef3acea1e716fb4fdb787e9d to your computer and use it in GitHub Desktop.
fluentd rotation test
require 'file-tail'
# Optional positional command-line arguments:
#   1. directory the source log files live in   (default: /tmp/loopfs/test)
#   2. number of source files, one per thread    (default: 1)
#   3. length of the zero-filled payload         (default: 1)
source_dir    = ARGV.fetch(0, '/tmp/loopfs/test')
no_of_sources = ARGV.fetch(1, 1).to_i
msg_size      = ARGV.fetch(2, 1).to_i
# NOTE(review): pos_file and unwatched are not referenced by the code visible here.
pos_file = source_dir + '/my.pos'
# Flag polled by every loop in this script; it is set to false by a thread's
# rescue handler when that thread fails.
running = true
unwatched = 'f' * 16
# reader: one thread per source tails "/tmp/out/loopfs.in.<i>.log.log"
# (presumably the output written by the collector under test -- confirm) and
# checks that the number at the start of each line increases by one.
no_of_sources.times do |i|
  Thread.start do
    sleep(10)
    next_line = 0
    mutex = Mutex.new
    begin
      filename = "/tmp/out/loopfs.in.#{i}.log.log"
      File.open(filename) do |log|
        log.extend(File::Tail)
        log.tail do |raw|
          mutex.synchronize do
            # First whitespace-separated token as an integer (0 when absent
            # or non-numeric).
            seq = raw.strip.split(' ').first.to_i
            # A value ahead of the expected one means entries were skipped.
            # A value behind it (counter seems to have been reset) is
            # accepted silently.
            if seq > next_line
              puts "Read thread-#{i}: Missed a block. exp. next_line: #{next_line} entry: #{seq}"
            end
            # In every case resynchronise on the entry just read.
            next_line = seq + 1
          end
        end
      end
    rescue => e
      puts "exiting file reader thread #{i}: #{e}"
    end
  end
end
# writer: one thread per source appends "<counter>,<payload>" lines to
# "<source_dir>/<i>.log" for as long as `running` is true. The payload is
# msg_size zero characters.
msg = "0" * msg_size
no_of_sources.times do |i|
  Thread.start do
    begin
      counter = 0
      # The path never changes for this thread, so build it once.
      source_file = "#{source_dir}/#{i}.log"
      while running do
        # Appending through the shell re-opens the path on every write, so
        # after the file has been moved away the next write creates a new one.
        `echo #{counter},#{msg} >> #{source_file}`
        counter += 1
        # Yield between writes. This used to sit after the loop, where it
        # only ran once the thread was already finishing.
        Thread.pass
      end
    rescue => e
      puts "Terminating file write thread: #{e}"
      running = false
    ensure
      puts "exiting file write thread"
    end
  end
end
# rotater: after an initial 10 s delay, every 15 s moves each
# "<source_dir>/<i>.log" to "<source_dir>/<i>.log.<unix time>".
Thread.start do
  sleep(10)
  begin
    while running
      sleep(15)
      (0...no_of_sources).each do |idx|
        live = "#{source_dir}/#{idx}.log"
        # The timestamp suffix is taken per file; the cleaner parses it later.
        stamp = Time.now.to_i
        %x(mv #{live} #{live}.#{stamp})
      end
    end
  rescue => e
    puts "Terminating rotation thread: #{e}"
    running = false
  ensure
    puts "exiting rotation thread"
  end
end
# cleaner: every 15 s removes rotated files ("<source_dir>/*.log.<unix time>")
# whose timestamp suffix is more than 8 seconds old.
Thread.start do
  begin
    while running
      sleep(15)
      now = Time.now.to_i
      Dir.glob("#{source_dir}/*.log.**").each do |rotated|
        # The text after the last dot is the time the file was rotated.
        rotated_at = rotated.split('.').last.to_i
        next if now - rotated_at <= 8 # seconds

        %x(rm #{rotated})
      end
    end
  rescue => e
    puts "Terminating cleanup thread: #{e}"
    running = false
  ensure
    puts "exiting cleanup thread"
  end
end
# watch checker: every 3 s asks lsof which files under source_dir are held
# open by the process found through `ps`, and reports every source file that
# has never been seen open or has not been seen open for more than 5 s.
Thread.start do
  sleep(5)
  begin
    watches = {} # path => Time it was last present in the lsof output
    # First run of digits on the first `ps -ef` line that mentions both "ruby"
    # and "posi" -- presumably the PID of the collector process (confirm).
    ruby_pid = `ps -ef | grep ruby | grep posi|grep -m 1 -oE '[0-9]*' | head -1`.split("\n").first
    pid_missing = ruby_pid.nil? || ruby_pid.empty?
    if pid_missing
      # Without a PID the lsof command line would be malformed; say so once
      # instead of running a failing command on every iteration.
      puts "[WARN] watch thread. no process matching 'ruby' and 'posi' found; lsof check skipped"
    end
    while running
      sleep(3)
      now = Time.now
      unless pid_missing
        cmd = "lsof -nPw -p #{ruby_pid} | grep -o \"#{source_dir}/.*$\""
        `#{cmd}`.split("\n").each do |file|
          watches[file] = now # refresh the entries that are still open
        end
      end
      no_of_sources.times do |i|
        source_file = "#{source_dir}/#{i}.log"
        value = watches[source_file]
        if value.nil?
          puts "watch thread. no watch for #{source_file}"
        elsif now.to_i > value.to_i + 5 # a stale entry means the file is no longer held open
          puts "[WARN] watch thread. no watch for: #{now.to_i - value.to_i}s. file: #{source_file} lastwatch: #{value}"
        end
      end
    end
  rescue => e
    # Report the error itself as well as where it happened.
    puts "Terminating lsof thread: #{e}\n#{Array(e.backtrace).join("\n")}"
    running = false
  ensure
    puts "exiting lsof file thread"
  end
end
# Keep the main thread alive, polling every 5 s, until some worker clears
# `running`; then end the process with status 0.
sleep(5) while running
exit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment