# keras/tests/manual/check_callbacks.py
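"""Manual check for Keras callbacks (run by hand, inspect visually).

Part 1 trains a small MNIST convnet and uses a custom DrawActivations
callback to record intermediate activations, which are replayed as a
matplotlib animation when training ends.

Part 2 exercises keras.callbacks.ModelCheckpoint: saving only the best
validation model, warning when no validation data is available, and writing
one weight file per epoch via an `{epoch}` filepath pattern.

Note: written against the old Keras 0.x API and the Theano backend.
"""
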
import numpy as np
import random
import theano
from keras.models import Sequential
from keras.callbacks import Callback
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.regularizers import l2
from keras.layers.convolutional import Convolution2D, MaxPooling2D
from keras.utils import np_utils
from keras.datasets import mnist
import keras.callbacks as cbks
from matplotlib import pyplot as plt
from matplotlib import animation
##############################
# model DrawActivations test #
##############################
print('Running DrawActivations test')
nb_classes = 10
batch_size = 128
nb_epoch = 10
max_train_samples = 512
max_test_samples = 1
np.random.seed(1337)
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(-1,1,28,28)[:max_train_samples]
X_train = X_train.astype("float32")
X_train /= 255
X_test = X_test.reshape(-1,1,28,28)[:max_test_samples]
X_test = X_test.astype("float32")
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
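
# Frames stores, for each subplot, the sequence of 2D arrays recorded during
# training, plus one title per recorded step, for later animation.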
class Frames(object):
    def __init__(self, n_plots=16):
        self._n_frames = 0
        self._framedata = []
        self._titles = []
        for i in range(n_plots):
            self._framedata.append([])

    def add_frame(self, i, frame):
        self._framedata[i].append(frame)

    def set_title(self, title):
        self._titles.append(title)

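# SubplotTimedAnimation replays the recorded frames with matplotlib's
# TimedAnimation: at animation step j, subplot i shows frames._framedata[i][j]
# and the figure title shows frames._titles[j].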
class SubplotTimedAnimation(animation.TimedAnimation):
    def __init__(self, fig, frames, grid=(4, 4), interval=10, blit=False, **kwargs):
        self.n_plots = grid[0] * grid[1]
        self.axes = [fig.add_subplot(grid[0], grid[1], i + 1) for i in range(self.n_plots)]
        for axis in self.axes:
            axis.get_xaxis().set_ticks([])
            axis.get_yaxis().set_ticks([])
        self.frames = frames
        self.imgs = [self.axes[i].imshow(frames._framedata[i][0], interpolation='nearest', cmap='bone')
                     for i in range(self.n_plots)]
        self.title = fig.suptitle('')
        super(SubplotTimedAnimation, self).__init__(fig, interval=interval, blit=blit, **kwargs)

    def _draw_frame(self, j):
        for i in range(self.n_plots):
            self.imgs[i].set_data(self.frames._framedata[i][j])
        if len(self.frames._titles) > j:
            self.title.set_text(self.frames._titles[j])
        self._drawn_artists = self.imgs

    def new_frame_seq(self):
        return iter(range(len(self.frames._framedata[0])))

    def _init_draw(self):
        for img in self.imgs:
            img.set_data([[]])

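# combine_imgs tiles a stack of images of shape (n_imgs, img_h, img_w) into a
# single 2D array laid out as grid[0] rows by grid[1] columns.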
def combine_imgs(imgs, grid=(1, 1)):
    n_imgs, img_h, img_w = imgs.shape
    if n_imgs != grid[0] * grid[1]:
        raise ValueError('Number of images (%d) does not fit a %dx%d grid' % (n_imgs, grid[0], grid[1]))
    combined = np.zeros((grid[0] * img_h, grid[1] * img_w))
    for i in range(grid[0]):
        for j in range(grid[1]):
            # row-major: the image at row i, column j is imgs[i * grid[1] + j]
            combined[img_h * i:img_h * (i + 1), img_w * j:img_w * (j + 1)] = imgs[grid[1] * i + j]
    return combined

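# DrawActivations compiles Theano functions for a few intermediate layers when
# training begins, snapshots their activations on X_test every 5 batches, and
# replays all recorded frames as an animation once training ends.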
class DrawActivations(Callback):
    def __init__(self, figsize):
        self.fig = plt.figure(figsize=figsize)

    def on_train_begin(self, logs={}):
        self.imgs = Frames(n_plots=5)

        layers_0_ids = np.random.choice(32, 16, replace=False)
        self.test_layer0 = theano.function([self.model.get_input()],
                                           self.model.layers[1].get_output(train=False)[0, layers_0_ids])

        layers_1_ids = np.random.choice(64, 36, replace=False)
        self.test_layer1 = theano.function([self.model.get_input()],
                                           self.model.layers[5].get_output(train=False)[0, layers_1_ids])

        self.test_layer2 = theano.function([self.model.get_input()],
                                           self.model.layers[10].get_output(train=False)[0])

    def on_epoch_begin(self, epoch, logs={}):
        self.epoch = epoch

    def on_batch_end(self, batch, logs={}):
        if batch % 5 == 0:
            self.imgs.add_frame(0, X_test[0, 0])
            self.imgs.add_frame(1, combine_imgs(self.test_layer0(X_test), grid=(4, 4)))
            self.imgs.add_frame(2, combine_imgs(self.test_layer1(X_test), grid=(6, 6)))
            self.imgs.add_frame(3, self.test_layer2(X_test).reshape((16, 16)))
            self.imgs.add_frame(4, self.model._predict(X_test)[0].reshape((1, 10)))
            self.imgs.set_title('Epoch #%d - Batch #%d' % (self.epoch, batch))

    def on_train_end(self, logs={}):
        anim = SubplotTimedAnimation(self.fig, self.imgs, grid=(1, 5), interval=10, blit=False, repeat_delay=1000)
        # anim.save('test_gif.gif', fps=15, writer='imagemagick')
        plt.show()

# model = Sequential()
# model.add(Dense(784, 50))
# model.add(Activation('relu'))
# model.add(Dense(50, 10))
# model.add(Activation('softmax'))
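# Note: the layer constructors below use the old Keras 0.x positional
# signatures, e.g. Convolution2D(nb_filter, stack_size, nb_row, nb_col)
# and Dense(input_dim, output_dim).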
model = Sequential()
model.add(Convolution2D(32, 1, 3, 3, border_mode='full'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Convolution2D(64, 32, 3, 3, border_mode='full'))
model.add(Activation('relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(64*8*8, 256))
model.add(Activation('relu'))
model.add(Dropout(0.5))
model.add(Dense(256, 10, W_regularizer=l2(0.1)))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
# Fit the model
draw_weights = DrawActivations(figsize=(5.4, 1.35))
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=1, callbacks=[draw_weights])
##########################
# model checkpoint tests #
##########################
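# Three checks follow: (1) save_best_only with validation data, (2) the
# warning expected when the monitored validation quantity is unavailable, and
# (3) an `{epoch}` filepath pattern that writes one weight file per epoch.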
print('Running ModelCheckpoint test')
nb_classes = 10
batch_size = 128
nb_epoch = 20
# small sample size to overfit on training data
max_train_samples = 50
max_test_samples = 1000
np.random.seed(1337) # for reproducibility
# the data, shuffled and split between train and test sets
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = X_train.reshape(60000,784)[:max_train_samples]
X_test = X_test.reshape(10000,784)[:max_test_samples]
X_train = X_train.astype("float32")
X_test = X_test.astype("float32")
X_train /= 255
X_test /= 255
# convert class vectors to binary class matrices
Y_train = np_utils.to_categorical(y_train, nb_classes)[:max_train_samples]
Y_test = np_utils.to_categorical(y_test, nb_classes)[:max_test_samples]
# Create a slightly larger network than required to test best validation save only
model = Sequential()
model.add(Dense(784, 500))
model.add(Activation('relu'))
model.add(Dense(500, 10))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='rmsprop')
# test file location
path = "/tmp"
filename = "model_weights.hdf5"
import os
f = os.path.join(path, filename)
print("Test model checkpointer")
# only store best validation model in checkpointer
checkpointer = cbks.ModelCheckpoint(filepath=f, verbose=1, save_best_only=True)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, validation_data=(X_test, Y_test), callbacks=[checkpointer])
if not os.path.isfile(f):
    raise Exception("Model weights were not saved to %s" % (f))
print("Test model checkpointer without validation data")
import warnings
warnings.filterwarnings('error')
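# Turn warnings into exceptions so the warning the checkpointer is expected to
# issue (no validation quantity to monitor) can be caught below.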
try:
    passed = False
    # this should issue a warning
    model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, show_accuracy=True, verbose=0, callbacks=[checkpointer])
except:
    passed = True
if not passed:
    raise Exception("ModelCheckpoint tests did not pass")
print("Test model checkpointer with pattern")
filename = "model_weights.{epoch:04d}.hdf5"
f = os.path.join(path, filename)
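# The `{epoch:04d}` placeholder in the filepath is formatted at the end of
# each epoch, so every epoch should produce its own weight file.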
nb_epoch = 3
checkpointer = cbks.ModelCheckpoint(f)
model.fit(X_train, Y_train, batch_size=batch_size, nb_epoch=nb_epoch, verbose=0, callbacks=[checkpointer])
for i in range(nb_epoch):
    if not os.path.isfile(f.format(epoch=i)):
        raise Exception("Model weights were not saved separately for each epoch")
print("Tests passed")