# Ruby script that imports CloudTrail logs into Elasticsearch via SQS.
#
# Every time CloudTrail delivers a log file, AWS uploads it to S3 in .json.gz
# format and publishes a notification to the SQS queue below. This script
# drains that queue: for each notification it downloads the referenced log
# file(s) from S3, unpacks the CloudTrail records and bulk-imports them into
# Elasticsearch (logstash-YYYY.MM.DD index), ready to be used with Kibana.
#
# Run this hourly or every 30 minutes (e.g. from cron).
#
# AWS credentials are NOT stored in this file: the AWS SDK resolves them from
# its default provider chain (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
# environment variables, ~/.aws/credentials, or an EC2 instance profile).

require 'aws-sdk'
require 'json'
require 'net/http'
require 'stringio'
require 'zlib'

REGION = 'ap-southeast-1'
SQS_URL = "https://sqs.ap-southeast-1.amazonaws.com/xxxxxxxxx/xxxxxxxxx"
SQS_ENDPOINT = "sqs.ap-southeast-1.amazonaws.com" # currently unused
BASE = "AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1" # currently unused
ELASTICSEARCH_BULK_URI = URI('http://localhost:9200/_bulk')

# SQS returns at most 10 messages per ReceiveMessage call.
SQS_BATCH_SIZE = 10
# Seconds a received message stays hidden from other consumers; must exceed
# the time needed to download, import and delete one batch.
VISIBILITY_TIMEOUT = 60
# Upper bound on ReceiveMessage calls per run, so a queue that keeps refilling
# (or a message that keeps failing and reappearing) cannot loop forever.
MAX_BATCHES_PER_RUN = 100

Aws.config.update(region: REGION)

# Decompresses a gzip-compressed string.
#
# @param data [String] gzip bytes
# @return [String] the decompressed content
def gunzip(data)
  # The block form closes the reader even if #read raises.
  Zlib::GzipReader.wrap(StringIO.new(data), &:read)
end

# Downloads one CloudTrail log file from S3 and returns its records.
#
# @param s3 [Aws::S3::Client]
# @param bucket [String]
# @param key [String]
# @return [Array<Hash>, nil] the file's "Records" array, or nil when the file
#   has no "Records" key
def fetch_log_records(s3, bucket, key)
  object = s3.get_object(bucket: bucket, key: key)
  JSON.parse(gunzip(object.body.read))["Records"]
end

# Returns every CloudTrail record referenced by one SQS notification.
#
# The SQS body is an envelope whose "Message" field is itself a JSON document
# holding "s3Bucket" and the "s3ObjectKey" array; ALL keys are downloaded.
#
# @param s3 [Aws::S3::Client]
# @param message [Aws::SQS::Types::Message]
# @return [Array<Hash>, nil] nil when any referenced file had no "Records"
#   (the message is then left in the queue, as before)
# @raise [StandardError] on malformed notifications or S3 failures
def records_for_message(s3, message)
  notification = JSON.parse(JSON.parse(message.body).fetch("Message"))
  bucket = notification.fetch("s3Bucket")
  notification.fetch("s3ObjectKey").each_with_object([]) do |key, records|
    file_records = fetch_log_records(s3, bucket, key)
    return nil unless file_records

    records.concat(file_records)
  end
end

# Serialises records as an Elasticsearch bulk-API (newline-delimited JSON) body.
#
# Each record becomes an "index" action followed by the record itself. The
# CloudTrail eventID, when present, is used as the document _id so that
# re-importing a redelivered SQS message overwrites instead of duplicating.
# NOTE(review): _type/_timestamp metadata is rejected by Elasticsearch 5+;
# drop those keys when targeting a newer cluster.
#
# @param records [Array<Hash>]
# @param index_name [String]
# @return [String]
def build_bulk_body(records, index_name)
  records.map do |record|
    action = { "_index" => index_name, "_type" => "fluentd", "_timestamp" => record["eventTime"] }
    action["_id"] = record["eventID"] if record["eventID"]
    "#{JSON.generate('index' => action)}\n#{JSON.generate(record)}\n"
  end.join
end

# POSTs a bulk-API body to Elasticsearch.
#
# @param body [String] NDJSON bulk payload
# @return [Boolean] true only if the HTTP request succeeded and Elasticsearch
#   reported no per-document errors
def post_bulk(body)
  request = Net::HTTP::Post.new(ELASTICSEARCH_BULK_URI)
  request['Content-Type'] = 'application/x-ndjson'
  request.body = body
  response = Net::HTTP.start(ELASTICSEARCH_BULK_URI.host, ELASTICSEARCH_BULK_URI.port) do |http|
    http.request(request)
  end

  unless response.is_a?(Net::HTTPSuccess)
    warn "Elasticsearch bulk request failed: HTTP #{response.code} #{response.body}"
    return false
  end
  if JSON.parse(response.body)["errors"]
    warn "Elasticsearch reported document errors: #{response.body[0, 1000]}"
    return false
  end
  true
rescue SystemCallError, IOError, Timeout::Error, JSON::ParserError => e
  warn "Could not import into Elasticsearch: #{e.class}: #{e.message}"
  false
end

# Deletes (acknowledges) processed SQS messages.
#
# @param sqs [Aws::SQS::Client]
# @param receipt_handles [Array<String>]
def delete_messages(sqs, receipt_handles)
  receipt_handles.each do |handle|
    sqs.delete_message(queue_url: SQS_URL, receipt_handle: handle)
  end
end

# Downloads, imports and acknowledges one ReceiveMessage batch.
#
# A message that cannot be processed is logged and left in the queue (SQS will
# redeliver it after VISIBILITY_TIMEOUT); it no longer aborts the whole run.
# Messages are only deleted after Elasticsearch accepted their records.
#
# @return [Boolean] false when the Elasticsearch import failed
def process_batch(s3, sqs, messages, logstash_date)
  records = []
  handles = []
  messages.each do |message|
    message_records =
      begin
        records_for_message(s3, message)
      rescue StandardError => e
        warn "Skipping SQS message #{message.message_id}: #{e.class}: #{e.message}"
        nil
      end
    next unless message_records

    records.concat(message_records)
    handles << message.receipt_handle
  end

  puts "-------------------"
  unless records.empty?
    puts "Processing #{records.size} requests for #{logstash_date}"
    return false unless post_bulk(build_bulk_body(records, "logstash-#{logstash_date}"))
  end
  puts "Deleting #{handles.size} SQS messages"
  delete_messages(sqs, handles)
  true
end

s3_client = Aws::S3::Client.new(region: REGION)
sqs_client = Aws::SQS::Client.new(region: REGION)
logstash_date = Time.now.strftime("%Y.%m.%d")
received_any = false

# Keep receiving until the queue is empty (a single ReceiveMessage call
# returns at most 10 messages, which is not enough to keep up).
MAX_BATCHES_PER_RUN.times do
  messages = sqs_client.receive_message(
    queue_url: SQS_URL,
    max_number_of_messages: SQS_BATCH_SIZE,
    visibility_timeout: VISIBILITY_TIMEOUT,
    wait_time_seconds: 1
  ).messages
  break if messages.empty?

  received_any = true
  unless process_batch(s3_client, sqs_client, messages, logstash_date)
    abort "Elasticsearch import failed; messages left in SQS for retry"
  end
end

puts(received_any ? "Done" : "Nothing in SQS")