@Deathmonster
Created September 8, 2014 20:43
Simple RNN Example with Scan / For Loop Comparison
import theano
import theano.tensor as T
import numpy
import numpy.random as rng
import math
class LSTMLayer:
    def __init__(self, inputSize, controllerSize, memorySize, outputSize, initialScale, useReluReadGate = True):
        readGateSize = memorySize
        readDeltaSize = memorySize
        writeGateSize = memorySize
        keepGateSize = memorySize
        scale = initialScale
        #It is possible for the model to immediately enter a local minimum at the start of training in which the write gate or keep gate are closed for too
        #many memory modules, which prevents the model from learning long-range dependencies.
        keepGateInitialBias = 5.0
        writeGateInitialBias = 2.0
        #All rectified linear unit hidden layers have this initial scale
        reluScale = 0.01
        self.useReluReadGate = useReluReadGate
        readGateScale = initialScale
        self.W_controller = theano.shared(numpy.asarray(scale * rng.normal(size = (inputSize + 1 * controllerSize, controllerSize)), dtype = theano.config.floatX), name = "Controller weights")
        self.W_readgate = theano.shared(numpy.asarray(readGateScale * rng.normal(size = (readGateSize, inputSize + 1 * controllerSize)), dtype = theano.config.floatX), name = "read gate weights")
        self.W_readdelta = theano.shared(numpy.asarray(scale * rng.normal(size = (readDeltaSize, inputSize + 1 * controllerSize)), dtype = theano.config.floatX), name = "readdelta weights")
        #The write and keep gates are fed [controller, input] in getOutputs, so their fan-in is inputSize + controllerSize.
        self.W_writegate = theano.shared(numpy.asarray(scale * rng.normal(size = (writeGateSize, inputSize + 1 * controllerSize)), dtype = theano.config.floatX), name = "writegate weights")
        self.W_keepgate = theano.shared(numpy.asarray(scale * rng.normal(size = (keepGateSize, inputSize + 1 * controllerSize)), dtype = theano.config.floatX), name = "keepgate weights")
        self.W_output = theano.shared(numpy.asarray(reluScale * rng.normal(size = (outputSize, inputSize + memorySize + controllerSize)), dtype = theano.config.floatX), name = "output weights")
        self.b_controller = theano.shared(numpy.asarray(numpy.zeros(shape = controllerSize), dtype = theano.config.floatX), name = "controller bias")
        self.b_readgate = theano.shared(numpy.asarray(numpy.zeros(shape = readGateSize), dtype = theano.config.floatX), name = "readgate bias")
        self.b_readdelta = theano.shared(numpy.asarray(numpy.zeros(shape = readDeltaSize), dtype = theano.config.floatX), name = "readdelta bias")
        self.b_writegate = theano.shared(numpy.asarray(writeGateInitialBias + numpy.zeros(shape = writeGateSize), dtype = theano.config.floatX), name = "writegate bias")
        self.b_keepgate = theano.shared(numpy.asarray(numpy.zeros(shape = keepGateSize) + keepGateInitialBias, dtype = theano.config.floatX), name = "keepgate bias")
        self.b_output = theano.shared(numpy.asarray(numpy.zeros(shape = outputSize), dtype = theano.config.floatX), name = "output bias")
        self.params = [self.W_controller, self.W_readgate, self.W_readdelta, self.W_writegate, self.W_keepgate, self.W_output, self.b_controller, self.b_readgate, self.b_readdelta, self.b_writegate, self.b_keepgate, self.b_output]
    def getOutputs(self, previousController, previousMemory, input_layer):
        #Support both single examples (vectors) and minibatches (matrices).
        if previousController.ndim == 1:
            axisConcat = 0
        else:
            axisConcat = 1
        controller = T.tanh(T.dot(T.concatenate([previousController, 1.0 * input_layer], axis = axisConcat), self.W_controller) + self.b_controller)
        #The read gate is always a sigmoid here; useReluReadGate is stored but not applied.
        readgate = T.nnet.sigmoid(T.dot(T.concatenate([controller, input_layer], axis = axisConcat), self.W_readgate.T) + self.b_readgate)
        readdelta = T.tanh(T.dot(T.concatenate([controller, input_layer], axis = axisConcat), self.W_readdelta.T) + self.b_readdelta)
        keepgate = T.nnet.sigmoid(T.dot(T.concatenate([controller, input_layer], axis = axisConcat), self.W_keepgate.T) + self.b_keepgate)
        memory_intermediate = previousMemory * keepgate + readgate * readdelta
        writegate = T.nnet.sigmoid(T.dot(T.concatenate([controller, input_layer], axis = axisConcat), self.W_writegate.T) + self.b_writegate)
        memory = memory_intermediate
        output = writegate * T.maximum(0.0, T.dot(T.concatenate([controller, memory, input_layer], axis = axisConcat), self.W_output.T) + self.b_output)
        #The gate activations are returned as well so a scan driver can thread them through as extra recurrent states.
        return controller, memory_intermediate, output, controller, readgate, readdelta, keepgate, memory_intermediate, writegate, memory
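For reference, here is a minimal single-step usage sketch of the layer above, assuming the class is saved as LSTMLayer.py (the benchmark script below imports it under that name). The sizes, variable names, and initial scale here are illustrative placeholders, not part of the original gist.

import theano
import theano.tensor as T
import numpy
import LSTMLayer

#Hypothetical sizes, chosen only for illustration.
layer = LSTMLayer.LSTMLayer(inputSize = 8, controllerSize = 8, memorySize = 8, outputSize = 8, initialScale = 0.1)
x = T.vector("x")
prev_controller = T.vector("prev_controller")
prev_memory = T.vector("prev_memory")
#getOutputs returns ten tensors; the first three are the new controller state, the new memory, and the gated output.
controller, memory, output = layer.getOutputs(prev_controller, prev_memory, x)[:3]
step = theano.function([prev_controller, prev_memory, x], [controller, memory, output])
zeros = numpy.zeros(8, dtype = theano.config.floatX)
c, m, o = step(zeros, zeros, numpy.ones(8, dtype = theano.config.floatX))

The benchmark script below builds a 208-step chain of these updates twice, once by explicitly unrolling a Python for loop and once with theano.scan, and compares gradient construction, compilation, and execution times.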
import theano
import theano.tensor as T
import numpy
import sys
import time
import LSTMLayer
#Demo to study the performance difference between a Python for loop and theano.scan on the CPU. Will also try the GPU.
sys.setrecursionlimit(100000)
if __name__ == "__main__":
    weightSize = 2000
    lstm = LSTMLayer.LSTMLayer(weightSize, weightSize, weightSize, weightSize, 0.1, useReluReadGate = True)
    #W is only used for this shape sanity check.
    W = theano.shared(numpy.random.normal(size = (weightSize, weightSize)))
    print W.get_value().shape
    def oneStep(prevH, prevMemory, prevController, a1, a2, a3, a4, a5, a6, a7):
        controller1, memory1, h1, a1, a2, a3, a4, a5, a6, a7 = lstm.getOutputs(prevController, prevMemory, prevH)
        #Return in the same order as the recurrent arguments (h, memory, controller, ...) so the
        #for loop and scan below thread the states through consistently.
        return h1, memory1, controller1, a1, a2, a3, a4, a5, a6, a7
    sequenceLength = 208
    print "Sequence Length", sequenceLength, "Number of Hidden Units:", weightSize
    h0 = T.vector()
    memory_0 = T.vector()
    controller_0 = T.vector()
    h1 = h0
    new_h = [h0]
    new_memory = [memory_0]
    new_controller = [controller_0]
    a1_init, a2_init, a3_init, a4_init, a5_init, a6_init, a7_init = T.vector(), T.vector(), T.vector(), T.vector(), T.vector(), T.vector(), T.vector()
    #Unrolled version: build the graph by repeating the step in a Python for loop.
    for i in range(0, sequenceLength):
        prevH = new_h[-1]
        prevMemory = new_memory[-1]
        prevController = new_controller[-1]
        r = oneStep(prevH, prevMemory, prevController, a1_init, a2_init, a3_init, a4_init, a5_init, a6_init, a7_init)
        newHVal, newMemoryVal, newControllerVal, a1, a2, a3, a4, a5, a6, a7 = r
        new_h += [newHVal]
        new_memory += [newMemoryVal]
        new_controller += [newControllerVal]
    #Scan version: the same step function, with all ten recurrent states supplied as outputs_info.
    new_h_scan, _ = theano.scan(oneStep, sequences = [], outputs_info = [h1, memory_0, controller_0, a1_init, a2_init, a3_init, a4_init, a5_init, a6_init, a7_init], n_steps = sequenceLength)
print "starting grad for loop"
timeStart = time.time()
g = T.grad(sum(map(T.sum, new_h)), h0)
print "time spent on for loop grad", time.time() - timeStart
timeStart = time.time()
g_scan = T.grad(T.sum(new_h_scan), h0)
print "time spent on scan grad", time.time() - timeStart
    timeStart = time.time()
    f = theano.function(inputs = [h0, memory_0, controller_0, a1_init, a2_init, a3_init, a4_init, a5_init, a6_init, a7_init], outputs = [new_h[-1], g], on_unused_input = 'ignore')
    print "time spent compiling for loop", time.time() - timeStart
    timeStart = time.time()
    f_scan = theano.function(inputs = [h0, memory_0, controller_0, a1_init, a2_init, a3_init, a4_init, a5_init, a6_init, a7_init], outputs = [new_h_scan[0][-1], g_scan], on_unused_input = 'ignore')
    print "time spent compiling scan", time.time() - timeStart
    numIter = 100
    timeStart = time.time()
    for i in range(0, numIter):
        f([1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize)
    print "time for for loop version", time.time() - timeStart
    timeStart = time.time()
    for i in range(0, numIter):
        f_scan([1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize, [1.0] * weightSize)
    print "time for scan version", time.time() - timeStart