Last active
March 5, 2024 15:28
-
-
Save acaburaz/19bf1b212ff43ecb6e3f6f40eeb30344 to your computer and use it in GitHub Desktop.
Raspberry Pi Pimoroni LCD, RX6, sound on RF device push
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import json | |
import subprocess | |
import RPi.GPIO as GPIO | |
import os | |
from PIL import Image, ImageDraw, ImageFont | |
import ST7735 | |
from rpi_rf import RFDevice | |
import socket | |
# JSON file (relative to the working directory) that read_statistics() and
# write_statistics() use to keep the sales counter across restarts.
file_path = 'stats.json'
# Address GPIO pins by their BCM numbers for every GPIO call in this script.
GPIO.setmode(GPIO.BCM)
class Stats:
    """Running sales total plus two rounded figures derived from it."""

    # NOTE(review): the divisors below are reproduced as found. 52 feeding
    # the "Monthly" figure and 7 feeding the "Weekly" figure look unusual;
    # confirm the intended time base before relying on these numbers.
    _MONTHLY_DIVISOR = 52
    _WEEKLY_DIVISOR = 7

    def __init__(self, total_sales=0):
        """Start counting from *total_sales* (zero when omitted)."""
        self.total_sales = total_sales

    def increment_sales(self, amount=1):
        """Add *amount* (one by default) to the running total."""
        self.total_sales += amount

    def sales_per_month(self):
        """Return the total divided by 52, rounded to the nearest integer."""
        return round(self.total_sales / self._MONTHLY_DIVISOR)

    def sales_per_week(self):
        """Return the total divided by 7, rounded to the nearest integer."""
        return round(self.total_sales / self._WEEKLY_DIVISOR)

    def to_dict(self):
        """Return the JSON-serialisable view used for storage and display.

        Keys are emitted in the order "Monthly", "Sales", "Weekly".
        """
        monthly = self.sales_per_month()
        weekly = self.sales_per_week()
        return {"Monthly": monthly, "Sales": self.total_sales, "Weekly": weekly}
def read_statistics(file_path):
    """Load the persisted sales counter from *file_path*.

    Only the "Sales" entry of the stored JSON object is used; the derived
    figures are recomputed by ``Stats``.

    Returns:
        A ``Stats`` seeded with the stored total, or a fresh ``Stats()``
        when the file is missing, is not decodable/valid JSON, does not
        contain a JSON object, or holds a non-numeric "Sales" value.
    """
    try:
        with open(file_path, 'r') as file:
            stats_data = json.load(file)
    except (FileNotFoundError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError, so a
        # half-written or corrupted file is treated like a missing one.
        return Stats()

    # json.load can legally return a list, string, number or None; only an
    # object has the "Sales" key this script stores.
    if not isinstance(stats_data, dict):
        return Stats()

    total = stats_data.get("Sales", 0)
    # bool is an int subclass; reject it along with strings, None, etc. so
    # later arithmetic on total_sales cannot fail.
    if isinstance(total, bool) or not isinstance(total, (int, float)):
        return Stats()

    return Stats(total_sales=total)
def write_statistics(file_path, stats):
    """Persist ``stats.to_dict()`` as indented JSON at *file_path*.

    The data is first written to a sibling ``<file_path>.tmp`` file and then
    moved over the target with ``os.replace``. If serialisation or the write
    fails part-way, the previous file content is left untouched instead of
    being truncated.

    Raises:
        Whatever ``open``, ``json.dump`` or ``os.replace`` raise; the
        temporary file is removed before the exception propagates.
    """
    tmp_path = '{}.tmp'.format(file_path)
    try:
        with open(tmp_path, 'w') as file:
            json.dump(stats.to_dict(), file, indent=4)
            # Push the bytes to disk before the rename makes them visible.
            file.flush()
            os.fsync(file.fileno())
        os.replace(tmp_path, file_path)
    except Exception:
        # Do not leave a stray partial file behind.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def _text_height(font, text):
    """Return the pixel height *text* occupies when drawn with *font*."""
    # ImageFont.getsize() was removed in Pillow 10. The bottom edge of
    # getbbox() is the equivalent value; fall back to getsize() on Pillow
    # releases that predate getbbox().
    if hasattr(font, 'getbbox'):
        return font.getbbox(text)[3]
    return font.getsize(text)[1]


def update_display(stats):
    """Render the logo, the statistics and network info, then show them.

    Builds an off-screen RGB image the size of the module-level ``disp``,
    pastes 'LP_logo_qr.png' on the left, writes the "Sales", "Monthly" and
    "Weekly" values from ``stats.to_dict()`` to its right, adds the Wi-Fi
    name and IP address underneath, and pushes the result to ``disp``.

    Args:
        stats: object exposing ``to_dict()`` with the three keys above.
    """
    # Create a new image for the display
    image = Image.new('RGB', (disp.width, disp.height), color=(0, 0, 0))
    draw = ImageDraw.Draw(image)

    # Load and resize the logo image. Image.ANTIALIAS was removed in
    # Pillow 10; Image.LANCZOS is the filter it was an alias for.
    logo = Image.open('LP_logo_qr.png').resize((64, 64), Image.LANCZOS)

    # Vertically centre the logo at a fixed left margin
    logo_x_pos = 10
    logo_y_pos = (disp.height - logo.height) // 2
    image.paste(logo, (logo_x_pos, logo_y_pos))

    # Define font for text
    font_path = '/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf'
    small_font_size = 10
    font = ImageFont.truetype(font_path, small_font_size)

    # Color mapping for different keys
    color_mapping = {
        "Sales": (0, 255, 0),       # Green
        "Monthly": (0, 0, 255),     # Blue
        "Weekly": (128, 128, 128),  # Gray
    }

    # Text starts to the right of the logo, aligned with its top edge
    text_x_pos = logo_x_pos + logo.width + 10
    y_offset = logo_y_pos

    # Iterate over stats in a specific order
    stats_dict = stats.to_dict()
    for key in ["Sales", "Monthly", "Weekly"]:
        value = stats_dict[key]
        # Floats are shown with two decimals; anything else is shown as-is
        formatted_value = "{:.2f}".format(value) if isinstance(value, float) else value
        text_to_display = '{}: {}'.format(key, formatted_value)
        draw.text((text_x_pos, y_offset), text_to_display, font=font, fill=color_mapping[key])
        y_offset += _text_height(font, text_to_display) + 2  # Gap based on text height

    # Add some vertical space before the network information
    y_offset += 10
    ip_font_size = 8
    ip_font = ImageFont.truetype(font_path, ip_font_size)

    # Wi-Fi name on one line, IP address 9 pixels below it
    ip_address = get_ip_address()
    wifi_name = get_wifi_name()
    draw.text((text_x_pos, y_offset), '{}'.format(wifi_name), font=ip_font, fill=(255, 255, 255))
    draw.text((text_x_pos, y_offset + 9), '{}'.format(ip_address), font=ip_font, fill=(255, 255, 255))

    # Update the display with the image
    disp.display(image)
import subprocess | |
def get_wifi_name():
    """Return the output of ``iwgetid -r`` (the current SSID), stripped.

    Returns:
        The decoded, stripped command output, or the placeholder string
        "NaN" when the command exits with an error or cannot be started
        (for example because ``iwgetid`` is not installed).
    """
    try:
        # Run the iwgetid command with subprocess to get the SSID
        ssid_output = subprocess.check_output(['iwgetid', '-r']).decode('utf-8').strip()
        return ssid_output
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: non-zero exit status.
        # OSError: the binary is missing or not executable; without this the
        # display refresh would crash on systems lacking wireless tools.
        return "NaN"
def get_ip_address():
    """Return the local IPv4 address of the outbound interface as a string.

    A UDP socket is connected towards 10.255.255.255 (which does not even
    have to be reachable) and the socket's own address is read back.
    Any failure yields '127.0.0.1'. The socket is always closed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(('10.255.255.255', 1))
            return probe.getsockname()[0]
        except Exception:
            return '127.0.0.1'
def play_sound_and_control_gpio():
    """Drive GPIO 17 high, play 'sales_alternative.wav', then restore state.

    Sequence: set BCM mode, configure pin 17 as output and raise it, switch
    pin 13 to alternate function a0 via ``raspi-gpio``, play the sound with
    ``aplay``, wait three seconds, then lower pin 17, switch pin 13 to a5
    and release pin 17.

    The switch-off steps run in a ``finally`` clause so the pin is not left
    high (and pin 13 not left on a0) if the sequence is interrupted, e.g.
    by Ctrl-C during playback. Ordinary errors are reported on stdout and
    swallowed, as before.
    """
    output_pin = 17  # NOTE(review): presumably enables an amplifier/indicator; confirm wiring
    try:
        # Explicitly set GPIO mode each time before using GPIO
        GPIO.setmode(GPIO.BCM)
        # Setup GPIO pin as an output
        GPIO.setup(output_pin, GPIO.OUT)
        try:
            # Turn the pin on for the duration of the sound
            GPIO.output(output_pin, GPIO.HIGH)
            os.system('raspi-gpio set 13 a0')
            os.system('aplay sales_alternative.wav')
            time.sleep(3)
        finally:
            # Always undo what was switched on above
            GPIO.output(output_pin, GPIO.LOW)
            os.system('raspi-gpio set 13 a5')
            # Clean up the specific GPIO pin instead of all
            GPIO.cleanup(output_pin)
        print("Sound played and GPIO manipulated successfully.")
    except Exception as e:
        print("An error occurred:", e)
# Create ST7735 LCD display class.
disp = ST7735.ST7735(
    port=0,
    cs=ST7735.BG_SPI_CS_FRONT,  # BG_SPI_CS_BACK or BG_SPI_CS_FRONT. BG_SPI_CS_FRONT (eg: CE1) for Enviro Plus
    dc=9,                       # "GPIO9" / "PIN21". "PIN21" for a Pi 5 with Enviro Plus
    backlight=19,               # "PIN18" for back BG slot, "PIN19" for front BG slot. "PIN32" for a Pi 5 with Enviro Plus
    rotation=270,
    spi_speed_hz=4000000
)

# Initialise the panel before the first frame is pushed. Calling begin()
# after display() risks the initial image being lost on library versions
# where begin() (re)initialises the controller; the order is harmless
# where begin() does nothing.
disp.begin()

# Show the persisted statistics immediately at start-up.
stats = read_statistics(file_path)
update_display(stats)

# RF receiver on BCM pin 27, listening for incoming codes.
rfdevice = RFDevice(27)
rfdevice.enable_rx()

# Codes that count as a sale; any other received code is ignored.
interested_rf_codes = [14077633, 5089089, 4312385, 4971201]

# Dictionary to track the last processing time for each code
last_processed_times = {}
processing_delay = 300  # Delay in seconds before the same code can be processed again
# Main receive loop.
#
# rfdevice.rx_code keeps holding the last decoded code until another one
# arrives, so polling it alone cannot tell a new transmission from an old
# one: a single button press would be counted again every
# `processing_delay` seconds. rfdevice.rx_code_timestamp changes on each
# reception, so a code is only handled when that timestamp has moved.
last_code_timestamp = None

try:
    while True:
        try:
            rx_timestamp = rfdevice.rx_code_timestamp
            if rx_timestamp != last_code_timestamp:
                # Mark this reception as seen before handling it, so a
                # failure below does not cause it to be handled repeatedly.
                last_code_timestamp = rx_timestamp
                received_code = rfdevice.rx_code
                current_time = time.time()

                if received_code not in interested_rf_codes:
                    print("Received code not in interested list:", received_code)
                elif (received_code in last_processed_times
                        and (current_time - last_processed_times[received_code]) < processing_delay):
                    print("Received code recently processed, skipping:", received_code)
                else:
                    print('Processing code:', received_code)
                    stats = read_statistics(file_path)
                    stats.increment_sales()
                    write_statistics(file_path, stats)
                    # Optionally update the display and play a sound
                    play_sound_and_control_gpio()
                    # Update the last processed time for the received code
                    last_processed_times[received_code] = current_time
                    update_display(stats)
        except Exception as e:
            print("An error occurred:", e)
        # Sleep outside the inner try so a repeating error cannot turn the
        # loop into a busy spin.
        time.sleep(0.1)  # Short delay to reduce CPU usage
finally:
    # Stop receiving and release the GPIO when the script exits (e.g. Ctrl-C).
    rfdevice.cleanup()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment