In [None]:
!pip install nengo
#

In [None]:
!pip install nengo-dl


In [None]:
%matplotlib inline

from urllib.request import urlretrieve

import matplotlib.pyplot as plt
import nengo
import numpy as np
import tensorflow as tf

import nengo_dl
import cv2
seed = 0
np.random.seed(seed)
tf.random.set_seed(seed)

In [None]:
(train_images, train_labels), (
    test_images,
    test_labels,
) = tf.keras.datasets.fashion_mnist.load_data()


# flatten images and add time dimension
train_images = train_images.reshape((train_images.shape[0], 1, -1))
train_labels = train_labels.reshape((train_labels.shape[0], 1, -1))
test_images = test_images.reshape((test_images.shape[0], 1, -1))
test_labels = test_labels.reshape((test_labels.shape[0], 1, -1))

In [None]:
# AlexNet 

# input
inp = tf.keras.Input(shape=(28, 28, 1))

# convolutional layers
conv0 = tf.keras.layers.Conv2D(
    filters=24,
    kernel_size=11,
    strides=4,
    activation=tf.nn.relu,
    padding='same',
)(inp)

conv1 = tf.keras.layers.Conv2D(
    filters=64,
    kernel_size=5,
    strides=1,
    padding='same',
    activation=tf.nn.relu,
)(conv0)

pool0=tf.keras.layers.AveragePooling2D(
    pool_size=2,
    strides=2,
)(conv1)

conv2=tf.keras.layers.Conv2D(
    filters=96, 
    kernel_size=3,
    strides=1,
    padding='same',
    activation=tf.nn.relu,
)(pool0)

pool1=tf.keras.layers.AveragePooling2D(
    pool_size=2,
    strides=2,
)(conv2)

conv3=tf.keras.layers.Conv2D(
    filters=96,
    kernel_size=3,
    strides=1,
    padding='same',
    activation=tf.nn.relu,
)(pool1)

conv4=tf.keras.layers.Conv2D(
    filters=24,
    kernel_size=3,
    strides=1,
    padding='same',
    activation=tf.nn.relu,
)(conv3)

# fully connected layer
flatten = tf.keras.layers.Flatten()(conv4)
dense0 = tf.keras.layers.Dense(units=1024)(flatten)
dense1=tf.keras.layers.Dense(units=1024)(dense0)
dense2=tf.keras.layers.Dense(units=250)(dense1)
dense=tf.keras.layers.Dense(units=10)(dense2)

model = tf.keras.Model(inputs=inp, outputs=dense)


In [None]:
!pip install keras-spiking

In [None]:
import keras_spiking

In [None]:
energy = keras_spiking.ModelEnergy(model)
energy.summary(columns=(
        "name",
        "energy cpu",
        "energy gpu",
        "synop_energy cpu",
        "synop_energy gpu",
        "neuron_energy cpu",
        "neuron_energy gpu",
    ),
    print_warnings=False
  )

In [None]:
converter = nengo_dl.Converter(model)

In [None]:
from nengo_dl.graph_optimizer import noop_planner

In [None]:
do_training = True
if do_training:
  

    with converter.net:
          nengo_dl.configure_settings(planner=noop_planner)
    with nengo_dl.Simulator(converter.net, minibatch_size=200) as sim:
        # run training
        sim.compile(
            optimizer=tf.optimizers.RMSprop(0.001),
            loss={
                converter.outputs[dense]: tf.losses.SparseCategoricalCrossentropy(
                    from_logits=True
                )
            },
            metrics={converter.outputs[dense]: tf.metrics.sparse_categorical_accuracy},
        )
        sim.fit(
            {converter.inputs[inp]: train_images},
            {converter.outputs[dense]: train_labels},
            validation_data=(
                {converter.inputs[inp]: test_images},
                {converter.outputs[dense]: test_labels},
            ),
            epochs=50,
        )
       

        # save the parameters to file
        sim.save_params("./keras_to_snn_params")
else:
    # download pretrained weights
    urlretrieve(
        "https://drive.google.com/uc?export=download&"
        "id=1lBkR968AQo__t8sMMeDYGTQpBJZIs2_T",
        "keras_to_snn_params.npz",
    )
    print("Loaded pretrained weights")

Build finished in 0:00:00                                                      
Optimization finished in 0:00:00                                               
Construction finished in 0:00:00                                               
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


In [None]:
def run_network(
    activation,
    params_file="keras_to_snn_params",
    n_steps=120,
    scale_firing_rates=1,
    synapse=None,
    n_test=500,
):
    # convert the keras model to a nengo network
    nengo_converter = nengo_dl.Converter(
        model,
        swap_activations={tf.nn.relu: activation},
        scale_firing_rates=scale_firing_rates,
        synapse=synapse,
    )

    # get input/output objects
    nengo_input = nengo_converter.inputs[inp]
    nengo_output = nengo_converter.outputs[dense]

    with converter.net:
        nengo_dl.configure_settings(planner=noop_planner)
    # add a probe to the first convolutional layer to record activity.
    # we'll only record from a subset of neurons, to save memory.
    sample_neurons = np.linspace(
        0,
        np.prod(conv0.shape[1:]),
        1000,
        endpoint=False,
        dtype=np.int32,
    )
    with nengo_converter.net:
        conv0_probe = nengo.Probe(nengo_converter.layers[conv4])

    # repeat inputs for some number of timesteps
    tiled_test_images = np.tile(test_images[:n_test], (1, n_steps, 1))

    # set some options to speed up simulation
    with nengo_converter.net:
        nengo_dl.configure_settings(stateful=False)

    # build network, load in trained weights, run inference on test images
    with nengo_dl.Simulator(
        nengo_converter.net, minibatch_size=10, progress_bar=False
    ) as nengo_sim:
        nengo_sim.load_params(params_file)
        data = nengo_sim.predict({nengo_input: tiled_test_images})

    # compute accuracy on test data, using output of network on
    # last timestep
    predictions = np.argmax(data[nengo_output][:, -1], axis=-1)
    accuracy = (predictions == test_labels[:n_test, 0, 0]).mean()
    print(f"Test accuracy: {100 * accuracy:.2f}%")

    # plot the results
    for ii in range(3):
        plt.figure(figsize=(12, 4))

        plt.subplot(1, 3, 1)
        plt.title("Input image")
        plt.imshow(test_images[ii, 0].reshape((28, 28)), cmap="gray")
        plt.axis("off")

        plt.subplot(1, 3, 2)
        scaled_data = data[conv0_probe][ii] * scale_firing_rates
        if isinstance(activation, nengo.SpikingRectifiedLinear):
        #     scaled_data *= 0.001
        #     rates = np.sum(scaled_data, axis=0) / (n_steps * nengo_sim.dt)
            plt.ylabel("Number of spikes")
        # else:
        #     rates = scaled_data
        #     plt.ylabel("Firing rates (Hz)")
        plt.xlabel("Timestep")
        plt.title(
            f"Neural activities (conv0 mean={rates.mean():.1f} Hz, "
            f"max={rates.max():.1f} Hz)"
        )
        plt.plot(scaled_data)

        plt.subplot(1, 3, 3)
        plt.title("Output predictions")
        plt.plot(tf.nn.softmax(data[nengo_output][ii]))
        plt.legend([str(j) for j in range(10)], loc="upper left")
        plt.xlabel("Timestep")
        plt.ylabel("Probability")

        plt.tight_layout()

In [None]:
run_network(activation=nengo.RectifiedLinear())


In [None]:
run_network(activation=nengo.SpikingRectifiedLinear())

In [None]:
for s in [ 0.01, 0.02, 0.05,0.2]:
    print(f"Synapse={s:.3f}")
    run_network(
        activation=nengo.SpikingRectifiedLinear(),
        n_steps=120,
        synapse=s,
        # scale_firing_rates=5
    )
    plt.show()

In [None]:

for scale in [5, 10, 20]:
    print(f"Scale={scale}")
    run_network(
        activation=nengo.SpikingRectifiedLinear(),
        scale_firing_rates=scale,
        synapse=0.02,
        n_steps=120
    )
    plt.show()
