import logging
import struct
from utils import check_changing_status
from memory import Memory
from network import Mac, IpAddress
LOGGER = logging.getLogger(__name__)
class TenGbe(Memory):
    """
    To do with the CASPER ten GBE yellow block implemented on FPGAs,
    and interfaced-to via KATCP memory reads/writes.
    """

    # Number of bytes read when dumping the core's memory for inspection.
    _DUMP_BYTES = 16384

    def __init__(self, parent, name, address, length_bytes, device_info=None):
        """
        Create the interface and, when device_info is given, configure its
        MAC, IP and port from the 'fab_mac', 'fab_ip' and 'fab_udp' entries.

        :param parent: the object hosting this device; must provide ``host``
        :param name: the name of this device
        :param address: passed through to the Memory base class
        :param length_bytes: passed through to the Memory base class
        :param device_info: optional dictionary of block information. Its
            'fab_ip' and 'fab_mac' entries are normalised IN PLACE to
            dotted / colon-separated form.
        :raises ValueError: if the MAC, IP or port entry is None
        """
        self.mac, self.ip_address, self.port = None, None, None
        super(TenGbe, self).__init__(name=name, width_bits=32,
                                     address=address,
                                     length_bytes=length_bytes)
        self.parent = parent
        self.fullname = self.parent.host + ':' + self.name
        self.block_info = device_info
        if device_info is not None:
            fabric_ip = device_info['fab_ip']
            if '(2^24) + ' in fabric_ip:
                # 'a*(2^24) + b*(2^16) + c*(2^8) + d*(2^0)' -> 'a.b.c.d'
                device_info['fab_ip'] = (fabric_ip.replace('*(2^24) + ', '.')
                                         .replace('*(2^16) + ', '.')
                                         .replace('*(2^8) + ', '.')
                                         .replace('*(2^0)', ''))
            fabric_mac = device_info['fab_mac']
            if 'hex2dec' in fabric_mac:
                # "hex2dec('aabbccddeeff')" -> 'aa:bb:cc:dd:ee:ff'
                fabric_mac = fabric_mac.replace('hex2dec(\'', '')
                fabric_mac = fabric_mac.replace('\')', '')
                pieces = [fabric_mac[idx:idx + 2] for idx in range(0, 10, 2)]
                pieces.append(fabric_mac[10:])
                device_info['fab_mac'] = ':'.join(pieces)
            mac = device_info['fab_mac']
            ip_address = device_info['fab_ip']
            port = device_info['fab_udp']
            if mac is None or ip_address is None or port is None:
                raise ValueError('%s: 10Gbe interface must '
                                 'have mac, ip and port.' % self.fullname)
            self.setup(mac, ip_address, port)
        self.core_details = None
        self.snaps = {'tx': None, 'rx': None}
        self.registers = {'tx': [], 'rx': []}
        self.multicast_subscriptions = []
        # TODO: if self.parent.is_connected(): self._check()

    @classmethod
    def from_device_info(cls, parent, device_name, device_info,
                         memorymap_dict):
        """
        Process device info and the memory map to get all necessary info
        and return a TenGbe instance.

        :param parent: the parent device, normally an FPGA instance
        :param device_name: the unique device name
        :param device_info: information about this device
        :param memorymap_dict: a dictionary containing the device memory map
        :return: a TenGbe object
        :raises RuntimeError: if device_name is not in the memory map, or
            its address or length is -1
        """
        address, length_bytes = -1, -1
        if device_name in memorymap_dict:
            mem_entry = memorymap_dict[device_name]
            address = mem_entry['address']
            length_bytes = mem_entry['bytes']
        if address == -1 or length_bytes == -1:
            raise RuntimeError('Could not find address or length '
                               'for TenGbe %s' % device_name)
        return cls(parent, device_name, address, length_bytes, device_info)

    def __repr__(self):
        return '%s:%s' % (self.__class__.__name__, self.name)

    def __str__(self):
        """
        String representation of this 10Gbe interface.
        """
        return '%s: MAC(%s) IP(%s) Port(%s)' % (
            self.name, str(self.mac), str(self.ip_address), str(self.port))

    def setup(self, mac, ipaddress, port):
        """
        Set up the MAC, IP and port for this interface.

        :param mac: value handed to Mac()
        :param ipaddress: value handed to IpAddress()
        :param port: the port; converted with int() if not already an int
        """
        self.mac = Mac(mac)
        self.ip_address = IpAddress(ipaddress)
        self.port = port if isinstance(port, int) else int(port)

    def post_create_update(self, raw_device_info):
        """
        Update the device with information not available at creation:
        collect the names of the parent's registers and snapshots that
        belong to this block (i.e. are named '<self.name>_...').

        :param raw_device_info: info about this block that may be useful
            (currently unused)
        :raises RuntimeError: if a snapshot under this block is not named
            'txs_ss' or 'rxs_ss'
        """
        prefix = self.name + '_'
        self.registers = {'tx': [], 'rx': []}
        for register in self.parent.registers:
            if not register.name.startswith(prefix):
                continue
            # strip only the leading prefix, not later occurrences of it
            name = register.name[len(prefix):]
            if name.startswith('tx') and 'txs_' not in name:
                self.registers['tx'].append(register.name)
            elif name.startswith('rx') and 'rxs_' not in name:
                self.registers['rx'].append(register.name)
            elif not name.startswith(('txs_', 'rxs_')):
                LOGGER.warning('%s: odd register name %s under tengbe '
                               'block' % (self.fullname, register.name))
        self.snaps = {'tx': None, 'rx': None}
        for snapshot in self.parent.snapshots:
            if not snapshot.name.startswith(prefix):
                continue
            name = snapshot.name[len(prefix):]
            if name == 'txs_ss':
                self.snaps['tx'] = snapshot.name
            elif name == 'rxs_ss':
                self.snaps['rx'] = snapshot.name
            else:
                errmsg = '%s: incorrect snap %s under tengbe ' \
                         'block' % (self.fullname, snapshot.name)
                LOGGER.error(errmsg)
                raise RuntimeError(errmsg)

    def _check(self):
        """
        Does this device exist on the parent and it is accessible?
        Performs a one-byte read of this device via the parent.
        """
        self.parent.read(self.name, 1)

    def read_txsnap(self):
        """
        Read the TX snapshot embedded in this TenGBE yellow block.

        :return: the 'data' entry of the snapshot read
        """
        snap = self.parent.memory_devices[self.name + '_txs_ss']
        return snap.read(timeout=10)['data']

    def read_rxsnap(self):
        """
        Read the RX snapshot embedded in this TenGBE yellow block.

        :return: the 'data' entry of the snapshot read
        """
        snap = self.parent.memory_devices[self.name + '_rxs_ss']
        return snap.read(timeout=10)['data']

    def _read_registers(self, register_names):
        """
        Read the given parent registers.

        :return: dictionary of register name -> its ['data']['reg'] value
        """
        devices = self.parent.memory_devices
        return dict((reg, devices[reg].read()['data']['reg'])
                    for reg in register_names)

    def read_rx_counters(self):
        """
        Read all RX counters embedded in this TenGBE yellow block.
        """
        return self._read_registers(self.registers['rx'])

    def read_tx_counters(self):
        """
        Read all TX counters embedded in this TenGBE yellow block.
        """
        return self._read_registers(self.registers['tx'])

    def read_counters(self):
        """
        Read all the counters embedded in this TenGBE yellow block.
        """
        return self._read_registers(
            self.registers['tx'] + self.registers['rx'])

    def _counters_okay(self, fields, read_function, wait_time, checks):
        """
        Run check_changing_status on the given fields, logging any failure.

        :return: True/False
        :raises RuntimeError: if checks is less than two
        """
        if checks < 2:
            raise RuntimeError('Cannot check less often than twice?')
        result, message = check_changing_status(fields, read_function,
                                                wait_time, checks)
        if not result:
            LOGGER.error('%s: %s' % (self.fullname, message))
            return False
        return True

    def rx_okay(self, wait_time=0.2, checks=10):
        """
        Is this gbe core receiving okay?
        i.e. _rxctr incrementing and _rxerrctr not incrementing

        :param wait_time: seconds to wait between checks
        :param checks: times to run check
        :return: True/False
        :raises RuntimeError: if checks is less than two
        """
        fields = {
            # name, required, True=same|False=different
            self.name + '_rxctr': (True, False),
            self.name + '_rxfullctr': (False, True),
            self.name + '_rxofctr': (False, True),
            self.name + '_rxerrctr': (True, True),
            self.name + '_rxvldctr': (False, False),
        }
        return self._counters_okay(fields, self.read_rx_counters,
                                   wait_time, checks)

    def tx_okay(self, wait_time=0.2, checks=10):
        """
        Is this gbe core transmitting okay?
        i.e. _txctr incrementing and _txerrctr not incrementing

        :param wait_time: seconds to wait between checks
        :param checks: times to run check
        :return: True/False
        :raises RuntimeError: if checks is less than two
        """
        fields = {
            # name, required, True=same|False=different
            self.name + '_txctr': (True, False),
            self.name + '_txfullctr': (False, True),
            self.name + '_txofctr': (False, True),
            self.name + '_txerrctr': (False, True),
            self.name + '_txvldctr': (False, False),
        }
        return self._counters_okay(fields, self.read_tx_counters,
                                   wait_time, checks)

    def _tap_request(self, name, timeout, args, errmsg):
        """
        Issue a KATCP request via the parent's transport and require that
        the first reply argument is 'ok'.

        :raises RuntimeError: with errmsg if the reply is not 'ok'
        """
        reply, _ = self.parent.transport.katcprequest(
            name=name, request_timeout=timeout, require_ok=True,
            request_args=args)
        if reply.arguments[0] != 'ok':
            raise RuntimeError(errmsg)

    def dhcp_start(self):
        """
        Configure this interface, then start a DHCP client on ALL interfaces.

        ARP is disabled while the DHCP client is started and re-enabled
        afterwards; on success the core details are re-read.

        :raises RuntimeError: if any of the KATCP requests does not reply ok
        """
        if self.mac is None:
            # TODO get MAC from EEPROM serial number and assign here
            self.mac = '0'
        self._tap_request(
            'tap-start', 5,
            (self.name, self.name, '0.0.0.0', str(self.port),
             str(self.mac), ),
            '%s: failure starting tap driver.' % self.name)
        self._tap_request(
            'tap-arp-config', 1, (self.name, 'mode', '0'),
            '%s: failure disabling ARP.' % self.name)
        self._tap_request(
            'tap-dhcp', 30, (self.name, ),
            '%s: failure starting DHCP client.' % self.name)
        self._tap_request(
            'tap-arp-config', 1, (self.name, 'mode', '-1'),
            '%s: failure re-enabling ARP.' % self.name)
        # it looks like the command completed without error, so
        # update the basic core details
        self.get_10gbe_core_details()

    def tap_start(self, restart=False):
        """
        Program a 10GbE device and start the TAP driver.
        Does nothing if tap is already running (after any restart stop).

        :param restart: stop before starting
        :raises NameError: if the device name is longer than 8 characters
        :raises RuntimeError: if the tap driver could not be started
        """
        if len(self.name) > 8:
            raise NameError('%s: tap device identifier must be shorter than 9 '
                            'characters..' % self.fullname)
        if restart:
            self.tap_stop()
        if self.tap_running():
            LOGGER.info('%s: tap already running.' % self.fullname)
            return
        LOGGER.info('%s: starting tap driver.' % self.fullname)
        self._tap_request(
            'tap-start', -1,
            (self.name, self.name, str(self.ip_address), str(self.port),
             str(self.mac), ),
            '%s: failure starting tap driver.' % self.fullname)

    def tap_stop(self):
        """
        Stop a TAP driver. Does nothing if tap is not running.

        :raises RuntimeError: if the tap device could not be stopped
        """
        if not self.tap_running():
            return
        LOGGER.info('%s: stopping tap driver.' % self.fullname)
        self._tap_request(
            'tap-stop', -1, (self.name, ),
            '%s: failure stopping tap device.' % self.fullname)

    def tap_info(self):
        """
        Get info on the tap instance running on this interface.

        :return: dictionary with 'name' and 'ip' keys; both are empty
            strings when the request returned no informs
        :raises RuntimeError: if more than one inform was returned
        """
        # TODO - this request should return okay if the tap isn't
        # running - it shouldn't fail
        uninforms = []

        def handle_inform(msg):
            uninforms.append(msg)

        self.parent.unhandled_inform_handler = handle_inform
        try:
            _, informs = self.parent.transport.katcprequest(
                name='tap-info', request_timeout=-1,
                require_ok=False, request_args=(self.name, ))
        finally:
            # always detach the temporary handler, even if the request raised
            self.parent.unhandled_inform_handler = None
        # process the tap-info
        if len(informs) == 1:
            return {'name': informs[0].arguments[0],
                    'ip': informs[0].arguments[1]}
        if len(informs) == 0:
            return {'name': '', 'ip': ''}
        raise RuntimeError('%s: invalid return from tap-info?' %
                           self.fullname)

    def tap_running(self):
        """
        Determine if an instance of tap is already running for this
        ten GBE interface.

        :return: True/False
        """
        return self.tap_info()['name'] != ''

    def tap_arp_reload(self):
        """
        Have the tap driver reload its ARP table right now.

        :raises RuntimeError: if the request does not reply ok
        """
        self._tap_request(
            'tap-arp-reload', -1, (self.name, ),
            'Failure requesting ARP reload for tap '
            'device %s.' % str(self))

    def multicast_receive(self, ip_str, group_size):
        """
        Send a request to KATCP to have this tap instance send a multicast
        group join request.

        :param ip_str: A dotted decimal string representation of the base
            mcast IP address.
        :param group_size: An integer for how many mcast addresses from
            base to respond to. NOTE: currently unused - only ip_str is
            sent in the request.
        :raises RuntimeError: if the request does not reply ok
        """
        mcast_group_string = ip_str
        reply, _ = self.parent.transport.katcprequest(
            'tap-multicast-add', -1, True,
            request_args=(self.name, 'recv', mcast_group_string, ))
        if reply.arguments[0] != 'ok':
            raise RuntimeError('%s: failed adding multicast receive %s to '
                               'tap device.' % (self.fullname,
                                                mcast_group_string))
        if mcast_group_string not in self.multicast_subscriptions:
            self.multicast_subscriptions.append(mcast_group_string)

    def multicast_remove(self, ip_str):
        """
        Send a request to be removed from a multicast group.

        :param ip_str: A dotted decimal string representation of the base
            mcast IP address.
        :raises RuntimeError: if the request raises or does not reply ok
        """
        try:
            reply, _ = self.parent.transport.katcprequest(
                'tap-multicast-remove', -1, True,
                request_args=(self.name, IpAddress.str2ip(ip_str), ))
        except Exception as exc:
            raise RuntimeError('%s: tap-multicast-remove does not seem to '
                               'be supported on %s (%s)' % (
                                   self.fullname, self.parent.host, exc))
        if reply.arguments[0] != 'ok':
            raise RuntimeError('%s: failed removing multicast address %s '
                               'from tap device' % (
                                   self.fullname, IpAddress.str2ip(ip_str)))
        if ip_str in self.multicast_subscriptions:
            self.multicast_subscriptions.remove(ip_str)
        else:
            # only warn: there is nothing to remove from the local list
            LOGGER.warning(
                '%s: That is odd, %s removed from mcast subscriptions, but '
                'it was not in its list of sbscribed addresses.' % (
                    self.fullname, ip_str))

    def _fabric_enable_disable(self, target_val):
        """
        Set the fabric-enable byte (second byte of the word at 0x20) to
        target_val, writing only if it differs from the current value.
        """
        # 0x20 or (0x20 / 4)? What was the /4 for?
        word_bytes = list(struct.unpack(
            '>4B', self.parent.read(self.name, 4, 0x20)))
        if word_bytes[1] == target_val:
            return
        word_bytes[1] = target_val
        self.parent.write(self.name, struct.pack('>4B', *word_bytes), 0x20)

    def fabric_enable(self):
        """
        Enable the core fabric
        """
        self._fabric_enable_disable(1)

    def fabric_disable(self):
        """
        Disable the core fabric
        """
        self._fabric_enable_disable(0)

    def fabric_soft_reset_toggle(self):
        """
        Toggle the fabric soft reset: write the soft reset byte (first byte
        of the word at 0x20) as 0 (only if it is currently 1), then 1,
        then 0.
        """
        word_bytes = list(struct.unpack(
            '>4B', self.parent.read(self.name, 4, 0x20)))

        def write_val(val):
            word_bytes[0] = val
            word_packed = struct.pack('>4B', *word_bytes)
            if val == 0:
                self.parent.write(self.name, word_packed, 0x20)
            else:
                # the original uses blindwrite when asserting the reset
                self.parent.blindwrite(self.name, word_packed, 0x20)

        if word_bytes[0] == 1:
            write_val(0)
        write_val(1)
        write_val(0)

    def _read_port_dump(self):
        """
        Read the core's memory and return it as a list of byte values.
        """
        raw = self.parent.read(self.name, self._DUMP_BYTES)
        return list(struct.unpack('>%dB' % self._DUMP_BYTES, raw))

    def get_10gbe_core_details(self, read_arp=False, read_cpu=False):
        """
        Get 10GbE core details. The result is also stored in
        self.core_details.

        Memory map of the core:
        0x00 - 0x07: MAC address
        0x08 - 0x0b: Not used
        0x0c - 0x0f: Gateway addr
        0x10 - 0x13: IP addr
        0x14 - 0x17: Not assigned
        0x18 - 0x1b: Buffer sizes
        0x1c - 0x1f: Not assigned
        0x20       : Soft reset (bit 0)
        0x21       : Fabric enable (bit 0)
        0x22 - 0x23: Fabric port
        0x24 - 0x27: XAUI status (bit 2,3,4,5 = lane sync, bit6 = chan_bond)
        0x28 - 0x2b: PHY config
        0x28       : RX_eq_mix
        0x29       : RX_eq_pol
        0x2a       : TX_preemph
        0x2b       : TX_diff_ctrl
        0x30 - 0x33: Multicast IP RX base address
        0x34 - 0x37: Multicast IP mask
        0x38 - 0x3b: Multicast subnet mask
        0x1000     : CPU TX buffer
        0x2000     : CPU RX buffer
        0x3000     : ARP tables start

        :param read_arp: also include the ARP table under the 'arp' key
        :param read_cpu: also include the CPU buffer details
        :return: dictionary of core details
        """
        data = self._read_port_dump()

        def ip_at(offset):
            return IpAddress('%i.%i.%i.%i' % tuple(data[offset:offset + 4]))

        xaui = data[0x27]
        returnval = {
            'ip_prefix': '%i.%i.%i.' % (data[0x10], data[0x11], data[0x12]),
            'ip': ip_at(0x10),
            # MAC strings are hex pairs, as built in __init__ - not decimal
            'mac': Mac('%02x:%02x:%02x:%02x:%02x:%02x' %
                       tuple(data[0x02:0x08])),
            'gateway_ip': ip_at(0x0c),
            'fabric_port': (data[0x22] << 8) + data[0x23],
            'fabric_en': bool(data[0x21] & 1),
            'xaui_lane_sync': [bool(xaui & (1 << bit))
                               for bit in range(2, 6)],
            'xaui_status': [data[0x24], data[0x25], data[0x26], data[0x27]],
            'xaui_chan_bond': bool(xaui & 64),
            'xaui_phy': {'rx_eq_mix': data[0x28], 'rx_eq_pol': data[0x29],
                         'tx_preemph': data[0x2a], 'tx_swing': data[0x2b]},
            'multicast': {'base_ip': ip_at(0x30),
                          'ip_mask': ip_at(0x34),
                          'subnet_mask': ip_at(0x38),
                          'rx_ips': []}
        }
        # Every bit that is zero in the mask is a "don't care": expand the
        # base address over both values of each such bit. A set keeps the
        # expansion free of duplicates.
        # NOTE: still 2**n addresses for n zero mask bits.
        possible_addresses = set([int(returnval['multicast']['base_ip'])])
        mask_int = int(returnval['multicast']['ip_mask'])
        for ctr in range(32):
            if (mask_int >> ctr) & 1:
                continue
            bit = 1 << ctr
            cleared = set(ip & ~bit for ip in possible_addresses)
            possible_addresses = cleared | set(ip | bit for ip in cleared)
        returnval['multicast']['rx_ips'] = [
            IpAddress(ip) for ip in sorted(possible_addresses)]
        if read_arp:
            returnval['arp'] = self.get_arp_details(data)
        if read_cpu:
            returnval.update(self.get_cpu_details(data))
        self.core_details = returnval
        return returnval

    def get_arp_details(self, port_dump=None):
        """
        Get ARP details from this interface.

        :param port_dump: list - A list of raw bytes from interface memory.
            Read from the device if not given.
        :return: list of 256 entries, each a list of the six MAC bytes
            stored in bytes 2..7 of the 8-byte ARP entry at 0x3000
        """
        if port_dump is None:
            port_dump = self._read_port_dump()
        return [
            [port_dump[0x3000 + (addr * 8) + ctr] for ctr in range(2, 8)]
            for addr in range(256)]

    def get_cpu_details(self, port_dump=None):
        """
        Read details of the CPU buffers.

        :param port_dump: list - A list of raw bytes from interface memory.
            Read from the device if not given.
        :return: dictionary with 'cpu_tx' and 'cpu_rx' (byte offset -> list
            of 8 bytes) and 'cpu_rx_buf_unack_data'
        """
        if port_dump is None:
            port_dump = self._read_port_dump()
        unack_data = port_dump[6 * 4 + 3]
        returnval = {'cpu_tx': {}, 'cpu_rx': {},
                     'cpu_rx_buf_unack_data': unack_data}
        # integer division: range() rejects a float on Python 3
        for ctr in range(4096 // 8):
            start = 4096 + (8 * ctr)
            returnval['cpu_tx'][ctr * 8] = [
                port_dump[start + ctr2] for ctr2 in range(8)]
        for ctr in range(unack_data + 8):
            start = 8192 + (8 * ctr)
            returnval['cpu_rx'][ctr * 8] = [
                port_dump[start + ctr2] for ctr2 in range(8)]
        return returnval

    def print_10gbe_core_details(self, arp=False, cpu=False, refresh=True):
        """
        Prints 10GbE core details.

        :param arp: boolean, include the ARP table
        :param cpu: boolean, include the CPU packet buffers
        :param refresh: read the 10gbe details first
        """
        if refresh or (self.core_details is None):
            self.get_10gbe_core_details(arp, cpu)
        details = self.core_details
        print('------------------------')
        print('%s configuration:' % self.fullname)
        print('MAC: %s' % Mac.mac2str(int(details['mac'])))
        print('Gateway: %s' % details['gateway_ip'])
        print('IP: %s' % details['ip'])
        print('Fabric port: %5d' % details['fabric_port'])
        # parentheses matter: '%' binds tighter than the conditional
        print('Fabric interface is currently: %s' %
              ('Enabled' if details['fabric_en'] else 'Disabled'))
        print('XAUI Status: %s' % details['xaui_status'])
        for ctr in range(0, 4):
            print('\tlane sync %i:  %i' % (ctr,
                                           details['xaui_lane_sync'][ctr]))
        print('\tChannel bond: %i' % details['xaui_chan_bond'])
        print('XAUI PHY config: ')
        print('\tRX_eq_mix: %2X' % details['xaui_phy']['rx_eq_mix'])
        print('\tRX_eq_pol: %2X' % details['xaui_phy']['rx_eq_pol'])
        print('\tTX_pre-emph: %2X' % details['xaui_phy']['tx_preemph'])
        print('\tTX_diff_ctrl: %2X' % details['xaui_phy']['tx_swing'])
        print('Multicast:')
        for k in details['multicast']:
            print('\t%s: %s' % (k, details['multicast'][k]))
        # Any requested refresh has already happened above; the helpers
        # re-read by themselves if the data they need is missing.
        if arp:
            self.print_arp_details(refresh=False, only_hits=True)
        if cpu:
            self.print_cpu_details(refresh=False)

    def print_arp_details(self, refresh=False, only_hits=False):
        """
        Print nicely formatted ARP info.

        :param refresh: re-read the core details first; done regardless if
            no ARP details are cached
        :param only_hits: skip entries whose MAC bytes are all 0xFF
        """
        cached = self.core_details
        if refresh or cached is None or 'arp' not in cached:
            self.get_10gbe_core_details(read_arp=True)
        # fetch AFTER any refresh so the data is never stale or None
        details = self.core_details
        print('ARP Table: ')
        for ip_address in range(256):
            mac = details['arp'][ip_address]
            if only_hits and all(mac[idx] == 255 for idx in range(0, 6)):
                continue
            print('IP: %s%3d: MAC: %s' % (
                details['ip_prefix'], ip_address,
                ' '.join('%02X' % mac[idx] for idx in range(0, 6))))

    def print_cpu_details(self, refresh=False):
        """
        Print nicely formatted CPU details info.

        :param refresh: re-read the core details first; done regardless if
            no CPU details are cached
        """
        cached = self.core_details
        if refresh or cached is None or 'cpu_rx' not in cached:
            self.get_10gbe_core_details(read_cpu=True)
        # fetch AFTER any refresh so the data is never stale or None
        details = self.core_details
        print('CPU TX Interface (at offset 4096 bytes):')
        print('Byte offset: Contents (Hex)')
        for key, value in sorted(details['cpu_tx'].items()):
            print('%04i: %s' % (key, ' '.join('%02x' % val
                                              for val in value)))
        print('------------------------')
        print('CPU RX Interface (at offset 8192bytes):')
        print('CPU packet RX buffer unacknowledged data: %i' %
              details['cpu_rx_buf_unack_data'])
        print('Byte offset: Contents (Hex)')
        for key, value in sorted(details['cpu_rx'].items()):
            print('%04i: %s' % (key, ' '.join('%02x' % val
                                              for val in value)))
        print('------------------------')

    def set_arp_table(self, macs):
        """
        Set the ARP table with a list of MAC addresses. The list, `macs`,
        is passed such that the zeroth element is the MAC address of the
        device with IP XXX.XXX.XXX.0, and element N is the MAC address of the
        device with IP XXX.XXX.XXX.N

        :param macs: iterable of MAC addresses as integers; each is packed
            as a big-endian 64-bit word and written from offset 0x3000
        """
        macs = list(macs)
        macs_pack = struct.pack('>%dQ' % (len(macs)), *macs)
        self.parent.write(self.name, macs_pack, offset=0x3000)
# end