Recurrent Neural Networks - Dive into Deep Learning

Dive into Deep Learning v2

Course link: https://courses.d2l.ai/zh-v2/

Sequence Models

Idea

  • At time t we observe $x_t$; over T steps this yields T random variables that are not independent

  • Autoregressive model: model $x_t$ conditioned on all of the data seen so far

  • Markov model: $x_t$ depends only on the previous $\tau$ data points

Under the Markov assumption, an MLP is sufficient for modeling

  • Latent variable model

Introduce a latent variable $h_t$ that summarizes the past information

Then $x_t$ and $h_t$ each depend on only two variables, which makes modeling easy (see the formulas below)
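
In formulas (a compact summary of the three modeling choices above; the notation follows the course):

  • Autoregressive model: estimate $p(x_t \mid x_1, \ldots, x_{t-1})$

  • Markov model of order $\tau$: assume $p(x_t \mid x_1, \ldots, x_{t-1}) = p(x_t \mid x_{t-\tau}, \ldots, x_{t-1})$

  • Latent variable model: $h_t = f(h_{t-1}, x_{t-1})$ summarizes the past, and $x_t$ is generated from $p(x_t \mid h_t, x_{t-1})$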

Code

%matplotlib inline
import torch
from torch import nn
from d2l import torch as d2l

T = 1000  # generate 1000 points in total
time = torch.arange(1, T + 1, dtype=torch.float32)
x = torch.sin(0.01 * time) + torch.normal(0, 0.2, (T,))  # the trailing comma makes (T,) a one-element tuple
d2l.plot(time, [x], 'time', 'x', xlim=[1, 1000], figsize=(6, 3))
tau = 4
features = torch.zeros((T - tau, tau))  # (T - tau) rows, each holding tau time steps
for i in range(tau):  # each iteration fills in one time step
    features[:, i] = x[i: T - tau + i]
labels = x[tau:].reshape((-1, 1))  # reshape into a column vector (one label per row)

batch_size, n_train = 16, 600
# only the first n_train examples are used for training
train_iter = d2l.load_array((features[:n_train], labels[:n_train]),
                            batch_size, is_train=True)
# Function to initialize the network weights
def init_weights(m):
    if type(m) == nn.Linear:
        nn.init.xavier_uniform_(m.weight)

# A simple multilayer perceptron
def get_net():
    net = nn.Sequential(nn.Linear(4, 10),
                        nn.ReLU(),
                        nn.Linear(10, 1))
    net.apply(init_weights)
    return net

# Squared loss. Note: MSELoss computes the squared error without the factor of 1/2
loss = nn.MSELoss(reduction='none')
# epoch 1, loss: 0.078787
# epoch 2, loss: 0.054550
# epoch 3, loss: 0.053713
# epoch 4, loss: 0.052182
# epoch 5, loss: 0.055492
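
The training code that produced the epoch losses above is missing from these notes. A minimal sketch in the spirit of the d2l book (d2l.evaluate_loss is a helper from the d2l package; treat this as an approximation, not the exact original cell):

def train(net, train_iter, loss, epochs, lr):
    trainer = torch.optim.Adam(net.parameters(), lr)
    for epoch in range(epochs):
        for X, y in train_iter:
            trainer.zero_grad()
            l = loss(net(X), y)
            l.sum().backward()
            trainer.step()
        print(f'epoch {epoch + 1}, '
              f'loss: {d2l.evaluate_loss(net, train_iter, loss):f}')

net = get_net()
train(net, train_iter, loss, 5, 0.01)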

One-step-ahead prediction works well

onestep_preds = net(features)
d2l.plot([time, time[tau:]],
         [x.detach().numpy(), onestep_preds.detach().numpy()], 'time',
         'x', legend=['data', '1-step preds'], xlim=[1, 1000],
         figsize=(6, 3))

Beyond step 604 (n_train + tau), if the model keeps feeding its own predictions back in, errors accumulate and the predictions quickly become poor (the green curve; see the sketch below)

Given the last 4 observed points, predict 1, 4, 16, and 64 steps into the future
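
A sketch of the multi-step loop behind the green curve, reusing the variables defined above (x, tau, n_train, and the trained net); beyond the training range the model consumes its own predictions:

# Beyond n_train + tau, feed the model's own predictions back in as inputs
multistep_preds = torch.zeros(T)
multistep_preds[: n_train + tau] = x[: n_train + tau]
for i in range(n_train + tau, T):
    multistep_preds[i] = net(
        multistep_preds[i - tau:i].reshape((1, -1)))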

Text Preprocessing

Reading the Dataset

import collections
import re
from d2l import torch as d2l
d2l.DATA_HUB['time_machine'] = (d2l.DATA_URL + 'timemachine.txt',
                                '090b5e7e70c295757f55df93cb0a180b9691891a')

def read_time_machine():  #@save
    """Load the Time Machine dataset into a list of text lines"""
    with open(d2l.download('time_machine'), 'r') as f:
        lines = f.readlines()
    # re.sub('[^A-Za-z]+', ' ', line): regex that replaces every non-letter character in line with a space
    # .strip(): remove leading/trailing whitespace
    # .lower(): convert to lowercase
    return [re.sub('[^A-Za-z]+', ' ', line).strip().lower() for line in lines]

lines = read_time_machine()
print(f'# number of text lines: {len(lines)}')
print(lines[0])
print(lines[10])

Tokenization

def tokenize(lines, token='word'):  #@save
    """Split text lines into word or character tokens"""
    if token == 'word':
        return [line.split() for line in lines]
    elif token == 'char':
        return [list(line) for line in lines]
    else:
        print('ERROR: unknown token type: ' + token)

tokens = tokenize(lines)
print(tokens[0])
# ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']

Vocabulary

Build a dictionary that maps each token to a numeric index

class Vocab:  #@save
    """Vocabulary for text"""
    # min_freq: tokens occurring at least min_freq times are added to the vocabulary
    def __init__(self, tokens=None, min_freq=0, reserved_tokens=None):
        if tokens is None:
            tokens = []
        if reserved_tokens is None:
            reserved_tokens = []
        # Sort tokens by frequency
        counter = count_corpus(tokens)
        # key=lambda x: x[1]: sort by the second element of each (token, count) pair
        self._token_freqs = sorted(counter.items(), key=lambda x: x[1],
                                   reverse=True)
        # The unknown token has index 0
        self.idx_to_token = ['<unk>'] + reserved_tokens
        self.token_to_idx = {token: idx
                             for idx, token in enumerate(self.idx_to_token)}
        # Add each token to the vocabulary
        for token, freq in self._token_freqs:
            if freq < min_freq:
                break
            if token not in self.token_to_idx:
                self.idx_to_token.append(token)
                self.token_to_idx[token] = len(self.idx_to_token) - 1

    def __len__(self):
        return len(self.idx_to_token)

    def __getitem__(self, tokens):
        # Check whether tokens is a list or tuple
        if not isinstance(tokens, (list, tuple)):
            return self.token_to_idx.get(tokens, self.unk)
        return [self.__getitem__(token) for token in tokens]

    def to_tokens(self, indices):
        if not isinstance(indices, (list, tuple)):
            return self.idx_to_token[indices]
        return [self.idx_to_token[index] for index in indices]

    @property
    def unk(self):  # the unknown token has index 0
        return 0

    @property
    def token_freqs(self):
        return self._token_freqs

def count_corpus(tokens):  #@save
    """Count token frequencies"""
    # tokens here is either a 1D list or a 2D list (a list of token lists)
    if len(tokens) == 0 or isinstance(tokens[0], list):
        # Flatten the list of token lists into a single list
        tokens = [token for line in tokens for token in line]
    return collections.Counter(tokens)
# Print the most frequent tokens as a sanity check
vocab = Vocab(tokens)
print(list(vocab.token_to_idx.items())[:10])
# [('<unk>', 0), ('the', 1), ('i', 2), ('and', 3), ('of', 4), ('a', 5), ('to', 6), ('was', 7), ('in', 8), ('that', 9)]

for i in [0, 10]:
    print('text:', tokens[i])
    print('indices:', vocab[tokens[i]])
# text: ['the', 'time', 'machine', 'by', 'h', 'g', 'wells']
# indices: [1, 19, 50, 40, 2183, 2184, 400]
# text: ['twinkled', 'and', 'his', 'usually', 'pale', 'face', 'was', 'flushed', 'and', 'animated', 'the']
# indices: [2186, 3, 25, 1044, 362, 113, 7, 1421, 3, 1045, 1]

Putting It All Together

def load_corpus_time_machine(max_tokens=-1):  #@save
    """Return the token index list and the vocabulary of the Time Machine dataset"""
    lines = read_time_machine()
    tokens = tokenize(lines, 'char')
    vocab = Vocab(tokens)
    # Since each text line in the Time Machine dataset is not necessarily a sentence or a paragraph,
    # flatten all text lines into a single list
    corpus = [vocab[token] for line in tokens for token in line]
    if max_tokens > 0:
        corpus = corpus[:max_tokens]
    return corpus, vocab

corpus, vocab = load_corpus_time_machine()
# vocab maps characters to indices; corpus is the whole text as a list of character indices
len(corpus), len(vocab)
# (170580, 28)

Language Models and the Dataset

Language Models

Given a text sequence $x_1, \ldots, x_T$, the goal of a language model is to estimate the joint probability $p(x_1, \ldots, x_T)$

When the sequence is long, we can apply the Markov assumption

Trigram (second-order Markov) model:
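
For example, under the second-order (trigram) Markov assumption a length-4 sequence factorizes as

$$p(x_1, x_2, x_3, x_4) = p(x_1)\, p(x_2 \mid x_1)\, p(x_3 \mid x_1, x_2)\, p(x_4 \mid x_2, x_3)$$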

Natural Language Statistics

import random
import torch
from d2l import torch as d2l
tokens = d2l.tokenize(d2l.read_time_machine())
# Since each text line is not necessarily a sentence or a paragraph, concatenate all lines into one list
corpus = [token for line in tokens for token in line]
vocab = d2l.Vocab(corpus)
vocab.token_freqs[:10]
# [('the', 2261),
#  ('i', 1267),
#  ('and', 1245),
#  ('of', 1155),
#  ('a', 816),
#  ('to', 695),
#  ('was', 552),
#  ('in', 541),
#  ('that', 443),
#  ('my', 440)]

The most frequent words usually carry little meaning on their own; they are called stop words

# Plot the word-frequency curve
freqs = [freq for token, freq in vocab.token_freqs]
d2l.plot(freqs, xlabel='token: x', ylabel='frequency: n(x)',
         xscale='log', yscale='log')

# Pair the corpus with itself shifted by one position to form bigrams, then inspect the most frequent ones
bigram_tokens = [pair for pair in zip(corpus[:-1], corpus[1:])]
bigram_vocab = d2l.Vocab(bigram_tokens)
bigram_vocab.token_freqs[:10]
# [(('of', 'the'), 309),
#  (('in', 'the'), 169),
#  (('i', 'had'), 130),
#  (('i', 'was'), 112),
#  (('and', 'the'), 109),
#  (('the', 'time'), 102),
#  (('it', 'was'), 99),
#  (('to', 'the'), 85),
#  (('as', 'i'), 78),
#  (('of', 'a'), 73)]
# Count trigram frequencies in the same way
trigram_tokens = [triple for triple in zip(
    corpus[:-2], corpus[1:-1], corpus[2:])]
trigram_vocab = d2l.Vocab(trigram_tokens)
trigram_vocab.token_freqs[:10]
# [(('the', 'time', 'traveller'), 59),
#  (('the', 'time', 'machine'), 30),
#  (('the', 'medical', 'man'), 24),
#  (('it', 'seemed', 'to'), 16),
#  (('it', 'was', 'a'), 15),
#  (('here', 'and', 'there'), 15),
#  (('seemed', 'to', 'me'), 14),
#  (('i', 'did', 'not'), 14),
#  (('i', 'saw', 'the'), 13),
#  (('i', 'began', 'to'), 13)]
# Plot the three frequency curves together
bigram_freqs = [freq for token, freq in bigram_vocab.token_freqs]
trigram_freqs = [freq for token, freq in trigram_vocab.token_freqs]
d2l.plot([freqs, bigram_freqs, trigram_freqs], xlabel='token: x',
         ylabel='frequency: n(x)', xscale='log', yscale='log',
         legend=['unigram', 'bigram', 'trigram'])

Reading Long Sequence Data

Random Sampling

Pick a random starting offset (between 0 and num_steps - 1), then cut the rest of the sequence into subsequences of num_steps tokens each and draw them in random order to form batches

The random offset reduces the bias that comes from always cutting the sequence at the same positions

def seq_data_iter_random(corpus, batch_size, num_steps):  #@save
    """Generate a minibatch of subsequences using random sampling"""
    # Partition the sequence starting from a random offset; the offset can be up to num_steps - 1
    corpus = corpus[random.randint(0, num_steps - 1):]
    # Subtract 1 so that the last subsequence still has a label
    num_subseqs = (len(corpus) - 1) // num_steps
    # Starting indices of the subsequences of length num_steps
    initial_indices = list(range(0, num_subseqs * num_steps, num_steps))
    # With random sampling, subsequences from two adjacent minibatches
    # are not necessarily adjacent in the original sequence
    random.shuffle(initial_indices)

    def data(pos):
        # Return the subsequence of length num_steps starting at position pos
        return corpus[pos: pos + num_steps]

    num_batches = num_subseqs // batch_size
    for i in range(0, batch_size * num_batches, batch_size):
        # initial_indices contains the shuffled starting indices of the subsequences
        initial_indices_per_batch = initial_indices[i: i + batch_size]
        X = [data(j) for j in initial_indices_per_batch]
        Y = [data(j + 1) for j in initial_indices_per_batch]
        yield torch.tensor(X), torch.tensor(Y)
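
A quick sanity check on a toy sequence (the book uses my_seq = list(range(35)); the same my_seq is reused by the sequential-partition demo further below):

my_seq = list(range(35))  # the integers 0..34 as a toy corpus
for X, Y in seq_data_iter_random(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)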

Sequential Partitioning

Ensures that the subsequences in two adjacent minibatches are also adjacent in the original sequence

def seq_data_iter_sequential(corpus, batch_size, num_steps):  #@save
    """使用顺序分区生成一个小批量子序列"""
    # 从随机偏移量开始划分序列
    offset = random.randint(0, num_steps)
    num_tokens = ((len(corpus) - offset - 1) // batch_size) * batch_size
    Xs = torch.tensor(corpus[offset: offset + num_tokens])
    Ys = torch.tensor(corpus[offset + 1: offset + 1 + num_tokens])
    Xs, Ys = Xs.reshape(batch_size, -1), Ys.reshape(batch_size, -1)
    num_batches = Xs.shape[1] // num_steps
    for i in range(0, num_steps * num_batches, num_steps):
        X = Xs[:, i: i + num_steps]
        Y = Ys[:, i: i + num_steps]
        yield X, Y
for X, Y in seq_data_iter_sequential(my_seq, batch_size=2, num_steps=5):
    print('X: ', X, '\nY:', Y)
# X:  tensor([[ 0,  1,  2,  3,  4],
#         [17, 18, 19, 20, 21]]) 
# Y: tensor([[ 1,  2,  3,  4,  5],
#         [18, 19, 20, 21, 22]])
# X:  tensor([[ 5,  6,  7,  8,  9],
#         [22, 23, 24, 25, 26]]) 
# Y: tensor([[ 6,  7,  8,  9, 10],
#         [23, 24, 25, 26, 27]])
# X:  tensor([[10, 11, 12, 13, 14],
#         [27, 28, 29, 30, 31]]) 
# Y: tensor([[11, 12, 13, 14, 15],
#         [28, 29, 30, 31, 32]])

Wrapping both samplers into a class:

class SeqDataLoader:  #@save
    """加载序列数据的迭代器"""
    def __init__(self, batch_size, num_steps, use_random_iter, max_tokens):
        if use_random_iter:
            self.data_iter_fn = d2l.seq_data_iter_random
        else:
            self.data_iter_fn = d2l.seq_data_iter_sequential
        self.corpus, self.vocab = d2l.load_corpus_time_machine(max_tokens)
        self.batch_size, self.num_steps = batch_size, num_steps

    def __iter__(self):
        return self.data_iter_fn(self.corpus, self.batch_size, self.num_steps)
def load_data_time_machine(batch_size, num_steps,  #@save
                           use_random_iter=False, max_tokens=10000):
    """返回时光机器数据集的迭代器和词表"""
    data_iter = SeqDataLoader(
        batch_size, num_steps, use_random_iter, max_tokens)
    return data_iter, data_iter.vocab

Recurrent Neural Networks (RNN)

Compared with an MLP, an RNN adds a hidden state that carries information across time steps
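
Written out (row-vector convention, matching the from-scratch code below), the hidden state and output at time step t are

$$H_t = \tanh(X_t W_{xh} + H_{t-1} W_{hh} + b_h), \qquad O_t = H_t W_{hq} + b_q$$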

Perplexity

To measure how good a language model is, we can use the average cross-entropy, i.e. the mean of the cross-entropy losses over all n tokens of a sequence:

$$\pi = -\frac{1}{n}\sum_{t=1}^{n}\log p(x_t \mid x_{t-1}, \ldots, x_1)$$

In NLP the standard measure is perplexity, defined as $\exp(\pi)$

A perplexity of 1 means a perfect model; the worst case is infinity
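
A tiny numeric illustration (made-up values, with torch and nn imported as in the earlier cells): perplexity is just the exponential of the mean per-token cross-entropy.

logits = torch.randn(6, 28)             # scores for 6 tokens over a 28-symbol vocabulary
targets = torch.randint(0, 28, (6,))    # the true next tokens
avg_ce = nn.CrossEntropyLoss()(logits, targets)  # mean cross-entropy (reduction='mean' by default)
perplexity = torch.exp(avg_ce)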

Gradient Clipping

Backpropagating through T time steps produces a chain of O(T) matrix multiplications, which makes the gradients numerically unstable

Gradient clipping is used to prevent gradient explosion

When the gradient norm exceeds $\theta$, project the gradient back to length $\theta$

This guarantees that the gradient norm never exceeds $\theta$ (the training code below uses $\theta = 1$)
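
In formula form, with gradient $g$ and threshold $\theta$:

$$g \leftarrow \min\left(1, \frac{\theta}{\lVert g \rVert}\right) g$$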

RNN Applications

Implementation from Scratch

%matplotlib inline
import math
import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

# Define the batch size and the number of time steps
batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

One-Hot Encoding

One-hot encoding converts a token's numeric index into a feature vector

# One-hot vectors for indices 0 and 2
F.one_hot(torch.tensor([0, 2]), len(vocab))
# tensor([[1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#          0, 0, 0, 0],
#         [0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
#          0, 0, 0, 0]])
  • A sampled minibatch has shape (batch size, number of time steps)
  • We transpose the input so the one-hot result has shape (number of time steps, batch size, vocab size)
  • This makes it convenient to loop over the outermost dimension and update the minibatch's hidden state one time step at a time
    • That way, X[i, :, :] holds the inputs at time step i
X = torch.arange(10).reshape((2, 5))
F.one_hot(X.T, 28).shape
# torch.Size([5, 2, 28])

Initializing Model Parameters

def get_params(vocab_size, num_hiddens, device):
    # Inputs and outputs both come from the vocabulary, so both sizes equal the vocabulary size
    num_inputs = num_outputs = vocab_size

    # Initialization helper used repeatedly below
    def normal(shape):
        return torch.randn(size=shape, device=device) * 0.01

    # Hidden-layer parameters
    # matrix mapping the input x to the hidden state
    W_xh = normal((num_inputs, num_hiddens))
    # matrix mapping the previous hidden state to the next hidden state
    W_hh = normal((num_hiddens, num_hiddens))
    b_h = torch.zeros(num_hiddens, device=device)
    # Output-layer parameters
    W_hq = normal((num_hiddens, num_outputs))
    b_q = torch.zeros(num_outputs, device=device)
    # Attach gradients
    params = [W_xh, W_hh, b_h, W_hq, b_q]
    for param in params:
        param.requires_grad_(True)
    return params

RNN Model

# Initialize the hidden state
def init_rnn_state(batch_size, num_hiddens, device):
    # Return a tuple, since later models may keep several tensors in the hidden state
    return (torch.zeros((batch_size, num_hiddens), device=device), )

# Compute the hidden states and outputs, one time step at a time
def rnn(inputs, state, params):
    # inputs has shape (number of time steps, batch size, vocab size)
    W_xh, W_hh, b_h, W_hq, b_q = params
    H, = state
    outputs = []
    # Thanks to the transpose taken earlier, iterating yields one time step of X at a time
    # X has shape (batch size, vocab size)
    for X in inputs:
        # tanh is used as the activation function
        H = torch.tanh(torch.mm(X, W_xh) + torch.mm(H, W_hh) + b_h)
        Y = torch.mm(H, W_hq) + b_q
        outputs.append(Y)
    # Concatenate the outputs along the time-step dimension
    return torch.cat(outputs, dim=0), (H,)

class RNNModelScratch: #@save
    """从零开始实现的循环神经网络模型"""
    def __init__(self, vocab_size, num_hiddens, device,
                 get_params, init_state, forward_fn):
        self.vocab_size, self.num_hiddens = vocab_size, num_hiddens
        self.params = get_params(vocab_size, num_hiddens, device)
        self.init_state, self.forward_fn = init_state, forward_fn

    def __call__(self, X, state):
        X = F.one_hot(X.T, self.vocab_size).type(torch.float32)
        return self.forward_fn(X, state, self.params)

    def begin_state(self, batch_size, device):
        return self.init_state(batch_size, self.num_hiddens, device)

# Check the output shapes
num_hiddens = 512
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
state = net.begin_state(X.shape[0], d2l.try_gpu())
Y, new_state = net(X.to(d2l.try_gpu()), state)
Y.shape, len(new_state), new_state[0].shape
# Y.shape = (batch size * number of time steps, vocab size): one next-token prediction vector per position
# (torch.Size([10, 28]), 1, torch.Size([2, 512]))

Prediction

def predict_ch8(prefix, num_preds, net, vocab, device):  #@save
    """Generate new characters following the prefix"""
    # Create the initial hidden state
    state = net.begin_state(batch_size=1, device=device)
    outputs = [vocab[prefix[0]]]
    # Use the most recent element of outputs as the next input
    get_input = lambda: torch.tensor([outputs[-1]], device=device).reshape((1, 1))
    for y in prefix[1:]:  # Warm-up period: feed the prefix through the net and update state
        _, state = net(get_input(), state)
        outputs.append(vocab[y])  # store the ground-truth token directly
    # Now predict the continuation of the given data
    for _ in range(num_preds):  # predict num_preds steps
        y, state = net(get_input(), state)
        # Take the index with the largest score; this index is the predicted token
        outputs.append(int(y.argmax(dim=1).reshape(1)))
    # Look up the indices in the vocabulary and return the string
    return ''.join([vocab.idx_to_token[i] for i in outputs])
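
Calling it before training produces essentially random continuations; the book checks this with:

predict_ch8('time traveller ', 10, net, vocab, d2l.try_gpu())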

Gradient Clipping

def grad_clipping(net, theta):  #@save
    """Clip the gradient"""
    # Collect the parameters of all layers
    if isinstance(net, nn.Module):
        params = [p for p in net.parameters() if p.requires_grad]
    else:
        params = net.params
    # L2 norm of the vector formed by all parameter gradients
    norm = torch.sqrt(sum(torch.sum((p.grad ** 2)) for p in params))
    if norm > theta:
        for param in params:
            param.grad[:] *= theta / norm

Training

def train_epoch_ch8(net, train_iter, loss, updater, device, use_random_iter):
    # use_random_iter: the sampling method (random vs. sequential partitioning) changes how the hidden state is initialized
    """Train the network for one epoch (defined in Chapter 8)"""
    state, timer = None, d2l.Timer()
    metric = d2l.Accumulator(2)  # sum of training loss, number of tokens
    for X, Y in train_iter:
        # With random sampling, the state is re-initialized to zeros before every batch
        if state is None or use_random_iter:
            # Initialize state on the first iteration or when using random sampling
            state = net.begin_state(batch_size=X.shape[0], device=device)
        else:
            if isinstance(net, nn.Module) and not isinstance(state, tuple):
                # For nn.GRU the state is a tensor
                # The hidden state depends on previous batches, so carrying the full history
                # would make gradient computation increasingly expensive; detaching the state
                # before each batch keeps backpropagation within the current batch only
                state.detach_()
            else:
                # For nn.LSTM, or for our from-scratch model, the state is a tuple of tensors
                for s in state:
                    s.detach_()
        y = Y.T.reshape(-1)
        X, y = X.to(device), y.to(device)
        y_hat, state = net(X, state)
        # For the loss, y is just (batch size * number of time steps) samples, so flatten it into one vector
        # In rnn(), y_hat was likewise concatenated along the time-step dimension (dim=0)
        l = loss(y_hat, y.long()).mean()
        if isinstance(updater, torch.optim.Optimizer):
            # With a torch.optim.Optimizer, zero the gradients first, since PyTorch accumulates them by default
            updater.zero_grad()
            l.backward()
            grad_clipping(net, 1)  # gradient clipping
            updater.step()
        else:
            l.backward()
            grad_clipping(net, 1)
            # mean() has already been taken, so use batch_size=1
            updater(batch_size=1)
        metric.add(l * y.numel(), y.numel())
        # metric holds two values: the total loss and the token count, used to compute the average loss at the end
    # Return perplexity and tokens processed per second
    return math.exp(metric[0] / metric[1]), metric[1] / timer.stop()
#@save
def train_ch8(net, train_iter, vocab, lr, num_epochs, device,
              use_random_iter=False):
    """Train the model (defined in Chapter 8)"""
    loss = nn.CrossEntropyLoss()
    animator = d2l.Animator(xlabel='epoch', ylabel='perplexity',
                            legend=['train'], xlim=[10, num_epochs])
    # Initialize the updater
    if isinstance(net, nn.Module):
        updater = torch.optim.SGD(net.parameters(), lr)
    else:
        updater = lambda batch_size: d2l.sgd(net.params, lr, batch_size)
    predict = lambda prefix: predict_ch8(prefix, 50, net, vocab, device)
    # Train and predict
    for epoch in range(num_epochs):
        ppl, speed = train_epoch_ch8(
            net, train_iter, loss, updater, device, use_random_iter)
        if (epoch + 1) % 10 == 0:
            print(predict('time traveller'))
            animator.add(epoch + 1, [ppl])
    print(f'perplexity {ppl:.1f}, {speed:.1f} tokens/sec on {str(device)}')
    print(predict('time traveller'))
    print(predict('traveller'))

Results

num_epochs, lr = 500, 1
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu())

# Random sampling
net = RNNModelScratch(len(vocab), num_hiddens, d2l.try_gpu(), get_params,
                      init_rnn_state, rnn)
train_ch8(net, train_iter, vocab, lr, num_epochs, d2l.try_gpu(),
          use_random_iter=True)

Concise Implementation

import torch
from torch import nn
from torch.nn import functional as F
from d2l import torch as d2l

batch_size, num_steps = 32, 35
train_iter, vocab = d2l.load_data_time_machine(batch_size, num_steps)

Defining the Model

num_hiddens = 256
rnn_layer = nn.RNN(len(vocab), num_hiddens)
# rnn_layer contains only the recurrent hidden layer; a separate output layer has to be constructed by hand

# Initialize the hidden state
state = torch.zeros((1, batch_size, num_hiddens))
state.shape
# torch.Size([1, 32, 256])

X = torch.rand(size=(num_steps, batch_size, len(vocab)))
# Y here holds the hidden states of the recurrent layer at every time step, not the model output,
# so its last dimension is 256 (num_hiddens) rather than len(vocab)
Y, state_new = rnn_layer(X, state)
Y.shape, state_new.shape
# (torch.Size([35, 32, 256]), torch.Size([1, 32, 256]))
class RNNModel(nn.Module):
    """The RNN model"""
    def __init__(self, rnn_layer, vocab_size, **kwargs):
        super(RNNModel, self).__init__(**kwargs)
        self.rnn = rnn_layer
        self.vocab_size = vocab_size
        self.num_hiddens = self.rnn.hidden_size
        # If the RNN is bidirectional (introduced later), num_directions should be 2, otherwise 1
        # rnn_layer contains only the recurrent hidden layers, so we add the output layer here
        if not self.rnn.bidirectional:
            self.num_directions = 1
            self.linear = nn.Linear(self.num_hiddens, self.vocab_size)
        else:
            self.num_directions = 2
            self.linear = nn.Linear(self.num_hiddens * 2, self.vocab_size)

    def forward(self, inputs, state):
        # Convert to long integers before one-hot encoding
        X = F.one_hot(inputs.T.long(), self.vocab_size)
        # then convert back to float32
        X = X.to(torch.float32)
        Y, state = self.rnn(X, state)
        # The fully connected layer first reshapes Y from (num_steps, batch_size, num_hiddens)
        # to (num_steps * batch_size, num_hiddens); its output has shape (num_steps * batch_size, vocab_size)
        # The -1 lets PyTorch infer that dimension automatically
        output = self.linear(Y.reshape((-1, Y.shape[-1])))
        return output, state

    def begin_state(self, device, batch_size=1):
        if not isinstance(self.rnn, nn.LSTM):
            # nn.GRU (and nn.RNN) uses a tensor as the hidden state
            return torch.zeros((self.num_directions * self.rnn.num_layers,
                                batch_size, self.num_hiddens),
                               device=device)
        else:
            # nn.LSTM uses a tuple as the hidden state
            return (torch.zeros((
                self.num_directions * self.rnn.num_layers,
                batch_size, self.num_hiddens), device=device),
                    torch.zeros((
                        self.num_directions * self.rnn.num_layers,
                        batch_size, self.num_hiddens), device=device))
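
To train the concise model, the book wraps rnn_layer in RNNModel, moves it to the GPU, and reuses the earlier training routine (available as d2l.train_ch8); a sketch:

device = d2l.try_gpu()
net = RNNModel(rnn_layer, vocab_size=len(vocab))
net = net.to(device)
num_epochs, lr = 500, 1
d2l.train_ch8(net, train_iter, vocab, lr, num_epochs, device)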