# Ruby script that reads SQS messages describing CloudTrail json.gz log files stored in S3.
# Every time a CloudTrail event occurs, AWS uploads the log to S3 in json.gz format and sends a notification to SQS.
# We use SQS to learn about new log files, download them from S3, and combine them all into one file for bulk importing into Elasticsearch,
# ready to be used with Kibana.
# Run this hourly or every 30 minutes.


require 'json'
require 'stringio'
require 'zlib'

require 'aws-sdk'

# Default region for every AWS client created by this script.
Aws.config.update(region: 'ap-southeast-1')

# Queue polled for notifications; passed as queue_url to the SQS calls below.
SQS_URL = 'https://sqs.ap-southeast-1.amazonaws.com/xxxxxxxxx/xxxxxxxxx'
# NOTE(review): not referenced anywhere in the visible script.
SQS_ENDPOINT = 'sqs.ap-southeast-1.amazonaws.com'

# NOTE(review): looks like the S3 key prefix of the CloudTrail logs; not
# referenced anywhere in the visible script.
BASE = 'AWSLogs/xxxxxxxxxxx/CloudTrail/ap-southeast-1'

# Decompresses a gzip archive held in memory.
#
# data - String containing the raw bytes of a gzip archive.
#
# Returns the decompressed content as a String.
# Raises Zlib::GzipFile::Error when data is not valid gzip.
def gunzip(data)
  # The block form closes the reader even when reading raises (for example on
  # a truncated or corrupt archive), which a trailing `close` would not do.
  Zlib::GzipReader.wrap(StringIO.new(data), &:read)
end

# NOTE(review): the credentials below are hard-coded in the source file;
# consider letting the SDK resolve them (environment, shared credentials file
# or instance profile) instead of committing keys.

# Client used to fetch the log archives from S3.
s3_credentials = {
  access_key_id: 'xxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
}
@s3_client = Aws::S3::Client.new(s3_credentials.merge(region: 'ap-southeast-1'))

# Client used to receive and delete the queue messages.
sqs_credentials = {
  access_key_id: 'xxxxxxxxxxxxxxxxx',
  secret_access_key: 'xxxxxxxxxxxxxxxxx'
}
@sqs_client = Aws::SQS::Client.new(sqs_credentials.merge(region: 'ap-southeast-1'))

# Fetch one batch of notifications from the queue.
receive_options = {
  queue_url: SQS_URL,
  attribute_names: %w[Policy VisibilityTimeout CreatedTimestamp],
  message_attribute_names: %w[MessageAttributeName],
  # Upper bound on the number of messages returned by one call (10 is the maximum).
  max_number_of_messages: 10,
  # Seconds during which the received messages stay hidden from subsequent
  # retrieve requests after being returned by this ReceiveMessage request.
  visibility_timeout: 60,
  wait_time_seconds: 1
}
resp = @sqs_client.receive_message(receive_options)

# Accumulators used by the rest of the script:
#   @json             - every record extracted from the downloaded log files
#   @message_handlers - receipt handles of the messages whose records were
#                       collected (these are the messages deleted at the end)
@json = []
@message_handlers = []

resp.messages.each do |message|
  # The SQS body is JSON whose "Message" field is itself a JSON document
  # naming the bucket and the log file key(s).
  envelope = JSON.parse(message.body)
  notification = JSON.parse(envelope["Message"])
  bucket = notification["s3Bucket"]

  # "s3ObjectKey" is a list: import every entry, not just the first one,
  # because the whole message is deleted afterwards and any skipped file
  # would be lost. Array() also tolerates a missing key list.
  record_sets = Array(notification["s3ObjectKey"]).map do |key|
    # Read the object straight into memory instead of going through a fixed,
    # predictable /tmp path that concurrent runs would overwrite.
    archive = @s3_client.get_object(bucket: bucket, key: key).body.read
    # JSON.parse (rather than JSON.load) never instantiates arbitrary classes
    # from downloaded data.
    JSON.parse(gunzip(archive))["Records"]
  end

  # Only acknowledge a message when every referenced file yielded a
  # "Records" entry; otherwise leave it in the queue untouched.
  next if record_sets.empty? || !record_sets.all?

  @json.concat(record_sets.flatten(1))
  @message_handlers << message.receipt_handle
end

# Nothing to import: stop before touching the bulk file or the queue.
if @json.empty?
  puts "Nothing in SQS"
  exit
end

# Index suffix derived from the current local date (not from the events).
@logstash_date = Time.now.strftime("%Y.%m.%d")

# Write the bulk payload to the file "all" in the working directory: one
# action line followed by one document line per record. The file name must
# stay "all" because the curl command below uploads it as `@all`.
# The block form of File.open closes the file even if serialising a record
# raises, so no half-open handle is left behind.
File.open('all', 'w') do |bulk_file|
  @json.each do |record|
    event_time = record["eventTime"]
    action_line = %Q{{ "index" : { "_index" : "logstash-#{@logstash_date}", "_type" : "fluentd", "_timestamp" : "#{event_time}" } }}
    bulk_file.puts action_line, record.to_json
  end
end

puts "-------------------"
puts "Processing #{@json.size} requests for #{@logstash_date}"

# Upload the bulk file. `@all` is curl syntax for "send the contents of the
# file named all"; it is not Ruby interpolation. The trailing `; echo` was
# dropped so that $? reflects curl's exit status rather than echo's.
bulk_output = `curl -s -XPOST localhost:9200/_bulk --data-binary @all`
curl_status = $?

bulk_result = begin
  JSON.parse(bulk_output)
rescue JSON::ParserError
  nil
end

# Stop here when the import did not go through, so the statements that follow
# never delete SQS messages whose records were not stored.
unless curl_status.success? && bulk_result.is_a?(Hash) && !bulk_result.key?('error')
  abort "Bulk import failed (curl exit status #{curl_status.exitstatus}); SQS messages were left in the queue"
end

# Item-level failures are reported but do not stop the run, to avoid
# re-importing the whole batch on every run because of one bad record.
warn "Elasticsearch reported errors for some bulk items" if bulk_result['errors']

# Remove the processed notifications from the queue, one request per
# receipt handle.
puts "Deleting #{@message_handlers.size} SQS messages"
@message_handlers.each do |receipt_handle|
  @sqs_client.delete_message(queue_url: SQS_URL, receipt_handle: receipt_handle)
end

puts "Done"