Skip to content

Instantly share code, notes, and snippets.

@maxsei
Created July 17, 2024 22:54
Show Gist options
  • Save maxsei/76c5e964dae3494c03bceba73be8e1dc to your computer and use it in GitHub Desktop.
Save maxsei/76c5e964dae3494c03bceba73be8e1dc to your computer and use it in GitHub Desktop.
device manager api
package main
import (
"database/sql"
"errors"
"fmt"
"io"
"slices"
"sync"
"time"
"github.com/google/uuid"
)
// main has an empty body; nothing in this file is started from it.
// NOTE(review): presumably it exists only so the file builds as package main —
// confirm.
func main() {
}
// ApiDeviceListenerFacing is the slice of the device manager API intended for
// the serial device listener: it reports serial devices being attached and
// detached.
type ApiDeviceListenerFacing interface {
	AddSerialDevice(serialDevice SerialDevice) error
	RemoveSerialDevice(serialNumber string) error
}

// ApiUserFacing is the slice of the device manager API intended for users: it
// lets them identify a device and subscribe to (or unsubscribe from) device
// events.
type ApiUserFacing interface {
	IdentifyDevice(info DeviceInfo) error
	SubDevices(sinceBefore *time.Time) (init []uuid.UUID, changes <-chan Event, err error)
	UnsubDevices(sub <-chan Event) error
}

// deviceManager stores devices that are ready to start recording once they
// have the right information (speed and a name).
type deviceManager struct {
	// db is the persistent store consulted for device information.
	db DatabaseInterface

	// devices is the in-memory list of currently attached devices. It is
	// read and written only while devicesM is held.
	devices  []Device
	devicesM sync.Mutex

	// unidentifiedDeviceEvents carries an Event every time a device is
	// added, removed or identified.
	unidentifiedDeviceEvents Bus[Event]
}
// AddSerialDevice is called by the serial device listener when it detects a
// new serial device. It registers the device and then publishes the resulting
// Event to subscribers. The event is published after addSerialDevice has
// returned, i.e. after the device mutex has been released.
//
// It returns the registration error if the device could not be added,
// otherwise whatever error publishing produced.
func (dm *deviceManager) AddSerialDevice(serialDevice SerialDevice) error {
	event, err := dm.addSerialDevice(serialDevice)
	if err == nil {
		err = dm.unidentifiedDeviceEvents.Pub(*event)
	}
	return err
}
// addSerialDevice builds a Device for serialDevice and appends it to the
// in-memory device list.
//
// The device information is looked up in the database by serial number. When
// the lookup fails with sql.ErrNoRows a fresh DeviceInfo carrying only a newly
// generated UUIDv7 is used instead; any other lookup error is returned as is.
// The list is then searched, under devicesM, for a device with the same serial
// number, and an error is returned if one is already present.
//
// On success it returns an Event holding the device ID and the current time.
func (dm *deviceManager) addSerialDevice(serialDevice SerialDevice) (*Event, error) {
	info, err := dm.db.GetDeviceInfoBySerialNumber(serialDevice.SerialNumber())
	switch {
	case err == nil:
		// The database already knows this serial number; use its record.
	case errors.Is(err, sql.ErrNoRows):
		// Unknown serial number: start from an empty record with a new ID.
		id, genErr := uuid.NewV7()
		if genErr != nil {
			return nil, genErr
		}
		info = &DeviceInfo{ID: id}
	default:
		return nil, err
	}
	device := Device{SerialDevice: serialDevice, DeviceInfo: *info}

	dm.devicesM.Lock()
	defer dm.devicesM.Unlock()

	alreadyPresent := slices.ContainsFunc(dm.devices, func(existing Device) bool {
		return existing.SerialNumber() == device.SerialNumber()
	})
	if alreadyPresent {
		return nil, fmt.Errorf("serial device(SN:%s) already exists", device.SerialNumber())
	}

	dm.devices = append(dm.devices, device)
	added := Event{ID: device.ID, At: time.Now()}
	return &added, nil
}
// RemoveSerialDevice finds the entry with the given serial number, removes it
// from the device list if it exists, and publishes the resulting Event to
// subscribers. The event is published after removeSerialDevice has returned,
// i.e. after the device mutex has been released.
//
// It returns the removal error if no such device is registered, otherwise
// whatever error publishing produced.
func (dm *deviceManager) RemoveSerialDevice(serialNumber string) error {
	event, err := dm.removeSerialDevice(serialNumber)
	if err == nil {
		err = dm.unidentifiedDeviceEvents.Pub(*event)
	}
	return err
}
// removeSerialDevice is the locking wrapper around removeSerialDeviceUnsafe:
// it holds devicesM for the duration of the removal and passes the result
// through unchanged.
func (dm *deviceManager) removeSerialDevice(serialNumber string) (*Event, error) {
	dm.devicesM.Lock()
	defer dm.devicesM.Unlock()

	event, err := dm.removeSerialDeviceUnsafe(serialNumber)
	return event, err
}
// removeSerialDeviceUnsafe deletes the first device whose serial number equals
// serialNumber from the device list. It takes no lock itself; the only caller
// in this file, removeSerialDevice, holds devicesM around the call.
//
// It returns an error when no device matches. On success it returns an Event
// with the removed device's ID, the current time and Redacted set.
func (dm *deviceManager) removeSerialDeviceUnsafe(serialNumber string) (*Event, error) {
	hasSerial := func(candidate Device) bool {
		return candidate.SerialNumber() == serialNumber
	}

	idx := slices.IndexFunc(dm.devices, hasSerial)
	if idx < 0 {
		return nil, fmt.Errorf("serial device(SN:%s) does not exist", serialNumber)
	}

	// Capture the ID before the element is removed from the slice.
	removedID := dm.devices[idx].ID
	dm.devices = slices.Delete(dm.devices, idx, idx+1)

	removed := Event{ID: removedID, At: time.Now(), Redacted: true}
	return &removed, nil
}
// IdentifyDevice finds the device in the cache, identifies it in the
// database, and then writes the result back into the cache.
//
// The device is matched on info.ID. An error is returned when no cached device
// has that ID or when the database call fails; in both cases the cache is left
// untouched. On success an Event with Redacted set is published for the device.
//
// NOTE(review): devicesM is held for the whole call, including the database
// write and the publish. Bus.Pub sends to every subscriber with a blocking
// send, so a subscriber that stops reading can stall this method while the
// device lock is held.
func (dm *deviceManager) IdentifyDevice(info DeviceInfo) error {
	// TODO: validate info
	dm.devicesM.Lock()
	defer dm.devicesM.Unlock()

	idx := slices.IndexFunc(dm.devices, func(candidate Device) bool {
		return candidate.ID == info.ID
	})
	if idx < 0 {
		return fmt.Errorf("could not find device with id: %s", info.ID)
	}

	// Work on the slice element itself so the update below lands in the cache.
	target := &dm.devices[idx]
	err := dm.db.IdentifyDevice(target.SerialNumber(), info)
	if err != nil {
		return err
	}
	target.DeviceInfo = info

	// FIXME: Kick off recording from device, or remove device and push to another device manager here!
	identified := Event{ID: target.ID, At: time.Now(), Redacted: true}
	return dm.unidentifiedDeviceEvents.Pub(identified)
}
// SubDevices subscribes the caller to device events and returns, together
// with the subscription channel, the IDs of all cached devices for which
// IsIdentified reports false. Both steps run while devicesM is held.
//
// If subscribing fails, init and changes are nil and the error is returned.
//
// NOTE(review): sinceBefore is accepted but never read by this method.
func (dm *deviceManager) SubDevices(sinceBefore *time.Time) (init []uuid.UUID, changes <-chan Event, err error) {
	dm.devicesM.Lock()
	defer dm.devicesM.Unlock()

	events, subErr := dm.unidentifiedDeviceEvents.Sub()
	if subErr != nil {
		return nil, nil, subErr
	}

	var unidentified []uuid.UUID
	for i := range dm.devices {
		if d := &dm.devices[i]; !d.IsIdentified() {
			unidentified = append(unidentified, d.ID)
		}
	}
	return unidentified, events, nil
}
// UnsubDevices cancels a subscription previously obtained from SubDevices by
// handing the channel back to the event bus. It returns the bus's Unsub error
// unchanged.
func (dm *deviceManager) UnsubDevices(sub <-chan Event) error {
	err := dm.unidentifiedDeviceEvents.Unsub(sub)
	return err
}
// Device pairs a serial device handle with the information stored about it.
// Both parts are embedded, so their fields and methods are promoted onto
// Device. The SerialDevice part is tagged to be left out of JSON output.
type Device struct {
	SerialDevice `json:"-"`
	DeviceInfo
}

// IsIdentified reports whether the device has a non-empty Name.
func (d *Device) IsIdentified() bool {
	return d.Name != ""
}
// Event encodes information about particular entity having been CRUDed.
//
// The field order matters: this file also builds an Event with an unkeyed
// composite literal.
type Event struct {
	// ID identifies the entity the event is about; every Event created in
	// this file carries a device ID.
	ID uuid.UUID
	// At is the time of the event; this file always sets it to time.Now().
	At time.Time
	// Redacted is set by the remove and identify paths in this file and is
	// tagged omitempty for JSON.
	Redacted bool `json:",omitempty"`
}
// AsEventable is implemented by values that can describe themselves as an
// Event.
// NOTE(review): no implementation or use of this interface is visible in this
// file.
type AsEventable interface {
	AsEvent() Event
}
// DatabaseInterface is the persistence layer the device manager depends on.
type DatabaseInterface interface {
	// GetDeviceInfoBySerialNumber returns the stored information for the
	// device with the given serial number. The caller in this file treats an
	// error matching sql.ErrNoRows as "no record for this device".
	GetDeviceInfoBySerialNumber(serialNumber string) (*DeviceInfo, error)
	// IdentifyDevice is called with a device's serial number and the
	// information supplied for it.
	// NOTE(review): presumably it persists info for that serial number —
	// confirm against the implementation.
	IdentifyDevice(serialNumber string, info DeviceInfo) error
}
// DeviceInfo is the descriptive record kept for a device.
type DeviceInfo struct {
	// ID is the device's identifier; it is what IdentifyDevice matches on and
	// what events carry.
	ID uuid.UUID
	// SerialNumber string
	// Name is the device's name; a non-empty Name is what makes a Device
	// count as identified.
	Name string
	// Baud is presumably the serial speed for the device — TODO confirm;
	// nothing in this file reads it.
	Baud int64
}
// SerialDevice is a handle to an attached serial device: a byte stream that
// can be read, written and closed, plus its serial number and speed accessors.
type SerialDevice interface {
	io.ReadWriteCloser
	// SerialNumber returns the device's serial number; the device manager
	// uses it as the key for lookups and duplicate detection.
	SerialNumber() string
	// SetSpeed sets the device speed.
	// NOTE(review): baud is presumably in bits per second — confirm.
	SetSpeed(baud int) error
	// GetSpeed returns the current device speed.
	GetSpeed() (int, error)
}
// Start package bus.

var (
	// ErrBusClosed is returned by every Bus method once the bus has been
	// closed.
	ErrBusClosed = errors.New("closed")
	// ErrSubscriberNotFound is returned by Unsub when the given channel is
	// not a current subscription of the bus.
	ErrSubscriberNotFound = errors.New("subscriber not found")
)

// Publisher is the sending side of a bus.
type Publisher[T any] interface {
	Pub(msg T) error
	Close() error
}

// Subscribable is the receiving side of a bus.
type Subscribable[T any] interface {
	Sub() (<-chan T, error)
	Unsub(sub <-chan T) error
}

// NewBus returns an empty, open bus.
func NewBus[T any]() *Bus[T] {
	return &Bus[T]{}
}

// Bus fans every published message out to all current subscribers.
//
// All methods may be called from multiple goroutines. A Bus must not be
// copied after first use because it contains a mutex.
type Bus[T any] struct {
	subscribers []chan T
	// m guards subscribers and closed. Pub takes it for reading so several
	// publishers can run at once; Sub, Unsub and Close take it for writing.
	m      sync.RWMutex
	closed bool
}

// Pub delivers msg to every current subscriber and returns ErrBusClosed if the
// bus has been closed.
//
// Delivery uses a blocking send, so Pub does not return until every subscriber
// has room for the message. While Pub is blocked it holds the read lock, which
// in turn blocks Sub, Unsub and Close.
func (b *Bus[T]) Pub(msg T) error {
	b.m.RLock()
	defer b.m.RUnlock()
	if b.closed {
		return ErrBusClosed
	}
	for _, sub := range b.subscribers {
		// XXX: must guarantee that each message is seen (could be bad for slow consumers)
		sub <- msg
	}
	return nil
}

// Sub registers a new subscriber and returns its receive channel, which has a
// buffer of one message. It returns ErrBusClosed if the bus has been closed.
func (b *Bus[T]) Sub() (<-chan T, error) {
	b.m.Lock()
	// Deferred so the lock is also released on the ErrBusClosed path.
	defer b.m.Unlock()
	if b.closed {
		return nil, ErrBusClosed
	}
	res := make(chan T, 1)
	b.subscribers = append(b.subscribers, res)
	return res, nil
}

// Unsub removes the subscription identified by sub and closes its channel.
// It returns ErrBusClosed if the bus has been closed and ErrSubscriberNotFound
// if sub is not a current subscription.
func (b *Bus[T]) Unsub(sub <-chan T) error {
	b.m.Lock()
	defer b.m.Unlock()
	if b.closed {
		return ErrBusClosed
	}
	for i := range b.subscribers {
		if b.subscribers[i] != sub {
			continue
		}
		// Closing under the write lock means no Pub can be sending on it.
		close(b.subscribers[i])
		b.subscribers = slices.Delete(b.subscribers, i, i+1)
		return nil
	}
	return ErrSubscriberNotFound
}

// Close marks the bus as closed, closes every subscriber channel and drops the
// subscriber list. A second call returns ErrBusClosed.
func (b *Bus[T]) Close() error {
	b.m.Lock()
	// Deferred so the lock is also released on the ErrBusClosed path.
	defer b.m.Unlock()
	if b.closed {
		return ErrBusClosed
	}
	b.closed = true
	for _, sub := range b.subscribers {
		close(sub)
	}
	// Release the channels; no method touches the list once closed is set.
	b.subscribers = nil
	return nil
}

// End package bus.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment