新闻文本分类项目:从TextCNN到多粒度BERT模型的演进

新闻文本分类实践记录

本文主要分享新闻文本分类的两个实践版本:第一版基于TextCNN,通过jieba分词、自建词表和固定长度序列处理数据,利用多尺度卷积提取特征,还提及预训练词向量加载、数据清洗等优化方向;第二版采用Multi-Granularity BERT增强模型,融合BERT不同层次隐藏状态,结合多尺度卷积和自注意力机制强化语义理解,通过分层学习率等策略提升效果。此外,文章还介绍了避免浏览器渲染Markdown出错的方法,如包裹代码块、转义特殊符号等,最后总结两版特点——TextCNN适合快速搭建基线,BERT版适合追求高精度场景,并强调工程实践中的细节处理。


版本一:TextCNN 实现

其实我最开始并没有直接上大模型,而是想先用一个简单点的方法跑通全流程。思路就是这样:先把新闻读进来,用 jieba 分词,然后自己造个词表,把每条新闻切成固定长度的词索引序列,再用一个 TextCNN 模型做分类。下面把核心代码贴出来,接着我会在段落里详细说明。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import jieba
from collections import defaultdict, Counter

class TextClassifierDataset(Dataset):
def __init__(self, file_path, vocab=None, label_map=None, max_len=100):
self.labels, self.texts = [], []
label_counter = defaultdict(int)

# 1. 读取文件并分词
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
label, text = line.strip().split('\t', 1)
self.labels.append(label)
self.texts.append(list(jieba.cut(text)))
label_counter[label] += 1

# 2. 标签映射:训练时自己生成,测试时沿用
if label_map is None:
self.label_map = {label: idx for idx, label in enumerate(label_counter.keys())}
else:
self.label_map = label_map

# 3. 词表构建:统计所有分词,把高频前20000个词拿出来
if vocab is None:
word_counts = Counter([word for text in self.texts for word in text])
self.vocab = {
'<PAD>': 0,
'<UNK>': 1,
**{word: i+2 for i, (word, _) in enumerate(word_counts.most_common(20000))}
}
else:
self.vocab = vocab

self.max_len = max_len

def __len__(self):
return len(self.texts)

def __getitem__(self, idx):
text = self.texts[idx][:self.max_len]
# 把词映射到索引,没在词表里的用1(<UNK>),再pad到max_len
text_indices = [self.vocab.get(word, 1) for word in text]
if len(text_indices) < self.max_len:
text_indices += [0] * (self.max_len - len(text_indices))

label = self.label_map[self.labels[idx]]
return torch.LongTensor(text_indices), torch.tensor(label, dtype=torch.long)

class TextCNN(nn.Module):
def __init__(self, vocab_size, embed_dim, num_classes, filter_sizes=(3,4,5), num_filters=100, dropout=0.5):
super().__init__()
# 嵌入层
self.embedding = nn.Embedding(vocab_size, embed_dim)
# 多尺寸卷积
self.convs = nn.ModuleList([
nn.Conv2d(1, num_filters, (k, embed_dim))
for k in filter_sizes
])
# 分类头
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(num_filters * len(filter_sizes), num_classes)

def forward(self, x):
# x: [batch, seq_len]
x = self.embedding(x) # [batch, seq_len, embed_dim]
x = x.unsqueeze(1) # [batch, 1, seq_len, embed_dim]

pooled_outputs = []
for conv in self.convs:
conv_out = torch.relu(conv(x)).squeeze(3) # [batch, num_filters, seq_len - k + 1]
pooled = torch.max(conv_out, dim=2)[0] # [batch, num_filters]
pooled_outputs.append(pooled)

x = torch.cat(pooled_outputs, dim=1) # [batch, num_filters*len(filter_sizes)]
x = self.dropout(x)
return self.fc(x) # [batch, num_classes]

def train_model():
BATCH_SIZE = 64
EMBED_DIM = 300
EPOCHS = 10

train_set = TextClassifierDataset('train.txt')
test_set = TextClassifierDataset('test.txt',
vocab=train_set.vocab,
label_map=train_set.label_map)

model = TextCNN(
vocab_size=len(train_set.vocab),
embed_dim=EMBED_DIM,
num_classes=len(train_set.label_map)
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.CrossEntropyLoss()

train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE)

for epoch in range(EPOCHS):
model.train()
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()

model.eval()
correct = 0
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = model(inputs)
preds = torch.argmax(outputs, dim=1)
correct += (preds == labels).sum().item()

accuracy = correct / len(test_set)
print(f'Epoch {epoch+1}/{EPOCHS} | Test Acc: {accuracy:.4f}')

if __name__ == '__main__':
train_model()
````

看完这段代码,思路其实挺直白。先把新闻文本按行读进来,用 `jieba` 切成一个个词,然后把每个词映射到词表索引(不存在的用 `<UNK>`),固定长度截断/补齐,得到一个 `[max_len]` 的整数列表,给模型做 Embedding 时就能直接拿整数索引了。嵌入层之后,TextCNN 核心在于多尺寸的 2D 卷积:把 `[batch, 1, seq_len, embed_dim]` 看成一个“单通道图像”,三个卷积核大小分别是 `(3, embed_dim)`、`(4, embed_dim)`、`(5, embed_dim)`,也就是在时间方向上滑三格、四格、五格,每次都覆盖整个 `embed_dim`。卷积后做 ReLU,再对序列方向做 max-over-time pooling,就剩下 `[batch, num_filters]`,三个尺寸的输出拼在一起,Dropout 之后接全连接层拿 logits,用交叉熵损失训练。每个 epoch 跑完训练,就在测试集上 eval 一下,算个整体准确率。

**几点心得和可改进之处:**

1. **预训练词向量。**
代码里嵌入层随机初始化,如果想让模型更快收敛或效果更好,可以把中文的预训练词向量(比如 Tencent AI Lab 或 FastText)加载进来:

```Python
embed_matrix = load_pretrained_embeddings() # 形状 [vocab_size, EMBED_DIM]
model.embedding.weight.data.copy_(torch.from_numpy(embed_matrix))

如果不想在训练时破坏预训练向量,还可以设置 model.embedding.weight.requires_grad = False,固定这部分权重。

  1. 监控训练集损失/准确率。
    我这段代码只在每个 epoch 后打印测试集准确率,跑久了有时候会过拟合都不知道。更好的做法是每个 epoch 结束时,也在训练集上算个 loss 或 accuracy 曲线,这样能直观地判断哪里需要减小学习率或者早停(Early Stopping)。

  2. 数据清洗。
    现在对文本只做了 jieba 分词,没有去停用词、标点符号、URL 等噪声。如果新闻里有链接、邮箱、HTML 标签,最好先正则去除,再分词效果会更干净。

  3. 平衡数据与评价指标。
    如果新闻类别分布不均衡,比如“娱乐”占了太多,而“科技”很少,仅仅看整体准确率容易误导。建议加上 precision/recall/F1,或者把某个特别关心的类别 ROC-AUC 单独看一下。

  4. 超参数调优。
    TextCNN 里用的滤波器数量是 100、嵌入维度是 300、截断长度 100,这些都可以调。根据新闻文本平均长度,可以把 max_len 调到 50、80 或者 200; num_filters 可以调成 200、300 看效果;卷积核大小也可以加上 (2,3,4)(3,4,5,6) 组合试试。做网格搜索或贝叶斯优化能找到更优 hyper-parameters。


版本二:Multi-Granularity BERT 增强版

TextCNN 是个好基线,但它对长距离依赖和深层语义的理解有限。于是我在第二版里直接用 BERT 预训练模型,把它当成一个强大的特征提取器,然后在 BERT 的基础上再做多粒度融合、多尺度卷积和自注意力,尽可能兼顾局部模式和全局上下文。先贴出代码,再解释思路。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from transformers import AutoModel, AutoTokenizer
from tqdm.auto import tqdm
from sklearn.metrics import classification_report

class TextDataset(Dataset):
def __init__(self, file_path, tokenizer, max_len=128, label_map=None):
self.tokenizer = tokenizer
self.max_len = max_len
self.texts, self.labels = self._load_data(file_path)

if label_map is None:
# 训练集将标签去重排个序,测试集沿用
self.unique_labels = sorted(set(self.labels))
self.label_map = {label: i for i, label in enumerate(self.unique_labels)}
else:
self.label_map = label_map
invalid_labels = [l for l in self.labels if l not in self.label_map]
if invalid_labels:
raise ValueError(f"有无法对齐的标签: {set(invalid_labels)}")

def _load_data(self, file_path):
texts, labels = [], []
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
label, text = line.strip().split('\t', 1)
texts.append(text)
labels.append(label)
return texts, labels

def __len__(self):
return len(self.texts)

def __getitem__(self, idx):
text = self.texts[idx]
encoding = self.tokenizer.encode_plus(
text,
add_special_tokens=True,
max_length=self.max_len,
padding='max_length',
truncation=True,
return_attention_mask=True,
return_tensors='pt'
)
return {
'input_ids': encoding['input_ids'].squeeze(0),
'attention_mask': encoding['attention_mask'].squeeze(0).bool(),
'label': torch.tensor(self.label_map[self.labels[idx]], dtype=torch.long)
}

class MultiGranularityBERT(nn.Module):
def __init__(self, num_classes, pretrained_name='bert-base-chinese'):
super().__init__()
# 加载预训练 BERT
self.bert = AutoModel.from_pretrained(pretrained_name)

# 多尺度卷积:in=768, out=256, kernel=3,5,7
self.conv_layers = nn.ModuleList([
nn.Conv1d(768, 256, kernel_size=k, padding=k//2)
for k in [3, 5, 7]
])

# 多头自注意力:embed_dim=256*3=768, num_heads=8
self.attention = nn.MultiheadAttention(
embed_dim=256 * 3,
num_heads=8,
batch_first=True
)

# 分类头
self.classifier = nn.Sequential(
nn.Linear(256 * 3, 512),
nn.ReLU(),
nn.Dropout(0.3),
nn.Linear(512, num_classes)
)

def forward(self, input_ids, attention_mask):
# BERT 前向,返回所有隐藏层
outputs = self.bert(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True
)
hidden_states = outputs.hidden_states
# hidden_states[0]: embedding 层
# hidden_states[-4]: 倒数第4层
# hidden_states[-1]: 顶层
# 取三层做融合
conv_input = torch.stack([
hidden_states[-1],
hidden_states[-4],
hidden_states[0]
], dim=-1).mean(dim=-1) # [batch, seq_len, 768]

# 多尺度卷积:先 permute -> [batch, 768, seq_len]
conv_outputs = []
for conv in self.conv_layers:
x = conv_input.permute(0, 2, 1) # [B, 768, L]
conv_out = conv(x) # [B, 256, L]
conv_out = nn.functional.gelu(conv_out)
conv_outputs.append(conv_out.permute(0, 2, 1)) # [B, L, 256]

# 拼接 -> [B, L, 768]
combined = torch.cat(conv_outputs, dim=2)

# 自注意力:mask 掉 padding
key_padding_mask = (attention_mask == 0)
attn_output, _ = self.attention(
combined, combined, combined,
key_padding_mask=key_padding_mask
) # [B, L, 768]

# 平均池化 -> [B, 768]
pooled = attn_output.mean(dim=1)
logits = self.classifier(pooled) # [B, num_classes]
return logits

def train_enhanced():
tokenizer = AutoTokenizer.from_pretrained('bert-base-chinese')
train_set = TextDataset('train.txt', tokenizer)
test_set = TextDataset('test.txt', tokenizer, label_map=train_set.label_map)

label_map = train_set.label_map
id2label = {v: k for k, v in label_map.items()}

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

model = MultiGranularityBERT(num_classes=len(label_map))
if torch.cuda.device_count() > 1:
print(f"使用 {torch.cuda.device_count()} 个 GPU 进行训练")
model = nn.DataParallel(model)
model.to(device)
model_core = getattr(model, 'module', model)

optimizer = optim.AdamW([
{'params': model_core.bert.parameters(), 'lr': 2e-5},
{'params': model_core.classifier.parameters(), 'lr': 1e-3}
], weight_decay=0.01)
criterion = nn.CrossEntropyLoss()

train_loader = DataLoader(train_set, batch_size=32, shuffle=True, num_workers=4)
test_loader = DataLoader(test_set, batch_size=64, num_workers=4)

best_acc = 0.0
for epoch in range(10):
model.train()
epoch_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/10")
for batch in epoch_bar:
inputs = {k: v.to(device) for k, v in batch.items() if k != 'label'}
labels = batch['label'].to(device)

optimizer.zero_grad()
outputs = model(**inputs)
loss = criterion(outputs, labels)
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()

epoch_bar.set_postfix({'loss': f"{loss.item():.4f}", 'lr': optimizer.param_groups[0]['lr']})

# 验证阶段
model.eval()
all_preds, all_labels = [], []
class_correct = {cls: 0 for cls in label_map.keys()}
class_total = {cls: 0 for cls in label_map.keys()}

with torch.no_grad():
test_bar = tqdm(test_loader, desc="测试中", leave=False)
for batch in test_bar:
inputs = {k: v.to(device) for k, v in batch.items() if k != 'label'}
labels = batch['label'].to(device)

outputs = model(**inputs)
preds = torch.argmax(outputs, dim=1)

all_preds.extend(preds.cpu().numpy())
all_labels.extend(labels.cpu().numpy())

for label, pred in zip(labels, preds):
cls_name = id2label[label.item()]
class_total[cls_name] += 1
if pred == label:
class_correct[cls_name] += 1

acc_so_far = 100 * sum(class_correct.values()) / sum(class_total.values())
test_bar.set_postfix({'acc': f"{acc_so_far:.1f}%"})

# 打印分类报告
print("\n" + "="*50)
print(f"Epoch {epoch+1} 分类报告:")
print(classification_report(all_labels, all_preds, target_names=label_map.keys(), digits=4))

print("\n各类别准确率:")
for cls in label_map:
total = class_total[cls]
correct = class_correct[cls]
acc = 100 * correct / total if total > 0 else 0
print(f" {cls}: {acc:.2f}% ({correct}/{total})")

# 保存最佳模型
current_acc = sum(class_correct.values()) / sum(class_total.values())
if current_acc > best_acc:
best_acc = current_acc
torch.save(getattr(model, 'module', model).state_dict(), 'best_model.pth')
print(f"发现新最佳模型,准确率: {best_acc:.4f}")

if __name__ == '__main__':
train_enhanced()

这段代码里,我直接用了 bert-base-chinese,并且在 forward 中把 output_hidden_states=True,拿到 BERT 的 13 层输出 (hidden_states)。为了把不同层次的语义信息都用上,选取最顶层(第 12 层)、倒数第 4 层(第 8 层)和最底层 embedding(第 0 层),把它们在“隐藏向量维度”上堆叠并求平均,得到一个融合后的向量 conv_input,形状是 [batch, seq_len, 768]

接着,做三路 Conv1d,窗口大小分别是 3、5、7,对 conv_input 先做维度变换变成 [batch, 768, seq_len],然后经过 nn.Conv1d(768,256,kernel=k,padding=k//2),得到 [batch,256,seq_len],激活用 GELU,然后再把维度变回 [batch,seq_len,256],三路拼接成 [batch,seq_len,768]。这一步可以看作是在“词向量+位置信息+中层语义+高层语义”融合之后,进行不同粒度的 n-gram 局部特征提取。

卷积之后,我加了一个多头自注意力层 (nn.MultiheadAttention(embed_dim=768,num_heads=8,batch_first=True))。它的输入、键、值都用同一个 combined,并且用 key_padding_mask=(attention_mask==0) 来屏蔽掉 padding。这样每个 token 的新表示就不仅有局部卷积特征,还融合了全局上下文,输出形状仍然是 [batch,seq_len,768]

最后对那条序列在时间维度上做平均池化(pooled = attn_output.mean(dim=1)),得到 [batch,768],再串联两层全连接(512 + ReLU + Dropout + 最终映射到类别数)拿到 logits,送交叉熵损失训练。同 TextCNN 那里类似,我在训练循环里也做了分层学习率:BERT 参数 lr=2e-5,分类头 lr=1e-3,梯度裁剪 norm_max=1.0,多 GPU 自动并行,测试阶段还打印分类报告和各类别准确率,并保存最优模型。

版本一 vs 版本二:改进思路

第一版跑通之后,整体结构和流程很清晰了,但是体验到模型在理解语义时有明显局限:TextCNN 只能捕捉固定窗口大小的 n-gram(比如说“人民大会堂”这种连续三词组合能抓到,但是一句话里后半截的上下文就没办法感知到),如果新闻比较长、句子间需要远距离依赖,就很难分辨。有好几次我试过把 max_len 拉到 200,发现效果也不明显提升,反而训练慢。

所以第二版就上了 BERT,把它当作一棵已经“训练好”的深度语义特征树,用它提取的上下文向量再去做更深层次的加工。具体改进点如下:

  1. 分词和词表
    TextCNN 里用 jieba 分词,还得自己去统计高频 20000 词做词表。但 BERT 自带的分词器(WordPiece/BPE)已经比较完备,词表在几十万规模,拆子词也能更好地涵盖新词和低频词。这样我们不再担心 OOV(out-of-vocabulary)的问题,也不用自己统计词频、构造词表。

  2. 多层级特征
    BERT 每层 encoder 都捕捉了不同层次的语义,底层 embedding 偏向词形+位置,中层更偏句法结构,顶层偏全局上下文。直接用顶层有时会丢一些低层信息,比如词性和近似词义;只用底层则没有全局视野。融合这三层能让模型对“局部搭配”和“全局语义”都有感知。

  3. 多尺度卷积 + 自注意力
    TextCNN 本身有多尺度卷积,但它从头到尾都是在随机初始化的嵌入上做卷积。第二版先用 BERT 抽取出高维向量序列,再在高维向量序列上继续做卷积,能得到更具语义信息的 n-gram 特征。同时加一个自注意力层,能让模型在局部卷积特征基础上再捕获跨越很远的依赖(比如标题和结尾的呼应),提高整体表示能力。

  4. 分层学习率与细粒度调参
    TextCNN 用同一个学习率就行;BERT 预训练参数如果不小心用太大 lr,很容易把预训练权重“冲散”,只要 10510^{-5} 级别就好。新加的卷积层和分类头一开始是随机初始化,需要稍大点的 lr 才能快速收敛,用 10310^{-3} 刚好。这样分层调参,有助于整体更稳定地训练。

综上,第二版在训练速度、效果和稳定性上都比第一版有明显提升,但也要付出更多硬件资源和时间成本:最少得用一块 12GB 以上显存的 GPU,如果数据集大可能要多卡并行。如果只是想快速搭建一个 baseline,第一版就够用了。如果想追求更高准确率或产品化,第二版更适合。


总结

  1. 版本一 用 TextCNN:流程简单,自己做分词、词表、固定长度索引,跑得快,代码量少,最适合作为入门或资源有限时的 baseline。
  2. 版本二 用 BERT + 多粒度卷积 + 自注意力:利用预训练模型的语义能力,再叠加卷积和注意力,能更好地抓住新闻里的深层次信息,适合追求更高准确率的场景,但需要更多 GPU 资源和更细腻的调参。
Donate
  • Copyrights © 2015-2025 Xinyu Zhuang
  • Visitors: | Views:

请我喝杯咖啡吧~

支付宝
微信