Tag Archives: programming

Natural policy gradient in TensorFlow

In working towards reproducing some results from deep learning control papers, one of the learning algorithms that came up was natural policy gradient. The basic idea of natural policy gradient is to use the curvature information of the policy’s distribution over actions in the weight update. There are good resources that go into the details of the natural policy gradient method (such as here and here), so I’m not going to cover them in this post. Rather, possibly just because I’m new to TensorFlow, I found implementing this method less than straightforward due to a number of unexpected gotchas. So I’m going to quickly review the algorithm and then mostly focus on the implementation issues that I ran into.

If you’re not familiar with policy gradient or various approaches to the cart-pole problem I also suggest that you check out this blog post from KV Frans, which provides the basis for the code I use here. All of the code that I use in this post is available up on my GitHub, and it’s worth noting this code is for TensorFlow 1.7.0.

Natural policy gradient review

Let’s informally define our notation

  • s_t and a_t are the system state and chosen action at time t
  • \theta are the network parameters that we’re learning
  • \pi_\theta is the control policy network parameterized by \theta
  • A^\pi(s_t, a_t, t) is the advantage function, which returns the estimated total reward for taking action a_t in state s_t relative to default behaviour (i.e. was this action better or worse than what we normally do)

The regular policy gradient is calculated as:

\textbf{g} = \frac{1}{T}\sum_{t=0}^T \nabla_\theta \; \textrm{log} \; \pi_\theta(a_t | s_t) \; \hat{A}^\pi(s_t, a_t, t),

where \nabla_\theta denotes calculating the partial derivative with respect to \theta, the hat above A^\pi denotes that we’re using an estimation of the advantage function, and \pi_\theta(a_t | s_t) returns a scalar value which is the probability of the chosen action a_t in state s_t.

For the natural policy gradient, we’re going to calculate the Fisher information matrix, F, which estimates the curvature of the control policy:

\hat{F}_{\theta_k} = \frac{1}{T} \sum_{t=0}^T \nabla_\theta \; \textrm{log} \; \pi_\theta(a_t | s_t) \; \nabla_\theta \; \textrm{log} \; \pi_\theta (a_t | s_t)^T.

The goal of including curvature information is to be able to move along the steepest ascent direction, which we estimate with \hat{\textbf{F}}^{-1}_{\theta_k}. Including this, the weight update becomes

\theta_{k+1} = \theta_k + \alpha \hat{\textbf{F}}^{-1}_{\theta_k} \textbf{g},

where \alpha is a learning rate parameter. In the paper Towards Generalization and Simplicity in Continuous Control, Rajeswaran et al. note that empirically it’s difficult to choose an appropriate \alpha value or set an appropriate schedule. To address this, they normalize the update under the Fisher metric:

\theta_{k+1} = \theta_k + \sqrt{\frac{\delta}{\textbf{g}^T\hat{\textbf{F}}^{-1}_{\theta_k}\textbf{g}}} \; \hat{\textbf{F}}^{-1}_{\theta_k} \textbf{g}.

Using the above for the learning rate can be thought of as choosing a step size \delta that is normalized with respect to the change in the control policy. This is beneficial because it means that we don’t do any parameter updates that will drastically change the output of the policy network.
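
To make this concrete, here’s a minimal NumPy sketch of the full update described above, assuming you already have the per-timestep gradients of the log probabilities and the advantage estimates (the function and variable names here are my own, not from the TensorFlow code below):

import numpy as np

def natural_policy_gradient_step(theta, grad_log_prob, advantages, delta=0.05):
    """ one natural policy gradient update
    theta         : (P,)   current policy parameters
    grad_log_prob : (T, P) gradient of log pi(a_t | s_t) at each time step
    advantages    : (T,)   advantage estimate at each time step
    delta         : normalized step size
    """
    # vanilla policy gradient: average of grad log pi weighted by the advantage
    g = np.mean(grad_log_prob * advantages[:, None], axis=0)

    # Fisher information estimate: average outer product of the gradients
    F = np.mean(grad_log_prob[:, :, None] * grad_log_prob[:, None, :], axis=0)

    # use the pseudo-inverse, because in practice F can be near singular
    # (this is exactly the rounding error problem discussed below)
    F_inv_g = np.dot(np.linalg.pinv(F), g)

    # normalize the step size under the Fisher metric
    alpha = np.sqrt(delta / np.dot(g, F_inv_g))

    return theta + alpha * F_inv_g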

Here’s what the pseudocode for the algorithm looks like

  • Initialize policy parameters \theta_0
  • for k = 1 to K do:
    • collect N trajectories by rolling out the stochastic policy \pi_{\theta_k}
    • compute \nabla_\theta \; \textrm{log} \; \pi_\theta(a_t | s_t) for each (s, a) pair along the trajectories sampled
    • compute advantages \hat{A}^\pi_k based on the sampled trajectories and the estimated value function V^\pi_{k-1}
    • compute the policy gradient g as specified above
    • compute the Fisher matrix and perform the weight update as specified above
    • update the value network to get V^\pi_k

OK great! On to implementation.

Implementation in TensorFlow 1.7.0

Calculating the gradient at every time step

In TensorFlow, the tf.gradients(ys, xs) function, which calculates the partial derivative of ys with respect to xs:

\nabla_{X} Y

will automatically sum over all elements in ys. There is no function for getting the gradient of each individual entry (i.e. the Jacobian) of ys, which is less than great. People have complained, but until some update happens we can address this with a basic list comprehension that iterates through each entry in the history and calculates the gradient at that point in time. If you have a faster means that you’ve tested please comment below!

The code for this looks like

        g_log_prob = tf.stack(
            [tf.gradients(action_log_prob_flat[i], my_variables)[0]
                for i in range(action_log_prob_flat.get_shape()[0])])

The result is called g_log_prob because it’s the gradient of the log probability of the chosen action at each time step.

Inverse of a positive definite matrix with rounding errors

One of the problems I ran into when calculating the normalized learning rate \sqrt{\frac{\delta}{\textbf{g}^T\hat{\textbf{F}}^{-1}_{\theta_k}\textbf{g}}} was that the square root calculation was dying because negative values were being passed in. This was very confusing, because \hat{\textbf{F}} is a positive definite matrix, which means that \hat{\textbf{F}}^{-1} is also positive definite, which means that \textbf{x}^T \hat{\textbf{F}}^{-1} \textbf{x} \geq 0 for any \textbf{x}.

So it was clear that it was a calculation error somewhere, but I couldn’t find the source of the error for a while, until I started doing the inverse calculation myself using SVD to explicitly look at the eigenvalues and make sure they were all > 0. Turns out that there were some very, very small eigenvalues with a negative sign, like -1e-7 kind of range. My guess (and hope) is that these are just rounding errors, but they’re enough to mess up the calculations of the normalized learning rate. So, to handle this I explicitly set those values to zero, and that took care of that.

        # SVD of the Fisher matrix: F = U * diag(S) * V^T
        S, U, V = tf.svd(F)
        # treat any singular values below this tolerance as rounding errors
        atol = tf.reduce_max(S) * 1e-6
        S_inv = tf.divide(1.0, S)
        # zero out the inverted singular values that fall below tolerance
        S_inv = tf.where(S < atol, tf.zeros_like(S), S_inv)
        S_inv = tf.diag(S_inv)
        # rebuild the (pseudo-)inverse: F_inv = V * diag(S_inv) * U^T
        F_inv = tf.matmul(S_inv, tf.transpose(U))
        F_inv = tf.matmul(V, F_inv)

Manually adding an update to the learning parameters

Once the parameter update is explicitly calculated, we then need to update them. To implement an explicit parameter update in TensorFlow we do

        # update trainable parameters
        my_variables[0] = tf.assign_add(my_variables[0], update)

So to update the parameters we call sess.run(my_variables, feed_dict={...}). It’s worth noting here too that any time you run the session with my_variables the parameter update will be calculated and applied, so you can’t run the session with my_variables just to fetch the current values.
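
If you do want to be able to read the parameters without also applying the update, one option (a sketch, not what the linked code does) is to keep a separate handle to the update op and only run that op when you explicitly want an update:

        # keep a separate handle instead of overwriting my_variables[0]
        update_op = tf.assign_add(my_variables[0], update)

        # ... then, inside the training loop ...
        # read the current parameter values without triggering an update
        current_params = sess.run(my_variables[0])
        # apply the parameter update only when we want to
        sess.run(update_op, feed_dict=feed_dict)  # same feed_dict as above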

Results on the CartPole problem

Now, running the original policy gradient algorithm against the natural policy gradient algorithm (with everything else the same), we can see that using the Fisher information matrix in the update provides some strong benefits. The plot below shows the maximum reward received in a batch of 200 time steps, where the system receives a reward of 1 for every time step that the pole stays upright, and 200 is the maximum reward achievable.
Natural_policy_gradient
To generate this plot I ran 10 sessions of 300 batches, where each batch runs as many episodes as it takes to get 200 time steps of data. The solid lines are the mean value at each batch across all sessions, and the shaded areas are the 95% confidence intervals. We can see that the natural policy gradient starts to hit a reward of 200 inside 100 batches, and that its mean stays higher than that of the normal policy gradient even after 300 batches.

It’s also worth noting that for both the policy gradient and the natural policy gradient the average time to run a batch and weight update was about the same on my machine, between 150 and 180 milliseconds.

The modified code for policy gradient, the natural policy gradient, and plotting code are all up on my GitHub.

Other notes

  • I had to reduce \delta to 0.001 (from the 0.05 used in the Rajeswaran paper) to get the learning to converge; I suspect this is because of the greatly reduced complexity of the state space and control problem.
  • The Rajeswaran paper algorithm does gradient ascent instead of descent, which is why the signs are how they are.

Building a spiking neural model of adaptive arm control

About a year ago I published the work from my thesis in a paper called ‘A spiking neural model of adaptive arm control’. In this paper I presented the Recurrent Error-driven Adaptive Control Hierarchy (REACH) model. The goal of the model is to begin working towards reproducing behavioural level phenomena of human movement with biologically plausible spiking neural networks.

To do this, I start by using three methods from control literature (operational space control, dynamic movement primitives, and non-linear adaptive control) to create an algorithms level model of the motor control system that captures behavioural level phenomena of human movement. Then I explore how this functionality could be mapped to the primate brain and implemented in spiking neurons. Finally, I look at the data generated by this model on the behavioural level (e.g. kinematics of movement), the systems level (i.e. analysis of populations of neurons), and the single-cell level (e.g. correlating neural activity with movement parameters) and compare/contrast with experimental data.

The point of having a full model framework (from observable behaviour down to neural spikes) is to have a more constrained computational model of the motor control system: adding lower-level biological constraints to behavioural models and higher-level behavioural constraints to neural models.

In general, once you have a model, the critical next step is generating testable predictions that can be used to discriminate between other models with different implementations or underlying algorithms. Discriminative predictions allow us to design experiments that can gather evidence for or against different hypotheses of brain function, and provide clues to useful directions for further research. That is the real contribution of computational modeling.

So that’s a quick overview of the paper; there are quite a few pages of supplementary information that describe the details of the model implementation, and I provided the code and data used to generate the data analysis figures. However, code to explicitly run the model on your own has been missing. As one of the major points of this blog is to provide code for furthering research, this is pretty embarrassing. So, to begin to remedy this, in this post I’m going to work through using the REACH framework to build models that control a two-link arm: reaching in a straight line, tracing a circle, performing the centre-out reaching task, and adapting online to unexpected perturbations imposed during reaching by a joint-velocity based forcefield.

This post is directed towards those who have already read the paper (although not necessarily the supplementary material). To run the scripts you’ll need Nengo, Nengo GUI, and NengoLib all installed. There’s a description of the theory behind the Neural Engineering Framework, which I use extensively in my Nengo modeling, in the paper. I’m hoping that between that and code readability / my explanations below that most will be comfortable starting to play around with the code. But if you’re not, and would like more resources, you can check out the Getting Started page on the Nengo website, the tutorials from How To Build a Brain, and the examples in the Nengo GUI.

You can find all of the code up on my GitHub.

NOTE: I’m using the two-link arm (which is fully implemented in Python) instead of the three-link arm (which has compile issues for Macs) both so that everyone should be able to run the model arm code and to reduce the number of neurons that are required for control, so that hopefully you can run this on your laptop in the event that you don’t have a super-powered Ubuntu workstation. Scaling this model up to the three-link arm is straightforward though, and I will work on getting code up (for the three-link arm for non-Mac users) as a next project.

Implementing PMC – the trajectory generation system

I’ve talked at length about dynamic movement primitives (DMPs) in previous posts, so I won’t describe those again here. Instead I will focus on their implementation in neurons.

def generate(y_des, speed=1, alpha=1000.0):
    beta = alpha / 4.0

    # generate the forcing function
    forces, _, goals = forcing_functions.generate(
        y_des=y_des, rhythmic=False, alpha=alpha, beta=beta)

    # create alpha synapse, which has point attractor dynamics
    tau = np.sqrt(1.0 / (alpha*beta))
    alpha_synapse = nengolib.Alpha(tau)

    net = nengo.Network('PMC')
    with net:
        net.output = nengo.Node(size_in=2)

        # create a start / stop movement signal
        time_func = lambda t: min(max(
            (t * speed) % 4.5 - 2.5, -1), 1)

        def goal_func(t):
            t = time_func(t)
            if t <= -1:
                return goals[0]
            return goals[1]
        net.goal = nengo.Node(output=goal_func, label='goal')

        # -------------------- Ramp ---------------------------
        ramp_node = nengo.Node(output=time_func, label='ramp')
        net.ramp = nengo.Ensemble(
            n_neurons=500, dimensions=1, label='ramp ens')
        nengo.Connection(ramp_node, net.ramp)

        # ------------------- Forcing Functions ---------------
        def relay_func(t, x):
            t = time_func(t)
            if t <= -1:
                return [0, 0]
            return x
        # the relay prevents forces from being sent on reset
        relay = nengo.Node(output=relay_func, size_in=2)

        domain = np.linspace(-1, 1, len(forces[0]))
        x_func = interpolate.interp1d(domain, forces[0])
        y_func = interpolate.interp1d(domain, forces[1])
        transform=1.0/alpha/beta
        nengo.Connection(net.ramp, relay[0], transform=transform,
                         function=x_func, synapse=alpha_synapse)
        nengo.Connection(net.ramp, relay[1], transform=transform,
                         function=y_func, synapse=alpha_synapse)
        nengo.Connection(relay, net.output)

        nengo.Connection(net.goal[0], net.output[0],
                         synapse=alpha_synapse)
        nengo.Connection(net.goal[1], net.output[1],
                         synapse=alpha_synapse)
    return net

The generate method for the PMC takes in a desired trajectory, y_des, as a parameter. The first thing we do (the call to forcing_functions.generate) is calculate the forcing function that will push the DMP point attractor along the desired trajectory.

The next thing is to create an Alpha (second-order low-pass filter) synapse. By writing out the dynamics of a point attractor in Laplace space, one of the lab members, Aaron Voelker, noticed that the dynamics could be fully implemented by creating an Alpha synapse with an appropriate choice of tau. I walk through all of the math behind this in this post. Here we’ll use that more compact method and project directly to the output node, which improves performance and reduces the number of neurons.

Inside the PMC network we create a time_func node, which is the pace-setter during simulation. It will output a linear ramp from -1 to 1 every few seconds, with the pace set by the speed parameter, and then pause before restarting.

We also have a goal node, which will provide a target starting and ending point for the trajectory. Both the time_func and goal nodes are created and used as a model simulation convenience, and are proposed to be generated elsewhere in the brain (the basal ganglia, why not? #igotreasons #provemewrong).

The ramp ensemble is the most important component of the trajectory generation system. It takes the output from the time_func node as input, and generates the forcing function which will guide our little system through the trajectory that was passed in. The ensemble itself is nothing special; what matters is the function that it approximates on its outgoing connection. We set up this function approximation with the following code:

        domain = np.linspace(-1, 1, len(forces[0]))
        x_func = interpolate.interp1d(domain, forces[0])
        y_func = interpolate.interp1d(domain, forces[1])
        transform=1.0/alpha/beta
        nengo.Connection(net.ramp, relay[0], transform=transform,
                         function=x_func, synapse=alpha_synapse)
        nengo.Connection(net.ramp, relay[1], transform=transform,
                         function=y_func, synapse=alpha_synapse)
        nengo.Connection(relay, net.output)

We want the forcing function to be generated as the signal represented in the ramp ensemble moves from -1 to 1. To achieve this, we create interpolation functions, x_func and y_func, which are set up to generate the forcing function values for input values between -1 and 1. We pass these functions into the outgoing connections from the ramp population (one for x and one for y). So now when the ramp ensemble is representing -1, 0, and 1 the output along the two connections will be the starting, middle, and ending x and y points of the forcing function trajectory. The transform and synapse are set on each connection with the appropriate gain values and Alpha synapse, respectively, to implement point attractor dynamics.

NOTE: The above DMP implementation can generate a trajectory signal with as many dimensions as you would like, and all that’s required is adding another outgoing Connection projecting from the ramp ensemble.

The last thing in the code is hooking up the goal node to the output, which completes the point attractor implementation.

Implementing M1 – the kinematics of operational space control

In REACH, we’ve modelled the primary motor cortex (M1) as responsible for taking in a desired hand movement (i.e. target_position - current_hand_position) and calculating a set of joint torques to carry that out. Explicitly, M1 generates the kinematics component of an OSC signal:

\textbf{u}_\textrm{M1} = \textbf{J}^T \textbf{M}_\textbf{x} (k_p (\textbf{x}^* - \textbf{x}))

In the paper I did this using several populations of neurons, one to calculate the Jacobian, and then an EnsembleArray to perform the multiplication for the dot product of each dimension separately. Since then I’ve had the chance to rework things and it’s now done entirely in one ensemble.

Now, the set up for the M1 model that computes the above function is to have a single ensemble of neurons that takes in the joint angles, \textbf{q}, and control signal \textbf{u}_\textbf{x} = k_p (\textbf{x}^* - \textbf{x}), and outputs the function above. Sounds pretty simple, right? Simple is good.

Let’s look at the code (where I’ve stripped out the comments and some debugging code):

def generate(arm, kp=1, operational_space=True,
             inertia_compensation=True, means=None, scales=None):

    dim = arm.DOF + 2

    means = np.zeros(dim) if means is None else means
    scales = np.ones(dim) if scales is None else scales
    scale_down, scale_up = generate_scaling_functions(
        np.asarray(means), np.asarray(scales))

    net = nengo.Network('M1')
    with net:
        # create / connect up M1 ------------------------------
        net.M1 = nengo.Ensemble(
            n_neurons=1000, dimensions=dim,
            radius=np.sqrt(dim),
            intercepts=AreaIntercepts(
                dimensions=dim,
                base=nengo.dists.Uniform(-1, .1)))

        # expecting input in form [q, x_des]
        net.input = nengo.Node(output=scale_down, size_in=dim)
        net.output = nengo.Node(size_in=arm.DOF)

        def M1_func(x, operational_space, inertia_compensation):
            """ calculate the kinematics of the OSC signal """
            x = scale_up(x)
            q = x[:arm.DOF]
            x_des = kp * x[arm.DOF:]

            # calculate hand (dx, dy)
            if operational_space:
                J = arm.J(q=q)

                if inertia_compensation:
                    # account for inertia
                    Mx = arm.Mx(q=q)
                    x_des = np.dot(Mx, x_des)
                # transform to joint torques
                u = np.dot(J.T, x_des)
            else:
                u = x_des

                if inertia_compensation:
                    # account for mass
                    M = arm.M(q=q)
                    u = np.dot(M, x_des)

            return u

        # send in system feedback and target information
        # don't account for synapses twice
        nengo.Connection(net.input, net.M1, synapse=None)
        nengo.Connection(
            net.M1, net.output,
            function=lambda x: M1_func(
                x, operational_space, inertia_compensation),
            synapse=None)

    return net

The ensemble of neurons itself is created with a few interesting parameters:

        net.M1 = nengo.Ensemble(
            n_neurons=1000, dimensions=dim,
            radius=np.sqrt(dim),
            intercepts=AreaIntercepts(
                dimensions=dim, base=nengo.dists.Uniform(-1, .1)))

Specifically, the radius and intercepts parameters.

Setting the intercepts

First we’ll discuss setting the intercepts using the AreaIntercepts distribution. The intercepts of a neuron determine how much of state space the neuron is active in, which we’ll refer to as the ‘active proportion’. If you don’t know what kind of functions you want to approximate with your neurons, then having the active proportions for your ensemble chosen from a uniform distribution is a good starting point. This means that you’ll have roughly the same number of neurons active across all of state space as you do neurons active over half of state space as you do neurons active over very small portions of state space.

By default, Nengo sets the intercepts such that the distribution of active proportions is uniform for lower dimensional spaces. But when you start moving into higher dimensional spaces (into a hypersphere) the default method breaks down, and you get mostly neurons that are either active for all of state space or for almost none of it. The AreaIntercepts class calculates how the intercepts should be set to achieve the desired active proportions inside a hypersphere. There are a lot more details here that you can read up on in this IPython notebook by Dr. Terrence C. Stewart.

What you need to know right now is that we’re setting the intercepts of the neurons such that the percentage of state space for which any given neuron is active is chosen from a uniform distribution between 0% and 55%. In other words, a neuron will maximally be active in 55% of state space, no more. This will let us model more nonlinear functions (such as the kinematics of the OSC signal) with fewer neurons. If this description is clear as mud, I really recommend checking out the IPython notebook linked above to get an intuition for what I’m talking about.
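
If you’d like some numerical intuition for this, here’s a quick Monte Carlo sketch (my own, not part of the model code) that estimates how much of the unit hypersphere a neuron with a given intercept is active over:

import numpy as np

def active_proportion(intercept, dimensions, n_samples=100000):
    """ estimate the fraction of the unit hypersphere for which a neuron
    with the given intercept (and a random encoder) is active """
    # sample points uniformly from inside the unit hypersphere
    x = np.random.randn(n_samples, dimensions)
    x /= np.linalg.norm(x, axis=1, keepdims=True)
    x *= np.random.uniform(0, 1, size=(n_samples, 1)) ** (1.0 / dimensions)
    # take the encoder to be the first axis (any direction works by symmetry);
    # the neuron is active wherever the dot product exceeds the intercept
    return np.mean(x[:, 0] > intercept)

# the same intercept covers far less of state space as dimensions increase
for dim in (1, 2, 6, 20):
    print(dim, active_proportion(intercept=0.3, dimensions=dim))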

Scaling the input signal

The other parameter we set on the M1 ensemble is the radius. The radius scales the range of input values that the ensemble can represent, which is by default anything inside the unit hypersphere (i.e. a hypersphere with radius=1). If the radius is left at this default value, the neural activity will saturate for vectors with magnitude greater than 1, leading to inaccurate vector representation and function approximation for those inputs. For lower dimensions this isn’t a terrible problem, but as the dimensionality of the state space you’re representing grows it becomes more common for input vectors to have a norm greater than 1. Typically, we’d like to be able to, at a minimum, represent vectors where any element can be anywhere between -1 and 1. To do this, all we have to do is set the radius to the norm of a dim-dimensional vector with every element set to one, which is np.sqrt(dim).

Now that we’re able to represent vectors where the input values are all between -1 and 1, the last part of this sub-network is scaling the input to be between -1 and 1. We use two scaling functions, scale_down and scale_up (a sketch of these helpers is included at the end of this subsection). The scale_down function subtracts a mean value and scales the input signal to be between -1 and 1. The scale_up function reverts the vector back to its original values so that the functions being decoded can be computed on the unscaled variables. In choosing mean and scaling values, there are two ways we can set these functions up:

  1. Set them generally, based on the upper and lower bounds of the input signal. For M1, the input is [\textbf{q}, \textbf{u}_\textbf{x}], where \textbf{u}_\textbf{x} is the control signal in end-effector space. We know that the joint angles are always in the range 0 to \pi (because that’s how the arm simulation is programmed), so we can set the means and scales to be \frac{\pi}{2} for \textbf{q}. For \textbf{u}_\textbf{x} a mean of zero is reasonable, and we can choose (arbitrarily, empirically, or analytically) the largest task space control signal we want to represent accurately.
  2. Or, if we know the model will be performing a specific task, we can look at the range of input values encountered during that task and set the means and scales terms appropriately. For the task of reaching in a straight line, the arm moves through a very limited part of state space, so we can tune these parameters to be very specific:
                                 means=[0.6, 2.2, 0, 0],
                                 scales=[.25, .25, .25, .25]
    

The benefit of the second method, although one can argue it’s parameter tuning and makes things less biologically plausible, is that it lets us run simulations with fewer neurons. The first method works for all of state space, given enough neurons, but seeing as we don’t always want to be simulating 100k+ neurons we’re using the second method here. By tuning the scaling functions more specifically we’re able to run our model using 1k neurons (and could probably get away with fewer). It’s important to keep in mind though that if the arm moves outside the expected range the control will become unstable.
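
For reference, the generate_scaling_functions helper used throughout these snippets isn’t shown above. Here’s a minimal sketch of what it might look like, assuming (as in the M1 and S1 snippets, where the input dimensionality matches the means) it simply shifts by the means and divides by the scales on the way in, and undoes that on the way out:

import numpy as np

def generate_scaling_functions(means, scales):
    """ create functions mapping the input into roughly [-1, 1] and back """

    def scale_down(t, x):
        # shift and scale the raw input so it fits inside the unit hypersphere
        return (np.asarray(x) - means) / scales

    def scale_up(x):
        # undo the scaling so the decoded functions can use the original values
        return np.asarray(x) * scales + means

    return scale_down, scale_up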

Implementing CB – the dynamics of operational space control

The cerebellum (CB) sub-network has two components: dynamics compensation and dynamics adaptation. First we’ll discuss the dynamics compensation, by which I mean the k_v \textbf{M} (\textbf{q}^* - \textbf{q}) term from the OSC signal.

Much like calculating the kinematics term of the OSC signal in M1, we can calculate the entire dynamics compensation term using a single ensemble with an appropriate radius, scaled inputs, and well chosen intercepts.

def generate(arm, kv=1, learning_rate=None, learned_weights=None,
             means=None, scales=None):
    dim = arm.DOF * 2

    means = np.zeros(dim) if means is None else means
    scales = np.ones(dim) if scales is None else scales
    scale_down, scale_up = generate_scaling_functions(
        np.asarray(means), np.asarray(scales))

    net = nengo.Network('CB')
    with net:
        # create / connect up CB --------------------------------
        net.CB = nengo.Ensemble(
            n_neurons=1000, dimensions=dim,
            radius=np.sqrt(dim),
            intercepts=AreaIntercepts(
                dimensions=dim,
                base=nengo.dists.Uniform(-1, .1)))
        # expecting input in form [q, dq, u]
        net.input = nengo.Node(output=scale_down,
                               size_in=dim+arm.DOF+2)
        cb_input = nengo.Node(size_in=dim, label='CB input')
        nengo.Connection(net.input[:dim], cb_input)
        # output is [-Mdq, u_adapt]
        net.output = nengo.Node(size_in=arm.DOF*2)

        def CB_func(x):
            """ calculate the dynamic component of OSC signal """
            x = scale_up(x)
            q = x[:arm.DOF]
            dq = x[arm.DOF:arm.DOF*2]

            # calculate inertia matrix
            M = arm.M(q=q)
            return -np.dot(M, kv * dq)
        # connect up the input and output
        nengo.Connection(net.input[:dim], net.CB)
        nengo.Connection(net.CB, net.output[:arm.DOF],
                         function=CB_func, synapse=None)

I don’t think there’s anything noteworthy going on here, most of the relevant details have already been discussed…so we’ll move on to the adaptation!

Implementing CB – non-linear dynamics adaptation

The final part of the model is the non-linear dynamics adaptation, modelled as a separate ensemble in the cerebellar sub-network (a separate ensemble so that it’s more modular, the learning connection could also come off of the other CB population). I work through the details and proof of the learning rule in the paper, so I’m not going to discuss that here. But I will restate the learning rule here:

\dot{\textbf{d}} = \textbf{L}_d \textbf{A} \otimes \textbf{u},

where \textbf{d} are the neural decoders, \textbf{L}_d is the learning rate, \textbf{A} is the neural activity of the ensemble, and \textbf{u} is the joint space control signal sent to the arm. This is a basic delta learning rule, where the decoders of the active neurons are modified to push the decoded function in a direction that reduces the error.

The adaptive ensemble can be initialized either using saved weights (passed in with the learned_weights parameter) or as all zeros. It is important to note that setting the decoders to all zeros does not mean the neurons have zero activity, so learning is not prevented by this initialization.

        # dynamics adaptation------------------------------------
        if learning_rate is not None:
            net.CB_adapt = nengo.Ensemble(
                n_neurons=1000, dimensions=arm.DOF*2,
                radius=np.sqrt(arm.DOF*2),
                # enforce spiking neurons
                neuron_type=nengo.LIF(),
                intercepts=AreaIntercepts(
                    dimensions=arm.DOF,
                    base=nengo.dists.Uniform(-.5, .2)))

            net.learn_encoders = nengo.Connection(
                net.input[:arm.DOF*2], net.CB_adapt,)

            # if no saved weights were passed in start from zero
            weights = (
                learned_weights if learned_weights is not None
                else np.zeros((arm.DOF, net.CB_adapt.n_neurons)))
            net.learn_conn = nengo.Connection(
                # connect directly to arm so that adaptive signal
                # is not included in the training signal
                net.CB_adapt.neurons, net.output[arm.DOF:],
                transform=weights,
                learning_rule_type=nengo.PES(
                    learning_rate=learning_rate),
                synapse=None)

            nengo.Connection(net.input[dim:dim+2],
                             net.learn_conn.learning_rule,
                             transform=-1, synapse=.01)
    return net

We’re able to implement that learning rule using Nengo’s prescribed error-sensitivity (PES) learning on our connection from the adaptive ensemble to the output. With this set up the system will be able to learn to adapt to perturbations that are functions of the input (set here to be [\textbf{q}, \dot{\textbf{q}}]).

The intercepts in this population are set to values I found worked well for adapting to a few different forces, but it’s definitely a parameter to play with in your own scripts if you’re finding that there’s too much or not enough generalization of the decoded function output across the state space.

One other thing to mention is that we need to have a relay node to amalgamate the control signals output from M1 and the dynamics compensation ensemble in the CB. This signal is used to train the adaptive ensemble, and it’s important that the adaptive ensemble’s output is not included in the training signal, or else the system quickly goes off to positive or negative infinity.
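
That relay wiring lives in framework.py rather than in the snippets above, but the idea is roughly the following sketch (the handles and slicing here are illustrative, not the actual framework code; see the repository for the real connections):

        # sum the M1 output and the CB dynamics compensation into a relay node
        u_relay = nengo.Node(size_in=arm.DOF, label='u relay')
        nengo.Connection(net.M1.output, u_relay, synapse=None)
        nengo.Connection(net.CB.output[:arm.DOF], u_relay, synapse=None)

        # the summed control signal drives the learning rule through the CB
        # input (the net.input[dim:dim+2] slice in the CB code above); the
        # adaptive term, output on net.CB.output[arm.DOF:], is left out of it
        nengo.Connection(u_relay, net.CB.input[arm.DOF*2:arm.DOF*2 + arm.DOF],
                         synapse=None)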

Implementing S1 – a placeholder

The last sub-network in the REACH model is a placeholder for a primary sensory cortex (S1) model. This is just a set of ensembles that represents the feedback from the arm and relays it on to the rest of the model.

def generate(arm, direct_mode=False, means=None, scales=None):
    dim = arm.DOF*2 + 2  # represents [q, dq, hand_xy]

    means = np.zeros(dim) if means is None else means
    scales = np.ones(dim) if scales is None else scales
    scale_down, scale_up = generate_scaling_functions(
        np.asarray(means), np.asarray(scales))

    net = nengo.Network('S1')
    with net:
        # create / connect up S1 --------------------------------
        net.S1 = nengo.networks.EnsembleArray(
            n_neurons=50, n_ensembles=dim)

        # expecting input in form [q, dq, hand_xy]
        net.input = nengo.Node(output=scale_down, size_in=dim)
        net.output = nengo.Node(
            lambda t, x: scale_up(x), size_in=dim)

        # send in system feedback and target information
        # don't account for synapses twice
        nengo.Connection(net.input, net.S1.input, synapse=None)
        nengo.Connection(net.S1.output, net.output, synapse=None)

    return net

Since there’s no function that we’re decoding off of the represented variables we can use separate ensembles to represent each dimension with an EnsembleArray. If we were going to decode some function of, for example, q0 and dq0, then we would need an ensemble that represents both variables. But since we’re just decoding out f(x) = x, using an EnsembleArray is a convenient way to decrease the number of neurons needed to accurately represent the input.
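
As a small illustration of that distinction (a sketch, not from the model code): decoding f(x) = x works fine with an EnsembleArray, but decoding something like a product of two represented variables forces them into the same ensemble:

import nengo

with nengo.Network():
    # f(x) = x can be decoded one dimension at a time, so an EnsembleArray works
    relay = nengo.networks.EnsembleArray(n_neurons=50, n_ensembles=2)

    # but a function of several variables, e.g. q0 * dq0, needs an ensemble
    # that represents both variables in the same population
    ens = nengo.Ensemble(n_neurons=200, dimensions=2)
    product = nengo.Node(size_in=1)
    nengo.Connection(ens, product, function=lambda x: x[0] * x[1])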

Creating a model using the framework

The REACH model has been set up to be as much of a plug and play system as possible. To generate a model you first create the M1, PMC, CB, and S1 networks, and then they’re all hooked up to each other using the framework.py file. Here’s an example script that controls the arm to trace a circle:

def generate():
    kp = 200
    kv = np.sqrt(kp) * 1.5

    center = np.array([0, 1.25])
    arm_sim = arm.Arm2Link(dt=1e-3)
    # set the initial position of the arm
    arm_sim.init_q = arm_sim.inv_kinematics(center)
    arm_sim.reset()

    net = nengo.Network(seed=0)
    with net:
        net.dim = arm_sim.DOF
        net.arm_node = arm_sim.create_nengo_node()
        net.error = nengo.Ensemble(1000, 2)
        net.xy = nengo.Node(size_in=2)

        # create an M1 model -------------------------------------
        net.M1 = M1.generate(arm_sim, kp=kp,
                             operational_space=True,
                             inertia_compensation=True,
                             means=[0.6, 2.2, 0, 0],
                             scales=[.5, .5, .25, .25])

        # create an S1 model -------------------------------------
        net.S1 = S1.generate(arm_sim,
                             means=[.6, 2.2, -.5, 0, 0, 1.25],
                             scales=[.5, .5, 1.7, 1.5, .75, .75])

        # subtract current position to get task space direction
        nengo.Connection(net.S1.output[net.dim*2:], net.error,
                         transform=-1)

        # create a trajectory for the hand to follow -------------
        x = np.linspace(0.0, 2.0*np.pi, 100)
        PMC_trajectory = np.vstack([np.cos(x) * .5,
                                    np.sin(x) * .5])
        PMC_trajectory += center[:, None]
        # create / connect up PMC --------------------------------
        net.PMC = PMC.generate(PMC_trajectory, speed=1)
        # send target for calculating control signal
        nengo.Connection(net.PMC.output, net.error)
        # send target (x,y) for plotting
        nengo.Connection(net.PMC.output, net.xy)

        net.CB = CB.generate(arm_sim, kv=kv,
                             means=[0.6, 2.2, -.5, 0],
                             scales=[.5, .5, 1.6, 1.5])

    model = framework.generate(net=net, probes_on=True)
    return model

At the end of the script you can see the call to framework.generate, which will hook up the most common connections that don’t vary between the different scripts.

The REACH model has assigned functionality to each area / sub-network, and you can see the expected input / output in the comments at the top of each sub-network file, but the implementations are open. You can create your own M1, PMC, CB, or S1 sub-networks and try them out in the context of a full model that generates high-level movement behaviour.

Running the model

To run the model you’ll need Nengo, Nengo GUI, and NengoLib all installed. You can then pull open Nengo GUI and load any of the a# scripts. In all of these scripts the S1 model is just an ensemble that represents the output from the arm_node. Here’s what each of the scripts does:

  1. a01 has a spiking M1 and CB, dynamics adaptation turned off. The model guides the arm in reaching in a straight line to a single target and back.
  2. a02 has a spiking M1, PMC, and CB, dynamics adaptation turned off. The PMC generates a path for the hand to follow that traces out a circle.
  3. a03 has a spiking M1, PMC, and CB, dynamics adaptation turned off. The PMC generates a path for the joints to follow, which moves the hand in a straight line to a target and back.
  4. a04 has a spiking M1 and CB, dynamics adaptation turned off. The model performs the centre-out reaching task, starting at a central point and reaching to 8 points around a circle.
  5. a05 has a spiking M1 and CB, and dynamics adaptation turned on. The model performs the centre-out reaching task, starting at a central point and reaching to 8 points around a circle. As the model reaches, a forcefield is applied based on the joint velocities that pushes the arm as it tries to reach the target. After 100-150 seconds of simulation the arm has adapted and learned to reach in a straight line again.

Here’s what it looks like when you pull open a02 in Nengo GUI:

REACH_a02

I’m not going to win any awards for arm animation, but! It’s still a useful visualization, and if anyone is proficient in javascript and wants to improve it, please do! You can see the network architecture in the top left, the spikes generated by M1 and CB to the right of that, the arm in the bottom left, and the path traced out by the hand just to the right of that. On the top right you can see the a02 script code, and below that the Nengo console.

Conclusions

One of the most immediate extensions (aside from any sort of model of S1) that comes to mind here is implementing a more detailed cerebellar model, as there are several anatomically detailed models which have the same supervised learner functionality (for example (Yamazaki and Nagao, 2012)).

Hopefully people find this post and the code useful for their own work, or at least interesting! In the ideal scenario this would be a community project, where researchers add models of different brain areas and we end up with a large library of implementations to build larger models with in a Mr. Potato Head kind of fashion.

You can find all of the code up on my GitHub. And again, all of this code should have been publicly available along with the publication. Hopefully the code proves useful! If you have any questions about it please don’t hesitate to make a comment here or contact me through email, and I’ll get back to you as soon as I can.


Matplotlib legends for mean and confidence interval plots

When plotting means and confidence intervals, sometimes the mean lines are hard to see, and it’s nice to include the color of the confidence interval shading in your legend. It turns out this is a bit of a chore in Matplotlib, but building off of their online examples you can get something that looks pretty alright:

mean_and_CI

So here’s code for getting the above plot, with an option for solid or dashed lines. The sky is the limit!

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from matplotlib.colors import colorConverter as cc
import numpy as np

def plot_mean_and_CI(mean, ub, lb, color_mean=None, color_shading=None):
    # plot the shaded range of the confidence intervals
    plt.fill_between(range(mean.shape[0]), ub, lb,
                     color=color_shading, alpha=.5)
    # plot the mean on top
    plt.plot(mean, color_mean)

# generate 3 sets of random means and confidence intervals to plot
mean0 = np.random.random(50)
ub0 = mean0 + np.random.random(50) + .5
lb0 = mean0 - np.random.random(50) - .5

mean1 = np.random.random(50) + 2
ub1 = mean1 + np.random.random(50) + .5
lb1 = mean1 - np.random.random(50) - .5

mean2 = np.random.random(50) -1
ub2 = mean2 + np.random.random(50) + .5
lb2 = mean2 - np.random.random(50) - .5

# plot the data
fig = plt.figure(1, figsize=(7, 2.5))
plot_mean_and_CI(mean0, ub0, lb0, color_mean='k', color_shading='k')
plot_mean_and_CI(mean1, ub1, lb1, color_mean='b', color_shading='b')
plot_mean_and_CI(mean2, ub2, lb2, color_mean='g--', color_shading='g')

class LegendObject(object):
    def __init__(self, facecolor='red', edgecolor='white', dashed=False):
        self.facecolor = facecolor
        self.edgecolor = edgecolor
        self.dashed = dashed

    def legend_artist(self, legend, orig_handle, fontsize, handlebox):
        x0, y0 = handlebox.xdescent, handlebox.ydescent
        width, height = handlebox.width, handlebox.height
        patch = mpatches.Rectangle(
            # create a rectangle that is filled with color
            [x0, y0], width, height, facecolor=self.facecolor,
            # and whose edges are the faded color
            edgecolor=self.edgecolor, lw=3)
        handlebox.add_artist(patch)

        # if we're creating the legend for a dashed line,
        # manually add the dash in to our rectangle
        if self.dashed:
            patch1 = mpatches.Rectangle(
                [x0 + 2*width/5, y0], width/5, height, facecolor=self.edgecolor,
                transform=handlebox.get_transform())
            handlebox.add_artist(patch1)

        return patch

bg = np.array([1, 1, 1])  # background of the legend is white
colors = ['black', 'blue', 'green']
# with alpha = .5, the faded color is the average of the background and color
colors_faded = [(np.array(cc.to_rgb(color)) + bg) / 2.0 for color in colors]

plt.legend([0, 1, 2], ['Data 0', 'Data 1', 'Data 2'],
           handler_map={
               0: LegendObject(colors[0], colors_faded[0]),
               1: LegendObject(colors[1], colors_faded[1]),
               2: LegendObject(colors[2], colors_faded[2], dashed=True),
            })

plt.title('Example mean and confidence interval plot')
plt.tight_layout()
plt.grid()
plt.show()

Side note, really enjoying the default formatting in Matplotlib 2+.


Using VREP for simulation of force-controlled models

I’ve been playing around a bit with different simulators, and one that we’re a big fan of in the lab is VREP. It’s free for academics and you can talk to them about licences if you’re looking for commercial use. I haven’t actually had much experience with it before myself, so I decided to make a simple force controlled arm model to get experience using it. All in all, there were only a few weird API things that I had to get through, and once you have them down it’s pretty straightforward. This post is going to be about the steps that I needed to take to get things all set up. For a more general introduction to VREP, check out the tutorials on their site. All the code in this post and the model I use can be found up on my GitHub.

Getting the right files where you need them

As discussed in the remote API overview, you’ll need three files in whatever folder you want to run your Python script from to be able to hook into VREP remotely:

  • remoteApi.dll, remoteApi.dylib or remoteApi.so (depending on what OS you’re using)
  • vrep.py
  • vrepConst.py

You can find these files inside your VREP_HOME/programming/remoteApiBindings/python/python and VREP_HOME/programming/remoteApiBindings/lib/lib folders. Make sure that these files are in whatever folder you’re running your Python scripts from.

The model

It’s easy to create a new model to mess around with in VREP, so that’s the route I went, rather than importing one of their pre-made models and having some sneaky parameter setting cause me a bunch of grief. You can just right click->add then go at it. There are a bunch of tutorials so I’m not going to go into detail here. The main things are:

  • Make sure joints are in ‘Torque/force’ mode.
  • Make sure that joint ‘Motor enabled’ property is checked. The motor enabled property is in the ‘Show dynamic properties dialogue’ menu, which you find when you double click on the joint in the Scene Hierarchy.
  • Know the names of the joints as shown in the Scene Hierarchy.

So here’s a picture:

VREP
where you can see the names of the objects in the model highlighted in red, the Torque/force selection highlighted in blue, and the Motor enabled option highlighted in green. And of course my beautiful arm model in the background.

Setting up the continuous server

The goal is to connect VREP to Python so that we can send torques to the arm from our Python script and get the feedback necessary to calculate those torques. There are a few ways to set up a remote connection in VREP.

The basic one has you add a child script in VREP, attached to an object in the world, that sets up a port; then you hit go on your simulation and run your Python script to connect in. This gets old real fast. Fortunately, it’s easy to automate everything from Python so that you can connect in, start the simulation, run it for however long, and then stop the simulation without having to click back and forth.

The first step is to make sure that the remoteApiConnections.txt file in your VREP home directory is set up properly. A continuous server should be set up by default, but it doesn’t hurt to double check. And you can take the chance to turn on debugging, which can be pretty helpful. So open up that file and make sure it looks like this:

portIndex1_port                 = 19997
portIndex1_debug                = true
portIndex1_syncSimTrigger       = true

Once that’s set up, when VREP starts we can connect in from Python. In our Python script, first we’ll close any open connections that might be around, and then we’ll set up a new connection in:

import vrep
import numpy as np  # used below for the joint angle / velocity arrays

# close any open connections
vrep.simxFinish(-1)
# Connect to the V-REP continuous server
clientID = vrep.simxStart('127.0.0.1', 19997, True, True, 500, 5) 

if clientID != -1: # if we connected successfully
    print ('Connected to remote API server')

So once the connection is made, we check the clientID value to make sure that it didn’t fail, and then we carry on with our script.

Synchronizing

By default, VREP will run its simulation in its own thread, and both the simulation and the controller using the remote API will be running simultaneously. This can lead to some weird behaviour as things fall out of sync, so what we want instead is for the VREP sim to only run one time step for each time step the controller runs. To do that, we need to set VREP to synchronous mode. So the next few lines of our Python script look like:

    # --------------------- Setup the simulation 

    vrep.simxSynchronous(clientID,True)

and then later, once we’ve calculated our control signal, sent it over, and want the simulation to run one time step forward, we do that by calling

    # move simulation ahead one time step
    vrep.simxSynchronousTrigger(clientID)

Get the handles to objects of interest

OK the next chunk of code in our script uses the names of our objects (as specified in the Scene Hierarchy in VREP) to get an ID for each which to identify which object we want to send a command to or receive feedback from:

    joint_names = ['shoulder', 'elbow']
    # joint target velocities discussed below
    joint_target_velocities = np.ones(len(joint_names)) * 10000.0

    # get the handles for each joint and set up streaming
    joint_handles = [vrep.simxGetObjectHandle(clientID,
        name, vrep.simx_opmode_blocking)[1] for name in joint_names]

    # get handle for target and set up streaming
    _, target_handle = vrep.simxGetObjectHandle(clientID,
                    'target', vrep.simx_opmode_blocking)

Set dt and start the simulation

And the final thing that we’re going to do in our simulation set up is specify the timestep that we want to use, and then start the simulation. I found this in a forum post, and I must say whatever VREP lacks in intuitive API their forum moderators are on the ball. NOTE: To use a custom time step you have to also set the dt option in the VREP simulator to ‘custom’. The drop down is to the left of the ‘play’ arrow, and if you don’t have it set to ‘custom’ you won’t be able to set the time step through Python.

    dt = .01
    vrep.simxSetFloatingParameter(clientID,
            vrep.sim_floatparam_simulation_time_step,
            dt, # specify a simulation time step
            vrep.simx_opmode_oneshot)

    # --------------------- Start the simulation

    # start our simulation in lockstep with our code
    vrep.simxStartSimulation(clientID,
            vrep.simx_opmode_blocking)

Main loop

For this next chunk I’m going to cut out everything that’s not VREP, since I have a bunch of posts explaining the control signal derivation and forward transformation matrices.

So, once we’ve started the simulation, I’ve set things up for the arm to be controlled for 1 second, and then for the simulation to stop, everything to shut down, and the connection to close.
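
One small note: the loop below uses a couple of variables that are initialized earlier in the full script. A minimal version of that setup (the names come from the snippets, the initial values are my assumption) is:

    count = 0          # simulated time elapsed, in seconds
    track_target = []  # store the target (x, y, z) positions for plotting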

    while count < 1: # run for 1 simulated second

        # get the (x,y,z) position of the target
        _, target_xyz = vrep.simxGetObjectPosition(clientID,
                target_handle,
                -1, # retrieve absolute, not relative, position
                vrep.simx_opmode_blocking)
        if _ !=0 : raise Exception()
        track_target.append(np.copy(target_xyz)) # store for plotting
        target_xyz = np.asarray(target_xyz)

        q = np.zeros(len(joint_handles))
        dq = np.zeros(len(joint_handles))
        for ii,joint_handle in enumerate(joint_handles):
            # get the joint angles
            _, q[ii] = vrep.simxGetJointPosition(clientID,
                    joint_handle,
                    vrep.simx_opmode_blocking)
            if _ !=0 : raise Exception()
            # get the joint velocity
            _, dq[ii] = vrep.simxGetObjectFloatParameter(clientID,
                    joint_handle,
                    2012, # ID for angular velocity of the joint
                    vrep.simx_opmode_blocking)
            if _ !=0 : raise Exception()

In the above chunk of code, I think the big thing to point out is that I’m using vrep.simx_opmode_blocking in each call, instead of vrep.simx_opmode_buffer. The difference is that you for sure get the current values of the simulation when you use blocking, and you can be behind a time step using buffer.

Aside from that, the other notable things are that I raise an exception if the first parameter (which is the return code) is ever not 0, and that I use simxGetObjectFloatParameter to get the joint velocity instead of simxGetObjectVelocity, which has a rotational velocity component. Zero is the return code for ‘everything worked’, and if you don’t check it and have some silly things going on you can be veeerrrrryyy mystified when things don’t work. And what simxGetObjectVelocity returns is the rotational velocity of the joint relative to the world frame, not the angular velocity of the joint in its own coordinates. That was also briefly confusing.

So the next thing I do is calculate u, which we’ll skip, and then we need to set the forces for the joint. This part of the API is real screwy. You can’t set the force applied to the joint directly. Instead, you have to set the target velocity of the joint to some really high value (hence that array we made before), and then modulate the maximum force that can be applied to that joint. Also important: When you want to apply a force in the other direction, you change the sign on the target velocity, but keep the force sign positive.

        # ... calculate u ... 

        for ii,joint_handle in enumerate(joint_handles):
            # get the current joint torque
            _, torque = \
                vrep.simxGetJointForce(clientID,
                        joint_handle,
                        vrep.simx_opmode_blocking)
            if _ !=0 : raise Exception()

            # if force has changed signs,
            # we need to change the target velocity sign
            if np.sign(torque) * np.sign(u[ii]) < 0:
                joint_target_velocities[ii] = \
                        joint_target_velocities[ii] * -1
                _ = vrep.simxSetJointTargetVelocity(clientID,
                        joint_handle,
                        joint_target_velocities[ii], # target velocity
                        vrep.simx_opmode_blocking)
                if _ != 0: raise Exception()

            # and now modulate the force
            _ = vrep.simxSetJointForce(clientID,
                    joint_handle,
                    abs(u[ii]), # force to apply
                    vrep.simx_opmode_blocking)
            if _ !=0 : raise Exception()

        # move simulation ahead one time step
        vrep.simxSynchronousTrigger(clientID)
        count += dt

So as you can see we check the current torque, see if we need to change the sign on the target velocity, modulate the maximum allowed force, and then finally step the VREP simulation forward.
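
Once the loop is done, shutting everything down cleanly looks something like the following sketch:

    # stop the simulation
    vrep.simxStopSimulation(clientID, vrep.simx_opmode_blocking)

    # make sure the last command sent has arrived before closing the connection
    vrep.simxGetPingTime(clientID)

    # now close the connection to VREP
    vrep.simxFinish(clientID)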

Conclusions

And there you go! Here’s an animation of it in action (note this is a super low quality gif and it looks way better / smoother when actually running it yourself):

reach_to_target

All in all, VREP has been enjoyable to work with so far. It didn’t take long to get things moving and off the ground, the visualization is great, and I haven’t even scratched the surface of what you can do with it. Best of all (so far) you can fully automate everything from Python. Hopefully this is enough to help some people get their own models going and save a few hours and headaches! Again, the full code and the model are up on my GitHub.

Nits

  • When you’re applying your control signal, make sure you test each joint in isolation, to make sure your torques push things in the direction you think they do. I had checked the rotation direction in VREP, but the control signal for both joints ended up needing to be multiplied by -1.
  • Another nit when you’re building your model: if you use the rotate button from the VREP toolbar on your model, wherever that joint rotates to is now 0 degrees. If you want to set the joint to start at 45 degrees, instead double click on the joint and change the Pos [deg] option under ‘Joint’ in Scene Object Properties.

Simultaneous perturbation vs finite differences for linear dynamics estimation and control signal optimization

Recently in my posts I’ve been using finite differences to approximate the gradient of loss functions and dynamical systems, with the intention of creating generalizable controllers that can be run on any system without having to calculate out derivatives beforehand. Finite differences is pretty much the most straightforward way of approximating a gradient that there is: vary each parameter up and down (assuming we’re doing central differencing), one at a time, run it through your function, and estimate the parameter’s effect on the system by calculating the difference between the resulting function outputs. Doing this requires 2 samples of the function for each parameter.

But there’s always more than one way to peel an avocado, and another approach that’s been used with much success is the Simultaneous Perturbation Stochastic Approximation (SPSA) algorithm, which was developed by Dr. James Spall (link to overview paper). SPSA is a method of gradient approximation, like finite differences, but, critically, the difference is that it varies all of the parameters at once, rather than one at a time. As a result, you can get an approximation of the gradient with far fewer samples from the system, and it also works when you don’t have explicit control over your samples (i.e. the ability to vary each parameter one at a time).

Gradient approximation

Given some function \textbf{f} dependent on some set of parameters \textbf{x}, we’re used to finding the gradient of \textbf{f}(\textbf{x}) using FDSA (finite differences stochastic approximation) written in this form:

\textbf{f}_x = \frac{\textbf{f}(\textbf{x} + \Delta \textbf{x}) - \textbf{f}(\textbf{x} - \Delta \textbf{x})}{2 \Delta \textbf{x}} = \Delta \textbf{x}^{-1} \frac{\textbf{f}(\textbf{x} + \Delta \textbf{x}) - \textbf{f}(\textbf{x} - \Delta \textbf{x})}{2},

where \Delta\textbf{x} is a perturbation to the parameter set, and the subscript on the left-hand side denotes the derivative of \textbf{f}(\textbf{x}) with respect to \textbf{x}.

And that’s how we’ve calculated it before, estimating the gradient of a single parameter at a time. But we can rewrite this for a set of perturbations \Delta \textbf{X} = [\Delta\textbf{x}_0, ... , \Delta\textbf{x}_N]^T:

\textbf{f}_\textbf{x} = \Delta \textbf{X}^{-1} \textbf{F},

where

\textbf{F} = [\frac{\textbf{f}(\textbf{x} + \Delta \textbf{x}_0) - \textbf{f}(\textbf{x} - \Delta \textbf{x}_0)}{2}, ... , \frac{\textbf{f}(\textbf{x} + \Delta \textbf{x}_N) - \textbf{f}(\textbf{x} - \Delta \textbf{x}_N)}{2}]^T,

which works as long as \Delta\textbf{X} is square. When it’s not square (i.e. we don’t have the same number of samples as we have parameters), we run into problems, because we can’t calculate \Delta\textbf{X}^{-1} directly. To address this, let’s take a step back and then work forward again to get a more general form that works for non-square \Delta \textbf{X} too.

By rewriting the above, and getting rid of the inverse by moving \Delta\textbf{X} back to the other side, we have:

\Delta\textbf{X} \; \textbf{f}_\textbf{x} = \textbf{F}

Now, the standard trick for dealing with a matrix that’s not square is to make it square by multiplying it by its transpose, and then multiply both sides by the inverse of that square matrix:

\Delta\textbf{X}^T \Delta\textbf{X} \; \textbf{f}_\textbf{x} = \Delta\textbf{X}^T \textbf{F}

(\Delta\textbf{X}^T \Delta\textbf{X})^{-1} (\Delta\textbf{X}^T \Delta\textbf{X}) \textbf{f}_\textbf{x} = (\Delta\textbf{X}^T \Delta\textbf{X})^{-1} \Delta\textbf{X}^T \textbf{F}

\textbf{f}_\textbf{x} = (\Delta\textbf{X}^T \Delta\textbf{X})^{-1} \Delta\textbf{X}^T \textbf{F}

Alright! Now we’re comfortable with this characterization of gradient approximation using a form that works with non-square perturbation matrices.

Again, in FDSA, we only vary one parameter at a time. This means that there will only ever be one non-zero entry per row of \Delta \textbf{X}. By contrast, in SPSA, we vary multiple parameters at once, and so the rows of \Delta\textbf{X} will be chock-full of non-zero entries.
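
To make that matrix form concrete, here’s a quick NumPy sketch (not from my controller code, just a toy example) that estimates the gradient of a simple function from a stack of random perturbations using \textbf{f}_\textbf{x} = (\Delta \textbf{X}^T \Delta\textbf{X})^{-1} \Delta\textbf{X}^T \textbf{F}:

import numpy as np

def f(x):
    # toy function with known gradient [2*x[0], 3]
    return x[0]**2 + 3*x[1]

x = np.array([1.0, 2.0])
eps = 1e-4
num_samples = 10

# stack of random +-eps perturbations (SPSA style, all parameters at once)
delta_X = np.random.choice([-1, 1], size=(num_samples, len(x))) * eps
F = np.array([(f(x + dx) - f(x - dx)) / 2.0 for dx in delta_X])

# f_x = (delta_X^T delta_X)^-1 delta_X^T F
f_x = np.dot(np.linalg.pinv(np.dot(delta_X.T, delta_X)),
             np.dot(delta_X.T, F))
print(f_x)  # should be close to [2.0, 3.0]

Every row of delta_X here is full of non-zero entries, so this is the SPSA flavour of sampling; using one non-zero entry per row instead would give you back FDSA.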

Gradient approximation to estimate \textbf{f}_\textbf{x} and \textbf{f}_\textbf{u} for LQR control

This paper, by Drs. Jan Peters and Stefan Schaal, is where I first stumbled across the above formulation of gradient approximation and read about SPSA (side note: I really recommend reading the Scholarpedia article on policy gradients instead, because it has fixes to a couple notation mistakes from the paper). Reading about this I thought, oh interesting, an alternative to FDSA for gradient approximation, let’s see how well it does replacing FDSA in a linear quadratic regulator (LQR).

Implementing this was pretty simple. I just had to modify the calc_derivs function in my LQR controller code, which I use to estimate the derivatives of the arm dynamics with respect to the state and control signal, changing it from standard finite differences to simultaneous perturbation:

def calc_derivs(self, x, u):
    """" calculate gradient of plant dynamics using Simultaneous
    Perturbation Stochastic Approximation (SPSA). Implemented
    based on (Peters & Schaal, 2008).

    x np.array: the state of the system
    u np.array: the control signal
    """
    # Initialization and coefficient selection
    num_iters = 20

    eps = 1e-4
    delta_K = None
    delta_J = None
    for ii in range(num_iters):
        # Generation of simultaneous perturbation vector
        # choose each component from a Bernoulli +-1 distribution
        # with probability of .5 for each +-1 outcome.
        delta_k = np.random.choice([-1,1],
                                   size=len(x) + len(u),
                                   p=[.5, .5])
        # Function evaluations
        inc_x = np.copy(x) + eps * delta_k[:len(x)]
        inc_u = np.copy(u) + eps * delta_k[len(x):]
        state_inc = self.plant_dynamics(inc_x, inc_u)
        dec_x = np.copy(x) - eps * delta_k[:len(x)]
        dec_u = np.copy(u) - eps * delta_k[len(x):]
        state_dec = self.plant_dynamics(dec_x, dec_u)

        delta_j = ((state_inc - state_dec) /
                        (2.0 * eps)).reshape(-1)

        # Track delta_k and delta_j
        delta_K = delta_k if delta_K is None else \
            np.vstack([delta_K, delta_k])
        delta_J =  delta_j if delta_J is None else \
            np.vstack([delta_J, delta_j])

    f_xu = np.dot(np.linalg.pinv(np.dot(delta_K.T, delta_K)),
            np.dot(delta_K.T, delta_J))
    f_x = f_xu[:len(x)]
    f_u = f_xu[len(x):]

    return f_x.T , f_u.T

A couple notes about the above code. First, you’ll notice that the f_x and f_u matrices are both calculated at the same time. That’s pretty slick! And that calculation for f_xu is just a straight implementation of the matrix form of gradient approximation, where I’ve arranged things so that f_x is in the top part and f_u is in the lower part.

The second thing is that the perturbation vector delta_k is generated from a Bernoulli distribution. The reason behind this is that we want to have a bunch of different samples that pretty reasonably spread the state space and move all the parameters independently. Making each perturbation some distance times -1 or 1 is an easy way to achieve this.

Thirdly, there’s the num_iters variable. This is a very important variable, as it dictates how many random samples of our system we take before we estimate the gradient. I’ve found that to get this to work for both the 2-link arm and the more complex 3-link arm, it needs to be at least 20. Or else things explode and die horribly. Just…horribly.

OK let’s look at the results:


The first thing to notice is that I’ve finally discovered the Seaborn plotting package. The second is that SPSA does as well as FDSA.

You may ask: Is there any difference? Well, if we time these functions, on my lil’ laptop, for the 2-link arm it takes SPSA approximately 2.0ms, but it takes FDSA only 0.8ms. So for the same performance the SPSA is taking about 2.5 times as long to run. Why? This boils down to how many times the system dynamics need to be sampled by each algorithm to get a good approximation of the gradient. For a 2-link arm, FDSA has 6 parameters (\textbf{q}, \dot{\textbf{q}}, and \textbf{u}) that it needs to sample twice (we’re doing central differencing), for a total of 12 samples. And as I mentioned above, the SPSA algorithm needs 20 samples to be stable.

For the 3-link arm, SPSA took about 3.1ms on average and FDSA (which must now perform 18 samples of the dynamics) still only 2.1ms. So the number of samples isn’t the only cause of the time difference between these two algorithms. SPSA needs to perform a few more matrix operations, including a matrix inverse, which is expensive, while FDSA can calculate the gradient of each parameter individually, which is much less expensive.

OK, so SPSA is not really impressive here. BUT! As I discovered, there are other ways of employing SPSA.

Gradient approximation to optimize the control signal directly

In the previous setup we were using SPSA to estimate the gradient of the system under control, and then we used that gradient to calculate a control signal that minimized the loss function (as specified inside the LQR). This is one way to use gradient approximation methods. Another way to use these methods is to approximate the gradient of the loss function directly, and use that information to iteratively calculate a control signal that minimizes the loss function. This second application is the primary use of the SPSA algorithm, and is what’s described by Dr. Spall in his overview paper.

In this application, the algorithm works like this:

  1. start with initial input to system
  2. perturb input and simulate results
  3. observe loss function and calculate gradient
  4. update input to system
  5. repeat to convergence

Because in this approach we’re iteratively optimizing the input using our gradient estimation, having a noisy estimate is no longer a death sentence, as it was in the LQR. If we update our input to the system with several noisy gradient estimates the noise will essentially just cancel itself out. This means that SPSA now has a powerful advantage over FDSA: Since in SPSA we vary all parameters at once, only 2 samples of the loss function are used to estimate the gradient, regardless of the number of parameters. In contrast, FDSA needs to sample the loss function twice for every input parameter. Here’s a picture from (Spall, 1998) that shows the two running against each other to optimize a 2D problem:

2DplotFDSAvsSPSA
This gets across that even though SPSA bounces around more, they both reach the solution in the same number of steps. And, in general, this is the case, as Dr. Spall talks about in the paper. There are also a couple more details to the algorithm, so let’s look at it in detail. Here’s the code, which is just a straight translation into Python of the description in Dr. Spall’s paper:

# Step 1: Initialization and coefficient selection
max_iters = 5
converge_thresh = 1e-5

alpha = 0.602 # from (Spall, 1998)
gamma = 0.101
a = .101 # found empirically using HyperOpt
A = .193
c = .0277

delta_K = None
delta_J = None
u = np.copy(self.u) if self.u is not None \
        else np.zeros(self.arm.DOF)
for k in range(max_iters):
    ak = a / (A + k + 1)**alpha
    ck = c / (k + 1)**gamma

    # Step 2: Generation of simultaneous perturbation vector
    # choose each component from a bernoulli +-1 distribution with
    # probability of .5 for each +-1 outcome.
    delta_k = np.random.choice([-1,1], size=self.arm.DOF, p=[.5, .5])

    # Step 3: Function evaluations
    inc_u = np.copy(u) + ck * delta_k
    cost_inc = self.cost(np.copy(state), inc_u)
    dec_u = np.copy(u) - ck * delta_k
    cost_dec = self.cost(np.copy(state), dec_u)

    # Step 4: Gradient approximation
    gk = np.dot((cost_inc - cost_dec) / (2.0*ck), delta_k)

    # Step 5: Update u estimate
    old_u = np.copy(u)
    u -= ak * gk

    # Step 6: Check for convergence
    if np.sum(abs(u - old_u)) < converge_thresh:
        break

The main as-of-yet-unexplained parts of this code are the alpha, gamma, a, A, and c variables. What’s their deal?

Looking inside the loop, we can see that ck controls the magnitude of our perturbations. Looking a little further down, ak is just the learning rate. And all of those other parameters are just involved in shaping the trajectories that ak and ck follow through the iterations, which is a path towards zero. So the first steps and perturbations are the biggest, and each successive one becomes smaller as the iteration count increases.

There are a few heuristics that Dr. Spall goes over, but there aren’t any hard and fast rules for setting a, A, and c. Here, I just used HyperOpt to find some values that worked pretty well for this particular problem.
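
To get a feel for these schedules, here’s a quick snippet (using the constants from the code above) that prints out how ak and ck shrink as k increases:

alpha = 0.602
gamma = 0.101
a, A, c = .101, .193, .0277

for k in range(5):
    ak = a / (A + k + 1)**alpha
    ck = c / (k + 1)**gamma
    print('k=%i, ak=%.4f, ck=%.4f' % (k, ak, ck))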

The FDSA version of this is also very straight-forward:

# Step 1: Initialization and coefficient selection
max_iters = 10
converge_thresh = 1e-5

eps = 1e-4
u = np.copy(self.u) if self.u is not None \
        else np.zeros(self.arm.DOF)
for k in range(max_iters):

    gk = np.zeros(u.shape)
    for ii in range(gk.shape[0]):
        # Step 2: Generate perturbations one parameter at a time
        inc_u = np.copy(u)
        inc_u[ii] += eps
        dec_u = np.copy(u)
        dec_u[ii] -= eps

        # Step 3: Function evaluation
        cost_inc = self.cost(np.copy(state), inc_u)
        cost_dec = self.cost(np.copy(state), dec_u)

        # Step 4: Gradient approximation
        gk[ii] = (cost_inc - cost_dec) / (2.0 * eps)

    old_u = np.copy(u)
    # Step 5: Update u estimate
    u -= 1e-5 * gk

    # Step 6: Check for convergence
    if np.sum(abs(u - old_u)) < converge_thresh:
        break

You’ll notice that in both the SPSA and FDSA code we’re no longer sampling plant_dynamics, we’re instead sampling cost, a loss function I defined. From just my experience playing around with these algorithms a bit, getting the loss function to be appropriate and give the desired behaviour is definitely a bit of an art. It feels like much more of an art than in other controllers I’ve coded, but that could just be me.

The cost function that I’m using is pretty much the first thing you’d think of. It penalizes distance to target and having non-zero velocity. Getting the weighting between distance to target and velocity set up so that the arm moves to the target but also doesn’t overshoot definitely took a bit of trial and error, er, I mean empirical analysis. Here’s the cost function that I found worked pretty well, note that I had to make special cases for the different arms:

def cost(self, x, u):
    dt = .1 if self.arm.DOF == 3 else .01
    next_x = self.plant_dynamics(x, u, dt=dt)
    vel_gain = 100 if self.arm.DOF == 3 else 10
    return (np.sqrt(np.sum((self.arm.x - self.target)**2)) * 1000
            + np.sum((next_x[self.arm.DOF:])**2) * vel_gain)

So that’s all the code, let’s look at the results!


For these results, I used a max of 10 iterations for optimizing the control signal. I was definitely surprised by the quality of the results, especially for the 3-link arm, compared to the results generated by a standard LQR controller. Although I need to note, again, that it was a fair bit of me playing around with the exact cost function to get these results. Lots of empirical analysis.

The two controllers generate results that are identical under visual inspection. However, especially for the 3-link arm, the time required to run FDSA was significantly longer than for the SPSA controller. It took the SPSA controller approximately 140ms to calculate the control signal for a single loop, while FDSA took on average 700ms. Almost 5 times as long! For the same results! In directly optimizing the control signal, SPSA gets a big win over standard FDSA. So, if you’re looking to directly optimize over a loss function, SPSA is probably the way you want to go.

Conclusions

First off, I thought it was really neat to directly apply gradient approximation methods to optimizing the control signal. It’s something I haven’t tried before, but it definitely makes sense, and can generate some really nice results when tuned properly. Automating the tuning is definitely something I’ll be discussing in future posts, because doing it by hand takes a long time and is annoying.

In the LQR, the gradient approximation was best done by FDSA. I think the main reason for this is that in solving for the control signal the LQR algorithm uses matrix inverses, and any errors in the linear approximations to the dynamics are going to be amplified quite a bit. If I did anything less than 10-15 iterations (20 for the 3-link arm) in the SPSA approximation then things exploded. Also, here the SPSA algorithm required a matrix inverse, where the FDSA didn’t. This is because we only varied one parameter at a time in FDSA, so the effects of changing each one were isolated. In the SPSA case, we had to consider the changes across all of the variables and their resulting effects all at once, essentially solving for how much each variable contributed to the observed changes. Here, even with the more complex 3-link arm, FDSA was faster, so I’m going to stick with it in my LQR and iLQR implementations.

In the direct control signal optimization SPSA beat the pants off of FDSA. It was almost 5 times faster for control of the 3-link arm. This was, again, because in this case we could use noisy samples of the gradient of the loss function and rely on the noise to cancel itself out as we iterated. So we only needed 2 samples of the loss function in SPSA, where in FDSA we needed 2*num_parameters. And although this generated pretty good results, I would definitely be hesitant to use this for any more complicated systems, because tuning that cost function to get out a good trajectory was a pain. If you’re interested in playing around with this, you can check out the code for the gradient controllers up on my GitHub.


The iterative Linear Quadratic Regulator algorithm

A few months ago I posted on Linear Quadratic Regulators (LQRs) for control of non-linear systems using finite differences. The gist of it was: at every time step, linearize the dynamics, quadratize (it could be a word) the cost function around the current point in state space, and compute your feedback gain off of that, as though the dynamics were both linear and consistent (i.e. didn’t change in different states). And that was pretty cool because you didn’t need all the equations of motion and inertia matrices etc to generate a control signal. You could just use the simulation you had, sample it a bunch to estimate the dynamics and value function, and go off of that.

The LQR, however, operates with maverick disregard for changes in the future. Careless of the consequences, it optimizes assuming the linear dynamics approximated at the current time step hold for all time. It would be really great to have an algorithm that was able to plan out and optimize a sequence, mindful of the changing dynamics of the system.

This is exactly what the iterative Linear Quadratic Regulator (iLQR) method was designed for. iLQR is an extension of LQR control, and the idea here is basically to optimize a whole control sequence rather than just the control signal for the current point in time. The basic flow of the algorithm is:

  1. Initialize with initial state x_0 and initial control sequence \textbf{U} = [u_{t_0}, u_{t_1}, ..., u_{t_{N-1}}].
  2. Do a forward pass, i.e. simulate the system using (x_0, \textbf{U}) to get the trajectory through state space, \textbf{X}, that results from applying the control sequence \textbf{U} starting in x_0.
  3. Do a backward pass, estimate the value function and dynamics for each (\textbf{x}, \textbf{u}) in the state-space and control signal trajectories.
  4. Calculate an updated control signal \hat{\textbf{U}} and evaluate cost of trajectory resulting from (x_0, \hat{\textbf{U}}).
    1. If |\textrm{cost}(x_0, \hat{\textbf{U}}) - \textrm{cost}(x_0, \textbf{U})| < \textrm{threshold} then we've converged and exit.
    2. If \textrm{cost}(x_0, \hat{\textbf{U}}) < \textrm{cost}(x_0, \textbf{U}), then set \textbf{U} = \hat{\textbf{U}}, and change the update size to be more aggressive. Go back to step 2.
    3. If \textrm{cost}(x_0, \hat{\textbf{U}}) \geq \textrm{cost}(x_0, \textbf{U}) change the update size to be more modest. Go back to step 3.

There are a bunch of descriptions of iLQR, and it also goes by names like ‘the sequential linear quadratic algorithm’. The paper that I’m going to be working off of is by Yuval Tassa out of Emo Todorov’s lab, called Control-limited differential dynamic programming. And the Python implementation of this can be found up on my github in my Control repo. Also, a big thank you to Dr. Emo Todorov who provided Matlab code for the iLQG algorithm, which was super helpful.

Defining things

So let’s dive in. Formally defining things, we have our system \textbf{x}, and dynamics described with the function \textbf{f}, such that

\textbf{x}_{t+1} = \textbf{f}(\textbf{x}_t, \textbf{u}_t),

where \textbf{u} is the input control signal. The trajectory \{\textbf{X}, \textbf{U}\} is the sequence of states \textbf{X} = \{\textbf{x}_0, \textbf{x}_1, ..., \textbf{x}_N\} that result from applying the control sequence \textbf{U} = \{\textbf{u}_0, \textbf{u}_1, ..., \textbf{u}_{N-1}\} starting in the initial state \textbf{x}_0.

Now we need to define all of our cost related equations, so we know exactly what we’re dealing with.

Define the total cost function J, which is the sum of the immediate cost, \ell, from each state in the trajectory plus the final cost, \ell_f:

J(\textbf{x}_0, \textbf{U}) = \sum\limits^{N-1}\limits_{t=0} \ell(\textbf{x}_t, \textbf{u}_t) + \ell_f(\textbf{x}_N).

Letting \textbf{U}_t = \{\textbf{u}_t, \textbf{u}_{t+1}, ..., \textbf{u}_{N-1}\}, we define the cost-to-go as the sum of costs from time t to N:

J_t(\textbf{x}, \textbf{U}_t) = \sum\limits^{N-1}\limits_{i=t} \ell(\textbf{x}_i, \textbf{u}_i) + \ell_f(\textbf{x}_N).

The value function V at time t is the optimal cost-to-go from a given state:

V_t(\textbf{x}) = \min\limits_{\textbf{U}_t} J_t(\textbf{x}, \textbf{U}_t),

where the above equation just says that the optimal cost-to-go is found by using the control sequence \textbf{U}_t that minimizes J_t.

At the final time step, N, the value function is simply

V(\textbf{x}_N) = \ell_f(\textbf{x}_N).

For all preceding time steps, we can write the value function as a function of the immediate cost \ell(\textbf{x}, \textbf{u}) and the value function at the next time step:

V(\textbf{x}) = \min\limits_{\textbf{u}} \left[ \ell(\textbf{x}, \textbf{u}) + V(\textbf{f}(\textbf{x}, \textbf{u})) \right].

NOTE: In the paper, they use the notation V'(\textbf{f}(\textbf{x}, \textbf{u})) to denote the value function at the next time step, which is redundant since \textbf{x}_{t+1} = \textbf{f}(\textbf{x}_t, \textbf{u}_t), but it comes in handy later when they drop the dependencies to simplify notation. So, heads up: V' = V(\textbf{f}(\textbf{x}, \textbf{u})).

Forward rollout

The forward rollout consists of two parts. The first part is simulating things to generate (\textbf{X}, \textbf{U}), from which we can calculate the overall cost of the trajectory and find out the path that the arm will take. To improve things, though, we’ll need a lot of information about the partial derivatives of the system; calculating these is the second part of the forward rollout phase.

To calculate all these partial derivatives we’ll use (\textbf{X}, \textbf{U}). For each (\textbf{x}_t, \textbf{u}_t) we’ll calculate the derivatives of \textbf{f}(\textbf{x}_t, \textbf{u}_t) with respect to \textbf{x}_t and \textbf{u}_t, which will give us what we need for our linear approximation of the system dynamics.

To get the information we need about the value function, we’ll need the first and second derivatives of \ell(\textbf{x}_t, \textbf{u}_t) and \ell_f(\textbf{x}_N) with respect to \textbf{x}_t and \textbf{u}_t.

So all in all, we need to calculate \textbf{f}_\textbf{x}, \textbf{f}_\textbf{u}, \ell_\textbf{x}, \ell_\textbf{u}, \ell_\textbf{xx}, \ell_\textbf{ux}, \ell_\textbf{uu}, where the subscripts denote a partial derivative, so \ell_\textbf{x} is the partial derivative of \ell with respect to \textbf{x}, \ell_\textbf{xx} is the second derivative of \ell with respect to \textbf{x}, etc. And to calculate all of these partial derivatives, we’re going to use finite differences! Just like in the LQR with finite differences post. Long story short, load up the simulation for every time step, slightly vary one of the parameters, and measure the resulting change.
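
As a rough sketch of what that looks like for the dynamics (hypothetical names here; I’m assuming a plant_dynamics(x, u) function that returns the next state), estimating \textbf{f}_\textbf{x} and \textbf{f}_\textbf{u} at a single time step with central differences might look something like:

import numpy as np

def approx_fx_fu(plant_dynamics, x, u, eps=1e-4):
    """Estimate df/dx and df/du with central differences."""
    f_x = np.zeros((len(x), len(x)))
    for ii in range(len(x)):
        inc_x = np.copy(x); inc_x[ii] += eps
        dec_x = np.copy(x); dec_x[ii] -= eps
        f_x[:, ii] = (plant_dynamics(inc_x, u) -
                      plant_dynamics(dec_x, u)) / (2.0 * eps)

    f_u = np.zeros((len(x), len(u)))
    for ii in range(len(u)):
        inc_u = np.copy(u); inc_u[ii] += eps
        dec_u = np.copy(u); dec_u[ii] -= eps
        f_u[:, ii] = (plant_dynamics(x, inc_u) -
                      plant_dynamics(x, dec_u)) / (2.0 * eps)

    return f_x, f_u

The same one-parameter-at-a-time pattern gets you \ell_\textbf{x} and \ell_\textbf{u}, and a second level of differencing gets you the second derivatives of the cost.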

Once we have all of these, we’re ready to move on to the backward pass.

Backward pass

Now, we started out with an initial trajectory, but that was just a guess. We want our algorithm to take it and then converge to a local minimum. To do this, we’re going to add some perturbing values and use them to minimize the value function. Specifically, we’re going to compute a local solution to our value function using a quadratic Taylor expansion. So let’s define Q(\delta \textbf{x}, \delta \textbf{u}) to be the change in our value function at (\textbf{x}, \textbf{u}) as a result of small perturbations (\delta \textbf{x}, \delta \textbf{u}):

Q(\delta \textbf{x}, \delta \textbf{u}) = \ell (\textbf{x} + \delta \textbf{x}, \textbf{u} + \delta \textbf{u}) + V(\textbf{f}(\textbf{x} + \delta\textbf{x}, \textbf{u} + \delta \textbf{u})).

The second-order expansion of Q is given by:

Q_\textbf{x} = \ell_\textbf{x} + \textbf{f}_\textbf{x}^T V'_\textbf{x},

Q_\textbf{u} = \ell_\textbf{u} + \textbf{f}_\textbf{u}^T V'_\textbf{x},

Q_\textbf{xx} = \ell_\textbf{xx} + \textbf{f}_\textbf{x}^T V'_\textbf{xx} \textbf{f}_\textbf{x} + V'_\textbf{x} \cdot \textbf{f}_\textbf{xx},

Q_\textbf{ux} = \ell_\textbf{ux} + \textbf{f}_\textbf{u}^T V'_\textbf{xx} \textbf{f}_\textbf{x}+ V'_\textbf{x} \cdot \textbf{f}_\textbf{ux},

Q_\textbf{uu} = \ell_\textbf{uu} + \textbf{f}_\textbf{u}^T V'_\textbf{xx} \textbf{f}_\textbf{u}+ V'_\textbf{x} \cdot \textbf{f}_\textbf{uu}.

Remember that V' = V(\textbf{f}(\textbf{x}, \textbf{u})), which is the value function at the next time step. NOTE: All of the second derivatives of \textbf{f} are zero in the systems we’re controlling here, so when we calculate the second derivatives we don’t need to worry about doing any tensor math, yay!

Given the second-order expansion of Q, we can compute the optimal modification to the control signal, \delta \textbf{u}^*. This control signal update has two parts, a feedforward term, \textbf{k}, and a feedback term \textbf{K} \delta\textbf{x}. The optimal update is the \delta\textbf{u} that minimizes Q:

\delta\textbf{u}^*(\delta \textbf{x}) = \arg\min\limits_{\delta\textbf{u}}Q(\delta\textbf{x}, \delta\textbf{u}) = \textbf{k} + \textbf{K}\delta\textbf{x},

where \textbf{k} = -Q^{-1}_\textbf{uu} Q_\textbf{u} and \textbf{K} = -Q^{-1}_\textbf{uu} Q_\textbf{ux}.

Derivation can be found in this earlier paper by Li and Todorov. By then substituting this policy into the expansion of Q we get a quadratic model of V. They do some mathamagics and come out with:

V_\textbf{x} = Q_\textbf{x} - \textbf{K}^T Q_\textbf{uu} \textbf{k},

V_\textbf{xx} = Q_\textbf{xx} - \textbf{K}^T Q_\textbf{uu} \textbf{K}.

So now we have all of the terms that we need, and they’re defined in terms of the values at the next time step. We know the value of the value function at the final time step V_N = \ell_f(\textbf{x}_N), and so we’ll simply plug this value in and work backwards in time recursively computing the partial derivatives of Q and V.
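
To make the recursion concrete, here’s a rough sketch of the backward pass (this isn’t a drop-in piece of my implementation; I’m assuming lists l_x, l_u, l_xx, l_ux, l_uu, f_x, f_u hold the derivatives calculated during the forward rollout, one entry per time step, and l_f_x, l_f_xx are the final state cost derivatives):

import numpy as np

def backward_pass(l_x, l_u, l_xx, l_ux, l_uu, f_x, f_u, l_f_x, l_f_xx):
    N = len(l_x)
    k = [None] * N  # feedforward terms
    K = [None] * N  # feedback gain terms

    # value function terms at the final time step
    V_x = l_f_x
    V_xx = l_f_xx

    for t in range(N - 1, -1, -1):
        # second-order expansion of Q around (x_t, u_t)
        Q_x = l_x[t] + np.dot(f_x[t].T, V_x)
        Q_u = l_u[t] + np.dot(f_u[t].T, V_x)
        Q_xx = l_xx[t] + np.dot(f_x[t].T, np.dot(V_xx, f_x[t]))
        Q_ux = l_ux[t] + np.dot(f_u[t].T, np.dot(V_xx, f_x[t]))
        Q_uu = l_uu[t] + np.dot(f_u[t].T, np.dot(V_xx, f_u[t]))

        # control signal update terms (the Q_uu regularization discussed
        # below is left out here for clarity)
        Q_uu_inv = np.linalg.pinv(Q_uu)
        k[t] = -np.dot(Q_uu_inv, Q_u)
        K[t] = -np.dot(Q_uu_inv, Q_ux)

        # propagate the value function estimates back one time step
        V_x = Q_x - np.dot(K[t].T, np.dot(Q_uu, k[t]))
        V_xx = Q_xx - np.dot(K[t].T, np.dot(Q_uu, K[t]))

    return k, K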

Calculate control signal update

Once those are all calculated, we can calculate the gain matrices, \textbf{k} and \textbf{K}, for our control signal update. Huzzah! Now all that’s left to do is evaluate this new trajectory. So we set up our system

\hat{\textbf{x}}_0 = \textbf{x}_0,

\hat{\textbf{u}}_t = \textbf{u}_t + \textbf{k}_t + \textbf{K}_t (\hat{\textbf{x}}_t - \textbf{x}_t),

\hat{\textbf{x}}_{t+1} = \textbf{f}(\hat{\textbf{x}}_t, \hat{\textbf{u}}_t),

and record the cost. Now if the cost of the new trajectory (\hat{\textbf{X}}, \hat{\textbf{U}}) is less than the cost of (\textbf{X}, \textbf{U}) then we set \textbf{U} = \hat{\textbf{U}} and go do it all again! And when the change in cost from an update becomes less than a threshold value, call it done. In code this looks like:

if costnew < cost:
  sim_new_trajectory = True

  if (abs(costnew - cost)/cost) < self.converge_thresh:
    break

Of course, another option we need to account for is when costnew > cost. What do we do in this case? Our control update hasn’t worked, do we just exit?

The Levenberg-Marquardt heuristic
No! Phew.

The control signal update in iLQR is calculated in such a way that it can behave like Gauss-Newton optimization (which uses second-order derivative information) or like gradient descent (which only uses first-order derivative information). The idea is that if the updates are going well, then let’s include curvature information in our update to help optimize things faster. If the updates aren’t going well, let’s dial back towards gradient descent, stick to first-order derivative information, and use smaller steps. This wizardry is known as the Levenberg-Marquardt heuristic. So how does it work?

Something we skimmed over in the iLQR description was that we need to calculate Q^{-1}_\textbf{uu} to get the \textbf{k} and \textbf{K} matrices. Instead of using np.linalg.pinv or somesuch, we’re going to calculate the inverse ourselves after finding the eigenvalues and eigenvectors, so that we can regularize it. This will let us do a couple of things. First, we’ll be able to make sure that our estimate of curvature (Q_\textbf{uu}^{-1}) stays positive definite, which is important to make sure that we always have a descent direction. Second, we’re going to add a regularization term to the eigenvalues to prevent them from exploding when we take their inverse. Here’s our regularization implemented in Python:

 
Q_uu_evals, Q_uu_evecs = np.linalg.eig(Q_uu)
Q_uu_evals[Q_uu_evals < 0] = 0.0
Q_uu_evals += lamb
Q_uu_inv = np.dot(Q_uu_evecs,
    np.dot(np.diag(1.0/Q_uu_evals), Q_uu_evecs.T))

Now, what happens when we change lamb? The eigenvalues represent the magnitude of each of the eigenvectors, and by taking their reciprocal we flip the contributions of the vectors. So the ones that were contributing the least now have the largest inverted eigenvalues, and the ones that contributed the most now have the smallest. By adding a regularization term we ensure that the inverted eigenvalues can never be larger than 1/lamb. So essentially we throw out information.
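
As a quick numerical illustration of that capping effect (just a toy example, not from the controller code):

import numpy as np

evals = np.array([1e-6, 0.1, 10.0])
for lamb in [1e-3, 1.0]:
    # with lamb = 1e-3 the tiny eigenvalue still blows up to ~1000 when inverted,
    # with lamb = 1.0 no inverted eigenvalue can be larger than 1/lamb = 1.0
    print('lamb=%s: %s' % (lamb, 1.0 / (evals + lamb)))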

In the case where we’ve got a really good approximation of the system dynamics and value function, we don’t want to throw out information. We want to use all of the information available because it’s accurate, so make lamb small and get a more accurate inverse. In the case where we have a bad approximation of the dynamics we want to be more conservative, which means not letting the inverted eigenvalues get large. Smaller inverted eigenvalues give a smaller Q_\textbf{uu}^{-1} estimate, which then gives smaller gain matrices and a smaller control signal update, which is what we want when our control signal updates are going poorly.

How do you know if they’re going poorly or not, you now surely ask! Clever as always, we’re going to use the result of the previous iteration to update lamb. So adding to the code from just above, the end of our control update loop is going to look like:

lamb = 1.0 # initial value of lambda
...
if costnew < cost:
  lamb /= self.lamb_factor
  sim_new_trajectory = True

  if (abs(costnew - cost)/cost) < self.converge_thresh:
    break
else:
  lamb *= self.lamb_factor
  if lamb > self.max_lamb:
    break

And that is pretty much everything! OK let’s see how this runs!

Simulation results

If you want to run this and see for yourself, you can go copy my Control repo, navigate to the main directory, and run

python run.py arm2 reach

or substitute in arm3. If you’re having trouble getting the arm2 simulation to run, try arm2_python, which is a straight Python implementation of the arm dynamics, and should work no sweat for Windows and Mac.

Below you can see results from the iLQR controller controlling the 2 and 3 link arms (click on the figures to see full sized versions, they got distorted a bit in the shrinking to fit on the page), using immediate and final state cost functions defined as:

l = np.sum(u**2)

and

pos_err = np.array([self.arm.x[0] - self.target[0], 
                    self.arm.x[1] - self.target[1]])
l = (wp * np.sum(pos_err**2) + # pos error
    wv * np.sum(x[self.arm.DOF:self.arm.DOF*2]**2)) # vel error

where wp and wv are just gain values, x is the state of the system, and self.arm.x is the (x,y) position of the hand. These read as “during movement, penalize large control signals, and at the final state, have a big penalty on not being at the target.”


So let’s give it up for iLQR, this is awesome! How much of a crazy improvement is that over LQR? And all with knowledge of the system gained only through finite differences, and with the full movements done in exactly 1 second! (Note: The simulation speeds look different because of my editing to keep the gif sizes small, they both take the same amount of time for each movement.)

Changing cost functions
Something that you may notice is that the movement of the 3-link arm is actually straighter than that of the 2-link arm. I thought that this might just be an issue with the gain values: since the scale of movement is smaller for the 2-link arm than the 3-link arm, there might have been less of a penalty for not moving in a straight line. BUT this was wrong. You can crank the gains and still get the same movement. The actual reason is that this is what the cost function specifies: if you look in the code, only \ell_f(\textbf{x}_N) penalizes the distance from the target, and the cost function during movement is strictly to minimize the control signal, i.e. \ell(\textbf{x}_t, \textbf{u}_t) = \textbf{u}_t^2.

Well that’s a lot of talk, you say, like the incorrigible antagonist we both know you to be, prove it. Alright, fine! Here’s iLQR running with an updated cost function that includes the end-effector’s distance from the target in the immediate cost:

2linklxcost
All that I had to do to get this was change the immediate cost from

l = np.sum(u**2)

to

l = np.sum(u**2)
pos_err = np.array([self.arm.x[0] - self.target[0], 
                    self.arm.x[1] - self.target[1]])
l += (wp * np.sum(pos_err**2) + # pos error
    wv * np.sum(x[self.arm.DOF:self.arm.DOF*2]**2)) # vel error

where all I had to do was include the position penalty term from the final state cost into the immediate state cost.

Changing sequence length
In these simulations the system is simulated with a .01 second time step, and I gave it 100 time steps to reach the target. What if I give it only 50 time steps?


It looks pretty much the same! It’s just now twice as fast, which is of course achieved by using larger control signals, which we don’t see, but dang awesome.

What if we try to make it there in 10 time steps??


OK, well, that does not look good. So what’s going on in this case? Basically we’ve given the algorithm an impossible task. It can’t make it to the target location in 10 time steps. In the implementation I wrote here, if it hits the end of its control sequence and it hasn’t reached the target yet, the control sequence starts over back at t=0. Remember that part of the target state is also velocity, so basically it moves for 10 time steps to try to minimize (x,y) distance, and then slows down to minimize the final state cost in the velocity term.

In conclusion

This algorithm has been used in a ton of things, for controlling robots and simulations, and is an important part of guided policy search, which has been used to very successfully train deep networks in control problems. It’s getting really impressive results for controlling the arm models that I’ve built here, and using finite differences should easily generalize to other systems.

iLQR is very computationally expensive, though, so that’s definitely a downside. It’s definitely less expensive if you have the equations of your system, or at least a decent approximation of them, and you don’t need to use finite differences. But you pay for the efficiency with a loss in generality.

There are also a bunch of parameters to play around with that I haven’t explored at all here, like the weights in the cost function penalizing the magnitude of the control signal and the final state position error. I showed a basic example of changing the cost function, which hopefully gets across just how easy changing these things out can be when you’re using finite differences, and there’s a lot to play around with there too.

Implementation note
In the Tassa and Todorov paper, they talked about using a backtracking line search when generating the control signal. So the algorithm they had when generating the new control signal was actually:

\hat{\textbf{u}}_t = \textbf{u}_t + \alpha\textbf{k}_t + \textbf{K}_t(\hat{\textbf{x}}_t - \textbf{x}_t)

where \alpha was the backtracking search parameter, which gets set to one initially and then reduced. It’s very possible I didn’t implement it as intended, but I found consistently that \alpha = 1 always generated the best results, so it was just adding computation time. So I left it out of my implementation. If anyone has insights on an implementation that improves results, please let me know!

And then finally, another thank you to Dr. Emo Todorov for providing Matlab code for the iLQG algorithm, which was very helpful, especially for getting the Levenberg-Marquardt heuristic implemented properly.


Arm visualization with PyGame

So, in my neverending quest for better arm visualizations so I can make prettier pictures / videos and improve my submission acceptance rates I’ve started looking at PyGame. I started out hoping to find something that was quick and easy in Python for animating using SVG images, and PyGame is as close as I got. All in all, I’m decently happy with the result. It runs slower than the Matplotlib animation, and you can’t zoom in like you can on a Matplotlib graph, but it’s prettier. So, tradeoffs!

There were a few things that I ran into that tripped me up a bit when I was doing this implementation, so I thought that I would share, and that’s what this post is going to be about. Below is an animation showing what the arm visualization looks like, and as always the code for everything can be found on my Github. That links you to the control folder, and you can find all of the code developed / used for this post in this folder up on my GitHub.

Arm visualization using PyGame

Alright, there were a few parts in doing this that were a bit tricky. Let’s go through them one at a time.

Rotations in PyGame

Turns out rotating images is a pain right off the bat, but once you get over the initial hurdles it looks pretty slick.

Centering your image

To start we’re just going to attempt to rotate an image in place, here’s a first pass:

import sys

import pygame
import pygame.locals

white = (255, 255, 255)

pygame.init()

display = pygame.display.set_mode((300, 300))
fpsClock = pygame.time.Clock()

image = pygame.image.load('pic.png')

while 1:

    display.fill(white)

    image = pygame.transform.rotate(image, 1)
    rect = image.get_rect()

    display.blit(image, rect)

    # check for quit
    for event in pygame.event.get():
        if event.type == pygame.locals.QUIT:
            pygame.quit()
            sys.exit()

    pygame.display.update()
    fpsClock.tick(30)

First thing you notice upon running this is that the image flies off to the side very quickly, as shown below:
rotation-bad1
This is because we need to recenter the image after each rotation. To do that we can add in the following right after the call to image.get_rect():

    rect.center = (150, 150)

and so now our animation looks like:
rotation-bad2
At which point a very egregious problem becomes clear: the image is destroying itself as it rotates.

Transforming from a base image

Basically what’s happening is that with every transformation the image gets distorted a little bit, and it continues moving farther and farther away from the original. To prevent this we’re going to save the original and also track the total degrees to rotate the image. Then we’ll perform one rotation from the original (with the minimal distortion caused by a single transformation) and plot that every time step. To do this we’ll introduce a degrees variable to track the total rotation. The changes to the code start just before the while loop:

degrees = 0

while 1:

    display.fill(white)

    degrees += .5

    rotated_image = pygame.transform.rotate(image, degrees)
    rect = rotated_image.get_rect()
    rect.center = (150, 150)

And the resulting animation looks like:
rotation-bad3
Which is significantly better! Pretty good, in fact. Looking closely, however, you can notice that it gets a little choppy. And this is because the pygame.transform.rotate method doesn’t use anti-aliasing to smooth out the image. pygame.transform.rotozoom does, however! So we’ll use rotozoom and set the zoom level to 1, changing the rotation call to:

    rotated_image = pygame.transform.rotozoom(image, degrees, 1)

And now we have a nice smooth animation!
rotation-good

At this point, we’re going to create a class to take care of this rotation business for us, storing the original image and providing a function that rotates smoothly and recenters the image.

import numpy as np
import pygame

class ArmPart:
    """
    A class for storing relevant arm segment information.
    """
    def __init__(self, pic, scale=1.0):
        self.base = pygame.image.load(pic)
        # some handy constants
        self.length = self.base.get_rect()[2]
        self.scale = self.length * scale
        self.offset = self.scale / 2.0

        self.rotation = 0.0 # in radians

    def rotate(self, rotation):
        """
        Rotates and re-centers the arm segment.
        """
        self.rotation += rotation 
        # rotate our image 
        image = pygame.transform.rotozoom(self.base, np.degrees(self.rotation), 1)
        # reset the center
        rect = image.get_rect()
        rect.center = (0, 0)

        return image, rect

Everything is just what we’ve seen so far, except the offset, which is going to be very useful for the trig we’re about to get into.

Trig

With rotations working and going smoothly one of the major hurdles is now behind us. At this point we can get our arm images and use some trig to make sure that they rotate around the joint ends rather than in the center. Using the above class now we’ll write a script that loads in a picture of an upper arm, and then rotates it around the shoulder.

import sys

import numpy as np
import pygame
import pygame.locals

from armpart import ArmPart

black = (0, 0, 0)
white = (255, 255, 255)

pygame.init()

width = 500
height = 500
display = pygame.display.set_mode((width, height))
fpsClock = pygame.time.Clock()

upperarm = ArmPart('upperarm.png', scale=.7)

base = (width / 2, height / 2)

while 1:

    display.fill(white)

    ua_image, ua_rect = upperarm.rotate(.01) 
    ua_rect.center += np.asarray(base)
    ua_rect.center -= np.array([np.cos(upperarm.rotation) * upperarm.offset,
                                -np.sin(upperarm.rotation) * upperarm.offset])

    display.blit(ua_image, ua_rect)
    pygame.draw.circle(display, black, base, 30)

    # check for quit
    for event in pygame.event.get():
        if event.type == pygame.locals.QUIT:
            pygame.quit()
            sys.exit()

    pygame.display.update()
    fpsClock.tick(30)

So far the only trig we need is simply calculating the offset from the center point. Here it’s calculated as half of the length of the image multiplied by a scaling term. The scaling term is there because we don’t want the rotation point to be the absolute edge of the image, but rather somewhere inside the arm. We calculate the x and y offset values using the cos and sin functions, respectively, with a negative sign in front of the sin function because the y axis is inverted, using the top of the screen as 0. The black circle is just for ease of viewing while we’re figuring all the trig out, to make it easier to verify it’s rotating around the point we want. The above code results in the following
arm1

Going ahead and adding in the other links gives us

import sys

import numpy as np
import pygame
import pygame.locals

from armpart import ArmPart

black = (0, 0, 0)
white = (255, 255, 255)

pygame.init()

width = 750
height = 750
display = pygame.display.set_mode((width, height))
fpsClock = pygame.time.Clock()

upperarm = ArmPart('upperarm.png', scale=.7)
forearm = ArmPart('forearm.png', scale=.8)
hand = ArmPart('hand.png', scale=1.0)

origin = (width / 2, height / 2)

while 1:

    display.fill(white)

    ua_image, ua_rect = upperarm.rotate(.03) 
    fa_image, fa_rect = forearm.rotate(-.02) 
    h_image, h_rect = hand.rotate(.03) 

    # generate (x,y) positions of each of the joints
    joints_x = np.cumsum([0, 
                          upperarm.scale * np.cos(upperarm.rotation),
                          forearm.scale * np.cos(forearm.rotation),
                          hand.length * np.cos(hand.rotation)]) + origin[0]
    joints_y = np.cumsum([0, 
                          upperarm.scale * np.sin(upperarm.rotation),
                          forearm.scale * np.sin(forearm.rotation), 
                          hand.length * np.sin(hand.rotation)]) * -1 + origin[1]
    joints = [(int(x), int(y)) for x,y in zip(joints_x, joints_y)]

    def transform(rect, origin, arm_part):
        rect.center += np.asarray(origin)
        rect.center += np.array([np.cos(arm_part.rotation) * arm_part.offset,
                                -np.sin(arm_part.rotation) * arm_part.offset])

    transform(ua_rect, joints[0], upperarm)
    transform(fa_rect, joints[1], forearm)
    transform(h_rect, joints[2], hand)
    # transform the hand a bit more because it's weird
    h_rect.center += np.array([np.cos(hand.rotation), 
                              -np.sin(hand.rotation)]) * -10

    display.blit(ua_image, ua_rect)
    display.blit(fa_image, fa_rect)
    display.blit(h_image, h_rect)

    # check for quit
    for event in pygame.event.get():
        if event.type == pygame.locals.QUIT:
            pygame.quit()
            sys.exit()

    pygame.display.update()
    fpsClock.tick(30)

So basically the only thing we’ve done here that was a little more complicated was setting up the centers of the forearm and hand images to be at the end of the upper arm and forearm, respectively. We do that in the joints_x and joints_y variables, and the trig for that is straight from basic robotics.
The above code results in the following animation (which is a little choppy because I dropped some frames to keep the file size small):
animation

Plotting transparent lines

The other kind of tricky thing that was done in the visualization code was the plotting of transparent lines for the ‘skeleton’ of the arm. The reason this was kind of tricky is that you can’t just use the pygame.draw.lines method, because it doesn’t allow for transparency. So what you end up having to do instead is generate a set of Surfaces of the shape that you want for each of the lines, and then transform and plot them appropriately, which is thankfully pretty straightforward after working things out for the images above.

To generate a transparent surface you use the pygame.SRCALPHA variable, so it looks like

surface = pygame.Surface((width, height), pygame.SRCALPHA, 32)
surface.fill((100, 100, 100, 200))

where the variable passed in to fill is a 4-tuple, with the first 3 parameters the standard RGB levels, and the fourth parameter being the transparency level.

Then there’s one more snag, in that when you blit a surface it goes by the top left position of the rectangle that contains it. So after doing the same transformations as for the images, we have to shift the surface a bit further, because with the images we were specifying the desired center point. This is easy enough; just get a handle to the rect for the surface and subtract out half of the width and height (post-rotation).

The code with the transparent lines (and some circles at the joints added in for prettiness) then is

import sys

import numpy as np
import pygame
import pygame.locals

from armpart import ArmPart

black = (0, 0, 0)
white = (255, 255, 255)
arm_color = (50, 50, 50, 200) # fourth value specifies transparency

pygame.init()

width = 750
height = 750
display = pygame.display.set_mode((width, height))
fpsClock = pygame.time.Clock()

upperarm = ArmPart('upperarm.png', scale=.7)
forearm = ArmPart('forearm.png', scale=.8)
hand = ArmPart('hand.png', scale=1.0)

line_width = 15
line_upperarm = pygame.Surface((upperarm.scale, line_width), pygame.SRCALPHA, 32)
line_forearm = pygame.Surface((forearm.scale, line_width), pygame.SRCALPHA, 32)
line_hand = pygame.Surface((hand.scale, line_width), pygame.SRCALPHA, 32)

line_upperarm.fill(arm_color)
line_forearm.fill(arm_color)
line_hand.fill(arm_color)

origin = (width / 2, height / 2)

while 1:

    display.fill(white)

    # rotate our joints
    ua_image, ua_rect = upperarm.rotate(.03) 
    fa_image, fa_rect = forearm.rotate(-.02) 
    h_image, h_rect = hand.rotate(.03) 

    # generate (x,y) positions of each of the joints
    joints_x = np.cumsum([0, 
                          upperarm.scale * np.cos(upperarm.rotation),
                          forearm.scale * np.cos(forearm.rotation),
                          hand.length * np.cos(hand.rotation)]) + origin[0]
    joints_y = np.cumsum([0, 
                          upperarm.scale * np.sin(upperarm.rotation),
                          forearm.scale * np.sin(forearm.rotation), 
                          hand.length * np.sin(hand.rotation)]) * -1 + origin[1]
    joints = [(int(x), int(y)) for x,y in zip(joints_x, joints_y)]

    def transform(rect, base, arm_part):
        rect.center += np.asarray(base)
        rect.center += np.array([np.cos(arm_part.rotation) * arm_part.offset,
                                -np.sin(arm_part.rotation) * arm_part.offset])

    transform(ua_rect, joints[0], upperarm)
    transform(fa_rect, joints[1], forearm)
    transform(h_rect, joints[2], hand)
    # transform the hand a bit more because it's weird
    h_rect.center += np.array([np.cos(hand.rotation), 
                              -np.sin(hand.rotation)]) * -10

    display.blit(ua_image, ua_rect)
    display.blit(fa_image, fa_rect)
    display.blit(h_image, h_rect)

    # rotate arm lines
    line_ua = pygame.transform.rotozoom(line_upperarm, 
                                        np.degrees(upperarm.rotation), 1)
    line_fa = pygame.transform.rotozoom(line_forearm, 
                                        np.degrees(forearm.rotation), 1)
    line_h = pygame.transform.rotozoom(line_hand, 
                                        np.degrees(hand.rotation), 1)
    # translate arm lines
    lua_rect = line_ua.get_rect()
    transform(lua_rect, joints[0], upperarm)
    lua_rect.center += np.array([-lua_rect.width / 2.0, -lua_rect.height / 2.0])

    lfa_rect = line_fa.get_rect()
    transform(lfa_rect, joints[1], forearm)
    lfa_rect.center += np.array([-lfa_rect.width / 2.0, -lfa_rect.height / 2.0])

    lh_rect = line_h.get_rect()
    transform(lh_rect, joints[2], hand)
    lh_rect.center += np.array([-lh_rect.width / 2.0, -lh_rect.height / 2.0])

    display.blit(line_ua, lua_rect)
    display.blit(line_fa, lfa_rect)
    display.blit(line_h, lh_rect)

    # draw circles at joints for pretty
    pygame.draw.circle(display, black, joints[0], 30)
    pygame.draw.circle(display, arm_color, joints[0], 12)
    pygame.draw.circle(display, black, joints[1], 20)
    pygame.draw.circle(display, arm_color, joints[1], 7)
    pygame.draw.circle(display, black, joints[2], 15)
    pygame.draw.circle(display, arm_color, joints[2], 5)

    # check for quit
    for event in pygame.event.get():
        if event.type == pygame.locals.QUIT:
            pygame.quit()
            sys.exit()

    pygame.display.update()
    fpsClock.tick(30)

And the resulting arm visualization now looks like this!
arm3wlines

From here you can then blit an image onto the background, and the little zoom-in I have in the top left is just another surface; it’s all pretty straightforward. And that’s pretty much everything! Hopefully this helps someone trying to get some fancier PyGame things going, and if you have any means of generating fancier PyGame animations yourself please drop any tips below in the comments!

Again, the code from this post is all up on my GitHub.


Inverse kinematics of 3-link arm with constrained minimization in Python

Inverse kinematics is a common topic in robotics control; one that most anyone working with a robotic arm needs to address at some point. The use case I’ll be looking at here is the situation where we want to specify an (x,y) coordinate for the end-effector, and solve for the appropriate joint angle configuration that best achieves this. It’s surprisingly straightforward to handle with the right approach, so it’s a nice little exercise in Python optimization (and we’ll even work in some graphical display!) And per usual, the code is all up on my github.

Most of the time the servos that anyone is working with are position controlled, meaning that you provide them with a set of desired angles for them to move to, rather than force controlled, where you provide them with a torque signal that directly moves them. In this case, inverse kinematics is all you need. You simply work out the set of joint angles you want the servos to move to and send those out directly. In biological systems this clearly isn’t the end of the work, because even if you know where you want to go you still have to provide a force (muscle activation) signal that will move your system there. But! as mentioned in the motor babbling paper review post I wrote a while back, having the target specified in the most basic space (i.e. joint angles instead of (x,y) coordinates for a servo based arm, or muscle lengths instead of joint angles for a muscle based arm) can make calculating, or learning, the appropriate force signal much easier; so something that can do the inverse kinematics calculation is worth having.

Inverse kinematics

Now then, how do we go about finding the joint angle configuration that best achieves a desired (x,y) coordinate? Constrained minimization! Our constraint is that the tip of the arm must be at the specified (x,y) position, and we need to specify some cost function for the system to minimize to choose among all the possible configurations that accomplish this. The cost function that we’re going to use here is a very intuitive one, that I first encountered in the paper ‘Learning operational space control’ by Jan Peters. What we’re going to do is specify a default, or resting, joint state configuration, and minimize our distance from that. Which makes a lot of sense.

I’m going to post the code for calculating the inverse kinematics of a 3-link arm, and then we’ll work through it.

import math
import numpy as np
import scipy.optimize  

class Arm3Link:
    
    def __init__(self, q=None, q0=None, L=None):
        """Set up the basic parameters of the arm.
        All lists are in order [shoulder, elbow, wrist].
        
        :param list q: the initial joint angles of the arm
        :param list q0: the default (resting state) joint configuration
        :param list L: the arm segment lengths
        """
        # initial joint angles
        if q is None: q = [.3, .3, 0]
        self.q = q
        # some default arm positions
        if q0 is None: q0 = np.array([math.pi/4, math.pi/4, math.pi/4]) 
        self.q0 = q0
        # arm segment lengths
        if L is None: L = np.array([1, 1, 1]) 
        self.L = L
        
        self.max_angles = [math.pi, math.pi, math.pi/4]
        self.min_angles = [0, 0, -math.pi/4]

    def get_xy(self, q=None):
        if q is None: q = self.q

        x = self.L[0]*np.cos(q[0]) + \
            self.L[1]*np.cos(q[0]+q[1]) + \
            self.L[2]*np.cos(np.sum(q))

        y = self.L[0]*np.sin(q[0]) + \
            self.L[1]*np.sin(q[0]+q[1]) + \
            self.L[2]*np.sin(np.sum(q))

        return [x, y]

    def inv_kin(self, xy):

        def distance_to_default(q, *args): 
            # weights found with trial and error, get some wrist bend, but not much
            weight = [1, 1, 1.3] 
            return np.sqrt(np.sum([(qi - q0i)**2 * wi
                for qi,q0i,wi in zip(q, self.q0, weight)]))

        def x_constraint(q, xy):
            x = ( self.L[0]*np.cos(q[0]) + self.L[1]*np.cos(q[0]+q[1]) + 
                self.L[2]*np.cos(np.sum(q)) ) - xy[0]
            return x

        def y_constraint(q, xy): 
            y = ( self.L[0]*np.sin(q[0]) + self.L[1]*np.sin(q[0]+q[1]) + 
                self.L[2]*np.sin(np.sum(q)) ) - xy[1]
            return y
        
        return scipy.optimize.fmin_slsqp( func=distance_to_default, 
            x0=self.q, eqcons=[x_constraint, y_constraint], 
            args=[xy], iprint=0) # iprint=0 suppresses output


I’ve taken out most of the comments for compactness, but there are plenty in the code on github, don’t you worry. Alright, let’s take a look!

First, we go through and set up the parameters of our arm. If nothing was passed in during construction, then we make an arm with initial shoulder and elbow joint angles of .3 and an initial wrist angle of 0, a default (resting) configuration with all joint angles at \frac{\pi}{4}, arm segment lengths all equal to 1, and the minimum and maximum joint angles shown.

Next, there’s the get_xy function, which just uses some simple trig to calculate the current (x,y) position of the end-effector. This is going to be a handy thing to have when we want to see how well the joint configurations we calculate achieve our desired hand position.

And then after this, we have the actual inverse kinematics function. inv_kin takes in a desired (x,y) position, and returns to us a list of the joint angles that will best achieve this. We define three functions inside inv_kin that set up our constrained optimization problem: distance_to_default, which is the function we are minimizing, and x_constraint and y_constraint, which are the constraints that must be met for a solution to be valid. These are all pretty straightforward functions: distance_to_default calculates the (weighted) Euclidean distance in joint space to the resting state, and the constraint functions calculate the difference between the actual and desired end-effector positions for a given set of joint angles.

There is one point of interest in the distance_to_default method, and that is the use of the weights therein. What is the point of applying a gain to these values, you ask? Why, simply that it lets us weight the relative importance of each joint staying close to its default configuration! If you think about moving your arm on a horizontal plane, the wrist actually bends very little. To mimic this we can set the weight applied to the wrist's distance from its resting state higher than those of the other joints, so that keeping the wrist near its resting state becomes a higher priority. I suggest playing around with the weights once the graphical display (below) is built up; it's fun to see the effects different weightings have on the joint configurations.
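
To see the effect numerically, here's a minimal standalone sketch of the same constrained problem (same segment lengths and resting posture as above; the target and the wrist weight of 10 are made-up values for illustration), solved twice with different wrist weights:

import numpy as np
import scipy.optimize

L = [1, 1, 1]                      # segment lengths, as above
q0 = [np.pi/4, np.pi/4, np.pi/4]   # resting configuration, as above
target = [1.0, 1.5]                # an arbitrary reachable hand position

def solve(weight):
    # minimize the weighted distance from rest, subject to reaching the target
    def cost(q):
        return np.sqrt(np.sum([(qi - q0i)**2 * wi
                               for qi, q0i, wi in zip(q, q0, weight)]))
    def x_con(q):
        return (L[0]*np.cos(q[0]) + L[1]*np.cos(q[0]+q[1]) +
                L[2]*np.cos(np.sum(q))) - target[0]
    def y_con(q):
        return (L[0]*np.sin(q[0]) + L[1]*np.sin(q[0]+q[1]) +
                L[2]*np.sin(np.sum(q))) - target[1]
    return scipy.optimize.fmin_slsqp(func=cost, x0=q0,
                                     eqcons=[x_con, y_con], iprint=0)

print('wrist angle with weight 1.3:', solve([1, 1, 1.3])[2])
print('wrist angle with weight 10: ', solve([1, 1, 10])[2])

With the heavier weight the optimizer should keep the wrist closer to its resting angle of \frac{\pi}{4}, at the cost of moving the shoulder and elbow a little more.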

With these three functions defined, we can now call up the scipy.optimize.fmin_slsqp function, which uses sequential least squares programming (SLSQP) to arrive at a solution.

Really, that’s all there is to it!
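
For a quick spot check (this assumes the class above is saved as Arm.py, the same layout the visualization code further down expects):

from Arm import Arm3Link

arm = Arm3Link()
target = [.5, .5]
q = arm.inv_kin(xy=target)              # joint angles that best reach the target
print('joint angles:  ', q)
print('hand position: ', arm.get_xy(q)) # should be very close to [.5, .5]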

To test this now, we have the following method:

def test():
    ############Test it!##################

    arm = Arm3Link()

    # set of desired (x,y) hand positions
    x = np.arange(-.75, .75, .05)
    y = np.arange(0, .75, .05)

    # threshold for printing out information, to find trouble spots
    thresh = .025

    count = 0
    total_error = 0
    # test it across the range of specified x and y values
    for xi in range(len(x)):
        for yi in range(len(y)):
            # test the inv_kin function on a range of different targets
            xy = [x[xi], y[yi]]
            # run the inv_kin function, get the optimal joint angles
            q = arm.inv_kin(xy=xy)
            # find the (x,y) position of the hand given these angles
            actual_xy = arm.get_xy(q)
            # calculate the root squared error
            error = np.sqrt((np.array(xy) - np.array(actual_xy))**2)
            # total the error 
            total_error += error
            
            # if the error was high, print out more information
            if np.sum(error) > thresh:
                print('-------------------------')
                print('Initial joint angles', arm.q)
                print('Final joint angles: ', q)
                print('Desired hand position: ', xy)
                print('Actual hand position: ', actual_xy)
                print('Error: ', error)
                print('-------------------------')
            
            count += 1

    print('\n---------Results---------')
    print('Total number of trials: ', count)
    print('Total error: ', total_error)
    print('-------------------------')

This simply works through a set of target (x,y) points and calculates the total error across all of them. If any particular point gives an error above the threshold level, the information for that point is printed out so we can take a closer look if so desired. Fortunately, the error threshold isn't even approached in this test, and for output we get:

---------Results---------
Total number of trials:  450
Total error:  [  3.33831421e-05   2.89667496e-05]
-------------------------

Not bad!

Visualization

So that's all great and dandy, but it's always nice to be able to really visualize these things, so I wrote another method that uses Pyglet to do just that. Pyglet is a really easy-to-use graphics library that I tried out for this, and I was very happy with it. I ended up writing a method that pops up a window with a 3-link arm drawn in it; the arm uses the inverse kinematics function of the Arm3Link class written above to calculate the joint angles needed to follow the mouse. Once the joint angles are calculated, their (x,y) locations are also calculated, and the arm is drawn. Pleasingly, inv_kin is fast enough to work in real time, so it's a kind of fun toy example. Again, I'll show the code and then we'll work through it below.

import numpy as np
import pyglet

import Arm

def plot(): 
    """A function for plotting an arm, and having it calculate the 
    inverse kinematics such that given the mouse (x, y) position it 
    finds the appropriate joint angles to reach that point."""

    # create an instance of the arm
    arm = Arm.Arm3Link(L = np.array([300,200,100]))

    # make our window for drawin'
    window = pyglet.window.Window()

    label = pyglet.text.Label('Mouse (x,y)', font_name='Times New Roman', 
        font_size=36, x=window.width//2, y=window.height//2,
        anchor_x='center', anchor_y='center')

    def get_joint_positions():
        """This method finds the (x,y) coordinates of each joint"""

        x = np.array([ 0, 
            arm.L[0]*np.cos(arm.q[0]),
            arm.L[0]*np.cos(arm.q[0]) + arm.L[1]*np.cos(arm.q[0]+arm.q[1]),
            arm.L[0]*np.cos(arm.q[0]) + arm.L[1]*np.cos(arm.q[0]+arm.q[1]) + 
                arm.L[2]*np.cos(np.sum(arm.q)) ]) + window.width/2

        y = np.array([ 0, 
            arm.L[0]*np.sin(arm.q[0]),
            arm.L[0]*np.sin(arm.q[0]) + arm.L[1]*np.sin(arm.q[0]+arm.q[1]),
            arm.L[0]*np.sin(arm.q[0]) + arm.L[1]*np.sin(arm.q[0]+arm.q[1]) + 
                arm.L[2]*np.sin(np.sum(arm.q)) ])

        return np.array([x, y]).astype('int')
    
    window.jps = get_joint_positions()

    @window.event
    def on_draw():
        window.clear()
        label.draw()
        for i in range(3): 
            pyglet.graphics.draw(2, pyglet.gl.GL_LINES, ('v2i', 
                (window.jps[0][i], window.jps[1][i], 
                 window.jps[0][i+1], window.jps[1][i+1])))

    @window.event
    def on_mouse_motion(x, y, dx, dy):
        # call the inverse kinematics function of the arm
        # to find the joint angles optimal for pointing at 
        # this position of the mouse 
        label.text = '(x,y) = (%.3f, %.3f)'%(x,y)
        arm.q = arm.inv_kin([x - window.width/2, y]) # get new arm angles
        window.jps = get_joint_positions() # get new joint (x,y) positions

    pyglet.app.run()

plot()

There are really only a few things that we’re doing in this method: We create an arm with somewhat normal segment lengths and a window for plotting; we have a function for calculating the (x,y) coordinates of each of the joints for plotting; we have a function that tracks the position of the mouse and updates our arm joint angles by calling the arm.inv_kin function we wrote above; and lastly we plot it! Easy. The functions aren’t quite in that order, and that’s because I chose alphabetical ordering over conceptual. Here’s a picture!

I think the only kind of weird thing going on here is that the variables I use in both this method and in the overridden window methods are defined as belonging to window. Aside from that, it’s straightforward trig to calculate the (x,y) coordinates of each joint, and there’s also a label that displays the mouse (x,y) position for the hell of it. Honestly, the label was in a Pyglet tutorial example and I just left it in.

It’s not going to win any awards for graphics by any means, but it’s simple and it works! And, assuming you have Pyglet installed (which is very easy to do), you should straight up be able to run it and be amazed. Or a little impressed. Or not, whatever. But in any case now you have your very own inverse kinematics example on a 3-link arm! Moving to any other number of joints works exactly the same way: you just have to add the corresponding number of default joint angles and segment lengths, and everything should work out as is (see the sketch below for what the forward kinematics look like in the general case).
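
As a rough sketch of what that generalization looks like (this function isn't part of the Arm3Link class above, it's just for illustration), the forward kinematics for an arm with any number of links can be written with a cumulative sum over the joint angles:

import numpy as np

def get_xy_nlink(q, L):
    """(x,y) position of the end-effector for an n-link planar arm,
    given joint angles q and segment lengths L."""
    angles = np.cumsum(q)            # absolute angle of each segment
    x = np.sum(L * np.cos(angles))
    y = np.sum(L * np.sin(angles))
    return [x, y]

# with three unit-length links this matches get_xy from the Arm3Link class
print(get_xy_nlink(q=[.3, .3, 0], L=np.array([1., 1., 1.])))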

From here, the plan is to look at bringing in an actual force driven arm model to the mix, and then building up a system that can track the mouse, but with a control signal that specifies forces to apply to the arm, rather than positions. Which I think will be pretty neat. And in case you missed it at the top, here’s a link to all the code on my github.


Nengo model – Low pass derivative filter

To just get the code you can copy / paste from below or get the code from my github: low_pass_derivative_filter.py.

In the course of building models in Nengo, I recently needed a neural implementation of a low pass derivative filter. I scripted up a sub-network in Nengo (www.nengo.ca) that does this, and until we get the model database / repository up and running for Nengo scripts I'll keep working through building these things here, because it goes over some basic methods that can be useful when you're building up your own models.

Basically there are three parts to this model: Derivative calculation, absolute value calculation, and an inhibitory projection with a threshold activation that projects to the output population. Here’s a picture:
Here’s the idea: The population input projects both directly to the output population and to a population that calculates the derivative of the sum across dimensions of the input signal. The derivative population passes on the derivative of the input signal to an absolute value calculating population, which passes the absolute value of the derivative on to a population that isn’t activated for values under a threshold level. This threshold population then projects very strong inhibition to the output population, so that when the absolute value of the derivative is above the threshold level, no output is projected, and otherwise the system just relays the input signal straight through. Here’s the code:

def make_dlowpass(name, neurons, dimensions, radius=10, tau_inhib=0.005, inhib_scale=10):

    dlowpass = nef.Network(name)

    dlowpass.make('input', neurons=1, dimensions=dimensions, mode='direct') # create input relay
    output = dlowpass.make('output', neurons=dimensions*neurons, dimensions=dimensions) # create output relay
    
    # now we track the derivative of sum, and only let output relay the input
    # if the derivative is below a given threshold
    dlowpass.make('derivative', neurons=radius*neurons, dimensions=2, radius=radius) # create population to calculate the derivative
    dlowpass.connect('derivative', 'derivative', index_pre=0, index_post=1, pstc=0.1) # set up recurrent connection
    dlowpass.add(make_abs_val(name='abs_val', neurons=neurons, dimensions=1, intercept=(.2,1))) # create a subnetwork to calculate the absolute value  

    # connect it up!
    dlowpass.connect('input', 'output') # set up communication channel
    dlowpass.connect('input', 'derivative', index_post=0)
    
    def sub(x):
        return [x[0] - x[1]]
    dlowpass.connect('derivative', 'abs_val.input', func=sub)
        
    # set up inhibitory matrix
    inhib_matrix = [[-inhib_scale]] * neurons * dimensions
    output.addTermination('inhibition', inhib_matrix, tau_inhib, False)
    dlowpass.connect('abs_val.output', output.getTermination('inhibition'))

    return dlowpass.network

First off, this code is taking advantage of the absolute value function that was written a couple blog posts ago. You can either go check out that post or I’ll also have that function included in the code at the end for completeness. Aside from that, there are a lot of things going on that you won’t come across if you’re just coding up simple examples, so let’s look at them.

At the top, we're assigning our output population a handle, which I try to avoid in general for neatness, since most of the time you can reference a population simply by its name, 'output'. The reason I assign it a handle here is that we're going to be calling on some Java API features that (to my knowledge) aren't handled in the Python API yet, and although we could call up the output node as a Java object with dlowpass.get('output') using only its assigned name, it will just be cleaner in the end to have a handle for it. We'll come back to this.
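
In other words, without the handle we could pull the same node up by name right before setting up the termination, something like the following sketch (it's just less tidy to repeat the lookup):

output = dlowpass.get('output') # fetch the ensemble as a Java object by name
output.addTermination('inhibition', inhib_matrix, tau_inhib, False)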

The next interesting thing that happens is that when we're creating our derivative population, we set the number of neurons to radius*neurons (10*neurons by default) and radius=10. This is because in the derivative population we're representing the sum of all of the input dimensions. How are all the input dimensions being summed up in derivative, you ask? Just below, on line 17. When we connect up the input relay to derivative we set index_post=0, which means that all of the input dimensions project to the same dimension of the derivative population. The default weight for this connection is 1, so dimension 0 of derivative ends up representing the sum over all of the input dimensions. Super.

But why do we set radius=10? This is because the radius parameter specifies the range of values represented by this population. The default is (-1,1), but when we specify radius, the new range of represented values becomes (-radius, radius). We’re making a bit of an assumption that this value won’t go outside of the range (-10,10) here, but that should be OK for most of the situations we’re going to come across. In the specific model I’m using this for it’s definitely the case, so that’s why I’ve set the default value to 10. And because we don’t want the accuracy in representation to decrease, we also scale up the number of neurons in this population by radius*neurons.

And there’s still more happening in this derivative population! On line 11 I specify a recurrent connection that projects into a second dimension represented in derivative. So now what’s going to happen is that the sum of the input signals is projected into the first dimension of derivative, and through a recurrent connection the value of the sum of the input signals from time t-pstc will be represented in the second dimension of the derivative population.

To calculate the derivative then, it’s a simple matter of subtracting the previous signal from the current signal, which is what happens in the function sub that I define on line 19. To implement this function, when connecting up derivative to abs_val, just set the parameter func=sub. Easy.

Now, when the abs_val population is made, we set intercept=(.2,1). This is the same trick we used in the previous absolute value function model, but it's acting as a threshold here. Basically, this population won't respond if the value being projected into it is between (-.2, .2).

So, up to this point, what we have is a summation of the input dimensions, the derivative of that sum being calculated and passed to an absolute value function, and a population that only responds if the absolute value of the derivative is greater than .2.
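
Just to make that signal flow concrete before we get to the inhibition step, here's a minimal non-neural sketch of the same logic in plain numpy (a discrete-time approximation for illustration only, not the Nengo implementation; dt is made up, while pstc and the .2 threshold match the values above):

import numpy as np

def dlowpass_reference(signal, dt=.001, pstc=.1, thresh=.2):
    # gate the input to zero whenever the absolute value of the
    # (low pass filtered) derivative of its sum goes above thresh
    delayed = 0.0                                 # stands in for the recurrent 2nd dimension
    output = np.zeros_like(signal)
    for t in range(signal.shape[0]):
        total = np.sum(signal[t])                 # sum across input dimensions
        derivative = total - delayed              # current value minus delayed value
        delayed += dt / pstc * (total - delayed)  # first order low pass (the pstc delay)
        if abs(derivative) <= thresh:             # below threshold: relay the input
            output[t] = signal[t]
    return output

# a step input: the output is suppressed around the step, then passes through again
sig = np.vstack([np.zeros((500, 3)), np.ones((500, 3))])
print(dlowpass_reference(sig)[[499, 501, 900]])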

The last part is hooking up this abs_val population to the output relay, to suppress output whenever it’s activated (i.e. when the derivative is greater than .2). This is where we need to pull into the Java API, and this is why we specified a handle for our output population. In lines 25-26, what’s going on is that instead of using the NEF neural compiler functionality to set up our connection weights to compute some function, we’re specifying them ourselves. And we’re specifying them to prevent the neurons in output from firing. Now, the activation of the neurons in abs_val reduces the voltage values being sent into the output population, inhibiting their activity.
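
Just for a sense of what those lines build, here's the inhibitory weight matrix on its own, with small made-up values for neurons and dimensions:

inhib_scale = 10
neurons, dimensions = 3, 2
inhib_matrix = [[-inhib_scale]] * neurons * dimensions
print(inhib_matrix) # [[-10], [-10], [-10], [-10], [-10], [-10]]

There's one strongly negative weight per neuron in the output population, and they're all driven by the (1-dimensional) abs_val.output signal, so whenever abs_val is active every output neuron gets shut down.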

And that’s it! Here’s the complete code to run an example of this network (which can also be found on my github low_pass_derivative_filter.py):

import nef

# constants / parameter setup etc
N = 50 # number of neurons
D = 3 # number of dimensions

def make_abs_val(name, neurons, dimensions, intercept=[0]):
    def mult_neg_one(x):
        return x[0] * -1 

    abs_val = nef.Network(name)

    abs_val.make('input', neurons=1, dimensions=dimensions, mode='direct') # create input relay
    abs_val.make('output', neurons=1, dimensions=dimensions, mode='direct') # create output relay
    
    for d in range(dimensions): # create a positive and negative population for each dimension in the input signal
        abs_val.make('abs_pos%d'%d, neurons=neurons, dimensions=1, encoders=[[1]], intercept=intercept)
        abs_val.make('abs_neg%d'%d, neurons=neurons, dimensions=1, encoders=[[-1]], intercept=intercept)

        abs_val.connect('input', 'abs_pos%d'%d, index_pre=d)
        abs_val.connect('input', 'abs_neg%d'%d, index_pre=d)
    
        abs_val.connect('abs_pos%d'%d, 'output', index_post=d)
        abs_val.connect('abs_neg%d'%d, 'output', index_post=d, func=mult_neg_one)

    return abs_val.network

def make_dlowpass(name, neurons, dimensions, radius=10, tau_inhib=0.005, inhib_scale=10):

    dlowpass = nef.Network(name)

    dlowpass.make('input', neurons=1, dimensions=dimensions, mode='direct') # create input relay
    output = dlowpass.make('output', neurons=dimensions*neurons, dimensions=dimensions) # create output relay
    
    # now we track the derivative of sum, and only let output relay the input
    # if the derivative is below a given threshold
    dlowpass.make('derivative', neurons=radius*neurons, dimensions=2, radius=radius) # create population to calculate the derivative
    dlowpass.connect('derivative', 'derivative', index_pre=0, index_post=1, pstc=0.1) # set up recurrent connection
    dlowpass.add(make_abs_val(name='abs_val', neurons=neurons, dimensions=1, intercept=(.2,1))) # create a subnetwork to calculate the absolute value  

    # connect it up!
    dlowpass.connect('input', 'output') # set up communication channel
    dlowpass.connect('input', 'derivative', index_post=0)
    
    def sub(x):
        return [x[0] - x[1]]
    dlowpass.connect('derivative', 'abs_val.input', func=sub)
        
    # set up inhibitory matrix
    inhib_matrix = [[-inhib_scale]] * neurons * dimensions
    output.addTermination('inhibition', inhib_matrix, tau_inhib, False)
    dlowpass.connect('abs_val.output', output.getTermination('inhibition'))

    return dlowpass.network

# Create network
net = nef.Network('net')

# Create / add low pass derivative filter
net.add(make_dlowpass(name='dlowpass', neurons=N, dimensions=D))

# Make function input
net.make_input('input_function', values=[0]*D)

# Connect up function input to filter
net.connect('input_function', 'dlowpass.input')

# Add it all to Nengo
net.add_to_nengo()

And here's a picture of it running. What you can see is that any time the input changes quickly, the output drops to zero, but when the input is holding constant or changing slowly the output is allowed to pass through. Great! Just what we wanted.


Using the same random number generator in C++ and Python

Anyone who has converted code from one language to another where a random number generator is involved knows the pain of rigorously checking that both versions of the code do the exact same thing. In the past I've written the output of one language's random number generator (RNG) out to file and then read it in from the other, but it's still a pain in the ass: there's some overhead involved and you have to change everything back and forth whenever you need to do debugging / comparison in the future, etc etc. After some searching, it doesn't seem that there are any RNGs common to both languages, and the closest thing I found was a suggestion to write the same algorithm out in each language and use that.

Well. I happen to know how to use Cython now, and rather than rewrite the same code in C++ and Python, I thought it was a perfect opportunity to put this knowledge to use. There's a C++ class called SimpleRNG (http://www.codeproject.com/Articles/25172/Simple-Random-Number-Generation) with a whole slew of basic random number generation methods, so the idea was just to take that and throw some Cython at it to get access to the same RNG in both C++ and Python. It turned out to be an almost trivial exercise, but with very useful results!

Since we already have the SimpleRNG.h and SimpleRNG.cpp code all written, taken from link above, we can start by making a .pyx file that will 1) provide a Cython handle to the C++ code, and 2) define a Python class that can call these functions. Remembering not to name the .pyx file the same thing as the .cpp files, for fear of the code generated by Cython overwriting my .cpp files, I added a “py” prefix. The first part of the .pyx file is just redefining all the functionality we want from the header file:
pySimpleRNG.pyx

cdef extern from "SimpleRNG.h":
    cdef cppclass SimpleRNG:
        SimpleRNG()

        # Seed the random number generator 
        void SetState(unsigned int u, unsigned int v)

        # A uniform random sample from the open interval (0, 1) 
        double GetUniform()

        # A uniform random sample from the set of unsigned integers 
        unsigned int GetUint()

        # Normal (Gaussian) random sample 
        double GetNormal(double mean, double standardDeviation)

        # Exponential random sample 
        double GetExponential(double mean)

        # Gamma random sample
        double GetGamma(double shape, double scale)

        # Chi-square sample
        double GetChiSquare(double degreesOfFreedom)

        # Inverse-gamma sample
        double GetInverseGamma(double shape, double scale)

        # Weibull sample
        double GetWeibull(double shape, double scale)

        # Cauchy sample
        double GetCauchy(double median, double scale)

        # Student-t sample
        double GetStudentT(double degreesOfFreedom)

        # The Laplace distribution is also known as the double exponential distribution.
        double GetLaplace(double mean, double scale)

        # Log-normal sample
        double GetLogNormal(double mu, double sigma)

        # Beta sample
        double GetBeta(double a, double b)

        # Poisson sample
        int GetPoisson(double lam)

Look at all those functions! I left out two functions from the redefinition, namely double GetUniform(unsigned int& u, unsigned int& v) and unsigned int GetUint(unsigned int& u, unsigned int& v), for the simple reason that I don’t want to deal with reference operators in Cython, and I don’t have any need for the functionality provided with the overloaded GetUniform() and GetUint() functions.

Alright, the first part is done. Second part! Straightforward again, define a Python class, create a pointer to cppclass we just defined, and then make a bunch of handle functions to call them up. That looks like:
pySimpleRNG.pyx

cdef class pySimpleRNG:
    cdef SimpleRNG* thisptr # hold a C++ instance
    def __cinit__(self):
        self.thisptr = new SimpleRNG()
    def __dealloc__(self):
        del self.thisptr
    
    # Seed the random number generator 
    def SetState(self, unsigned int u, unsigned int v):
        self.thisptr.SetState(u, v)

    # A uniform random sample from the open interval (0, 1) 
    def GetUniform(self): 
        return self.thisptr.GetUniform()

    # A uniform random sample from the set of unsigned integers 
    def GetUint(self):
        return self.thisptr.GetUint()

    # Normal (Gaussian) random sample 
    def GetNormal(self, double mean=0, double std_dev=1):
        return self.thisptr.GetNormal(mean, std_dev)

    # Exponential random sample 
    def GetExponential(self, double mean):
        return self.thisptr.GetExponential(mean)

    # Gamma random sample
    def GetGamma(self, double shape, double scale):
        return self.thisptr.GetGamma(shape, scale)

    # Chi-square sample
    def GetChiSquare(self, double degreesOfFreedom):
        return self.thisptr.GetChiSquare(degreesOfFreedom)

    # Inverse-gamma sample
    def GetInverseGamma(self, double shape, double scale):
        return self.thisptr.GetInverseGamma(shape, scale)

    # Weibull sample
    def GetWeibull(self, double shape, double scale):
        return self.thisptr.GetWeibull(shape, scale)

    # Cauchy sample
    def GetCauchy(self, double median, double scale):
        return self.thisptr.GetCauchy(median, scale)

    # Student-t sample
    def GetStudentT(self, double degreesOfFreedom):
        return self.thisptr.GetStudentT(degreesOfFreedom)

    # The Laplace distribution is also known as the double exponential distribution.
    def GetLaplace(self, double mean, double scale):
        return self.thisptr.GetLaplace(mean, scale)

    # Log-normal sample
    def GetLogNormal(self, double mu, double sigma):
        return self.thisptr.GetLogNormal(mu, sigma)

    # Beta sample
    def GetBeta(self, double a, double b):
        return self.thisptr.GetBeta(a, b)

Again, very simple. The only thing I’ve added in this code is default values for the GetNormal() method, specifying a mean of 0 and standard deviation of 1, since I’ll be using it a lot and it’s nice to have default values.

Now the only thing left is the setup file:
setup.py

from distutils.core import setup
from distutils.extension import Extension
from Cython.Distutils import build_ext

setup(
  name = 'SimpleRNG',
  ext_modules=[ 
    Extension("SimpleRNG", 
              sources=["pySimpleRNG.pyx", "SimpleRNG.cpp"], # Note, you can link against a c++ library instead of including the source
              language="c++"),
    ],
  cmdclass = {'build_ext': build_ext},

)

And now calling it from IPython

run setup.py build_ext -i
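
Or, equivalently, from a regular shell (the --inplace flag just drops the compiled module into the current directory so it can be imported right away):

python setup.py build_ext --inplace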

And pleasantly, now, you can call the SetState(int, int) function and generate the same set of random numbers in both C++ and Python, which is absolutely wonderful for comparison / debugging. It's been super useful for me, and I hope someone else also finds it helpful!

All the code for this post can be found at my github repository: pySimpleRNG. If you're simply looking for what you need to use SimpleRNG from Python, then all you need is the compiled SimpleRNG module that build_ext produces (the SimpleRNG.so file, or .pyd on Windows)! Drop it into your Python project folder and import away.

Update: It was pointed out that having a Python script example would be useful, which is very true! Here’s a quick script using this shared library.

from SimpleRNG import pySimpleRNG

a = pySimpleRNG()
a.SetState(1,1) # set the seed
a.GetUniform() # remember this number
a.GetUniform()
a.SetState(1,1) # reset seed 
a.GetUniform() # returns the same number as above