python
tf.keras.backend.clear_session()
model = models.Sequential()
model.add(layers.Embedding(MAX_WORDS,7,input_length=MAX_LEN))
model.add(layers.Conv1D(filters = 64,kernel_size = 5,activation = "relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Conv1D(filters = 32,kernel_size = 3,activation = "relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Flatten())
model.add(layers.Dense(1,activation = "sigmoid"))
model.compile(optimizer='Nadam',
loss='binary_crossentropy',
metrics=['accuracy',"AUC"])
model.summary()
python
tf.keras.backend.clear_session()
inputs = layers.Input(shape=[MAX_LEN])
x = layers.Embedding(MAX_WORDS,7)(inputs)
branch1 = layers.SeparableConv1D(64,3,activation="relu")(x)
branch1 = layers.MaxPool1D(3)(branch1)
branch1 = layers.SeparableConv1D(32,3,activation="relu")(branch1)
branch1 = layers.GlobalMaxPool1D()(branch1)
branch2 = layers.SeparableConv1D(64,5,activation="relu")(x)
branch2 = layers.MaxPool1D(5)(branch2)
branch2 = layers.SeparableConv1D(32,5,activation="relu")(branch2)
branch2 = layers.GlobalMaxPool1D()(branch2)
branch3 = layers.SeparableConv1D(64,7,activation="relu")(x)
branch3 = layers.MaxPool1D(7)(branch3)
branch3 = layers.SeparableConv1D(32,7,activation="relu")(branch3)
branch3 = layers.GlobalMaxPool1D()(branch3)
concat = layers.Concatenate()([branch1,branch2,branch3])
outputs = layers.Dense(1,activation = "sigmoid")(concat)
model = models.Model(inputs = inputs,outputs = outputs)
model.compile(optimizer='Nadam',
loss='binary_crossentropy',
metrics=['accuracy',"AUC"])
model.summary()
python
# 先自定义一个残差模块,为自定义Layer
class ResBlock(layers.Layer):
def __init__(self, kernel_size, **kwargs):
super(ResBlock, self).__init__(**kwargs)
self.kernel_size = kernel_size
def build(self,input_shape):
self.conv1 = layers.Conv1D(filters=64,kernel_size=self.kernel_size,
activation = "relu",padding="same")
self.conv2 = layers.Conv1D(filters=32,kernel_size=self.kernel_size,
activation = "relu",padding="same")
self.conv3 = layers.Conv1D(filters=input_shape[-1],
kernel_size=self.kernel_size,activation = "relu",padding="same")
self.maxpool = layers.MaxPool1D(2)
super(ResBlock,self).build(input_shape) # 相当于设置self.built = True
def call(self, inputs):
x = self.conv1(inputs)
x = self.conv2(x)
x = self.conv3(x)
x = layers.Add()([inputs,x])
x = self.maxpool(x)
return x
#如果要让自定义的Layer通过Functional API 组合成模型时可以序列化,需要自定义get_config方法。
def get_config(self):
config = super(ResBlock, self).get_config()
config.update({'kernel_size': self.kernel_size})
return config
# 测试ResBlock
resblock = ResBlock(kernel_size = 3)
resblock.build(input_shape = (None,200,7))
resblock.compute_output_shape(input_shape=(None,200,7))
python
# 自定义模型,实际上也可以使用Sequential或者FunctionalAPI
class ImdbModel(models.Model):
def __init__(self):
super(ImdbModel, self).__init__()
def build(self,input_shape):
self.embedding = layers.Embedding(MAX_WORDS,7)
self.block1 = ResBlock(7)
self.block2 = ResBlock(5)
self.dense = layers.Dense(1,activation = "sigmoid")
super(ImdbModel,self).build(input_shape)
def call(self, x):
x = self.embedding(x)
x = self.block1(x)
x = self.block2(x)
x = layers.Flatten()(x)
x = self.dense(x)
return(x)
python
# 测试
tf.keras.backend.clear_session()
model = ImdbModel()
model.build(input_shape =(None,200))
model.summary()
model.compile(optimizer='Nadam',
loss='binary_crossentropy',
metrics=['accuracy',"AUC"])
模型的训练主要有内置fit方法、内置tran_on_batch方法、自定义训练循环。
python
tf.keras.backend.clear_session()
def create_model():
model = models.Sequential()
model.add(layers.Embedding(MAX_WORDS,7,input_length=MAX_LEN))
model.add(layers.Conv1D(filters = 64,kernel_size = 5,activation = "relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Conv1D(filters = 32,kernel_size = 3,activation = "relu"))
model.add(layers.MaxPool1D(2))
model.add(layers.Flatten())
model.add(layers.Dense(CAT_NUM,activation = "softmax"))
return(model)
def compile_model(model):
model.compile(optimizer=optimizers.Nadam(),
loss=losses.SparseCategoricalCrossentropy(),
metrics=[metrics.SparseCategoricalAccuracy(),metrics.SparseTopKCategoricalAccuracy(5)])
return(model)
model = create_model()
model.summary()
model = compile_model(model)
# 构建模型
history = model.fit(ds_train,validation_data = ds_test,epochs = 10)
python
def train_model(model,ds_train,ds_valid,epoches):
for epoch in tf.range(1,epoches+1):
model.reset_metrics()
# 在后期降低学习率
if epoch == 5:
model.optimizer.lr.assign(model.optimizer.lr/2.0)
tf.print("Lowering optimizer Learning Rate...\n\n")
for x, y in ds_train:
train_result = model.train_on_batch(x, y)
for x, y in ds_valid:
valid_result = model.test_on_batch(x, y,reset_metrics=False)
if epoch%1 ==0:
printbar()
tf.print("epoch = ",epoch)
print("train:",dict(zip(model.metrics_names,train_result)))
print("valid:",dict(zip(model.metrics_names,valid_result)))
print("")
# 构建模型
train_model(model,ds_train,ds_test,10)
自定义训练循环无需编译模型,直接利用优化器根据损失函数反向传播迭代参数,拥有最高的灵活性。
python
optimizer = optimizers.Nadam()
loss_func = losses.SparseCategoricalCrossentropy()
train_loss = metrics.Mean(name='train_loss')
train_metric = metrics.SparseCategoricalAccuracy(name='train_accuracy')
valid_loss = metrics.Mean(name='valid_loss')
valid_metric = metrics.SparseCategoricalAccuracy(name='valid_accuracy')
@tf.function
def train_step(model, features, labels):
with tf.GradientTape() as tape:
predictions = model(features,training = True)
loss = loss_func(labels, predictions)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_loss.update_state(loss)
train_metric.update_state(labels, predictions)
@tf.function
def valid_step(model, features, labels):
predictions = model(features)
batch_loss = loss_func(labels, predictions)
valid_loss.update_state(batch_loss)
valid_metric.update_state(labels, predictions)
def train_model(model,ds_train,ds_valid,epochs):
for epoch in tf.range(1,epochs+1):
for features, labels in ds_train:
train_step(model,features,labels)
for features, labels in ds_valid:
valid_step(model,features,labels)
logs = 'Epoch={},Loss:{},Accuracy:{},Valid Loss:{},Valid Accuracy:{}'
if epoch%1 ==0:
printbar()
tf.print(tf.strings.format(logs,
(epoch,train_loss.result(),train_metric.result(),valid_loss.result(),valid_metric.result())))
tf.print("")
train_loss.reset_states()
valid_loss.reset_states()
train_metric.reset_states()
valid_metric.reset_states()
# 构建模型
train_model(model,ds_train,ds_test,10)