目录

一、任务介绍

1.任务要求

2.信息内容

3.待思考问题

二、问题解决

1.将评论内容转换成语料库

2.获取每条评论的词向量、标签和长度

3.数据打包

4.建立LSTM循环神经网络模型

1.主程序代码

2.模型代码

5.建立训练集函数和测试集函数


一、任务介绍

1.任务要求

  • 项目任务:对微博评论信息的情感分析,建立模型,自动识别评论信息的情绪状态。

 

2.信息内容

  • 第一行是标头
  • 每一行顶格标着每个评论代表的情绪
  • {0: '喜悦', 1: '愤怒', 2: '厌恶', 3: '低落'}

 

3.待思考问题

  • 思考:向模型中传递数据时,需要提前处理好数据

1、目标:将评论内容转换为词向量。

2、每个词/字转换为词向量长度(维度)200

3、每一次传入的词/字的个数是否就是评论的长度?     

        应该是固定长度,每次传入数据与图像相似。     例如选择长度为32。则传入的数据为32*200

4、一条评论如果超过80个词/字怎么处理?   

        直接删除后面的内容

5、一条评论如果没有70个词/字怎么处理?   

        缺少的内容,统一使用一个数字(非词/字的数字)替代。

6、如果语料库中的词/字太多是否可以压缩?     

        可以,某些词/字出现的频率比较低,可能训练不出特征。因此可以选择频率比较高的词来训练。例如选择4760个。

7、被压缩的词/字如何处理?     

        可以统一使用一个数字(非词/字的数字)替代。

 

二、问题解决

1.将评论内容转换成语料库

  1. 遍历每一行评论,除去第一行
  2. 取每行索引2之后的内容
  3. 然后对每行评论分字
  4. 获取每个字出现的次数,次数等于1的去掉
  5. 然后字作为键,出现次数作为值,将其装入字典
  6. 按照值的大小进行降序排列,只保留前4760个字
  7. 将值更新为索引,之后将<UNK>和<PAD>添加在字典末尾
  8. 至此获取了整个文件的语料库以及每个字的独热编码
  9. 将其以二进制形式保存在pkl文件里
from tqdm import tqdm
import pickle as pkl

MAX_VOCAB_SIZE = 4760  # 词表长度限制
UNK, PAD = '<UNK>', '<PAD>'  # 未知字符号  padding 无含义 unk 识别不出来的字


def build_vocab(file_path, max_size, min_freq):
    '''
    功能:基于文本内容建立词表vocab,vocab中包含语料库中的字
    参数:
    file_path:          需要读取的语料库的路径
    max_size:           获取词频最高的前max_size个词.
    min_freq            剔除字频低于min_freq个的词
    '''
    tokenizer = lambda x: [y for y in x]  # 分字函数
    vocab_dic = {}  # 用于保存词的字典
    with open(file_path, 'r', encoding='utf8') as f:
        i = 0
        for line in tqdm(f):  # 用来显示循环的进度条
            if i == 0:
                i += 1
                continue
            lin = line[2:].strip()  # 获取评论内容 剔除标签  不用split分割 因为评论内容中可能存在逗号
            if not lin:
                continue  # 如果lin中没有内容则 continue
            for word in tokenizer(lin):
                vocab_dic[word] = vocab_dic.get(word, 0) + 1  # 统计每个字出现的次数  .get(key,default) 这个键有值就返回该值 , 没有的话返回默认值
        vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] > min_freq], key=lambda x: x[1], reverse=True)[
                     :max_size]
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
        print(vocab_dic)
        pkl.dump(vocab_dic, open('simplifyweibo_4_moods.pkl', 'wb'))  # 将字典以二进制形式保存在pkl 统计完所有文字 每个文字都有独热编码
        print(f"Vocab size:{len(vocab_dic)}")  # 将评论的内容根据词表vocab_dic 转换成词向量
    return vocab_dic


if __name__ == '__main__':
    vocab = build_vocab('simplifyweibo_4_moods.csv', MAX_VOCAB_SIZE, 1)  # 获取语料库中每个字的词向量
    pass
    # print('vocab')

输出:

  • 字典,键是字 值是该字的词向量,整体按照字出现的次数排序

 

2.获取每条评论的词向量、标签和长度

  1. 读取评论文件
  2. 遍历每一行,
  3. 获取评论标签、评论内容以及评论的真实长度
  4. 判断评论长度是否大于70
    1. 若大于,则只取70个字
    2. 若小于,则在末尾填充<PAD>
  5. 读取上一步保存的语料库文件
  6. 判断每条评论中的每个字是否在内,
    1. 不在内的将其转换成<UNK>
    2. 在内的获取该字的值
  7. 一条评论的值装入一个列表,加上该评论的标签和真实长度,将其装入一个元组然后放入另一个列表中
  8. 至此列表中装着每条评论的词向量、标签和长度
  9. 取前80%作为训练集,80%-90%作为验证集,90%-100%作为测试集
from tqdm import tqdm
import pickle as pkl
import random
import torch

UNK, PAD = '<UNK>', '<PAD>'  # 未知字符号

def load_dataset(path, pad_size=70):
    contents = []
    vocab = pkl.load(open('simplifyweibo_4_moods.pkl', 'rb'))  # 读取vocab文件
    tokenizer = lambda x: [y for y in x]
    with open(path, 'r', encoding='utf8') as f:
        i = 0
        for line in tqdm(f):
            if i == 0:
                i += 1
                continue
            if not line:
                continue
            label = int(line[0])  # 获取该行评论标签
            content = line[2:].strip('\n')  # 获取该行评论内容 去掉末尾换行符
            words_line = []
            token = tokenizer(content)  # 将每一行内容进行分字
            seq_len = len(token)  # 获取每一行评论的字长

            if pad_size:
                if len(token) < pad_size:  # 如果一行字少于70 则补充<PAD>
                    token.extend([PAD] * (pad_size - len(token)))
                else:
                    token = token[:pad_size]  # 只取当前评论前70个
                    seq_len = pad_size  # 将当前评论长度换成70
            for word in token:
                words_line.append(vocab.get(word, vocab.get(UNK)))
            contents.append((words_line, int(label), seq_len))
        random.shuffle(contents)  # 打乱顺序
        train_data = contents[:int(len(contents) * 0.8)]  # 前80%为训练
        dev_data = contents[int(len(contents) * 0.8):int(len(contents) * 0.9)]
        test_data = contents[int(len(contents) * 0.9):]
    return vocab, train_data, dev_data, test_data

if __name__ == '__main__':
    vocab, train_data, dev_data, test_data = load_dataset('simplifyweibo_4_moods.csv')
    print(train_data, dev_data, test_data)

输出:

  • 每一个元组第一个元素是列表,列表里装着该条评论每个字的独热编码
  • 第二个元素是该评论的标签
  • 第三个元素使该评论的真实长度

 

3.数据打包

  1. 将数据及其标签打包成128条评论一个的包,并将其转换成张量
  2. 通过if判断,将最后一个不满128的数据打成一个包,同样转换成张量
  3. 最后得到每条评论的独热编码、标签和长度的张量类型数据
  4. 将其传入GPU
class DatasetIterater(object):
    """将数据batches切分为batch_size的包"""

    def __init__(self, batches, batch_size, device):
        self.batches = batches
        self.batch_size = batch_size
        self.device = device
        self.n_batches = len(batches) // batch_size  # 数据划分batch的数量
        self.residue = False  # 记录划分后的数据是否存在剩余的数据
        if len(batches) % self.n_batches != 0:  # 表示有余数
            self.residue = True
        self.index = 0

    def _to_tensor(self, datas):
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)  # 评论内容
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)  # 评论情感  最好转换成LongTensor
        # pad前的长度
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)
        return (x, seq_len), y

    def __next__(self):
        if self.residue and self.index == self.n_batches:
            batches = self.batches[self.index * self.batch_size:len(self.batches)]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

        elif self.index > self.n_batches:
            self.index = 0
            raise StopIteration  # 停止迭代

        else:
            batches = self.batches[self.index * self.batch_size:(self.index + 1) * self.batch_size]
            self.index += 1
            batches = self._to_tensor(batches)
            return batches

    def __iter__(self):
        return self

    def __len__(self):
        if self.residue:
            return self.n_batches + 1
        else:
            return self.n_batches

 

4.建立LSTM循环神经网络模型

1.主程序代码

  • 下载腾讯自然语言处理模型嵌入层的参数并将其转换成张量类型
  • 嵌入层的输出神经元设置为200
embedding_pretrained = torch.tensor(np.load('embedding_Tencent.npz')['embeddings'].astype('float32'))
# embedding_pretrained = None  # 不使用外部训练的词向量
embed = embedding_pretrained.size(1) if embedding_pretrained is not None else 200
class_list = ['喜悦', '愤怒', '厌恶', '低落']
num_classes = len(class_list)
model = demo4TextRNN.Model(embedding_pretrained, len(vocab), embed, num_classes).to(device)
test(model, test_iter, class_list)

 

2.模型代码

  • 告诉模型填充词的独热编码是多少
import torch.nn as nn

class Model(nn.Module):
    def __init__(self, embedding_pretrained, n_vocab, embed, num_classes):
        super(Model, self).__init__()
        if embedding_pretrained is not None:
            self.embedding = nn.Embedding.from_pretrained(embedding_pretrained, padding_idx=n_vocab - 1, freeze=False)
            # embedding_pretrained: Tensor,形状为(n_vocab, embed),其中n_vocab是词汇表大小,embed是嵌入维度。
            # freeze: 是否冻结embedding层的权重
        else:
            self.embedding = nn.Embedding(n_vocab, embed, padding_idx=n_vocab - 1)
        # padding_idx默认None 如果指定 则参数不会对梯度产生影响

        self.lstm = nn.LSTM(embed, 128, 3, bidirectional=True, batch_first=True, dropout=0.3)
        # embed: 输入特征的维度或词嵌入的大小。
        # 128: LSTM 隐藏层的大小,也就是隐藏状态的维度。整数,表示 LSTM 隐藏层输出的特征数量。
        # 3: LSTM层数(堆叠的LSTM层数量)。
        # bidirectional = True: 使用双向LSTM,考虑前向和后向序列信息。
        # batch_first = True: 输入输出形状为(batch_size, seq_length, input_size)。
        # dropout = 0.3: 在LSTM层之间应用的dropout比率(30%  表示 30% 的神经元会被丢弃)。

        self.fc = nn.Linear(128 * 2, num_classes)  # 因为是双向 所以 *2

    def forward(self, x):
        x, _ = x   # 只提取评论的独热编码
        out = self.embedding(x)
        out, _ = self.lstm(out)  # 一个字256个特征 因为是双向的
        out = self.fc(out[:, -1, :])  #
        return out

 

5.建立训练集函数和测试集函数

  1. 传入模型,训练集数据,验证集数据,测试集数据和分类
  2. 后面的操作与多分类时函数逻辑一致
import torch.optim
import numpy as np
from sklearn import metrics
import torch.nn.functional as F


def evaluate(class_list, model, data_iter, test=False):
    model.eval()
    loss_total = 0
    predict_all = np.array([], dtype=int)
    labels_all = np.array([], dtype=int)
    with torch.no_grad():
        for texts, labels in data_iter:
            outputs = model(texts)
            loss = F.cross_entropy(outputs, labels)
            loss_total += loss

            labels = labels.data.cpu().numpy()  # NumPy 操作仅在 CPU 张量上有效
            predict = torch.max(outputs.data, 1)[1].cpu().numpy()
            labels_all = np.append(labels_all, labels)
            predict_all = np.append(predict_all, predict)

    acc = metrics.accuracy_score(labels_all, predict_all)
    if test:
        report = metrics.classification_report(labels_all, predict_all, target_names=class_list, digits=4)
        return acc, loss_total / len(data_iter), report
    return acc, loss_total / len(data_iter)


def test(model, test_iter, class_list):
    model.load_state_dict(torch.load('TextRNN.ckpt'))  # 使用最优模型
    model.eval()
    test_acc, test_loss, test_report = evaluate(class_list, model, test_iter, test=True)
    msg = 'Test Loss:{0:>5.2},Test Acc:{1:>6.2%}'
    print(msg.format(test_loss, test_acc))
    print(test_report)


def train(model, train_iter, dev_iter, test_iter, class_list):
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    total_batch = 0  # 记录进行到多少batch
    dev_best_loss = float('inf')  # 表示无穷大
    last_improve = 0  # 记录上次验证集loss下降的batch数
    flag = False  # 记录是否很久没有效果提升
    epochs = 2
    for epoch in range(epochs):
        print("{}/{}".format(epoch + 1, epochs))

        for i, (trains, labels) in enumerate(train_iter):
            outputs = model(trains)
            loss = F.cross_entropy(outputs, labels)
            model.zero_grad()
            loss.backward()
            optimizer.step()

            if total_batch % 100 == 0:
                predict = torch.max(outputs.data, 1)[1].cpu()  # 第一个参数是要计算的张量,第二个参数是维度。在这里,1 表示按行计算最大值  返回元组 (最大值 对应的索引)
                train_acc = metrics.accuracy_score(labels.data.cpu(), predict)
                dev_acc, dev_loss = evaluate(class_list, model, dev_iter)
                if dev_loss < dev_best_loss:
                    dev_best_loss = dev_loss  # 保存最优模型
                    torch.save(model.state_dict(), 'TextRNN.ckpt')
                    last_improve = total_batch

                msg = 'Iter:{0:>6},Train Loss:{1:>5.2},Train Acc:{2:>6.2%},Val Loss:{3:>5.2},Val Acc:{4:>6.2%}'
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc))
                model.train()
            total_batch += 1
            if total_batch - last_improve > 10000:
                print('no')
                flag = True
        if flag:
            break

最后在主程序使用测试集测试一下

由于样本数据不太均衡,所以有些种类的正确率比较低

Logo

助力广东及东莞地区开发者,代码托管、在线学习与竞赛、技术交流与分享、资源共享、职业发展,成为松山湖开发者首选的工作与学习平台

更多推荐