Last active
January 31, 2025 10:34
-
-
Save naoh16/342b088032a6d37c73d86fe58120bcf1 to your computer and use it in GitHub Desktop.
AI・IoT PBL 2024
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Example of anomaly detection for a 3D accelerometer.
- The original source code was written by S. Kobayashi.
- It was then modified by S. Hara.
"""
import json | |
import time | |
import argparse | |
import datetime | |
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html | |
from kafka import KafkaConsumer | |
import pandas as pd | |
import numpy as np | |
import streamlit as st | |
#import matplotlib.pyplot as plt | |
# Look-back window length (10 seconds). It is not referenced anywhere else in
# this script; the scoring done in main_loop happens inside det1.
TIME_TERM = datetime.timedelta(seconds=10)
import det1 | |
def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
    """Consume messages, keep a timestamp-indexed history, and chart a score per message.

    Args:
        consumer: Kafka consumer whose messages carry a dict in ``message.value``.
        key_spec: Field of ``message.value`` holding the timestamp; it is parsed
            with ``pd.to_datetime`` and used as the history's row label.
        value_spec: Measurement field names; together with ``key_spec`` they
            form the history's columns.
        value_fn: Forwarded unchanged to ``det1.anomaly_score_by_value``.

    Stops quietly (after printing a notice) on Ctrl+C.
    """
    history = pd.DataFrame([], columns=value_spec + [key_spec])
    try:
        for message in consumer:
            record = message.value
            timestamp = pd.to_datetime(record[key_spec])
            # Add the row for this timestamp (an existing label is overwritten).
            history.loc[timestamp] = record
            # Score calculation only; det1.anomaly_score_by_collection is the
            # alternative interval-based score.
            score = det1.anomaly_score_by_value(history, timestamp, record, value_fn)
            update_chart(score)
            print("")
    except KeyboardInterrupt:
        print("process interrupted")
# Chart state for the Streamlit page.
chart_y = [0.0]    # plotted series, seeded with a single 0.0 point
chart_obj = None   # st.empty() placeholder; created on first use
chart_area = None  # legacy name kept for compatibility; not used


def update_chart(new_score=None):
    """Create the chart placeholder and/or append a score and redraw.

    Calling with ``None`` (re)creates the ``st.empty()`` placeholder, as the
    start-up code does once.  Calling with a score appends it to ``chart_y``
    and redraws the line chart.  If the placeholder does not exist yet, it is
    created first, so the score is not lost.
    """
    global chart_obj
    if new_score is None or chart_obj is None:
        chart_obj = st.empty()
    if new_score is None:
        return
    chart_y.append(new_score)
    chart_obj.line_chart(chart_y)
    # Short pause between redraws; presumably gives Streamlit time to render.
    time.sleep(0.01)
if __name__ == "__main__":
    st.title('streamlit example')
    # Create the empty chart placeholder before any data arrives.
    update_chart(None)

    # Message keys arrive as UTF-8 bytes and are decoded to str (not parsed
    # as dates); message values are UTF-8 JSON objects decoded to dicts.
    consumer_options = dict(
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest',  # start from the oldest available message
        key_deserializer=lambda raw: raw.decode("utf-8"),
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    consumer = KafkaConsumer("sensor1", **consumer_options)

    main_loop(
        consumer,
        key_spec='date',
        value_spec=['ax', 'ay', 'az'],
        value_fn=lambda record: record['ax'],
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Example of anomaly detection for a 3D accelerometer.
- The original source code was written by S. Kobayashi.
- It was then modified by S. Hara.
"""
import json | |
import time | |
import argparse | |
import datetime | |
# KafkaConsumer document -> https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html | |
from kafka import KafkaConsumer | |
import pandas as pd | |
import numpy as np | |
# Look-back window: anomaly_score_by_value compares the newest value with the
# samples recorded during this period.
TIME_TERM = datetime.timedelta(seconds=10)


def anomaly_score_by_value(df, current_time, current_value, value_fn, term=None):
    """Score how far the newest value deviates from the recent history.

    The score is ``|x - mean(w)| - 2 * std(w)``:
      - ``x = value_fn(current_value)``.
      - ``w`` is ``value_fn`` applied to every row of ``df`` whose index is
        later than ``current_time - term``.
      - ``std`` is the population standard deviation.
    A negative score means ``x`` lies within two standard deviations of the
    recent mean.

    Args:
        df: History DataFrame indexed by timestamp.
        current_time: Timestamp of the newest sample.
        current_value: Newest record; passed through ``value_fn``.
        value_fn: Maps a record (dict or DataFrame row) to a scalar.
        term: Look-back window (``timedelta``). Defaults to ``TIME_TERM``,
            looked up at call time.

    Returns:
        The anomaly score, or ``nan`` when no row of ``df`` falls inside the
        window.
    """
    if term is None:
        term = TIME_TERM
    x = value_fn(current_value)
    window = df[df.index > current_time - term]
    if window.empty:
        # Nothing to compare against: return nan explicitly instead of relying
        # on DataFrame.apply over an empty frame and numpy empty-slice warnings.
        return float("nan")
    usual_values = window.apply(value_fn, axis=1).to_numpy()
    return np.abs(x - np.average(usual_values)) - np.std(usual_values) * 2
def test_value_anomaly(df, current_time, current_value, value_fn):
    """Print the value anomaly score of the newest sample with a verdict.

    A score below zero is reported as "Normal"; any other result (including
    a non-comparable ``nan``) is reported as "*Anomaly!*".
    """
    score = anomaly_score_by_value(df, current_time, current_value, value_fn)
    verdict = "*Anomaly!*"
    if score < 0:
        verdict = "Normal"
    print("> Value anomaly score: {0}, {1}".format(score, verdict))
def anomaly_score_by_collection(df, current_time, current_value, value_fn=None):
    """Score how unusual the latest gap between consecutive messages is.

    Consecutive differences of ``df.index`` are taken in seconds.  The score
    is ``|d_last - mean(d)| - 2 * std(d)`` (population std), so a negative
    score means the newest interval is within two standard deviations of the
    average interval.

    ``current_time``, ``current_value`` and ``value_fn`` are accepted for
    signature parity with ``anomaly_score_by_value`` and are not used.

    Returns:
        The score.  The fixed placeholder 9999 is returned when ``df`` has
        fewer than 3 rows (fewer than 2 intervals); callers that compare the
        score against 0 will treat that placeholder as anomalous.
    """
    if len(df.index) < 3:
        return 9999
    # Normalise to a DatetimeIndex so an object-dtype index of Timestamps
    # (as produced by row-wise enlargement) works too, then diff vectorised.
    timestamps = pd.Series(pd.DatetimeIndex(df.index))
    deltas = timestamps.diff().dt.total_seconds().to_numpy()[1:]  # drop leading NaN
    newest = deltas[-1]
    return np.abs(newest - np.average(deltas)) - np.std(deltas) * 2
def test_collection_anomaly(df, current_time, current_value, value_fn=None):
    """Print the data-collection (message interval) anomaly score with a verdict.

    ``value_fn`` is accepted for signature parity with ``test_value_anomaly``
    but is not used.  A score below zero prints "Normal", anything else
    prints "*Anomaly!*".
    """
    score = anomaly_score_by_collection(df, current_time, current_value)
    if score < 0:
        verdict = "Normal"
    else:
        verdict = "*Anomaly!*"
    print("> Data collection anomaly score: {0}, {1}".format(score, verdict))
def main_loop(consumer: KafkaConsumer, key_spec: str, value_spec: list, value_fn):
    """Consume messages, record them, and print both anomaly checks for each one.

    Args:
        consumer: Kafka consumer whose messages carry a dict in ``message.value``.
        key_spec: Field of ``message.value`` holding the timestamp; it is parsed
            with ``pd.to_datetime`` and used as the history's row label.
        value_spec: Measurement field names; together with ``key_spec`` they
            form the history's columns.
        value_fn: Forwarded to ``test_value_anomaly``.

    Stops quietly (after printing a notice) on Ctrl+C.
    """
    history = pd.DataFrame([], columns=value_spec + [key_spec])
    try:
        for message in consumer:
            record = message.value
            print("DEBUG: key: {0}, value: {1}".format(message.key, record))
            timestamp = pd.to_datetime(record[key_spec])
            # Add the row for this timestamp (an existing label is overwritten).
            history.loc[timestamp] = record
            test_value_anomaly(history, timestamp, record, value_fn)
            test_collection_anomaly(history, timestamp, record)
            print("")
    except KeyboardInterrupt:
        print("process interrupted")
if __name__ == "__main__":
    # Message keys arrive as UTF-8 bytes and are decoded to str (not parsed
    # as dates); message values are UTF-8 JSON objects decoded to dicts.
    consumer_options = dict(
        bootstrap_servers="127.0.0.1:9092",
        auto_offset_reset='earliest',  # start from the oldest available message
        key_deserializer=lambda raw: raw.decode("utf-8"),
        value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
    )
    consumer = KafkaConsumer("sensor1", **consumer_options)

    main_loop(
        consumer,
        key_spec='date',
        value_spec=['ax', 'ay', 'az'],
        value_fn=lambda record: record['ax'],
    )
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import random | |
import json | |
from datetime import datetime, timedelta, timezone | |
import uuid | |
device_id = 'sensor1'
count = 50
_seed = 10000
random.seed(_seed)

# Timestamps are emitted in UTC.  For JST use instead:
#   datetime.now(tz=timezone(timedelta(hours=+9), 'JST'))
starttime = datetime.now(tz=timezone.utc)


def _noise_factor(step):
    """Return the noise multiplier: 100.0 for steps 30-39, 1 otherwise."""
    return 100.0 if 30 <= step < 40 else 1


for n in range(count):
    factor = _noise_factor(n)
    sigma = factor * 0.05
    # NOTE: 3.14 approximates pi; kept as-is so the seeded data set is unchanged.
    phase = 2.0 * 3.14 * n / 10.0
    x = 2 * math.cos(phase) + random.gauss(0.0, sigma)
    y = 2 * math.sin(phase) + random.gauss(0.0, sigma)
    z = -9.8 + random.gauss(0.0, sigma)
    _date = starttime + timedelta(seconds=n + random.gauss(0.0, 0.0005))
    _key = str(uuid.uuid4())
    _value = {"date": _date.isoformat(), "ax": x, "ay": y, "az": z}
    # Output one "Key<TAB>Value" line per record; kafka-console-producer.sh
    # must then also be given --property "parse.key=true".
    print(_key, json.dumps(_value), sep='\t')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment