基础操作
安装
- 从 TensorFlow 2.1 开始,pip 包 tensorflow 即同时包含 GPU 支持,无需通过特定的 pip 包 tensorflow-gpu 安装 GPU 版本。如果对 pip 包的大小敏感,可使用 tensorflow-cpu 包安装仅支持 CPU 的 TensorFlow 版本。
- conda虚拟环境
conda create --name [env-name] # 建立名为[env-name]的Conda虚拟环境
conda activate [env-name] # 进入名为[env-name]的Conda虚拟环境
conda deactivate # 退出当前的Conda虚拟环境
conda env remove --name [env-name] # 删除名为[env-name]的Conda虚拟环境
conda env list # 列出所有Conda虚拟环境
宿主机驱动
- 测试:nvidia-smi
CUDA Toolkit 和 cuDNN 的安装
conda install cudatoolkit=X.X
conda install cudnn=X.X.X
conda install tensorflow-gpu
限定程序使用GPU
- 方法1
import os
os.environ [“CUDA_DEVICE_ORDER”] = “PCI_BUS_ID”
os.environ [“CUDA_VISIBLE_DEVICES”] = “-1”
- 方法2
with tf.device (’/cpu:0’):
A = tf.constant ([[1, 2], [3, 4]])
B = tf.constant ([[5, 6], [7, 8]])
C = tf.matmul (A, B)
- 方法3
import tensorflow as tf
tf.debugging.set_log_device_placement (True) # 设置输出运算所在的设备cpus = tf.config.list_physical_devices ('CPU') # 获取当前设备的 CPU 列表
tf.config.set_visible_devices (cpus) # 设置 TensorFlow 的可见设备范围为 cpuA = tf.constant ([[1, 2], [3, 4]])
B = tf.constant ([[5, 6], [7, 8]])
C = tf.matmul (A, B)print (C)
tnsorflow2
基础知识
- 前置知识
- python 对象的属性
- dir(对象)
- vars(对象)
- help(对象)
python技巧
- Python 中可以使用整数后加小数点表示将该整数定义为浮点数类型。例如 3. 代表浮点数 3.0
- zip()在python3中是一个生成器,
TensorFlow 模型建立与训练
- 模型的构建: tf.keras.Model 和 tf.keras.layers
- keras 中两个比较重要的概念是是模型(Model)和层(Layer)。曾将各种计算流程和变量进行了封装,而模型则将各个层进行了组织和链接,并且封装成了一个整体,描述了如何将输入数据通过各个层以及运算而得到输出。需要调用模型的时候就需要:y=model(X)的形式就可以。keras中layers中预定义层,同时也允许我们自定义层。
- 模型的损失函数: tf.keras.losses
- 模型的优化器: tf.keras.optimizer
- 模型的评估: tf.keras.metrics
class MyModel(tf.keras.Model):
def __init__(self):
super().__init__() # Python 2 下使用 super(MyModel, self).__init__()
# 此处添加初始化代码(包含 call 方法中会用到的层),例如
# layer1 = tf.keras.layers.BuiltInLayer(...)
# layer2 = MyCustomLayer(...) def call(self, input):
# 此处添加模型调用的代码(处理输入并返回输出),例如
# x = layer1(input)
# output = layer2(x)
return output # 还可以添加自定义的方法
示例代码
import tensorflow as tf
print(tf.__version__)
# 设定cpu或者gpu
tf.debugging.set_log_device_placement (True)
cpus = tf.config.list_physical_devices(device_type='CPU')
print('cpus',cpus)
tf.config.set_visible_devices (cpus)
A = tf.constant([[1,2],[3,4]])print('A',A,type(A),A.numpy(),dir(A))B = tf.constant([[5,6],[7,8]])
C = tf.matmul(A,B)
print(C,type(C),C.shape,vars(C))# 临时指定cpu或者gpu
with tf.device('/cpu:0'):
A = tf.constant([[1,2],[3,4]])
B = tf.constant([[5,6],[7,8]])
C = tf.matmul(A,B)
print(C,type(C),C.shape,C.device)# 张量,标量随机数
random_float = tf.random.uniform(shape=())
print('random_float',random_float)# 定义一个有2个元素的零向量
zero_vector = tf.zeros(shape=(2))
print('zero_vector',zero_vector)# 自动求导机制
x = tf.Variable(initial_value=3.)with tf.GradientTape() as tape:
y = tf.square(x)
y_grad = tape.gradient(y,x)
print('y_grad',y_grad)print('tape',tape,type(tape))X = tf.constant([[1.,2.],[3.,4.]])
y = tf.constant([[1.],[2.]])
w = tf.Variable(initial_value=[[1.],[2.]])
b = tf.Variable(initial_value=1.)
with tf.GradientTape() as tape:
L = tf.reduce_sum(tf.square(tf.matmul(X, w) + b - y))
w_grad,b_grad = tape.gradient(L,[w,b])
print(w_grad,b_grad)
tensorflow2线性模型
步骤
- 使用 tf.keras.datasets 获得数据集并预处理
- 使用 tf.keras.Model 和 tf.keras.layers 构建模型
- 构建模型训练流程,使用 tf.keras.losses 计算损失函数,并使用 tf.keras.optimizer 优化模型
- 构建模型评估流程,使用 tf.keras.metrics 计算评估指标
源代码
# Data
import numpy as np
import tensorflow as tf# data
X_raw = np.array([2013, 2014, 2015, 2016, 2017], dtype=np.float32)
y_raw = np.array([12000, 14000, 15000, 16500, 17500], dtype=np.float32)# 归一化
X = (X_raw - X_raw.min()) / (X_raw.max() - X_raw.min())
y = (y_raw - y_raw.min()) / (y_raw.max() - y_raw.min())X = tf.constant(X)
y = tf.constant(y)a = tf.Variable(initial_value=0.)
b = tf.Variable(initial_value=0.)
variables = [a, b]num_epoch = 10000
optimizer = tf.keras.optimizers.SGD(learning_rate=5e-4)
for e in range(num_epoch):
# 使用tf.GradientTape()记录损失函数的梯度信息
with tf.GradientTape() as tape:
y_pred = a * X + b
loss = tf.reduce_sum(tf.square(y_pred - y))
# TensorFlow自动计算损失函数关于自变量(模型参数)的梯度
grads = tape.gradient(loss, variables)
# TensorFlow自动根据梯度更新参数
optimizer.apply_gradients(grads_and_vars=zip(grads, variables))print(a, b)# 方法2 keras
X = tf.constant([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
y = tf.constant([[10.0], [20.0]])class Linear(tf.keras.Model):
def __init__(self,):
super().__init__()
self.dense = tf.keras.layers.Dense(units=1,
use_bias=True,
activation=tf.nn.relu, kernel_regularizer=tf.zeros_initializer(),
bias_regularizer=tf.zeros_initializer) def call(self,input):
output = self.dense(input)
return outputmodel = Linear()
optimizer = tf.keras.optimizers.SGD(learning_rate=0.01)
for i in range(100):
with tf.GradientTape() as tape:
y_pred = model(X)
loss = tf.reduce_mean(tf.square(y_pred-y))
grads = tape.gradient(loss,model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads,model.variables))print(model.variables)# 模型的评估
sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
y_pred = model.predict(data_loader.test_data[start_index: end_index])
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
print("test accuracy: %f" % sparse_categorical_accuracy.result())
CNN
基础知识
- 卷积神经网络(Convolutional Neural Network, CNN)是一种结构类似于人类或动物的 视觉系统 的人工神经网络,包含一个或多个卷积层(Convolutional Layer)、池化层(Pooling Layer)和全连接层(Fully-connected Layer)。
示例代码
# -*- coding:utf-8 -*-
# /usr/bin/python
'''
-------------------------------------------------
File Name : CNN
Description : AIM: 卷积神经网络
Functions: 1. 卷积层
2. 池化层
Envs : python == 3.7
pip install tensorflow==2.1.0 -i https://pypi.douban.com/simple
Author : errol
Date : 2020/5/1 15:15
CodeStyle : 规范,简洁,易懂,可阅读,可维护,可移植!
-------------------------------------------------
Change Activity:
2020/5/1 : 手写字体识别实现卷积和池化层和全连接层的计算
-------------------------------------------------
'''import tensorflow as tf
import numpy as npclass MNISTLoader():
def __init__(self,):
mnist = tf.keras.datasets.mnist
(self.train_data,self.train_label),(self.test_data,self.test_label) =mnist.load_data()
# MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1) # [60000, 28, 28, 1]
self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1) # [10000, 28, 28, 1]
self.train_label = self.train_label.astype(np.int32) # [60000]
self.test_label = self.test_label.astype(np.int32) # [10000]
self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0] def get_batch(self, batch_size):
# 从数据集中随机取出batch_size个元素并返回
index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
return self.train_data[index, :], self.train_label[index]class CNN(tf.keras.Model): def __init__(self,):
super().__init__()
# 卷积层
self.conv1 = tf.keras.layers.Conv2D(
filters=32, # 卷积层神经元数目(卷积核)
kernel_size = [5,5], # 感受野大小
padding = 'same', # padding策略(vaild 或者 same)
activation =tf.nn.relu # 激活函数
)
# 池化层
self.pool1 = tf.keras.layers.MaxPool2D(pool_size=[2,2],strides=2)
# 卷积层
self.conv2 = tf.keras.layers.Conv2D(
filters=64, # 卷积层神经元数目(卷积核)
kernel_size=[5, 5], # 感受野大小
padding='same', # padding策略(vaild 或者 same)
activation=tf.nn.relu # 激活函数
)
# 池化层
self.pool2 = tf.keras.layers.MaxPool2D(pool_size=[2,2],strides=2)
# 重新reshap
self.flatten = tf.keras.layers.Reshape(target_shape=[7*7*64,])
self.dense1 = tf.keras.layers.Dense(units=1024,activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=10) def call(self,inputs):
'''重载,目的 mode(x)'''
x = self.conv1(inputs) # [batch_size, 28, 28, 32]
x = self.pool1(x) # [batch_size, 14, 14, 32]
x = self.conv2(x) # [batch_size, 14, 14, 64]
x = self.pool2(x) # [batch_size, 7, 7, 64]
x = self.flatten(x) # [batch_size, 7 * 7 * 64]
x = self.dense1(x) # [batch_size, 1024]
x = self.dense2(x) # [batch_size, 10]
output = tf.nn.softmax(x)
return output
# 训练模型
num_epochs = 5
batch_size = 50
learning_rate = 0.02model = CNN()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
for batch_index in range(num_batches):
X, y = data_loader.get_batch(batch_size)
with tf.GradientTape() as tape:
y_pred = model(X)
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
loss = tf.reduce_mean(loss)
print("batch %d: loss %f" % (batch_index, loss.numpy()))
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))# 模型的评估sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
y_pred = model.predict(data_loader.test_data[start_index: end_index])
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
print("test accuracy: %f" % sparse_categorical_accuracy.result())
keras 预定义了卷积神经网络结构
- tf.keras.applications中有很多经典的卷积神经网络结构。VGG16、VGG19、ResNet、MobileNet等
- model = tf.keras.applications.MobileNetV2()
- 默认的参数包括:input_shape 默认大小是2242243,3232 或7575有长款至少限制;include_top:在网络最后是否包含全连接层,默认是true;weights:预训练权值,默认是’imagenet‘即为当前模型载入在ImageNet数据集上预训练的权值。classes:分类数,默认1000,修改该参数需要include_top参数为True且weights参数为None。
实例代码
import tensorflow as tf
import tensorflow_datasets as tfdsnum_batches = 1000
batch_size = 50
learning_rate = 0.001dataset = tfds.load("tf_flowers", split=tfds.Split.TRAIN, as_supervised=True)
dataset = dataset.map(lambda img, label: (tf.image.resize(img, [224, 224]) / 255.0, label)).shuffle(1024).batch(32)
model = tf.keras.applications.MobileNetV2(weights=None, classes=5)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for images, labels in dataset:
with tf.GradientTape() as tape:
labels_pred = model(images)
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=labels, y_pred=labels_pred)
loss = tf.reduce_mean(loss)
print("loss %f" % loss.numpy())
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.trainable_variables))
循环神经网络(RNN)
- 循环神经网络(Recurrent Neural Network, RNN)是一种适宜于处理序列数据的神经网络,被广泛用于语言模型、文本生成、机器翻译等。
示例代码
# -*- coding:utf-8 -*-
# /usr/bin/python
'''
-------------------------------------------------
File Name : RNN
Description : AIM:
Functions: 1. 基本原理
2. 概念
3. 示例代码
Envs : python == 3.7
pip install tensorflow==2.1.0 -i https://pypi.douban.com/simple
Author : errol
Date : 2020/5/2 15:20
CodeStyle : 规范,简洁,易懂,可阅读,可维护,可移植!
-------------------------------------------------
Change Activity:
2020/5/2 : text
-------------------------------------------------
'''import numpy as np
import tensorflow as tfclass DataLoader(object): def __init__(self,):
path = tf.keras.utils.get_file('nietzsche.txt',
origin='https://s3.amazonaws.com/text-datasets/nietzsche.txt') with open(path, encoding='utf-8') as file:
self.raw_text = file.read().lower()
self.chars = sorted(list(set(self.raw_text)))
self.char_indices = dic((c,i) for i,c in enumerate(self.chars))
self.indices_char = dict((i, c) for i, c in enumerate(self.chars))
self.text = [self.char_indices[c] for c in self.raw_text] def get_batch(self, seq_length, batch_size):
seq = []
next_char = []
for i in range(batch_size):
index = np.random.randint(0, len(self.text) - seq_length)
seq.append(self.text[index:index + seq_length])
next_char.append(self.text[index + seq_length])
return np.array(seq), np.array(next_char) # [batch_size, seq_length], [num_batch]class RNN(tf.keras.Model): def __init__(self,num_chars,batch_size,seq_length):
super().__init__()
self.num_chars = num_chars
self.seq_length = seq_length
self.batch_size = batch_size
self.cell = tf.keras.layers.LSTMCell(units=256)
self.dense = tf.keras.layers.Dense(units=self.num_chars) def call(self,inputs,from_logits=False):
inputs = tf.one_hot(inputs,depth=self.num_chars)
state = self.cell.get_initial_state(batch_size=self.batch_size,dtype=tf.float32)
for i in range(self.seq_length):
output,stat = self.cell(inputs[:,t,:],state)
logits =self.dense(output)
if from_logits:
return logits
else:
return tf.nn.softmax(logits)
num_batches = 1000
seq_length = 40
batch_size = 50
learning_rate = 1e-3
'''
从 DataLoader 中随机取一批训练数据;
将这批数据送入模型,计算出模型的预测值;
将模型预测值与真实值进行比较,计算损失函数(loss);
计算损失函数关于模型变量的导数;
使用优化器更新模型参数以最小化损失函数。
'''
data_loader = DataLoader()
model = RNN(num_chars=len(data_loader.chars), batch_size=batch_size, seq_length=seq_length)
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
for batch_index in range(num_batches):
X, y = data_loader.get_batch(seq_length, batch_size)
with tf.GradientTape() as tape:
y_pred = model(X)
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
loss = tf.reduce_mean(loss)
print("batch %d: loss %f" % (batch_index, loss.numpy()))
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
def predict(self, inputs, temperature=1.):
batch_size, _ = tf.shape(inputs)
logits = self(inputs, from_logits=True)
prob = tf.nn.softmax(logits / temperature).numpy()
return np.array([np.random.choice(self.num_chars, p=prob[i, :])
for i in range(batch_size.numpy())])X_, _ = data_loader.get_batch(seq_length, 1)
for diversity in [0.2, 0.5, 1.0, 1.2]:
X = X_
print("diversity %f:" % diversity)
for t in range(400):
y_pred = model.predict(X, diversity)
print(data_loader.indices_char[y_pred[0]], end='', flush=True)
X = np.concatenate([X[:, 1:], np.expand_dims(y_pred, axis=1)], axis=-1)
print("\n")
DRL 深度强化学习
- 强化学习 (Reinforcement learning,RL)强调如何基于环境而行动,以取得最大化的预期利益。结合了深度学习技术后的强化学习(Deep Reinforcement learning,DRL)更是如虎添翼。近年广为人知的 AlphaGo 即是深度强化学习的典型应用。
- DRL中有两个新的概念:”智能体“和”环境“。智能体通过与环境的交互学习策略,从而最大化自己在环境中所获得的的奖励。
- 监督学习关注的是预测,那么强化学习关注的是决策。
动态规划 (Dynamic Programming DP)
- 基本思路:将一个问题分解为若干个结构相同的子问题。并保存已经解决的子问题的人答案。
- 最优子结构:一个最优策略的子策略也最优的。
- 无后效性:过去的步骤只能通过当前的状态影响未来的发展,当前是历史的总结。
数字三角形
从直接算法到迭代算法
示例:倒立摆
# -*- coding:utf-8 -*-
# /usr/bin/python
'''
-------------------------------------------------
File Name : DRL
Description : AIM: 倒立摆
Functions: 1. DRl理论
2. 示例代码
Envs : python ==
pip install -i https://pypi.douban.com/simple
Author : errol
Date : 2020/5/2 16:25
CodeStyle : 规范,简洁,易懂,可阅读,可维护,可移植!
-------------------------------------------------
Change Activity:
2020/5/2 : text
-------------------------------------------------
'''
import tensorflow as tf
import numpy as np
import gym
import random
from collections import dequenum_episodes = 500 # 游戏训练的总episode数量
num_exploration_episodes = 100 # 探索过程所占的episode数量
max_len_episode = 1000 # 每个episode的最大回合数
batch_size = 32 # 批次大小
learning_rate = 1e-3 # 学习率
gamma = 1. # 折扣因子
initial_epsilon = 1. # 探索起始时的探索率
final_epsilon = 0.01 # 探索终止时的探索率
class QNetwork(tf.keras.Model):
def __init__(self):
super().__init__()
self.dense1 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=24, activation=tf.nn.relu)
self.dense3 = tf.keras.layers.Dense(units=2) def call(self, inputs):
x = self.dense1(inputs)
x = self.dense2(x)
x = self.dense3(x)
return x def predict(self, inputs):
q_values = self(inputs)
return tf.argmax(q_values, axis=-1)if __name__ == '__main__':
env = gym.make('CartPole-v1') # 实例化一个游戏环境,参数为游戏名称
model = QNetwork()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
replay_buffer = deque(maxlen=10000) # 使用一个 deque 作为 Q Learning 的经验回放池
epsilon = initial_epsilon
for episode_id in range(num_episodes):
state = env.reset() # 初始化环境,获得初始状态
epsilon = max( # 计算当前探索率
initial_epsilon * (num_exploration_episodes - episode_id) / num_exploration_episodes,
final_epsilon)
for t in range(max_len_episode):
env.render() # 对当前帧进行渲染,绘图到屏幕
if random.random() < epsilon: # epsilon-greedy 探索策略,以 epsilon 的概率选择随机动作
action = env.action_space.sample() # 选择随机动作(探索)
else:
action = model.predict(np.expand_dims(state, axis=0)).numpy() # 选择模型计算出的 Q Value 最大的动作
action = action[0] # 让环境执行动作,获得执行完动作的下一个状态,动作的奖励,游戏是否已结束以及额外信息
next_state, reward, done, info = env.step(action)
# 如果游戏Game Over,给予大的负奖励
reward = -10. if done else reward
# 将(state, action, reward, next_state)的四元组(外加 done 标签表示是否结束)放入经验回放池
replay_buffer.append((state, action, reward, next_state, 1 if done else 0))
# 更新当前 state
state = next_state if done: # 游戏结束则退出本轮循环,进行下一个 episode
print("episode %d, epsilon %f, score %d" % (episode_id, epsilon, t))
break if len(replay_buffer) >= batch_size:
# 从经验回放池中随机取一个批次的四元组,并分别转换为 NumPy 数组
batch_state, batch_action, batch_reward, batch_next_state, batch_done = zip(
*random.sample(replay_buffer, batch_size))
batch_state, batch_reward, batch_next_state, batch_done = \
[np.array(a, dtype=np.float32) for a in [batch_state, batch_reward, batch_next_state, batch_done]]
batch_action = np.array(batch_action, dtype=np.int32) q_value = model(batch_next_state)
y = batch_reward + (gamma * tf.reduce_max(q_value, axis=1)) * (1 - batch_done) # 计算 y 值
with tf.GradientTape() as tape:
loss = tf.keras.losses.mean_squared_error( # 最小化 y 和 Q-value 的距离
y_true=y,
y_pred=tf.reduce_sum(model(batch_state) * tf.one_hot(batch_action, depth=2), axis=1)
)
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables)) # 计算梯度并更新参数
Keras Sequential/Functional API 模式建立模型
- 最典型和常用的神经网络结构是将一堆层按特定的顺序叠加起来。tf.keras.models.Sequential() 提供一个层的列表。
model = tf.keras.models.Sequential([
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(100, activation=tf.nn.relu),
tf.keras.layers.Dense(10),
tf.keras.layers.Softmax()
])
- Keras 提供了 Functional API,帮助我们建立更为复杂的模型,例如多输入 / 输出或存在参数共享的模型。
inputs = tf.keras.Input(shape=(28, 28, 1))
x = tf.keras.layers.Flatten()(inputs)
x = tf.keras.layers.Dense(units=100, activation=tf.nn.relu)(x)
x = tf.keras.layers.Dense(units=10)(x)
outputs = tf.keras.layers.Softmax()(x)
model = tf.keras.Model(inputs=inputs, outputs=outputs)
- 使用keras Model的compile/fit 和evaluate方法训练和评估模型。
model.compile(
optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
loss=tf.keras.losses.sparse_categorical_crossentropy,
metrics=[tf.keras.metrics.sparse_categorical_accuracy]
)
- oplimizer :优化器,可从 tf.keras.optimizers 中选择;
- loss :损失函数,可从 tf.keras.losses 中选择;
- metrics :评估指标,可从 tf.keras.metrics 中选择。
- model.fit(data_loader.train_data, data_loader.train_label, epochs=num_epochs, batch_size=batch_size)
- x :训练数据;
- y :目标数据(数据标签);
- epochs :将训练数据迭代多少遍;
- batch_size :批次的大小;
- validation_data :验证数据,可用于在训练过程中监控模型的性能。
- print(model.evaluate(data_loader.test_data, data_loader.test_label))
自定义层、损失函数和评估指标
自定义层
- 自定义层需要继承tf.keras.layers.Layer类,并重写__init__、build和call三个方法
class MyLayer(tf.keras.layers.Layer):
def __init__(self):
super().__init__()
# 初始化代码 def build(self, input_shape): # input_shape 是一个 TensorShape 类型对象,提供输入的形状
# 在第一次使用该层的时候调用该部分代码,在这里创建变量可以使得变量的形状自适应输入的形状
# 而不需要使用者额外指定变量形状。
# 如果已经可以完全确定变量的形状,也可以在__init__部分创建变量
self.variable_0 = self.add_weight(...)
self.variable_1 = self.add_weight(...) def call(self, inputs):
# 模型调用的代码(处理输入并返回输出)
return output
- 重写全连接层
class LinearLayer(tf.keras.layers.Layer):
def __init__(self, units):
super().__init__()
self.units = units def build(self, input_shape): # 这里 input_shape 是第一次运行call()时参数inputs的形状
self.w = self.add_variable(name='w',
shape=[input_shape[-1], self.units], initializer=tf.zeros_initializer())
self.b = self.add_variable(name='b',
shape=[self.units], initializer=tf.zeros_initializer()) def call(self, inputs):
y_pred = tf.matmul(inputs, self.w) + self.b
return y_pred
- LinearLayer
class LinearModel(tf.keras.Model):
def __init__(self):
super().__init__()
self.layer = LinearLayer(units=1) def call(self, inputs):
output = self.layer(inputs)
return output
自定义损失函数和评估指标
- 自定义损失函数需要继承 tf.keras.losses.Loss 类,重写 call 方法即可,输入真实值 y_true 和模型预测值 y_pred ,输出模型预测值和真实值之间通过自定义的损失函数计算出的损失值。
class MeanSquaredError(tf.keras.losses.Loss):
def call(self, y_true, y_pred):
return tf.reduce_mean(tf.square(y_pred - y_true))
- 自定义评估指标需要继承 tf.keras.metrics.Metric 类,并重写 init 、 update_state 和 result 三个方法。
class SparseCategoricalAccuracy(tf.keras.metrics.Metric):
def __init__(self):
super().__init__()
self.total = self.add_weight(name='total', dtype=tf.int32, initializer=tf.zeros_initializer())
self.count = self.add_weight(name='count', dtype=tf.int32, initializer=tf.zeros_initializer()) def update_state(self, y_true, y_pred, sample_weight=None):
values = tf.cast(tf.equal(y_true, tf.argmax(y_pred, axis=-1, output_type=tf.int32)), tf.int32)
self.total.assign_add(tf.shape(y_true)[0])
self.count.assign_add(tf.reduce_sum(values)) def result(self):
return self.count / self.total
Tensorflow常用模块
- 模型保存和加载:save() 和 restore()
- tf.keras.optimizer、tf.Variable、tf.keras.Layer或者tf.keras.Model实例都可以被保存。
- 保存模型:checkpoint = tf.train.Checkpoint(model=model)
- 保存模型和优化器:checkpoint = tf.train.Checkpoint(myAwesomeModel=model, myAwesomeOptimizer=optimizer)
- 加载模型
model_to_be_restored = MyModel() # 待恢复参数的同一模型
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored) # 键名保持为“myAwesomeModel”
# tf.train.latest_checkpoint('./save') # 加载最新的模型
checkpoint.restore(save_path_with_prefix_and_index)
- 总结
# 训练
model = MyModel()
# 实例化Checkpoint,指定保存对象为model(如果需要保存Optimizer的参数也可加入)
checkpoint = tf.train.Checkpoint(myModel=model)
# ...(模型训练代码)
# 模型训练完毕后将参数保存到文件(也可以在模型训练过程中每隔一段时间就保存一次)
checkpoint.save('./save/model.ckpt')
# 模型使用:预测model = MyModel()
checkpoint = tf.train.Checkpoint(myModel=model) # 实例化Checkpoint,指定恢复对象为model
checkpoint.restore(tf.train.latest_checkpoint('./save')) # 从文件恢复模型参数
# 模型使用代码
示例
# -*- coding:utf-8 -*-
# /usr/bin/python
'''
-------------------------------------------------
File Name : MLP
Description : AIM:
Functions: 1. 使用 tf.keras.datasets 获得数据集并预处理
2. 使用 tf.keras.Model 和 tf.keras.layers 构建模型
3. 构建模型训练流程,使用 tf.keras.losses 计算损失函数,并使用 tf.keras.optimizer 优化模型
4. 构建模型评估流程,使用 tf.keras.metrics 计算评估指标
Envs : python ==
pip install tensorflow==2.1.0 -i https://pypi.douban.com/simple
Author : yanerrol
Date : 2020/4/30 16:40
CodeStyle : 规范,简洁,易懂,可阅读,可维护,可移植!
-------------------------------------------------
Change Activity:
2020/4/30 : text
-------------------------------------------------
'''
import tensorflow as tf
import numpy as npclass MNISTLoader():
def __init__(self,):
mnist = tf.keras.datasets.mnist
(self.train_data,self.train_label),(self.test_data,self.test_label) =mnist.load_data()
# MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1) # [60000, 28, 28, 1]
self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1) # [10000, 28, 28, 1]
self.train_label = self.train_label.astype(np.int32) # [60000]
self.test_label = self.test_label.astype(np.int32) # [10000]
self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0] def get_batch(self, batch_size):
# 从数据集中随机取出batch_size个元素并返回
index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
return self.train_data[index, :], self.train_label[index]
class MLP(tf.keras.Model):
def __init__(self,):
super().__init__()
self.flatten = tf.keras.layers.Flatten()
self.dense1 = tf.keras.layers.Dense(units=100,activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=10) def call(self,inputs):
x = self.flatten(inputs)
x = self.dense1(x)
x = self.dense2(x)
output = tf.nn.softmax(x)
return output
# 训练模型
num_epochs = 5
batch_size = 50
learning_rate = 0.02model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
checkpoint = tf.train.Checkpoint(myAwesomeModel=model) # 实例化Checkpoint,设置保存对象为model
for batch_index in range(num_batches):
X, y = data_loader.get_batch(batch_size)
with tf.GradientTape() as tape:
y_pred = model(X)
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
loss = tf.reduce_mean(loss)
print("batch %d: loss %f" % (batch_index, loss.numpy()))
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables))
if batch_index % 100 == 0: # 每隔100个Batch保存一次
path = checkpoint.save('./checkpoint/model.ckpt') # 保存模型参数到文件
print("model saved to %s" % path)# 模型的评估sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
y_pred = model.predict(data_loader.test_data[start_index: end_index])
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
print("test accuracy: %f" % sparse_categorical_accuracy.result())
# 预测
model_to_be_restored = MLP()
# 实例化Checkpoint,设置恢复对象为新建立的模型model_to_be_restored
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored)
checkpoint.restore(tf.train.latest_checkpoint('./checkpoint')) # 从文件恢复模型参数
y_pred = np.argmax(model_to_be_restored.predict(data_loader.test_data), axis=-1)
print("test accuracy: %f" % (sum(y_pred == data_loader.test_label) / data_loader.num_test_data))
管理checkpoint
- 管理Checkpiont
checkpoint = tf.train.Checkpoint(model=model)
manager = tf.train.CheckpointManager(checkpoint, directory='./save', checkpoint_name='model.ckpt', max_to_keep=k) #此处, directory 参数为文件保存的路径, checkpoint_name 为文件名前缀(不提供则默认为 ckpt ), max_to_keep 为保留的 Checkpoint 数目
TensorBoard:训练过程可视化
- 实时查看参数变化情况
- summary_writer = tf.summary.create_file_writer(‘./tensorboard’) # 参数为记录文件所保存的目录
summary_writer = tf.summary.create_file_writer('./tensorboard')
# 开始模型训练
for batch_index in range(num_batches):
# ...(训练代码,当前batch的损失值放入变量loss中)
with summary_writer.as_default(): # 希望使用的记录器
tf.summary.scalar("loss", loss, step=batch_index)
tf.summary.scalar("MyScalar", my_scalar, step=batch_index) # 还可以添加其他自定义的变量
- 查看 Graph 和 Profile 信息
tf.summary.trace_on(graph=True, profiler=True) # 开启Trace,可以记录图结构和profile信息
# 进行训练
with summary_writer.as_default():
tf.summary.trace_export(name="model_trace", step=0, profiler_outdir=log_dir) # 保存Trace信息到文件
示例
# -*- coding:utf-8 -*-
# /usr/bin/python
'''
-------------------------------------------------
File Name : MLP
Description : AIM:
Functions: 1. 使用 tf.keras.datasets 获得数据集并预处理
2. 使用 tf.keras.Model 和 tf.keras.layers 构建模型
3. 构建模型训练流程,使用 tf.keras.losses 计算损失函数,并使用 tf.keras.optimizer 优化模型
4. 构建模型评估流程,使用 tf.keras.metrics 计算评估指标
Envs : python ==
pip install tensorflow==2.1.0 -i https://pypi.douban.com/simple
Author : yanerrol
Date : 2020/4/30 16:40
CodeStyle : 规范,简洁,易懂,可阅读,可维护,可移植!
-------------------------------------------------
Change Activity:
2020/4/30 : text
-------------------------------------------------
'''
import tensorflow as tf
import numpy as npclass MNISTLoader():
def __init__(self,):
mnist = tf.keras.datasets.mnist
(self.train_data,self.train_label),(self.test_data,self.test_label) =mnist.load_data()
# MNIST中的图像默认为uint8(0-255的数字)。以下代码将其归一化到0-1之间的浮点数,并在最后增加一维作为颜色通道
self.train_data = np.expand_dims(self.train_data.astype(np.float32) / 255.0, axis=-1) # [60000, 28, 28, 1]
self.test_data = np.expand_dims(self.test_data.astype(np.float32) / 255.0, axis=-1) # [10000, 28, 28, 1]
self.train_label = self.train_label.astype(np.int32) # [60000]
self.test_label = self.test_label.astype(np.int32) # [10000]
self.num_train_data, self.num_test_data = self.train_data.shape[0], self.test_data.shape[0] def get_batch(self, batch_size):
# 从数据集中随机取出batch_size个元素并返回
index = np.random.randint(0, np.shape(self.train_data)[0], batch_size)
return self.train_data[index, :], self.train_label[index]
class MLP(tf.keras.Model):
def __init__(self,):
super().__init__()
self.flatten = tf.keras.layers.Flatten()
self.dense1 = tf.keras.layers.Dense(units=100,activation=tf.nn.relu)
self.dense2 = tf.keras.layers.Dense(units=10) def call(self,inputs):
x = self.flatten(inputs)
x = self.dense1(x)
x = self.dense2(x)
output = tf.nn.softmax(x)
return output
# 训练模型
num_epochs = 5
batch_size = 50
learning_rate = 0.02model = MLP()
data_loader = MNISTLoader()
optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)num_batches = int(data_loader.num_train_data // batch_size * num_epochs)
checkpoint = tf.train.Checkpoint(myAwesomeModel=model) # 实例化Checkpoint,设置保存对象为model
manager = tf.train.CheckpointManager(checkpoint, directory='./checkpoint', max_to_keep=3)
summary_writer = tf.summary.create_file_writer('./tensorboard')
tf.summary.trace_on(profiler=True) # 开启Trace(可选)
for batch_index in range(num_batches):
X, y = data_loader.get_batch(batch_size)
with tf.GradientTape() as tape:
y_pred = model(X)
loss = tf.keras.losses.sparse_categorical_crossentropy(y_true=y, y_pred=y_pred)
loss = tf.reduce_mean(loss)
print("batch %d: loss %f" % (batch_index, loss.numpy()))
grads = tape.gradient(loss, model.variables)
optimizer.apply_gradients(grads_and_vars=zip(grads, model.variables)) if batch_index % 100 == 0: # 每隔100个Batch保存一次
# path = checkpoint.save('./checkpoint/model.ckpt') # 保存模型参数到文件
path = manager.save(checkpoint_number=batch_index)
print("model saved to %s" % path) with summary_writer.as_default(): # 希望使用的记录器
tf.summary.scalar("loss", loss, step=batch_index)
with summary_writer.as_default():
tf.summary.trace_export(name="model_trace", step=0, profiler_outdir='./tensorboard') # 保存Trace信息到文件(可选)# 模型的评估sparse_categorical_accuracy = tf.keras.metrics.SparseCategoricalAccuracy()
num_batches = int(data_loader.num_test_data // batch_size)
for batch_index in range(num_batches):
start_index, end_index = batch_index * batch_size, (batch_index + 1) * batch_size
y_pred = model.predict(data_loader.test_data[start_index: end_index])
sparse_categorical_accuracy.update_state(y_true=data_loader.test_label[start_index: end_index], y_pred=y_pred)
print("test accuracy: %f" % sparse_categorical_accuracy.result())
# 预测
model_to_be_restored = MLP()
# 实例化Checkpoint,设置恢复对象为新建立的模型model_to_be_restored
checkpoint = tf.train.Checkpoint(myAwesomeModel=model_to_be_restored)
checkpoint.restore(tf.train.latest_checkpoint('./checkpoint')) # 从文件恢复模型参数
y_pred = np.argmax(model_to_be_restored.predict(data_loader.test_data), axis=-1)
print("test accuracy: %f" % (sum(y_pred == data_loader.test_label) / data_loader.num_test_data))
数据集的构建和预处理
- tf.data.Dataset类,提供了对数据集的高层封装。tf.data.Dataset由可迭代的访问元素组成。每个元素包含一个或多个张量。
- 使用于小数据集的方法:tf.data.Dataset.from_tensor_slices() ,构成一个大的张量。
数据集的预处理办法
- Dataset.map(f):对数据集的每个元素应用函数f,得到一个新的数据集
- Dataset.shuffle(buffer_size) :将数据集打乱
- Dataset.batch(batch_size) :将数据集分成批次,即对每 batch_size 个元素,使用 tf.stack() 在第 0 维合并,成为一个元素;
TFRecord :TensorFlow 数据集存储格式
- TFRecord 可以理解为一系列序列化的 tf.train.Example 元素所组成的列表文件,而每一个 tf.train.Example 又由若干个 tf.train.Feature 的字典组成。
# dataset.tfrecords
[
{ # example 1 (tf.train.Example)
'feature_1': tf.train.Feature,
...
'feature_k': tf.train.Feature
},
...
{ # example N (tf.train.Example)
'feature_1': tf.train.Feature,
...
'feature_k': tf.train.Feature
}
]
- 写TFRecord文件
with tf.io.TFRecordWriter(tfrecord_file) as writer:
for filename, label in zip(train_filenames, train_labels):
image = open(filename, 'rb').read() # 读取数据集图片到内存,image 为一个 Byte 类型的字符串
feature = { # 建立 tf.train.Feature 字典
'image': tf.train.Feature(bytes_list=tf.train.BytesList(value=[image])), # 图片是一个 Bytes 对象
'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[label])) # 标签是一个 Int 对象
}
example = tf.train.Example(features=tf.train.Features(feature=feature)) # 通过字典建立 Example
writer.write(example.SerializeToString()) # 将Example序列化并写入 TFRecord 文件
- 读TFRecord文件
raw_dataset = tf.data.TFRecordDataset(tfrecord_file) # 读取 TFRecord 文件feature_description = { # 定义Feature结构,告诉解码器每个Feature的类型是什么
'image': tf.io.FixedLenFeature([], tf.string),
'label': tf.io.FixedLenFeature([], tf.int64),
}def _parse_example(example_string): # 将 TFRecord 文件中的每一个序列化的 tf.train.Example 解码
feature_dict = tf.io.parse_single_example(example_string, feature_description)
feature_dict['image'] = tf.io.decode_jpeg(feature_dict['image']) # 解码JPEG图片
return feature_dict['image'], feature_dict['label']dataset = raw_dataset.map(_parse_example)
部署
模型导出
- SaveModel:与前面介绍的 Checkpoint 不同,SavedModel 包含了一个 TensorFlow 程序的完整信息: 不仅包含参数的权值,还包含计算的流程(即计算图) 。当模型导出为 SavedModel 文件时,无需建立模型的源代码即可再次运行模型,这使得 SavedModel 尤其适用于模型的分享和部署。后文的 TensorFlow Serving(服务器端部署模型)、TensorFlow Lite(移动端部署模型)以及 TensorFlow.js 都会用到这一格式。
tf.saved_model.save(model, "保存的目标文件夹名称")
model = tf.saved_model.load("保存的目标文件夹名称")
分布式计算
单机 MirroredStrategy
- tf.distribute.MirroredStrategy 是一种简单且高性能的,数据并行的同步式分布式策略,主要支持多个 GPU 在同一台主机上训练。
- strategy = tf.distribute.MirroredStrategy(devices=[“/gpu:0”, “/gpu:1”])
多机 MultiWorkerMirroredStrategy
num_workers = 2
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:20000", "localhost:20001"]
},
'task': {'type': 'worker', 'index': 0}
})
strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy()
batch_size = batch_size_per_replica * num_workers
部署Docker 环境
- Docker 是轻量级的容器环境,将程序放在虚拟的 “容器” 或者说 “保护层” 中运行,既避免了配置各种库、依赖和环境变量的麻烦,又克服了虚拟机资源占用多、启动慢的缺点。
- 拉取Tensorflow映像
$ docker image pull tensorflow/tensorflow:latest-py3 # 最新稳定版本TensorFlow(Python 3.5,CPU版)
$ docker image pull tensorflow/tensorflow:latest-gpu-py3 # 最新稳定版本TensorFlow(Python 3.5,GPU版)
- GPU
$ docker container run -it --runtime=nvidia tensorflow/tensorflow:latest-gpu-py3 bash