#python-3.x #tensorflow2.0 #tensorboard #tf.keras #asynccallback
Вопрос:
Я пытаюсь реализовать пользовательский обратный вызов, чтобы отобразить карты объектов каждого Conv2D
слоя в сети TensorBoard
.
Когда я запускаю код, Example 1
я получаю следующую ошибку:
<ipython-input-44-b691dabedd05> in on_epoch_end(self, epoch, logs)
28
29 # 3) Build partial model
---> 30 partial_model = keras.Model(
31 inputs=self.model.model.input,
32 outputs=output_layers
ValueError: Output tensors of a Functional model must be the output of a TensorFlow `Layer` (thus holding past layer metadata). Found: <keras.engine.base_layer.Layer object at 0x000002773C631CA0>
который работает так, как будто он не может построить частичную сеть, что странно, потому что он работает успешно, когда выполняется отдельно от основного потока.
Вот пример, иллюстрирующий проблему:
Пример 1
import os
import io
import datetime as dt
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras.datasets import cifar10
import matplotlib.pyplot as plt
'''
You can adjust the verbosity of the logs which are being printed by TensorFlow
by changing the value of TF_CPP_MIN_LOG_LEVEL:
0 = all messages are logged (default behavior)
1 = INFO messages are not printed
2 = INFO and WARNING messages are not printed
3 = INFO, WARNING, and ERROR messages are not printed
'''
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
DEBUG = False
class ConvModel(keras.Model):
def __init__(self, input_shape):
super().__init__()
self.input_image_shape = input_shape
self.model = keras.Sequential([
layers.Input(shape=input_shape),
layers.Conv2D(32, 3),
layers.BatchNormalization(),
layers.ReLU(),
layers.MaxPool2D(),
layers.Conv2D(64, 5),
layers.BatchNormalization(),
layers.ReLU(),
layers.MaxPool2D(),
layers.Conv2D(128, 3, kernel_regularizer=keras.regularizers.l2(0.01)),
layers.BatchNormalization(),
layers.ReLU(),
layers.Flatten(),
layers.Dense(64, activation='relu', kernel_regularizer=keras.regularizers.l2(0.01)),
layers.Dropout(0.5),
layers.Dense(10)
])
def call(self, inputs):
return self.model(inputs)
def find_sub_string(string: str, sub_string: str):
return True if string.find(sub_string) > -1 else False
def get_file_type(file_name: str):
file_type = None
if isinstance(file_name, str):
dot_idx = file_name.find('.')
if dot_idx > -1:
file_type = file_name[dot_idx 1:]
return file_type
def get_image_from_figure(figure):
buffer = io.BytesIO()
plt.savefig(buffer, format='png')
plt.close(figure)
buffer.seek(0)
image = tf.image.decode_png(buffer.getvalue(), channels=4)
image = tf.expand_dims(image, 0)
return image
class ConvLayerVis(keras.callbacks.Callback):
def __init__(self, X, figure_configs: dict, log_dir: str, log_interval: int):
super().__init__()
self.X_test = X
n_dims = len(self.X_test.shape)
assert 2 < n_dims < 5, f'The shape of the test image should be less than 5 and grater than 2, but current shape is {self.X_test.shape}'
# In case the image is not represented as a tensor - add a dimension to the left for the batch
if len(self.X_test.shape) < 4:
self.X_test = np.reshape(self.X_test, (1,) self.X_test.shape)
self.file_writer = tf.summary.create_file_writer(log_dir)
self.figure_configs = figure_configs
self.log_interval = log_interval
def on_training_begin(self, logs=None):
pass
def on_epoch_end(self, epoch, logs=None):
# 1) Get the layers
if epoch % self.log_interval == 0:
# 1) Get the layers
output_layer_tuples = [(idx, layer) for idx, layer in enumerate(self.model.model.layers) if find_sub_string(layer.name, 'conv2d') or find_sub_string(layer.name, 'max_pooling2d')]
output_layers = [layer_tuple[1] for layer_tuple in output_layer_tuples]
# 2) Get the layer names
conv_layer_name_tuples = [(layer_tuple[0], f'Layer #{layer_tuple[0]} - Conv 2D ') for layer_tuple in output_layer_tuples if find_sub_string(layer_tuple[1].name, 'conv2d')]
max_pool_layer_name_tuples = [(layer_tuple[0], f'Layer #{layer_tuple[0]} - Max Pooling 2D') for layer_tuple in output_layer_tuples if find_sub_string(layer_tuple[1].name, 'max_pooling2d')]
layer_name_tuples = (conv_layer_name_tuples max_pool_layer_name_tuples)
layer_name_tuples.sort(key=lambda x: x[0])
layer_names = [layer_name_tuple[1] for layer_name_tuple in layer_name_tuples]
# 3) Build partial model
partial_model = keras.Model(
inputs=model.model.input,
outputs=output_layers
)
# 4) Get the feature maps
feature_maps = partial_model.predict(self.X_test)
# 5) Plot
rows, cols = self.figure_configs.get('rows'), self.figure_configs.get('cols')
for feature_map, layer_name in zip(feature_maps, layer_names):
fig, ax = plt.subplots(rows, cols, figsize=self.figure_configs.get('figsize'))
for row in range(rows):
for col in range(cols):
ax[row][col].imshow(feature_map[0, :, :, row col], cmap=self.figure_configs.get('cmap'))
fig.suptitle(f'{layer_name}')
with self.file_writer.as_default():
tf.summary.image(f'{layer_name} Feature Maps', get_image_from_figure(figure=fig), step=epoch)
if __name__ == '__main__':
print(tf.config.list_physical_devices('GPU'))
# Load the data
(X, y), (X_test, y_test) = cifar10.load_data()
X, X_test = X.astype(np.float32) / 255.0, X_test.astype(np.float32) / 255.0
n, w, h, c = X.shape[0], X.shape[1], X.shape[2], X.shape[3]
n_test, w_test, h_test, c_test = X_test.shape[0], X_test.shape[1], X_test.shape[2], X_test.shape[3]
print(f'''
Dataset Stats:
Number of train images: {n}
Dimensions:
> Train:
width = {w}, height = {h}, channels = {c}
> Test:
width = {w_test}, height = {h_test}, channels = {c_test}
''')
# Model with keras.Sequential
model = ConvModel(input_shape=(w, h, c))
model.compile(loss=keras.losses.SparseCategoricalCrossentropy(from_logits=True), optimizer=keras.optimizers.Adam(learning_rate=3e-4), metrics=['accuracy'])
log_dir = f'./logs/{dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}'
callbacks = [
keras.callbacks.TensorBoard(
log_dir=log_dir,
write_images=True
),
ConvLayerVis(
X=X[0],
figure_configs=dict(rows=5, cols=5, figsize=(35, 35), cmap='gray'),
log_dir=f'{log_dir}/train',
log_interval=3
)
]
model.fit(
X,
y,
batch_size=64,
epochs=15,
callbacks=callbacks
)
Заранее спасибо за любую помощь по этому вопросу.
Ответ №1:
Только что выяснил проблему:
output_layers = [layer_tuple[1].output for layer_tuple in output_layer_tuples]
Должен был быть восстановлен output
атрибут каждого слоя.