Created
July 28, 2023 13:57
-
-
Save jsvd/a047320ff68bbe064e93dec0d6a251f7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# encoding: utf-8 | |
# JRUBY_OPTS="-J-Xmx4g -J-Xms4g" ruby beats_writer_ssl.rb | |
require "socket" | |
require "thread" | |
require "zlib" | |
require "json" | |
require "openssl" | |
# Abort the whole process as soon as any thread raises, instead of letting a
# failed writer die silently.
Thread.abort_on_exception = true

# Target endpoint and client TLS credentials. Each value can be overridden
# through the environment; the defaults are the values the script was
# originally written with.
HOST = ENV.fetch("LUMBERJACK_HOST", "127.0.0.1")
# Integer() raises on a malformed port instead of silently yielding 0.
PORT = Integer(ENV.fetch("LUMBERJACK_PORT", "3333"))
CLIENT_CERT = ENV.fetch("LUMBERJACK_CLIENT_CERT", "/Users/joaoduarte/elastic/certificates/client_from_root.crt")
CLIENT_KEY = ENV.fetch("LUMBERJACK_CLIENT_KEY", "/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8")
# Minimal writer for version-"2" framed messages sent over a TLS connection.
# Two frame types are produced: "W" (window size) and "J" (JSON data).
module Lumberjack
  # Largest sequence number that fits the 32-bit unsigned field ("N") packed
  # into each data frame.
  SEQUENCE_MAX = 2**32 - 1

  # One TLS connection to HOST:PORT, authenticated with CLIENT_CERT/CLIENT_KEY.
  class Client
    # Opens the connection immediately; raises whatever the socket or TLS
    # layer raises if the connection cannot be established.
    def initialize
      @sequence = 0
      @socket = connect
    end

    # Sends a window-size frame followed by one JSON frame per element.
    #
    # elements      - a Hash (treated as a single event) or a collection of
    #                 objects responding to #to_json.
    # chunk_size    - maximum number of bytes handed to the socket per write;
    #                 must be positive.
    # sleep_seconds - pause after every write, to throttle the sender.
    #
    # Returns nil. Raises ArgumentError for a non-positive chunk_size, before
    # anything is written.
    def write(elements, chunk_size = 5000, sleep_seconds = 0.01)
      unless chunk_size.positive?
        raise ArgumentError, "chunk_size must be positive, got #{chunk_size.inspect}"
      end

      elements = [elements] if elements.is_a?(Hash)
      send_window_size(elements.size)
      payload = elements.map { |element| JsonEncoder.to_frame(element, inc) }.join
      send_payload(payload, chunk_size, sleep_seconds)
    end

    # Closes the TLS socket (and, because sync_close is set, the TCP socket
    # underneath it).
    def close
      @socket.close
    end

    private

    # Opens the TCP connection and wraps it in TLS, presenting the client
    # certificate. Server certificate verification is not configured here.
    # The TCP socket is closed again if anything fails before the TLS socket
    # is handed back, so a failed handshake does not leak a descriptor.
    def connect
      tcp_socket = TCPSocket.new(HOST, PORT)
      begin
        ctx = OpenSSL::SSL::SSLContext.new
        ctx.cert = OpenSSL::X509::Certificate.new(File.read(CLIENT_CERT))
        ctx.key = OpenSSL::PKey::RSA.new(File.read(CLIENT_KEY))
        # Pin the protocol version to TLS 1.2.
        ctx.ssl_version = :TLSv1_2
        # Wrap the socket with SSL/TLS
        ssl_socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ctx)
        ssl_socket.sync_close = true
        ssl_socket.connect
        ssl_socket
      rescue StandardError
        tcp_socket.close
        raise
      end
    end

    # Advances and returns the sequence number. Once SEQUENCE_MAX has been
    # reached the counter restarts, so the value after SEQUENCE_MAX is 1.
    def inc
      @sequence = 0 if @sequence + 1 > Lumberjack::SEQUENCE_MAX
      @sequence += 1
    end

    # Writes a "W" frame announcing how many data frames follow.
    def send_window_size(size)
      @socket.syswrite(["2", "W", size].pack("AAN"))
    end

    # Writes the payload in slices of at most chunk_size bytes, sleeping
    # sleep_seconds after each write. Progress is tracked in bytes because
    # that is what syswrite reports; a short write is simply continued from
    # where it stopped.
    def send_payload(payload, chunk_size, sleep_seconds)
      payload_size = payload.bytesize
      written = 0
      while written < payload_size
        written += @socket.syswrite(payload.byteslice(written, chunk_size))
        puts "written #{written}.."
        sleep sleep_seconds
      end
    end
  end

  # Serialises events into "J" (JSON) data frames.
  module JsonEncoder
    # Builds one data frame: version "2", type "J", the 32-bit big-endian
    # sequence number, the 32-bit big-endian JSON byte length, then the JSON
    # document itself.
    #
    # hash     - object responding to #to_json.
    # sequence - Integer sequence number for this frame.
    #
    # Returns the packed frame as a String.
    def self.to_frame(hash, sequence)
      json = hash.to_json
      json_length = json.bytesize
      ["2", "J", sequence, json_length, json].pack("AANNA#{json_length}")
    end
  end
end
# Driver: open many concurrent connections and have each one send the same
# large event repeatedly, with a random pause between sends.
client_count = 1500
# 8 * 16 * 1024 = 131072 filler characters per event.
message = "a" * (8 * 16 * 1024)

# Uncomment to drop into a debugger before connecting:
#require 'pry'
#binding.pry

puts "Connecting #{client_count} clients"
clients = Array.new(client_count) { Lumberjack::Client.new }

approx_mib = (client_count * message.size) / 1024.0 / 1024.0
puts "Writing approximately #{approx_mib}Mib across #{client_count} clients"

# One writer thread per connection.
writers = clients.map do |client|
  Thread.new(client) do |connection|
    # keep message size above 16k, requiring two TLS records
    batch = [{ "message" => message }]
    50.times do
      connection.write(batch)
      # Random pause of up to one second between sends.
      sleep rand
    end
    connection.close
  end
end

writers.each(&:join)
puts "Done"
# Pause for ten seconds before the process exits.
sleep 10
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/tmp/logstash-8.9.0 | |
❯ LS_JAVA_OPTS="-Dio.netty.allocator.numHeapArenas=0 -XX:NativeMemoryTracking=summary -Dio.netty.allocator.numDirectArenas=1 -XX:MaxDirectMemorySize=128m" bin/logstash -e " | |
input { | |
beats { | |
port => 3333 | |
enrich => none | |
ssl => true | |
ssl_certificate => '/Users/joaoduarte/elastic/certificates/client_from_root.crt' | |
ssl_key => '/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8' | |
ssl_certificate_authorities => '/Users/joaoduarte/elastic/certificates/root.crt' | |
ssl_client_authentication => 'required' | |
} | |
} | |
filter { | |
ruby { | |
init => \"Thread.new { loop { puts Java::io.netty.buffer.ByteBufAllocator::DEFAULT.metric.toString(); sleep 5 } } \" | |
code => \"\" | |
} | |
} | |
output { null { } }" -w 1 -b 1 | |
Using bundled JDK: /tmp/logstash-8.9.0/jdk.app/Contents/Home | |
Sending Logstash logs to /tmp/logstash-8.9.0/logs which is now configured via log4j2.properties | |
[2023-07-28T14:55:53,810][INFO ][logstash.runner ] Log4j configuration path used is: /tmp/logstash-8.9.0/config/log4j2.properties | |
[2023-07-28T14:55:53,814][WARN ][logstash.runner ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead. | |
[2023-07-28T14:55:53,814][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"8.9.0", "jruby.version"=>"jruby 9.3.10.0 (2.6.8) 2023-02-01 107b2e6697 OpenJDK 64-Bit Server VM 17.0.7+7 on 17.0.7+7 +indy +jit [arm64-darwin]"} | |
[2023-07-28T14:55:53,815][INFO ][logstash.runner ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Dio.netty.allocator.numHeapArenas=0, -XX:NativeMemoryTracking=summary, -Dio.netty.allocator.numDirectArenas=1, -XX:MaxDirectMemorySize=128m, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED] | |
[2023-07-28T14:55:53,831][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified | |
/private/tmp/logstash-8.9.0/vendor/bundle/jruby/2.6.0/gems/sinatra-2.2.4/lib/sinatra/base.rb:938: warning: constant Tilt::Cache is deprecated | |
[2023-07-28T14:55:54,067][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false} | |
[2023-07-28T14:55:54,186][INFO ][org.reflections.Reflections] Reflections took 44 ms to scan 1 urls, producing 132 keys and 464 values | |
[2023-07-28T14:55:54,277][WARN ][logstash.inputs.beats ] You are using a deprecated config setting "ssl" set in beats. Deprecated settings will continue to work, but are scheduled for removal from logstash in the future. Use 'ssl_enabled' instead. If you have any questions about this, please visit the #logstash channel on freenode irc. {:name=>"ssl", :plugin=><LogStash::Inputs::Beats ssl_certificate=>"/Users/joaoduarte/elastic/certificates/client_from_root.crt", ssl_client_authentication=>"required", ssl_key=>"/Users/joaoduarte/elastic/certificates/client_from_root.key.pkcs8", port=>3333, enrich=>["none"], id=>"812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb", ssl=>true, ssl_certificate_authorities=>["/Users/joaoduarte/elastic/certificates/root.crt"], enable_metric=>true, debug=>false, codec=><LogStash::Codecs::Plain id=>"plain_a39ca8af-f3a5-467d-a33b-7392b8dad781", enable_metric=>true, charset=>"UTF-8">, host=>"0.0.0.0", ssl_enabled=>false, ssl_verify_mode=>"none", ssl_peer_metadata=>false, include_codec_tag=>true, ssl_handshake_timeout=>10000, ssl_cipher_suites=>["TLS_AES_256_GCM_SHA384", "TLS_AES_128_GCM_SHA256", "TLS_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384", "TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384", "TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256", "TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256"], ssl_supported_protocols=>["TLSv1.2", "TLSv1.3"], client_inactivity_timeout=>60, executor_threads=>10, add_hostname=>false, tls_min_version=>1, tls_max_version=>1.3>} | |
[2023-07-28T14:55:54,321][INFO ][logstash.javapipeline ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise. | |
[2023-07-28T14:55:54,328][WARN ][logstash.javapipeline ][main] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 0; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 0; chunkSize: 4194304) | |
[2023-07-28T14:55:54,361][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>1, "pipeline.batch.size"=>1, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x262d8cc@/private/tmp/logstash-8.9.0/logstash-core/lib/logstash/java_pipeline.rb:134 run>"} | |
[2023-07-28T14:55:54,578][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.22} | |
[2023-07-28T14:55:54,585][INFO ][logstash.inputs.beats ][main] Starting input listener {:address=>"0.0.0.0:3333"} | |
[2023-07-28T14:55:54,859][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"} | |
[2023-07-28T14:55:54,866][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]} | |
[2023-07-28T14:55:54,879][INFO ][org.logstash.beats.Server][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Starting server on port: 3333 | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 4194304; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
PooledByteBufAllocatorMetric(usedHeapMemory: -1; usedDirectMemory: 130023424; numHeapArenas: 0; numDirectArenas: 1; smallCacheSize: 256; normalCacheSize: 64; numThreadLocalCaches: 20; chunkSize: 4194304) | |
[2023-07-28T14:56:27,882][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 106825184, ratio: 82 | |
[2023-07-28T14:56:27,886][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 106976992, ratio: 82 | |
[2023-07-28T14:56:27,884][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0xd4882a85, L:/127.0.0.1:3333 - R:/127.0.0.1:55498] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:27,944][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x36b863a5, L:/127.0.0.1:3333 - R:/127.0.0.1:55411] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:28,477][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107220064, ratio: 82 | |
[2023-07-28T14:56:28,479][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x03117f6f, L:/127.0.0.1:3333 - R:/127.0.0.1:55880] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:29,069][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107207776, ratio: 82 | |
[2023-07-28T14:56:29,115][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x13a4bfeb, L:/127.0.0.1:3333 - R:/127.0.0.1:55419] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:32,060][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 107142240, ratio: 82 | |
[2023-07-28T14:56:32,061][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x14d57f24, L:/127.0.0.1:3333 - R:/127.0.0.1:56154] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:34,493][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109245488, ratio: 84 | |
[2023-07-28T14:56:34,494][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x36b863a5, L:/127.0.0.1:3333 - R:/127.0.0.1:55411] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:35,112][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109215792, ratio: 84 | |
[2023-07-28T14:56:35,114][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x7ef85f80, L:/127.0.0.1:3333 - R:/127.0.0.1:55612] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:39,345][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109069360, ratio: 84 | |
[2023-07-28T14:56:39,390][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0x0afee5ea, L:/127.0.0.1:3333 - R:/127.0.0.1:55462] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
[2023-07-28T14:56:40,526][INFO ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Direct memory status, used: 130023424, pinned: 109685392, ratio: 84 | |
[2023-07-28T14:56:40,528][WARN ][org.logstash.beats.OOMConnectionCloser][main][812dd43ba5f9f8d9c0da0b036a89d22542e08782a19ec4a7854c64e7ad7aa2bb] Dropping connection [id: 0xc3888ed5, L:/127.0.0.1:3333 - R:/127.0.0.1:55670] because run out of direct memory. To fix it, check if the upstream source has a way to lower the number of concurrent connections or reduce the bulk's size it transmits. Raise up the -XX:MaxDirectMemorySize option in the JVM running Logstash | |
^C[2023-07-28T14:56:40,698][WARN ][logstash.runner ] SIGINT received. Shutting down. | |
^C[2023-07-28T14:56:40,890][FATAL][logstash.runner ] SIGINT received. Terminating immediately.. | |
[2023-07-28T14:56:40,912][FATAL][org.logstash.Logstash ] | |
org.jruby.exceptions.ThreadKill: null |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment