Created
September 6, 2012 20:12
-
-
Save leandrosilva/3660045 to your computer and use it in GitHub Desktop.
My dumb Python script to stream Syslog messages to a Kafka server
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# ========================== | |
# Getting Started with klogd | |
# ========================== | |
# | |
# Klogd is a dumb script to stream Syslog messages to a Kafka server. | |
# | |
# 1) Make sure you have all dependencies installed properly | |
# | |
# - Twisted | |
# - PyParsing | |
# - PyKafka | |
# | |
# 2) Make sure you have Kafka up and running | |
# | |
# http://incubator.apache.org/kafka/quickstart.html | |
# | |
# 3) Configure Syslog messages routing (/etc/syslog.conf on Mac OS X) | |
# | |
# *.info;authpriv,remoteauth,ftp,install,internal.none @127.0.0.1:1514 | |
# | |
# 4) Re-launch Syslog daemon (on Mac OS X):
# | |
# $ launchctl unload /System/Library/LaunchDaemons/com.apple.syslogd.plist | |
# $ launchctl load /System/Library/LaunchDaemons/com.apple.syslogd.plist | |
# | |
# 5) Start klogd | |
# | |
# $ python klogd.py | |
# | |
# 6) Test | |
# | |
# $ logger -p local0.info -t test.app "bla bla bla info info info" | |
from pyparsing import Word, alphas, Suppress, Combine, nums, string, Optional, Regex | |
from twisted.internet import reactor | |
from twisted.internet.protocol import DatagramProtocol, Factory | |
from time import strftime | |
import json, kafka | |
# Syslog severity names; the list index is the severity code (priority % 8).
severity = [
    "emerg",   # 0
    "alert",   # 1
    "crit",    # 2
    "err",     # 3
    "warn",    # 4
    "notice",  # 5
    "info",    # 6
    "debug",   # 7
]

# Syslog facility names; the list index is the facility code (priority / 8).
facility = [
    "kern", "user", "mail", "daemon",               # 0-3
    "auth", "syslog", "lpr", "news",                # 4-7
    "uucp", "cron", "authpriv", "ftp",              # 8-11
    "ntp", "audit", "alert", "at",                  # 12-15
    "local0", "local1", "local2", "local3",         # 16-19
    "local4", "local5", "local6", "local7",         # 20-23
]
class Parser(object):
    """Turn a raw syslog line into a JSON document.

    The expected line layout is::

        <PRI>Mon DD HH:MM:SS hostname appname[pid]: message

    where the ``[pid]`` part is optional.
    """

    def __init__(self):
        # The grammar is built once and reused for every line.
        self.__pattern = self.__build_pattern()

    def __build_pattern(self):
        """Build the pyparsing grammar for one syslog line.

        The grammar always yields eight tokens, in this order: priority,
        month, day, time, hostname, appname, pid, message.
        """
        ints = Word(nums)

        # priority, e.g. "<134>" (the angle brackets are dropped)
        priority = Suppress("<") + ints + Suppress(">")

        # timestamp, e.g. "Sep  6 20:12:00": a capitalised three-letter
        # month, the day and the time.  The ASCII constants keep the
        # match independent of the process locale.
        month = Word(string.ascii_uppercase, string.ascii_lowercase, exact=3)
        day = ints
        hour = Combine(ints + ":" + ints + ":" + ints)
        timestamp = month + day + hour

        # hostname
        hostname = Word(alphas + nums + "_" + "-" + ".")

        # appname, optionally followed by "[pid]", then a colon.
        # default=None makes the Optional emit a None token when there is
        # no "[pid]", so the message always stays at the same index.
        pid = Optional(Suppress("[") + ints + Suppress("]"), default=None)
        appname = Word(alphas + "/" + "-" + "_" + ".") + pid + Suppress(":")

        # message: the rest of the line
        message = Regex(".*")

        return priority + timestamp + hostname + appname + message

    def parse(self, line, addr):
        """Parse one syslog line and return it as a JSON string.

        Args:
            line: the raw syslog line.
            addr: ``(host, port)`` pair of the sender; stored in the payload
                as ``hostaddr`` and ``hostport``.

        Returns:
            A JSON object (as a string) with the keys ``priority``,
            ``facility``, ``severity``, ``timestamp``, ``hostname``,
            ``hostaddr``, ``hostport``, ``appname``, ``pid`` and ``message``.
            ``pid`` is ``null`` when the line carries no ``[pid]``.

        Raises:
            IndexError: if the priority maps to an unknown facility code.
            The pyparsing error raised by ``parseString`` propagates when
            the line does not match the grammar.
        """
        host, port = addr
        tokens = self.__pattern.parseString(line)

        # Token layout: 0 priority, 1 month, 2 day, 3 time,
        #               4 hostname, 5 appname, 6 pid (or None), 7 message.
        priority = tokens[0]
        facility_name, severity_name = self.__get_level(priority)

        payload = {
            "priority": priority,
            "facility": facility_name,
            "severity": severity_name,
            # Local time at which the line is parsed; the timestamp carried
            # by the line itself (tokens 1-3) is not used.
            "timestamp": strftime("%Y-%m-%d %H:%M:%S"),
            "hostname": tokens[4],
            "hostaddr": host,
            "hostport": port,
            "appname": tokens[5],
            "pid": tokens[6],
            "message": tokens[7],
        }
        return json.dumps(payload)

    def __get_level(self, priority):
        """Split a syslog priority into ``(facility name, severity name)``.

        The facility code is ``priority // 8`` and the severity code is
        ``priority % 8``; both are looked up in the module-level tables.

        Raises:
            IndexError: if the facility code is outside the ``facility``
                table (priority above 191).
        """
        facility_code, severity_code = divmod(int(priority), 8)
        return (facility[facility_code], severity[severity_code])
class Receiver(DatagramProtocol):
    """UDP protocol that forwards every received syslog datagram to Kafka.

    Each datagram is converted to a JSON payload by ``Parser`` and sent to
    the ``klog`` topic of the Kafka broker at 127.0.0.1:9092.
    """

    def __init__(self):
        self.__parser = Parser()
        # Created on first use and then reused, instead of building a new
        # producer for every single datagram.
        self.__producer = None

    def datagramReceived(self, data, addr):
        """Parse one datagram and hand the resulting payload to Kafka.

        Args:
            data: the datagram content (one syslog line).
            addr: ``(host, port)`` pair of the sender.
        """
        host, port = addr
        payload = self.__parser.parse(data, (host, port))
        self.__send_to_kafka(payload)

    def __send_to_kafka(self, payload):
        """Send ``payload`` as one message to the ``klog`` topic."""
        if self.__producer is None:
            self.__producer = kafka.producer.Producer('klog', host="127.0.0.1", port=9092)
        message = kafka.message.Message(payload)
        try:
            self.__producer.send(message)
        except Exception:
            # Forget the producer so the next datagram starts from a fresh
            # one, then let the error propagate as before.
            self.__producer = None
            raise
# Let's kick off
def main(port=1514):
    """Listen for syslog datagrams on UDP ``port`` and run the reactor.

    Args:
        port: UDP port to listen on; defaults to 1514.

    The call returns only when ``reactor.run()`` returns.
    """
    # Single-argument print(...) produces the same output whether it is
    # interpreted as a statement (Python 2) or a function call (Python 3).
    print("Listening UDP on port %d" % port)
    reactor.listenUDP(port, Receiver())
    reactor.run()


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# Consumes klog topic from Kafka, for test purpose. | |
# | |
import kafka | |
consumer = kafka.consumer.Consumer("klog") | |
for message in consumer.loop(): | |
print "received:", message | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment