Created
July 17, 2024 22:54
-
-
Save maxsei/76c5e964dae3494c03bceba73be8e1dc to your computer and use it in GitHub Desktop.
device manager api
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"database/sql" | |
"errors" | |
"fmt" | |
"io" | |
"slices" | |
"sync" | |
"time" | |
"github.com/google/uuid" | |
) | |
// main is intentionally empty: this file only declares the device manager API
// and the event bus it relies on.
func main() {}
type ( | |
ApiDeviceListenerFacing interface { | |
AddSerialDevice(serialDevice SerialDevice) error | |
RemoveSerialDevice(serialNumber string) error | |
} | |
ApiUserFacing interface { | |
IdentifyDevice(info DeviceInfo) error | |
SubDevices(sinceBefore *time.Time) (init []uuid.UUID, changes <-chan Event, err error) | |
UnsubDevices(sub <-chan Event) error | |
} | |
// deviceManager stores devices are ready to start recording if they have the | |
// right information (speed and a name). | |
deviceManager struct { | |
db DatabaseInterface | |
devices []Device | |
devicesM sync.Mutex | |
unidentifiedDeviceEvents Bus[Event] | |
} | |
) | |
// AddSerialDevice will be called by serial device listener when it detects a | |
// new serial device. | |
func (dm *deviceManager) AddSerialDevice(serialDevice SerialDevice) error { | |
e, err := dm.addSerialDevice(serialDevice) | |
if err != nil { | |
return err | |
} | |
return dm.unidentifiedDeviceEvents.Pub(*e) | |
} | |
func (dm *deviceManager) addSerialDevice(serialDevice SerialDevice) (*Event, error) { | |
var device Device | |
{ | |
info, err := dm.db.GetDeviceInfoBySerialNumber(serialDevice.SerialNumber()) | |
if err != nil { | |
if !errors.Is(err, sql.ErrNoRows) { | |
return nil, err | |
} | |
id, err := uuid.NewV7() | |
if err != nil { | |
return nil, err | |
} | |
info = &DeviceInfo{ID: id} | |
} | |
device = Device{serialDevice, *info} | |
} | |
dm.devicesM.Lock() | |
defer dm.devicesM.Unlock() | |
i := slices.IndexFunc(dm.devices, func(el Device) bool { | |
return el.SerialNumber() == device.SerialNumber() | |
}) | |
if i >= 0 { | |
return nil, fmt.Errorf("serial device(SN:%s) already exists", device.SerialNumber()) | |
} | |
dm.devices = append(dm.devices, device) | |
return &Event{ID: device.ID, At: time.Now()}, nil | |
} | |
// RemoveSerialDevice find the entry with the serial number if it exists and | |
func (dm *deviceManager) RemoveSerialDevice(serialNumber string) error { | |
e, err := dm.removeSerialDevice(serialNumber) | |
if err != nil { | |
return err | |
} | |
return dm.unidentifiedDeviceEvents.Pub(*e) | |
} | |
func (dm *deviceManager) removeSerialDevice(serialNumber string) (*Event, error) { | |
dm.devicesM.Lock() | |
defer dm.devicesM.Unlock() | |
return dm.removeSerialDeviceUnsafe(serialNumber) | |
} | |
func (dm *deviceManager) removeSerialDeviceUnsafe(serialNumber string) (*Event, error) { | |
i := slices.IndexFunc(dm.devices, func(el Device) bool { | |
return el.SerialNumber() == serialNumber | |
}) | |
if i < 0 { | |
return nil, fmt.Errorf("serial device(SN:%s) does not exist", serialNumber) | |
} | |
devices := dm.devices[i] | |
dm.devices = slices.Delete(dm.devices, i, i+1) | |
return &Event{ID: devices.ID, At: time.Now(), Redacted: true}, nil | |
} | |
// IdentifyDevice device finds the device in the cache, identifies it in the | |
// database, and then writes the result back into the cache | |
func (dm *deviceManager) IdentifyDevice(info DeviceInfo) error { | |
// TODO: validate info | |
dm.devicesM.Lock() | |
defer dm.devicesM.Unlock() | |
i := slices.IndexFunc(dm.devices, func(el Device) bool { | |
return el.ID == info.ID | |
}) | |
if i < 0 { | |
return fmt.Errorf("could not find device with id: %s", info.ID) | |
} | |
device := &dm.devices[i] | |
if err := dm.db.IdentifyDevice(device.SerialNumber(), info); err != nil { | |
return err | |
} | |
device.DeviceInfo = info | |
// FIXME: Kick off recording from device, or remove device and push to another device manager here! | |
return dm.unidentifiedDeviceEvents.Pub(Event{device.ID, time.Now(), true}) | |
} | |
// SubDeviceIdChanges | |
func (dm *deviceManager) SubDevices(sinceBefore *time.Time) (init []uuid.UUID, changes <-chan Event, err error) { | |
dm.devicesM.Lock() | |
defer dm.devicesM.Unlock() | |
changes, err = dm.unidentifiedDeviceEvents.Sub() | |
if err != nil { | |
return nil, nil, err | |
} | |
for _, dev := range dm.devices { | |
if dev.IsIdentified() { | |
continue | |
} | |
init = append(init, dev.ID) | |
} | |
return | |
} | |
func (dm *deviceManager) UnsubDevices(sub <-chan Event) error { | |
return dm.unidentifiedDeviceEvents.Unsub(sub) | |
} | |
type Device struct { | |
SerialDevice `json:"-"` | |
DeviceInfo | |
} | |
func (d *Device) IsIdentified() bool { return len(d.Name) > 0 } | |
// Event encodes information about particular entity having been CRUDed. | |
type Event struct { | |
ID uuid.UUID | |
At time.Time | |
Redacted bool `json:",omitempty"` | |
} | |
type AsEventable interface { | |
AsEvent() Event | |
} | |
type DatabaseInterface interface { | |
GetDeviceInfoBySerialNumber(serialNumber string) (*DeviceInfo, error) | |
IdentifyDevice(serialNumber string, info DeviceInfo) error | |
} | |
type DeviceInfo struct { | |
ID uuid.UUID | |
// SerialNumber string | |
Name string | |
Baud int64 | |
} | |
// SerialDevice is a serial port connection that can be read from, written to
// and closed, and that exposes its serial number and line speed.
type SerialDevice interface {
	io.ReadWriteCloser

	// SerialNumber returns the serial number used to tell devices apart.
	SerialNumber() string
	// GetSpeed returns the current speed in baud.
	GetSpeed() (int, error)
	// SetSpeed sets the speed in baud.
	SetSpeed(baud int) error
}
// Start package bus. | |
var (
	// ErrBusClosed is returned by every Bus operation attempted after Close.
	ErrBusClosed = errors.New("closed")
	// ErrSubscriberNotFound is returned by Unsub when the given channel is not
	// a current subscription of the bus.
	ErrSubscriberNotFound = errors.New("subscriber not found")
)

// Publisher is the sending side of a bus.
type Publisher[T any] interface {
	Pub(msg T) error
	Close() error
}

// Subscribable is the receiving side of a bus.
type Subscribable[T any] interface {
	Sub() (<-chan T, error)
	Unsub(sub <-chan T) error
}

// NewBus returns an empty, open bus. The zero value of Bus is equally usable.
func NewBus[T any]() *Bus[T] {
	return &Bus[T]{}
}

// Bus fans every published message out to all current subscribers.
// A Bus must not be copied after first use because it contains a mutex.
type Bus[T any] struct {
	subscribers []chan T
	// m guards subscribers and closed. Publishers share the read lock;
	// Sub, Unsub and Close take the write lock.
	m      sync.RWMutex
	closed bool
}

// Pub delivers msg to every subscriber and returns ErrBusClosed once the bus
// has been closed.
//
// Delivery is a blocking send: each message is guaranteed to be seen by each
// subscriber, at the cost that one slow consumer holds up the publisher and,
// because the read lock is held meanwhile, any pending Sub, Unsub or Close.
func (b *Bus[T]) Pub(msg T) error {
	b.m.RLock()
	defer b.m.RUnlock()

	if b.closed {
		return ErrBusClosed
	}
	for _, sub := range b.subscribers {
		sub <- msg
	}
	return nil
}

// Sub registers a new subscriber and returns its receive channel, which has a
// buffer of one message. It returns ErrBusClosed once the bus has been closed.
func (b *Bus[T]) Sub() (<-chan T, error) {
	b.m.Lock()
	// Deferred so the closed-bus early return releases the lock as well.
	defer b.m.Unlock()

	if b.closed {
		return nil, ErrBusClosed
	}
	res := make(chan T, 1)
	b.subscribers = append(b.subscribers, res)
	return res, nil
}

// Unsub removes the subscription identified by sub and closes its channel.
// It returns ErrBusClosed once the bus has been closed and
// ErrSubscriberNotFound when sub is not a current subscription.
func (b *Bus[T]) Unsub(sub <-chan T) error {
	b.m.Lock()
	defer b.m.Unlock()

	if b.closed {
		return ErrBusClosed
	}
	i := slices.IndexFunc(b.subscribers, func(c chan T) bool { return c == sub })
	if i < 0 {
		return ErrSubscriberNotFound
	}
	close(b.subscribers[i])
	b.subscribers = slices.Delete(b.subscribers, i, i+1)
	return nil
}

// Close marks the bus closed, closes every subscriber channel and drops the
// subscriber list. A second Close returns ErrBusClosed.
func (b *Bus[T]) Close() error {
	b.m.Lock()
	// Deferred so the already-closed early return releases the lock as well.
	defer b.m.Unlock()

	if b.closed {
		return ErrBusClosed
	}
	b.closed = true
	for _, sub := range b.subscribers {
		close(sub)
	}
	// Release the closed channels; no operation reads the list after Close.
	b.subscribers = nil
	return nil
}
// End package bus. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment