はじめに
この実験では、コア外学習を使って scikit - learn をテキスト分類にどのように使うかを示す例を提供します。目的は、メインメモリに収まらないデータから学習することです。これを達成するために、partial_fit メソッドをサポートするオンライン分類器を利用し、これにサンプルのバッチを供給します。特徴空間が時間の経過とともに同じままであることを確認するために、各サンプルを同じ特徴空間に射影する HashingVectorizer を利用します。これは、各バッチに新しい特徴(単語)が現れる可能性のあるテキスト分類の場合に特に役立ちます。
VM のヒント
VM の起動が完了した後、左上隅をクリックしてノートブックタブに切り替え、Jupyter Notebook を使って練習します。
時々、Jupyter Notebook が読み込み終了するまで数秒待つ必要があります。Jupyter Notebook の制限により、操作の検証を自動化することはできません。
学習中に問題に遭遇した場合は、Labby にお問い合わせください。セッション後にフィードバックを提供してください。そうすれば、迅速に問題を解決いたします。
ライブラリのインポートとパーサの定義
import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB
class ReutersParser(HTMLParser):
"""SGML ファイルを解析し、1 回に 1 つの文書を生成するためのユーティリティクラス。"""
def __init__(self, encoding="latin-1"):
HTMLParser.__init__(self)
self._reset()
self.encoding = encoding
def handle_starttag(self, tag, attrs):
method = "start_" + tag
getattr(self, method, lambda x: None)(attrs)
def handle_endtag(self, tag):
method = "end_" + tag
getattr(self, method, lambda: None)()
def _reset(self):
self.in_title = 0
self.in_body = 0
self.in_topics = 0
self.in_topic_d = 0
self.title = ""
self.body = ""
self.topics = []
self.topic_d = ""
def parse(self, fd):
self.docs = []
for chunk in fd:
self.feed(chunk.decode(self.encoding))
for doc in self.docs:
yield doc
self.docs = []
self.close()
def handle_data(self, data):
if self.in_body:
self.body += data
elif self.in_title:
self.title += data
elif self.in_topic_d:
self.topic_d += data
def start_reuters(self, attributes):
pass
def end_reuters(self):
self.body = re.sub(r"\s+", r" ", self.body)
self.docs.append(
{"title": self.title, "body": self.body, "topics": self.topics}
)
self._reset()
def start_title(self, attributes):
self.in_title = 1
def end_title(self):
self.in_title = 0
def start_body(self, attributes):
self.in_body = 1
def end_body(self):
self.in_body = 0
def start_topics(self, attributes):
self.in_topics = 1
def end_topics(self):
self.in_topics = 0
def start_d(self, attributes):
self.in_topic_d = 1
def end_d(self):
self.in_topic_d = 0
self.topics.append(self.topic_d)
self.topic_d = ""
ルイト社の文書のストリームを定義する
def stream_reuters_documents(data_path=None):
"""ルイト社のデータセットの文書を反復処理する。
`data_path` ディレクトリが存在しない場合、ルイト社のアーカイブは自動的にダウンロードされて解凍されます。
文書は、'body'(str)、'title'(str)、'topics'(list(str))のキーを持つ辞書として表されます。
"""
DOWNLOAD_URL = (
"http://archive.ics.uci.edu/ml/machine-learning-databases/"
"reuters21578-mld/reuters21578.tar.gz"
)
ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
ARCHIVE_FILENAME = "reuters21578.tar.gz"
if data_path is None:
data_path = Path(get_data_home()) / "reuters"
else:
data_path = Path(data_path)
if not data_path.exists():
"""データセットをダウンロードする。"""
print("downloading dataset (once and for all) into %s" % data_path)
data_path.mkdir(parents=True, exist_ok=True)
def progress(blocknum, bs, size):
total_sz_mb = "%.2f MB" % (size / 1e6)
current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
if _not_in_sphinx():
sys.stdout.write("\rdownloaded %s / %s" % (current_sz_mb, total_sz_mb))
archive_path = data_path / ARCHIVE_FILENAME
urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
if _not_in_sphinx():
sys.stdout.write("\r")
## アーカイブが改竄されていないことを確認する:
assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256
print("untarring Reuters dataset...")
tarfile.open(archive_path, "r:gz").extractall(data_path)
print("done.")
parser = ReutersParser()
for filename in data_path.glob("*.sgm"):
for doc in parser.parse(open(filename, "rb")):
yield doc
ベクトル化器を設定し、テストセットを分離する
## ベクトル化器を作成し、機能の数を合理的な最大値に制限する
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)
## 解析済みのルイト社の SGML ファイルの反復処理。
data_stream = stream_reuters_documents()
## 「acq」クラスとその他のすべてのクラスの間の 2 値分類を学習する。
## 「acq」はルイト社のファイルにほぼ均等に分布しているため選択された。他のデータセットの場合、正例の現実的な割合を持つテストセットを作成することに注意する必要がある。
all_classes = np.array([0, 1])
positive_class = "acq"
## ここには、`partial_fit` メソッドをサポートするいくつかの分類器がある
partial_fit_classifiers = {
"SGD": SGDClassifier(max_iter=5),
"Perceptron": Perceptron(),
"NB Multinomial": MultinomialNB(alpha=0.01),
"Passive-Aggressive": PassiveAggressiveClassifier(),
}
## テストデータの統計
test_stats = {"n_test": 0, "n_test_pos": 0}
## まず、精度を推定するためにいくつかのサンプルを分離する
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Test set is %d documents (%d positive)" % (len(y_test), sum(y_test)))
サンプルのミニバッチを取得する関数を定義する
def get_minibatch(doc_iter, size, pos_class=positive_class):
"""サンプルのミニバッチを抽出し、タプル X_text, y を返す。
注:サイズは、割り当てられていないトピックがない無効な文書を除外する前のものである。
"""
data = [
("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
for doc in itertools.islice(doc_iter, size)
if doc["topics"]
]
if not len(data):
return np.asarray([], dtype=int), np.asarray([], dtype=int)
X_text, y = zip(*data)
return X_text, np.asarray(y, dtype=int)
ミニバッチを反復処理するためのジェネレータ関数を定義する
def iter_minibatches(doc_iter, minibatch_size):
"""ミニバッチのジェネレータ。"""
X_text, y = get_minibatch(doc_iter, minibatch_size)
while len(X_text):
yield X_text, y
X_text, y = get_minibatch(doc_iter, minibatch_size)
サンプルのミニバッチを反復処理し、分類器を更新する
## 分類器に 1000 個の文書のミニバッチを供給します。これは、いつでもメモリに最大 1000 個の文書があることを意味します。文書バッチが小さいほど、部分的なフィットメソッドの相対的なオーバーヘッドが大きくなります。
minibatch_size = 1000
## ルイト社の SGML ファイルを解析し、文書をストリームとして反復処理する data_stream を作成します。
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0
## メインループ:サンプルのミニバッチを反復処理する
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
tick = time.time()
X_train = vectorizer.transform(X_train_text)
total_vect_time += time.time() - tick
for cls_name, cls in partial_fit_classifiers.items():
tick = time.time()
## 現在のミニバッチのサンプルで推定器を更新する
cls.partial_fit(X_train, y_train, classes=all_classes)
## テスト精度の統計を蓄積する
cls_stats[cls_name]["total_fit_time"] += time.time() - tick
cls_stats[cls_name]["n_train"] += X_train.shape[0]
cls_stats[cls_name]["n_train_pos"] += sum(y_train)
tick = time.time()
cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
cls_stats[cls_name]["prediction_time"] = time.time() - tick
acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
cls_stats[cls_name]["accuracy_history"].append(acc_history)
run_history = (
cls_stats[cls_name]["accuracy"],
total_vect_time + cls_stats[cls_name]["total_fit_time"],
)
cls_stats[cls_name]["runtime_history"].append(run_history)
if i % 3 == 0:
print(progress(cls_name, cls_stats[cls_name]))
if i % 3 == 0:
print("\n")
結果をプロットする
## 精度の推移をプロットする
plt.figure()
for _, stats in sorted(cls_stats.items()):
## サンプル数に対する精度の推移をプロットする
accuracy, n_examples = zip(*stats["accuracy_history"])
plot_accuracy(n_examples, accuracy, "学習サンプル数 (#)")
ax = plt.gca()
ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")
plt.figure()
for _, stats in sorted(cls_stats.items()):
## 実行時間に対する精度の推移をプロットする
accuracy, runtime = zip(*stats["runtime_history"])
plot_accuracy(runtime, accuracy, "実行時間 (s)")
ax = plt.gca()
ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")
## フィッティング時間をプロットする
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]
cls_runtime.append(total_vect_time)
cls_names.append("ベクトル化")
bar_colors = ["b", "g", "r", "c", "m", "y"]
ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)
ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("実行時間 (s)")
ax.set_title("学習時間")
def autolabel(rectangles):
"""長方形に自動的にテキストを付ける"""
for rect in rectangles:
height = rect.get_height()
ax.text(
rect.get_x() + rect.get_width() / 2.0,
1.05 * height,
"%.4f" % height,
ha="center",
va="bottom",
)
plt.setp(plt.xticks()[1], rotation=30)
autolabel(rectangles)
plt.tight_layout()
plt.show()
## 予測時間をプロットする
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("読み取り/解析\n+ 特徴抽出")
cls_runtime.append(vectorizing_time)
cls_names.append("ハッシュ化\n+ ベクトル化")
ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)
ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("実行時間 (s)")
ax.set_title("予測時間 (%d インスタンス)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()
まとめ
この実験では、コア外学習を用いたテキスト分類に scikit-learn をどのように使用するかを学びました。partial_fit メソッドをサポートするオンライン分類器を使用し、これにサンプルのバッチを供給しました。また、特徴空間が時間の経過とともに同じままになるように、HashingVectorizer を活用しました。その後、テストセットを分割し、サンプルのミニバッチを反復処理して分類器を更新しました。最後に、結果をプロットして精度の推移と学習時間を視覚化しました。