Module pysmarticle
Expand source code
# SmarticleSwarm.py
# Alex Samland
# August 13, 2019
# Module for communicating with smarticle swarm over Xbee3s
import threading
import time

import numpy as np

# from .XbeeComm import XbeeComm
# from .StreamThread import StreamThread
class SmarticleSwarm(object):
    '''
    ## Description
    ---
    Class that utilizes the `XbeeComm` class for performing smarticle operations
    coordinated with Smarticle.h and Smarticle.cpp used on the Smarticle microcontrollers

    The constructor initializes and opens the local base XBee (connected via USB) with the given
    port and baud rate; the resulting `XbeeComm` object is stored in attribute `xb`
    '''

    # Command byte placed at the start of each message payload, keyed by operation name
    msg_code_dict = {
        'toggle_led': 0x20, 'set_mode': 0x21, 'toggle_t4_interrupt': 0x22,
        'set_transmit_counts': 0x23, 'select_gait': 0x24, 'toggle_read_sensors': 0x25,
        'toggle_transmit': 0x26, 'set_gait_epsilon': 0x27, 'set_pose_noise': 0x28,
        'toggle_light_plank': 0x29, 'set_debug': 0x2A, 'set_id': 0x2B, 'set_pose': 0x30,
        'set_sync_noise': 0x31, 'set_stream_timing_noise': 0x32,
        'set_light_plank_threshold': 0x40, 'init_gait': 0x41, 'stream_pose': 0x42,
        'set_plank': 0x43,
    }
    # Every framed message is msg_prefix + payload + msg_end
    msg_prefix = bytearray([0x13, 0x13])
    msg_end = bytearray([0x0A])
    # Added to every payload value before it is packed into a message byte
    ASCII_OFFSET = 32
    # Used by set_transmit_period to convert a period in ms into transmit counts
    SAMPLE_TIME_MS = 10

    def __init__(self, port='/dev/tty.usbserial-DN050I6Q', baud_rate=9600, debug=0):
        '''
        ## Remote Device
        ---
        Many methods take in a `remote_device` argument. The XBee sends the message to the remote XBee(s) in one of three ways depending on the `remote_device` argument<br/>
        &nbsp;&nbsp;<b>1.</b> remote_device == `None`:<br/>
        &nbsp;&nbsp;&nbsp;&nbsp;broadcasts message without acks using `broadcast()`<br/>
        &nbsp;&nbsp;<b>2.</b> remote_device == `True`:<br/>
        &nbsp;&nbsp;&nbsp;&nbsp;broadcasts message with acks using `ack_broadcast()`<br/>
        &nbsp;&nbsp;<b>3.</b> remote_device == element of `self.xb.devices.values()`:<br/>
        &nbsp;&nbsp;&nbsp;&nbsp;sends message to single XBee using `send()`<br/>

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------ | :-- | :--------- | :----------- |
        | port | `string` | USB port to open for local XBee | set for your own convenience |
        | baud_rate | `int` | Baud rate to use for USB serial port | 9600 |
        | debug | `int` | Enables/disables print statements in class | 0 |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|
        '''
        self.xb = XbeeComm(port, baud_rate, debug)
        # guards sync_time_list, which is appended to from the sync thread
        self.lock = threading.Lock()

    @classmethod
    def _format_msg(cls, msg):
        '''Frames the payload `msg` (bytearray) as `msg_prefix + msg + msg_end`'''
        return cls.msg_prefix + msg + cls.msg_end

    @classmethod
    def _convert_to_2_chars(cls, val):
        '''
        Splits `val` into two 7-bit halves and returns them as `[high, low]`,
        each shifted by `ASCII_OFFSET`.
        '''
        # Two 7-bit characters carry 14 bits, so only the low 14 bits are kept;
        # larger values wrap around instead of producing a byte above 255
        val = val & 0x3FFF
        high = (val >> 7) + cls.ASCII_OFFSET
        low = (val & 0x7F) + cls.ASCII_OFFSET
        return [high, low]

    # Public names under which the rest of the class historically called the helpers
    format_msg = _format_msg
    convert_to_2_chars = _convert_to_2_chars

    def _send_value(self, code_name, value, remote_device):
        '''Sends the command byte for `code_name` followed by one offset-encoded value byte'''
        payload = bytearray([self.msg_code_dict[code_name], self.ASCII_OFFSET + value])
        self.xb.command(self._format_msg(payload), remote_device)

    def build_network(self, exp_n_smarticles=None):
        '''
        ## Description
        ---
        Clears `devices` dictionary as well as all devices on network.
        Discovers remote devices on network, initializes dictionary of all connected devices.
        Will ask for retries if the expected number of smarticles is not discovered
        (any answer not starting with "Y"/"y", including an empty one, ends discovery).
        Modified from Digi XBee example DiscoverDevicesSample.py

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | exp_n_smarticles | `int` | Expected number of smarticles to discover | None |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        while True:
            self.xb.discover()
            if exp_n_smarticles is None:
                break
            n_found = len(self.xb.devices)
            if n_found >= exp_n_smarticles:
                print('Successfully discovered {} out of {} expected Smarticles\n'.format(n_found, exp_n_smarticles))
                break
            inp = input('Only discovered {} out of {} expected Smarticles. Retry discovery (Y/N)\n'.format(n_found, exp_n_smarticles))
            if not inp.strip().upper().startswith('Y'):
                break
        # purge Smarticle Xbee buffer
        time.sleep(0.5)
        self.xb.broadcast('\n')
        time.sleep(0.5)
        print('Network Discovery Ended\n')

    def send_ids(self, asynch=False):
        '''
        ## Description
        ---
        Sends each discovered smarticle its ID (the key it is stored under in `xb.devices`)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_id']
        for smarticle_id, device in self.xb.devices.items():
            msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + smarticle_id]))
            self.xb.send(device, msg, asynch=asynch)

    def close(self):
        '''
        ## Description
        ---
        closes serial connection to XBee

        ## Returns
        ---
        `None`
        '''
        self.xb.close_base()

    def set_servos(self, state, remote_device=None):
        '''
        ## Description
        ---
        Enables/disables updating of servos. When enabled it also resets the gait interpolation sequence back to its first initial point
        When set_servos == 0, servos can still be controlled using set_pose()

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables updating servos in timer interrupt. Anything other than 1 is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_value('toggle_t4_interrupt', 1 if state == 1 else 0, remote_device)

    def set_transmit(self, state, remote_device=None):
        '''
        ## Description
        ---
        Enables/disables smarticle transmitting data.
        Data sent in following format: '{int Photo_front}, {int photo_back}, {int photo_right} {int current_sense}' (values between 0-1023)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables transmitting data. Anything other than 1 is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_value('toggle_transmit', 1 if state == 1 else 0, remote_device)

    def set_light_plank(self, state, remote_device=None):
        '''
        ## Description
        ---
        Enables/Disables light plank (i.e. smarticles planking when light sensors are above set threshold)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `bool` | Value: 1 or 0. enables/disables light planking. Anything other than 1 is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_value('toggle_light_plank', 1 if state == 1 else 0, remote_device)

    def set_sensor_threshold(self, thresh, remote_device=None):
        '''
        ## Description
        ---
        Sets sensor threshold that triggers planking

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | thresh | `list of int` | Threshold values for PR_front, PR_back, PR_right, current sense | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        assert len(thresh) == 4, 'Threshold list must be 4 elements'
        # build the payload as a plain list of ints: command byte, then two chars per threshold
        payload = [self.msg_code_dict['set_light_plank_threshold']]
        for t in thresh:
            payload += self._convert_to_2_chars(t)
        self.xb.command(self._format_msg(bytearray(payload)), remote_device)

    def set_read_sensors(self, state, remote_device=None):
        '''
        ## Description
        ---
        Enables/disables smarticle reading sensors

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables reading sensors. Anything other than 1 is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_value('toggle_read_sensors', 1 if state == 1 else 0, remote_device)

    def set_transmit_period(self, period_ms, remote_device=None):
        '''
        ## Description
        ---
        Sets approximate data transmit period of smarticles

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | period_ms | `int` | transmit period in ms. Converted to counts of `SAMPLE_TIME_MS` and clipped to 1-200 counts (max 2000 ms) | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # int() so a float period still yields a value bytearray accepts
        counts = int(np.clip(period_ms // self.SAMPLE_TIME_MS, 1, 200))
        self._send_value('set_transmit_counts', counts, remote_device)

    def set_debug(self, state, remote_device=None):
        '''
        ## Description
        ---
        Sets debug level of smarticle

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 0,1,2. Sets level of debug output. 0 is no output, 1 is limited output, 2 is full output. Any other value is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        if state not in [0, 1, 2]:
            state = 0
        self._send_value('set_debug', state, remote_device)

    def set_pose_epsilon(self, eps, remote_device=None):
        '''
        ## Description
        ---
        Sets epsilon value for smarticle poses. The smarticle will move the arm
        randomly proportional to epsilon (0-1). For example, if epsilon is 0.1,
        the smarticle will move its arm to a random angle 10% of the time

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | eps | `float` | Value: 0-1 (clipped to that range), determines proportion of random arm movements. Sent as a whole percentage | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # clip to [0, 1], then send as an integer percentage (bytearray rejects floats)
        eps_percent = int(round(100 * float(np.clip(eps, 0, 1))))
        self._send_value('set_gait_epsilon', eps_percent, remote_device)

    def set_mode(self, state, remote_device=None):
        '''
        ## Description
        ---
        Changes operating mode of smarticle(s)

        ## Values
        ---
        | Value | Mode | Description |
        | :---: | :--: | :---------: |
        | 0 | Idle | Smarticle does nothing. Servos detach and no data is transmitted |
        | 1 | Stream servos | Servo points streamed (still in development) |
        | 2 | Gait interpolate | Iterate through interpolated points sent to smarticle |
        |<img width=250/>|<img width=400/>|<img width=1000/>|

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Values: 0-2. 0: inactive, 1: stream servo (unfinished), 2: gait interpolate | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `AssertionError` if `state` is outside 0-2
        '''
        assert 0 <= state <= 2, "Mode must between 0-2"
        self._send_value('set_mode', state, remote_device)

    def set_plank(self, state_arr, remote_device=None):
        '''
        ## Description
        ---
        Sets smarticle to plank or deplank. Note: only active when set_servos == 1

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state_arr | `np.array` or nested list of `int` | Plank commands (0: deplank, 1: plank). The message carries the number of rows followed by all values flattened; the exact row layout is defined by the firmware (presumably one row per smarticle - verify against Smarticle.cpp) | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_plank']
        # asarray lets plain (nested) lists be passed as well as numpy arrays
        states = np.asarray(state_arr, dtype=int)
        n_rows = len(states) + self.ASCII_OFFSET
        values = (states + self.ASCII_OFFSET).flatten().tolist()
        msg = self._format_msg(bytearray([msg_code, n_rows] + values))
        self.xb.command(msg, remote_device)

    def set_pose(self, posL, posR, remote_device=None):
        '''
        ## Description
        ---
        Sets smarticle to specified servo positions

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | posL | `int` | Left servo angle: 0-180 deg | N/A |
        | posR | `int` | Right servo angle: 0-180 deg | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_pose']
        payload = bytearray([msg_code, self.ASCII_OFFSET + posL, self.ASCII_OFFSET + posR])
        self.xb.command(self._format_msg(payload), remote_device)

    def stream_pose(self, poses, remote_device=None):
        '''
        ## Description
        ---
        Sets smarticle to specified servo positions. Differs from set_pose in
        that it sends angles over the streaming pipeline, which sends a batch message that can specify
        separate commands for each smarticle in the same message. Specify id as zero to broadcast servo command to whole swarm.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | poses | `np.array` or nested list of `int` | Nx3 array of servo commands. Each row specifies [id, angL, angR] | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['stream_pose']
        # asarray lets plain (nested) lists be passed as well as numpy arrays
        pose_arr = np.asarray(poses, dtype=int)
        n_rows = len(pose_arr) + self.ASCII_OFFSET
        values = (pose_arr + self.ASCII_OFFSET).flatten().tolist()
        msg = self._format_msg(bytearray([msg_code, n_rows] + values))
        self.xb.command(msg, remote_device)

    def set_delay(self, state=-1, max_val=-1, remote_device=None):
        '''
        ## Description
        ---
        Enables/disables random delay in stream servo mode for smarticles.
        Writing value of negative one (-1) leaves that field as is on the smarticle.
        Unlike the other commands this one is sent as the plain text message `:SD:<state>,<max_val>`

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Values: 0,1 see note about negative values above | -1 |
        | max_val | `int` | Maximum value of random delay | -1 |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg = ':SD:{},{}\n'.format(int(state), int(max_val))
        self.xb.command(msg, remote_device)

    def set_pose_noise(self, max_val, remote_device=None):
        '''
        ## Description
        ---
        Sets noise on servo positions (noisy arm angles)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | max_val | `int` | Maximum value of random noise, must be less than 100. Sent as `int(2*max_val)` | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `AssertionError` if `max_val` is 100 or more
        '''
        assert max_val < 100, 'value must be less than 100'
        self._send_value('set_pose_noise', int(2 * max_val), remote_device)

    def set_sync_noise(self, max_val, remote_device=None):
        '''
        ## Description
        ---
        Sets noise on synchronization time for gait interp mode

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | max_val | `int` | Maximum value of random delay. Converted to timer counts as `int(max_val/0.128)` | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        timer_counts = self._convert_to_2_chars(int(max_val / 0.128))
        msg_code = self.msg_code_dict['set_sync_noise']
        msg = self._format_msg(bytearray([msg_code] + timer_counts))
        self.xb.command(msg, remote_device)

    def gait_init(self, gait, delay_ms, gait_num=0, remote_device=None):
        '''
        ## Description
        ---
        Sends gait interpolation data to remote smarticles including:
        1. gait number
        2. data length
        3. period between interpolation points
        4. left and right servo interpolation points (max 15 points each)

        Also stores `delay_ms` and the gait length on the instance (`self.delay_ms`, `self.gait_len`)
        for later use by `init_sync_thread` and `start_sync`

        ## Gait
        ---
        Lists of two lists: gaitLpoints and gaitRpoints. These lists must be equal in length and contain at most 15 points

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | gait | list of lists of int | [gaitLpoints, gaitRpoints] | N/A |
        | delay_ms | `int` | period (ms) between gait points. Converted to timer counts as `int(delay_ms/0.128)` | N/A |
        | gait_num | `int` | number under which the gait is sent (see `select_gait`) | 0 |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `AssertionError` if the two gait lists differ in length
        '''
        # validate before touching any state or building the message
        assert len(gait[0]) == len(gait[1]), 'Gait lists must be same length'
        msg_code = bytearray([self.msg_code_dict['init_gait']])
        self.delay_ms = delay_ms
        self.gait_len = len(gait[0])
        timer_counts = int(delay_ms / 0.128)
        gaitL = [int(x + self.ASCII_OFFSET) for x in gait[0]]
        gaitR = [int(x + self.ASCII_OFFSET) for x in gait[1]]
        gait_points = self.gait_len + self.ASCII_OFFSET
        n = gait_num + self.ASCII_OFFSET
        delay = self._convert_to_2_chars(timer_counts)
        msg = self._format_msg(msg_code + bytearray([n, gait_points] + delay + gaitL + gaitR))
        self.xb.command(msg, remote_device)
        time.sleep(0.1)  # ensure messages are not dropped as buffer isn't implemented yet

    def select_gait(self, n, remote_device=None):
        '''
        ## Description
        ---
        Selects gait number of smarticle to execute

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | n | `int` | gait number for smarticle to execute | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_value('select_gait', n, remote_device)

    def sync_thread_target(self, sync_period_s, keep_time):
        '''
        ## Description
        ---
        Thread to keep gaits in sync. Not used directly by user but called in `init_sync_thread`.
        While `sync_flag` is set it broadcasts the sync byte `0x11` once per (adjusted) sync period;
        when `keep_time` is true each broadcast time is appended to `sync_time_list`.
        The loop ends when the `timer_counts` event is set.
        '''
        time_adjust_s = sync_period_s - 0.0357  # subtract 35.7ms based on results from timing experiments
        msg = bytearray(b'\x11')
        # Event.wait() blocks until a) the event is set, returning True, or
        # b) the timeout elapses, returning False. sync_flag therefore pauses the
        # loop, while timer_counts doubles as the period timer and the stop signal
        while self.sync_flag.wait() and not self.timer_counts.wait(timeout=time_adjust_s):
            self.xb.broadcast(msg)
            if keep_time:
                t = time.time()
                with self.lock:
                    self.sync_time_list.append(t)

    def init_sync_thread(self, keep_time=False):
        '''
        ## Description
        ---
        Initializes gait sync thread. Must be called every time the gait sequence is updated
        (i.e. after `gait_init`, which provides `gait_len` and `delay_ms`).
        A sync thread left over from an earlier call is stopped first.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | keep_time | `bool` | When true, the time of every sync broadcast is recorded in `sync_time_list` | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|
        '''
        # Stop the previous sync thread, otherwise every re-initialisation would leave
        # one more thread broadcasting sync messages
        old_thread = getattr(self, 'sync_thread', None)
        if old_thread is not None and old_thread.is_alive():
            self.timer_counts.set()  # makes the loop condition false
            self.sync_flag.set()     # wakes the thread if it is paused
            old_thread.join(timeout=1.0)
        # sync period is one full gait cycle: number of gait points times the delay between points
        self.sync_period_s = (self.gait_len * self.delay_ms) / 1000
        print('sync_period: {}'.format(self.sync_period_s))
        if keep_time:
            self.sync_time_list = []
        self.timer_counts = threading.Event()
        self.sync_flag = threading.Event()
        self.sync_thread = threading.Thread(
            target=self.sync_thread_target,
            args=(self.sync_period_s, keep_time),
            daemon=True,
        )
        self.sync_thread.start()

    def start_sync(self):
        '''
        ## Description
        ---
        starts gait sequence and sync thread

        ## Returns
        ---
        `None`
        '''
        # delay_ms/1000 converts to seconds, /3 takes a third of it
        delay_t = self.delay_ms / 3000
        # starts gait sequence
        self.set_servos(1)
        # wait 1/3 of gait delay to begin sync sequence
        time.sleep(delay_t)
        # set sync flag so that it returns True and stops blocking
        self.sync_flag.set()

    def stop_sync(self):
        '''
        ## Description
        ---
        stops gait sequence and pauses gait sync thread

        ## Returns
        ---
        `None`
        '''
        # stop gait sequence
        self.set_servos(0)
        # cause sync_flag.wait() to block
        self.sync_flag.clear()
class XbeeComm(object):
    '''
    ## Description
    ---
    Class for simplified XBee communication with remote Smarticles
    Built on top of Digi XBee python library:
    https://github.com/digidotcom/xbee-python
    <br/>
    The constructor initializes and opens the local base XBee (connected via USB) with the given port and baud rate and adds it to attribute `base`

    NOTE: `Raw802Device` and `NetworkDiscoveryStatus` are expected to be imported from the
    Digi XBee library at module level.
    '''

    def __init__(self, port='/dev/tty.usbserial-DN050I6Q', baud_rate=9600, debug=0):
        '''
        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | port | `string` | USB port to open for local XBee | set for your own convenience |
        | baud_rate | `int` | Baud rate to use for USB serial port | 9600 |
        | debug | `int` | Enables/disables print statements in class | 0 |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|
        '''
        self.base = Raw802Device(port, baud_rate)
        self.debug = debug
        # Filled by discover(); created here so command()/ack_broadcast() do not
        # fail with AttributeError when called before the first discovery
        self.devices = {}
        self.network = None
        self.callbacks_added = False
        self.ascii_offset = 32
        self.open_base()

    def open_base(self):
        '''
        ## Description
        ---
        Opens local xbee defined by attribute 'base' which is set in `__init__`

        ## Arguments
        ---
        None

        ## Returns
        ---
        This function returns a 1 if it successfully opens the local port and a 0 otherwise.
        '''
        self.base.open()
        if self.base.is_open():
            return 1
        if self.debug:
            print("Failed to open device")
        return 0

    def close_base(self):
        '''
        ## Description
        ---
        Closes local xbee defined by attribute 'base' which is set in `__init__`

        ## Arguments
        ---
        None

        ## Returns
        ---
        `None`
        '''
        if self.base is not None and self.base.is_open():
            self.base.close()

    def add_callbacks(self):
        '''
        ## Description
        ---
        adds callback functions for discovery processes to `self.network`
        (so `discover()` must have created the network object first)
        '''
        # Callback for discovered devices.
        def callback_device_discovered(remote):
            print("Device discovered: %s" % remote)
            self.add_remote(remote)

        # Callback for discovery finished.
        def callback_discovery_finished(status):
            if status == NetworkDiscoveryStatus.SUCCESS:
                print("Discovery cycle finished\n")
            else:
                print("There was an error discovering devices: %s" % status.description)

        self.network.add_device_discovered_callback(callback_device_discovered)
        self.network.add_discovery_process_finished_callback(callback_discovery_finished)

    def add_remote(self, remote_device):
        '''
        ## Description
        ---
        takes in RemoteXbee Object and creates an attribute for it based on its NodeID and adds it to the device dictionary.
        The dictionary key is the integer formed by the digits of the NodeID; a device whose NodeID
        contains no digit is reported and skipped.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        More info on RemoteXbeeDevice:
        https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
        '''
        node_id = remote_device.get_node_id() or ''
        digits = ''.join(ch for ch in node_id if ch.isdigit())
        if not digits:
            # This runs inside the discovery callback; raising here would only
            # break discovery, so report the device and ignore it
            print("Ignoring device without a number in its node id: %r" % node_id)
            return
        setattr(self, node_id, remote_device)
        self.devices[int(digits)] = remote_device

    def discover(self):
        '''
        ## Description
        ---
        Clears `devices` dictionary as well as all devices on network.
        Discovers remote devices on network, initializes dictionary of all connected devices.
        Blocks until the discovery process has finished.
        modified from Digi XBee example DiscoverDevicesSample.py

        ## Arguments
        ---
        None

        ## Returns
        ---
        `None`
        '''
        self.devices = {}
        self.network = self.base.get_network()
        self.network.clear()
        self.network.set_discovery_timeout(15)  # 15 seconds.
        # register the callbacks only once, on the first discovery
        if not self.callbacks_added:
            self.add_callbacks()
            self.callbacks_added = True
        self.network.start_discovery_process()
        print("Discovering remote XBee devices...")
        while self.network.is_discovery_running():
            time.sleep(0.1)

    def send(self, remote_device, msg, asynch=False):
        '''
        ## Description
        ---
        Sends message to remote xbee and receives acknowledgement on success.
        modified from digi's example SendDataSample.py

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) or not | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `ValueError` if `remote_device` is `None`

        More info on RemoteXbeeDevice:
        https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
        '''
        if remote_device is None:
            if self.debug:
                print("Could not find the remote device")
            # raise instead of terminating the whole interpreter from library code
            raise ValueError("Could not find the remote device")
        if self.debug:
            print("Sending data to {} >> {}...".format(remote_device.get_node_id(), msg))
        if asynch is True:
            self.base.send_data_async(remote_device, msg)
        else:
            self.base.send_data(remote_device, msg)
        if self.debug:
            print("Success")

    def broadcast(self, msg):
        '''
        ## Description
        ---
        Broadcasts to all xbees on network. NOTE: there are no acknowledgements when using broadcast

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self.base.send_data_broadcast(msg)

    def ack_broadcast(self, msg):
        '''
        ## Description
        ---
        Broadcasts to all xbees on network by sending message individually to each remote xbee in `devices` dictionary.
        This broadcast includes acknowledgements.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # iterate over a snapshot of the current devices
        for remote_dev in list(self.devices.values()):
            self.send(remote_dev, msg)

    def command(self, msg, remote_device=None, asynch=False):
        '''
        ## Description
        ---
        Sends message to remote Xbee in one of three ways depending on the `remote_device` argument:
        `None` broadcasts without acks, `True` broadcasts with acks (`ack_broadcast`), and an
        element of `devices.values()` sends to that single XBee. see `SmarticleSwarm` class description

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        | remote_device | -- | see class description | `None` |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) or not. Only used when sending to a single XBee | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `AssertionError` if `remote_device` is neither `None`, `True` nor a known device
        '''
        if remote_device is None:
            self.broadcast(msg)
        elif remote_device is True:
            self.ack_broadcast(msg)
        else:
            assert remote_device in self.devices.values(), "Remote Device not found in active devices"
            self.send(remote_device, msg, asynch)

    def add_rx_callback(self, callback_fun):
        '''
        ## Description
        ---
        Adds a data received callback function that is called every time a message is received

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | callback_fun | function | Function that takes 'XbeeMessage' as input | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self.base.add_data_received_callback(callback_fun)
Classes
class SmarticleSwarm (port='/dev/tty.usbserial-DN050I6Q', baud_rate=9600, debug=0)
-
Description
Class that utilizes the XbeeComm class for performing smarticle operations coordinated with Smarticle.h and Smarticle.cpp used on the Smarticle microcontrollers.
The constructor initializes and opens the local base XBee (connected via USB) with the given port and baud rate; the base XBee is then available as `xb.base`.
Remote Device
Many methods take in a `remote_device` argument. The XBee sends the message to the remote XBee(s) in one of three ways, depending on the `remote_device` argument:
1. remote_device == `None`: broadcasts the message without acks using `broadcast()`
2. remote_device == `True`: broadcasts the message with acks using `ack_broadcast()`
3. remote_device == element of `self.xb.devices.values()`: sends the message to a single XBee using `send()`
Arguments
| Argument | Type | Description | Default Value |
| :------ | :-- | :--------- | :----------- |
| port | `string` | USB port to open for local XBee | set for your own convenience |
| baud_rate | `int` | Baud rate to use for USB serial port | 9600 |
| debug | `int` | Enables/disables print statements in class | 0 |

Expand source code
class SmarticleSwarm(object):
    '''
    ## Description
    ---
    Class that utilizes the XbeeComm class for performing smarticle operations
    coordinated with Smarticle.h and Smarticle.cpp used on the Smarticle microcontrollers

    Constructor initializes and opens local base xbee (connected via USB) with given port and baud rate and adds it to attribute `xb`
    '''

    # NOTE: these class constants were commented out although every method
    # reads them through `self.`; they are restored here with their original values.
    # One-byte command codes, sent directly after the message prefix.
    msg_code_dict = {'toggle_led': 0x20, 'set_mode': 0x21, 'toggle_t4_interrupt': 0x22,
                     'set_transmit_counts': 0x23, 'select_gait': 0x24, 'toggle_read_sensors': 0x25,
                     'toggle_transmit': 0x26, 'set_gait_epsilon': 0x27, 'set_pose_noise': 0x28,
                     'toggle_light_plank': 0x29, 'set_debug': 0x2A, 'set_id': 0x2B, 'set_pose': 0x30,
                     'set_sync_noise': 0x31, 'set_stream_timing_noise': 0x32,
                     'set_light_plank_threshold': 0x40, 'init_gait': 0x41, 'stream_pose': 0x42,
                     'set_plank': 0x43}
    # Framing bytes that `_format_msg` puts before and after every binary message.
    msg_prefix = bytearray([0x13, 0x13])
    msg_end = bytearray([0x0A])
    # Added to every payload value before it is sent
    # (presumably keeps payload bytes out of the control-character range -- TODO confirm against firmware).
    ASCII_OFFSET = 32
    # Divisor that turns a transmit period in ms into counts (see `set_transmit_period`).
    SAMPLE_TIME_MS = 10

    def __init__(self, port='/dev/tty.usbserial-DN050I6Q', baud_rate = 9600, debug = 0):
        '''
        ## Remote Device
        ---
        Many methods take in a `remote_device` argument. The XBee sends message to the remote Xbee(s) in one of three ways depending on the `remote_device` argument<br/>
          <b>1.</b> remote_device == `None`:<br/>
            broadcasts message without acks using `broadcast()`<br/>
          <b>2.</b> remote_device == `True`:<br/>
            broadcasts message with acks using `ack_broadcast()`<br/>
          <b>3.</b> remote_device == element of `self.xb.devices.values()`:<br/>
            send message to single Xbee using `send()`<br/>

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------ | :-- | :--------- | :----------- |
        | port | `string` | USB port to open for local XBee | set for your own convenience |
        | baud_rate | `int` | Baud rate to use for USB serial port | 9600 |
        | debug | `int` | Enables/disables print statements in class | 0 |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|
        '''
        self.xb = XbeeComm(port, baud_rate, debug)
        # guards `sync_time_list`, which is appended to from the sync thread
        self.lock = threading.Lock()

    @classmethod
    def _format_msg(cls, msg):
        '''Returns `msg` (a `bytearray`) wrapped in the message prefix and end bytes'''
        return cls.msg_prefix + msg + cls.msg_end

    @classmethod
    def _convert_to_2_chars(cls, val):
        '''Splits `val` into [high, low] 7-bit groups, each shifted by `ASCII_OFFSET`; returns a list of two ints'''
        # NOTE(review): the mask keeps 18 bits although two 7-bit groups only hold 14;
        # values >= 2**14 yield a high value that no longer fits in one byte. Left unchanged -- confirm intent.
        val = val & 0x3ffff
        high = (val >> 7) + cls.ASCII_OFFSET
        low = (val & 0x7f) + cls.ASCII_OFFSET
        return [high, low]

    def _send_toggle(self, code_name, state, remote_device):
        '''Sends the on/off command `code_name`; any `state` other than 1 is sent as 0'''
        state = 1 if state == 1 else 0
        msg = self._format_msg(bytearray([self.msg_code_dict[code_name], self.ASCII_OFFSET + state]))
        self.xb.command(msg, remote_device)

    def build_network(self, exp_n_smarticles=None):
        '''
        ## Description
        ---
        Runs XBee discovery (`self.xb.discover()`). If an expected number of smarticles is given and
        fewer are discovered, asks the user whether to retry discovery. Afterwards broadcasts a
        newline to purge the Smarticle XBee buffers.

        modified from Digi XBee example DiscoverDevicesSample.py

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | exp_n_smarticles | `int` | Expected number of smarticles to discover | None |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # loop instead of recursing so that repeated retries do not grow the call stack
        while True:
            self.xb.discover()
            if exp_n_smarticles is None:
                break
            n_found = len(self.xb.devices)
            if n_found >= exp_n_smarticles:
                print('Successfully discovered {} out of {} expected Smarticles\n'.format(n_found, exp_n_smarticles))
                break
            inp = input('Only discovered {} out of {} expected Smarticles. Retry discovery (Y/N)\n'.format(n_found, exp_n_smarticles))
            # an empty answer (just pressing Enter) counts as "no" instead of raising IndexError
            if not inp.strip().upper().startswith('Y'):
                break
        # purge Smarticle Xbee buffer
        time.sleep(0.5)
        self.xb.broadcast('\n')
        time.sleep(0.5)
        print('Network Discovery Ended\n')

    def send_ids(self, asynch=False):
        '''
        ## Description
        ---
        Sends IDs to smarticles: every device in `self.xb.devices` is sent its own dictionary key as ID

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_id']
        for dev_id, device in self.xb.devices.items():
            msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + dev_id]))
            # forward the caller's choice (it used to be hard-coded to False)
            self.xb.send(device, msg, asynch=asynch)

    def close(self):
        '''
        ## Description
        ---
        closes serial connection to XBee

        ## Returns
        ---
        `None`
        '''
        self.xb.close_base()

    def set_servos(self, state, remote_device = None):
        '''
        ## Description
        ---
        Enables/disables updating of servos. When enabled it also resets the gait interpolation sequence back to its first initial point

        When set_servos == 0, servos can still be controlled using set_pose()

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables updating servos in timer interrupt | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_toggle('toggle_t4_interrupt', state, remote_device)

    def set_transmit(self, state, remote_device = None):
        '''
        ## Description
        ---
        Enables/disables smarticle transmitting data.
        Data sent in following format: '{int Photo_front}, {int photo_back}, {int photo_right} {int current_sense}' (values between 0-1023)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables transmitting Data | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_toggle('toggle_transmit', state, remote_device)

    def set_light_plank(self, state, remote_device = None):
        '''
        ## Description
        ---
        Enables/Disables light plank (i.e. smarticles planking when light sensors are above set threshold)

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `bool` | Value: 1 or 0. enables/disables light planking | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_toggle('toggle_light_plank', state, remote_device)

    def set_sensor_threshold(self, thresh, remote_device = None):
        '''
        ## Description
        ---
        Sets sensor threshold that triggers planking

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | thresh | `list of int` | Threshold values for PR_front, PR_back, PR_right, current sense | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        assert len(thresh)==4, 'Threshold list must be 4 elements'
        msg_code = self.msg_code_dict['set_light_plank_threshold']
        # collect plain ints: `bytearray += list` and `list + bytearray` both raise TypeError
        payload = []
        for t in thresh:
            payload.extend(self._convert_to_2_chars(t))
        msg = self._format_msg(bytearray([msg_code] + payload))
        self.xb.command(msg, remote_device)

    def set_read_sensors(self, state, remote_device = None):
        '''
        ## Description
        ---
        Enables/disables smarticle reading sensors

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 1 or 0. enables/disables reading sensors | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self._send_toggle('toggle_read_sensors', state, remote_device)

    def set_transmit_period(self, period_ms, remote_device=None):
        '''
        ## Description
        ---
        Sets approximate data transmit period of smarticles. The period is converted to counts of
        `SAMPLE_TIME_MS` and clipped to 1-200 counts

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | period_ms | `int` | transmit period in ms. Max value of 2000 ms (200 counts) | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_transmit_counts']
        # int() so that a float period still yields a value bytearray accepts
        counts = int(np.clip(period_ms // self.SAMPLE_TIME_MS, 1, 200))
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + counts]))
        self.xb.command(msg, remote_device)

    def set_debug(self, state, remote_device=None):
        '''
        ## Description
        ---
        Sets debug level of smarticle

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Value: 0,1,2. Sets level of debug output. 0 is no output, 1 is limited output, 2 is full output. Any other value is sent as 0 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_debug']
        if state not in [0,1,2]:
            state = 0
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + state]))
        self.xb.command(msg, remote_device)

    def set_pose_epsilon(self, eps, remote_device = None):
        '''
        ## Description
        ---
        Sets epsilon value for smarticle poses. The smarticle will move the arm randomly proportional to epsilon (0-1).
        For example, if epsilon is 0.1, the smarticle will move its arm to a random angle 10% of the time

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | eps | `float` | Value: 0-1 (clipped to that range), determines proportion of random arm movements | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # clip eps to 0-1 and send it as a whole percentage (bytearray only accepts ints)
        eps_percent = int(round(100 * float(np.clip(eps, 0, 1))))
        # NOTE(review): the original used the 'toggle_light_plank' code together with an undefined
        # `state` variable (copy/paste leftover); 'set_gait_epsilon' is the only epsilon code in
        # msg_code_dict -- confirm against Smarticle.cpp
        msg_code = self.msg_code_dict['set_gait_epsilon']
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + eps_percent]))
        self.xb.command(msg, remote_device)

    def set_mode(self, state, remote_device = None):
        '''
        ## Description
        ---
        Changes operating mode of smarticle(s)

        ## Values
        ---
        | Value | Mode | Description |
        | :---: | :--: | :---------: |
        | 0 | Idle | Smarticle does nothing. Servos detach and no data is transmitted |
        | 1 | Stream servos | Servo points streamed (still in development) |
        | 2 | Gait interpolate | Iterate through interpolated points sent to smarticle |
        |<img width=250/>|<img width=400/>|<img width=1000/>|

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Values: 0-2. 0: inactive, 1: stream servo (unfinished), 2: gait interpolate | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        assert (state>=0 and state<=2),"Mode must between 0-2"
        msg_code = self.msg_code_dict['set_mode']
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + state]))
        self.xb.command(msg, remote_device)

    def set_plank(self, state_arr, remote_device = None):
        '''
        ## Description
        ---
        Sets smarticle to plank or deplank

        Note: only active when set_servos == 1

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state_arr | `np.array` | Plank commands (0: deplank, 1: plank). `len(state_arr)` is sent first, followed by the flattened values. Row layout is presumably [id, state] like `stream_pose` -- TODO confirm | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_plank']
        n_entries = len(state_arr) + self.ASCII_OFFSET
        payload = list((state_arr + self.ASCII_OFFSET).flatten())
        msg = self._format_msg(bytearray([msg_code, n_entries] + payload))
        self.xb.command(msg, remote_device)

    def set_pose(self, posL, posR, remote_device = None):
        '''
        ## Description
        ---
        Sets smarticle to specified servo positions

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | posL | `int` | Left servo angle: 0-180 deg | N/A |
        | posR | `int` | Right servo angle: 0-180 deg | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['set_pose']
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + posL, self.ASCII_OFFSET + posR]))
        self.xb.command(msg, remote_device)

    def stream_pose(self, poses, remote_device=None):
        '''
        ## Description
        ---
        Sets smarticle to specified servo positions. Differs from set_pose in that it sends angles over the streaming pipeline,
        which sends a batch message that can specify separate commands for each smarticle in the same message.
        Specify id as zero to broadcast servo command to whole swarm.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | poses | `np.array` | Nx3 array of servo commands. Each row specifies [id, angL, angR] | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['stream_pose']
        n_rows = len(poses) + self.ASCII_OFFSET
        payload = list((poses + self.ASCII_OFFSET).flatten())
        msg = self._format_msg(bytearray([msg_code, n_rows] + payload))
        self.xb.command(msg, remote_device)

    def set_delay(self, state=-1, max_val=-1, remote_device = None):
        '''
        ## Description
        ---
        Enables/disables random delay in stream servo mode for smarticles. Writing value of negative one (-1) leaves that field as is on the smarticle

        Unlike the other commands this one is sent as a text message of the form ':SD:state,max_val' terminated by a newline

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | state | `int` | Values: 0,1 see note about negative values above | -1 |
        | max_val | `int` | Maximum value of random delay | -1 |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg=':SD:{},{}\n'.format(int(state),int(max_val))
        self.xb.command(msg, remote_device)

    def set_pose_noise(self, max_val, remote_device = None):
        '''
        ## Description
        ---
        Sets noise on servo positions (noisy arm angles). Twice `max_val` (truncated to int) is transmitted

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | max_val | `int` | Maximum value of random noise, must be less than 100 | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        assert max_val < 100, 'value must be less than 100'
        val = int(2*max_val)
        msg_code = self.msg_code_dict['set_pose_noise']
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + val]))
        self.xb.command(msg, remote_device)

    def set_sync_noise(self, max_val, remote_device = None):
        '''
        ## Description
        ---
        Sets noise on synchronization time for gait interp mode

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | max_val | `int` | Maximum value of random delay (divided by 0.128 to get timer counts, so presumably in ms like `gait_init` -- TODO confirm) | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        timer_counts = self._convert_to_2_chars(int(max_val/0.128))
        msg_code = self.msg_code_dict['set_sync_noise']
        msg = self._format_msg(bytearray([msg_code] + timer_counts))
        self.xb.command(msg, remote_device)

    def gait_init(self, gait, delay_ms, gait_num=0, remote_device = None):
        '''
        ## Description
        ---
        Sends gait interpolation data to remote smarticles including:
        1. left and right servo interpolation points (max 15 points each)
        2. data length
        3. period between interpolation points

        Also stores `delay_ms` and the gait length on the instance; `init_sync_thread` and `start_sync` read them

        ## Gait
        ---
        Lists of two lists: gaitLpoints and gaitRpoints. These lists must be equal in length and contain at most 15 points

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | gait | list of lists of int | [gaitLpoints, gaitRpoints] | N/A |
        | delay_ms | `int` | period (ms) between gait points | N/A |
        | gait_num | `int` | number under which the gait is sent (see `select_gait`) | 0 |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        left_points, right_points = gait[0], gait[1]
        # validate before any state is stored or anything is sent
        assert len(left_points)==len(right_points),'Gait lists must be same length'
        msg_code = bytearray([self.msg_code_dict['init_gait']])
        self.delay_ms = delay_ms
        self.gait_len = len(left_points)
        # delay is transmitted in timer counts of 0.128 ms
        timer_counts = int(delay_ms/0.128)
        gaitL = [int(x+self.ASCII_OFFSET) for x in left_points]
        gaitR = [int(x+self.ASCII_OFFSET) for x in right_points]
        gait_points = self.gait_len+self.ASCII_OFFSET
        n = gait_num+self.ASCII_OFFSET
        delay = self._convert_to_2_chars(timer_counts)
        msg = self._format_msg(msg_code+bytearray([n, gait_points]+delay+gaitL+gaitR))
        self.xb.command(msg, remote_device)
        time.sleep(0.1) #ensure messages are not dropped as buffer isn't implemented yet

    def select_gait(self, n, remote_device = None):
        '''
        ## Description
        ---
        Selects gait number of smarticle to execute

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | n | `int` | gait number for smarticle to execute | N/A |
        | remote_device | -- | see class description | `None` |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        msg_code = self.msg_code_dict['select_gait']
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + n]))
        self.xb.command(msg, remote_device)

    def sync_thread_target(self, sync_period_s, keep_time):
        '''
        ## Description
        ---
        Thread to keep gaits in sync. Not used directly by user but called in `init_sync_thread`

        While `sync_flag` is set, broadcasts the one-byte sync message 0x11 roughly every `sync_period_s` seconds.
        Ends once the `timer_counts` event is set. If `keep_time` is true, the time of every broadcast is
        appended to `sync_time_list`.
        '''
        # subtract 35.7 ms based on results from timing experiments
        wait_s = sync_period_s - 0.0357
        sync_msg = bytearray(b'\x11')
        # Event.wait() blocks until the event is set (returns True) or until the timeout elapses (returns False)
        while self.sync_flag.wait() and not self.timer_counts.wait(timeout=wait_s):
            self.xb.broadcast(sync_msg)
            if keep_time:
                t = time.time()
                with self.lock:
                    self.sync_time_list.append(t)

    def init_sync_thread(self, keep_time=False):
        '''
        ## Description
        ---
        Initializes gait sync thread. Must be called every time the gait sequence is updated
        (it reads the gait length and delay stored by `gait_init`).
        A sync thread left over from an earlier call is stopped first.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | keep_time | `bool` | If true, (re)creates `sync_time_list` and records the time of every sync broadcast in it | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # Stop the thread of a previous call; otherwise it would keep running on the
        # new events and broadcast sync messages with the old period as well.
        old_thread = getattr(self, 'sync_thread', None)
        if old_thread is not None and old_thread.is_alive():
            self.timer_counts.set()  # makes the loop condition false
            self.sync_flag.set()     # unblocks a thread waiting for start_sync()
            old_thread.join(timeout=1.0)
        # sync period: duration of one full gait cycle
        self.sync_period_s = (self.gait_len*self.delay_ms)/1000
        print('sync_period: {}'.format(self.sync_period_s))
        if keep_time:
            self.sync_time_list = []
        self.sync_thread = threading.Thread(target=self.sync_thread_target, args=(self.sync_period_s, keep_time), daemon=True)
        self.timer_counts = threading.Event()
        self.sync_flag = threading.Event()
        self.sync_thread.start()

    def start_sync(self):
        '''
        ## Description
        ---
        starts gait sequence and sync thread

        ## Returns
        ---
        `None`
        '''
        # one third of the gait delay, in seconds
        delay_t = self.delay_ms/3000
        #starts gait sequence
        self.set_servos(1)
        #wait 1/3 of gait delay to begin sync sequence
        time.sleep(delay_t)
        #set sync flag so that sync_flag.wait() returns True and stops blocking
        self.sync_flag.set()

    def stop_sync(self):
        '''
        ## Description
        ---
        stops gait sequence and pauses gait sync thread

        ## Returns
        ---
        `None`
        '''
        #stop gait sequence
        self.set_servos(0)
        #cause sync_flag.wait() to block
        self.sync_flag.clear()
Methods
def build_network(self, exp_n_smarticles=None)
-
Description
Clears the
devices
dictionary as well as all devices on the network. Discovers remote devices on the network and initializes the dictionary of all connected devices. Will ask for retries if the expected number of smarticles is not discovered. Modified from the Digi XBee example DiscoverDevicesSample.py
Arguments
| Argument | Type | Description | Default Value |
| exp_n_smarticles | int | Expected number of smarticles to discover | None |
Returns
None
Expand source code
def build_network(self, exp_n_smarticles=None):
    '''
    ## Description
    ---
    Runs XBee discovery (`self.xb.discover()`). If an expected number of smarticles is given and
    fewer are discovered, asks the user whether to retry discovery. Afterwards broadcasts a
    newline to purge the Smarticle XBee buffers.

    modified from Digi XBee example DiscoverDevicesSample.py

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | exp_n_smarticles | `int` | Expected number of smarticles to discover | None |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # loop instead of recursing so that repeated retries do not grow the call stack
    while True:
        self.xb.discover()
        if exp_n_smarticles is None:
            break
        n_found = len(self.xb.devices)
        if n_found >= exp_n_smarticles:
            print('Successfully discovered {} out of {} expected Smarticles\n'.format(n_found, exp_n_smarticles))
            break
        inp = input('Only discovered {} out of {} expected Smarticles. Retry discovery (Y/N)\n'.format(n_found, exp_n_smarticles))
        # an empty answer (just pressing Enter) counts as "no" instead of raising IndexError
        if not inp.strip().upper().startswith('Y'):
            break
    # purge Smarticle Xbee buffer
    time.sleep(0.5)
    self.xb.broadcast('\n')
    time.sleep(0.5)
    print('Network Discovery Ended\n')
def close(self)
-
Description
closes serial connection to XBee
Returns
None
Expand source code
def close(self):
    '''
    ## Description
    ---
    Closes the serial connection to the XBee by calling `close_base()` on `self.xb`

    ## Returns
    ---
    `None`
    '''
    xbee = self.xb
    xbee.close_base()
def gait_init(self, gait, delay_ms, gait_num=0, remote_device=None)
-
Description
Sends gait interpolation data to remote smarticles including: 1. left and right servo interpolation points (max 15 points each) 2. data length 3. period between interpolation points
Gait
Lists of two lists: gaitLpoints and gaitRpoints. These lists must be equal in length and contain at most 15 points
Arguments
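| Argument | Type | Description | Default Value |
| gait | list of lists of int | [gaitLpoints, gaitRpoints] | N/A |
| delay_ms | int | period (ms) between gait points | N/A |
| gait_num | int | gait number the points are sent under | 0 |
| remote_device | – | see class description | None |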
Returns
None
Expand source code
def gait_init(self, gait, delay_ms, gait_num=0, remote_device = None):
    '''
    ## Description
    ---
    Sends gait interpolation data to remote smarticles including:
    1. left and right servo interpolation points (max 15 points each)
    2. data length
    3. period between interpolation points

    Also stores `delay_ms` and the gait length on the instance as `self.delay_ms` and `self.gait_len`

    ## Gait
    ---
    Lists of two lists: gaitLpoints and gaitRpoints. These lists must be equal in length and contain at most 15 points

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | gait | list of lists of int | [gaitLpoints, gaitRpoints] | N/A |
    | delay_ms | `int` | period (ms) between gait points | N/A |
    | gait_num | `int` | number under which the gait is sent | 0 |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    left_points, right_points = gait[0], gait[1]
    # validate before any state is stored or anything is sent
    assert len(left_points)==len(right_points),'Gait lists must be same length'
    msg_code = bytearray([self.msg_code_dict['init_gait']])
    self.delay_ms = delay_ms
    self.gait_len = len(left_points)
    # delay is transmitted in timer counts of 0.128 ms
    timer_counts = int(delay_ms/0.128)
    gaitL = [int(x+self.ASCII_OFFSET) for x in left_points]
    gaitR = [int(x+self.ASCII_OFFSET) for x in right_points]
    gait_points = self.gait_len+self.ASCII_OFFSET
    n = gait_num+self.ASCII_OFFSET
    # the class defines these helpers with a leading underscore
    delay = self._convert_to_2_chars(timer_counts)
    msg = self._format_msg(msg_code+bytearray([n, gait_points]+delay+gaitL+gaitR))
    self.xb.command(msg, remote_device)
    time.sleep(0.1) #ensure messages are not dropped as buffer isn't implemented yet
def init_sync_thread(self, keep_time=False)
-
Description
Initializes gait sync thread. Must be called every time the gait sequence is updated
Expand source code
def init_sync_thread(self, keep_time=False):
    '''
    ## Description
    ---
    Initializes gait sync thread. Must be called every time the gait sequence is updated
    (it reads `self.gait_len` and `self.delay_ms`).
    A sync thread left over from an earlier call is stopped first.

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | keep_time | `bool` | If true, (re)creates `sync_time_list`; the flag is also passed on to the thread target | False |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # Stop the thread of a previous call; otherwise it would keep running on the
    # new events and broadcast sync messages with the old period as well.
    old_thread = getattr(self, 'sync_thread', None)
    if old_thread is not None and old_thread.is_alive():
        self.timer_counts.set()  # makes the thread's loop condition false
        self.sync_flag.set()     # unblocks a thread still waiting on sync_flag
        old_thread.join(timeout=1.0)
    # sync period: duration of one full gait cycle
    self.sync_period_s = (self.gait_len*self.delay_ms)/1000
    print('sync_period: {}'.format(self.sync_period_s))
    if keep_time:
        self.sync_time_list = []
    self.sync_thread = threading.Thread(target=self.sync_thread_target, args=(self.sync_period_s, keep_time), daemon=True)
    self.timer_counts = threading.Event()
    self.sync_flag = threading.Event()
    self.sync_thread.start()
def select_gait(self, n, remote_device=None)
-
Description
Selects gait number of smarticle to execute
Arguments
Argument Type Description Default Value n int
gait number for smarticle to execute N/A remote_device – see class description None
Returns
None
Expand source code
def select_gait(self, n, remote_device = None):
    '''
    ## Description
    ---
    Selects gait number of smarticle to execute

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | n | `int` | gait number for smarticle to execute | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['select_gait']
    # the class defines the framing helper as `_format_msg` (leading underscore)
    msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + n]))
    self.xb.command(msg, remote_device)
def send_ids(self, asynch=False)
-
Description
Sends IDs to smarticles
Arguments
Argument Type Description Default Value asynch bool
Determines whether to send asynchronously (without ack) False Returns
None
Expand source code
def send_ids(self,asynch=False):
    '''
    ## Description
    ---
    Sends IDs to smarticles: every device in `self.xb.devices` is sent its own dictionary key as ID

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | asynch | `bool` | Determines whether to send asynchronously (without ack) | False |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['set_id']
    for dev_id, device in self.xb.devices.items():
        # the class defines the framing helper as `_format_msg` (leading underscore)
        msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + dev_id]))
        # forward the caller's choice (it used to be hard-coded to False)
        self.xb.send(device, msg, asynch=asynch)
def set_debug(self, state, remote_device=None)
-
Description
Sets debug level of smarticle
Arguments
| Argument | Type | Description | Default Value |
| state | int | Value: 0,1,2. Sets level of debug output. 0 is no output, 1 is limited output, 2 is full output | N/A |
| remote_device | – | see class description | None |
Returns
None
Expand source code
def set_debug(self, state, remote_device=None):
    '''
    ## Description
    ---
    Sets debug level of smarticle

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Value: 0,1,2. Sets level of debug output. 0 is no output, 1 is limited output, 2 is full output. Any other value is sent as 0 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['set_debug']
    if state not in [0,1,2]:
        state = 0
    # the class defines the framing helper as `_format_msg` (leading underscore)
    msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + state]))
    self.xb.command(msg, remote_device)
def set_delay(self, state=-1, max_val=-1, remote_device=None)
-
Description
Enables/disables random delay in stream servo mode for smarticles. Writing value of negative one (-1) leaves that field as is on the smarticle
Arguments
Argument Type Description Default Value state int
Values: 0,1 see note about negative values above -1 max_val int
Maximum value of random delay -1 remote_device – see class description None
Returns
None
Expand source code
def set_delay(self, state=-1, max_val=-1, remote_device = None):
    '''
    ## Description
    ---
    Enables/disables random delay in stream servo mode for smarticles.
    A value of negative one (-1) leaves the corresponding field on the smarticle untouched.
    The command is sent as a text message built from both values (converted to int).

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Values: 0,1 see note about negative values above | -1 |
    | max_val | `int` | Maximum value of random delay | -1 |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    fields = (int(state), int(max_val))
    text_cmd = ':SD:{},{}\n'.format(*fields)
    self.xb.command(text_cmd, remote_device)
def set_light_plank(self, state, remote_device=None)
-
Description
Enables/Disables light plank (i.e. smarticles planking when light sensors are above set threshold)
Arguments
Argument Type Description Default Value state bool
Value: 1 or 0. enables/disables light planking N/A remote_device – see class description None
Returns
None
Expand source code
def set_light_plank(self, state, remote_device = None):
    '''
    ## Description
    ---
    Enables/Disables light plank (i.e. smarticles planking when light sensors are above set threshold)

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `bool` | Value: 1 or 0. enables/disables light planking. Any value other than 1 is sent as 0 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['toggle_light_plank']
    state = 1 if state == 1 else 0
    # the class defines the framing helper as `_format_msg` (leading underscore)
    msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + state]))
    self.xb.command(msg, remote_device)
def set_mode(self, state, remote_device=None)
-
Description
Changes operating mode of smarticle(s)
Values
Value Mode Description 0 Idle Smarticle does nothing. Servos detach and no data is transmitted 1 Stream servos Servo points streamed (still in development) 2 Gait interpolate Iterate through interpolated points sent to smarticle Arguments
Argument Type Description Default Value state int
Values: 0-2. 0: inactive, 1: stream servo (unfinished), 2:gait interpolate N/A remote_device – see class description None
Returns
None
Expand source code
def set_mode(self, state, remote_device = None):
    '''
    ## Description
    ---
    Changes operating mode of smarticle(s)

    ## Values
    ---
    | Value | Mode | Description |
    | :---: | :--: | :---------: |
    | 0 | Idle | Smarticle does nothing. Servos detach and no data is transmitted |
    | 1 | Stream servos | Servo points streamed (still in development) |
    | 2 | Gait interpolate | Iterate through interpolated points sent to smarticle |
    |<img width=250/>|<img width=400/>|<img width=1000/>|

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Values: 0-2. 0: inactive, 1: stream servo (unfinished), 2: gait interpolate | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    assert (state>=0 and state<=2),"Mode must between 0-2"
    msg_code = self.msg_code_dict['set_mode']
    # the class defines the framing helper as `_format_msg` (leading underscore)
    msg = self._format_msg(bytearray([msg_code, self.ASCII_OFFSET + state]))
    self.xb.command(msg, remote_device)
def set_plank(self, state_arr, remote_device=None)
-
Description
Sets smarticle to plank or deplank Note: only active when set_servos == 1
Arguments
| Argument | Type | Description | Default Value |
| state_arr | np.array | Values: 0,1. 0: deplank 1: plank | N/A |
| remote_device | – | see class description | None |
Returns
None
Expand source code
def set_plank(self, state_arr, remote_device = None):
    '''
    ## Description
    ---
    Sets smarticle to plank or deplank

    Note: only active when set_servos == 1

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state_arr | `np.array` | Plank commands (0: deplank, 1: plank). `len(state_arr)` is sent first, followed by the flattened values. The exact row layout is not visible here -- TODO confirm against firmware | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['set_plank']
    n_entries = len(state_arr) + self.ASCII_OFFSET
    payload = list((state_arr + self.ASCII_OFFSET).flatten())
    # the class defines the framing helper as `_format_msg` (leading underscore)
    msg = self._format_msg(bytearray([msg_code, n_entries] + payload))
    self.xb.command(msg, remote_device)
def set_pose(self, posL, posR, remote_device=None)
-
Description
Sets smarticle to specified servo positions
Arguments
Argument Type Description Default Value posL int
Left servo angle: 0-180 deg N/A posR int
Right servo angle: 0-180 deg N/A remote_device – see class description None
Returns
None
Expand source code
def set_pose(self, posL, posR, remote_device=None):
    '''
    ## Description
    ---
    Moves smarticle servos to the given pair of angles

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | posL | `int` | Left servo angle: 0-180 deg | N/A |
    | posR | `int` | Right servo angle: 0-180 deg | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    code = self.msg_code_dict['set_pose']
    # Both angles travel as single bytes shifted by ASCII_OFFSET.
    left_byte = self.ASCII_OFFSET + posL
    right_byte = self.ASCII_OFFSET + posR
    frame = bytearray([code, left_byte, right_byte])
    self.xb.command(self.format_msg(frame), remote_device)
def set_pose_epsilon(self, eps, remote_device=None)
-
Description
Sets epsilon value for smarticle poses. The smarticle will move the arm randomly proportional to epsilon (0-1). For example, if epsilon is 0.1, the smarticle will move its arm to a random angle 10% of the time
Arguments
Argument Type Description Default Value eps 'float' Value: 0-1, determines proportion of random arm movements N/A remote_device – see class description None
Returns
None
Expand source code
def set_pose_epsilon(self, eps, remote_device=None):
    '''
    ## Description
    ---
    Sets epsilon value for smarticle poses. The smarticle will move the arm randomly
    proportional to epsilon (0-1). For example, if epsilon is 0.1, the smarticle will
    move its arm to a random angle 10% of the time

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | eps | `float` | Value: 0-1, determines proportion of random arm movements. Values outside the range are clamped; the value is sent as a whole percentage (0-100) | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # Clamp to [0, 1] and convert to an integer percentage: bytearray only
    # accepts ints, so the float the old code produced raised TypeError.
    eps_percent = int(round(100 * float(np.clip(eps, 0, 1))))
    # NOTE(review): the previous code looked up 'toggle_light_plank' and
    # tested an undefined `state` variable (copy-paste leftovers that made
    # every call raise). 'set_gait_epsilon' is the only epsilon code in
    # msg_code_dict -- confirm against the firmware that it is the intended one.
    msg_code = self.msg_code_dict['set_gait_epsilon']
    msg = self.format_msg(bytearray([msg_code, self.ASCII_OFFSET + eps_percent]))
    self.xb.command(msg, remote_device)
def set_pose_noise(self, max_val, remote_device=None)
-
Description
Sets noise on servo positions (noisy arm angles)
Arguments
Argument Type Description Default Value max_val int
Maximum value of random noise N/A remote_device – see class description None
Returns
None
Expand source code
def set_pose_noise(self, max_val, remote_device=None):
    '''
    ## Description
    ---
    Configures random noise applied to servo positions (noisy arm angles)

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | max_val | `int` | Maximum value of random noise; must be less than 100 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    assert max_val < 100, 'value must be less than 100'
    # The value put on the wire is twice max_val, truncated to an int.
    doubled = int(max_val * 2)
    code = self.msg_code_dict['set_pose_noise']
    frame = bytearray([code, self.ASCII_OFFSET + doubled])
    self.xb.command(self.format_msg(frame), remote_device)
def set_read_sensors(self, state, remote_device=None)
-
Description
Enables/disables smarticle reading sensors
Arguments
Argument Type Description Default Value state int
Value: 1 or 0. enables/disables reading sensors N/A remote_device – see class description None
Returns
None
Expand source code
def set_read_sensors(self, state, remote_device=None):
    '''
    ## Description
    ---
    Turns sensor reading on the smarticle(s) on or off

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Value: 1 or 0. enables/disables reading sensors. Anything other than 1 is sent as 0 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    code = self.msg_code_dict['toggle_read_sensors']
    # Any value that does not compare equal to 1 is treated as "off".
    flag = state if state == 1 else 0
    frame = bytearray([code, self.ASCII_OFFSET + flag])
    self.xb.command(self.format_msg(frame), remote_device)
def set_sensor_threshold(self, thresh, remote_device=None)
-
Description
Sets sensor threshold that triggers planking
Arguments
Argument Type Description Default Value thresh list of int
Threshold values for PR_front, PR_back, PR_right, current sense N/A remote_device – see class description None
Returns
None
Expand source code
def set_sensor_threshold(self, thresh, remote_device=None):
    '''
    ## Description
    ---
    Sets sensor threshold that triggers planking

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | thresh | `list of int` | Threshold values for PR_front, PR_back, PR_right, current sense (exactly 4 elements) | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    assert len(thresh) == 4, 'Threshold list must be 4 elements'
    msg_code = self.msg_code_dict['set_light_plank_threshold']
    # Build the payload in one bytearray. The old `[msg_code] + val` tried to
    # concatenate a list with a bytearray, which always raises TypeError.
    payload = bytearray([msg_code])
    for t in thresh:
        # extend() accepts whatever convert_to_2_chars returns, whether a
        # bytes-like object or a list of ints.
        payload.extend(self.convert_to_2_chars(t))
    msg = self.format_msg(payload)
    self.xb.command(msg, remote_device)
def set_servos(self, state, remote_device=None)
-
Description
Enables/disables updating of servos. When enabled it also resets the gait interpolation sequence back to its first initial point. When set_servos == 0, servos can still be controlled using set_pose()
Arguments
Argument Type Description Default Value state int
Value: 1 or 0. enables/disables updating servos in timer interrupt N/A remote_device – see class description None
Returns
None
Expand source code
def set_servos(self, state, remote_device=None):
    '''
    ## Description
    ---
    Turns servo updating on or off. When enabled it also resets the gait
    interpolation sequence back to its first initial point.
    When set_servos == 0, servos can still be controlled using set_pose()

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Value: 1 or 0. enables/disables updating servos in timer interrupt. Anything other than 1 is sent as 0 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    code = self.msg_code_dict['toggle_t4_interrupt']
    # Any value that does not compare equal to 1 is treated as "off".
    flag = state if state == 1 else 0
    frame = bytearray([code, self.ASCII_OFFSET + flag])
    self.xb.command(self.format_msg(frame), remote_device)
def set_sync_noise(self, max_val, remote_device=None)
-
Description
Sets noise on synchronization time for gait interp mode
Arguments
Argument Type Description Default Value max_val int
Maximum value of random delay N/A remote_device – see class description None
Returns
None
Expand source code
def set_sync_noise(self, max_val, remote_device=None):
    '''
    ## Description
    ---
    Sets noise on synchronization time for gait interp mode

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | max_val | `int` | Maximum value of random delay. Converted to timer counts as `int(max_val/0.128)` | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # NOTE(review): 0.128 is presumably the timer tick length in the same
    # unit as max_val (ms?) -- confirm against the firmware.
    timer_counts = self.convert_to_2_chars(int(max_val/0.128))
    msg_code = self.msg_code_dict['set_sync_noise']
    # extend() works whether convert_to_2_chars returns a list of ints or a
    # bytes-like object; the old `[msg_code] + timer_counts` only handled a
    # list, while set_sensor_threshold treats the same helper's return value
    # as bytes-like.
    payload = bytearray([msg_code])
    payload.extend(timer_counts)
    msg = self.format_msg(payload)
    self.xb.command(msg, remote_device)
def set_transmit(self, state, remote_device=None)
-
Description
Enables/disables smarticle transmitting data. Data sent in following format: '{int Photo_front}, {int photo_back}, {int photo_right} {int current_sense}'(values between 0-1023)
Arguments
Argument Type Description Default Value state int
Value: 1 or 0. enables/disables transmitting Data N/A remote_device – see class description None
Returns
None
Expand source code
def set_transmit(self, state, remote_device=None):
    '''
    ## Description
    ---
    Turns smarticle data transmission on or off. Data sent in following format:
    '{int Photo_front}, {int photo_back}, {int photo_right} {int current_sense}'
    (values between 0-1023)

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | state | `int` | Value: 1 or 0. enables/disables transmitting Data. Anything other than 1 is sent as 0 | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    code = self.msg_code_dict['toggle_transmit']
    # Any value that does not compare equal to 1 is treated as "off".
    flag = state if state == 1 else 0
    frame = bytearray([code, self.ASCII_OFFSET + flag])
    self.xb.command(self.format_msg(frame), remote_device)
def set_transmit_period(self, period_ms, remote_device=None)
-
Description
Sets approximate data transmit period of smarticles
Arguments
Argument Type Description Default Value period_ms int
transmit period in ms. Max value of 2000 ms N/A remote_device – see class description None
Returns
None
Expand source code
def set_transmit_period(self, period_ms, remote_device=None):
    '''
    ## Description
    ---
    Sets approximate data transmit period of smarticles

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | period_ms | `int` | transmit period in ms. Converted to whole sample periods (`SAMPLE_TIME_MS` each) and clamped to 1-200 of them, i.e. a max value of 2000 ms with the 10 ms sample time | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['set_transmit_counts']
    # Whole number of sample periods, clamped to the 1-200 range that fits
    # in the single payload byte. int() keeps the byte value integral even
    # when period_ms is a float (bytearray rejects floats).
    counts = int(np.clip(period_ms // self.SAMPLE_TIME_MS, 1, 200))
    msg = self.format_msg(bytearray([msg_code, self.ASCII_OFFSET + counts]))
    self.xb.command(msg, remote_device)
def start_sync(self)
-
Description
starts gait sequence and sync thread
Returns
None
Expand source code
def start_sync(self):
    '''
    ## Description
    ---
    Kicks off the gait sequence, then releases the sync thread

    ## Returns
    ---
    `None`
    '''
    # One third of the gait delay, converted from ms to seconds.
    pause_s = self.delay_ms / 3000
    # Enabling the servos starts the gait sequence.
    self.set_servos(1)
    # Let a third of the gait delay pass before the sync sequence begins.
    time.sleep(pause_s)
    # Setting the flag unblocks whoever is waiting on sync_flag.
    self.sync_flag.set()
def stop_sync(self)
-
Description
stops gait sequence and pauses gait sync thread
Returns
None
Expand source code
def stop_sync(self):
    '''
    ## Description
    ---
    Halts the gait sequence and pauses the gait sync thread

    ## Returns
    ---
    `None`
    '''
    # Disabling the servos stops the gait sequence.
    self.set_servos(0)
    # With the flag cleared, sync_flag.wait() blocks again.
    self.sync_flag.clear()
def stream_pose(self, poses, remote_device=None)
-
Description
Sets smarticle to specified servo positions. Differs from set_pose in that it sends angles over the streaming pipeline, which sends a batch message that can specify separate commands for each smarticle in the same message. Specify id as zero to broadcast servo command to whole swarm.
Arguments
Argument Type Description Default Value poses np.array
Nx3 array of servo commands. Each row specifies [id, angL, angR] N/A remote_device – see class description None
Returns
None
Expand source code
def stream_pose(self, poses, remote_device=None):
    '''
    ## Description
    ---
    Sets smarticle to specified servo positions. Differs from set_pose in that it
    sends angles over the streaming pipeline, which sends a batch message that can
    specify separate commands for each smarticle in the same message. Specify id as
    zero to broadcast servo command to whole swarm.

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | poses | `np.array` or array-like of `int` | Nx3 array of servo commands. Each row specifies [id, angL, angR] | N/A |
    | remote_device | -- | see class description | `None` |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    msg_code = self.msg_code_dict['stream_pose']
    # np.asarray makes nested lists work as well as ndarrays; previously a
    # list crashed on `list + int`. An ndarray passes through unchanged.
    pose_rows = np.asarray(poses)
    # Number of rows, shifted by ASCII_OFFSET like every other payload byte.
    count_byte = len(pose_rows) + self.ASCII_OFFSET
    body = (pose_rows + self.ASCII_OFFSET).flatten().tolist()
    msg = self.format_msg(bytearray([msg_code, count_byte] + body))
    self.xb.command(msg, remote_device)
def sync_thread_target(self, sync_period_s, keep_time)
-
Description
Thread to keep gaits in sync. Not used directly by user but called in
init_sync_thread
Expand source code
def sync_thread_target(self,sync_period_s, keep_time):
    '''
    ## Description
    ---
    Thread to keep gaits in sync. Not used directly by user but called in `init_sync_thread`

    Repeatedly broadcasts the one-byte sync message `0x11` for as long as the
    loop condition below holds.

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | sync_period_s | `float` | Time in seconds between sync broadcasts; 0.0357 s is subtracted before it is used as the wait timeout | N/A |
    | keep_time | `bool` | If truthy, `time.time()` of every broadcast is appended to `sync_time_list` while holding `lock` | N/A |
    '''
    time_adjust_s=sync_period_s-0.0357 # subtract 35.7 ms based on results from timing experiments
    msg = bytearray(b'\x11')
    # threading.Event.wait() blocks until either a) the event is set, in which
    # case it returns True, or b) the specified timeout elapses, in which case
    # it returns False.
    # sync_flag.wait() therefore parks this thread until the flag is set. The
    # body runs each time timer_counts.wait() comes back falsy, and the loop
    # ends once it returns a truthy value.
    # NOTE(review): assumes timer_counts is a threading.Event used as a stop
    # signal, so a falsy return means the timeout elapsed -- confirm where it
    # is created.
    while self.sync_flag.wait() and not self.timer_counts.wait(timeout=(time_adjust_s)):
        self.xb.broadcast(msg)
        if keep_time:
            t = time.time()
            # sync_time_list is only touched under the lock
            with self.lock:
                self.sync_time_list.append(t)
class XbeeComm (port='/dev/tty.usbserial-DN050I6Q', baud_rate=9600, debug=0)
-
'
Description
Class for simplified XBee communication with remote Smarticles Built on top of Digi XBee python library: https://github.com/digidotcom/xbee-python
The Constructor initializes and opens local base xbee (connected via USB) with given port and baud rate and adds it to attribute
base
Arguments
Argument Type Description Default Value port string
USB port to open for local XBee set for your own convenience baud_rate int
Baud rate to use for USB serial port 9600 debug int
Enables/disables print statements in class 0 Expand source code
class XbeeComm(object):
    '''
    ## Description
    ---
    Class for simplified XBee communication with remote Smarticles
    Built on top of Digi XBee python library: https://github.com/digidotcom/xbee-python <br/>
    The Constructor initializes and opens local base xbee (connected via USB) with given port and baud rate and adds it to attribute `base`
    '''

    def __init__(self, port='/dev/tty.usbserial-DN050I6Q', baud_rate=9600, debug=0):
        '''
        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | port | `string` | USB port to open for local XBee | set for your own convenience |
        | baud_rate | `int` | Baud rate to use for USB serial port | 9600 |
        | debug | `int` | Enables/disables print statements in class | 0 |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|
        '''
        self.base = Raw802Device(port, baud_rate)
        self.debug = debug
        # Start with an empty device table so command()/ack_broadcast() work
        # before discover() has run instead of raising AttributeError.
        self.devices = {}
        self.open_base()
        self.callbacks_added = False
        self.ascii_offset = 32

    def open_base(self):
        '''
        ## Description
        ---
        Opens local xbee defined by attribute 'base' which is set in `__init__`

        ## Arguments
        ---
        None

        ## Returns
        ---
        This function returns a 1 if it successfully opens the local port and a 0 otherwise.
        '''
        self.base.open()
        if self.base is not None and self.base.is_open():
            return 1
        if self.debug:
            print("Failed to open device")
        return 0

    def close_base(self):
        '''
        ## Description
        ---
        Closes local xbee defined by attribute 'base' which is set in `__init__`

        ## Arguments
        ---
        None

        ## Returns
        ---
        `None`
        '''
        if self.base is not None and self.base.is_open():
            self.base.close()

    def add_callbacks(self):
        '''
        ## Description
        ---
        adds callback functions for discovery processes
        '''
        # Callback for discovered devices.
        def callback_device_discovered(remote):
            print("Device discovered: %s" % remote)
            self.add_remote(remote)

        # Callback for discovery finished.
        def callback_discovery_finished(status):
            if status == NetworkDiscoveryStatus.SUCCESS:
                print("Discovery cycle finished\n")
            else:
                print("There was an error discovering devices: %s" % status.description)

        self.network.add_device_discovered_callback(callback_device_discovered)
        self.network.add_discovery_process_finished_callback(callback_discovery_finished)

    def add_remote(self, remote_device):
        '''
        ## Description
        ---
        takes in RemoteXbee Object and creates an attribute for it based on its NodeID and adds it to the device dictionary

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
        '''
        node_id = remote_device.get_node_id()
        # The dictionary key is the integer formed by every digit in the node
        # id (int() raises ValueError when the id contains no digits).
        smarticle_number = int(''.join([s for s in node_id if s.isdigit()]))
        setattr(self, node_id, remote_device)
        self.devices[smarticle_number] = remote_device

    def discover(self):
        '''
        ## Description
        ---
        Clears `devices` dictionary as well as all devices on network. Discovers remote devices on network,
        initializes dictionary of all connected devices.

        modified from Digi XBee example DiscoverDevicesSample.py

        ## Arguments
        ---
        None

        ## Returns
        ---
        `None`
        '''
        self.devices = {}
        self.network = self.base.get_network()
        self.network.clear()
        self.network.set_discovery_timeout(15)  # 15 seconds.
        # Register the discovery callbacks only once, however often
        # discover() is called.
        if not self.callbacks_added:
            self.add_callbacks()
            self.callbacks_added = True
        self.network.start_discovery_process()
        print("Discovering remote XBee devices...")
        # Block until the discovery cycle has finished.
        while self.network.is_discovery_running():
            time.sleep(0.1)

    def send(self, remote_device, msg, asynch=False):
        '''
        ## Description
        ---
        Sends message to remote xbee and receives acknowledgement on success.

        modified from digi's example SendDataSample.py

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) or not | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`

        ## Raises
        ---
        `ValueError` if `remote_device` is `None`

        More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
        '''
        if remote_device is None:
            # `debug` is an int flag, not a callable; the old `self.debug()`
            # raised TypeError here. Raising (instead of the old `exit(1)`)
            # leaves the decision to terminate with the caller.
            if self.debug:
                print("Could not find the remote device")
            raise ValueError("Could not find the remote device")
        if self.debug:
            print("Sending data to {} >> {}...".format(remote_device.get_node_id(), msg))
        if asynch is True:
            self.base.send_data_async(remote_device, msg)
        else:
            self.base.send_data(remote_device, msg)
        if self.debug:
            print("Success")

    def broadcast(self, msg):
        '''
        ## Description
        ---
        Broadcasts to all xbees on network.
        NOTE: there are no acknowledgements when using broadcast

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self.base.send_data_broadcast(msg)

    def ack_broadcast(self, msg):
        '''
        ## Description
        ---
        Broadcasts to all xbees on network by sending message individually to each remote xbee in `devices` dictionary.
        This broadcast includes acknowledgements.

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # Iterate over a snapshot so the dictionary may change while sending.
        for remote_dev in list(self.devices.values()):
            self.send(remote_dev, msg)

    def command(self, msg, remote_device=None, asynch=False):
        '''
        ## Description
        ---
        Sends message to remote Xbee in one of three ways depending on the `remote_device` argument.
        see `SmarticleSwarm` class description

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
        | remote_device | -- | see class description | `None` |
        | asynch | `bool` | Determines whether to send asynchronously (without ack) or not | False |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        # Identity tests: `== None` would dispatch to the device's __eq__.
        if remote_device is None:
            self.broadcast(msg)
        elif remote_device is True:
            self.ack_broadcast(msg)
        else:
            assert remote_device in self.devices.values(), "Remote Device not found in active devices"
            self.send(remote_device, msg, asynch)

    def add_rx_callback(self, callback_fun):
        '''
        ## Description
        ---
        Adds a data received callback function that is called everytime a message is received

        ## Arguments
        ---
        | Argument | Type | Description | Default Value |
        | :------: | :--: | :---------: | :-----------: |
        | callback_fun | function | Function that takes 'XbeeMessage' as input | N/A |
        |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

        ## Returns
        ---
        `None`
        '''
        self.base.add_data_received_callback(callback_fun)
Methods
def ack_broadcast(self, msg)
-
Description
Broadcasts to all xbees on network by sending message individually to each remote xbee in
devices
dictionary. This broadcast includes acknowledgements.Arguments
Argument Type Description Default Value msg string
orbytearray
Message to send to XBee. Maximum of 108 bytes N/A Returns
None
Expand source code
def ack_broadcast(self, msg):
    '''
    ## Description
    ---
    Delivers `msg` to every remote xbee in the `devices` dictionary, one `send()` per device.
    Unlike `broadcast()`, this includes acknowledgements.

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # Snapshot the current devices first, then send to each in turn.
    targets = tuple(self.devices.values())
    for device in targets:
        self.send(device, msg)
def add_callbacks(self)
-
Description
adds callback functions for discovery processes
Expand source code
def add_callbacks(self):
    '''
    ## Description
    ---
    Registers the two discovery callbacks (device found, discovery finished) on `self.network`
    '''
    def _on_device_found(remote):
        # Announce each newly found device and record it.
        print("Device discovered: %s" % remote)
        self.add_remote(remote)

    def _on_discovery_done(status):
        # Report how the discovery cycle ended.
        if status != NetworkDiscoveryStatus.SUCCESS:
            print("There was an error discovering devices: %s" % status.description)
        else:
            print("Discovery cycle finished\n")

    network = self.network
    network.add_device_discovered_callback(_on_device_found)
    network.add_discovery_process_finished_callback(_on_discovery_done)
def add_remote(self, remote_device)
-
Description
takes in RemoteXbee Object and creates an attribute for it based on its NodeID and adds it to the device dictionary
Arguments
Argument Type Description Default Value remote_device RemoteXbeeDevice
ObjectStores info such as ID and address; see Digi Xbee Documentation for more info N/A Returns
None
More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
Expand source code
def add_remote(self, remote_device):
    '''
    ## Description
    ---
    Registers a RemoteXbee Object: stores it as an attribute named after its NodeID
    and files it in the device dictionary under the number contained in that NodeID

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`

    More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
    '''
    node_id = remote_device.get_node_id()
    # Every digit in the node id, in order, forms the dictionary key.
    digits = ''.join(filter(str.isdigit, node_id))
    key = int(digits)
    setattr(self, node_id, remote_device)
    self.devices[key] = remote_device
def add_rx_callback(self, callback_fun)
-
Description
Adds a data received callback function that is called everytime a message is received
Arguments
Argument Type Description Default Value callback_fun function Function that takes 'XbeeMessage' as input N/A Returns
None
Expand source code
def add_rx_callback(self, callback_fun):
    '''
    ## Description
    ---
    Registers a data received callback on the base xbee; it is called everytime a message is received

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | callback_fun | function | Function that takes 'XbeeMessage' as input | N/A |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    base = self.base
    base.add_data_received_callback(callback_fun)
def broadcast(self, msg)
-
Description
Broadcasts to all xbees on network. NOTE: there are no acknowledgements when using broadcast
Arguments
Argument Type Description Default Value msg string
orbytearray
Message to send to XBee. Maximum of 108 bytes N/A Returns
None
Expand source code
def broadcast(self, msg):
    '''
    ## Description
    ---
    Sends `msg` to all xbees on network in a single broadcast.
    NOTE: there are no acknowledgements when using broadcast

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    base = self.base
    base.send_data_broadcast(msg)
def close_base(self)
-
Description
Closes local xbee defined by attribute 'base' which is set in
__init__
Arguments
None
Returns
None
Expand source code
def close_base(self):
    '''
    ## Description
    ---
    Closes the local xbee held in attribute 'base' (set in `__init__`) if it is currently open

    ## Arguments
    ---
    None

    ## Returns
    ---
    `None`
    '''
    base = self.base
    # Nothing to do without a device, or when it is already closed.
    if base is None or not base.is_open():
        return
    base.close()
def command(self, msg, remote_device=None, asynch=False)
-
Description
Sends message to remote Xbee in one of three ways depending on the
remote_device
argument. seeSmarticleSwarm
class descriptionArguments
Argument Type Description Default Value msg string
orbytearray
Message to send to XBee. Maximum of 108 bytes N/A remote_device – see class description None
asynch bool
Determines whether to send asynchronously (without ack) or not False Returns
None
Expand source code
def command(self, msg, remote_device=None, asynch=False):
    '''
    ## Description
    ---
    Sends message to remote Xbee in one of three ways depending on the `remote_device` argument:
    `None` broadcasts without acks, `True` broadcasts with acks, anything else is treated as a
    single device from `devices`. see `SmarticleSwarm` class description

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
    | remote_device | -- | see class description | `None` |
    | asynch | `bool` | Determines whether to send asynchronously (without ack) or not. Only used when sending to a single device | False |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`
    '''
    # Identity tests: `== None` would dispatch to the device object's own
    # __eq__, which is not what selects the broadcast path.
    if remote_device is None:
        self.broadcast(msg)
    elif remote_device is True:
        self.ack_broadcast(msg)
    else:
        assert remote_device in self.devices.values(), "Remote Device not found in active devices"
        self.send(remote_device, msg, asynch)
def discover(self)
-
Description
Clears
devices
dictionary as well as all devices on network. Discovers remote devices on network, initializes dictionary of all connected devices. modified from Digi XBee example DiscoverDevicesSample.pyArguments
None
Returns
None
Expand source code
def discover(self):
    '''
    ## Description
    ---
    Resets the `devices` dictionary and the network's device list, then runs one discovery
    cycle and blocks until it has finished. Discovered devices are added to `devices` through
    the discovery callbacks.

    modified from Digi XBee example DiscoverDevicesSample.py

    ## Arguments
    ---
    None

    ## Returns
    ---
    `None`
    '''
    self.devices = {}
    self.network = self.base.get_network()
    self.network.clear()
    self.network.set_discovery_timeout(15)  # seconds
    # The callbacks are registered on the first call only.
    if not self.callbacks_added:
        self.add_callbacks()
        self.callbacks_added = True
    self.network.start_discovery_process()
    print("Discovering remote XBee devices...")
    # Poll every 100 ms until the discovery cycle is over.
    while self.network.is_discovery_running():
        time.sleep(0.1)
def open_base(self)
-
Description
Opens local xbee defined by attribute 'base' which is set in
__init__
Arguments
None
Returns
This function returns a 1 if it successfully opens the local port and a 0 otherwise.
Expand source code
def open_base(self):
    '''
    ## Description
    ---
    Opens the local xbee held in attribute 'base' (set in `__init__`)

    ## Arguments
    ---
    None

    ## Returns
    ---
    1 if the local port was opened successfully, 0 otherwise.
    '''
    self.base.open()
    opened = self.base is not None and self.base.is_open()
    if opened:
        return 1
    # The failure is only reported when debug output is enabled.
    if self.debug:
        print("Failed to open device")
    return 0
def send(self, remote_device, msg, asynch=False)
-
Description
Sends message to remote xbee and receives acknowledgement on success. modified from digi's example SendDataSample.py
Arguments
Argument Type Description Default Value remote_device RemoteXbeeDevice
ObjectStores info such as ID and address; see Digi Xbee Documentation for more info N/A msg string
orbytearray
Message to send to XBee. Maximum of 108 bytes N/A asynch bool
Determines whether to send asynchronously (without ack) or not False Returns
None
More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
Expand source code
def send(self, remote_device, msg, asynch=False):
    '''
    ## Description
    ---
    Sends message to remote xbee and receives acknowledgement on success.

    modified from digi's example SendDataSample.py

    ## Arguments
    ---
    | Argument | Type | Description | Default Value |
    | :------: | :--: | :---------: | :-----------: |
    | remote_device | `RemoteXbeeDevice` Object | Stores info such as ID and address; see Digi Xbee Documentation for more info | N/A |
    | msg | `string` or `bytearray` | Message to send to XBee. Maximum of 108 bytes | N/A |
    | asynch | `bool` | Determines whether to send asynchronously (without ack) or not | False |
    |<img width=400/>|<img width=250/>|<img width=1000/>|<img width=550/>|

    ## Returns
    ---
    `None`

    ## Raises
    ---
    `ValueError` if `remote_device` is `None`

    More info on RemoteXbeeDevice: https://xbplib.readthedocs.io/en/stable/api/digi.xbee.devices.html#digi.xbee.devices.RemoteXBeeDevice
    '''
    if remote_device is None:
        # `debug` is an int flag, not a callable: the old `self.debug()`
        # raised TypeError here. Raising (instead of the old `exit(1)`)
        # leaves the decision to terminate with the caller.
        if self.debug:
            print("Could not find the remote device")
        raise ValueError("Could not find the remote device")
    if self.debug:
        print("Sending data to {} >> {}...".format(remote_device.get_node_id(), msg))
    # Only the literal True selects the asynchronous (no-ack) path.
    if asynch is True:
        self.base.send_data_async(remote_device, msg)
    else:
        self.base.send_data(remote_device, msg)
    if self.debug:
        print("Success")