Merge branch 'main' of github.com:keras-team/keras-core

This commit is contained in:
Francois Chollet 2023-08-01 16:30:46 -07:00
parent 1ffa864cfe
commit a4b58acfbe
14 changed files with 1918 additions and 50 deletions

@@ -0,0 +1,484 @@
"""
Title: Classification with Neural Decision Forests
Author: [Khalid Salama](https://www.linkedin.com/in/khalid-salama-24403144/)
Date created: 2021/01/15
Last modified: 2021/01/15
Description: How to train differentiable decision trees for end-to-end learning in deep neural networks.
Accelerator: GPU
"""
"""
## Introduction
This example provides an implementation of the
[Deep Neural Decision Forest](https://ieeexplore.ieee.org/document/7410529)
model introduced by P. Kontschieder et al. for structured data classification.
It demonstrates how to build a stochastic and differentiable decision tree model,
train it end-to-end, and unify decision trees with deep representation learning.
## The dataset
This example uses the
[United States Census Income Dataset](https://archive.ics.uci.edu/ml/datasets/census+income)
provided by the
[UC Irvine Machine Learning Repository](https://archive.ics.uci.edu/ml/index.php).
The task is binary classification
to predict whether a person is likely to be making over USD 50,000 a year.
The dataset includes 48,842 instances with 14 input features (such as age, work class, education, occupation, and so on): 5 numerical features
and 9 categorical features.
"""
"""
## Setup
"""
import keras_core as keras
from keras_core import layers
from keras_core.layers import StringLookup
from keras_core import ops
from tensorflow import data as tf_data
import numpy as np
import pandas as pd
import math
_dtype = "float32"
"""
## Prepare the data
"""
CSV_HEADER = [
"age",
"workclass",
"fnlwgt",
"education",
"education_num",
"marital_status",
"occupation",
"relationship",
"race",
"gender",
"capital_gain",
"capital_loss",
"hours_per_week",
"native_country",
"income_bracket",
]
train_data_url = (
"https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.data"
)
train_data = pd.read_csv(train_data_url, header=None, names=CSV_HEADER)
test_data_url = (
"https://archive.ics.uci.edu/ml/machine-learning-databases/adult/adult.test"
)
test_data = pd.read_csv(test_data_url, header=None, names=CSV_HEADER)
print(f"Train dataset shape: {train_data.shape}")
print(f"Test dataset shape: {test_data.shape}")
"""
Remove the first record (because it is not a valid data example) and a trailing
'dot' in the class labels.
"""
test_data = test_data[1:]
test_data.income_bracket = test_data.income_bracket.apply(
lambda value: value.replace(".", "")
)
"""
We store the training and test data splits locally as CSV files.
"""
train_data_file = "train_data.csv"
test_data_file = "test_data.csv"
train_data.to_csv(train_data_file, index=False, header=False)
test_data.to_csv(test_data_file, index=False, header=False)
"""
## Define dataset metadata
Here, we define the metadata of the dataset that will be useful for reading, parsing,
and encoding input features.
"""
# A list of the numerical feature names.
NUMERIC_FEATURE_NAMES = [
"age",
"education_num",
"capital_gain",
"capital_loss",
"hours_per_week",
]
# A dictionary of the categorical features and their vocabulary.
CATEGORICAL_FEATURES_WITH_VOCABULARY = {
"workclass": sorted(list(train_data["workclass"].unique())),
"education": sorted(list(train_data["education"].unique())),
"marital_status": sorted(list(train_data["marital_status"].unique())),
"occupation": sorted(list(train_data["occupation"].unique())),
"relationship": sorted(list(train_data["relationship"].unique())),
"race": sorted(list(train_data["race"].unique())),
"gender": sorted(list(train_data["gender"].unique())),
"native_country": sorted(list(train_data["native_country"].unique())),
}
# A list of the columns to ignore from the dataset.
IGNORE_COLUMN_NAMES = ["fnlwgt"]
# A list of the categorical feature names.
CATEGORICAL_FEATURE_NAMES = list(CATEGORICAL_FEATURES_WITH_VOCABULARY.keys())
# A list of all the input features.
FEATURE_NAMES = NUMERIC_FEATURE_NAMES + CATEGORICAL_FEATURE_NAMES
# A list of column default values for each feature.
COLUMN_DEFAULTS = [
[0.0]
if feature_name in NUMERIC_FEATURE_NAMES + IGNORE_COLUMN_NAMES
else ["NA"]
for feature_name in CSV_HEADER
]
# The name of the target feature.
TARGET_FEATURE_NAME = "income_bracket"
# A list of the labels of the target features.
TARGET_LABELS = [" <=50K", " >50K"]
"""
## Create `tf_data.Dataset` objects for training and validation
We create an input function to read and parse the file, and convert features and labels
into a [`tf_data.Dataset`](https://www.tensorflow.org/guide/datasets)
for training and validation. We also preprocess the input by mapping the target label
to an index.
"""
target_label_lookup = StringLookup(
vocabulary=TARGET_LABELS, mask_token=None, num_oov_indices=0
)
lookup_dict = {}
for feature_name in CATEGORICAL_FEATURE_NAMES:
vocabulary = CATEGORICAL_FEATURES_WITH_VOCABULARY[feature_name]
    # Create a lookup to convert string values to integer indices.
# Since we are not using a mask token, nor expecting any out of vocabulary
# (oov) token, we set mask_token to None and num_oov_indices to 0.
lookup = StringLookup(
vocabulary=vocabulary, mask_token=None, num_oov_indices=0
)
lookup_dict[feature_name] = lookup
def encode_categorical(batch_x, batch_y):
for feature_name in CATEGORICAL_FEATURE_NAMES:
batch_x[feature_name] = lookup_dict[feature_name](batch_x[feature_name])
return batch_x, batch_y
def get_dataset_from_csv(csv_file_path, shuffle=False, batch_size=128):
dataset = (
tf_data.experimental.make_csv_dataset(
csv_file_path,
batch_size=batch_size,
column_names=CSV_HEADER,
column_defaults=COLUMN_DEFAULTS,
label_name=TARGET_FEATURE_NAME,
num_epochs=1,
header=False,
na_value="?",
shuffle=shuffle,
)
.map(lambda features, target: (features, target_label_lookup(target)))
.map(encode_categorical)
)
return dataset.cache()
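"""
As a quick sanity check (a minimal sketch, not part of the original example), we can pull a
single batch from the pipeline and inspect its structure:
"""
example_ds = get_dataset_from_csv(train_data_file, batch_size=5)
for features, targets in example_ds.take(1):
    print("Feature keys:", sorted(features.keys()))
    print("Encoded 'workclass' batch:", features["workclass"].numpy())
    print("Encoded targets:", targets.numpy())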
"""
## Create model inputs
"""
def create_model_inputs():
inputs = {}
for feature_name in FEATURE_NAMES:
if feature_name in NUMERIC_FEATURE_NAMES:
inputs[feature_name] = layers.Input(
name=feature_name, shape=(), dtype=_dtype
)
else:
inputs[feature_name] = layers.Input(
name=feature_name, shape=(), dtype="int32"
)
return inputs
"""
## Encode input features
"""
def encode_inputs(inputs):
encoded_features = []
for feature_name in inputs:
if feature_name in CATEGORICAL_FEATURE_NAMES:
            # Reuse the lookup created earlier. The string values were already
            # converted to integer indices by `encode_categorical` in the
            # tf.data pipeline, so here we only need the vocabulary size.
            lookup = lookup_dict[feature_name]
            value_index = inputs[feature_name]
            embedding_dims = int(math.sqrt(lookup.vocabulary_size()))
# Create an embedding layer with the specified dimensions.
embedding = layers.Embedding(
input_dim=lookup.vocabulary_size(), output_dim=embedding_dims
)
# Convert the index values to embedding representations.
encoded_feature = embedding(value_index)
else:
# Use the numerical features as-is.
encoded_feature = inputs[feature_name]
if inputs[feature_name].shape[-1] is None:
encoded_feature = keras.ops.expand_dims(encoded_feature, -1)
encoded_features.append(encoded_feature)
encoded_features = layers.concatenate(encoded_features)
return encoded_features
"""
## Deep Neural Decision Tree
A neural decision tree model has two sets of weights to learn. The first set is `pi`,
which represents the probability distribution of the classes in the tree leaves.
The second set is the weights of the routing layer `decision_fn`, which represents the probability
of reaching each leaf. The forward pass of the model works as follows:
1. The model expects input `features` as a single vector encoding all the features of an instance
in the batch. This vector can be generated from a Convolutional Neural Network (CNN) applied to images
or dense transformations applied to structured data features.
2. The model first applies a `used_features_mask` to randomly select a subset of input features to use.
3. Then, the model computes the probabilities (`mu`) for the input instances to reach the tree leaves
by iteratively performing a *stochastic* routing throughout the tree levels (see the short
NumPy sketch after this description).
4. Finally, the probabilities of reaching the leaves are combined with the class probabilities at the
leaves to produce the final `outputs`.
"""
class NeuralDecisionTree(keras.Model):
def __init__(self, depth, num_features, used_features_rate, num_classes):
super().__init__()
self.depth = depth
self.num_leaves = 2**depth
self.num_classes = num_classes
# Create a mask for the randomly selected features.
num_used_features = int(num_features * used_features_rate)
one_hot = np.eye(num_features)
sampled_feature_indices = np.random.choice(
np.arange(num_features), num_used_features, replace=False
)
self.used_features_mask = ops.convert_to_tensor(
one_hot[sampled_feature_indices], dtype=_dtype
)
# Initialize the weights of the classes in leaves.
self.pi = self.add_weight(
initializer="random_normal",
shape=[self.num_leaves, self.num_classes],
dtype=_dtype,
trainable=True,
)
# Initialize the stochastic routing layer.
self.decision_fn = layers.Dense(
units=self.num_leaves, activation="sigmoid", name="decision"
)
def call(self, features):
batch_size = ops.shape(features)[0]
# Apply the feature mask to the input features.
features = ops.matmul(
features, ops.transpose(self.used_features_mask)
) # [batch_size, num_used_features]
# Compute the routing probabilities.
decisions = ops.expand_dims(
self.decision_fn(features), axis=2
) # [batch_size, num_leaves, 1]
# Concatenate the routing probabilities with their complements.
decisions = layers.concatenate(
[decisions, 1 - decisions], axis=2
) # [batch_size, num_leaves, 2]
mu = ops.ones([batch_size, 1, 1])
begin_idx = 1
end_idx = 2
# Traverse the tree in breadth-first order.
for level in range(self.depth):
mu = ops.reshape(
mu, [batch_size, -1, 1]
) # [batch_size, 2 ** level, 1]
mu = ops.tile(mu, (1, 1, 2)) # [batch_size, 2 ** level, 2]
level_decisions = decisions[
:, begin_idx:end_idx, :
] # [batch_size, 2 ** level, 2]
mu = mu * level_decisions # [batch_size, 2**level, 2]
begin_idx = end_idx
end_idx = begin_idx + 2 ** (level + 1)
mu = ops.reshape(
mu, [batch_size, self.num_leaves]
) # [batch_size, num_leaves]
probabilities = keras.activations.softmax(
self.pi
) # [num_leaves, num_classes]
outputs = ops.matmul(mu, probabilities) # [batch_size, num_classes]
return outputs
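"""
As a quick shape check (illustrative only), we can run a random feature batch through a
small tree:
"""
demo_tree = NeuralDecisionTree(
    depth=3, num_features=8, used_features_rate=1.0, num_classes=2
)
demo_outputs = demo_tree(ops.convert_to_tensor(np.random.rand(4, 8), dtype=_dtype))
print(demo_outputs.shape)  # (4, 2): per-instance class probabilities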
"""
## Deep Neural Decision Forest
The neural decision forest model consists of a set of neural decision trees that are
trained simultaneously. The output of the forest model is the average outputs of its trees.
"""
class NeuralDecisionForest(keras.Model):
def __init__(
self, num_trees, depth, num_features, used_features_rate, num_classes
):
super().__init__()
self.ensemble = []
# Initialize the ensemble by adding NeuralDecisionTree instances.
# Each tree will have its own randomly selected input features to use.
for _ in range(num_trees):
self.ensemble.append(
NeuralDecisionTree(
depth, num_features, used_features_rate, num_classes
)
)
def call(self, inputs):
# Initialize the outputs: a [batch_size, num_classes] matrix of zeros.
batch_size = ops.shape(inputs)[0]
outputs = ops.zeros([batch_size, num_classes])
# Aggregate the outputs of trees in the ensemble.
for tree in self.ensemble:
outputs += tree(inputs)
# Divide the outputs by the ensemble size to get the average.
outputs /= len(self.ensemble)
return outputs
"""
Finally, let's set up the code that will train and evaluate the model.
"""
learning_rate = 0.01
batch_size = 265
num_epochs = 10
def run_experiment(model):
model.compile(
optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
loss=keras.losses.SparseCategoricalCrossentropy(),
metrics=[keras.metrics.SparseCategoricalAccuracy()],
)
print("Start training the model...")
train_dataset = get_dataset_from_csv(
train_data_file, shuffle=True, batch_size=batch_size
)
model.fit(train_dataset, epochs=num_epochs)
print("Model training finished")
print("Evaluating the model on the test data...")
test_dataset = get_dataset_from_csv(test_data_file, batch_size=batch_size)
_, accuracy = model.evaluate(test_dataset)
print(f"Test accuracy: {round(accuracy * 100, 2)}%")
"""
## Experiment 1: train a decision tree model
In this experiment, we train a single neural decision tree model
where we use all input features.
"""
num_trees = 10
depth = 10
used_features_rate = 1.0
num_classes = len(TARGET_LABELS)
def create_tree_model():
inputs = create_model_inputs()
features = encode_inputs(inputs)
features = layers.BatchNormalization()(features)
num_features = features.shape[1]
tree = NeuralDecisionTree(
depth, num_features, used_features_rate, num_classes
)
outputs = tree(features)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
tree_model = create_tree_model()
run_experiment(tree_model)
"""
## Experiment 2: train a forest model
In this experiment, we train a neural decision forest with `num_trees` trees
where each tree uses a randomly selected 50% of the input features. You can control the number
of features used in each tree by setting the `used_features_rate` variable.
In addition, we reduce the depth to 5, compared to 10 in the previous experiment.
"""
num_trees = 25
depth = 5
used_features_rate = 0.5
def create_forest_model():
inputs = create_model_inputs()
features = encode_inputs(inputs)
features = layers.BatchNormalization()(features)
num_features = features.shape[1]
forest_model = NeuralDecisionForest(
num_trees, depth, num_features, used_features_rate, num_classes
)
outputs = forest_model(features)
model = keras.Model(inputs=inputs, outputs=outputs)
return model
forest_model = create_forest_model()
run_experiment(forest_model)
"""
You can use the trained model hosted on [Hugging Face Hub](https://huggingface.co/keras-io/neural-decision-forest)
and try the demo on [Hugging Face Spaces](https://huggingface.co/spaces/keras-io/Neural-Decision-Forest).
"""

@@ -0,0 +1,443 @@
"""
Title: Self-supervised contrastive learning with SimSiam
Author: [Sayak Paul](https://twitter.com/RisingSayak)
Date created: 2021/03/19
Last modified: 2021/03/20
Description: Implementation of a self-supervised learning method for computer vision.
Accelerator: GPU
"""
"""
Self-supervised learning (SSL) is an interesting branch of study in the field of
representation learning. SSL systems try to formulate a supervised signal from a corpus
of unlabeled data points. An example is training a deep neural network to predict the
next word from a given set of words. In the literature, these tasks are known as *pretext
tasks* or *auxiliary tasks*. If we [train such a network](https://arxiv.org/abs/1801.06146) on a huge dataset (such as
the [Wikipedia text corpus](https://www.corpusdata.org/wikipedia.asp)), it learns very effective
representations that transfer well to downstream tasks. Language models like
[BERT](https://arxiv.org/abs/1810.04805), [GPT-3](https://arxiv.org/abs/2005.14165),
and [ELMo](https://allennlp.org/elmo) all benefit from this.
Much like these language models, we can train computer vision models using similar
approaches. To make things work in computer vision, we need to formulate the learning
tasks such that the underlying model (a deep neural network) is able to make sense of the
semantic information present in vision data. One such task is to train a model to _contrast_
between two different versions of the same image. The hope is that in this way the model
will learn representations where similar images are grouped as close together as possible
while dissimilar images are pushed further away.
In this example, we will be implementing one such system called **SimSiam** proposed in
[Exploring Simple Siamese Representation Learning](https://arxiv.org/abs/2011.10566). It
is implemented as follows:
1. We create two different versions of the same dataset with a stochastic data
augmentation pipeline. Note that the random initialization seed needs to be the same
while creating these versions.
2. We take a ResNet without any classification head (**backbone**) and we add a shallow
fully-connected network (**projection head**) on top of it. Collectively, this is known
as the **encoder**.
3. We pass the output of the encoder through a **predictor** which is again a shallow
fully-connected network having an
[AutoEncoder](https://en.wikipedia.org/wiki/Autoencoder) like structure.
4. We then train our encoder to maximize the cosine similarity between the two different
versions of our dataset.
"""
"""
## Setup
"""
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
from keras_core import layers
from keras_core import regularizers
import keras_core as keras
import tensorflow as tf
import matplotlib.pyplot as plt
import numpy as np
"""
## Define hyperparameters
"""
AUTO = tf.data.AUTOTUNE
BATCH_SIZE = 128
EPOCHS = 5
CROP_TO = 32
SEED = 26
PROJECT_DIM = 2048
LATENT_DIM = 512
WEIGHT_DECAY = 0.0005
"""
## Load the CIFAR-10 dataset
"""
(x_train, y_train), (x_test, y_test) = keras.datasets.cifar10.load_data()
print(f"Total training examples: {len(x_train)}")
print(f"Total test examples: {len(x_test)}")
"""
## Defining our data augmentation pipeline
As studied in [SimCLR](https://arxiv.org/abs/2002.05709), having the right data
augmentation pipeline is critical for SSL systems to work effectively in computer vision.
Two particular augmentation transforms that seem to matter most are 1) random
resized crops and 2) color distortions. Most of the other SSL systems for computer
vision (such as [BYOL](https://arxiv.org/abs/2006.07733),
[MoCoV2](https://arxiv.org/abs/2003.04297), [SwAV](https://arxiv.org/abs/2006.09882),
etc.) include these in their training pipelines.
"""
def flip_random_crop(image):
# With random crops we also apply horizontal flipping.
image = tf.image.random_flip_left_right(image)
image = tf.image.random_crop(image, (CROP_TO, CROP_TO, 3))
return image
def color_jitter(x, strength=[0.4, 0.4, 0.4, 0.1]):
x = tf.image.random_brightness(x, max_delta=0.8 * strength[0])
x = tf.image.random_contrast(
x, lower=1 - 0.8 * strength[1], upper=1 + 0.8 * strength[1]
)
x = tf.image.random_saturation(
x, lower=1 - 0.8 * strength[2], upper=1 + 0.8 * strength[2]
)
x = tf.image.random_hue(x, max_delta=0.2 * strength[3])
# Affine transformations can disturb the natural range of
# RGB images, hence this is needed.
x = tf.clip_by_value(x, 0, 255)
return x
def color_drop(x):
x = tf.image.rgb_to_grayscale(x)
x = tf.tile(x, [1, 1, 3])
return x
def random_apply(func, x, p):
if tf.random.uniform([], minval=0, maxval=1) < p:
return func(x)
else:
return x
def custom_augment(image):
# As discussed in the SimCLR paper, the series of augmentation
# transformations (except for random crops) need to be applied
# randomly to impose translational invariance.
image = flip_random_crop(image)
image = random_apply(color_jitter, image, p=0.8)
image = random_apply(color_drop, image, p=0.2)
return image
"""
It should be noted that an augmentation pipeline is generally dependent on various
properties of the dataset we are dealing with. For example, if images in the dataset are
heavily object-centric then taking random crops with a very high probability may hurt the
training performance.
Let's now apply our augmentation pipeline to our dataset and visualize a few outputs.
"""
"""
## Convert the data into TensorFlow `Dataset` objects
Here we create two different versions of our dataset *without* any ground-truth labels.
"""
ssl_ds_one = tf.data.Dataset.from_tensor_slices(x_train)
ssl_ds_one = (
ssl_ds_one.shuffle(1024, seed=SEED)
.map(custom_augment, num_parallel_calls=AUTO)
.batch(BATCH_SIZE)
.prefetch(AUTO)
)
ssl_ds_two = tf.data.Dataset.from_tensor_slices(x_train)
ssl_ds_two = (
ssl_ds_two.shuffle(1024, seed=SEED)
.map(custom_augment, num_parallel_calls=AUTO)
.batch(BATCH_SIZE)
.prefetch(AUTO)
)
# We then zip both of these datasets.
ssl_ds = tf.data.Dataset.zip((ssl_ds_one, ssl_ds_two))
# Visualize a few augmented images.
sample_images_one = next(iter(ssl_ds_one))
plt.figure(figsize=(10, 10))
for n in range(25):
ax = plt.subplot(5, 5, n + 1)
plt.imshow(sample_images_one[n].numpy().astype("int"))
plt.axis("off")
plt.show()
# Ensure that the different versions of the dataset actually contain
# identical images.
sample_images_two = next(iter(ssl_ds_two))
plt.figure(figsize=(10, 10))
for n in range(25):
ax = plt.subplot(5, 5, n + 1)
plt.imshow(sample_images_two[n].numpy().astype("int"))
plt.axis("off")
plt.show()
"""
Notice that the images in `sample_images_one` and `sample_images_two` are essentially
the same but are augmented differently.
"""
"""
## Defining the encoder and the predictor
We use an implementation of ResNet20 that is specifically configured for the CIFAR-10
dataset. The code is taken from the
[keras-idiomatic-programmer](https://github.com/GoogleCloudPlatform/keras-idiomatic-programmer/blob/master/zoo/resnet/resnet_cifar10_v2.py) repository. The hyperparameters of
these architectures are taken from Section 3 and Appendix A of [the original
paper](https://arxiv.org/abs/2011.10566).
"""
"""shell
wget -q https://shorturl.at/QS369 -O resnet_cifar10_v2.py
"""
import resnet_cifar10_v2
N = 2
DEPTH = N * 9 + 2
NUM_BLOCKS = ((DEPTH - 2) // 9) - 1
def get_encoder():
# Input and backbone.
inputs = layers.Input((CROP_TO, CROP_TO, 3))
x = layers.Rescaling(scale=1.0 / 127.5, offset=-1)(inputs)
x = resnet_cifar10_v2.stem(x)
x = resnet_cifar10_v2.learner(x, NUM_BLOCKS)
x = layers.GlobalAveragePooling2D(name="backbone_pool")(x)
# Projection head.
x = layers.Dense(
PROJECT_DIM,
use_bias=False,
kernel_regularizer=regularizers.l2(WEIGHT_DECAY),
)(x)
x = layers.BatchNormalization()(x)
x = layers.ReLU()(x)
x = layers.Dense(
PROJECT_DIM,
use_bias=False,
kernel_regularizer=regularizers.l2(WEIGHT_DECAY),
)(x)
outputs = layers.BatchNormalization()(x)
return keras.Model(inputs, outputs, name="encoder")
def get_predictor():
model = keras.Sequential(
[
# Note the AutoEncoder-like structure.
layers.Input((PROJECT_DIM,)),
layers.Dense(
LATENT_DIM,
use_bias=False,
kernel_regularizer=regularizers.l2(WEIGHT_DECAY),
),
layers.ReLU(),
layers.BatchNormalization(),
layers.Dense(PROJECT_DIM),
],
name="predictor",
)
return model
"""
## Defining the (pre-)training loop
One of the main reasons behind training networks with these kinds of approaches is to
utilize the learned representations for downstream tasks like classification. This is why
this particular training phase is also referred to as _pre-training_.
We start by defining the loss function.
"""
def compute_loss(p, z):
# The authors of SimSiam emphasize the impact of
# the `stop_gradient` operator in the paper as it
# has an important role in the overall optimization.
z = tf.stop_gradient(z)
p = tf.math.l2_normalize(p, axis=1)
z = tf.math.l2_normalize(z, axis=1)
# Negative cosine similarity (minimizing this is
# equivalent to maximizing the similarity).
return -tf.reduce_mean(tf.reduce_sum((p * z), axis=1))
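"""
As a quick numerical check (illustrative only), the loss attains its minimum of -1 when
`p` and `z` point in the same direction after l2-normalization, and its maximum of 1 when
they point in opposite directions:
"""
v = tf.constant([[1.0, 2.0, 3.0]])
print(compute_loss(v, 2.0 * v).numpy())  # -1.0 for parallel vectors
print(compute_loss(v, -v).numpy())  # 1.0 for anti-parallel vectors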
"""
We then define our training loop by overriding the `train_step()` function of the
`keras.Model` class.
"""
class SimSiam(keras.Model):
def __init__(self, encoder, predictor):
super().__init__()
self.encoder = encoder
self.predictor = predictor
self.loss_tracker = keras.metrics.Mean(name="loss")
@property
def metrics(self):
return [self.loss_tracker]
def train_step(self, data):
# Unpack the data.
ds_one, ds_two = data
# Forward pass through the encoder and predictor.
with tf.GradientTape() as tape:
z1, z2 = self.encoder(ds_one), self.encoder(ds_two)
p1, p2 = self.predictor(z1), self.predictor(z2)
# Note that here we are enforcing the network to match
# the representations of two differently augmented batches
# of data.
loss = compute_loss(p1, z2) / 2 + compute_loss(p2, z1) / 2
# Compute gradients and update the parameters.
learnable_params = (
self.encoder.trainable_variables
+ self.predictor.trainable_variables
)
gradients = tape.gradient(loss, learnable_params)
self.optimizer.apply_gradients(zip(gradients, learnable_params))
# Monitor loss.
self.loss_tracker.update_state(loss)
return {"loss": self.loss_tracker.result()}
"""
## Pre-training our networks
For the purposes of this example, we will train the model for only 5 epochs. In practice,
this should be at least 100 epochs.
"""
# Create a cosine decay learning scheduler.
num_training_samples = len(x_train)
steps = EPOCHS * (num_training_samples // BATCH_SIZE)
lr_decayed_fn = keras.optimizers.schedules.CosineDecay(
initial_learning_rate=0.03, decay_steps=steps
)
# Create an early stopping callback.
early_stopping = keras.callbacks.EarlyStopping(
monitor="loss", patience=5, restore_best_weights=True
)
# Compile model and start training.
simsiam = SimSiam(get_encoder(), get_predictor())
simsiam.compile(optimizer=keras.optimizers.SGD(lr_decayed_fn, momentum=0.6))
history = simsiam.fit(ssl_ds, epochs=EPOCHS, callbacks=[early_stopping])
# Visualize the training progress of the model.
plt.plot(history.history["loss"])
plt.grid()
plt.title("Negative Cosine Similairty")
plt.show()
"""
If your solution gets very close to -1 (the minimum value of our loss) very quickly with a
different dataset and a different backbone architecture, that is likely because of
*representation collapse*. It is a phenomenon where the encoder yields similar outputs for
all the images. In that case, additional hyperparameter tuning is required, especially in
the following areas:
* Strength of the color distortions and their probabilities.
* Learning rate and its schedule.
* Architecture of both the backbone and the projection head.
A simple diagnostic for detecting collapse is sketched below.
"""
"""
## Evaluating our SSL method
The most popular method for evaluating an SSL method in computer vision (or any other
pre-training method as such) is to learn a linear classifier on the frozen features of
the trained backbone model (in this case, ResNet20) and evaluate the classifier on
unseen images. Other methods include
[fine-tuning](https://keras.io/guides/transfer_learning/) on the source dataset, or even a
target dataset with only 5% or 10% of the labels present. In practice, we can use the
backbone model for any downstream task such as semantic segmentation or object detection,
where backbone models are usually pre-trained with *pure supervised learning*.
"""
# We first create labeled `Dataset` objects.
train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train))
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test))
# Then we shuffle, batch, and prefetch this dataset for performance. We
# also apply random resized crops as an augmentation but only to the
# training set.
train_ds = (
train_ds.shuffle(1024)
.map(lambda x, y: (flip_random_crop(x), y), num_parallel_calls=AUTO)
.batch(BATCH_SIZE)
.prefetch(AUTO)
)
test_ds = test_ds.batch(BATCH_SIZE).prefetch(AUTO)
# Extract the backbone ResNet20.
backbone = keras.Model(
simsiam.encoder.input, simsiam.encoder.get_layer("backbone_pool").output
)
# We then create our linear classifier and train it.
backbone.trainable = False
inputs = layers.Input((CROP_TO, CROP_TO, 3))
x = backbone(inputs, training=False)
outputs = layers.Dense(10, activation="softmax")(x)
linear_model = keras.Model(inputs, outputs, name="linear_model")
# Compile model and start training.
linear_model.compile(
loss="sparse_categorical_crossentropy",
metrics=["accuracy"],
optimizer=keras.optimizers.SGD(lr_decayed_fn, momentum=0.9),
)
history = linear_model.fit(
train_ds, validation_data=test_ds, epochs=EPOCHS, callbacks=[early_stopping]
)
_, test_acc = linear_model.evaluate(test_ds)
print("Test accuracy: {:.2f}%".format(test_acc * 100))
"""
## Notes
* More data and a longer pre-training schedule benefit SSL in general.
* SSL is particularly helpful when you only have access to limited *labeled*
training data but can manage to build a large corpus of unlabeled data. Recently,
using an SSL method called [SwAV](https://arxiv.org/abs/2006.09882), a group of
researchers at Facebook trained a [RegNet](https://arxiv.org/abs/2003.13678) on 2 billion
images. They were able to achieve downstream performance very close to that achieved by
pure supervised pre-training. For some downstream tasks, their method even outperformed
the supervised counterparts. You can check out [their
paper](https://arxiv.org/pdf/2103.01988.pdf) to learn the details.
* If you are interested in understanding why contrastive SSL helps networks learn meaningful
representations, you can check out the following resources:
* [Self-supervised learning: The dark matter of
intelligence](https://ai.facebook.com/blog/self-supervised-learning-the-dark-matter-of-intelligence/)
* [Understanding self-supervised learning using controlled datasets with known
structure](https://sslneuips20.github.io/files/CameraReadys%203-77/64/CameraReady/Understanding_self_supervised_learning.pdf)
"""

@@ -0,0 +1,209 @@
"""
Title: Visualizing what convnets learn
Author: [fchollet](https://twitter.com/fchollet)
Date created: 2020/05/29
Last modified: 2020/05/29
Description: Displaying the visual patterns that convnet filters respond to.
Accelerator: GPU
"""
"""
## Introduction
In this example, we look into what sort of visual patterns image classification models
learn. We'll be using the `ResNet50V2` model, trained on the ImageNet dataset.
Our process is simple: we will create input images that maximize the activation of
specific filters in a target layer (picked somewhere in the middle of the model: layer
`conv3_block4_out`). Such images represent a visualization of the
pattern that the filter responds to.
"""
"""
## Setup
"""
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import keras_core as keras
import numpy as np
import tensorflow as tf
# The dimensions of our input image
img_width = 180
img_height = 180
# Our target layer: we will visualize the filters from this layer.
# See `model.summary()` for list of layer names, if you want to change this.
layer_name = "conv3_block4_out"
"""
## Build a feature extraction model
"""
# Build a ResNet50V2 model loaded with pre-trained ImageNet weights
model = keras.applications.ResNet50V2(weights="imagenet", include_top=False)
# Set up a model that returns the activation values for our target layer
layer = model.get_layer(name=layer_name)
feature_extractor = keras.Model(inputs=model.inputs, outputs=layer.output)
"""
## Set up the gradient ascent process
The "loss" we will maximize is simply the mean of the activation of a specific filter in
our target layer. To avoid border effects, we exclude border pixels.
"""
def compute_loss(input_image, filter_index):
activation = feature_extractor(input_image)
# We avoid border artifacts by only involving non-border pixels in the loss.
filter_activation = activation[:, 2:-2, 2:-2, filter_index]
return tf.reduce_mean(filter_activation)
"""
Our gradient ascent function simply computes the gradients of the loss above
with regard to the input image, and updates the image so as to move it
towards a state that will activate the target filter more strongly.
"""
@tf.function
def gradient_ascent_step(img, filter_index, learning_rate):
with tf.GradientTape() as tape:
tape.watch(img)
loss = compute_loss(img, filter_index)
# Compute gradients.
grads = tape.gradient(loss, img)
# Normalize gradients.
grads = tf.math.l2_normalize(grads)
img += learning_rate * grads
return loss, img
"""
## Set up the end-to-end filter visualization loop
Our process is as follows:
- Start from a random image that is close to "all gray" (i.e. visually neutral)
- Repeatedly apply the gradient ascent step function defined above
- Convert the resulting input image back to a displayable form, by normalizing it,
center-cropping it, and restricting it to the [0, 255] range.
"""
def initialize_image():
# We start from a gray image with some random noise
img = tf.random.uniform((1, img_width, img_height, 3))
# ResNet50V2 expects inputs in the range [-1, +1].
# Here we scale our random inputs to [-0.125, +0.125]
return (img - 0.5) * 0.25
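"""
As a quick check (illustrative only), the initialized image indeed lies in the narrow band
around zero described above:
"""
img = initialize_image()
print(float(tf.reduce_min(img)), float(tf.reduce_max(img)))  # ~-0.125, ~+0.125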
def visualize_filter(filter_index):
    # We run gradient ascent for 30 steps
iterations = 30
learning_rate = 10.0
img = initialize_image()
for iteration in range(iterations):
loss, img = gradient_ascent_step(img, filter_index, learning_rate)
# Decode the resulting input image
img = deprocess_image(img[0].numpy())
return loss, img
def deprocess_image(img):
    # Normalize array: center on 0., ensure the standard deviation is 0.15
img -= img.mean()
img /= img.std() + 1e-5
img *= 0.15
# Center crop
img = img[25:-25, 25:-25, :]
# Clip to [0, 1]
img += 0.5
img = np.clip(img, 0, 1)
# Convert to RGB array
img *= 255
img = np.clip(img, 0, 255).astype("uint8")
return img
"""
Let's try it out with filter 0 in the target layer:
"""
from IPython.display import Image, display
loss, img = visualize_filter(0)
keras.utils.save_img("0.png", img)
"""
This is what an input that maximizes the response of filter 0 in the target layer would
look like:
"""
display(Image("0.png"))
"""
## Visualize the first 64 filters in the target layer
Now, let's make an 8x8 grid of the first 64 filters
in the target layer to get a feel for the range
of different visual patterns that the model has learned.
"""
# Compute image inputs that maximize per-filter activations
# for the first 64 filters of our target layer
all_imgs = []
for filter_index in range(64):
print("Processing filter %d" % (filter_index,))
loss, img = visualize_filter(filter_index)
all_imgs.append(img)
# Build a black picture with enough space for
# our 8 x 8 filters of size 130 x 130, with a 5px margin in between
margin = 5
n = 8
cropped_width = img_width - 25 * 2
cropped_height = img_height - 25 * 2
width = n * cropped_width + (n - 1) * margin
height = n * cropped_height + (n - 1) * margin
stitched_filters = np.zeros((width, height, 3))
# Fill the picture with our saved filters
for i in range(n):
for j in range(n):
img = all_imgs[i * n + j]
stitched_filters[
(cropped_width + margin) * i : (cropped_width + margin) * i + cropped_width,
(cropped_height + margin) * j : (cropped_height + margin) * j
+ cropped_height,
:,
] = img
keras.utils.save_img("stiched_filters.png", stitched_filters)
from IPython.display import Image, display
display(Image("stiched_filters.png"))
"""
Image classification models see the world by decomposing their inputs over a "vector
basis" of texture filters such as these.
See also
[this old blog post](https://blog.keras.io/how-convolutional-neural-networks-see-the-world.html)
for analysis and interpretation.
Example available on HuggingFace.
[![Generic badge](https://img.shields.io/badge/🤗%20Spaces-What%20Convnets%20Learn-black.svg)](https://huggingface.co/spaces/keras-io/what-convnets-learn)
"""

@@ -0,0 +1,695 @@
"""
Author: [lukewood](https://lukewood.xyz)
Date created: 03/28/2023
Last modified: 07/25/2023
Description: Use KerasCV to train powerful image classifiers.
"""
"""
## Introduction
Classification is the process of predicting a categorical label for a given
input image.
While classification is a relatively straightforward computer vision task,
modern approaches are still built from several complex components.
Luckily, KerasCV provides APIs to construct commonly used components.
This guide demonstrates KerasCV's modular approach to solving image
classification problems at three levels of complexity:
- Inference with a pretrained classifier
- Fine-tuning a pretrained backbone
- Training an image classifier from scratch
## Multi-Backend Support
KerasCV's `ImageClassifier` model supports several backends like JAX, PyTorch,
and TensorFlow with the help of `keras_core`. To enable multi-backend support
in KerasCV, set the `KERAS_CV_MULTI_BACKEND` environment variable. We can
then switch between different backends by setting the `KERAS_BACKEND`
environment variable. Currently, `"tensorflow"`, `"jax"`, and `"torch"` are
supported.
This demonstration uses the JAX backend.
"""
import os
os.environ["KERAS_CV_MULTI_BACKEND"] = "1"
os.environ["KERAS_BACKEND"] = "jax"
import json
import math
import keras_cv
import keras_core as keras
from keras_core import ops
from keras_core import losses
from keras_core import optimizers
from keras_core.optimizers import schedules
from keras_core import metrics
import tensorflow as tf
from tensorflow import data as tf_data
import tensorflow_datasets as tfds
import numpy as np
"""## Inference with a pretrained classifier
Let's get started with the simplest KerasCV API: a pretrained classifier.
In this example, we will construct a classifier that was
pretrained on the ImageNet dataset.
We'll use this model to solve the age-old "Cat or Dog" problem.
The highest level module in KerasCV is a *task*. A *task* is a `keras.Model`
consisting of a (generally pretrained) backbone model and task-specific
layers. Here's an example using `keras_cv.models.ImageClassifier` with an
EfficientNetV2B0 Backbone.
EfficientNetV2B0 is a great starting model when constructing an image
classification pipeline.
This architecture manages to achieve high accuracy, while using a
parameter count of 7M.
If an EfficientNetV2B0 is not powerful enough for the task you are hoping to
solve, be sure to check out
[KerasCV's other available Backbones](https://github.com/keras-team/keras-cv/tree/master/keras_cv/models/backbones)!
"""
classifier = keras_cv.models.ImageClassifier.from_preset(
"efficientnetv2_b0_imagenet_classifier"
)
"""You may notice a small deviation from the old `keras.applications` API;
where you would construct the class with
`EfficientNetV2B0(weights="imagenet")`. While the old API was great for
classification, it did not scale effectively to other use cases that required
complex architectures, like object deteciton and semantic segmentation.
Now that our classifier is built, let's apply it to this cute cat picture!
"""
filepath = keras.utils.get_file(origin="https://i.imgur.com/9i63gLN.jpg")
image = keras.utils.load_img(filepath)
image = np.array(image)
keras_cv.visualization.plot_image_gallery(
image[None, ...], rows=1, cols=1, value_range=(0, 255), show=True, scale=4
)
"""Next, let's get some predictions from our classifier:"""
predictions = classifier.predict(np.expand_dims(image, axis=0))
"""Predictions come in the form of softmax-ed category rankings.
We can find the indices of the top classes using a simple argsort:
"""
top_classes = predictions[0].argsort(axis=-1)
"""In order to decode the class mappings, we can construct a mapping from
category indices to ImageNet class names.
For convenience, I've stored the ImageNet class mapping in a GitHub gist.
Let's download and load it now.
"""
classes = keras.utils.get_file(
origin="https://gist.githubusercontent.com/LukeWood/62eebcd5c5c4a4d0e0b7845780f76d55/raw/fde63e5e4c09e2fa0a3436680f436bdcb8325aac/ImagenetClassnames.json"
)
with open(classes, "rb") as f:
classes = json.load(f)
"""Now we can simply look up the class names via index:"""
top_two = [classes[str(i)] for i in top_classes[-2:]]
print("Top two classes are:", top_two)
"""Great! Both of these appear to be correct!
However, one of the classes is "Velvet".
We're trying to classify Cats vs Dogs.
We don't care about the velvet blanket!
Ideally, we'd have a classifier that only performs computation to determine if
an image is a cat or a dog, and has all of its resources dedicated to this
task. This can be solved by fine-tuning our own classifier.
# Fine-tuning a pretrained classifier
When labeled images specific to our task are available, fine-tuning a custom
classifier can improve performance.
If we want to train a Cats vs Dogs Classifier, using explicitly labeled Cat vs
Dog data should perform better than the generic classifier!
For many tasks, no relevant pretrained model
will be available (e.g., categorizing images specific to your application).
First, let's get started by loading some data:
"""
BATCH_SIZE = 32
IMAGE_SIZE = (224, 224)
AUTOTUNE = tf_data.AUTOTUNE
tfds.disable_progress_bar()
data, dataset_info = tfds.load(
"cats_vs_dogs",
with_info=True,
as_supervised=True
)
train_steps_per_epoch = (
dataset_info.splits["train"].num_examples // BATCH_SIZE
)
train_dataset = data["train"]
num_classes = dataset_info.features["label"].num_classes
resizing = keras_cv.layers.Resizing(
IMAGE_SIZE[0], IMAGE_SIZE[1], crop_to_aspect_ratio=True
)
encoder = keras.layers.CategoryEncoding(num_classes, "one_hot", dtype="int32")
def preprocess_inputs(image, label):
    # Statically resize images as we only iterate the dataset once.
return resizing(image), encoder(label)
# Shuffle the dataset to increase diversity of batches.
# 10*BATCH_SIZE follows the assumption that bigger machines can handle bigger
# shuffle buffers.
train_dataset = train_dataset.shuffle(
10 * BATCH_SIZE, reshuffle_each_iteration=True
).map(preprocess_inputs, num_parallel_calls=AUTOTUNE)
train_dataset = train_dataset.batch(BATCH_SIZE)
images = next(iter(train_dataset.take(1)))[0]
keras_cv.visualization.plot_image_gallery(images, value_range=(0, 255))
"""Meow!
Next let's construct our model.
The use of imagenet in the preset name indicates that the backbone was
pretrained on the ImageNet dataset.
Pretrained backbones extract more information from our labeled examples by
leveraging patterns extracted from potentially much larger datasets.
Next, let's put together our classifier:
"""
model = keras_cv.models.ImageClassifier.from_preset(
"efficientnetv2_b0_imagenet", num_classes=2
)
model.compile(
loss="categorical_crossentropy",
optimizer=keras.optimizers.SGD(learning_rate=0.01),
metrics=["accuracy"],
)
"""Here our classifier is just a simple `keras.Sequential`.
All that is left to do is call `model.fit()`:
"""
model.fit(train_dataset)
"""Let's look at how our model performs after the fine tuning:"""
predictions = model.predict(np.expand_dims(image, axis=0))
classes = {0: "cat", 1: "dog"}
print("Top class is:", classes[predictions[0].argmax()])
"""Awesome - looks like the model correctly classified the image.
# Train a Classifier from Scratch
Now that we've gotten our hands dirty with classification, let's take on one
last task: training a classification model from scratch!
A standard benchmark for image classification is the ImageNet dataset; however,
due to licensing constraints, we will use the CalTech 101 image classification
dataset in this tutorial.
While we use the simpler CalTech 101 dataset in this guide, the same training
template may be used on ImageNet to achieve near state-of-the-art scores.
Let's start out by tackling data loading:
"""
NUM_CLASSES = 101
# Change epochs to ~100 to fully train.
EPOCHS = 1
encoder = keras.layers.CategoryEncoding(NUM_CLASSES, "one_hot", dtype="int32")
def package_inputs(image, label):
return {"images": image, "labels": encoder(label)}
train_ds, eval_ds = tfds.load(
    "caltech101", split=["train", "test"], as_supervised=True
)
train_ds = train_ds.map(package_inputs, num_parallel_calls=tf_data.AUTOTUNE)
eval_ds = eval_ds.map(package_inputs, num_parallel_calls=tf_data.AUTOTUNE)
train_ds = train_ds.shuffle(BATCH_SIZE * 16)
"""The CalTech101 dataset has different sizes for every image, so we use the
`ragged_batch()` API to batch them together while maintaining each individual
image's shape information.
"""
train_ds = train_ds.ragged_batch(BATCH_SIZE)
eval_ds = eval_ds.ragged_batch(BATCH_SIZE)
batch = next(iter(train_ds.take(1)))
image_batch = batch["images"]
label_batch = batch["labels"]
keras_cv.visualization.plot_image_gallery(
image_batch.to_tensor(),
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""## Data Augmentation
In our previous fine-tuning example, we performed a static resizing operation
and did not utilize any image augmentation.
This is because a single pass over the training set was sufficient to achieve
decent results.
When training to solve a more difficult task, you'll want to include data
augmentation in your data pipeline.
Data augmentation is a technique to make your model robust to changes in input
data such as lighting, cropping, and orientation.
KerasCV includes some of the most useful augmentations in the
`keras_cv.layers` API.
Creating an optimal pipeline of augmentations is an art, but in this section
of the guide we'll offer some tips on best practices for classification.
One caveat to be aware of with image data augmentation is that you must be
careful to not shift your augmented data distribution too far from the
original data distribution.
The goal is to prevent overfitting and increase generalization,
but samples that lie completely out of the data distribution simply add noise
to the training process.
The first augmentation we'll use is `RandomFlip`.
This augmentation behaves more or less how you'd expect: it either flips the
image or not.
While this augmentation is useful in CalTech101 and ImageNet, note that it should not be
used on tasks where the data distribution is not vertical-mirror invariant.
An example of a dataset where this occurs is MNIST handwritten digits.
Flipping a `6` over the
vertical axis will make the digit appear more like a `7` than a `6`, but the
label will still show a `6`.
"""
random_flip = keras_cv.layers.RandomFlip()
augmenters = [random_flip]
image_batch = random_flip(image_batch)
keras_cv.visualization.plot_image_gallery(
image_batch.to_tensor(),
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""Half of the images have been flipped!
The next augmentation we'll use is `RandomCropAndResize`.
This operation selects a random subset of the image, then resizes it to the
provided target size.
By using this augmentation, we force our classifier to become spatially
invariant.
Additionally, this layer accepts an `aspect_ratio_factor` which can be used to
distort the aspect ratio of the image.
While this can improve model performance, it should be used with caution.
It is very easy for an aspect ratio distortion to shift a sample too far from
the original training set's data distribution.
Remember - the goal of data augmentation is to produce more training samples
that align with the data distribution of your training set!
`RandomCropAndResize` also can handle `tf.RaggedTensor` inputs. In the
CalTech101 image dataset images come in a wide variety of sizes.
As such they cannot easily be batched together into a dense training batch.
Luckily, `RandomCropAndResize` handles the Ragged -> Dense conversion process
for you!
Let's add a `RandomCropAndResize` to our set of augmentations:
"""
crop_and_resize = keras_cv.layers.RandomCropAndResize(
target_size=IMAGE_SIZE,
crop_area_factor=(0.8, 1.0),
aspect_ratio_factor=(0.9, 1.1),
)
augmenters += [crop_and_resize]
image_batch = crop_and_resize(image_batch)
keras_cv.visualization.plot_image_gallery(
image_batch,
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""Great! We are now working with a batch of dense images.
Next up, let's include some spatial and color-based jitter in our training set.
This will allow us to produce a classifier that is robust to lighting
flickers, shadows, and more.
There are limitless ways to augment an image by altering color and spatial
features, but perhaps the most battle tested technique is
[`RandAugment`](https://arxiv.org/abs/1909.13719).
`RandAugment` is actually a set of 10 different augmentations:
`AutoContrast`, `Equalize`, `Solarize`, `RandomColorJitter`, `RandomContrast`,
`RandomBrightness`, `ShearX`, `ShearY`, `TranslateX` and `TranslateY`.
During training, `augmentations_per_image` augmenters are sampled for each image,
and a random magnitude factor is sampled for each one.
These augmentations are then applied sequentially.
KerasCV makes tuning these parameters easy using the `augmentations_per_image`
and `magnitude` parameters!
Let's take it for a spin:
"""
rand_augment = keras_cv.layers.RandAugment(
augmentations_per_image=3,
magnitude=0.3,
value_range=(0, 255),
)
augmenters += [rand_augment]
image_batch = rand_augment(image_batch)
keras_cv.visualization.plot_image_gallery(
image_batch,
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""Looks great; but we're not done yet!
What if an image is missing one critical feature of a class? For example,
what if a leaf is blocking the view of a cat's ear, but our classifier
learned to classify cats simply by observing their ears?
One easy approach to tackling this is to use `RandomCutout`, which randomly
strips out a sub-section of the image:
"""
random_cutout = keras_cv.layers.RandomCutout(
width_factor=0.4, height_factor=0.4
)
keras_cv.visualization.plot_image_gallery(
random_cutout(image_batch),
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""While this tackles the problem reasonably well, it can cause the classifier
to develop responses to borders between features and black pixel areas caused
by the cutout.
[`CutMix`](https://arxiv.org/abs/1905.04899) solves the same issue by using
a more complex (and more effective) technique.
Instead of replacing the cut-out areas with black pixels, `CutMix` replaces
these regions with regions of other images sampled from within your training
set!
Following this replacement, the image's classification label is updated to be
a blend of the original and mixed image's class label.
What does this look like in practice? Let's check it out:
"""
cut_mix = keras_cv.layers.CutMix()
# CutMix needs to modify both images and labels
inputs = {"images": image_batch, "labels": tf.cast(label_batch, "float32")}
keras_cv.visualization.plot_image_gallery(
cut_mix(inputs)["images"],
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""Let's hold off from adding it to our augmenter for a minute - more on that
soon!
Next, let's look into `MixUp()`.
Unfortunately, while `MixUp()` has been empirically shown to *substantially*
improve both the robustness and the generalization of the trained model,
it is not well-understood why such improvement occurs... but
a little alchemy never hurt anyone!
`MixUp()` works by sampling two images from a batch, then proceeding to
literally blend together their pixel intensities as well as their
classification labels.
Let's see it in action:
"""
mix_up = keras_cv.layers.MixUp()
# MixUp needs to modify both images and labels
inputs = {"images": image_batch, "labels": tf.cast(label_batch, "float32")}
keras_cv.visualization.plot_image_gallery(
mix_up(inputs)["images"],
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""If you look closely, you'll see that the images have been blended together.
Instead of applying `CutMix()` and `MixUp()` to every image, we instead pick
one or the other to apply to each batch.
This can be expressed using `keras_cv.layers.RandomChoice()`
"""
cut_mix_or_mix_up = keras_cv.layers.RandomChoice(
[cut_mix, mix_up], batchwise=True
)
augmenters += [cut_mix_or_mix_up]
"""Now let's apply our final augmenter to the training data:"""
augmenter = keras_cv.layers.Augmenter(augmenters)
train_ds = train_ds.map(augmenter, num_parallel_calls=tf_data.AUTOTUNE)
image_batch = next(iter(train_ds.take(1)))["images"]
keras_cv.visualization.plot_image_gallery(
image_batch,
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""We also need to resize our evaluation set to get dense batches of the image
size expected by our model. We use the deterministic
`keras_cv.layers.Resizing` in this case to avoid adding noise to our
evaluation metric.
"""
inference_resizing = keras_cv.layers.Resizing(
IMAGE_SIZE[0], IMAGE_SIZE[1], crop_to_aspect_ratio=True
)
eval_ds = eval_ds.map(inference_resizing, num_parallel_calls=tf_data.AUTOTUNE)
image_batch = next(iter(eval_ds.take(1)))["images"]
keras_cv.visualization.plot_image_gallery(
image_batch,
rows=3,
cols=3,
value_range=(0, 255),
show=True,
)
"""Finally, lets unpackage our datasets and prepare to pass them to
`model.fit()`, which accepts a tuple of `(images, labels)`.
"""
def unpackage_dict(inputs):
return inputs["images"], inputs["labels"]
train_ds = train_ds.map(unpackage_dict, num_parallel_calls=tf_data.AUTOTUNE)
eval_ds = eval_ds.map(unpackage_dict, num_parallel_calls=tf_data.AUTOTUNE)
"""Data augmentation is by far the hardest piece of training a modern
classifier.
Congratulations on making it this far!
## Optimizer Tuning
To achieve optimal performance, we need to use a learning rate schedule
instead of a single learning rate. While we won't go into detail on the
Cosine decay with warmup schedule used here, [you can read more about it
here](https://scorrea92.medium.com/cosine-learning-rate-decay-e8b50aa455b).
"""
def lr_warmup_cosine_decay(
global_step,
warmup_steps,
hold=0,
total_steps=0,
start_lr=0.0,
target_lr=1e-2,
):
# Cosine decay
learning_rate = (
0.5
* target_lr
* (
1
+ ops.cos(
math.pi
* ops.convert_to_tensor(
global_step - warmup_steps - hold, dtype="float32"
)
/ ops.convert_to_tensor(
total_steps - warmup_steps - hold, dtype="float32"
)
)
)
)
    # Linear warmup from 0 to `target_lr`. Note that `start_lr` is kept in the
    # signature for API symmetry but is not used by this schedule.
    warmup_lr = target_lr * (global_step / warmup_steps)
if hold > 0:
learning_rate = ops.where(
global_step > warmup_steps + hold, learning_rate, target_lr
)
learning_rate = ops.where(
global_step < warmup_steps, warmup_lr, learning_rate
)
return learning_rate
class WarmUpCosineDecay(schedules.LearningRateSchedule):
def __init__(
self, warmup_steps, total_steps, hold, start_lr=0.0, target_lr=1e-2
):
super().__init__()
self.start_lr = start_lr
self.target_lr = target_lr
self.warmup_steps = warmup_steps
self.total_steps = total_steps
self.hold = hold
def __call__(self, step):
lr = lr_warmup_cosine_decay(
global_step=step,
total_steps=self.total_steps,
warmup_steps=self.warmup_steps,
start_lr=self.start_lr,
target_lr=self.target_lr,
hold=self.hold,
)
return ops.where(step > self.total_steps, 0.0, lr)
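"""To sanity-check the schedule before training (an illustrative sketch, not part of the
original guide; assumes `matplotlib` is available), we can evaluate it over a range of
steps and plot the resulting learning rates:
"""
import matplotlib.pyplot as plt

demo_schedule = WarmUpCosineDecay(warmup_steps=100, total_steps=1000, hold=450)
plt.plot([float(demo_schedule(step)) for step in range(1000)])
plt.xlabel("train step")
plt.ylabel("learning rate")
plt.show()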
"""![WarmUpCosineDecay schedule](https://i.imgur.com/YCr5pII.png)
The schedule looks as we expect.
Next, let's construct the optimizer:
"""
total_images = 9000
total_steps = (total_images // BATCH_SIZE) * EPOCHS
warmup_steps = int(0.1 * total_steps)
hold_steps = int(0.45 * total_steps)
schedule = WarmUpCosineDecay(
start_lr=0.05,
target_lr=1e-2,
warmup_steps=warmup_steps,
total_steps=total_steps,
hold=hold_steps,
)
optimizer = optimizers.SGD(
weight_decay=5e-4,
learning_rate=schedule,
momentum=0.9,
)
"""At long last, we can now build our model and call `fit()`!
`keras_cv.models.ResNet18V2Backbone()` is a convenience alias for
`keras_cv.models.ResNetV2Backbone.from_preset("resnet18_v2")`.
Note that this preset does not come with any pretrained weights.
"""
backbone = keras_cv.models.ResNet18V2Backbone()
model = keras.Sequential(
[
backbone,
keras.layers.GlobalMaxPooling2D(),
keras.layers.Dropout(rate=0.5),
keras.layers.Dense(101, activation="softmax"),
]
)
"""Since the labels produced by MixUp() and CutMix() are somewhat artificial,
we employ label smoothing to prevent the model from overfitting to artifacts
of this augmentation process.
"""
loss = losses.CategoricalCrossentropy(label_smoothing=0.1)
"""Let's compile our model:"""
model.compile(
loss=loss,
optimizer=optimizer,
metrics=[
metrics.CategoricalAccuracy(),
metrics.TopKCategoricalAccuracy(k=5),
],
)
"""and finally call fit()."""
model.fit(
train_ds,
epochs=EPOCHS,
validation_data=eval_ds,
)
"""Congratulations! You now know how to train a powerful image classifier
from scratch in KerasCV.
Depending on the availability of labeled data for your application, training
from scratch may or may not be more powerful than using transfer learning in
addition to the data augmentations discussed above. For smaller datasets,
pretrained models generally produce high accuracy and faster convergence.
## Conclusions
While image classification is perhaps the simplest problem in computer vision,
the modern landscape has numerous complex components.
Luckily, KerasCV offers robust, production-grade APIs to make assembling most
of these components possible in one line of code.
Through the use of KerasCV's `ImageClassifier` API, pretrained weights, and
KerasCV data augmentations you can assemble everything you need to train a
powerful classifier in a few hundred lines of code!
As a follow up exercise, give the following a try:
- Fine tune a KerasCV classifier on your own dataset
- Learn more about [KerasCV's data augmentations](https://keras.io/guides/keras_cv/cut_mix_mix_up_and_rand_augment/)
- Check out how we train our models on [ImageNet](https://github.com/keras-team/keras-cv/blob/master/examples/training/classification/imagenet/basic_training.py)
"""

@@ -76,4 +76,6 @@ class LearningRateScheduler(Callback):
def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
logs["learning_rate"] = self.model.optimizer.learning_rate.value
logs["learning_rate"] = float(
backend.convert_to_numpy(self.model.optimizer.learning_rate)
)

@@ -107,3 +107,18 @@ class LearningRateSchedulerTest(testing.TestCase):
callbacks=[lr_scheduler],
epochs=2,
)
@pytest.mark.requires_trainable_backend
def test_learning_rate_in_history(self):
lr_scheduler = callbacks.LearningRateScheduler(lambda step, lr: 0.5)
history = self.model.fit(
self.x_train,
self.y_train,
callbacks=[lr_scheduler],
epochs=1,
)
self.assertTrue("learning_rate" in history.history)
self.assertEqual(type(history.history["learning_rate"][0]), float)
self.assertEqual(history.history["learning_rate"][0], 0.5)

@ -391,7 +391,6 @@ class TestTensorBoardV2(testing.TestCase):
},
)
expected_image_summaries = {
_ObservedSummary(logdir=train_dir, tag="image"),
_ObservedSummary(logdir=train_dir, tag="bias/image"),
_ObservedSummary(logdir=train_dir, tag="kernel/image"),
}

@ -1,6 +1,5 @@
import numpy as np
import pytest
import tensorflow as tf
from keras_core import testing
from keras_core.layers.activations import elu
@ -21,11 +20,12 @@ class ELUTest(testing.TestCase):
)
def test_correctness(self):
def np_elu(x, alpha=1.0):
return (x > 0) * x + (x <= 0) * alpha * (np.exp(x) - 1)
x = np.random.random((2, 2, 5))
elu_layer = elu.ELU()
tf_elu_layer = tf.keras.layers.ELU()
self.assertAllClose(elu_layer(x), tf_elu_layer(x))
self.assertAllClose(elu_layer(x), np_elu(x))
elu_layer = elu.ELU(alpha=0.7)
tf_elu_layer = tf.keras.layers.ELU(alpha=0.7)
self.assertAllClose(elu_layer(x), tf_elu_layer(x))
self.assertAllClose(elu_layer(x), np_elu(x, alpha=0.7))

@ -1,6 +1,5 @@
import numpy as np
import pytest
import tensorflow as tf
from keras_core import testing
from keras_core.layers.activations import prelu
@ -22,6 +21,9 @@ class PReLUTest(testing.TestCase):
)
def test_prelu_correctness(self):
def np_prelu(x, alpha):
return (x > 0) * x + (x <= 0) * alpha * x
inputs = np.random.randn(2, 10, 5, 3)
prelu_layer = prelu.PReLU(
alpha_initializer="glorot_uniform",
@ -29,18 +31,9 @@ class PReLUTest(testing.TestCase):
alpha_constraint="non_neg",
shared_axes=(1, 2),
)
tf_prelu_layer = tf.keras.layers.PReLU(
alpha_initializer="glorot_uniform",
alpha_regularizer="l1",
alpha_constraint="non_neg",
shared_axes=(1, 2),
)
prelu_layer.build(inputs.shape)
tf_prelu_layer.build(inputs.shape)
weights = np.random.random((1, 1, 3))
prelu_layer.alpha.assign(weights)
tf_prelu_layer.alpha.assign(weights)
self.assertAllClose(prelu_layer(inputs), tf_prelu_layer(inputs))
ref_out = np_prelu(inputs, weights)
self.assertAllClose(prelu_layer(inputs), ref_out)

@ -4,6 +4,7 @@ import string
import numpy as np
from keras_core import backend
from keras_core import constraints
from keras_core import initializers
from keras_core import ops
@ -115,6 +116,8 @@ class MultiHeadAttention(Layer):
self.supports_masking = True
self._num_heads = num_heads
self._key_dim = key_dim
# Cache 1.0 / math.sqrt(self._key_dim).
self._inverse_sqrt_key_dim = None
self._value_dim = value_dim if value_dim else key_dim
self._dropout = dropout
self._use_bias = use_bias
@ -311,6 +314,9 @@ class MultiHeadAttention(Layer):
)
self._softmax = Softmax(axis=norm_axes)
self._dropout_layer = Dropout(rate=self._dropout)
self._inverse_sqrt_key_dim = backend.convert_to_tensor(
1.0 / math.sqrt(float(self._key_dim))
)
def _masked_softmax(self, attention_scores, attention_mask=None):
# Normalize the attention scores to probabilities.
@ -355,7 +361,7 @@ class MultiHeadAttention(Layer):
# Note: Applying scalar multiply at the smaller end of einsum improves
# XLA performance, but may introduce slight numeric differences in
# the Transformer attention head.
query = ops.multiply(query, 1.0 / math.sqrt(float(self._key_dim)))
query = ops.multiply(query, self._inverse_sqrt_key_dim)
# Take the dot product between "query" and "key" to get the raw
# attention scores.

@ -87,12 +87,14 @@ class Dense(Layer):
def build(self, input_shape):
input_dim = input_shape[-1]
self.kernel = self.add_weight(
name="kernel",
shape=(input_dim, self.units),
initializer=self.kernel_initializer,
regularizer=self.kernel_regularizer,
)
if self.use_bias:
self.bias = self.add_weight(
name="bias",
shape=(self.units,),
initializer=self.bias_initializer,
regularizer=self.bias_regularizer,

@ -1,6 +1,5 @@
import numpy as np
import pytest
import tensorflow as tf
from absl.testing import parameterized
from keras_core import layers
@ -95,21 +94,30 @@ class GlobalAveragePoolingCorrectnessTest(
("channels_last", False),
("channels_last", True),
("channels_first", False),
("channels_first", True),
)
def test_global_average_pooling1d(self, data_format, keepdims):
inputs = np.arange(24, dtype="float32").reshape((2, 3, 4))
def np_gap1d(x, data_format, keepdims, mask=None):
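            # NumPy reference: mean over the steps axis, honoring an
            # optional binary mask that zeroes out padded steps.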
steps_axis = 1 if data_format == "channels_last" else 2
if mask is not None:
mask = np.expand_dims(
mask, 2 if data_format == "channels_last" else 1
)
x *= mask
res = np.sum(x, axis=steps_axis) / np.sum(mask, axis=steps_axis)
else:
res = np.mean(x, axis=steps_axis)
if keepdims:
res = np.expand_dims(res, axis=steps_axis)
return res
inputs = np.arange(24, dtype="float32").reshape((2, 3, 4))
layer = layers.GlobalAveragePooling1D(
data_format=data_format,
keepdims=keepdims,
)
tf_keras_layer = tf.keras.layers.GlobalAveragePooling1D(
data_format=data_format,
keepdims=keepdims,
)
outputs = layer(inputs)
expected = tf_keras_layer(inputs)
expected = np_gap1d(inputs, data_format, keepdims)
self.assertAllClose(outputs, expected)
if data_format == "channels_last":
@ -117,47 +125,53 @@ class GlobalAveragePoolingCorrectnessTest(
else:
mask = np.array([[1, 1, 0, 0], [0, 1, 0, 1]], dtype="int32")
outputs = layer(inputs, mask)
expected = tf_keras_layer(inputs, mask)
expected = np_gap1d(inputs, data_format, keepdims, mask)
self.assertAllClose(outputs, expected)
@parameterized.parameters(
("channels_last", False),
("channels_last", True),
("channels_first", False),
("channels_first", True),
)
def test_global_average_pooling2d(self, data_format, keepdims):
inputs = np.arange(96, dtype="float32").reshape((2, 3, 4, 4))
def np_gap2d(x, data_format, keepdims):
steps_axis = [1, 2] if data_format == "channels_last" else [2, 3]
res = np.apply_over_axes(np.mean, x, steps_axis)
if not keepdims:
res = res.squeeze()
return res
inputs = np.arange(96, dtype="float32").reshape((2, 3, 4, 4))
layer = layers.GlobalAveragePooling2D(
data_format=data_format,
keepdims=keepdims,
)
tf_keras_layer = tf.keras.layers.GlobalAveragePooling2D(
data_format=data_format,
keepdims=keepdims,
)
outputs = layer(inputs)
expected = tf_keras_layer(inputs)
expected = np_gap2d(inputs, data_format, keepdims)
self.assertAllClose(outputs, expected)
@parameterized.parameters(
("channels_last", False),
("channels_last", True),
("channels_first", False),
("channels_first", True),
)
def test_global_average_pooling3d(self, data_format, keepdims):
inputs = np.arange(360, dtype="float32").reshape((2, 3, 3, 5, 4))
def np_gap3d(x, data_format, keepdims):
steps_axis = (
[1, 2, 3] if data_format == "channels_last" else [2, 3, 4]
)
res = np.apply_over_axes(np.mean, x, steps_axis)
if not keepdims:
res = res.squeeze()
return res
inputs = np.arange(360, dtype="float32").reshape((2, 3, 3, 5, 4))
layer = layers.GlobalAveragePooling3D(
data_format=data_format,
keepdims=keepdims,
)
tf_keras_layer = tf.keras.layers.GlobalAveragePooling3D(
data_format=data_format,
keepdims=keepdims,
)
outputs = layer(inputs)
expected = tf_keras_layer(inputs)
expected = np_gap3d(inputs, data_format, keepdims)
self.assertAllClose(outputs, expected)

@ -271,6 +271,8 @@ class ExtractPatches(Operation):
data_format="channels_last",
):
super().__init__()
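        # Allow a single int to specify square patches of shape (size, size).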
if isinstance(size, int):
size = (size, size)
self.size = size
self.strides = strides
self.dilation_rate = dilation_rate
@ -348,14 +350,16 @@ def extract_patches(
Examples:
>>> image = np.random.random((1, 20, 20, 3)) # batch of 2 RGB images
>>> image = np.random.random(
... (2, 20, 20, 3)
... ).astype("float32") # batch of 2 RGB images
>>> patches = keras_core.ops.image.extract_patches(image, (5, 5))
>>> patches.shape
(1, 4, 4, 75)
>>> image = np.random.random((20, 20, 3)) # batch of 2 RGB images
(2, 4, 4, 75)
>>> image = np.random.random((20, 20, 3)).astype("float32") # 1 RGB image
>>> patches = keras_core.ops.image.extract_patches(image, (3, 3), (1, 1))
>>> patches.shape
(4, 4, 75)
(18, 18, 27)
"""
if any_symbolic_tensors((image,)):
return ExtractPatches(

@ -31,6 +31,8 @@ class ImageOpsDynamicShapeTest(testing.TestCase):
p_h, p_w = 5, 5
out = kimage.extract_patches(x, (p_h, p_w))
self.assertEqual(out.shape, (None, 4, 4, 75))
out = kimage.extract_patches(x, 5)
self.assertEqual(out.shape, (None, 4, 4, 75))
class ImageOpsStaticShapeTest(testing.TestCase):
@ -50,6 +52,8 @@ class ImageOpsStaticShapeTest(testing.TestCase):
p_h, p_w = 5, 5
out = kimage.extract_patches(x, (p_h, p_w))
self.assertEqual(out.shape, (4, 4, 75))
out = kimage.extract_patches(x, 5)
self.assertEqual(out.shape, (4, 4, 75))
AFFINE_TRANSFORM_INTERPOLATIONS = { # map to order
@ -310,9 +314,7 @@ class ImageOpsCorrectnessTest(testing.TestCase, parameterized.TestCase):
and backend.backend() == "tensorflow"
and dilation_rate > 1
):
pytest.skip(
"dilation_rate>1 with strides>1 than not supported with TF"
)
pytest.skip("dilation_rate>1 with strides>1 not supported with TF")
if data_format == "channels_first":
image = np.random.uniform(size=(1, 3, 20, 20))
else: