import logging
import struct
import time
import tftpy
from StringIO import StringIO
import zlib
import hashlib
from transport import Transport
__author__ = 'jackh'
__date__ = 'June 2017'
LOGGER = logging.getLogger(__name__)
tftpy.setLogLevel(logging.ERROR)
def set_log_level(level):
tftpy.setLogLevel(level)
def get_log_level():
return tftpy.log.level
def get_core_info_payload(payload_str):
x = struct.unpack('>LLB', payload_str)
rw = x[0] & 0x3
addr = x[0] & 0xfffffffa
size = x[1]
typenum = x[2]
return {'rw' : rw, 'addr' : addr, 'size' : size, 'typenum' : typenum}
def decode_csl_pl(csl):
OFFSET = 2 # ???
regs = {}
v = struct.unpack('%dB' % len(csl), csl)
s = struct.unpack('%ds' % len(csl), csl)[0]
# payload size is first byte
pl = v[OFFSET]
prev_str = ''
nrepchars = 0
c = OFFSET
line = 0
while (c < len(csl)):
if c != OFFSET:
nrepchars = v[c]
c += 1
nchars = v[c]
if (nchars == 0) and (nrepchars == 0):
break
c += 1
this_str = prev_str[:nrepchars] + s[c : c + nchars]
c += nchars
#this_pl = v[c : c + pl]
regs[this_str] = get_core_info_payload(csl[c : c + pl])
c += pl
prev_str = this_str[:]
return regs
def decode_csl(csl):
x = decode_csl_pl(csl).keys()
x.sort()
return x
[docs]class TapcpTransport(Transport):
"""
The network transport for a tapcp-type interface.
"""
def __init__(self, **kwargs):
"""
Initialized Tapcp FPGA object
:param host: IP Address of the targeted Board
:return: none
"""
Transport.__init__(self, **kwargs)
self.t = tftpy.TftpClient(kwargs['host'], 69)
self._logger = LOGGER
self.timeout = kwargs.get('timeout', 1.2) # long enough to account for Flash erases
self.server_timeout = 4 # Microblaze timeout period
self.retries = kwargs.get('retries', 8)
[docs] def listdev(self):
buf = StringIO()
self.t.download('/listdev', buf, timeout=self.timeout)
return decode_csl(buf.getvalue())
[docs] def listdev_pl(self):
buf = StringIO()
self.t.download('/listdev', buf, timeout=self.timeout)
return decode_csl_pl(buf.getvalue())
[docs] def progdev(self, addr=0):
# address shifts down because we operate in 32-bit addressing mode
# see xilinx docs. Todo, fix this microblaze side
buf = StringIO(struct.pack('>L', addr >> 8))
try:
self.t.upload('/progdev', buf, timeout=self.timeout)
except:
pass # the progdev command kills the host, so things will start erroring
[docs] def prog_user_image(self):
""" (Re)Program the FPGA with the file already on flash """
meta = self.get_metadata()
addr = int(meta['prog_bitstream_start'])
print("File in flash is: %s"%meta['filename'])
self.progdev(addr=addr)
[docs] def get_temp(self):
buf = StringIO()
self.t.download('/temp', buf)
return struct.unpack('>f', buf.getvalue())[0]
[docs] def is_connected(self):
try:
self.read('sys_clkcounter', 4)
return True
except:
return False
[docs] def is_running(self):
"""
This is currently an alias for 'is_connected'
"""
return self.is_connected()
def _extract_bitstream(self,filename):
"""
Extract the header and program bitstream from the input file provided.
"""
with open(filename, 'r') as fh:
fpg = fh.read()
header_offset = fpg.find('\n?quit\n') + 7
header = fpg[0:header_offset] + '0'*(1024-header_offset%1024)
prog = fpg[header_offset:]+'0'*(1024-(len(fpg)-header_offset)%1024)
if prog.startswith('\x1f\x8b\x08'):
prog = zlib.decompress(prog, 16 + zlib.MAX_WBITS)
chksum = hashlib.md5()
chksum.update(fpg)
return header, prog, chksum.hexdigest()
def _update_metadata(self,filename,hlen,plen,md5):
"""
Update the meta data at user_flash_loc. Metadata is written
as 5 32bit integers in the following order:
header-location, length of header (in bytes),
program-location, length of the program bitstream (B),
md5sum of the fpg file
"""
USER_FLASH_LOC = 0x800000
SECTOR_SIZE = 0x10000
head_loc = USER_FLASH_LOC + SECTOR_SIZE
prog_loc = head_loc + hlen
metadict = {}; meta = ''
metadict['flash'] = '?sector_size\t%d'%SECTOR_SIZE
metadict['head'] = '?header_start\t%d?header_length\t%d'%(head_loc,hlen)
metadict['prog'] = '?prog_bitstream_start\t%d?prog_bitstream_length\t%d'%(prog_loc,plen)
metadict['md5'] = '?md5sum\t' + md5
metadict['file'] = '?filename\t' + filename.split('/')[-1]
for m in metadict.values():
meta += m
meta += '?end'
meta += '0'*(1024-len(meta)%1024)
self.blindwrite('/flash',meta,offset=USER_FLASH_LOC)
return head_loc, prog_loc
[docs] def upload_to_ram_and_program(self, filename, port=None, timeout=None, wait_complete=True):
USER_FLASH_LOC = 0x800000
sector_size = 0x10000
if(filename.endswith('.fpg')):
header, prog, md5 = self._extract_bitstream(filename)
meta_inflash = self.get_metadata()
if (meta_inflash['md5sum'] == md5):
print('File already on flash!')
self.progdev(int(meta_inflash['prog_bitstream_start']))
else:
HEAD_LOC, PROG_LOC = self._update_metadata(filename,len(header),len(prog),md5)
self.blindwrite('/flash',header+prog, offset= HEAD_LOC)
self.progdev(PROG_LOC)
else:
with open(filename,'r') as fh:
self.blindwrite('/flash', fh.read(), offset=USER_FLASH_LOC)
self.progdev(USER_FLASH_LOC)
def _get_device_address(self, device_name):
"""
:param device_name:
:return:
"""
raise NotImplementedError
[docs] def read(self, device_name, size, offset=0, use_bulk=True):
"""
Return size_bytes of binary data with carriage-return escape-sequenced.
:param device_name: name of memory device from which to read
:param size: how many bytes to read
:param offset: start at this offset, offset in bytes
:param use_bulk: Does nothing. Kept for API compatibility
:return: binary data string
"""
buf = StringIO()
for retry in range(self.retries - 1):
try:
self.t.download('%s.%x.%x' % (device_name, offset//4, size//4), buf, timeout=self.server_timeout)
return buf.getvalue()
except:
# if we fail to get a response after a bunch of packet re-sends, wait for the
# server to timeout and restart the whole transaction.
time.sleep(self.server_timeout)
LOGGER.warning('Tftp error on read -- retrying. %.3f' % time.time())
self.t.download('%s.%x.%x' % (device_name, offset//4, size//4), buf, timeout=self.timeout)
return buf.getvalue()
[docs] def blindwrite(self, device_name, data, offset=0, use_bulk=True):
"""
Unchecked data write.
:param device_name: the memory device to which to write
:param data: the byte string to write
:param offset: the offset, in bytes, at which to write
:param use_bulk: Does nothing. Kept for API compatibility
:return: <nothing>
"""
assert (type(data) == str), 'Must supply binary packed string data'
assert (len(data) % 4 == 0), 'Must write 32-bit-bounded words'
assert (offset % 4 == 0), 'Must write 32-bit-bounded words'
buf = StringIO(data)
for retry in range(self.retries - 1):
try:
self.t.upload('%s.%x.0' % (device_name, offset//4), buf, timeout=self.timeout)
return
except:
# if we fail to get a response after a bunch of packet re-sends, wait for the
# server to timeout and restart the whole transaction.
time.sleep(self.server_timeout)
LOGGER.warning('Tftp error on write -- retrying')
self.t.upload('%s.%x.0' % (device_name, offset//4), buf, timeout=self.timeout)
[docs] def deprogram(self):
"""
Deprogram the FPGA.
This actually reboots & boots from the Golden Image
:return: nothing
"""
# trigger reboot of FPGA
self.progdev(0)
LOGGER.info('%s: deprogrammed okay' % self.host)
[docs] def write_wishbone(self, wb_address, data):
"""
Used to perform low level wishbone write to a wishbone slave. Gives
low level direct access to wishbone bus.
:param wb_address: address of the wishbone slave to write to
:param data: data to write
:return: response object
"""
self.blindwrite('/fpga', data, offset=wb_address)
[docs] def read_wishbone(self, wb_address):
"""
Used to perform low level wishbone read from a Wishbone slave.
:param wb_address: address of the wishbone slave to read from
:return: Read Data or None
"""
return self.read('/fpga', 4, offset=wb_address)
[docs] def get_firmware_version(self):
"""
Read the version of the firmware
:return: golden_image, multiboot, firmware_major_version,
firmware_minor_version
"""
raise NotImplementedError
[docs] def get_soc_version(self):
"""
Read the version of the soc
:return: golden_image, multiboot, soc_major_version, soc_minor_version
"""
raise NotImplementedError