Last active
December 14, 2015 17:49
-
-
Save jkent/5125274 to your computer and use it in GitHub Desktop.
Wrapper class for libftdi1, plus MPSSE-based I2C bus and device classes.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# vim: ts=4 et sw=4 sts=4 ai | |
import ftdi1 | |
# Re-export the libftdi1 constants so callers only need to import this module.

# Parity settings (NONE is the default parity of Ftdi.set_line_property).
NONE, ODD, EVEN, MARK, SPACE = (
    ftdi1.NONE,
    ftdi1.ODD,
    ftdi1.EVEN,
    ftdi1.MARK,
    ftdi1.SPACE,
)

# Stop-bit settings (STOP_BIT_1 is the default of Ftdi.set_line_property).
STOP_BIT_1, STOP_BIT_15, STOP_BIT_2 = (
    ftdi1.STOP_BIT_1,
    ftdi1.STOP_BIT_15,
    ftdi1.STOP_BIT_2,
)

# Bit modes, passed as the `mode` argument of Ftdi.set_bitmode.
(
    BITMODE_RESET,
    BITMODE_BITBANG,
    BITMODE_MPSSE,
    BITMODE_SYNCBB,
    BITMODE_MCU,
    BITMODE_OPTO,
    BITMODE_CBUS,
    BITMODE_SYNCFF,
    BITMODE_FT1284,
) = (
    ftdi1.BITMODE_RESET,
    ftdi1.BITMODE_BITBANG,
    ftdi1.BITMODE_MPSSE,
    ftdi1.BITMODE_SYNCBB,
    ftdi1.BITMODE_MCU,
    ftdi1.BITMODE_OPTO,
    ftdi1.BITMODE_CBUS,
    ftdi1.BITMODE_SYNCFF,
    ftdi1.BITMODE_FT1284,
)

# Interface selectors (presumably the values accepted by Ftdi.set_interface).
INTERFACE_ANY, INTERFACE_A, INTERFACE_B, INTERFACE_C, INTERFACE_D = (
    ftdi1.INTERFACE_ANY,
    ftdi1.INTERFACE_A,
    ftdi1.INTERFACE_B,
    ftdi1.INTERFACE_C,
    ftdi1.INTERFACE_D,
)
class FtdiError(EnvironmentError):
    """Error raised when a libftdi1 call reports a negative status.

    ``errno`` carries the status code.  ``strerror`` is the message; when it
    is omitted (or empty) the message is fetched from the library with
    ``ftdi1.get_error_string(context)``.
    """

    def __init__(self, context, errno, strerror=None):
        # Only ask the library for its error text when the caller gave none.
        message = strerror if strerror else ftdi1.get_error_string(context)
        EnvironmentError.__init__(self, errno, message)
class Ftdi(object):
    """Object-oriented wrapper around a libftdi1 context.

    Every method forwards to the ``ftdi1`` function of the matching name and
    raises ``FtdiError`` when that function reports a negative status.
    Fields of the underlying context can be read and written with item
    syntax, e.g. ``dev['some_field']``.
    """

    def __init__(self):
        self._ctx = ftdi1.context()
        self._check(ftdi1.init(self._ctx))

    def __del__(self):
        # _ctx is missing if ftdi1.context() itself raised in __init__.
        ctx = getattr(self, '_ctx', None)
        if ctx is not None:
            ftdi1.deinit(ctx)

    def __getitem__(self, key):
        """Return the attribute ``key`` of the underlying context."""
        # getattr() works whether or not the proxy defines a __getattr__ hook.
        return getattr(self._ctx, key)

    def __setitem__(self, key, value):
        """Set the attribute ``key`` of the underlying context to ``value``."""
        setattr(self._ctx, key, value)

    def _check(self, result):
        """Return ``result`` unchanged; raise FtdiError if it is negative."""
        if result < 0:
            raise FtdiError(self._ctx, result)
        return result

    def open(self, vid, pid, description=None, serial=None, index=None):
        """Open a device by USB vendor/product id.

        ``description`` and ``serial`` are passed through to libftdi1; when
        ``index`` is given, ``usb_open_desc_index`` is used instead of
        ``usb_open_desc``.
        """
        if index is None:
            result = ftdi1.usb_open_desc(
                self._ctx, vid, pid, description, serial)
        else:
            result = ftdi1.usb_open_desc_index(
                self._ctx, vid, pid, description, serial, index)
        self._check(result)

    def open_string(self, s):
        """Open the device described by the string ``s``."""
        self._check(ftdi1.usb_open_string(self._ctx, s))

    def reset(self):
        """Reset the device (``ftdi1.usb_reset``)."""
        self._check(ftdi1.usb_reset(self._ctx))

    def purge_rx_buffer(self):
        """Discard pending receive data."""
        self._check(ftdi1.usb_purge_rx_buffer(self._ctx))

    def purge_tx_buffer(self):
        """Discard pending transmit data."""
        self._check(ftdi1.usb_purge_tx_buffer(self._ctx))

    def purge_buffers(self):
        """Discard both receive and transmit buffers."""
        self._check(ftdi1.usb_purge_buffers(self._ctx))

    def close(self):
        """Close the USB device."""
        self._check(ftdi1.usb_close(self._ctx))

    def set_interface(self, interface):
        """Select the chip interface (one of the INTERFACE_* constants)."""
        self._check(ftdi1.set_interface(self._ctx, interface))

    def find_all(self, vid=0, pid=0):
        """Enumerate devices matching ``vid``/``pid`` and return the list.

        NOTE(review): this assumes the binding returns ``(status, devlist)``,
        like the other out-parameter calls wrapped by this class (for example
        ``read_pins``) -- confirm against the installed libftdi1 version.
        The caller is presumably responsible for releasing the returned list.
        """
        result, devlist = ftdi1.usb_find_all(self._ctx, vid, pid)
        self._check(result)
        return devlist

    def set_baudrate(self, baudrate):
        """Set the baud rate."""
        self._check(ftdi1.set_baudrate(self._ctx, baudrate))

    def set_line_property(self, bits=8, stop_bits=STOP_BIT_1, parity=NONE,
                          break_=False):
        """Set data bits, stop bits, parity and break state."""
        self._check(ftdi1.set_line_property2(
            self._ctx, bits, stop_bits, parity, break_))

    def write_data(self, data):
        """Write ``data`` and return the status reported by libftdi1."""
        return self._check(ftdi1.write_data(self._ctx, data, len(data)))

    def write_data_set_chunksize(self, chunksize):
        """Set the write chunk size."""
        self._check(ftdi1.write_data_set_chunksize(self._ctx, chunksize))

    def write_data_get_chunksize(self):
        """Return the current write chunk size."""
        result, chunksize = ftdi1.write_data_get_chunksize(self._ctx)
        self._check(result)
        return chunksize

    def read_data(self, length):
        """Read up to ``length`` bytes; return only the bytes received."""
        result, data = ftdi1.read_data(self._ctx, length)
        self._check(result)
        # result is the number of bytes actually read; trim the buffer to it.
        return data[:result]

    def read_data_set_chunksize(self, chunksize):
        """Set the read chunk size."""
        self._check(ftdi1.read_data_set_chunksize(self._ctx, chunksize))

    def read_data_get_chunksize(self):
        """Return the current read chunk size."""
        result, chunksize = ftdi1.read_data_get_chunksize(self._ctx)
        self._check(result)
        return chunksize

    def set_bitmode(self, mode, bitmask=0x00):
        """Switch to bit mode ``mode`` (a BITMODE_* constant).

        Note the argument order: this wrapper takes ``mode`` first, while
        ``ftdi1.set_bitmode`` is called with ``bitmask`` first.
        """
        self._check(ftdi1.set_bitmode(self._ctx, bitmask, mode))

    def disable_bitbang(self):
        """Leave bitbang mode."""
        self._check(ftdi1.disable_bitbang(self._ctx))

    def read_pins(self):
        """Return the pin state reported by ``ftdi1.read_pins``."""
        result, pins = ftdi1.read_pins(self._ctx)
        self._check(result)
        return pins

    def set_latency_timer(self, latency):
        """Set the latency timer."""
        self._check(ftdi1.set_latency_timer(self._ctx, latency))

    def get_latency_timer(self):
        """Return the latency timer value as an integer."""
        result, latency = ftdi1.get_latency_timer(self._ctx)
        self._check(result)
        # The binding hands the value back as a one-character string here;
        # accept a plain integer too in case a binding already converts it.
        if isinstance(latency, int):
            return latency
        return ord(latency)

    def poll_modem_status(self):
        """Return the modem status reported by libftdi1."""
        result, status = ftdi1.poll_modem_status(self._ctx)
        self._check(result)
        return status

    def set_flow_control(self, flowctrl):
        """Set the flow-control mode."""
        self._check(ftdi1.setflowctrl(self._ctx, flowctrl))

    def set_dtr(self, state):
        """Set the DTR line."""
        self._check(ftdi1.setdtr(self._ctx, state))

    def set_rts(self, state):
        """Set the RTS line."""
        self._check(ftdi1.setrts(self._ctx, state))

    def set_dtr_rts(self, dtr, rts):
        """Set the DTR and RTS lines in one call."""
        self._check(ftdi1.setdtr_rts(self._ctx, dtr, rts))

    def set_event_char(self, char=None):
        """Enable the event character ``char``; ``None`` disables it."""
        enable = char is not None
        if not enable:
            char = 0
        self._check(ftdi1.set_event_char(self._ctx, char, enable))

    def set_error_char(self, char=None):
        """Enable the error character ``char``; ``None`` disables it."""
        enable = char is not None
        if not enable:
            char = 0
        self._check(ftdi1.set_error_char(self._ctx, char, enable))

    def read_chipid(self):
        """Return the chip id reported by ``ftdi1.read_chipid``."""
        result, chipid = ftdi1.read_chipid(self._ctx)
        self._check(result)
        return chipid

    @staticmethod
    def get_library_version():
        """Return the version information of the libftdi1 library."""
        return ftdi1.get_library_version()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# vim: ts=4 et sw=4 sts=4 ai | |
import struct | |
import ftdi | |
class I2CError(EnvironmentError):
    """Raised when an I2C operation fails.

    Used in this module when the MPSSE engine cannot be synchronized and
    when a written byte is not acknowledged.
    """
class I2CBus(object):
    """I2C master built on the MPSSE engine of an FTDI device.

    ``dev`` must provide the ``Ftdi`` wrapper methods used below (``reset``,
    ``set_bitmode``, ``write_data``, ``read_data`` and so on).  MPSSE commands
    are queued in ``self.out`` and sent to the device by ``flush()``.

    NOTE(review): the command buffer is built from ``str`` literals, so this
    presumably targets Python 2 byte strings -- confirm before porting.
    """

    def __init__(self, dev):
        self.dev = dev
        self.reset()

    def reset(self):
        """Reset the device, enter MPSSE mode and configure the I2C clock.

        Raises I2CError if the MPSSE engine does not echo the bad-command
        responses used for synchronization.
        """
        self.out = ''
        self.dev.reset()
        self.dev.purge_rx_buffer()
        self.dev.set_event_char()
        self.dev.set_error_char()
        self.dev.set_latency_timer(1)
        self.dev.set_bitmode(ftdi.BITMODE_RESET)
        self.dev.set_bitmode(ftdi.BITMODE_MPSSE)

        # synchronize with MPSSE
        self.out += '\xAA\xAB\x87'
        self.flush()
        data = ''
        while not data.endswith('\xFA\xAA\xFA\xAB'):
            read = self.dev.read_data(1024)
            if not read:
                raise I2CError('unable to synchronize MPSSE with 0xAA 0xAB')
            data += read

        self.out += '\x8A'  # disable clock divide by 5
        self.out += '\x97'  # disable adaptive clocking
        self.out += '\x8D'  # enable 3 phase data clock
        self.out += '\x80\x03\x03'  # SDA:1 SCL:1
        # SK frequency = 60MHz / ((1 + div) * 2)
        self.out += '\x86' + struct.pack('<H', 299)  # set TCK divisor for 100 kHz
        self.out += '\x85'  # disable TDI/TDO loopback
        self.flush()

    def start(self):
        """Queue a START condition (sent with the next flush)."""
        self.out += '\x80\x03\x03'  # SDA:1 SCL:1
        # The pin state is repeated, presumably to stretch the hold time.
        self.out += '\x80\x01\x03' * 20  # SDA:0 SCL:1
        self.out += '\x80\x00\x03'  # SDA:0 SCL:0

    def stop(self):
        """Queue a STOP condition, release both lines and flush."""
        self.out += '\x80\x01\x03' * 20  # SDA:0 SCL:1
        self.out += '\x80\x03\x03'  # SDA:1 SCL:1
        self.out += '\x80\x03\x00'  # SDA:Z SCL:Z
        self.flush()

    def write(self, byte):
        """Send one byte and return True if the slave acknowledged it.

        ``byte`` may be an integer (converted with ``chr``) or a
        one-character string.
        """
        # isinstance also accepts int subclasses, unlike an exact type match.
        if isinstance(byte, int):
            byte = chr(byte)
        self.out += '\x11\x00\x00'  # clock one byte out, MSB first, on falling edge
        self.out += byte
        self.out += '\x80\x02\x01'  # SDA:Z SCL:0
        self.out += '\x22\x00'  # clock one bit in on rising edge
        self.out += '\x87'  # send immediate
        self.flush()
        reply = self._read_byte()
        # Queued only; it goes out with whatever is flushed next.
        self.out += '\x80\x02\x03'  # SDA:1 SCL:0
        # Bit 0 of the reply is the sampled ACK bit: 0 means acknowledged.
        return not (ord(reply) & 0x01)

    def read(self):
        """Clock one byte in from the slave and return it."""
        self.out += '\x80\x02\x01'  # SDA:Z SCL:0
        self.out += '\x24\x00\x00'  # clock one byte in, MSB first, on falling edge
        self.out += '\x87'  # send immediate
        self.flush()
        return self._read_byte()

    def ack(self, ack=True):
        """Queue the acknowledge bit that follows a byte read.

        Queues a 0 bit when ``ack`` is true and a 1 bit (the MSB of 0x80)
        otherwise.
        """
        self.out += '\x80\x03\x03'  # SDA:1 SCL:1
        if ack:
            self.out += '\x12\x00\x00'
        else:
            self.out += '\x12\x00\x80'

    def flush(self):
        """Send all queued MPSSE commands to the device."""
        self.dev.write_data(self.out)
        self.out = ''

    def _read_byte(self):
        """Poll the device until one byte arrives and return it."""
        # There is no timeout: this blocks until the device returns data.
        while True:
            reply = self.dev.read_data(1)
            if reply:
                return reply
class I2CDevice(object):
    """Register-oriented access to one slave on an I2C bus.

    ``bus`` must provide ``start``, ``stop``, ``write``, ``read`` and ``ack``;
    ``addr`` is the slave address, which is shifted left by one bit to make
    room for the read/write flag.
    """

    def __init__(self, bus, addr):
        self.bus = bus
        self.addr = addr

    def _set_register(self, reg_addr):
        """Issue START, address the slave for writing and send ``reg_addr``.

        Raises I2CError if either byte is not acknowledged.
        """
        self.bus.start()
        if not self.bus.write(self.addr << 1):
            raise I2CError('ack not received')
        if not self.bus.write(reg_addr):
            raise I2CError('ack not received')

    def reg_read(self, reg_addr, length=1):
        """Read ``length`` bytes starting at register ``reg_addr``.

        Returns the bytes concatenated in the order received.  Raises
        I2CError if the slave fails to acknowledge; a STOP is issued first so
        the bus is not left claimed.
        """
        received = []
        try:
            self._set_register(reg_addr)
            # Repeated START, then address the slave with the read bit set.
            self.bus.start()
            if not self.bus.write((self.addr << 1) + 1):
                raise I2CError('ack not received')
            for remaining in range(length - 1, -1, -1):
                received.append(self.bus.read())
                # Acknowledge every byte except the last one.
                self.bus.ack(remaining != 0)
        except I2CError:
            self.bus.stop()
            raise
        self.bus.stop()
        return ''.join(received)

    def reg_write(self, reg_addr, data):
        """Write each item of ``data`` starting at register ``reg_addr``.

        Raises I2CError if the slave fails to acknowledge; a STOP is issued
        first so the bus is not left claimed.
        """
        try:
            self._set_register(reg_addr)
            for byte in data:
                if not self.bus.write(byte):
                    raise I2CError('ack not received')
        except I2CError:
            self.bus.stop()
            raise
        self.bus.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment