Skip to content

Instantly share code, notes, and snippets.

@naoh16
Last active January 31, 2025 10:34
Show Gist options
  • Save naoh16/342b088032a6d37c73d86fe58120bcf1 to your computer and use it in GitHub Desktop.
Save naoh16/342b088032a6d37c73d86fe58120bcf1 to your computer and use it in GitHub Desktop.
AI・IoT PBL 2024
"""
Example of abnormaly detection for 3D accelerometer
- Original source code was written by S. Kobayashi.
- Then, the source code was modified by by S. Hara.
"""
import json
import time
import argparse
import datetime
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
from kafka import KafkaConsumer
import pandas as pd
import numpy as np
import streamlit as st
#import matplotlib.pyplot as plt
TIME_TERM = datetime.timedelta(seconds=10)
import det1
def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
    """Consume messages forever and plot one anomaly score per message.

    Args:
        consumer: Iterable of Kafka records; each record's ``value`` is
            indexed by ``key_spec`` and stored as one history row.
        key_spec: Name of the field in the message value that holds the
            timestamp (parsed with ``pd.to_datetime``).
        value_spec: Names of the measurement columns kept in the history.
        value_fn: Callable handed through to ``det1.anomaly_score_by_value``.

    The loop ends when the consumer is exhausted or on Ctrl-C.
    """
    history_columns = value_spec + [key_spec]
    history = pd.DataFrame([], columns=history_columns)
    try:
        for record in consumer:
            payload = record.value
            # The message timestamp becomes the row label of the history.
            stamp = pd.to_datetime(payload[key_spec])
            history.loc[stamp] = payload
            # Score calculation only; no thresholding happens here.
            score = det1.anomaly_score_by_value(history, stamp, payload, value_fn)
            # Push the new score to the Streamlit chart.
            update_chart(score)
            print("")
    except KeyboardInterrupt:
        print("process interrupted")
# Chart state shared by every call to update_chart().
chart_y = [0.0]   # scores plotted so far, seeded with a 0.0 baseline
chart_obj = None  # Streamlit placeholder hosting the line chart (created lazily)


def update_chart(new_score=None):
    """Draw the score history as a Streamlit line chart.

    Args:
        new_score: Score to append and plot. When ``None``, only a fresh
            placeholder is created and nothing is plotted.

    The placeholder is created on demand, so a score passed before any
    ``update_chart(None)`` call is plotted instead of raising ``NameError``
    or being dropped.
    """
    global chart_obj
    if new_score is None:
        # (Re)create the placeholder that later calls draw into.
        chart_obj = st.empty()
        return
    if chart_obj is None:
        chart_obj = st.empty()
    chart_y.append(new_score)
    chart_obj.line_chart(chart_y)
    # NOTE(review): short pause kept from the original; presumably it gives
    # the front end time to render between updates - confirm.
    time.sleep(0.01)
if __name__ == "__main__":
    def _decode_key(raw):
        """Decode the raw message key bytes into a UTF-8 string."""
        return raw.decode("utf-8")

    def _decode_value(raw):
        """Decode the raw message value bytes and parse them as JSON."""
        return json.loads(raw.decode("utf-8"))

    # Page header and an empty chart placeholder before any data arrives.
    st.title('streamlit example')
    update_chart(None)

    # Read topic "sensor1" from the local broker, starting at the earliest
    # available offset.
    consumer = KafkaConsumer(
        "sensor1",
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest',
        key_deserializer=_decode_key,
        value_deserializer=_decode_value,
    )

    # Score the x-axis acceleration of every message.
    main_loop(
        consumer,
        key_spec='date',
        value_spec=['ax', 'ay', 'az'],
        value_fn=lambda v: v['ax'],
    )
"""
Example of abnormaly detection for 3D accelerometer
- Original source code was written by S. Kobayashi.
- Then, the source code was modified by by S. Hara.
"""
import json
import time
import argparse
import datetime
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
from kafka import KafkaConsumer
import pandas as pd
import numpy as np
# Length of the look-back window used as the "usual" reference data.
TIME_TERM = datetime.timedelta(seconds=10)


def anomaly_score_by_value(df, current_time, current_value, value_fn):
    """Score how far the current value lies from the recent history.

    Args:
        df: History frame whose index holds the message timestamps.
        current_time: Timestamp of the message being scored.
        current_value: The message being scored (passed to ``value_fn``).
        value_fn: Callable extracting the scalar of interest from a message
            or from a history row.

    Returns:
        ``|current - mean(window)| - 2 * std(window)``, where the window is
        every row whose index is later than ``current_time - TIME_TERM``.
        A negative score means the value is within two standard deviations
        of the window mean.
    """
    observed = value_fn(current_value)
    # Keep only the rows that fall inside the look-back window.
    window_start = current_time - TIME_TERM
    recent_rows = df.loc[df.index > window_start]
    baseline = recent_rows.apply(value_fn, axis=1).to_numpy()
    # Distance from the window mean, minus a two-sigma tolerance band.
    deviation = np.abs(observed - np.average(baseline))
    tolerance = np.std(baseline) * 2
    return deviation - tolerance
def test_value_anomaly(df, current_time, current_value, value_fn):
    """Print the value-based anomaly score together with a verdict.

    The verdict is "Normal" for a negative score and "*Anomaly!*" otherwise.
    Arguments are forwarded unchanged to ``anomaly_score_by_value``.
    """
    score = anomaly_score_by_value(df, current_time, current_value, value_fn)
    if score < 0:
        verdict = "Normal"
    else:
        verdict = "*Anomaly!*"
    print("> Value anomaly score: {0}, {1}".format(score, verdict))
def anomaly_score_by_collection(df, current_time, current_value, value_fn=None):
    """Score how unusual the latest gap between messages is.

    Args:
        df: History frame whose index holds the message timestamps.
        current_time: Unused; kept for a signature parallel to
            ``anomaly_score_by_value``.
        current_value: Unused.
        value_fn: Unused.

    Returns:
        ``|last_gap - mean(gaps)| - 2 * std(gaps)`` in seconds, where the
        gaps are the differences between consecutive index entries.
        Returns 9999 when the history has fewer than three rows.
    """
    stamps = df.index
    # Too few messages to estimate a typical interval.
    if len(stamps) < 3:
        return 9999
    # Seconds elapsed between each pair of neighbouring rows.
    gaps = np.array([
        (later - earlier).total_seconds()
        for earlier, later in zip(stamps[:-1], stamps[1:])
    ])
    latest_gap = gaps[-1]
    return np.abs(latest_gap - np.average(gaps)) - np.std(gaps) * 2
def test_collection_anomaly(df, current_time, current_value, value_fn=None):
    """Print the collection-interval anomaly score together with a verdict.

    The verdict is "Normal" for a negative score and "*Anomaly!*" otherwise.
    ``value_fn`` is accepted but not forwarded to the scoring function.
    """
    score = anomaly_score_by_collection(df, current_time, current_value)
    if score < 0:
        verdict = "Normal"
    else:
        verdict = "*Anomaly!*"
    print("> Data collection anomaly score: {0}, {1}".format(score, verdict))
def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
    """Consume messages forever and print both anomaly checks for each one.

    Args:
        consumer: Iterable of Kafka records; each record's ``value`` is
            indexed by ``key_spec`` and stored as one history row.
        key_spec: Name of the field in the message value that holds the
            timestamp (parsed with ``pd.to_datetime``).
        value_spec: Names of the measurement columns kept in the history.
        value_fn: Callable handed to the value-based anomaly check.

    The loop ends when the consumer is exhausted or on Ctrl-C.
    """
    history = pd.DataFrame([], columns=value_spec + [key_spec])
    try:
        for record in consumer:
            payload = record.value
            print("DEBUG: key: {0}, value: {1}".format(record.key, payload))
            # The message timestamp becomes the row label of the history.
            stamp = pd.to_datetime(payload[key_spec])
            history.loc[stamp] = payload
            # Report the value-based check, then the arrival-interval check.
            test_value_anomaly(history, stamp, payload, value_fn)
            test_collection_anomaly(history, stamp, payload)
            print("")
    except KeyboardInterrupt:
        print("process interrupted")
if __name__ == "__main__":
    # Consumer settings: local broker, start from the earliest available
    # offset, decode keys to UTF-8 strings and values to parsed JSON.
    consumer_config = dict(
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest',
        key_deserializer=lambda m: m.decode("utf-8"),
        value_deserializer=lambda m: json.loads(m.decode("utf-8")),
    )
    consumer = KafkaConsumer("sensor1", **consumer_config)

    # Check the x-axis acceleration of every message.
    main_loop(
        consumer,
        key_spec='date',
        value_spec=['ax', 'ay', 'az'],
        value_fn=lambda v: v['ax'],
    )
import math
import random
import json
from datetime import datetime, timedelta, timezone
import uuid
# Simulated 3-axis accelerometer feed: prints `count` messages, one per line,
# as "key<TAB>json-value".
device_id = 'sensor1'
count = 50

# Fixed seed so the measurement noise is reproducible between runs.
_seed = 10000
random.seed(_seed)

# Timestamps are timezone-aware UTC, starting at the moment the script runs.
starttime = datetime.now(tz=timezone.utc)

# Noise multiplier: samples 30..39 get 100x noise to simulate an anomaly.
factor = 1
for n in range(count):
    if n == 30:
        factor = 100.0
    if n == 40:
        factor = 1

    # One full rotation every 10 samples. math.pi (rather than 3.14) keeps
    # the period exact, so no phase error builds up as n grows.
    phase = 2.0 * math.pi * n / 10.0
    x = 2 * math.cos(phase) + random.gauss(0.0, factor * 0.05)
    y = 2 * math.sin(phase) + random.gauss(0.0, factor * 0.05)
    z = -9.8 + random.gauss(0.0, factor * 0.05)

    # Nominally one sample per second, with sub-millisecond jitter.
    _date = starttime + timedelta(seconds=n + random.gauss(0.0, 0.0005))
    _key = str(uuid.uuid4())
    _value = {
        "date": _date.isoformat(),
        "ax": x,
        "ay": y,
        "az": z,
    }
    # Emit as Key[TAB]Value.
    # kafka-console-producer.sh also needs --property "parse.key=true".
    print('\t'.join([_key, json.dumps(_value)]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment