Dynamics Adaptation using NengoLoihi Backend

Hello all,

I am trying to use the DynamicsAdaptation class from the abr_control repository with a NengoLoihi backend. So far, all I have done is copy the file, import nengo_loihi, call nengo_loihi.set_defaults(), and change nengo.Simulator to nengo_loihi.Simulator. When I attempt to instantiate the class, I receive the following error:

  File "/home/nfey/dev/wisard/src/abr_control/docs/examples/Mujoco/kg3_point_force_example.py", line 55, in <module>
    spherical=True,
  File "/home/nfey/dev/wisard/src/abr_control/abr_control/controllers/signals/loihi_dynamics_adaptation.py", line 193, in __init__
    self.sim = nengo_loihi.Simulator(self.nengo_model, dt=0.001)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/simulator.py", line 139, in __init__
    discretize=target != "simreal",
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/builder.py", line 223, in build
    built = model.builder.build(model, obj, *args, **kwargs)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/builder.py", line 239, in build
    return cls.builders[obj_cls](model, obj, *args, **kwargs)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/network.py", line 31, in build_network
    nengo_build_network(model, network, progress=None)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/network.py", line 94, in build_network
    model.build(conn)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/builder.py", line 223, in build
    built = model.builder.build(model, obj, *args, **kwargs)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo/builder/builder.py", line 239, in build
    return cls.builders[obj_cls](model, obj, *args, **kwargs)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/connection.py", line 85, in build_connection
    build_chip_to_host(model, conn)
  File "/home/nfey/.pyenv/versions/wisard/lib/python3.6/site-packages/nengo_loihi/builder/connection.py", line 291, in build_chip_to_host
    "(got %r)" % type(conn.pre_obj).__name__
NotImplementedError: Learning rule presynaptic object must be an Ensemble (got 'Neurons')

Will someone please help me work around this? So far, I’ve tried to reference the nonlinear adaptive control example here for a solution, but I have not had much luck.

Here is the code for the class.

import nengo
import numpy as np

from abr_control._vendor.nengolib.stats import ScatteredHypersphere, spherical_transform

import nengo_loihi
nengo_loihi.set_defaults()

class LoihiDynamicsAdaptation:
    """An implementation of nonlinear dynamics adaptation using Nengo
    Loihi, as described in (DeWolf, Stewart, Slotine, and Eliasmith, 2016).

    The model learns to account for unknown / unmodelled external or
    internal forces, using an efferent copy of the control signal to train.

    Parameters
    ----------
    n_input: int
        the number of inputs going into the adaptive population
    n_output: int
        the number of outputs expected from the adaptive population
    n_neurons: int, optional (Default: 1000)
        number of neurons per adaptive population
    n_ensembles: int, optional (Default: 1)
        the number of ensembles, each containing n_neurons neurons
    seed: int, optional (Default: None)
        the seed used for random number generation
    pes_learning_rate: float, optional (Default: 1e-6)
        controls the speed of neural adaptation
    intercepts: np.array, optional (Default: None)
        the neural intercepts to be used by the neural ensembles
        shape (n_ensembles, n_neurons)
    weights: np.array, optional (Default: None)
        initial connection weights from each adaptive ensemble to the output,
        shape (n_ensembles, n_output, n_neurons); if None, initialized to zeros
    encoders: np.array, optional (Default: None)
        an (n_encoders, n_neurons, n_input) array of preferred directions
        for all of the neurons in the adaptive ensembles
    spherical: boolean, optional (Default: False)
        True to project the inputs onto the surface of the hypersphere
        False to scale the inputs to the range -1 to 1
    means: np.array, optional (Default: None)
        Subtracted from the input to the neural ensemble to center the data
        for each dimension around zero
    variances: np.array, optional (Default: None)
        The input signal to the neural ensemble is divided by these values
        after mean subtraction to put the data for each dimension in the
        range -1 to 1, or 0 to 1 if spherical=True

        ***NOTE*** The variances do not have to encompass the full range of
        joint positions or velocities. Outliers can be left outside the scaling
        range to get a better scaling of the majority of your expected inputs.
        The outliers will still be scaled, but they will be outside the -1 to 1
        range for non-spherical, or 0 to 1 range for spherical
    """

    def __init__(
        self,
        n_input,
        n_output,
        n_neurons=1000,
        n_ensembles=1,
        seed=None,
        pes_learning_rate=1e-6,
        intercepts=None,
        weights=None,
        encoders=None,
        spherical=False,
        means=None,
        variances=None,
        tau_input=0.012,
        tau_training=0.012,
        tau_output=0.2,
        **kwargs,
    ):

        self.n_neurons = n_neurons
        self.n_ensembles = n_ensembles
        if spherical:
            n_input += 1
        self.spherical = spherical

        # if only one of means or variances is defined
        # define the other to have no effect on the data
        if means is not None and variances is None:
            variances = np.ones(means.shape)
        elif means is None and variances is not None:
            means = np.zeros(variances.shape)
        self.means = np.asarray(means) if means is not None else None
        self.variances = np.asarray(variances) if variances is not None else None

        # synapse time constants
        self.tau_input = tau_input  # on the input connection
        self.tau_training = tau_training  # on the training signal
        self.tau_output = tau_output  # on the output from the adaptive ensemble
        # NOTE: the time constant on the neural activity used in the learning
        # connection is the default 0.005, and can be set by specifying the
        # pre_synapse parameter inside the PES rule instantiation
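        # e.g.: nengo.PES(pes_learning_rate, pre_synapse=nengo.Lowpass(0.01))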

        self.seed = seed
        self.pes_learning_rate = pes_learning_rate

        if intercepts is None:
            # Generates intercepts for a d-dimensional ensemble, such that, given a
            # random uniform input (from the interior of the d-dimensional ball), the
            # probability of a neuron firing has the probability density function given
            # by rng.triangular(left, mode, right, size=n)
            triangular = np.random.triangular(
                left=0.35, mode=0.45, right=0.55, size=n_neurons * n_ensembles
            )
            intercepts = nengo.dists.CosineSimilarity(n_input + 2).ppf(1 - triangular)
            intercepts = intercepts.reshape((n_ensembles, n_neurons))

        if weights is None:
            weights = np.zeros((self.n_ensembles, n_output, self.n_neurons))
            print("Initializing connection weights to all zeros")

        if encoders is None:
            np.random.seed(self.seed)
            # use the vendored nengolib distribution to spread the encoders
            # evenly over the surface of the hypersphere
            encoders_dist = ScatteredHypersphere(surface=True)
            encoders = encoders_dist.sample(n_neurons * n_ensembles, n_input)
            encoders = encoders.reshape(n_ensembles, n_neurons, n_input)

        self.input_signal = np.zeros(n_input)
        self.training_signal = np.zeros(n_output)
        self.output = np.zeros(n_output)

        self.nengo_model = nengo.Network(seed=seed)
        # Set the default neuron type for the network
        self.nengo_model.config[nengo.Ensemble].neuron_type = nengo.LIF()
        with self.nengo_model:

            def input_signals_func(t):
                return self.input_signal

            input_signals = nengo.Node(input_signals_func, size_out=n_input)

            def training_signals_func(t):
                return -self.training_signal

            training_signals = nengo.Node(training_signals_func, size_out=n_output)

            # make the adaptive population output accessible
            def output_func(t, x):
                self.output = np.copy(x)

            output = nengo.Node(output_func, size_in=n_output, size_out=0)

            self.adapt_ens = []
            self.conn_learn = []
            for ii in range(self.n_ensembles):
                self.adapt_ens.append(
                    nengo.Ensemble(
                        n_neurons=self.n_neurons,
                        dimensions=n_input,
                        intercepts=intercepts[ii],
                        radius=np.sqrt(n_input),
                        encoders=encoders[ii],
                        **kwargs,
                    )
                )

                # hook up input signal to adaptive population to provide context
                nengo.Connection(
                    input_signals,
                    self.adapt_ens[ii],
                    synapse=self.tau_input,
                )
                
                self.conn_learn.append(
                    nengo.Connection(
                        self.adapt_ens[ii].neurons,
                        output,
                        learning_rule_type=nengo.PES(pes_learning_rate),
                        transform=weights[ii],
                        synapse=self.tau_output,
                    )
                )

                # hook up the training signal to the learning rule
                nengo.Connection(
                    training_signals,
                    self.conn_learn[ii].learning_rule,
                    synapse=self.tau_training,
                )

        nengo.rc.set("decoder_cache", "enabled", "False")
        self.sim = nengo_loihi.Simulator(self.nengo_model, dt=0.001)

    def generate(self, input_signal, training_signal):
        """Generates the control signal

        Parameters
        ----------
        input_signal : numpy.array
            the current input signal, typically joint positions and
            velocities in [rad] and [rad/sec] respectively
        training_signal : numpy.array
            the learning signal to drive adaptation
        """

        # if means or variances was defined, self.means is not None
        if self.means is not None:
            input_signal = self.scale_inputs(input_signal)

        # store local copies to feed in to the adaptive population
        self.input_signal = input_signal
        self.training_signal = training_signal
        
        # run the simulation to generate the adaptive signal
        self.sim.run(time_in_seconds=0.001, progress_bar=False)
        
        return self.output

    def scale_inputs(self, input_signal):
        """
        Currently set to accept joint position and velocities as time
        x dimension arrays, and returns them scaled based on the means and
        variances set on instantiation

        PARAMETERS
        ----------
        input_signal : numpy.array
            the current desired input signal, typical joint positions and
            velocities in [rad] and [rad/sec] respectively

        The reason we do a shift by the means is to try and center the majority
        of our expected input values in our scaling range, so when we stretch
        them by variances they encompass more of our input range.
        """
        scaled_input = (input_signal - self.means) / self.variances
        
        if self.spherical:
            # put into the 0-1 range
            scaled_input = scaled_input / 2 + 0.5
            # project onto unit hypersphere in larger state space
            scaled_input = scaled_input.flatten()
            scaled_input = spherical_transform(
                scaled_input.reshape(1, len(scaled_input))
            )
        
        return scaled_input

    def get_weights(self):
        """Save the current weights to the specified test_name folder

        Save weights for individual runs. A group of runs is
        classified as a session. Multiple sessions can then be used
        to average over a set of learned runs. If session or run are set to None
        then the test_name location will be searched for the highest numbered
        folder and file respectively
        """

        return [
            self.sim.signals[self.sim.model.sig[conn]["weights"]]
            for conn in self.conn_learn
        ]

And here is the script where I am attempting to create an adaptive controller object.

"""
Move the kinova-gen3-7dof Mujoco arm to a target position
while experiencing an unexpected point force at its end 
effector. The simulation ends after reaching the target for 
1.5 simulated seconds, and the trajectory of the end-effector 
is plotted in 3D.
"""
import sys
import traceback
import numpy as np
import glfw

from abr_control.controllers import OSC, Damping, signals
from abr_control.arms.mujoco_config import MujocoConfig as arm
from abr_control.interfaces.mujoco import Mujoco
from abr_control.utils import transformations

if len(sys.argv) > 1:
    arm_model = sys.argv[1]
else:
    arm_model = "kinova-gen3-7dof"

# initialize our robot config for the kinova-gen3-7dof
robot_config = arm(arm_model)

# create our Mujoco interface
interface = Mujoco(robot_config, dt=0.001)
interface.connect()
interface.send_target_angles(robot_config.START_ANGLES)


# damp the movements of the arm
damping = Damping(robot_config, kv=10)
# instantiate controller
ctrlr = OSC(
    robot_config,
    kp=200,
    null_controllers=[damping],
    vmax=[0.5, 0],  # [m/s, rad/s]
    # control (x, y, z) out of [x, y, z, alpha, beta, gamma]
    ctrlr_dof=[True, True, True, False, False, False],
)

interface.viewer.adapt = True

# create our adaptive controller
adapt = signals.LoihiDynamicsAdaptation(
    n_neurons=5000,
    n_ensembles=7,
    n_input=14,  # joint positions and velocities for all 7 joints
    n_output=7,
    pes_learning_rate=1e-3,  # previously 4.1e-4
    means=np.zeros(14),
    variances=np.hstack([np.ones(7) * np.pi, np.ones(7) * 0.8727]),
    spherical=True,
)

target_geom_id = interface.sim.model.geom_name2id("target")
green = [0, 0.9, 0, 0.5]
red = [0.9, 0, 0, 0.5]

# set up lists for tracking data
ee_track = []
target_track = []
q_track = []
dq_track = []

try:
    # get the end-effector's initial position
    feedback = interface.get_feedback()
    start = robot_config.Tx("EE", feedback["q"])

    # make the target offset from that start position
    target_xyz = start + np.array([0, -0.2, -0.2])
    interface.set_mocap_xyz(name="target", xyz=target_xyz)

    # set external force on end effector
    interface.set_external_force("EE", np.array([0, 0, -10, 0, 0, 0]))

    count = 0.0
    time = 0
    score = 0
    while 1:
        if interface.viewer.exit or (time == 1500):
            glfw.destroy_window(interface.viewer.window)
            break
        
        # get joint angle and velocity feedback
        feedback = interface.get_feedback()

        target = np.hstack(
            [
                interface.get_xyz("target"),
                transformations.euler_from_quaternion(
                    interface.get_orientation("target"), "rxyz"
                ),
            ]
        )

        # calculate the control signal
        u = ctrlr.generate(
            q=feedback["q"],
            dq=feedback["dq"],
            target=target,
        )

        u_adapt = np.zeros(robot_config.N_JOINTS)
        u_adapt[:7] = adapt.generate(
            input_signal=np.hstack((feedback["q"][:7], feedback["dq"][:7])),
            training_signal=np.array(ctrlr.training_signal[:7]),
        )
        if interface.viewer.adapt:
            u += u_adapt

        # send forces into Mujoco, step the sim forward
        interface.send_forces(u)

        # calculate end-effector position
        ee_xyz = robot_config.Tx("EE", q=feedback["q"])
        # track data
        ee_track.append(np.copy(ee_xyz))
        target_track.append(np.copy(target[:3]))
        q_track.append(np.copy(feedback["q"]))
        dq_track.append(np.copy(feedback["dq"]))

        count += 1

        error = np.linalg.norm(ee_xyz - target[:3])
        if error < 0.02:
            interface.sim.model.geom_rgba[target_geom_id] = green
            time += 1
        else:
            interface.sim.model.geom_rgba[target_geom_id] = red
            time = 0

except:
    print(traceback.format_exc())

finally:
    # stop and reset the Mujoco simulation
    interface.disconnect()

    ee_track = np.array(ee_track)
    target_track = np.array(target_track)
    q_track = np.asarray(q_track)
    dq_track = np.asarray(dq_track)

    if ee_track.shape[0] > 0:
        # plot distance from target and 3D trajectory
        import matplotlib.pyplot as plt
        from mpl_toolkits.mplot3d import axes3d  # pylint: disable=W0611

        fig = plt.figure(figsize=(8, 12))

        ax1 = fig.add_subplot(211)
        ax1.set_ylabel("Distance (m)")
        ax1.set_xlabel("Time (ms)")
        ax1.set_title("Distance to target")
        ax1.plot(
            np.sqrt(np.sum((np.array(target_track) - np.array(ee_track)) ** 2, axis=1))
        )

        ax2 = fig.add_subplot(212, projection="3d")
        ax2.set_title("End-Effector Trajectory")
        ax2.plot(ee_track[:, 0], ee_track[:, 1], ee_track[:, 2], label="ee_xyz")
        ax2.scatter(target_xyz[0], target_xyz[1], target_xyz[2], label="target", c="r")
        ax2.legend()
        plt.show()
        #plt.savefig('point_force_results.png')

I was able to work around this issue by connecting the adaptive ensemble itself (rather than its neurons) to the output, removing the transform parameter, and setting the function parameter to a lambda that returns np.zeros(n_output). The only issue with this solution is that I cannot set the connection weights before training.

                self.conn_learn.append(
                    nengo.Connection(
                        self.adapt_ens[ii],
                        output,
                        function=lambda x: np.zeros(n_output),
                        learning_rule_type=nengo.PES(pes_learning_rate),
                        # transform=weights[ii],
                        synapse=self.tau_output,
                    )
                )

I’d appreciate it if anyone could provide a better solution.

Thanks!

Hi @nolan,

You are correct in the observation that this error:

NotImplementedError: Learning rule presynaptic object must be an Ensemble (got 'Neurons')

means that, in NengoLoihi, the PES learning rule only supports a presynaptic object that is a nengo.Ensemble, rather than an ensemble's .neurons attribute (a Neurons object). The fix is, as you pointed out, to change self.adapt_ens[ii].neurons to self.adapt_ens[ii]. And you are correct that you can use the function or transform argument to set the initial connection weights to 0.

You can also use the solver parameter to load in custom weights. For example:

# decoders for NoSolver have shape (n_neurons, n_output)
weights = np.zeros((self.adapt_ens[ii].n_neurons, output.size_in))
self.conn_learn.append(
    nengo.Connection(
        self.adapt_ens[ii],
        output,
        function=lambda x: np.zeros(n_output),
        learning_rule_type=nengo.PES(pes_learning_rate),
        synapse=self.tau_output,
        solver=nengo.solvers.NoSolver(weights),
    )
)
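nengo.solvers.NoSolver skips the decoder optimization entirely and uses the array you pass in (shape (n_neurons, size_out)) directly as the connection's decoders, so an array of zeros reproduces the zero-weight initialization, while a previously saved array restores learned weights. The function argument is kept here only so the connection's output dimensionality matches the output node.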

I’ve also made a test script that creates a learning network, runs it for a while (to do the learning), then saves and loads the weights into another network. All of this is done using the NoSolver method of loading weights, as described above.
test_loadweights2.py (2.3 KB)
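In case the attachment is hard to access, here is a rough sketch of that pattern, reconstructed from the description above rather than copied from test_loadweights2.py (the network, names, and sizes are made up for illustration): a PES-learned connection is probed for its decoders in core Nengo, and the final decoders are then handed to a second, identically seeded network through NoSolver.

import nengo
import numpy as np

n_neurons, dims = 100, 2

# --- network 1: learn a communication channel with PES and probe the decoders
with nengo.Network() as net1:
    stim = nengo.Node(lambda t: [np.sin(t), np.cos(t)])
    # fixed ensemble seed so the same tuning curves can be rebuilt later
    pre = nengo.Ensemble(n_neurons, dims, seed=1)
    post = nengo.Node(size_in=dims)
    nengo.Connection(stim, pre)
    conn = nengo.Connection(
        pre,
        post,
        function=lambda x: np.zeros(dims),  # start from zero weights
        learning_rule_type=nengo.PES(1e-4),
    )
    # error = actual - target, fed into the learning rule
    error = nengo.Node(size_in=dims)
    nengo.Connection(post, error)
    nengo.Connection(stim, error, transform=-1)
    nengo.Connection(error, conn.learning_rule)
    # the probed "weights" are the decoders, shape (dims, n_neurons)
    weight_probe = nengo.Probe(conn, "weights", sample_every=0.1)

with nengo.Simulator(net1) as sim:
    sim.run(10.0)
learned = sim.data[weight_probe][-1]  # save these however you like (e.g. np.save)

# --- network 2: rebuild the same ensemble and load the decoders via NoSolver
with nengo.Network() as net2:
    stim = nengo.Node(lambda t: [np.sin(t), np.cos(t)])
    pre = nengo.Ensemble(n_neurons, dims, seed=1)  # same seed as above
    post = nengo.Node(size_in=dims)
    nengo.Connection(stim, pre)
    nengo.Connection(
        pre,
        post,
        # NoSolver expects shape (n_neurons, dims), hence the transpose
        solver=nengo.solvers.NoSolver(learned.T),
    )

The two details that matter are the transpose (the probed weights come back as (dims, n_neurons) while NoSolver wants (n_neurons, dims)) and that the second ensemble must be built with the same parameters (same seed) as the first, otherwise the saved decoders no longer match its tuning curves.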


Thank you @xchoo for the response and test script.

I am interested in saving the connection weights from a model trained using NengoLoihi. I noticed that the NengoLoihi simulator does not have a signals attribute, so using

        return [
            self.sim.signals[self.sim.model.sig[conn]["weights"]]
            for conn in self.conn_learn
        ]

when

self.sim = nengo_loihi.Simulator(self.nengo_model, model=model, dt=0.001)

gives an error.

AttributeError: 'Simulator' object has no attribute 'signals'

Do you know of another way to save the connection weights?

Oh! I had forgotten to mention. The method of probing the connection weights only applies to regular Nengo. NengoLoihi is designed to interface with the simulation running on the Loihi chip through Intel’s NxSDK API. Unfortunately, because the weight changes happen in the chip itself, and because Intel did not provide a way to probe the weights in the NxSDK API, it is currently not possible to obtain the connection weights from any model or network in NengoLoihi.

If you would like to have this feature added, I would recommend posting on Intel’s INRC forums and asking whether there is a way to probe the connection weights with NxSDK.


Interesting! Thank you so much for the help, and I will consider reaching out to them.