使用核外学习进行文本分类

Machine LearningMachine LearningBeginner
立即练习

This tutorial is from open-source community. Access the source code

💡 本教程由 AI 辅助翻译自英文原版。如需查看原文,您可以 切换至英文原版

简介

本实验提供了一个示例,展示如何使用 scikit-learn 通过核外学习进行文本分类。目标是从无法装入主内存的数据中进行学习。为实现这一目标,我们使用了一个支持 partial_fit 方法的在线分类器,该分类器将分批接收示例数据。为确保特征空间随时间保持不变,我们利用了一个哈希向量化器(HashingVectorizer),它会将每个示例投影到相同的特征空间中。这在文本分类中特别有用,因为每一批数据中可能会出现新的特征(单词)。

虚拟机使用提示

虚拟机启动完成后,点击左上角切换到“笔记本”标签页,以访问 Jupyter Notebook 进行练习。

有时,你可能需要等待几秒钟让 Jupyter Notebook 完成加载。由于 Jupyter Notebook 的限制,操作验证无法自动化。

如果你在学习过程中遇到问题,可以随时向 Labby 提问。课程结束后请提供反馈,我们会及时为你解决问题。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL sklearn(("Sklearn")) -.-> sklearn/CoreModelsandAlgorithmsGroup(["Core Models and Algorithms"]) sklearn(("Sklearn")) -.-> sklearn/DataPreprocessingandFeatureEngineeringGroup(["Data Preprocessing and Feature Engineering"]) sklearn(("Sklearn")) -.-> sklearn/UtilitiesandDatasetsGroup(["Utilities and Datasets"]) ml(("Machine Learning")) -.-> ml/FrameworkandSoftwareGroup(["Framework and Software"]) sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/linear_model("Linear Models") sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/naive_bayes("Naive Bayes") sklearn/DataPreprocessingandFeatureEngineeringGroup -.-> sklearn/feature_extraction("Feature Extraction") sklearn/UtilitiesandDatasetsGroup -.-> sklearn/datasets("Datasets") ml/FrameworkandSoftwareGroup -.-> ml/sklearn("scikit-learn") subgraph Lab Skills sklearn/linear_model -.-> lab-49235{{"使用核外学习进行文本分类"}} sklearn/naive_bayes -.-> lab-49235{{"使用核外学习进行文本分类"}} sklearn/feature_extraction -.-> lab-49235{{"使用核外学习进行文本分类"}} sklearn/datasets -.-> lab-49235{{"使用核外学习进行文本分类"}} ml/sklearn -.-> lab-49235{{"使用核外学习进行文本分类"}} end

导入库并定义解析器

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """用于解析 SGML 文件并一次生成一个文档的实用工具类。"""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

定义路透社文档流

def stream_reuters_documents(data_path=None):
    """遍历路透社数据集的文档。

    如果 `data_path` 目录不存在,路透社存档将自动下载并解压。

    文档表示为具有'body'(字符串)、'title'(字符串)、'topics'(字符串列表)键的字典。

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """下载数据集。"""
        print("downloading dataset (once and for all) into %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rdownloaded %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## 检查存档是否未被篡改:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("untarring Reuters dataset...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("done.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

设置向量化器并留出测试集

## 创建向量化器并将特征数量限制在合理的最大值
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## 遍历解析后的路透社 SGML 文件的迭代器。
data_stream = stream_reuters_documents()

## 我们学习“acq”类与其他所有类之间的二分类。
## 选择“acq”是因为它在路透社文件中分布大致均匀。对于其他数据集,应该注意创建一个包含实际比例正例的测试集。
all_classes = np.array([0, 1])
positive_class = "acq"

## 这里有一些支持 `partial_fit` 方法的分类器
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive-Aggressive": PassiveAggressiveClassifier(),
}

## 测试数据统计信息
test_stats = {"n_test": 0, "n_test_pos": 0}

## 首先我们留出一些示例来估计准确率
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Test set is %d documents (%d positive)" % (len(y_test), sum(y_test)))

定义一个函数来获取一批示例

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """提取一批示例,返回一个元组 X_text, y。

    注意:size 是在排除没有分配主题的无效文档之前的数量。

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

定义一个生成器函数来遍历小批量数据

def iter_minibatches(doc_iter, minibatch_size):
    """小批量数据的生成器。"""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

遍历示例的小批量数据并更新分类器

## 我们将为分类器提供包含 1000 个文档的小批量数据;这意味着我们在任何时候最多在内存中保留 1000 个文档。文档批次越小,部分拟合方法的相对开销就越大。
minibatch_size = 1000

## 创建解析路透社 SGML 文件并将文档作为流进行迭代的数据流。
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## 主循环:遍历示例的小批量数据
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## 使用当前小批量中的示例更新估计器
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## 累积测试准确率统计信息
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

绘制结果

## 绘制准确率变化
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## 绘制准确率随示例数量的变化
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "训练示例数量 (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="最佳")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## 绘制准确率随运行时间的变化
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "运行时间 (秒)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="最佳")

## 绘制拟合时间
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("向量化")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("运行时间 (秒)")
ax.set_title("训练时间")


def autolabel(rectangles):
    """在矩形上附加一些文本作为自动标签。"""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="中心",
            va="底部",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## 绘制预测时间
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("读取/解析\n+ 特征提取")
cls_runtime.append(vectorizing_time)
cls_names.append("哈希\n+ 向量化")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("运行时间 (秒)")
ax.set_title("预测时间 (%d 个实例)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

总结

在这个实验中,我们学习了如何使用 scikit-learn 通过核外学习进行文本分类。我们使用了一个支持 partial_fit 方法的在线分类器,并向其输入一批批的示例。我们还利用了 HashingVectorizer 来确保特征空间随时间保持不变。然后,我们留出一个测试集,并遍历示例的小批量数据来更新分类器。最后,我们绘制结果以可视化准确率的变化和训练时间。