Elasticsearch test script that indexes Mediawiki dumps
#!/usr/bin/env ruby
require 'yaml'
require 'net/http'
require 'socket'
require 'uri'
require 'open-uri'
require 'pathname'
require 'fileutils'
require 'etc'
require 'json'
require 'logger'
require 'optparse'
require 'time'
require 'shellwords'
require 'zlib'

CONFFILE = "/etc/elasticsearch/elasticsearch.yml"
OPTS = {
  statedir: "/var/local/elastictest",
  width: 4,
  elasticservice: {
    '2.4' => [%w[sudo -u elasticsearch], :elasticsearch,
              %w[-Des.default.path.home=/usr/share/elasticsearch
                 -Des.default.path.conf=/etc/elasticsearch]].flatten,
    '5.x' => [%w[sudo -u elasticsearch env ES_HOME=/usr/share/elasticsearch],
              :elasticsearch, "-Edefault.path.conf=/etc/elasticsearch"].flatten
  },
  elasticsearch: "/usr/share/elasticsearch/bin/elasticsearch",
  mediawiki: "en.wikiquote.org",
  datauri: nil, # uri of a dump of :mediawiki
  wikisettings: nil,
  chunklines: 500,
  debug: false,
  strace: false,
  straceopts: "-v -efile,desc -f -s500 -y -tt",
  tracefiles: [],
}
ELASTICUSER = "elasticsearch"
ELASTICPORT = 9200
ERRORPATTERN = /failed|Exception/

# Ruby 2.0 compat shim
unless [].respond_to? :to_h
  class Array
    def to_h
      h = {}
      each { |k,v| h[k] = v }
      h
    end
  end
end

# utility func for iterated field retrieval
def ipath a,*i
  i.flatten.each { |j| a = a[j] }
  a
end
# step the counter in pathname (e.g. /foo/bar01 -> /foo/bar02)
# return [old counter, new pathname]
def nextpath path
  (bp = File.basename(path)) =~ /\d+/
  k = $&.to_i
  w = $&.size
  [k, File.join(File.dirname(path), bp.sub($&, "%0#{w}d" % (k+1)))]
end
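# e.g. nextpath("/foo/bar01") #=> [1, "/foo/bar02"] (zero padding width is kept)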
def savefile handle:nil, path:nil, targetdir:nil, blocksize: 1<<20, size:nil
  [handle, path].compact.empty? and raise ArgumentError,
    "one of handle or path must be specified"
  path ||= handle.path
  targetdir ||= File.dirname path
  base = File.basename path
  esc = proc {|*a| a.join.shellescape }
  puts "Saving #{path} ..."
  obj,sig = handle ? [handle, [:instance_eval]] : [Kernel, [:open, path]]
  obj.send(*sig) { |h|
    size ||= h.stat.size - h.pos
    open("|pv -c -N #{esc["from: ", base]} -s #{size} | gzip " \
         "| pv -c -N #{esc["to: ", base, ".gz"]} > " \
         "#{esc[File.join(targetdir, base), ".gz"]}", "w") { |z|
      while b = h.read(blocksize)
        z << b
      end
    }
  }
  puts "OK."
end
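# savefile pipes a file through pv (progress display) and gzip, producing
# <base>.gz next to it or in targetdir; e.g. (hypothetical path):
#   savefile path: "/var/local/elastictest/0001/strace.log"
# would yield /var/local/elastictest/0001/strace.log.gz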
exit if ENV["SYNTAX_TEST"] == "1"

############
# load confs
############
arrayopts = %i[straceopts]
conf = nil
op = OptionParser.new
optnormalize = Hash.new { |h,x| x }.merge! Hash => Array,
                                          NilClass => String,
                                          Fixnum => Integer
OPTS.each { |k,v|
  op.on("--#{k} arg", optnormalize[v.class]) { |x| OPTS[k] = x }
}
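# every OPTS key doubles as a long option, e.g. (assuming the script is
# saved as elastictest.rb):
#   ./elastictest.rb --mediawiki en.wikipedia.org --chunklines 1000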
op.on("-c", "--conf=<yaml>", "config file") { |f| conf = f } | |
op.parse! | |
OPTS.merge! case conf | |
when "-" | |
YAML.load STDIN | |
when String | |
YAML.load_file conf | |
when nil | |
{} | |
else | |
raise "wtf" | |
end | |
%i[strace debug].each { |o| | |
ENV[o.to_s.upcase] == "1" and OPTS[o] = true | |
} | |
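# STRACE=1 and DEBUG=1 in the environment act as shortcuts for
# --strace/--debug, e.g.: STRACE=1 ./elastictest.rb -c conf.yml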
$elasticconf = YAML.load_file CONFFILE
arrayopts.each { |o|
  case OPTS[o]
  when String
    OPTS[o] = OPTS[o].shellsplit
  end
}

################################################
# create statedir and get entry of highest index
################################################
FileUtils.mkpath OPTS[:statedir]
sde = Dir.glob(File.join OPTS[:statedir], "*").grep(/\d/)
sde.empty? and sde << File.join(OPTS[:statedir], "0"*OPTS[:width])
sdh = sde.map { |d|
  File.basename(d) =~ /\d+/
  [$&.to_i, d]
}.to_h
statedir_hi = sdh[sdh.keys.max]

##################################################
# associate next paths to various params we manage
##################################################
$resource,genh = {},{}
(%w[cluster.name path.data path.logs].
  map {|k| [k, $elasticconf[k]] } << [:statedir, statedir_hi]
).each { |k,v| genh[k],$resource[k] = nextpath v }
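# e.g. with path.data == "/var/lib/elasticsearch/data03" this records
# genh["path.data"] = 3 and $resource["path.data"] = ".../data04"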
####################################################################
# check if the generation numbers agree and define actual generation
####################################################################
unless genh.values.uniq.size == 1
  raise "divergence in parameter generations: #{genh.inspect}"
end
startmsg = "Run #{genh.values[0] + 1}."
puts startmsg

######################
# create the new paths
######################
$resource.each { |k,pa|
  # if not absolute, it's not really a path
  Pathname.new(pa).absolute? or next
  if File.exist? pa
    # XXX: transactional rollback would be neat
    raise "path for parameter #{k.inspect}: #{pa} already exists"
  end
  FileUtils.mkpath pa
  if String === k
    # paths that occur in conf file
    File.chown nil, Etc.getgrnam(ELASTICUSER).gid, pa
    File.chmod 0775, pa
  end
}

################
# set up logging
################
logfiles = []
logfiles << File.join($resource[:statedir], "test.log")
$logger = Logger.new logfiles.last
$logger.level = Logger::DEBUG
$logger.info startmsg
if OPTS[:debug]
  logfiles << File.join($resource[:statedir], "debug.log")
  $dbglogger = Logger.new logfiles.last
  $dbglogger.level = Logger::DEBUG
else
  # no-op stand-in that silently swallows any logger call
  class Mock
    def method_missing *a, **kw, &b
    end
  end
  $dbglogger = Mock.new
end
if OPTS[:strace]
  $stracelog = File.join($resource[:statedir], "strace.log")
  logfiles << $stracelog
end
logfiles.each { |f|
  FileUtils.ln_sf File.join(*[$resource[:statedir], f].map { |d| File.basename d }),
                  File.dirname($resource[:statedir])
}
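# the symlinks placed in the statedir parent always point at the newest
# run's log files, giving stable "latest" paths across runs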
begin
  #############
  # update conf
  #############
  $elasticconf.merge! $resource.select { |k,pa| String === k }
  unless File.exist? "#{CONFFILE}.orig"
    FileUtils.cp CONFFILE, "#{CONFFILE}.orig"
  end
  open("#{CONFFILE}.tmp", "w") { |f| f << $elasticconf.to_yaml }
  File.rename "#{CONFFILE}.tmp", CONFFILE

  ###############################################
  # check elastic version and set up service argv
  ###############################################
  versionstr = nil
  3.times { |i|
    $elasticversion and break
    puts "Checking for Elasticsearch version...#{i.zero? ? "" : " (attempt #{i+1})"}"
    IO.popen([OPTS[:elasticsearch], "--version"], &:read) =~ /Version:\s+(\S+)/
    versionstr = $1
    $elasticversion = case versionstr
                      when /\A2\.4/
                        '2.4'
                      when /\A5\.\d/
                        '5.x'
                      end
  }
  unless $elasticversion
    raise "unknown elasticsearch version #{versionstr}"
  end
  case OPTS[:elasticservice]
  when Hash
    OPTS[:elasticservice] = OPTS[:elasticservice][$elasticversion]
  end
  "#{versionstr} belongs to the #{$elasticversion} family.".instance_eval {
    puts self
    $logger.info "Elasticsearch version: #{self}"
  }
  if OPTS[:strace]
    OPTS[:elasticservice] = [%w[sudo strace], OPTS[:straceopts], "-o", $stracelog,
                             "--", OPTS[:elasticservice]].flatten
  end

  #####################
  # Opening trace files
  #####################
  $tracing = OPTS[:tracefiles].map { |f|
    h = open f, "r+"
    h.seek 0, File::SEEK_END
    [f, h]
  }.to_h
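  # trace files are opened and seeked to EOF up front, so at cleanup time
  # only the output that accrued during this run gets saved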
  #####################
  # start elasticsearch
  #####################
  "Starting Elasticsearch service.".instance_eval {
    puts self
    $logger.info self
  }
  $elasticpidfile = File.join($resource['path.logs'], "elastic.pid")
  pip = IO.pipe
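  # symbol placeholders in the service argv template (like :elasticsearch)
  # are substituted with the corresponding OPTS value; plain strings pass
  # through as-is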
  elasticargs = OPTS[:elasticservice].map { |e| OPTS[e] || e } +
                ["-p", $elasticpidfile]
  $dbglogger.debug elasticargs
  $elasticdaemon = Process.spawn *elasticargs,
                                 out: pip[1], err: %i[child out], rlimit_nofile: 1<<16
  pip[1].close
  $elastic_out = pip[0]
  $elasticlogfile = File.join($resource[:statedir], "elastic.log")
  File.symlink File.join($resource['path.logs'], "#{$resource['cluster.name']}.log"), $elasticlogfile
  # XXX copy-paste
  FileUtils.ln_sf File.join(*[$resource[:statedir], $elasticlogfile].map { |d| File.basename d }),
                  File.dirname($resource[:statedir])

  ##################################
  # connect to Elasticsearch service
  ##################################
  $elasticconn = Net::HTTP.new Socket.gethostname, ELASTICPORT
  puts "Trying to connect to Elasticsearch ..."
  health = loop do
    begin
      break $elasticconn.get("/_cat/health?v").body
    rescue Errno::ECONNREFUSED
      sleep 5
    end
  end
  puts "OK."
  $logger.info "Elasticsearch health: #{health}"

  #############################
  # create index for :mediawiki
  #############################
  $logger.info "Creating index for #{OPTS[:mediawiki]}."
  wikisettings = open(OPTS[:wikisettings] ||
    "https://#{OPTS[:mediawiki]}/w/api.php?action=cirrus-settings-dump&format=json&formatversion=2") { |f| JSON.load f }
  $logger.info $elasticconn.request(Net::HTTP::Put.new("/#{OPTS[:mediawiki]}?pretty"),
    {'analysis' => ipath(wikisettings, %w[content page index analysis]),
     'number_of_shards' => 1, 'number_of_replicas' => 0}.to_json
  ).body
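  # ES 2.x dropped the index_analyzer mapping field in favor of analyzer,
  # and renamed position_offset_gap to position_increment_gap, so the
  # CirrusSearch-dumped mapping is rewritten accordingly before PUTting it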
  case $elasticversion
  when '2.4'
    $logger.info $elasticconn.request(Net::HTTP::Put.new("/#{OPTS[:mediawiki]}/_mapping/page?pretty"),
      JSON.load(wikisettings.to_json.
        gsub('"index_analyzer"', '"analyzer"').
        gsub('"position_offset_gap"', '"position_increment_gap"'))['content'].to_json
    ).body
  end

  ######################
  # open the data stream
  ######################
  $datastream = Zlib::GzipReader.new(if URI.parse(OPTS[:datauri]).scheme
    # with Net::HTTP it's a pain to
    # 1) deal with https and redirects
    # 2) get an IO object for the response body
    # so instead of using it, we just call out to curl
    IO.popen ["curl", "-Ls", OPTS[:datauri]]
  else
    open OPTS[:datauri]
  end)

  ######################
  # data indexing thread
  ######################
  $chan_r,chan_w = IO.pipe
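  # the dump is expected to be Elasticsearch _bulk NDJSON, i.e. alternating
  # action and document lines such as:
  #   {"index":{"_type":"page","_id":"42"}}
  #   {"title":"...","text":"..."}
  # so :chunklines should stay an even number to keep pairs together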
  Thread.new {
    begin
      databuf,$databuf_idx = [],1
      data_committed = nil
      loop do
        l = $datastream.gets
        l and databuf << l
        data_committed = false
        if databuf.size == OPTS[:chunklines] or (not l and not databuf.empty?)
          begin
            rsp = $elasticconn.request_post("/#{OPTS[:mediawiki]}/_bulk", databuf.join)
          rescue => exc
            "POST failure: #{exc.class}: #{exc.message}".instance_eval {
              puts self
              $logger.warn self
            }
            break
          end
          rspdata = JSON.load rsp.body
          $logger.info "Chunk #{$databuf_idx}: took #{rspdata['took']}"
          databuf.clear
          $databuf_idx += 1
          data_committed = true
        end
        l or break
      end
      # If we hit EOF of the data but the last bits have just been
      # committed, we take a rest to allow Elasticsearch to react to them,
      # giving the monitoring loop a chance to check that reaction before
      # we signal completion.
      data_committed and sleep(0.1)
    rescue Exception => x
      $logger.error "index thread got exception #{x.inspect}"
    ensure
      chan_w << "Q"
    end
  }
  ##########################################
  # checking Elasticsearch output for errors
  ##########################################
  $error = false
  turn = 0
  loop do
    turn += 1
    ra,_,_ = IO.select [$elastic_out, $chan_r]
    if ra.include? $chan_r
      break
    end
    if ra.include? $elastic_out
      l = $elastic_out.gets
      $dbglogger.debug { [turn,l].inspect }
      case l
      when ERRORPATTERN
        "Error at chunk #{$databuf_idx}: #{l.size > 500 ? "#{l[0...500]} ...(#{l.size})" : l}".instance_eval {
          puts self
          $logger.error self
        }
        $error = true
        break
      when nil
        break
      end
    end
  end

  #########
  # cleanup
  #########
  $logger.info "Cleaning up ..."
  tclean = Time.now
  $datastream.closed? or $datastream.close
  elasticpid = begin
    Integer IO.read($elasticpidfile).strip
  rescue => exc
    "Elastic pid file not available: #{exc.class}: #{exc.message}".instance_eval {
      puts self
      $logger.warn self
    }
    $elasticdaemon
  end
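  # shut the daemon down gently first; a forked helper delivers SIGKILL a
  # second later in case TERM alone does not bring it down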
  Process.kill :TERM, elasticpid
  fork do
    sleep 1
    begin
      Process.kill :KILL, elasticpid
    rescue Errno::ESRCH
    end
  end
  Process.wait $elasticdaemon
  # if no error was captured on stdout, scrub the log file to check whether
  # some under-the-radar error lurks there...
  unless $error
    errors = IO.readlines($elasticlogfile).grep(ERRORPATTERN)
    unless errors.empty?
      t = errors.map { |l|
        begin
          Time.parse l[0...32]
        rescue ArgumentError
        end
      }.compact.min || Time.at(0)
      if t <= tclean
        {warn: "Elastic log contains UNNOTICED ERROR!",
         error: "Error: #{errors.first}"}.each { |lv,msg|
          puts msg
          $logger.send lv, msg
        }
        $error = true
      else
        # We are not concerned if the error is later than the start of the
        # cleanup, as then it's just the cleanup that caused an error.
        # We just log the top of the error trace (as the log file will be
        # deleted, this is a last chance to show something).
        errors[0..1].each { |e|
          "Cleanup inflicted error: #{e}".instance_eval {
            puts self
            $logger.info self
          }
        }
      end
    end
  end
  # ditch Elasticsearch data if the run was successful (= uninteresting)
  if $error
    ["Retaining Elasticsearch data at #{$resource['path.data']}.",
     "Disk usage: #{`df -h #{$resource['path.data'].shellescape}`}"].each { |msg|
      puts msg
      $logger.info msg
    }
    OPTS[:strace] and savefile path: $stracelog
    $tracing.each { |f,h| savefile path: f, handle: h, targetdir: $resource[:statedir] }
  else
    FileUtils.rmtree $resource.values_at('path.data', 'path.logs')
  end
  OPTS[:strace] and File.delete $stracelog
  $tracing.each_value { |h| h.truncate 0 }
  $logger.info "Exiting."
  puts
rescue => exc
  $logger.fatal exc
  raise
end