Remove deprecated code.

Francois Chollet 2015-11-28 16:34:06 -08:00
parent 25e85b616f
commit 4b39b5f36b
29 changed files with 0 additions and 3497 deletions

@@ -1,221 +0,0 @@
'''
We loop over words in a dataset, and for each word, we look at a context window around the word.
We generate pairs of (pivot_word, other_word_from_same_context) with label 1,
and pairs of (pivot_word, random_word) with label 0 (skip-gram method).
We use the layer WordContextProduct to learn embeddings for the word couples,
and compute a proximity score between the embeddings (= p(context|word)),
trained with our positive and negative labels.
We then use the weights computed by WordContextProduct to encode words
and demonstrate that the geometry of the embedding space
captures certain useful semantic properties.
Read more about skip-gram in this particularly gnomic paper by Mikolov et al.:
http://arxiv.org/pdf/1301.3781v3.pdf
Note: you should run this on GPU, otherwise training will be quite slow.
On an EC2 GPU instance, expect 3 hours per 10e6 comments (~10e8 words) per epoch with dim_proj=256.
Should be much faster on a modern GPU.
GPU command:
THEANO_FLAGS=mode=FAST_RUN,device=gpu,floatX=float32 python skipgram_word_embeddings.py
Dataset: 5,845,908 Hacker News comments.
Obtain the dataset at:
https://mega.co.nz/#F!YohlwD7R!wec0yNO86SeaNGIYQBOR0A
(HNCommentsAll.1perline.json.bz2)
'''
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import theano
from six.moves import cPickle
import os, re, json
from keras.preprocessing import sequence, text
from keras.optimizers import SGD, RMSprop, Adagrad
from keras.utils import np_utils, generic_utils
from keras.models import Sequential
from keras.layers.embeddings import WordContextProduct, Embedding
from six.moves import range
from six.moves import zip
max_features = 50000 # vocabulary size: top 50,000 most common words in data
skip_top = 100 # ignore top 100 most common words
nb_epoch = 1
dim_proj = 256 # embedding space dimension
save = True
load_model = False
load_tokenizer = False
train_model = True
save_dir = os.path.expanduser("~/.keras/models")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
model_load_fname = "HN_skipgram_model.pkl"
model_save_fname = "HN_skipgram_model.pkl"
tokenizer_fname = "HN_tokenizer.pkl"
data_path = os.path.expanduser("~/")+"HNCommentsAll.1perline.json"
# text preprocessing utils
html_tags = re.compile(r'<.*?>')
to_replace = [('&#x27;', "'")]
hex_tags = re.compile(r'&.*?;')
def clean_comment(comment):
c = str(comment.encode("utf-8"))
c = html_tags.sub(' ', c)
for tag, char in to_replace:
c = c.replace(tag, char)
c = hex_tags.sub(' ', c)
return c
def text_generator(path=data_path):
f = open(path)
for i, l in enumerate(f):
comment_data = json.loads(l)
comment_text = comment_data["comment_text"]
comment_text = clean_comment(comment_text)
if i % 10000 == 0:
print(i)
yield comment_text
f.close()
# model management
if load_tokenizer:
print('Load tokenizer...')
tokenizer = cPickle.load(open(os.path.join(save_dir, tokenizer_fname), 'rb'))
else:
print("Fit tokenizer...")
tokenizer = text.Tokenizer(nb_words=max_features)
tokenizer.fit_on_texts(text_generator())
if save:
print("Save tokenizer...")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
cPickle.dump(tokenizer, open(os.path.join(save_dir, tokenizer_fname), "wb"))
# training process
if train_model:
if load_model:
print('Load model...')
model = cPickle.load(open(os.path.join(save_dir, model_load_fname), 'rb'))
else:
print('Build model...')
model = Sequential()
model.add(WordContextProduct(max_features, proj_dim=dim_proj, init="uniform"))
model.compile(loss='mse', optimizer='rmsprop')
sampling_table = sequence.make_sampling_table(max_features)
for e in range(nb_epoch):
print('-'*40)
print('Epoch', e)
print('-'*40)
progbar = generic_utils.Progbar(tokenizer.document_count)
samples_seen = 0
losses = []
for i, seq in enumerate(tokenizer.texts_to_sequences_generator(text_generator())):
# get skipgram couples for one text in the dataset
couples, labels = sequence.skipgrams(seq, max_features, window_size=4, negative_samples=1., sampling_table=sampling_table)
if couples:
# one gradient update per sentence (one sentence = a few thousand word couples)
X = np.array(couples, dtype="int32")
loss = model.train_on_batch(X, labels)
losses.append(loss)
if len(losses) % 100 == 0:
progbar.update(i, values=[("loss", np.mean(losses))])
losses = []
samples_seen += len(labels)
print('Samples seen:', samples_seen)
print("Training completed!")
if save:
print("Saving model...")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
cPickle.dump(model, open(os.path.join(save_dir, model_save_fname), "wb"))
print("It's test time!")
# recover the embedding weights trained with skipgram:
weights = model.layers[0].get_weights()[0]
# we no longer need this
del model
weights[:skip_top] = np.zeros((skip_top, dim_proj))
norm_weights = np_utils.normalize(weights)
word_index = tokenizer.word_index
reverse_word_index = dict([(v, k) for k, v in list(word_index.items())])
def embed_word(w):
i = word_index.get(w)
if (not i) or (i < skip_top) or (i >= max_features):
return None
return norm_weights[i]
def closest_to_point(point, nb_closest=10):
proximities = np.dot(norm_weights, point)
tups = list(zip(list(range(len(proximities))), proximities))
tups.sort(key=lambda x: x[1], reverse=True)
return [(reverse_word_index.get(t[0]), t[1]) for t in tups[:nb_closest]]
def closest_to_word(w, nb_closest=10):
i = word_index.get(w)
if (not i) or (i < skip_top) or (i >= max_features):
return []
return closest_to_point(norm_weights[i].T, nb_closest)
''' the results in the comments below were for:
5.8M HN comments
dim_proj = 256
nb_epoch = 2
optimizer = rmsprop
loss = mse
max_features = 50000
skip_top = 100
negative_samples = 1.
window_size = 4
and frequency subsampling of factor 10e-5.
'''
words = [
"article", # post, story, hn, read, comments
"3", # 6, 4, 5, 2
"two", # three, few, several, each
"great", # love, nice, working, looking
"data", # information, memory, database
"money", # company, pay, customers, spend
"years", # ago, year, months, hours, week, days
"android", # ios, release, os, mobile, beta
"javascript", # js, css, compiler, library, jquery, ruby
"look", # looks, looking
"business", # industry, professional, customers
"company", # companies, startup, founders, startups
"after", # before, once, until
"own", # personal, our, having
"us", # united, country, american, tech, diversity, usa, china, sv
"using", # javascript, js, tools (lol)
"here", # hn, post, comments
]
for w in words:
res = closest_to_word(w)
print('====', w)
for r in res:
print(r)
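
The sampling scheme described in the docstring above (context pairs labeled 1, random pairs labeled 0) can be sketched in a few lines of plain Python. This is a simplified stand-in for `sequence.skipgrams`: the real helper also applies the frequency-based sampling_table and shuffles the couples.

import random

def skipgrams_sketch(seq, vocab_size, window_size=4, negative_samples=1.):
    # seq is a list of word indices for one text
    couples, labels = [], []
    for i, pivot in enumerate(seq):
        start = max(0, i - window_size)
        end = min(len(seq), i + window_size + 1)
        for j in range(start, end):
            if j != i:
                couples.append([pivot, seq[j]])  # observed context word -> label 1
                labels.append(1)
    for _ in range(int(len(labels) * negative_samples)):
        pivot = random.choice(seq)
        couples.append([pivot, random.randint(1, vocab_size - 1)])  # random word -> label 0
        labels.append(0)
    return couples, labels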

@@ -1,44 +0,0 @@
from __future__ import absolute_import
import numpy as np
import theano
import theano.tensor as T
def floatX(X):
return np.asarray(X, dtype=theano.config.floatX)
def sharedX(X, dtype=theano.config.floatX, name=None):
return theano.shared(np.asarray(X, dtype=dtype), name=name)
def shared_zeros(shape, dtype=theano.config.floatX, name=None):
return sharedX(np.zeros(shape), dtype=dtype, name=name)
def shared_scalar(val=0., dtype=theano.config.floatX, name=None):
return theano.shared(np.cast[dtype](val))
def shared_ones(shape, dtype=theano.config.floatX, name=None):
return sharedX(np.ones(shape), dtype=dtype, name=name)
def alloc_zeros_matrix(*dims):
return T.alloc(np.cast[theano.config.floatX](0.), *dims)
def ndim_tensor(ndim):
if ndim == 1:
return T.vector()
elif ndim == 2:
return T.matrix()
elif ndim == 3:
return T.tensor3()
elif ndim == 4:
return T.tensor4()
return T.matrix()
def on_gpu():
return theano.config.device[:3] == 'gpu'

@@ -1,331 +0,0 @@
import unittest
from numpy.testing import assert_allclose
import numpy as np
from keras.backend import theano_backend as KTH
from keras.backend import tensorflow_backend as KTF
def check_single_tensor_operation(function_name, input_shape, **kwargs):
val = np.random.random(input_shape) - 0.5
xth = KTH.variable(val)
xtf = KTF.variable(val)
zth = KTH.eval(getattr(KTH, function_name)(xth, **kwargs))
ztf = KTF.eval(getattr(KTF, function_name)(xtf, **kwargs))
assert zth.shape == ztf.shape
assert_allclose(zth, ztf, atol=1e-06)
def check_two_tensor_operation(function_name, x_input_shape,
y_input_shape, **kwargs):
xval = np.random.random(x_input_shape) - 0.5
xth = KTH.variable(xval)
xtf = KTF.variable(xval)
yval = np.random.random(y_input_shape) - 0.5
yth = KTH.variable(yval)
ytf = KTF.variable(yval)
zth = KTH.eval(getattr(KTH, function_name)(xth, yth, **kwargs))
ztf = KTF.eval(getattr(KTF, function_name)(xtf, ytf, **kwargs))
assert zth.shape == ztf.shape
assert_allclose(zth, ztf, atol=1e-06)
class TestBackend(unittest.TestCase):
def test_linear_operations(self):
check_two_tensor_operation('dot', (4, 2), (2, 4))
check_single_tensor_operation('transpose', (4, 2))
def test_shape_operations(self):
# concatenate
xval = np.random.random((4, 3))
xth = KTH.variable(xval)
xtf = KTF.variable(xval)
yval = np.random.random((4, 2))
yth = KTH.variable(yval)
ytf = KTF.variable(yval)
zth = KTH.eval(KTH.concatenate([xth, yth], axis=-1))
ztf = KTF.eval(KTF.concatenate([xtf, ytf], axis=-1))
assert zth.shape == ztf.shape
assert_allclose(zth, ztf, atol=1e-06)
check_single_tensor_operation('reshape', (4, 2), shape=(8, 1))
check_single_tensor_operation('permute_dimensions', (4, 2, 3),
pattern=(2, 0, 1))
check_single_tensor_operation('repeat', (4, 1), n=3)
check_single_tensor_operation('flatten', (4, 1))
check_single_tensor_operation('expand_dims', (4, 3), dim=-1)
check_single_tensor_operation('expand_dims', (4, 3, 2), dim=1)
check_single_tensor_operation('squeeze', (4, 3, 1), axis=2)
def test_value_manipulation(self):
val = np.random.random((4, 2))
xth = KTH.variable(val)
xtf = KTF.variable(val)
# get_value
valth = KTH.get_value(xth)
valtf = KTF.get_value(xtf)
assert valtf.shape == valth.shape
assert_allclose(valth, valtf, atol=1e-06)
# set_value
val = np.random.random((4, 2))
KTH.set_value(xth, val)
KTF.set_value(xtf, val)
valth = KTH.get_value(xth)
valtf = KTF.get_value(xtf)
assert valtf.shape == valth.shape
assert_allclose(valth, valtf, atol=1e-06)
# count_params
assert KTH.count_params(xth) == KTF.count_params(xtf)
def test_elementwise_operations(self):
check_single_tensor_operation('max', (4, 2))
check_single_tensor_operation('max', (4, 2), axis=1, keepdims=True)
check_single_tensor_operation('min', (4, 2))
check_single_tensor_operation('min', (4, 2), axis=1, keepdims=True)
check_single_tensor_operation('mean', (4, 2))
check_single_tensor_operation('mean', (4, 2), axis=1, keepdims=True)
check_single_tensor_operation('mean', (4, 2, 3), axis=-1, keepdims=True)
check_single_tensor_operation('std', (4, 2))
check_single_tensor_operation('std', (4, 2), axis=1, keepdims=True)
check_single_tensor_operation('prod', (4, 2))
check_single_tensor_operation('prod', (4, 2), axis=1, keepdims=True)
# does not work yet, wait for bool <-> int casting in TF (coming soon)
# check_single_tensor_operation('any', (4, 2))
# check_single_tensor_operation('any', (4, 2), axis=1, keepdims=True)
check_single_tensor_operation('argmax', (4, 2))
check_single_tensor_operation('argmax', (4, 2), axis=1)
check_single_tensor_operation('argmin', (4, 2))
check_single_tensor_operation('argmin', (4, 2), axis=1)
check_single_tensor_operation('square', (4, 2))
check_single_tensor_operation('abs', (4, 2))
check_single_tensor_operation('sqrt', (4, 2))
check_single_tensor_operation('exp', (4, 2))
check_single_tensor_operation('log', (4, 2))
check_single_tensor_operation('round', (4, 2))
check_single_tensor_operation('pow', (4, 2), a=3)
check_single_tensor_operation('clip', (4, 2), min_value=0.4,
max_value=0.6)
# two-tensor ops
check_two_tensor_operation('equal', (4, 2), (4, 2))
check_two_tensor_operation('maximum', (4, 2), (4, 2))
check_two_tensor_operation('minimum', (4, 2), (4, 2))
def test_gradient(self):
val = np.random.random((4, 2))
xth = KTH.variable(val)
xtf = KTF.variable(val)
expth = xth * KTH.exp(xth)
exptf = xtf * KTF.exp(xtf)
lossth = KTH.sum(expth)
losstf = KTF.sum(exptf)
gradth = KTH.gradients(lossth, [expth])
gradtf = KTF.gradients(losstf, [exptf])
zth = KTH.eval(gradth[0])
ztf = KTF.eval(gradtf[0])
assert zth.shape == ztf.shape
assert_allclose(zth, ztf, atol=1e-06)
def test_function(self):
val = np.random.random((4, 2))
input_val = np.random.random((4, 2))
xth = KTH.variable(val)
xtf = KTF.variable(val)
yth = KTH.placeholder(ndim=2)
ytf = KTF.placeholder(ndim=2)
exp_th = KTH.square(xth) + yth
exp_tf = KTF.square(xtf) + ytf
update_th = xth * 2
update_tf = xtf * 2
fth = KTH.function([yth], [exp_th], updates=[(xth, update_th)])
ftf = KTF.function([ytf], [exp_tf], updates=[(xtf, update_tf)])
function_outputs_th = fth([input_val])[0]
function_outputs_tf = ftf([input_val])[0]
assert function_outputs_th.shape == function_outputs_tf.shape
assert_allclose(function_outputs_th, function_outputs_tf, atol=1e-06)
new_val_th = KTH.get_value(xth)
new_val_tf = KTF.get_value(xtf)
assert new_val_th.shape == new_val_tf.shape
assert_allclose(new_val_th, new_val_tf, atol=1e-06)
def test_rnn(self):
# implement a simple RNN
input_dim = 8
output_dim = 4
timesteps = 5
input_val = np.random.random((32, timesteps, input_dim))
init_state_val = np.random.random((32, output_dim))
W_i_val = np.random.random((input_dim, output_dim))
W_o_val = np.random.random((output_dim, output_dim))
def rnn_step_fn(input_dim, output_dim, K):
W_i = K.variable(W_i_val)
W_o = K.variable(W_o_val)
def step_function(x, states):
assert len(states) == 1
prev_output = states[0]
output = K.dot(x, W_i) + K.dot(prev_output, W_o)
return output, [output]
return step_function
th_rnn_step_fn = rnn_step_fn(input_dim, output_dim, KTH)
inputs = KTH.variable(input_val)
initial_states = [KTH.variable(init_state_val)]
last_output, outputs, new_states = KTH.rnn(th_rnn_step_fn, inputs,
initial_states,
go_backwards=False,
masking=False)
th_last_output = KTH.eval(last_output)
th_outputs = KTH.eval(outputs)
assert len(new_states) == 1
th_state = KTH.eval(new_states[0])
tf_rnn_step_fn = rnn_step_fn(input_dim, output_dim, KTF)
inputs = KTF.variable(input_val)
initial_states = [KTF.variable(init_state_val)]
last_output, outputs, new_states = KTF.rnn(tf_rnn_step_fn, inputs,
initial_states,
go_backwards=False,
masking=False)
tf_last_output = KTF.eval(last_output)
tf_outputs = KTF.eval(outputs)
assert len(new_states) == 1
tf_state = KTF.eval(new_states[0])
assert_allclose(tf_last_output, th_last_output, atol=1e-06)
assert_allclose(tf_outputs, th_outputs, atol=1e-06)
assert_allclose(tf_state, th_state, atol=1e-06)
def test_switch(self):
val = np.random.random()
xth = KTH.variable(val)
xth = KTH.switch(xth >= 0.5, xth * 0.1, xth * 0.2)
xtf = KTF.variable(val)
xtf = KTF.switch(xtf >= 0.5, xtf * 0.1, xtf * 0.2)
zth = KTH.eval(xth)
ztf = KTF.eval(xtf)
assert zth.shape == ztf.shape
assert_allclose(zth, ztf, atol=1e-06)
def test_nn_operations(self):
check_single_tensor_operation('relu', (4, 2), alpha=0.1, max_value=0.5)
check_single_tensor_operation('softmax', (4, 10))
check_single_tensor_operation('softplus', (4, 10))
check_single_tensor_operation('sigmoid', (4, 2))
check_single_tensor_operation('hard_sigmoid', (4, 2))
check_single_tensor_operation('tanh', (4, 2))
# dropout
val = np.random.random((20, 20))
xth = KTH.variable(val)
xtf = KTF.variable(val)
zth = KTH.eval(KTH.dropout(xth, level=0.2))
ztf = KTF.eval(KTF.dropout(xtf, level=0.2))
assert zth.shape == ztf.shape
# dropout patterns are different, only check mean
assert np.abs(zth.mean() - ztf.mean()) < 0.05
check_two_tensor_operation('binary_crossentropy', (4, 2), (4, 2), from_logits=True)
check_two_tensor_operation('categorical_crossentropy', (4, 2), (4, 2), from_logits=True)
check_two_tensor_operation('binary_crossentropy', (4, 2), (4, 2), from_logits=False)
check_two_tensor_operation('categorical_crossentropy', (4, 2), (4, 2), from_logits=False)
# def test_conv2d(self):
# '''conv2d works "properly" with Theano and TF but outputs different
# values in each case. Cause unclear (input / kernel shape format?)
# '''
# # TH kernel shape: (depth, input_depth, rows, cols)
# check_two_tensor_operation('conv2d', (5, 3, 10, 12), (4, 3, 2, 2),
# strides=(1, 1), border_mode='valid')
# check_two_tensor_operation('conv2d', (5, 3, 10, 12), (4, 3, 2, 2),
# strides=(1, 1), border_mode='same')
# # TF kernel shape: (rows, cols, input_depth, depth)
# check_two_tensor_operation('conv2d', (5, 10, 12, 3), (2, 2, 3, 4),
# strides=(1, 1), border_mode='valid', dim_ordering='tf')
# check_two_tensor_operation('conv2d', (5, 10, 12, 3), (2, 2, 3, 4),
# strides=(1, 1), border_mode='same', dim_ordering='tf')
# check_two_tensor_operation('conv2d', (5, 3, 10, 12), (4, 3, 3, 3),
# strides=(1, 1), border_mode='valid')
# check_two_tensor_operation('conv2d', (5, 3, 10, 12), (4, 3, 3, 3),
# strides=(1, 1), border_mode='same')
# check_two_tensor_operation('conv2d', (5, 3, 10, 12), (4, 3, 3, 3),
# strides=(2, 2), border_mode='valid')
# def test_maxpool2d(self):
# '''maxpool2d works "properly" with Theano and TF but outputs different
# values in each case. Cause unclear (input shape format?)
# '''
# check_single_tensor_operation('maxpool2d', (5, 3, 10, 12), pool_size=(2, 2),
# strides=(1, 1), border_mode='valid')
# check_single_tensor_operation('maxpool2d', (5, 3, 9, 11), pool_size=(2, 2),
# strides=(1, 1), border_mode='valid')
# check_single_tensor_operation('maxpool2d', (5, 3, 9, 11), pool_size=(2, 3),
# strides=(1, 1), border_mode='valid')
def test_random_normal(self):
mean = 0.
std = 1.
rand = KTF.get_value(KTF.random_normal((1000, 1000), mean=mean, std=std))
assert(rand.shape == (1000, 1000))
assert(np.abs(np.mean(rand) - mean) < 0.01)
assert(np.abs(np.std(rand) - std) < 0.01)
rand = KTF.get_value(KTF.random_normal((1000, 1000), mean=mean, std=std))
assert(rand.shape == (1000, 1000))
assert(np.abs(np.mean(rand) - mean) < 0.01)
assert(np.abs(np.std(rand) - std) < 0.01)
    def test_random_uniform(self):
        # assumes the backend exposes random_uniform(shape, low, high);
        # the previous body re-ran the random_normal checks
        low = 0.
        high = 1.
        rand = KTF.get_value(KTF.random_uniform((1000, 1000), low=low, high=high))
        assert(rand.shape == (1000, 1000))
        assert(np.min(rand) >= low)
        assert(np.max(rand) <= high)
        assert(np.abs(np.mean(rand) - (low + high) / 2.) < 0.01)
if __name__ == '__main__':
unittest.main()
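
The commented-out conv2d tests note that Theano kernels are laid out as (depth, input_depth, rows, cols) while TensorFlow expects (rows, cols, input_depth, depth). A quick NumPy sketch of the permutation between the two layouts; the kernel flip is one plausible source of the "outputs different values" mystery, since Theano's conv2d performs true convolution (flipped kernel) while TensorFlow's performs cross-correlation.

import numpy as np

kernel_th = np.random.random((4, 3, 2, 2))    # TH: (depth, input_depth, rows, cols)
kernel_tf = kernel_th.transpose(2, 3, 1, 0)   # TF: (rows, cols, input_depth, depth)
assert kernel_tf.shape == (2, 2, 3, 4)
# to also account for Theano's kernel flip, reverse the spatial axes first:
kernel_tf_corr = kernel_th[:, :, ::-1, ::-1].transpose(2, 3, 1, 0)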

@@ -1,173 +0,0 @@
import unittest
import numpy as np
from numpy.testing import assert_allclose
from keras import backend as K
from keras.layers import convolutional
class TestConvolutions(unittest.TestCase):
def test_convolution_1d(self):
nb_samples = 9
nb_steps = 7
input_dim = 10
filter_length = 6
nb_filter = 5
weights_in = [np.ones((nb_filter, input_dim, filter_length, 1)),
np.ones(nb_filter)]
input = np.ones((nb_samples, nb_steps, input_dim))
for weight in [None, weights_in]:
for border_mode in ['valid', 'same']:
for subsample_length in [1]:
if border_mode == 'same' and subsample_length != 1:
continue
for W_regularizer in [None, 'l2']:
for b_regularizer in [None, 'l2']:
for act_regularizer in [None, 'l2']:
layer = convolutional.Convolution1D(
nb_filter, filter_length,
weights=weight,
border_mode=border_mode,
W_regularizer=W_regularizer,
b_regularizer=b_regularizer,
activity_regularizer=act_regularizer,
subsample_length=subsample_length,
input_shape=(None, input_dim))
layer.input = K.variable(input)
for train in [True, False]:
out = K.eval(layer.get_output(train))
assert input.shape[0] == out.shape[0]
if border_mode == 'same' and subsample_length == 1:
assert input.shape[1] == out.shape[1]
config = layer.get_config()
def test_maxpooling_1d(self):
nb_samples = 9
nb_steps = 7
input_dim = 10
input = np.ones((nb_samples, nb_steps, input_dim))
for stride in [1, 2]:
layer = convolutional.MaxPooling1D(stride=stride,
border_mode='valid')
layer.input = K.variable(input)
for train in [True, False]:
K.eval(layer.get_output(train))
config = layer.get_config()
def test_convolution_2d(self):
nb_samples = 8
nb_filter = 9
stack_size = 7
nb_row = 10
nb_col = 6
input_nb_row = 11
input_nb_col = 12
weights_in = [np.ones((nb_filter, stack_size, nb_row, nb_col)), np.ones(nb_filter)]
input = np.ones((nb_samples, stack_size, input_nb_row, input_nb_col))
for weight in [None, weights_in]:
for border_mode in ['valid', 'same']:
for subsample in [(1, 1), (2, 2)]:
if border_mode == 'same' and subsample != (1, 1):
continue
for W_regularizer in [None, 'l2']:
for b_regularizer in [None, 'l2']:
for act_regularizer in [None, 'l2']:
layer = convolutional.Convolution2D(
nb_filter, nb_row, nb_col,
weights=weight,
border_mode=border_mode,
W_regularizer=W_regularizer,
b_regularizer=b_regularizer,
activity_regularizer=act_regularizer,
subsample=subsample,
input_shape=(stack_size, None, None))
layer.input = K.variable(input)
for train in [True, False]:
out = K.eval(layer.get_output(train))
if border_mode == 'same' and subsample == (1, 1):
assert out.shape[2:] == input.shape[2:]
config = layer.get_config()
def test_maxpooling_2d(self):
nb_samples = 9
stack_size = 7
input_nb_row = 11
input_nb_col = 12
pool_size = (3, 3)
input = np.ones((nb_samples, stack_size, input_nb_row, input_nb_col))
for strides in [(1, 1), (2, 2)]:
layer = convolutional.MaxPooling2D(strides=strides,
border_mode='valid',
pool_size=pool_size)
layer.input = K.variable(input)
for train in [True, False]:
K.eval(layer.get_output(train))
config = layer.get_config()
def test_zero_padding_2d(self):
nb_samples = 9
stack_size = 7
input_nb_row = 11
input_nb_col = 12
input = np.ones((nb_samples, stack_size, input_nb_row, input_nb_col))
layer = convolutional.ZeroPadding2D(padding=(2, 2))
layer.input = K.variable(input)
for train in [True, False]:
out = K.eval(layer.get_output(train))
for offset in [0, 1, -1, -2]:
assert_allclose(out[:, :, offset, :], 0.)
assert_allclose(out[:, :, :, offset], 0.)
assert_allclose(out[:, :, 2:-2, 2:-2], 1.)
config = layer.get_config()
def test_upsample_1d(self):
nb_samples = 9
nb_steps = 7
input_dim = 10
input = np.ones((nb_samples, nb_steps, input_dim))
for length in [2, 3, 9]:
layer = convolutional.UpSample1D(length=length)
layer.input = K.variable(input)
for train in [True, False]:
out = K.eval(layer.get_output(train))
assert out.shape[1] == length * nb_steps
config = layer.get_config()
def test_upsample_2d(self):
nb_samples = 9
stack_size = 7
input_nb_row = 11
input_nb_col = 12
input = np.ones((nb_samples, stack_size, input_nb_row, input_nb_col))
for length_row in [2, 3, 9]:
for length_col in [2, 3, 9]:
layer = convolutional.UpSample2D(size=(length_row, length_col))
layer.input = K.variable(input)
for train in [True, False]:
out = K.eval(layer.get_output(train))
assert out.shape[2] == length_row * input_nb_row
assert out.shape[3] == length_col * input_nb_col
config = layer.get_config()
if __name__ == '__main__':
unittest.main()
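
The shape assertions in the tests above follow the standard output-length rule for the 'valid' and 'same' border modes. A small reference helper (my own summary of the rule, not a function from the library):

def conv_output_length(input_length, filter_size, border_mode, stride):
    # 'valid' keeps only positions where the filter fits entirely;
    # 'same' pads so that stride 1 preserves the input length
    if border_mode == 'valid':
        length = input_length - filter_size + 1
    else:  # 'same'
        length = input_length
    return (length + stride - 1) // stride  # ceil division for strides > 1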

@@ -1,160 +0,0 @@
import unittest
import numpy as np
from numpy.testing import assert_allclose
from keras import backend as K
from keras.layers import core
class TestLayerBase(unittest.TestCase):
def test_input_output(self):
nb_samples = 10
input_dim = 5
layer = core.Layer()
# Once an input is provided, it should be reachable through the
# appropriate getters
input = np.ones((nb_samples, input_dim))
layer.input = K.variable(input)
for train in [True, False]:
assert_allclose(K.eval(layer.get_input(train)), input)
assert_allclose(K.eval(layer.get_output(train)), input)
def test_connections(self):
nb_samples = 10
input_dim = 5
layer1 = core.Layer()
layer2 = core.Layer()
input = np.ones((nb_samples, input_dim))
layer1.input = K.variable(input)
# After connecting, input of layer1 should be passed through
layer2.set_previous(layer1)
for train in [True, False]:
assert_allclose(K.eval(layer2.get_input(train)), input)
assert_allclose(K.eval(layer2.get_output(train)), input)
class TestConfigParams(unittest.TestCase):
"""
Test the constructor, config and params functions of all layers in core.
"""
def _runner(self, layer):
conf = layer.get_config()
assert (type(conf) == dict)
param = layer.get_params()
# Typically a list or a tuple, but may be any iterable
assert hasattr(param, '__iter__')
def test_base(self):
layer = core.Layer()
self._runner(layer)
def test_masked(self):
layer = core.MaskedLayer()
self._runner(layer)
def test_merge(self):
layer_1 = core.Layer()
layer_2 = core.Layer()
layer_1.set_input_shape((None,))
layer_2.set_input_shape((None,))
layer = core.Merge([layer_1, layer_2])
self._runner(layer)
def test_dropout(self):
layer = core.Dropout(0.5)
self._runner(layer)
def test_activation(self):
layer = core.Activation('linear')
self._runner(layer)
def test_reshape(self):
layer = core.Reshape(dims=(10, 10))
self._runner(layer)
def test_flatten(self):
layer = core.Flatten()
self._runner(layer)
def test_repeat_vector(self):
layer = core.RepeatVector(10)
self._runner(layer)
def test_dense(self):
layer = core.Dense(10, input_shape=(10,))
self._runner(layer)
def test_act_reg(self):
layer = core.ActivityRegularization(0.5, 0.5)
self._runner(layer)
def test_time_dist_dense(self):
layer = core.TimeDistributedDense(10, input_shape=(None, 10))
self._runner(layer)
def test_time_dist_merge(self):
layer = core.TimeDistributedMerge()
self._runner(layer)
def test_autoencoder(self):
layer_1 = core.Layer()
layer_2 = core.Layer()
layer = core.AutoEncoder(layer_1, layer_2)
self._runner(layer)
def test_maxout_dense(self):
layer = core.MaxoutDense(10, 10)
self._runner(layer)
class TestMasking(unittest.TestCase):
"""Test the Masking class"""
def test_sequences(self):
"""Test masking sequences with zeroes as padding"""
# integer inputs, one per timestep, like embeddings
layer = core.Masking()
func = K.function([layer.input], [layer.get_output_mask()])
input_data = np.array([[[1], [2], [3], [0]],
[[0], [4], [5], [0]]], dtype=np.int32)
# This is the expected output mask, one dimension less
expected = np.array([[1, 1, 1, 0], [0, 1, 1, 0]])
# get mask for this input
output = func([input_data])[0]
self.assertTrue(np.all(output == expected))
def test_non_zero(self):
"""Test masking with non-zero mask value"""
layer = core.Masking(5)
func = K.function([layer.input], [layer.get_output_mask()])
input_data = np.array([[[1, 1], [2, 1], [3, 1], [5, 5]],
[[1, 5], [5, 0], [0, 0], [0, 0]]],
dtype=np.int32)
output = func([input_data])[0]
expected = np.array([[1, 1, 1, 0], [1, 1, 1, 1]])
self.assertTrue(np.all(output == expected))
def test_non_zero_output(self):
"""Test output of masking layer with non-zero mask value"""
layer = core.Masking(5)
func = K.function([layer.input], [layer.get_output()])
input_data = np.array([[[1, 1], [2, 1], [3, 1], [5, 5]],
[[1, 5], [5, 0], [0, 0], [0, 0]]],
dtype=np.int32)
output = func([input_data])[0]
expected = np.array([[[1, 1], [2, 1], [3, 1], [0, 0]],
[[1, 5], [5, 0], [0, 0], [0, 0]]])
self.assertTrue(np.all(output == expected))
if __name__ == '__main__':
unittest.main()
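
The Masking tests expect a mask with one fewer dimension than the input: a timestep survives only if at least one of its features differs from mask_value. In NumPy terms (a sketch of the semantics, not the layer's actual implementation):

import numpy as np

def expected_mask(x, mask_value=0):
    # (samples, timesteps, features) -> (samples, timesteps)
    return np.any(np.not_equal(x, mask_value), axis=-1).astype('int32')

x = np.array([[[1], [2], [3], [0]],
              [[0], [4], [5], [0]]])
print(expected_mask(x))  # [[1 1 1 0]
                         #  [0 1 1 0]]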

@@ -1,56 +0,0 @@
import unittest
import numpy as np
from keras.layers import recurrent
from keras import backend as K
nb_samples, timesteps, input_dim, output_dim = 3, 3, 10, 5
def _runner(layer_class):
"""
All the recurrent layers share the same interface,
so we can run through them with a single function.
"""
for ret_seq in [True, False]:
layer = layer_class(output_dim, return_sequences=ret_seq,
weights=None, input_shape=(None, input_dim))
layer.input = K.variable(np.ones((nb_samples, timesteps, input_dim)))
config = layer.get_config()
for train in [True, False]:
out = K.eval(layer.get_output(train))
# Make sure the output has the desired shape
if ret_seq:
assert(out.shape == (nb_samples, timesteps, output_dim))
else:
assert(out.shape == (nb_samples, output_dim))
mask = layer.get_output_mask(train)
class TestRNNS(unittest.TestCase):
"""
Test all the RNNs using a generic test runner function defined above.
"""
def test_simple(self):
_runner(recurrent.SimpleRNN)
def test_gru(self):
_runner(recurrent.GRU)
def test_lstm(self):
_runner(recurrent.LSTM)
# def test_jzs1(self):
# _runner(recurrent.JZS1)
# def test_jzs2(self):
# _runner(recurrent.JZS2)
# def test_jzs3(self):
# _runner(recurrent.JZS3)
if __name__ == '__main__':
unittest.main()

@@ -1,76 +0,0 @@
import unittest
from keras import backend as K
import numpy as np
from numpy.testing import assert_allclose
def get_standard_values():
'''
These are just a set of floats used for testing the activation
functions, and are useful in multiple tests.
'''
return np.array([[0, 0.1, 0.5, 0.9, 1.0]], dtype=K.floatx())
class TestActivations(unittest.TestCase):
def test_softmax(self):
from keras.activations import softmax as s
# Test using a reference implementation of softmax
def softmax(values):
m = np.max(values)
e = np.exp(values - m)
return e / np.sum(e)
x = K.placeholder(ndim=2)
exp = s(x)
f = K.function([x], [exp])
test_values = get_standard_values()
result = f([test_values])[0]
expected = softmax(test_values)
assert_allclose(result, expected, rtol=1e-05)
def test_relu(self):
'''
        Relu leaves non-negative inputs unchanged, so on the standard
        (all non-negative) test values it should act as the identity map.
'''
from keras.activations import relu as r
x = K.placeholder(ndim=2)
exp = r(x)
f = K.function([x], [exp])
test_values = get_standard_values()
result = f([test_values])[0]
# because no negatives in test values
assert_allclose(result, test_values, rtol=1e-05)
def test_tanh(self):
from keras.activations import tanh as t
test_values = get_standard_values()
x = K.placeholder(ndim=2)
exp = t(x)
f = K.function([x], [exp])
result = f([test_values])[0]
expected = np.tanh(test_values)
assert_allclose(result, expected, rtol=1e-05)
def test_linear(self):
'''
This function does no input validation, it just returns the thing
that was passed in.
'''
from keras.activations import linear as l
xs = [1, 5, True, None, 'foo']
for x in xs:
assert x == l(x)
if __name__ == '__main__':
unittest.main()

@@ -1,72 +0,0 @@
import unittest
import numpy as np
from numpy.testing import assert_allclose
from keras import backend as K
class TestConstraints(unittest.TestCase):
def setUp(self):
self.some_values = [0.1, 0.5, 3, 8, 1e-7]
np.random.seed(3537)
self.example_array = np.random.random((100, 100)) * 100. - 50.
self.example_array[0, 0] = 0. # 0 could possibly cause trouble
def test_maxnorm(self):
from keras.constraints import maxnorm
for m in self.some_values:
norm_instance = maxnorm(m)
normed = norm_instance(K.variable(self.example_array))
assert (np.all(K.eval(normed) < m))
# a more explicit example
norm_instance = maxnorm(2.0)
x = np.array([[0, 0, 0], [1.0, 0, 0], [3, 0, 0], [3, 3, 3]]).T
x_normed_target = np.array([[0, 0, 0], [1.0, 0, 0],
[2.0, 0, 0],
[2. / np.sqrt(3), 2. / np.sqrt(3), 2. / np.sqrt(3)]]).T
x_normed_actual = K.eval(norm_instance(K.variable(x)))
assert_allclose(x_normed_actual, x_normed_target, rtol=1e-05)
def test_nonneg(self):
from keras.constraints import nonneg
nonneg_instance = nonneg()
normed = nonneg_instance(K.variable(self.example_array))
assert (np.all(np.min(K.eval(normed), axis=1) == 0.))
def test_identity(self):
from keras.constraints import identity
identity_instance = identity()
normed = identity_instance(self.example_array)
assert (np.all(normed == self.example_array))
def test_identity_oddballs(self):
"""
        Test the identity constraint on some more exotic inputs.
        This does not need to hold for the desired real-life behaviour,
        but it should in the current implementation.
"""
from keras.constraints import identity
identity_instance = identity()
oddball_examples = ["Hello", [1], -1, None]
assert(oddball_examples == identity_instance(oddball_examples))
def test_unitnorm(self):
from keras.constraints import unitnorm
unitnorm_instance = unitnorm()
normalized = unitnorm_instance(K.variable(self.example_array))
norm_of_normalized = np.sqrt(np.sum(K.eval(normalized)**2, axis=1))
# in the unit norm constraint, it should be equal to 1.
difference = norm_of_normalized - 1.
largest_difference = np.max(np.abs(difference))
assert np.abs(largest_difference) < 10e-5
if __name__ == '__main__':
unittest.main()
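
The explicit example in test_maxnorm follows from rescaling any weight column whose L2 norm exceeds m back onto the norm-m ball. A NumPy sketch of that rule (norms taken per column, matching the transposed arrays in the test):

import numpy as np

def maxnorm_sketch(w, m, eps=1e-7):
    norms = np.sqrt(np.sum(w ** 2, axis=0))  # one norm per column
    desired = np.clip(norms, 0, m)           # cap each norm at m
    return w * (desired / (eps + norms))     # rescale offending columns only

w = np.array([[0, 0, 0], [1.0, 0, 0], [3, 0, 0], [3, 3, 3]]).T
print(maxnorm_sketch(w, 2.0))  # reproduces x_normed_target above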

@@ -1,102 +0,0 @@
import unittest
import numpy as np
from numpy.testing import assert_allclose
from keras.layers import normalization
from keras.models import Sequential
from keras import backend as K
class TestBatchNormalization(unittest.TestCase):
def setUp(self):
self.input_1 = np.arange(10)
self.input_2 = np.zeros(10)
self.input_3 = np.ones((10))
self.input_shapes = [np.ones((10, 10)), np.ones((10, 10, 10))]
def test_setup(self):
norm_m0 = normalization.BatchNormalization(input_shape=(10, 10))
norm_m1 = normalization.BatchNormalization(input_shape=(10, 10), mode=1)
# mode 3 does not exist
        self.assertRaises(Exception,
                          normalization.BatchNormalization,
                          input_shape=(10, 10), mode=3)
def test_mode_0(self):
model = Sequential()
norm_m0 = normalization.BatchNormalization(input_shape=(10,))
model.add(norm_m0)
model.compile(loss='mse', optimizer='sgd')
# centered on 5.0, variance 10.0
X = np.random.normal(loc=5.0, scale=10.0, size=(1000, 10))
model.fit(X, X, nb_epoch=5, verbose=0)
norm_m0.input = K.variable(X)
out = (norm_m0.get_output(train=True) - norm_m0.beta) / norm_m0.gamma
self.assertAlmostEqual(K.eval(K.mean(out)), 0.0, places=1)
self.assertAlmostEqual(K.eval(K.std(out)), 1.0, places=1)
def test_mode_1(self):
norm_m1 = normalization.BatchNormalization(input_shape=(10,), mode=1)
for inp in [self.input_1, self.input_2, self.input_3]:
norm_m1.input = K.variable(inp)
out = (norm_m1.get_output(train=True) - norm_m1.beta) / norm_m1.gamma
self.assertAlmostEqual(K.eval(K.mean(out)), 0.0)
if inp.std() > 0.:
self.assertAlmostEqual(K.eval(K.std(out)), 1.0, places=2)
else:
self.assertAlmostEqual(K.eval(K.std(out)), 0.0, places=2)
def test_shapes(self):
"""
Test batch normalization with various input shapes
"""
for inp in self.input_shapes:
norm_m0 = normalization.BatchNormalization(input_shape=inp.shape, mode=0)
norm_m0.input = K.variable(inp)
out = (norm_m0.get_output(train=True) - norm_m0.beta) / norm_m0.gamma
norm_m1 = normalization.BatchNormalization(input_shape=inp.shape, mode=1)
norm_m1.input = K.variable(inp)
out = (norm_m1.get_output(train=True) - norm_m1.beta) / norm_m1.gamma
def test_weight_init(self):
"""
Test weight initialization
"""
norm_m1 = normalization.BatchNormalization(input_shape=(10,), mode=1,
weights=[np.ones(10), np.ones(10), np.zeros(10), np.zeros(10)])
for inp in [self.input_1, self.input_2, self.input_3]:
norm_m1.input = K.variable(inp)
out = (norm_m1.get_output(train=True) - np.ones(10)) / 1.
self.assertAlmostEqual(K.eval(K.mean(out)), 0.0)
if inp.std() > 0.:
self.assertAlmostEqual(K.eval(K.std(out)), 1.0, places=2)
else:
self.assertAlmostEqual(K.eval(K.std(out)), 0.0, places=2)
assert_allclose(K.eval(norm_m1.gamma), np.ones(10))
assert_allclose(K.eval(norm_m1.beta), np.ones(10))
def test_config(self):
norm = normalization.BatchNormalization(input_shape=(10, 10), mode=1,
epsilon=0.1, momentum=0.9)
conf = norm.get_config()
conf_target = {"input_shape": (10, 10),
"name": normalization.BatchNormalization.__name__,
"epsilon": 0.1, "mode": 1, "momentum": 0.9}
self.assertDictEqual(conf, conf_target)
def test_save_weights(self):
norm = normalization.BatchNormalization(input_shape=(10, 10), mode=1,
epsilon=0.1)
weights = norm.get_weights()
assert(len(weights) == 4)
norm.set_weights(weights)
if __name__ == '__main__':
unittest.main()
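
These tests undo the layer's affine part, (out - beta) / gamma, and check that what remains is standardized. The transformation under test, written out (the standard batch-norm definition; this sketch adds epsilon to the std, but the exact epsilon placement is an implementation detail):

import numpy as np

def batchnorm_sketch(x, gamma, beta, epsilon=1e-6):
    x_hat = (x - x.mean(axis=0)) / (x.std(axis=0) + epsilon)  # zero mean, unit std
    return gamma * x_hat + beta                               # learned rescale and shift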

@@ -1,48 +0,0 @@
from __future__ import print_function
import unittest
from keras.datasets import cifar10, cifar100, reuters, imdb, mnist
class TestDatasets(unittest.TestCase):
def test_cifar(self):
print('cifar10')
(X_train, y_train), (X_test, y_test) = cifar10.load_data()
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)
print('cifar100 fine')
(X_train, y_train), (X_test, y_test) = cifar100.load_data('fine')
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)
print('cifar100 coarse')
(X_train, y_train), (X_test, y_test) = cifar100.load_data('coarse')
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)
def test_reuters(self):
print('reuters')
(X_train, y_train), (X_test, y_test) = reuters.load_data()
def test_mnist(self):
print('mnist')
(X_train, y_train), (X_test, y_test) = mnist.load_data()
print(X_train.shape)
print(X_test.shape)
print(y_train.shape)
print(y_test.shape)
def test_imdb(self):
print('imdb')
(X_train, y_train), (X_test, y_test) = imdb.load_data()
if __name__ == '__main__':
print('Test datasets')
unittest.main()

@@ -1,30 +0,0 @@
import unittest
import numpy as np
from keras.models import Sequential
from keras.layers.core import Dense, Activation, Flatten
from keras.layers.embeddings import Embedding
from keras.constraints import unitnorm
from keras import backend as K
class TestEmbedding(unittest.TestCase):
def setUp(self):
self.X1 = np.array([[1], [2]], dtype='int32')
self.W1 = np.array([[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]], dtype='float32')
def test_unitnorm_constraint(self):
lookup = Sequential()
lookup.add(Embedding(3, 2, weights=[self.W1],
W_constraint=unitnorm(),
input_length=1))
lookup.add(Flatten())
lookup.add(Dense(1))
lookup.add(Activation('sigmoid'))
lookup.compile(loss='binary_crossentropy', optimizer='sgd',
class_mode='binary')
lookup.train_on_batch(self.X1, np.array([[1], [0]], dtype='int32'))
norm = np.linalg.norm(K.get_value(lookup.params[0]), axis=1)
self.assertTrue(np.allclose(norm, np.ones_like(norm).astype('float32')))
if __name__ == '__main__':
unittest.main()

@@ -1,270 +0,0 @@
from __future__ import print_function
import unittest
import numpy as np
np.random.seed(1337)
from keras.models import Graph, Sequential
from keras.layers import containers
from keras.layers.core import Dense, Activation
from keras.utils.test_utils import get_test_data
X = np.random.random((100, 32))
X2 = np.random.random((100, 32))
y = np.random.random((100, 4))
y2 = np.random.random((100,))
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000,
nb_test=200,
input_shape=(32,),
classification=False,
output_shape=(4,))
(X2_train, y2_train), (X2_test, y2_test) = get_test_data(nb_train=1000,
nb_test=200,
input_shape=(32,),
classification=False,
output_shape=(1,))
class TestGraph(unittest.TestCase):
def test_1o_1i(self):
print('test a non-sequential graph with 1 input and 1 output')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(4), name='dense3', input='dense1')
graph.add_output(name='output1',
inputs=['dense2', 'dense3'],
merge_mode='sum')
graph.compile('rmsprop', {'output1': 'mse'})
history = graph.fit({'input1': X_train, 'output1': y_train},
nb_epoch=10)
out = graph.predict({'input1': X_test})
        assert(type(out) == dict)
assert(len(out) == 1)
loss = graph.test_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.train_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.evaluate({'input1': X_test, 'output1': y_test})
print(loss)
assert(loss < 2.5)
# test validation split
history = graph.fit({'input1': X_train, 'output1': y_train},
validation_split=0.2, nb_epoch=1)
# test validation data
history = graph.fit({'input1': X_train, 'output1': y_train},
validation_data={'input1': X_train, 'output1': y_train},
nb_epoch=1)
def test_1o_1i_2(self):
print('test a more complex non-sequential graph with 1 input and 1 output')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2-0', input='input1')
graph.add_node(Activation('relu'), name='dense2', input='dense2-0')
graph.add_node(Dense(16), name='dense3', input='dense2')
graph.add_node(Dense(4), name='dense4', inputs=['dense1', 'dense3'],
merge_mode='sum')
graph.add_output(name='output1', inputs=['dense2', 'dense4'],
merge_mode='sum')
graph.compile('rmsprop', {'output1': 'mse'})
history = graph.fit({'input1': X_train, 'output1': y_train},
nb_epoch=10)
out = graph.predict({'input1': X_train})
        assert(type(out) == dict)
assert(len(out) == 1)
loss = graph.test_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.train_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.evaluate({'input1': X_test, 'output1': y_test})
print(loss)
assert(loss < 2.5)
graph.get_config(verbose=1)
def test_1o_2i(self):
print('test a non-sequential graph with 2 inputs and 1 output')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_input(name='input2', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input2')
graph.add_node(Dense(4), name='dense3', input='dense1')
graph.add_output(name='output1', inputs=['dense2', 'dense3'],
merge_mode='sum')
graph.compile('rmsprop', {'output1': 'mse'})
history = graph.fit({'input1': X_train, 'input2': X2_train, 'output1': y_train},
nb_epoch=10)
out = graph.predict({'input1': X_test, 'input2': X2_test})
        assert(type(out) == dict)
assert(len(out) == 1)
loss = graph.test_on_batch({'input1': X_test, 'input2': X2_test, 'output1': y_test})
loss = graph.train_on_batch({'input1': X_test, 'input2': X2_test, 'output1': y_test})
loss = graph.evaluate({'input1': X_test, 'input2': X2_test, 'output1': y_test})
print(loss)
assert(loss < 3.0)
graph.get_config(verbose=1)
def test_2o_1i_weights(self):
print('test a non-sequential graph with 1 input and 2 outputs')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(1), name='dense3', input='dense1')
graph.add_output(name='output1', input='dense2')
graph.add_output(name='output2', input='dense3')
graph.compile('rmsprop', {'output1': 'mse', 'output2': 'mse'})
history = graph.fit({'input1': X_train, 'output1': y_train, 'output2': y2_train},
nb_epoch=10)
out = graph.predict({'input1': X_test})
        assert(type(out) == dict)
assert(len(out) == 2)
loss = graph.test_on_batch({'input1': X_test, 'output1': y_test, 'output2': y2_test})
loss = graph.train_on_batch({'input1': X_test, 'output1': y_test, 'output2': y2_test})
loss = graph.evaluate({'input1': X_test, 'output1': y_test, 'output2': y2_test})
print(loss)
assert(loss < 4.)
print('test weight saving')
graph.save_weights('temp.h5', overwrite=True)
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(1), name='dense3', input='dense1')
graph.add_output(name='output1', input='dense2')
graph.add_output(name='output2', input='dense3')
graph.compile('rmsprop', {'output1': 'mse', 'output2': 'mse'})
graph.load_weights('temp.h5')
nloss = graph.evaluate({'input1': X_test, 'output1': y_test, 'output2': y2_test})
print(nloss)
assert(loss == nloss)
def test_2o_1i_sample_weights(self):
print('test a non-sequential graph with 1 input and 2 outputs with sample weights')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(1), name='dense3', input='dense1')
graph.add_output(name='output1', input='dense2')
graph.add_output(name='output2', input='dense3')
weights1 = np.random.uniform(size=y_train.shape[0])
weights2 = np.random.uniform(size=y2_train.shape[0])
weights1_test = np.random.uniform(size=y_test.shape[0])
weights2_test = np.random.uniform(size=y2_test.shape[0])
graph.compile('rmsprop', {'output1': 'mse', 'output2': 'mse'})
history = graph.fit({'input1': X_train, 'output1': y_train, 'output2': y2_train},
nb_epoch=10,
sample_weight={'output1': weights1, 'output2': weights2})
out = graph.predict({'input1': X_test})
        assert(type(out) == dict)
assert(len(out) == 2)
loss = graph.test_on_batch({'input1': X_test, 'output1': y_test, 'output2': y2_test},
sample_weight={'output1': weights1_test, 'output2': weights2_test})
loss = graph.train_on_batch({'input1': X_train, 'output1': y_train, 'output2': y2_train},
sample_weight={'output1': weights1, 'output2': weights2})
loss = graph.evaluate({'input1': X_train, 'output1': y_train, 'output2': y2_train},
sample_weight={'output1': weights1, 'output2': weights2})
print(loss)
def test_recursive(self):
print('test layer-like API')
graph = containers.Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(4), name='dense3', input='dense1')
graph.add_output(name='output1', inputs=['dense2', 'dense3'],
merge_mode='sum')
seq = Sequential()
seq.add(Dense(32, input_shape=(32,)))
seq.add(graph)
seq.add(Dense(4))
seq.compile('rmsprop', 'mse')
history = seq.fit(X_train, y_train, batch_size=10, nb_epoch=10)
loss = seq.evaluate(X_test, y_test)
print(loss)
assert(loss < 2.5)
loss = seq.evaluate(X_test, y_test, show_accuracy=True)
pred = seq.predict(X_test)
seq.get_config(verbose=1)
def test_create_output(self):
print('test create_output argument')
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_node(Dense(16), name='dense1', input='input1')
graph.add_node(Dense(4), name='dense2', input='input1')
graph.add_node(Dense(4), name='dense3', input='dense1')
graph.add_node(Dense(4), name='output1', inputs=['dense2', 'dense3'],
merge_mode='sum', create_output=True)
graph.compile('rmsprop', {'output1': 'mse'})
history = graph.fit({'input1': X_train, 'output1': y_train},
nb_epoch=10)
out = graph.predict({'input1': X_test})
        assert(type(out) == dict)
assert(len(out) == 1)
loss = graph.test_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.train_on_batch({'input1': X_test, 'output1': y_test})
loss = graph.evaluate({'input1': X_test, 'output1': y_test})
print(loss)
assert(loss < 2.5)
def test_count_params(self):
print('test count params')
nb_units = 100
nb_classes = 2
graph = Graph()
graph.add_input(name='input1', input_shape=(32,))
graph.add_input(name='input2', input_shape=(32,))
graph.add_node(Dense(nb_units),
name='dense1', input='input1')
graph.add_node(Dense(nb_classes),
name='dense2', input='input2')
graph.add_node(Dense(nb_classes),
name='dense3', input='dense1')
graph.add_output(name='output', inputs=['dense2', 'dense3'],
merge_mode='sum')
n = 32 * nb_units + nb_units
n += 32 * nb_classes + nb_classes
n += nb_units * nb_classes + nb_classes
self.assertEqual(n, graph.count_params())
graph.compile('rmsprop', {'output': 'binary_crossentropy'})
self.assertEqual(n, graph.count_params())
if __name__ == '__main__':
print('Test graph model')
unittest.main()
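
test_count_params above tallies n by hand; the underlying rule is simply input_dim * output_dim kernel weights plus output_dim biases per Dense layer. As a standalone check:

def dense_params(input_dim, output_dim):
    return input_dim * output_dim + output_dim  # kernel + bias

n = dense_params(32, 100) + dense_params(32, 2) + dense_params(100, 2)
print(n)  # 3568, the value graph.count_params() should report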

@@ -1,44 +0,0 @@
import numpy as np
np.random.seed(1337)
import unittest
from keras.models import Sequential, weighted_objective
from keras.layers.core import TimeDistributedDense, Masking
from keras import objectives
from keras import backend as K
class TestLossMasking(unittest.TestCase):
def test_loss_masking(self):
X = np.array(
[[[1, 1], [2, 1], [3, 1], [5, 5]],
[[1, 5], [5, 0], [0, 0], [0, 0]]], dtype=np.int32)
model = Sequential()
model.add(Masking(mask_value=0, input_shape=(None, 2)))
model.add(TimeDistributedDense(1, init='one'))
model.compile(loss='mse', optimizer='sgd')
y = model.predict(X)
history = model.fit(X, 4 * y, nb_epoch=1, batch_size=2, verbose=1)
assert history.history['loss'][0] == 285.
def test_loss_masking_time(self):
weighted_loss = weighted_objective(objectives.get('mae'))
shape = (3, 4, 2)
X = np.arange(24).reshape(shape)
Y = 2 * X
# Normally the trailing 1 is added by standardize_weights
weights = np.ones((3,))
mask = np.ones((3, 4))
mask[1, 0] = 0
out = K.eval(weighted_loss(K.variable(X),
K.variable(Y),
K.variable(weights),
K.variable(mask)))
print(out)
if __name__ == '__main__':
print('Test loss masking')
unittest.main()

@@ -1,159 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
np.random.seed(1336) # for reproducibility
from keras.datasets import mnist
from keras.models import Sequential, Graph
from keras.layers.core import Dense, Activation
from keras.utils import np_utils
import unittest
nb_classes = 10
batch_size = 128
nb_epoch = 10
weighted_class = 9
standard_weight = 1
high_weight = 5
max_train_samples = 5000
max_test_samples = 1000
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784)[:max_train_samples]
X_test = X_test.reshape(10000, 784)[:max_test_samples]
X_train = X_train.astype("float32") / 255
X_test = X_test.astype("float32") / 255
# convert class vectors to binary class matrices
y_train = y_train[:max_train_samples]
y_test = y_test[:max_test_samples]
Y_train = np_utils.to_categorical(y_train, nb_classes)
Y_test = np_utils.to_categorical(y_test, nb_classes)
test_ids = np.where(y_test == np.array(weighted_class))[0]
class_weight = dict([(i, standard_weight) for i in range(nb_classes)])
class_weight[weighted_class] = high_weight
sample_weight = np.ones((y_train.shape[0])) * standard_weight
sample_weight[y_train == weighted_class] = high_weight
def create_sequential_model():
model = Sequential()
model.add(Dense(50, input_shape=(784,)))
model.add(Activation('relu'))
model.add(Dense(10))
model.add(Activation('softmax'))
return model
def create_graph_model():
model = Graph()
model.add_input(name='input', input_shape=(784,))
model.add_node(Dense(50, activation='relu'), name='d1', input='input')
model.add_node(Dense(10, activation='softmax'), name='d2', input='d1')
model.add_output(name='output', input='d2')
return model
def _test_weights_sequential(model, class_weight=None, sample_weight=None):
if sample_weight is not None:
model.fit(X_train, Y_train, batch_size=batch_size,
nb_epoch=nb_epoch // 3, verbose=0,
class_weight=class_weight, sample_weight=sample_weight)
model.fit(X_train, Y_train, batch_size=batch_size,
nb_epoch=nb_epoch // 3, verbose=0,
class_weight=class_weight, sample_weight=sample_weight,
validation_split=0.1)
model.fit(X_train, Y_train, batch_size=batch_size,
nb_epoch=nb_epoch // 3, verbose=0,
class_weight=class_weight, sample_weight=sample_weight,
validation_data=(X_train, Y_train, sample_weight))
else:
model.fit(X_train, Y_train, batch_size=batch_size,
nb_epoch=nb_epoch // 2, verbose=0,
class_weight=class_weight, sample_weight=sample_weight)
model.fit(X_train, Y_train, batch_size=batch_size,
nb_epoch=nb_epoch // 2, verbose=0,
class_weight=class_weight, sample_weight=sample_weight,
validation_split=0.1)
model.train_on_batch(X_train[:32], Y_train[:32],
class_weight=class_weight,
sample_weight=sample_weight[:32] if sample_weight is not None else None)
model.test_on_batch(X_train[:32], Y_train[:32],
sample_weight=sample_weight[:32] if sample_weight is not None else None)
score = model.evaluate(X_test[test_ids, :], Y_test[test_ids, :], verbose=0)
return score
def _test_weights_graph(model, class_weight=None, sample_weight=None):
model.fit({'input': X_train, 'output': Y_train},
batch_size=batch_size, nb_epoch=nb_epoch // 2, verbose=0,
class_weight={'output': class_weight},
sample_weight={'output': sample_weight})
model.fit({'input': X_train, 'output': Y_train},
batch_size=batch_size, nb_epoch=nb_epoch // 2, verbose=0,
class_weight={'output': class_weight},
sample_weight={'output': sample_weight}, validation_split=0.1)
model.train_on_batch({'input': X_train[:32], 'output': Y_train[:32]},
class_weight={'output': class_weight},
sample_weight={'output': sample_weight[:32] if sample_weight is not None else None})
model.test_on_batch({'input': X_train[:32], 'output': Y_train[:32]},
sample_weight={'output': sample_weight[:32] if sample_weight is not None else None})
score = model.evaluate({'input': X_test[test_ids, :],
'output': Y_test[test_ids, :]},
verbose=0)
return score
class TestLossWeighting(unittest.TestCase):
def test_sequential(self):
for loss in ['mae', 'mse']:
print('loss:', loss)
print('sequential')
# no weights: reference point
model = create_sequential_model()
model.compile(loss=loss, optimizer='rmsprop')
standard_score = _test_weights_sequential(model)
# test class_weight
model = create_sequential_model()
model.compile(loss=loss, optimizer='rmsprop')
score = _test_weights_sequential(model, class_weight=class_weight)
print('score:', score, ' vs.', standard_score)
self.assertTrue(score < standard_score)
# test sample_weight
model = create_sequential_model()
model.compile(loss=loss, optimizer='rmsprop')
score = _test_weights_sequential(model, sample_weight=sample_weight)
print('score:', score, ' vs.', standard_score)
self.assertTrue(score < standard_score)
def test_graph(self):
for loss in ['mae', 'mse']:
print('loss:', loss)
print('graph')
# no weights: reference point
model = create_graph_model()
model.compile(loss={'output': loss}, optimizer='rmsprop')
standard_score = _test_weights_graph(model)
# test class_weight
model = create_graph_model()
model.compile(loss={'output': loss}, optimizer='rmsprop')
score = _test_weights_graph(model, class_weight=class_weight)
print('score:', score, ' vs.', standard_score)
self.assertTrue(score < standard_score)
# test sample_weight
model = create_graph_model()
model.compile(loss={'output': loss}, optimizer='rmsprop')
score = _test_weights_graph(model, sample_weight=sample_weight)
print('score:', score, ' vs.', standard_score)
self.assertTrue(score < standard_score)
if __name__ == '__main__':
print('Test class_weight and sample_weight')
unittest.main()
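
class_weight and sample_weight differ only in granularity: a class_weight dict is equivalent to giving every sample the weight of its label, which is exactly how the sample_weight array above is built. A sketch of the equivalence:

import numpy as np

class_weight = dict((i, 1) for i in range(10))
class_weight[9] = 5                 # weighted_class gets high_weight
y = np.array([0, 9, 3, 9])
sample_w = np.array([class_weight[label] for label in y])
print(sample_w)                     # [1 5 1 5]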

@@ -1,59 +0,0 @@
from __future__ import print_function
import numpy as np
np.random.seed(1337)
from keras.utils.test_utils import get_test_data
from keras.optimizers import SGD, RMSprop, Adagrad, Adadelta, Adam
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.utils.np_utils import to_categorical
import unittest
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(10,),
classification=True, nb_class=2)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
def get_model(input_dim, nb_hidden, output_dim):
model = Sequential()
model.add(Dense(nb_hidden, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dense(output_dim))
model.add(Activation('softmax'))
return model
def _test_optimizer(optimizer, target=0.9):
model = get_model(X_train.shape[1], 10, y_train.shape[1])
model.compile(loss='categorical_crossentropy', optimizer=optimizer)
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), show_accuracy=True, verbose=2)
return history.history['val_acc'][-1] > target
class TestOptimizers(unittest.TestCase):
def test_sgd(self):
print('test SGD')
sgd = SGD(lr=0.01, momentum=0.9, nesterov=True)
self.assertTrue(_test_optimizer(sgd))
def test_rmsprop(self):
print('test RMSprop')
self.assertTrue(_test_optimizer(RMSprop()))
def test_adagrad(self):
print('test Adagrad')
self.assertTrue(_test_optimizer(Adagrad()))
def test_adadelta(self):
print('test Adadelta')
self.assertTrue(_test_optimizer(Adadelta()))
def test_adam(self):
print('test Adam')
self.assertTrue(_test_optimizer(Adam()))
if __name__ == '__main__':
print('Test optimizers')
unittest.main()
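
The SGD configuration under test combines momentum with Nesterov lookahead. One NumPy step in the formulation I believe Keras uses for nesterov=True (treat the exact update order as an assumption):

import numpy as np

def sgd_nesterov_step(w, g, v, lr=0.01, momentum=0.9):
    v = momentum * v - lr * g       # velocity update
    w = w + momentum * v - lr * g   # lookahead: momentum term plus the raw gradient step
    return w, v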

@@ -1,62 +0,0 @@
import unittest
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.models import Sequential
from keras.layers.core import Merge, Dense, Activation, Flatten, ActivityRegularization
from keras.layers.embeddings import Embedding
from keras.datasets import mnist
from keras.utils import np_utils
from keras import regularizers
nb_classes = 10
batch_size = 128
nb_epoch = 5
weighted_class = 9
standard_weight = 1
high_weight = 5
max_train_samples = 5000
max_test_samples = 1000
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784)[:max_train_samples]
X_test = X_test.reshape(10000, 784)[:max_test_samples]
X_train = X_train.astype("float32") / 255
X_test = X_test.astype("float32") / 255
# convert class vectors to binary class matrices
y_train = y_train[:max_train_samples]
y_test = y_test[:max_test_samples]
Y_train = np_utils.to_categorical(y_train, nb_classes)
Y_test = np_utils.to_categorical(y_test, nb_classes)
test_ids = np.where(y_test == np.array(weighted_class))[0]
def create_model(weight_reg=None, activity_reg=None):
model = Sequential()
model.add(Dense(50, input_shape=(784,)))
model.add(Activation('relu'))
model.add(Dense(10, W_regularizer=weight_reg, activity_regularizer=activity_reg))
model.add(Activation('softmax'))
return model
class TestRegularizers(unittest.TestCase):
def test_W_reg(self):
for reg in [regularizers.identity(), regularizers.l1(), regularizers.l2(), regularizers.l1l2()]:
model = create_model(weight_reg=reg)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.evaluate(X_test[test_ids, :], Y_test[test_ids, :], verbose=0)
def test_A_reg(self):
for reg in [regularizers.activity_l1(), regularizers.activity_l2()]:
model = create_model(activity_reg=reg)
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.evaluate(X_test[test_ids, :], Y_test[test_ids, :], verbose=0)
if __name__ == '__main__':
print('Test weight and activity regularizers')
unittest.main()

@ -1,407 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
import unittest
import numpy as np
np.random.seed(1337)
from keras import backend as K
from keras.models import Sequential, model_from_json, model_from_yaml
from keras.layers.core import Dense, Activation, Merge, Lambda, LambdaMerge
from keras.utils import np_utils
from keras.utils.test_utils import get_test_data
import pickle
import sys
input_dim = 32
nb_hidden = 16
nb_class = 4
batch_size = 64
nb_epoch = 1
train_samples = 5000
test_samples = 1000
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=train_samples,
nb_test=test_samples,
input_shape=(input_dim,),
classification=True,
nb_class=4)
y_test = np_utils.to_categorical(y_test)
y_train = np_utils.to_categorical(y_train)
print(X_train.shape)
print(y_train.shape)
class TestSequential(unittest.TestCase):
def test_sequential(self):
print('Test sequential')
model = Sequential()
model.add(Dense(nb_hidden, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=1, validation_data=(X_test, y_test))
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=2, validation_data=(X_test, y_test))
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=2, validation_split=0.1)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=1, validation_split=0.1)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=1, shuffle=False)
model.train_on_batch(X_train[:32], y_train[:32])
loss = model.evaluate(X_train, y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict(X_test, verbose=0)
model.predict_classes(X_test, verbose=0)
model.predict_proba(X_test, verbose=0)
model.get_config(verbose=1)
print('test weight saving')
model.save_weights('temp.h5', overwrite=True)
model = Sequential()
model.add(Dense(nb_hidden, input_shape=(input_dim,)))
model.add(Activation('relu'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.load_weights('temp.h5')
nloss = model.evaluate(X_train, y_train, verbose=0)
assert(loss == nloss)
# test json serialization
json_data = model.to_json()
model = model_from_json(json_data)
# test yaml serialization
yaml_data = model.to_yaml()
model = model_from_yaml(yaml_data)
def test_merge_sum(self):
print('Test merge: sum')
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='sum'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, shuffle=False)
loss = model.evaluate([X_train, X_train], y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict([X_test, X_test], verbose=0)
model.predict_classes([X_test, X_test], verbose=0)
model.predict_proba([X_test, X_test], verbose=0)
model.get_config(verbose=1)
print('test weight saving')
model.save_weights('temp.h5', overwrite=True)
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='sum'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.load_weights('temp.h5')
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
nloss = model.evaluate([X_train, X_train], y_train, verbose=0)
print(nloss)
assert(loss == nloss)
def test_merge_dot1(self):
print('Test merge: dot')
left = Sequential()
left.add(Dense(input_dim=input_dim, output_dim=nb_hidden))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(input_dim=input_dim, output_dim=nb_hidden))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='dot', dot_axes=1))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
def test_merge_dot2(self):
print('Test merge: dot')
left = Sequential()
left.add(Dense(input_dim=input_dim, output_dim=nb_hidden))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(input_dim=input_dim, output_dim=nb_hidden))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='dot', dot_axes=([1], [1])))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
def test_merge_concat(self):
print('Test merge: concat')
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='concat'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, shuffle=False)
loss = model.evaluate([X_train, X_train], y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict([X_test, X_test], verbose=0)
model.predict_classes([X_test, X_test], verbose=0)
model.predict_proba([X_test, X_test], verbose=0)
model.get_config(verbose=1)
print('test weight saving')
model.save_weights('temp.h5', overwrite=True)
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='concat'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.load_weights('temp.h5')
nloss = model.evaluate([X_train, X_train], y_train, verbose=0)
assert(loss == nloss)
def test_merge_recursivity(self):
print('Test merge recursivity')
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
righter = Sequential()
righter.add(Dense(nb_hidden, input_shape=(input_dim,)))
righter.add(Activation('relu'))
intermediate = Sequential()
intermediate.add(Merge([left, right], mode='sum'))
intermediate.add(Dense(nb_hidden))
intermediate.add(Activation('relu'))
model = Sequential()
model.add(Merge([intermediate, righter], mode='sum'))
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=([X_test, X_test, X_test], y_test))
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=([X_test, X_test, X_test], y_test))
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_split=0.1)
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_split=0.1)
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit([X_train, X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, shuffle=False)
loss = model.evaluate([X_train, X_train, X_train], y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict([X_test, X_test, X_test], verbose=0)
model.predict_classes([X_test, X_test, X_test], verbose=0)
model.predict_proba([X_test, X_test, X_test], verbose=0)
model.get_config(verbose=1)
model.save_weights('temp.h5', overwrite=True)
model.load_weights('temp.h5')
nloss = model.evaluate([X_train, X_train, X_train], y_train, verbose=0)
print(nloss)
assert(loss == nloss)
def test_merge_overlap(self):
print('Test merge overlap')
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, left], mode='sum'))
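# note: the same `left` branch is merged with itself, so its weights are
# shared; fit() below is called with a single input array because both
# merge inputs resolve to the same underlying branch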
model.add(Dense(nb_class))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=1, validation_data=(X_test, y_test))
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=2, validation_data=(X_test, y_test))
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=2, validation_split=0.1)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=1, validation_split=0.1)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit(X_train, y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=1, shuffle=False)
model.train_on_batch(X_train[:32], y_train[:32])
loss = model.evaluate(X_train, y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict(X_test, verbose=0)
model.predict_classes(X_test, verbose=0)
model.predict_proba(X_test, verbose=0)
model.get_config(verbose=1)
model.save_weights('temp.h5', overwrite=True)
model.load_weights('temp.h5')
nloss = model.evaluate(X_train, y_train, verbose=0)
print(nloss)
assert(loss == nloss)
def test_lambda(self):
print('Test lambda: sum')
def func(X):
s = X[0]
for i in range(1, len(X)):
s += X[i]
return s
def activation(X):
return K.softmax(X)
def output_shape(input_shapes):
return input_shapes[0]
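# LambdaMerge applies `func` to the list of branch outputs; `output_shape`
# tells Keras the merged shape, which cannot be inferred from an arbitrary
# Python function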
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(LambdaMerge([left, right], function=func,
output_shape=output_shape))
model.add(Dense(nb_class))
model.add(Lambda(activation))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=([X_test, X_test], y_test))
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_split=0.1)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0)
model.fit([X_train, X_train], y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, shuffle=False)
loss = model.evaluate([X_train, X_train], y_train, verbose=0)
print('loss:', loss)
if loss > 0.7:
raise Exception('Score too low, learning issue.')
model.predict([X_test, X_test], verbose=0)
model.predict_classes([X_test, X_test], verbose=0)
model.predict_proba([X_test, X_test], verbose=0)
model.get_config(verbose=1)
print('test weight saving')
model.save_weights('temp.h5', overwrite=True)
left = Sequential()
left.add(Dense(nb_hidden, input_shape=(input_dim,)))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(nb_hidden, input_shape=(input_dim,)))
right.add(Activation('relu'))
model = Sequential()
model.add(LambdaMerge([left, right], function=func,
output_shape=output_shape))
model.add(Dense(nb_class))
model.add(Lambda(activation))
model.load_weights('temp.h5')
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
nloss = model.evaluate([X_train, X_train], y_train, verbose=0)
assert(loss == nloss)
print('test serializing')
del func, activation # Make sure that the model has the function code, not just the function name.
sys.setrecursionlimit(50000)
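# pickling a compiled Theano-backed model traverses a deeply nested object
# graph, which overflows the default recursion limit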
model_str = pickle.dumps(model)
model = pickle.loads(model_str)
nloss = model.evaluate([X_train, X_train], y_train, verbose=0)
assert(loss == nloss)
def test_count_params(self):
print('test count params')
input_dim = 20
nb_units = 10
nb_classes = 2
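# a Dense layer holds fan_in * fan_out weights plus fan_out biases, so the
# three layers below contribute 20*10+10 + 10*10+10 + 10*2+2 = 342 params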
n = input_dim * nb_units + nb_units
n += nb_units * nb_units + nb_units
n += nb_units * nb_classes + nb_classes
model = Sequential()
model.add(Dense(nb_units, input_shape=(input_dim,)))
model.add(Dense(nb_units))
model.add(Dense(nb_classes))
model.add(Activation('softmax'))
self.assertEqual(n, model.count_params())
model.compile('sgd', 'binary_crossentropy')
self.assertEqual(n, model.count_params())
if __name__ == '__main__':
print('Test Sequential model')
unittest.main()

@ -1,140 +0,0 @@
import unittest
import numpy as np
from keras import backend as K
from keras.layers.core import *
from keras.layers.convolutional import *
from keras.layers.recurrent import SimpleRNN
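# check_layer_output_shape builds a backend function from the layer's input
# to its output, runs real data through it, and asserts that the observed
# shape matches the shape the layer infers symbolically (batch axis excluded)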
def check_layer_output_shape(layer, input_data):
ndim = len(input_data.shape)
layer.input = K.placeholder(ndim=ndim)
layer.set_input_shape(input_data.shape[1:])
expected_output_shape = layer.output_shape[1:]
function = K.function([layer.input], [layer.get_output()])
output = function([input_data])[0]
assert output.shape[1:] == expected_output_shape
class TestShapeInference(unittest.TestCase):
########
# Core #
########
def test_Reshape(self):
layer = Reshape(dims=(2, 3))
input_data = np.random.random((2, 6))
check_layer_output_shape(layer, input_data)
def test_Permute(self):
layer = Permute(dims=(1, 3, 2))
input_data = np.random.random((2, 2, 4, 3))
check_layer_output_shape(layer, input_data)
def test_Flatten(self):
layer = Flatten()
input_data = np.random.random((2, 2, 3))
check_layer_output_shape(layer, input_data)
def test_RepeatVector(self):
layer = RepeatVector(2)
input_data = np.random.random((2, 2))
check_layer_output_shape(layer, input_data)
def test_Dense(self):
layer = Dense(3)
input_data = np.random.random((2, 2))
check_layer_output_shape(layer, input_data)
def test_TimeDistributedDense(self):
layer = TimeDistributedDense(2)
input_data = np.random.random((2, 2, 3))
check_layer_output_shape(layer, input_data)
#################
# Convolutional #
#################
def test_Convolution1D(self):
for border_mode in ['same', 'valid']:
for filter_length in [2, 3]:
for subsample_length in [1, 2]:
if subsample_length > 1 and border_mode == 'same':
continue
for input_data_shape in [(2, 3, 2), (2, 4, 2)]:
layer = Convolution1D(nb_filter=1,
filter_length=filter_length,
border_mode=border_mode,
subsample_length=subsample_length)
input_data = np.random.random(input_data_shape)
check_layer_output_shape(layer, input_data)
def test_Convolution2D(self):
for border_mode in ['same', 'valid']:
for nb_row, nb_col in [(2, 1), (3, 2)]:
for subsample in [(1, 1), (2, 2)]:
if (subsample[0] > 1 or subsample[1] > 1) and border_mode == 'same':
continue
for input_data_shape in [(2, 1, 3, 3), (2, 1, 4, 4)]:
layer = Convolution2D(nb_filter=1, nb_row=nb_row,
nb_col=nb_col,
border_mode=border_mode,
subsample=subsample)
input_data = np.random.random(input_data_shape)
check_layer_output_shape(layer, input_data)
def test_MaxPooling1D(self):
for ignore_border in [True, False]:
for stride in [1, 2]:
for pool_length in [1, 2]:
for input_data_shape in [(2, 1, 3), (2, 1, 4)]:
layer = MaxPooling1D(pool_length=pool_length,
stride=stride,
border_mode='valid')
input_data = np.random.random(input_data_shape)
check_layer_output_shape(layer, input_data)
def test_MaxPooling2D(self):
for ignore_border in [True, False]:
for strides in [(1, 1), (2, 2)]:
for pool_size in [(2, 2), (3, 3), (4, 4)]:
for input_data_shape in [(2, 1, 3, 3), (2, 1, 4, 4), (2, 1, 5, 5), (2, 1, 6, 6)]:
layer = MaxPooling2D(pool_size=pool_size,
strides=strides,
border_mode='valid')
input_data = np.random.random(input_data_shape)
check_layer_output_shape(layer, input_data)
def test_UpSample1D(self):
layer = UpSample1D(length=2)
input_data = np.random.random((2, 2, 3))
check_layer_output_shape(layer, input_data)
def test_UpSample2D(self):
layer = UpSample2D(size=(2, 2))
input_data = np.random.random((2, 1, 2, 3))
check_layer_output_shape(layer, input_data)
def test_ZeroPadding1D(self):
layer = ZeroPadding1D(1)
input_data = np.random.random((2, 2, 1))
check_layer_output_shape(layer, input_data)
def test_ZeroPadding2D(self):
layer = ZeroPadding2D((1, 2))
input_data = np.random.random((2, 1, 2, 3))
check_layer_output_shape(layer, input_data)
#############
# Recurrent #
#############
def test_SimpleRNN(self):
# all recurrent layers inherit output_shape
# from the same base recurrent layer
layer = SimpleRNN(2)
input_data = np.random.random((2, 2, 3))
check_layer_output_shape(layer, input_data)
if __name__ == "__main__":
unittest.main()

@ -1,132 +0,0 @@
from __future__ import print_function
import numpy as np
np.random.seed(1337)
from keras.utils.test_utils import get_test_data
from keras.models import Sequential
from keras.layers.core import Dense, Activation, TimeDistributedDense, Flatten
from keras.layers.recurrent import GRU
from keras.layers.convolutional import Convolution2D
from keras.utils.np_utils import to_categorical
import unittest
class TestTasks(unittest.TestCase):
def test_vector_clf(self):
nb_hidden = 10
print('vector classification data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(10,),
classification=True, nb_class=2)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
model = Sequential()
model.add(Dense(nb_hidden, input_shape=(X_train.shape[-1],)))
model.add(Activation('relu'))
model.add(Dense(y_train.shape[-1]))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
history = model.fit(X_train, y_train, nb_epoch=15, batch_size=16, validation_data=(X_test, y_test), show_accuracy=True, verbose=2)
print(history.history)
self.assertTrue(history.history['val_acc'][-1] > 0.8)
def test_vector_reg(self):
nb_hidden = 10
print('vector regression data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(10,), output_shape=(2,),
classification=False)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
model = Sequential()
model.add(Dense(nb_hidden, input_shape=(X_train.shape[-1],)))
model.add(Activation('tanh'))
model.add(Dense(y_train.shape[-1]))
model.compile(loss='hinge', optimizer='adagrad')
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), verbose=2)
self.assertTrue(history.history['val_loss'][-1] < 0.9)
def test_temporal_clf(self):
print('temporal classification data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(3, 5),
classification=True, nb_class=2)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
model = Sequential()
model.add(GRU(y_train.shape[-1], input_shape=(None, X_train.shape[-1])))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adadelta')
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), show_accuracy=True, verbose=2)
self.assertTrue(history.history['val_acc'][-1] > 0.9)
def test_temporal_reg(self):
print('temporal regression data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(3, 5), output_shape=(2,),
classification=False)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
model = Sequential()
model.add(GRU(y_train.shape[-1], input_shape=(None, X_train.shape[-1])))
model.compile(loss='hinge', optimizer='adam')
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), verbose=2)
self.assertTrue(history.history['val_loss'][-1] < 0.8)
def test_seq_to_seq(self):
print('sequence to sequence data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(3, 5), output_shape=(3, 5),
classification=False)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
model = Sequential()
model.add(TimeDistributedDense(y_train.shape[-1], input_shape=(None, X_train.shape[-1])))
model.compile(loss='hinge', optimizer='rmsprop')
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), verbose=2)
self.assertTrue(history.history['val_loss'][-1] < 0.8)
def test_img_clf(self):
print('image classification data:')
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(3, 8, 8),
classification=True, nb_class=2)
print('X_train:', X_train.shape)
print('X_test:', X_test.shape)
print('y_train:', y_train.shape)
print('y_test:', y_test.shape)
y_train = to_categorical(y_train)
y_test = to_categorical(y_test)
model = Sequential()
model.add(Convolution2D(8, 8, 8, input_shape=(3, 8, 8)))
model.add(Activation('sigmoid'))
model.add(Flatten())
model.add(Dense(y_test.shape[-1]))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='sgd')
history = model.fit(X_train, y_train, nb_epoch=12, batch_size=16, validation_data=(X_test, y_test), show_accuracy=True, verbose=2)
print(history.history['val_acc'][-1])
self.assertTrue(history.history['val_acc'][-1] > 0.9)
if __name__ == '__main__':
print('Test different types of classification and regression tasks')
unittest.main()

@ -1,134 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
from keras.datasets import mnist
from keras.models import Sequential, model_from_config
from keras.layers.core import AutoEncoder, Dense, Activation, TimeDistributedDense, Flatten
from keras.layers.recurrent import LSTM
from keras.layers.embeddings import Embedding
from keras.layers.core import Layer
from keras.layers import containers
from keras.utils import np_utils
import numpy as np
nb_classes = 10
batch_size = 128
nb_epoch = 5
activation = 'linear'
input_dim = 784
hidden_dim = 392
max_train_samples = 5000
max_test_samples = 1000
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, input_dim)[:max_train_samples]
X_test = X_test.reshape(10000, input_dim)[:max_test_samples]
X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
Y_test = np_utils.to_categorical(y_test, nb_classes)[:max_test_samples]
print("X_train: ", X_train.shape)
print("X_test: ", X_test.shape)
##########################
# dense model test #
##########################
print("Training classical fully connected layer for classification")
model_classical = Sequential()
model_classical.add(Dense(input_dim, 10, activation=activation))
model_classical.add(Activation('softmax'))
model_classical.get_config(verbose=1)
model_classical.compile(loss='categorical_crossentropy', optimizer='adam')
model_classical.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=(X_test, Y_test))
classical_score = model_classical.evaluate(X_test, Y_test, verbose=0, show_accuracy=True)
print('\nclassical_score:', classical_score)
##########################
# autoencoder model test #
##########################
def build_lstm_autoencoder(autoencoder, X_train, X_test):
X_train = X_train[:, np.newaxis, :]
X_test = X_test[:, np.newaxis, :]
print("Modified X_train: ", X_train.shape)
print("Modified X_test: ", X_test.shape)
# The TimeDistributedDense isn't really necessary, but you need a lot of GPU memory to do a full 784x392-392x784 autoencoder
autoencoder.add(TimeDistributedDense(input_dim, 16))
autoencoder.add(AutoEncoder(encoder=LSTM(16, 8, activation=activation, return_sequences=True),
decoder=LSTM(8, input_dim, activation=activation, return_sequences=True),
output_reconstruction=False))
return autoencoder, X_train, X_test
def build_deep_classical_autoencoder(autoencoder):
encoder = containers.Sequential([Dense(input_dim, hidden_dim, activation=activation), Dense(hidden_dim, hidden_dim/2, activation=activation)])
decoder = containers.Sequential([Dense(hidden_dim/2, hidden_dim, activation=activation), Dense(hidden_dim, input_dim, activation=activation)])
autoencoder.add(AutoEncoder(encoder=encoder, decoder=decoder, output_reconstruction=False))
return autoencoder
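# with output_reconstruction=False, predict() returns the bottleneck code
# rather than the reconstruction, so the classical path maps 784 -> 392 -> 196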
# Try different things here: 'lstm' or 'classical' or 'denoising'
# or 'deep_denoising'
for autoencoder_type in ['classical', 'lstm']:
print(autoencoder_type)
print('-'*40)
# Build our autoencoder model
autoencoder = Sequential()
if autoencoder_type == 'lstm':
print("Training LSTM AutoEncoder")
autoencoder, X_train, X_test = build_lstm_autoencoder(autoencoder, X_train, X_test)
elif autoencoder_type == 'classical':
print("Training Classical AutoEncoder")
autoencoder = build_deep_classical_autoencoder(autoencoder)
else:
print("Error: unknown autoencoder type!")
exit(-1)
autoencoder.compile(loss='mean_squared_error', optimizer='adam')
# Do NOT use validation data with output_reconstruction=True
autoencoder.fit(X_train, X_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=1)
# Do an inference pass
prefilter_train = autoencoder.predict(X_train, verbose=0)
prefilter_test = autoencoder.predict(X_test, verbose=0)
print("prefilter_train: ", prefilter_train.shape)
print("prefilter_test: ", prefilter_test.shape)
# Classify results from Autoencoder
print("Building classical fully connected layer for classification")
model = Sequential()
if autoencoder_type == 'lstm':
model.add(TimeDistributedDense(8, nb_classes, activation=activation))
model.add(Flatten())
elif autoencoder_type == 'classical':
model.add(Dense(prefilter_train.shape[1], nb_classes, activation=activation))
else:
model.add(Dense(prefilter_train.shape[1], nb_classes, activation=activation))
model.add(Activation('softmax'))
model.get_config(verbose=1)
model.compile(loss='categorical_crossentropy', optimizer='adam')
model.fit(prefilter_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=False, verbose=0, validation_data=(prefilter_test, Y_test))
score = model.evaluate(prefilter_test, Y_test, verbose=0, show_accuracy=True)
print('\nscore:', score)
print('Loss change:', 100 * (score[0] - classical_score[0]) / classical_score[0], '%')
print('Accuracy change:', 100 * (score[1] - classical_score[1]) / classical_score[1], '%')
# check serialization
config = autoencoder.get_config(verbose=1)
autoencoder = model_from_config(config)

@ -1,236 +0,0 @@
import numpy as np
import random
import theano
from keras.models import Sequential
from keras.callbacks import Callback
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.regularizers import l2
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.utils import np_utils
from keras.datasets import mnist
import keras.callbacks as cbks
from matplotlib import pyplot as plt
from matplotlib import animation
##############################
# model DrawActivations test #
##############################
print('Running DrawActivations test')
nb_classes = 10
batch_size = 128
nb_epoch = 10
max_train_samples = 512
max_test_samples = 1
np.random.seed(1337)
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(-1, 1, 28, 28)[:max_train_samples]
X_train = X_train.astype("float32")
X_train /= 255
X_test = X_test.reshape(-1, 1, 28, 28)[:max_test_samples]
X_test = X_test.astype("float32")
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
class Frames(object):
def __init__(self, n_plots=16):
self._n_frames = 0
self._framedata = []
self._titles = []
for i in range(n_plots):
self._framedata.append([])
def add_frame(self, i, frame):
self._framedata[i].append(frame)
def set_title(self, title):
self._titles.append(title)
class SubplotTimedAnimation(animation.TimedAnimation):
def __init__(self, fig, frames, grid=(4, 4), interval=10, blit=False, **kwargs):
self.n_plots = grid[0] * grid[1]
self.axes = [fig.add_subplot(grid[0], grid[1], i + 1) for i in range(self.n_plots)]
for axis in self.axes:
axis.get_xaxis().set_ticks([])
axis.get_yaxis().set_ticks([])
self.frames = frames
self.imgs = [self.axes[i].imshow(frames._framedata[i][0], interpolation='nearest', cmap='bone') for i in range(self.n_plots)]
self.title = fig.suptitle('')
super(SubplotTimedAnimation, self).__init__(fig, interval=interval, blit=blit, **kwargs)
def _draw_frame(self, j):
for i in range(self.n_plots):
self.imgs[i].set_data(self.frames._framedata[i][j])
if len(self.frames._titles) > j:
self.title.set_text(self.frames._titles[j])
self._drawn_artists = self.imgs
def new_frame_seq(self):
return iter(range(len(self.frames._framedata[0])))
def _init_draw(self):
for img in self.imgs:
img.set_data([[]])
def combine_imgs(imgs, grid=(1,1)):
n_imgs, img_h, img_w = imgs.shape
if n_imgs != grid[0] * grid[1]:
raise ValueError()
combined = np.zeros((grid[0] * img_h, grid[1] * img_w))
for i in range(grid[0]):
for j in range(grid[1]):
combined[img_h*i:img_h*(i+1), img_w*j:img_w*(j+1)] = imgs[grid[0] * i + j]
return combined
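# e.g. 16 images of shape (28, 28) on a (4, 4) grid combine into a single
# (112, 112) array that can be shown with one imshow call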
class DrawActivations(Callback):
def __init__(self, figsize):
self.fig = plt.figure(figsize=figsize)
def on_train_begin(self, logs={}):
self.imgs = Frames(n_plots=5)
layers_0_ids = np.random.choice(32, 16, replace=False)
self.test_layer0 = theano.function([self.model.get_input()], self.model.layers[1].get_output(train=False)[0, layers_0_ids])
layers_1_ids = np.random.choice(64, 36, replace=False)
self.test_layer1 = theano.function([self.model.get_input()], self.model.layers[5].get_output(train=False)[0, layers_1_ids])
self.test_layer2 = theano.function([self.model.get_input()], self.model.layers[10].get_output(train=False)[0])
def on_epoch_begin(self, epoch, logs={}):
self.epoch = epoch
def on_batch_end(self, batch, logs={}):
if batch % 5 == 0:
self.imgs.add_frame(0, X_test[0,0])
self.imgs.add_frame(1, combine_imgs(self.test_layer0(X_test), grid=(4, 4)))
self.imgs.add_frame(2, combine_imgs(self.test_layer1(X_test), grid=(6, 6)))
self.imgs.add_frame(3, self.test_layer2(X_test).reshape((16,16)))
self.imgs.add_frame(4, self.model._predict(X_test)[0].reshape((1,10)))
self.imgs.set_title('Epoch #%d - Batch #%d' % (self.epoch, batch))
def on_train_end(self, logs={}):
anim = SubplotTimedAnimation(self.fig, self.imgs, grid=(1,5), interval=10, blit=False, repeat_delay=1000)
# anim.save('test_gif.gif', fps=15, writer='imagemagick')
plt.show()
# model = Sequential()
# model.add(Dense(784, 50))
# model.add(Activation('relu'))
# model.add(Dense(50, 10))
# model.add(Activation('softmax'))
model = Sequential()
model.add(Convolution2D(32, 1, 3, 3, border_mode='full'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Convolution2D(64, 32, 3, 3, border_mode='full'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(64*8*8, 256))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(256, 10, W_regularizer = l2(0.1)))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
# Fit the model
draw_weights = DrawActivations(figsize=(5.4, 1.35))
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=1, callbacks=[draw_weights])
##########################
# model checkpoint tests #
##########################
print('Running ModelCheckpoint test')
nb_classes = 10
batch_size = 128
nb_epoch = 20
# small sample size to overfit on training data
max_train_samples = 50
max_test_samples = 1000
np.random.seed(1337) # for reproducibility
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784)[:max_train_samples]
X_test = X_test.reshape(10000, 784)[:max_test_samples]
X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
Y_test = np_utils.to_categorical(y_test, nb_classes)[:max_test_samples]
# Create a slightly larger network than required to test best validation save only
model = Sequential()
model.add(Dense(784, 500))
model.add(Activation('relu'))
model.add(Dense(500, 10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
# test file location
path = "/tmp"
filename = "model_weights.hdf5"
import os
f = os.path.join(path, filename)
print("Test model checkpointer")
# only store best validation model in checkpointer
checkpointer = cbks.ModelCheckpoint(filepath=f, verbose=1, save_best_only=True)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=(X_test, Y_test), callbacks=[checkpointer])
if not os.path.isfile(f):
raise Exception("Model weights were not saved to %s" % (f))
print("Test model checkpointer without validation data")
import warnings
warnings.filterwarnings('error')
try:
passed = False
# this should issue a warning
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, callbacks=[checkpointer])
except:
passed = True
if not passed:
raise Exception("Modelcheckpoint tests did not pass")
print("Test model checkpointer with pattern")
filename = "model_weights.{epoch:04d}.hdf5"
f = os.path.join(path, filename)
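# judging by the assertions below, the {epoch:04d} placeholder in the filepath
# is formatted with the epoch index, yielding one checkpoint file per epoch
# (model_weights.0000.hdf5, ...)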
nb_epoch = 3
checkpointer = cbks.ModelCheckpoint(f)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, callbacks=[checkpointer])
for i in range(nb_epoch):
if not os.path.isfile(f.format(epoch=i)):
raise Exception("Model weights were not saved separately for each epoch")
print("Tests passed")

@ -1,100 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
import keras
from keras.datasets import mnist
import keras.models
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.regularizers import l2, l1
from keras.constraints import maxnorm, nonneg
from keras.optimizers import SGD, Adam, RMSprop
from keras.utils import np_utils, generic_utils
import theano
import theano.tensor as T
import numpy as np
import scipy
batch_size = 100
nb_classes = 10
nb_epoch = 10
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784)
X_test = X_test.reshape(10000, 784)
X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)
Y_test = np_utils.to_categorical(y_test, nb_classes)
model = Sequential()
model.add(Dense(784, 20, W_constraint=maxnorm(1)))
model.add(Activation('relu'))
model.add(Dropout(0.1))
model.add(Dense(20, 20, W_constraint=nonneg()))
model.add(Activation('relu'))
model.add(Dropout(0.1))
model.add(Dense(20, 10, W_constraint=maxnorm(1)))
model.add(Activation('softmax'))
rms = RMSprop()
model.compile(loss='categorical_crossentropy', optimizer=rms)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0)
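# maxnorm(1) caps the L2 norm of each column of the weight matrix at 1, so
# after training the largest column norm should sit at (close to) that bound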
a = model.params[0].eval()
if np.isclose(np.max(np.sqrt(np.sum(a**2, axis=0))), 1):
print('Maxnorm test passed')
else:
raise ValueError('Maxnorm test failed!')
b = model.params[2].eval()
if np.min(b) == 0 and np.min(a) != 0:
print('Nonneg test passed')
else:
raise ValueError('Nonneg test failed!')
model = Sequential()
model.add(Dense(784, 20))
model.add(Activation('relu'))
model.add(Dense(20, 20, W_regularizer=l1(.01)))
model.add(Activation('relu'))
model.add(Dense(20, 10))
model.add(Activation('softmax'))
rms = RMSprop()
model.compile(loss='categorical_crossentropy', optimizer=rms)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=20, show_accuracy=True, verbose=0)
a = model.params[2].eval().reshape(400)
(D, p1) = scipy.stats.kurtosistest(a)
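# rationale (assumed): L1 drives many weights to zero, giving a peaked,
# heavy-tailed distribution that a kurtosis normality test rejects (small p1),
# while the L2-regularized weights below should stay roughly Gaussian (large p2)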
model = Sequential()
model.add(Dense(784, 20))
model.add(Activation('relu'))
model.add(Dense(20, 20, W_regularizer=l2(.01)))
model.add(Activation('relu'))
model.add(Dense(20, 10))
model.add(Activation('softmax'))
rms = RMSprop()
model.compile(loss='categorical_crossentropy', optimizer=rms)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=20, show_accuracy=True, verbose=0)
a = model.params[2].eval().reshape(400)
(D, p2) = scipy.stats.kurtosistest(a)
if p1<.01 and p2>.01:
print('L1 and L2 regularization tests passed')
else:
raise ValueError('L1 and L2 regularization tests failed!')

@ -1,130 +0,0 @@
# Dummy test data as input to RNN. Each sequence is 15 timesteps long; the first
# 10 timesteps are zeroed out and the targets copy tokens from the remaining timesteps.
# Without masking the model should be able to learn the copy; with the informative
# timesteps masked out it should fail.
import numpy as np
from keras.utils.theano_utils import sharedX
from keras.models import Sequential
from keras.layers.core import Dense, Activation, Merge, Dropout, TimeDistributedDense
from keras.layers.embeddings import Embedding
from keras.layers.recurrent import SimpleRNN, SimpleDeepRNN, LSTM, GRU
import theano
theano.config.exception_verbosity = 'high'
# (nb_samples, timesteps, dimensions)
X = np.random.random_integers(1, 4, size=(500000, 15))
print("About to compile the first model")
model = Sequential()
model.add(Embedding(5, 4, mask_zero=True))
model.add(TimeDistributedDense(4, 4)) # obviously this is redundant. Just testing.
model.add(SimpleRNN(4, 4, activation='relu', return_sequences=True))
model.add(Dropout(0.5))
model.add(SimpleDeepRNN(4, 4, depth=2, activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(4, 4, activation='softmax'))
model.compile(loss='categorical_crossentropy',
optimizer='rmsprop', theano_mode=theano.compile.mode.FAST_RUN)
print("Compiled model")
W = model.get_weights() # We'll save these so we can reset it later
X[:, :10] = 0
Xmask0 = X.copy()
Xmask0[:, 10] = 0
Xmask12 = X.copy()
Xmask12[:, 11] = 0
Xmask12[:, 12] = 0
X0_onehot = np.zeros((X.shape[0], 4))
X1_onehot = np.zeros((X.shape[0], 4))
for i, row in enumerate(X):
X0_onehot[i, row[10] - 1] = 1
X1_onehot[i, row[11] - 1] = 1
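# X0_onehot / X1_onehot one-hot encode the tokens at timesteps 10 and 11, the
# first informative positions now that X[:, :10] has been zeroed out above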
# Uniform score: 4 options = ln(4) nats (2 bits)
# we should not do better than this when we mask out the part of the input
# that gives us the correct answer
uniform_score = np.log(4)
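# numerically, uniform_score = ln(4) ~= 1.386 nats, so the pass/fail threshold
# uniform_score * 0.9 used below is ~1.25 nats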
batch_size = 512
# Train it to guess 0th dim
model.fit(X, X0_onehot, nb_epoch=1, batch_size=batch_size)
score = model.evaluate(X, X0_onehot, batch_size=batch_size)
if score > uniform_score * 0.9:
raise Exception('Failed to learn to copy timestep 0, score %f' % score)
model.set_weights(W)
# Train without showing it the 0th dim to learn 1st dim
model.fit(X[: , 1:], X1_onehot, nb_epoch=1, batch_size=batch_size)
score = model.evaluate(X[:, 1:], X1_onehot, batch_size=batch_size)
if score > uniform_score * 0.9:
raise Exception('Failed to learn to copy timestep 1, score %f' % score)
model.set_weights(W)
# Train to guess 0th dim when 0th dim has been masked (should fail)
model.fit(Xmask0, X0_onehot, nb_epoch=1, batch_size=batch_size)
score = model.evaluate(Xmask0, X0_onehot, batch_size=batch_size)
if score < uniform_score * 0.9:
raise Exception('Somehow learned to copy timestep 0 despite mask, score %f' % score)
model.set_weights(W)
# Train to guess 1st dim when 0th dim has been masked (should succeed)
model.fit(Xmask0, X1_onehot, nb_epoch=1, batch_size=batch_size)
score = model.evaluate(Xmask0, X1_onehot, batch_size=batch_size)
if score > uniform_score * 0.9:
raise Exception('Failed to learn to copy timestep 1 in masked model, score %f' % score)
model.set_weights(W)
# Finally, make sure the mask is actually blocking input, mask out timesteps 1 and 2, and see if
# it can learn timestep 0 (should fail)
model.fit(Xmask12, X0_onehot, nb_epoch=1, batch_size=batch_size)
score = model.evaluate(Xmask12, X0_onehot, batch_size=batch_size)
if score < uniform_score * 0.9:
raise Exception('Somehow learned to copy timestep 0 despite masking 1, score %f' % score)
# Another testing approach, just initialize models and make sure that prepending zeros doesn't affect
# their output
print("About to compile the second model")
model2 = Sequential()
model2.add(Embedding(5, 4, mask_zero=True))
model2.add(TimeDistributedDense(4, 4))
model2.add(Activation('time_distributed_softmax'))
model2.add(LSTM(4, 4, return_sequences=True))
model2.add(Activation('tanh'))
model2.add(GRU(4, 4, activation='softmax', return_sequences=True))
model2.add(SimpleDeepRNN(4, 4, depth=2, activation='relu', return_sequences=True))
model2.add(SimpleRNN(4, 4, activation='relu', return_sequences=True))
model2.compile(loss='categorical_crossentropy',
optimizer='rmsprop', theano_mode=theano.compile.mode.FAST_RUN)
print("Compiled model2")
X2 = np.random.random_integers(1, 4, size=(2, 5))
y2 = np.random.random((X2.shape[0], X2.shape[1], 4))
ref = model2.predict(X2)
ref_eval = model2.evaluate(X2, y2)
mask = np.ones((y2.shape[0], y2.shape[1], 1))
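# with mask_zero=True the Embedding treats index 0 as padding, so left-padding
# the input with zeros must leave predictions for the real timesteps unchanged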
for pre_zeros in range(1, 10):
padded_X2 = np.concatenate((np.zeros((X2.shape[0], pre_zeros)), X2), axis=1)
padded_mask = np.concatenate((np.zeros((mask.shape[0], pre_zeros, mask.shape[2])), mask), axis=1)
padded_y2 = np.concatenate((np.zeros((y2.shape[0], pre_zeros, y2.shape[2])), y2), axis=1)
pred = model2.predict(padded_X2)
if not np.allclose(ref[:, -1, :], pred[:, -1, :]):
raise Exception("Different result after left-padding %d zeros. Ref: %s, Pred: %s" % (pre_zeros, ref, pred))
pad_eval = model2.evaluate(padded_X2, padded_y2, weights=padded_mask)
if not np.allclose([pad_eval], [ref_eval]):
raise Exception("Got dissimilar categorical_crossentropy after left-padding %d zeros. Ref: %f, Pred %f" %\
(pref_eval, pred_val))

@ -1,44 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
from keras.models import Sequential, Graph
from keras.layers.core import Layer, Activation, Dense, Flatten, Reshape, Merge
from keras.layers.convolutional import Convolution2D, MaxPooling2D
import keras.utils.layer_utils as layer_utils
print('-- Sequential model')
left = Sequential()
left.add(Convolution2D(32, 1, 3, 3, border_mode='valid'))
left.add(MaxPooling2D(pool_size=(2, 2)))
left.add(Flatten())
left.add(Dense(32 * 13 * 13, 50))
left.add(Activation('relu'))
right = Sequential()
right.add(Dense(784, 30))
right.add(Activation('relu'))
model = Sequential()
model.add(Merge([left, right], mode='concat'))
model.add(Dense(80, 10))
model.add(Activation('softmax'))
layer_utils.print_layer_shapes(model, [(1, 1, 28, 28), (1, 784)])
print('-- Graph model')
graph = Graph()
graph.add_input(name='input1', ndim=2)
graph.add_input(name='input2', ndim=4)
graph.add_node(Dense(32, 16), name='dense1', input='input1')
graph.add_node(Dense(16, 4), name='dense3', input='dense1')
graph.add_node(Convolution2D(32, 1, 3, 3), name='conv1', input='input2')
graph.add_node(Flatten(), name='flatten1', input='conv1')
graph.add_node(Dense(32 * 13 * 13, 10), name='dense4', input='flatten1')
graph.add_output(name='output1', inputs=['dense1', 'dense3'], merge_mode='sum')
graph.add_output(name='output2', inputs=['dense1', 'dense4'], merge_mode='concat')
layer_utils.print_layer_shapes(graph, {'input1': (1, 32), 'input2': (1, 1, 28, 28)})
print('Test script complete')

@ -1,40 +0,0 @@
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
import sys
sys.setrecursionlimit(10000) # to be able to pickle Theano compiled functions
import pickle, numpy
def create_model():
model = Sequential()
model.add(Dense(256, 2048, init='uniform', activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(2048, 2048, init='uniform', activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(2048, 2048, init='uniform', activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(2048, 2048, init='uniform', activation='relu'))
model.add(Dropout(0.5))
model.add(Dense(2048, 256, init='uniform', activation='linear'))
return model
model = create_model()
sgd = SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
model.compile(loss='mse', optimizer=sgd)
pickle.dump(model, open('/tmp/model.pkl', 'wb'))
model.save_weights('/tmp/model_weights.hdf5')
model_loaded = create_model()
model_loaded.load_weights('/tmp/model_weights.hdf5')
for k in range(len(model.layers)):
weights_orig = model.layers[k].get_weights()
weights_loaded = model_loaded.layers[k].get_weights()
for x, y in zip(weights_orig, weights_loaded):
if numpy.any(x != y):
raise ValueError('Loaded weights are different from pickled weights!')

@ -1,126 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
from keras.datasets import mnist
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.utils import np_utils
from keras.wrappers.scikit_learn import *
import numpy as np
batch_size = 128
nb_epoch = 1
nb_classes = 10
max_train_samples = 5000
max_test_samples = 1000
np.random.seed(1337) # for reproducibility
############################################
# scikit-learn classification wrapper test #
############################################
print('Beginning scikit-learn classification wrapper test')
print('Loading data')
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000, 784)[:max_train_samples]
X_test = X_test.reshape(10000, 784)[:max_test_samples]
X_train = X_train.astype('float32')
X_test = X_test.astype('float32')
X_train /= 255
X_test /= 255
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
Y_test = np_utils.to_categorical(y_test, nb_classes)[:max_test_samples]
print('Defining model')
model = Sequential()
model.add(Dense(784, 50))
model.add(Activation('relu'))
model.add(Dense(50, 10))
model.add(Activation('softmax'))
print('Creating wrapper')
classifier = KerasClassifier(model, train_batch_size=batch_size, nb_epoch=nb_epoch)
print('Fitting model')
classifier.fit(X_train, Y_train)
print('Testing score function')
score = classifier.score(X_train, Y_train)
print('Score: ', score)
print('Testing predict function')
preds = classifier.predict(X_test)
print('Preds.shape: ', preds.shape)
print('Testing predict proba function')
proba = classifier.predict_proba(X_test)
print('Proba.shape: ', proba.shape)
print('Testing get params')
print(classifier.get_params())
print('Testing set params')
classifier.set_params(optimizer='sgd', loss='binary_crossentropy')
print(classifier.get_params())
print('Testing attributes')
print('Classes')
print(classifier.classes_)
print('Config')
print(classifier.config_)
print('Weights')
print(classifier.weights_)
print('Compiled model')
print(classifier.compiled_model_)
########################################
# scikit-learn regression wrapper test #
########################################
print('Beginning scikit-learn regression wrapper test')
print('Generating data')
X_train = np.random.random((5000, 100))
X_test = np.random.random((1000, 100))
y_train = np.random.random(5000)
y_test = np.random.random(1000)
print('Defining model')
model = Sequential()
model.add(Dense(100, 50))
model.add(Activation('relu'))
model.add(Dense(50, 1))
model.add(Activation('linear'))
print('Creating wrapper')
regressor = KerasRegressor(model, train_batch_size=batch_size, nb_epoch=nb_epoch)
print('Fitting model')
regressor.fit(X_train, y_train)
print('Testing score function')
score = regressor.score(X_train, y_train)
print('Score: ', score)
print('Testing predict function')
preds = regressor.predict(X_test)
print('Preds.shape: ', preds.shape)
print('Testing get params')
print(regressor.get_params())
print('Testing set params')
regressor.set_params(optimizer='sgd', loss='mean_absolute_error')
print(regressor.get_params())
print('Testing attributes')
print('Config')
print(regressor.config_)
print('Weights')
print(regressor.weights_)
print('Compiled model')
print(regressor.compiled_model_)
print('Test script complete.')

@ -1,101 +0,0 @@
from __future__ import absolute_import
from __future__ import print_function
import numpy as np
from keras.utils.test_utils import get_test_data
from keras.preprocessing import sequence
from keras.optimizers import SGD, RMSprop, Adagrad
from keras.utils import np_utils
from keras.models import Sequential, Graph
from keras.layers.core import Dense, Dropout, Activation, Merge
from keras.layers.embeddings import Embedding
from keras.layers.recurrent import LSTM, GRU
from keras.datasets import imdb
from keras.models import model_from_yaml
'''
This is essentially the IMDB test. Deserialized models should yield
the same config as the original one.
'''
max_features = 10000
maxlen = 100
batch_size = 32
(X_train, y_train), (X_test, y_test) = imdb.load_data(nb_words=max_features, test_split=0.2)
X_train = sequence.pad_sequences(X_train, maxlen=maxlen)
X_test = sequence.pad_sequences(X_test, maxlen=maxlen)
model = Sequential()
model.add(Embedding(max_features, 128))
model.add(LSTM(128, 128))
model.add(Dropout(0.5))
model.add(Dense(128, 1, W_regularizer='identity', b_constraint='maxnorm'))
model.add(Activation('sigmoid'))
model.get_config(verbose=1)
#####################################
# save model w/o parameters to yaml #
#####################################
yaml_no_params = model.to_yaml()
no_param_model = model_from_yaml(yaml_no_params)
no_param_model.get_config(verbose=1)
######################################
# save multi-branch sequential model #
######################################
seq = Sequential()
seq.add(Merge([model, model], mode='sum'))
seq.get_config(verbose=1)
merge_yaml = seq.to_yaml()
merge_model = model_from_yaml(merge_yaml)
large_model = Sequential()
large_model.add(Merge([seq,model], mode='concat'))
large_model.get_config(verbose=1)
large_model.to_yaml()
####################
# save graph model #
####################
X = np.random.random((100, 32))
X2 = np.random.random((100, 32))
y = np.random.random((100, 4))
y2 = np.random.random((100,))
(X_train, y_train), (X_test, y_test) = get_test_data(nb_train=1000, nb_test=200, input_shape=(32,),
classification=False, output_shape=(4,))
graph = Graph()
graph.add_input(name='input1', ndim=2)
graph.add_node(Dense(32, 16), name='dense1', input='input1')
graph.add_node(Dense(32, 4), name='dense2', input='input1')
graph.add_node(Dense(16, 4), name='dense3', input='dense1')
graph.add_output(name='output1', inputs=['dense2', 'dense3'], merge_mode='sum')
graph.compile('rmsprop', {'output1': 'mse'})
graph.get_config(verbose=1)
history = graph.fit({'input1': X_train, 'output1': y_train}, nb_epoch=10)
original_pred = graph.predict({'input1': X_test})
graph_yaml = graph.to_yaml()
graph.save_weights('temp.h5', overwrite=True)
reloaded_graph = model_from_yaml(graph_yaml)
reloaded_graph.load_weights('temp.h5')
reloaded_graph.get_config(verbose=1)
reloaded_graph.compile('rmsprop', {'output1': 'mse'})
new_pred = reloaded_graph.predict({'input1': X_test})
# sum the absolute differences so opposite-signed errors cannot cancel out
assert(np.abs(new_pred['output1'] - original_pred['output1']).sum() == 0)