Hello all,
I am trying to use the DynamicsAdaptation class from the abr_control repository with a NengoLoihi backend. Initially, all I’ve done is copy the file, import nengo_loihi, set defaults, and change nengo.Simulator to nengo_loihi.Simulator. When attempting to instantiate the class, I receive the following error.
File "/home/nfey/dev/wisard/src/abr_control/docs/examples/Mujoco/kg3_point_force_example.py", line 55, in <module>
spherical=True,
File "/home/nfey/dev/wisard/src/abr_control/abr_control/controllers/signals/loihi_dynamics_adaptation.py", line 193, in __init__
self.sim = nengo_loihi.Simulator(self.nengo_model, dt=0.001)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/simulator.py", line 139, in __init__
discretize=target != "simreal",
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/builder.py", line 223, in build
built = model.builder.build(model, obj, *args, **kwargs)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/builder.py", line 239, in build
return cls.builders[obj_cls](model, obj, *args, **kwargs)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/network.py", line 31, in build_network
nengo_build_network(model, network, progress=None)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/network.py", line 94, in build_network
model.build(conn)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/builder.py", line 223, in build
built = model.builder.build(model, obj, *args, **kwargs)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/builder.py", line 239, in build
return cls.builders[obj_cls](model, obj, *args, **kwargs)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/connection.py", line 85, in build_connection
build_chip_to_host(model, conn)
File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/connection.py", line 291, in build_chip_to_host
"(got %r)" % type(conn.pre_obj).__name__
NotImplementedError: Learning rule presynaptic object must be an Ensemble (got 'Neurons')
Will someone please help me work around this? So far, I’ve tried to reference the nonlinear adaptive control example here for a solution, but I have not had much luck.
Here is the code for the class.
import nengo
import numpy as np
from abr_control._vendor.nengolib.stats import ScatteredHypersphere, spherical_transform
import nengo_loihi
nengo_loihi.set_defaults()
class LoihiDynamicsAdaptation:
"""An implementation of nonlinear dynamics adaptation using Nengo
Loihi, as described in (DeWolf, Stewart, Slotine, and Eliasmith, 2016).
The model learns to account for unknown / unmodelled external or
internal forces, using an efferent copy of the control signal to train.
Parameters
----------
n_input: int
the number of inputs going into the adaptive population
n_output: int
the number of outputs expected from the adaptive population
n_neurons: int, optional (Default: 1000)
number of neurons per adaptive population
n_ensembles: int, optional (Default: 1)
number of ensembles of n_neurons number of neurons
seed: int, optional (Default: None)
the seed used for random number generation
pes_learning_rate: float, optional (Default: 1e-6)
controls the speed of neural adaptation
intercepts: np.array, optional (Default: None)
the neural intercepts to be used by the neural ensembles
shape (n_ensembles, n_neurons)
encoders: np.array, optional (Default: None)
an (n_encoders, n_neurons, n_input) array of preferred directions
for all of the neurons in the adaptive ensembles
spherical: boolean, Optional (Default: False)
True to convert inputs to the surface of the hypersphere
False to scale from 0 to 1
means: np.array, optional (Default: None)
Subtracted from the input to the neural ensemble to center the data
for each dimension around zero
variances: np.array, optional (Default: None)
The input signal to the neural ensemble is divided by these values
after mean subtraction to put the data for each dimensions in the
range -1 to 1, or 0 to 1 if spherical=True
***NOTE*** The variances do not have to encompass the full range of
joint positions or velocities. Outliers can be left outside the scaling
range to get a better scaling of the majority of your expected inputs.
The outliers will still be scaled, but they will be outside the -1 to 1
range for non-spherical, or 0 to 1 range for spherical
"""
def __init__(
self,
n_input,
n_output,
n_neurons=1000,
n_ensembles=1,
seed=None,
pes_learning_rate=1e-6,
intercepts=None,
weights=None,
encoders=None,
spherical=False,
means=None,
variances=None,
tau_input=0.012,
tau_training=0.012,
tau_output=0.2,
**kwargs,
):
self.n_neurons = n_neurons
self.n_ensembles = n_ensembles
if spherical:
n_input += 1
self.spherical = spherical
# if only one of means or variances is defined
# define the other to have no effect on the data
if means is not None and variances is None:
variances = np.ones(means.shape)
elif means is None and variances is not None:
means = np.zeros(variances.shape)
self.means = np.asarray(means)
self.variances = np.asarray(variances)
# synapse time constants
self.tau_input = 0.012 # on input connection
self.tau_training = 0.012 # on the training signal
self.tau_output = 0.2 # on the output from the adaptive ensemble
# NOTE: the time constant on the neural activity used in the learning
# connection is the default 0.005, and can be set by specifying the
# pre_synapse parameter inside the PES rule instantiation
self.seed = seed
self.pes_learning_rate = pes_learning_rate
if intercepts is None:
# Generates intercepts for a d-dimensional ensemble, such that, given a
# random uniform input (from the interior of the d-dimensional ball), the
# probability of a neuron firing has the probability density function given
# by rng.triangular(left, mode, right, size=n)
triangular = np.random.triangular(
left=0.35, mode=0.45, right=0.55, size=n_neurons * n_ensembles
)
intercepts = nengo.dists.CosineSimilarity(n_input + 2).ppf(1 - triangular)
intercepts = intercepts.reshape((n_ensembles, n_neurons))
if weights is None:
weights = np.zeros((self.n_ensembles, n_output, self.n_neurons))
print("Initializing connection weights to all zeros")
if encoders is None:
np.random.seed = self.seed
# if NengoLib is installed, use it to optimize encoder placement
try:
encoders_dist = ScatteredHypersphere(surface=True)
except ImportError:
encoders_dist = nengo.Default
print(
"NengoLib not installed, encoder placement will "
+ "be sub-optimal."
)
encoders = encoders_dist.sample(n_neurons * n_ensembles, n_input)
encoders = encoders.reshape(n_ensembles, n_neurons, n_input)
self.input_signal = np.zeros(n_input)
self.training_signal = np.zeros(n_output)
self.output = np.zeros(n_output)
self.nengo_model = nengo.Network(seed=seed)
# Set the default neuron type for the network
self.nengo_model.config[nengo.Ensemble].neuron_type = nengo.LIF()
with self.nengo_model:
def input_signals_func(t):
return self.input_signal
input_signals = nengo.Node(input_signals_func, size_out=n_input)
def training_signals_func(t):
return -self.training_signal
training_signals = nengo.Node(training_signals_func, size_out=n_output)
# make the adaptive population output accessible
def output_func(t, x):
self.output = np.copy(x)
output = nengo.Node(output_func, size_in=n_output, size_out=0)
self.adapt_ens = []
self.conn_learn = []
for ii in range(self.n_ensembles):
self.adapt_ens.append(
nengo.Ensemble(
n_neurons=self.n_neurons,
dimensions=n_input,
intercepts=intercepts[ii],
radius=np.sqrt(n_input),
encoders=encoders[ii],
**kwargs,
)
)
# hook up input signal to adaptive population to provide context
nengo.Connection(
input_signals,
self.adapt_ens[ii],
synapse=self.tau_input,
)
self.conn_learn.append(
nengo.Connection(
self.adapt_ens[ii].neurons,
output,
learning_rule_type=nengo.PES(pes_learning_rate),
transform=weights[ii],
synapse=self.tau_output,
)
)
# hook up the training signal to the learning rule
nengo.Connection(
training_signals,
self.conn_learn[ii].learning_rule,
synapse=self.tau_training,
)
nengo.rc.set("decoder_cache", "enabled", "False")
self.sim = nengo_loihi.Simulator(self.nengo_model, dt=0.001)
def generate(self, input_signal, training_signal):
"""Generates the control signal
Parameters
----------
input_signal : numpy.array
the current desired input signal, typical joint positions and
velocities in [rad] and [rad/sec] respectively
training_signal : numpy.array
the learning signal to drive adaptation
"""
# if means or variances was defined, self.means is not None
if self.means is not None:
input_signal = self.scale_inputs(input_signal)
# store local copies to feed in to the adaptive population
self.input_signal = input_signal
self.training_signal = training_signal
# run the simulation t generate the adaptive signal
self.sim.run(time_in_seconds=0.001, progress_bar=False)
return self.output
def scale_inputs(self, input_signal):
"""
Currently set to accept joint position and velocities as time
x dimension arrays, and returns them scaled based on the means and
variances set on instantiation
PARAMETERS
----------
input_signal : numpy.array
the current desired input signal, typical joint positions and
velocities in [rad] and [rad/sec] respectively
The reason we do a shift by the means is to try and center the majority
of our expected input values in our scaling range, so when we stretch
them by variances they encompass more of our input range.
"""
scaled_input = (input_signal - self.means) / self.variances
if self.spherical:
# put into the 0-1 range
scaled_input = scaled_input / 2 + 0.5
# project onto unit hypersphere in larger state space
scaled_input = scaled_input.flatten()
scaled_input = spherical_transform(
scaled_input.reshape(1, len(scaled_input))
)
return scaled_input
def get_weights(self):
"""Save the current weights to the specified test_name folder
Save weights for individual runs. A group of runs is
classified as a session. Multiple sessions can then be used
to average over a set of learned runs. If session or run are set to None
then the test_name location will be searched for the highest numbered
folder and file respectively
"""
return [
self.sim.signals[self.sim.model.sig[conn]["weights"]]
for conn in self.conn_learn
]
And here is the script where I am attempting to create an adaptive controller object.
"""
Move the kinova-gen3-7dof Mujoco arm to a target position
while experiencing an unexpected point force at its end
effector. The simulation ends after reaching the target for
1.5 simulated seconds, and the trajectory of the end-effector
is plotted in 3D.
"""
import sys
import traceback
import numpy as np
import glfw
from abr_control.controllers import OSC, Damping, signals
from abr_control.arms.mujoco_config import MujocoConfig as arm
from abr_control.interfaces.mujoco import Mujoco
from abr_control.utils import transformations
if len(sys.argv) > 1:
arm_model = sys.argv[1]
else:
arm_model = "kinova-gen3-7dof"
# initialize our robot config for the kinova-gen3-7dof
robot_config = arm(arm_model)
# create our Mujoco interface
interface = Mujoco(robot_config, dt=0.001)
interface.connect()
interface.send_target_angles(robot_config.START_ANGLES)
# damp the movements of the arm
damping = Damping(robot_config, kv=10)
# instantiate controller
ctrlr = OSC(
robot_config,
kp=200,
null_controllers=[damping],
vmax=[0.5, 0], # [m/s, rad/s]
# control (x, y, z) out of [x, y, z, alpha, beta, gamma]
ctrlr_dof=[True, True, True, False, False, False],
)
interface.viewer.adapt = True
# create our adaptive controller
adapt = signals.LoihiDynamicsAdaptation(
n_neurons=5000,
n_ensembles=7,
n_input=14, # we apply adaptation on the most heavily stressed joints
n_output=7,
pes_learning_rate=1e-3,#4.1e-4,
means=np.zeros(14),
variances=np.hstack([np.ones(7) * np.pi, np.ones(7) * 0.8727]),
spherical=True,
)
target_geom_id = interface.sim.model.geom_name2id("target")
green = [0, 0.9, 0, 0.5]
red = [0.9, 0, 0, 0.5]
# set up lists for tracking data
ee_track = []
target_track = []
q_track = []
dq_track = []
try:
# get the end-effector's initial position
feedback = interface.get_feedback()
start = robot_config.Tx("EE", feedback["q"])
# make the target offset from that start position
target_xyz = start + np.array([0, -0.2, -0.2])
interface.set_mocap_xyz(name="target", xyz=target_xyz)
# set external force on end effector
interface.set_external_force('EE',np.array([0,0,-10,0,0,0]))
count = 0.0
time = 0
score = 0
while 1:
if interface.viewer.exit or (time == 1500):
glfw.destroy_window(interface.viewer.window)
break
# get joint angle and velocity feedback
feedback = interface.get_feedback()
target = np.hstack(
[ interface.get_xyz("target"),
transformations.euler_from_quaternion(
interface.get_orientation("target"), "rxyz"
),
]
)
# calculate the control signal
u = ctrlr.generate(
q=feedback["q"],
dq=feedback["dq"],
target=target,
)
u_adapt = np.zeros(robot_config.N_JOINTS)
u_adapt[:7] = adapt.generate(
input_signal=np.hstack((feedback["q"][:7], feedback["dq"][:7])),
training_signal=np.array(ctrlr.training_signal[:7]),
)
if interface.viewer.adapt == True:
u += u_adapt
# send forces into Mujoco, step the sim forward
interface.send_forces(u)
# calculate end-effector position
ee_xyz = robot_config.Tx("EE", q=feedback["q"])
# track data
ee_track.append(np.copy(ee_xyz))
target_track.append(np.copy(target[:3]))
q_track.append(np.copy(feedback["q"]))
dq_track.append(np.copy(feedback["dq"]))
count += 1
error = np.linalg.norm(ee_xyz - target[:3])
if error < 0.02:
interface.sim.model.geom_rgba[target_geom_id] = green
time += 1
else:
interface.sim.model.geom_rgba[target_geom_id] = red
time = 0
except:
print(traceback.format_exc())
finally:
# stop and reset the Mujoco simulation
interface.disconnect()
ee_track = np.array(ee_track)
target_track = np.array(target_track)
q_track = np.asarray(q_track)
dq_track = np.asarray(dq_track)
if ee_track.shape[0] > 0:
# plot distance from target and 3D trajectory
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import axes3d # pylint: disable=W0611
fig = plt.figure(figsize=(8, 12))
ax1 = fig.add_subplot(211)
ax1.set_ylabel("Distance (m)")
ax1.set_xlabel("Time (ms)")
ax1.set_title("Distance to target")
ax1.plot(
np.sqrt(np.sum((np.array(target_track) - np.array(ee_track)) ** 2, axis=1))
)
ax2 = fig.add_subplot(212, projection="3d")
ax2.set_title("End-Effector Trajectory")
ax2.plot(ee_track[:, 0], ee_track[:, 1], ee_track[:, 2], label="ee_xyz")
ax2.scatter(target_xyz[0], target_xyz[1], target_xyz[2], label="target", c="r")
ax2.legend()
plt.show()
#plt.savefig('point_force_results.png')