コア外学習を用いたテキスト分類

Machine LearningMachine LearningBeginner
今すぐ練習

This tutorial is from open-source community. Access the source code

💡 このチュートリアルは英語版からAIによって翻訳されています。原文を確認するには、 ここをクリックしてください

はじめに

この実験では、コア外学習を使ってscikit - learnをテキスト分類にどのように使うかを示す例を提供します。目的は、メインメモリに収まらないデータから学習することです。これを達成するために、partial_fitメソッドをサポートするオンライン分類器を利用し、これにサンプルのバッチを供給します。特徴空間が時間の経過とともに同じままであることを確認するために、各サンプルを同じ特徴空間に射影するHashingVectorizerを利用します。これは、各バッチに新しい特徴(単語)が現れる可能性のあるテキスト分類の場合に特に役立ちます。

VMのヒント

VMの起動が完了した後、左上隅をクリックしてノートブックタブに切り替え、Jupyter Notebookを使って練習します。

時々、Jupyter Notebookが読み込み終了するまで数秒待つ必要があります。Jupyter Notebookの制限により、操作の検証を自動化することはできません。

学習中に問題に遭遇した場合は、Labbyにお問い合わせください。セッション後にフィードバックを提供してください。そうすれば、迅速に問題を解決いたします。


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL sklearn(("Sklearn")) -.-> sklearn/DataPreprocessingandFeatureEngineeringGroup(["Data Preprocessing and Feature Engineering"]) sklearn(("Sklearn")) -.-> sklearn/UtilitiesandDatasetsGroup(["Utilities and Datasets"]) ml(("Machine Learning")) -.-> ml/FrameworkandSoftwareGroup(["Framework and Software"]) sklearn(("Sklearn")) -.-> sklearn/CoreModelsandAlgorithmsGroup(["Core Models and Algorithms"]) sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/linear_model("Linear Models") sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/naive_bayes("Naive Bayes") sklearn/DataPreprocessingandFeatureEngineeringGroup -.-> sklearn/feature_extraction("Feature Extraction") sklearn/UtilitiesandDatasetsGroup -.-> sklearn/datasets("Datasets") ml/FrameworkandSoftwareGroup -.-> ml/sklearn("scikit-learn") subgraph Lab Skills sklearn/linear_model -.-> lab-49235{{"コア外学習を用いたテキスト分類"}} sklearn/naive_bayes -.-> lab-49235{{"コア外学習を用いたテキスト分類"}} sklearn/feature_extraction -.-> lab-49235{{"コア外学習を用いたテキスト分類"}} sklearn/datasets -.-> lab-49235{{"コア外学習を用いたテキスト分類"}} ml/sklearn -.-> lab-49235{{"コア外学習を用いたテキスト分類"}} end

ライブラリのインポートとパーサの定義

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """SGMLファイルを解析し、1回に1つの文書を生成するためのユーティリティクラス。"""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

ルイト社の文書のストリームを定義する

def stream_reuters_documents(data_path=None):
    """ルイト社のデータセットの文書を反復処理する。

    `data_path` ディレクトリが存在しない場合、ルイト社のアーカイブは自動的にダウンロードされて解凍されます。

    文書は、'body'(str)、'title'(str)、'topics'(list(str))のキーを持つ辞書として表されます。

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """データセットをダウンロードする。"""
        print("downloading dataset (once and for all) into %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rdownloaded %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## アーカイブが改竄されていないことを確認する:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("untarring Reuters dataset...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("done.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

ベクトル化器を設定し、テストセットを分離する

## ベクトル化器を作成し、機能の数を合理的な最大値に制限する
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## 解析済みのルイト社のSGMLファイルの反復処理。
data_stream = stream_reuters_documents()

## 「acq」クラスとその他のすべてのクラスの間の2値分類を学習する。
## 「acq」はルイト社のファイルにほぼ均等に分布しているため選択された。他のデータセットの場合、正例の現実的な割合を持つテストセットを作成することに注意する必要がある。
all_classes = np.array([0, 1])
positive_class = "acq"

## ここには、`partial_fit` メソッドをサポートするいくつかの分類器がある
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive-Aggressive": PassiveAggressiveClassifier(),
}

## テストデータの統計
test_stats = {"n_test": 0, "n_test_pos": 0}

## まず、精度を推定するためにいくつかのサンプルを分離する
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Test set is %d documents (%d positive)" % (len(y_test), sum(y_test)))

サンプルのミニバッチを取得する関数を定義する

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """サンプルのミニバッチを抽出し、タプルX_text, yを返す。

    注:サイズは、割り当てられていないトピックがない無効な文書を除外する前のものである。

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

ミニバッチを反復処理するためのジェネレータ関数を定義する

def iter_minibatches(doc_iter, minibatch_size):
    """ミニバッチのジェネレータ。"""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

サンプルのミニバッチを反復処理し、分類器を更新する

## 分類器に1000個の文書のミニバッチを供給します。これは、いつでもメモリに最大1000個の文書があることを意味します。文書バッチが小さいほど、部分的なフィットメソッドの相対的なオーバーヘッドが大きくなります。
minibatch_size = 1000

## ルイト社のSGMLファイルを解析し、文書をストリームとして反復処理するdata_streamを作成します。
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## メインループ:サンプルのミニバッチを反復処理する
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## 現在のミニバッチのサンプルで推定器を更新する
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## テスト精度の統計を蓄積する
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

結果をプロットする

## 精度の推移をプロットする
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## サンプル数に対する精度の推移をプロットする
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "学習サンプル数 (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## 実行時間に対する精度の推移をプロットする
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "実行時間 (s)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

## フィッティング時間をプロットする
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("ベクトル化")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("実行時間 (s)")
ax.set_title("学習時間")


def autolabel(rectangles):
    """長方形に自動的にテキストを付ける"""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## 予測時間をプロットする
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("読み取り/解析\n+特徴抽出")
cls_runtime.append(vectorizing_time)
cls_names.append("ハッシュ化\n+ベクトル化")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("実行時間 (s)")
ax.set_title("予測時間 (%d インスタンス)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

まとめ

この実験では、コア外学習を用いたテキスト分類にscikit-learnをどのように使用するかを学びました。partial_fitメソッドをサポートするオンライン分類器を使用し、これにサンプルのバッチを供給しました。また、特徴空間が時間の経過とともに同じままになるように、HashingVectorizerを活用しました。その後、テストセットを分割し、サンプルのミニバッチを反復処理して分類器を更新しました。最後に、結果をプロットして精度の推移と学習時間を視覚化しました。