Введение
Этот лаба предоставляет пример использования scikit - learn для классификации текста с использованием out - of - core обучения. Цель - научиться работать с данными, которые не помещаются в оперативную память. Для этого мы используем онлайн - классификатор, поддерживающий метод partial_fit, которому будут подаваться пакеты примеров. Чтобы обеспечить постоянство пространства признаков во времени, мы используем HashingVectorizer, который проектирует каждый пример в то же пространство признаков. Это особенно полезно в случае классификации текста, когда в каждом пакете могут появляться новые признаки (слова).
Советы по работе с ВМ
После запуска ВМ нажмите в левом верхнем углу, чтобы переключиться на вкладку Notebook и получить доступ к Jupyter Notebook для практики.
Иногда вам может потребоваться подождать несколько секунд, пока Jupyter Notebook загрузится. Валидация операций не может быть автоматизирована из - за ограничений Jupyter Notebook.
Если вы сталкиваетесь с проблемами во время обучения, не стесняйтесь обращаться к Labby. Оставьте отзыв после занятия, и мы оперативно решим проблему для вас.
Импортируем библиотеки и определяем парсер
import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB
class ReutersParser(HTMLParser):
"""Вспомогательный класс для разбора файла в формате SGML и выдачи документов по одному."""
def __init__(self, encoding="latin-1"):
HTMLParser.__init__(self)
self._reset()
self.encoding = encoding
def handle_starttag(self, tag, attrs):
method = "start_" + tag
getattr(self, method, lambda x: None)(attrs)
def handle_endtag(self, tag):
method = "end_" + tag
getattr(self, method, lambda: None)()
def _reset(self):
self.in_title = 0
self.in_body = 0
self.in_topics = 0
self.in_topic_d = 0
self.title = ""
self.body = ""
self.topics = []
self.topic_d = ""
def parse(self, fd):
self.docs = []
for chunk in fd:
self.feed(chunk.decode(self.encoding))
for doc in self.docs:
yield doc
self.docs = []
self.close()
def handle_data(self, data):
if self.in_body:
self.body += data
elif self.in_title:
self.title += data
elif self.in_topic_d:
self.topic_d += data
def start_reuters(self, attributes):
pass
def end_reuters(self):
self.body = re.sub(r"\s+", r" ", self.body)
self.docs.append(
{"title": self.title, "body": self.body, "topics": self.topics}
)
self._reset()
def start_title(self, attributes):
self.in_title = 1
def end_title(self):
self.in_title = 0
def start_body(self, attributes):
self.in_body = 1
def end_body(self):
self.in_body = 0
def start_topics(self, attributes):
self.in_topics = 1
def end_topics(self):
self.in_topics = 0
def start_d(self, attributes):
self.in_topic_d = 1
def end_d(self):
self.in_topic_d = 0
self.topics.append(self.topic_d)
self.topic_d = ""
Определяем поток документов Reuters
def stream_reuters_documents(data_path=None):
"""Перебираем документы датасета Reuters.
Архив Reuters будет автоматически скачан и распакован, если
директория `data_path` не существует.
Документы представляются в виде словарей с ключами 'body' (str),
'title' (str), 'topics' (list(str)).
"""
DOWNLOAD_URL = (
"http://archive.ics.uci.edu/ml/machine-learning-databases/"
"reuters21578-mld/reuters21578.tar.gz"
)
ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
ARCHIVE_FILENAME = "reuters21578.tar.gz"
if data_path is None:
data_path = Path(get_data_home()) / "reuters"
else:
data_path = Path(data_path)
if not data_path.exists():
"""Скачиваем датасет."""
print("скачиваем датасет (один раз навсегда) в %s" % data_path)
data_path.mkdir(parents=True, exist_ok=True)
def progress(blocknum, bs, size):
total_sz_mb = "%.2f MB" % (size / 1e6)
current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
if _not_in_sphinx():
sys.stdout.write("\rскачано %s / %s" % (current_sz_mb, total_sz_mb))
archive_path = data_path / ARCHIVE_FILENAME
urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
if _not_in_sphinx():
sys.stdout.write("\r")
## Проверяем, что архив не был подделан:
assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256
print("распаковываем датасет Reuters...")
tarfile.open(archive_path, "r:gz").extractall(data_path)
print("готово.")
parser = ReutersParser()
for filename in data_path.glob("*.sgm"):
for doc in parser.parse(open(filename, "rb")):
yield doc
Настраиваем векторизатор и выделяем тестовую выборку
## Создаем векторизатор и ограничиваем количество признаков до разумного
## максимума
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)
## Итератор по разобранным файлам Reuters в формате SGML.
data_stream = stream_reuters_documents()
## Мы обучаем бинарную классификацию между классом "acq" и всеми остальными.
## "acq" был выбран, так как он более - менее равномерно распределен в файлах Reuters.
## Для других датасетов нужно убедиться, что тестовая выборка содержит
## реальный процент положительных инстансов.
all_classes = np.array([0, 1])
positive_class = "acq"
## Вот некоторые классификаторы, которые поддерживают метод `partial_fit`
partial_fit_classifiers = {
"SGD": SGDClassifier(max_iter=5),
"Perceptron": Perceptron(),
"NB Multinomial": MultinomialNB(alpha=0.01),
"Passive - Aggressive": PassiveAggressiveClassifier(),
}
## Статистика по тестовым данным
test_stats = {"n_test": 0, "n_test_pos": 0}
## Во - первых, мы выделяем некоторое количество примеров для оценки точности
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Тестовая выборка состоит из %d документов (%d положительных)" % (len(y_test), sum(y_test)))
Определяем функцию для получения мини - пакета примеров
def get_minibatch(doc_iter, size, pos_class=positive_class):
"""Извлекает мини - пакет примеров, возвращает кортеж X_text, y.
Примечание: размер указывается до исключения недействительных документов без назначенных тем.
"""
data = [
("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
for doc in itertools.islice(doc_iter, size)
if doc["topics"]
]
if not len(data):
return np.asarray([], dtype=int), np.asarray([], dtype=int)
X_text, y = zip(*data)
return X_text, np.asarray(y, dtype=int)
Определяем генераторную функцию для перебора мини - пакетов
def iter_minibatches(doc_iter, minibatch_size):
"""Генератор мини - пакетов."""
X_text, y = get_minibatch(doc_iter, minibatch_size)
while len(X_text):
yield X_text, y
X_text, y = get_minibatch(doc_iter, minibatch_size)
Перебираем мини - пакеты примеров и обновляем классификаторы
## Мы будем подавать классификатору мини - пакеты по 1000 документам; это означает,
## что в памяти будет в любом случае не более 1000 документов. Чем меньше размер
## пакета документов, тем больше относительный накладной расход методов partial fit.
minibatch_size = 1000
## Создаем data_stream, который разбирает файлы Reuters в формате SGML и
## итерируется по документам как по потоку.
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0
## Основной цикл: итерируемся по мини - пакетам примеров
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
tick = time.time()
X_train = vectorizer.transform(X_train_text)
total_vect_time += time.time() - tick
for cls_name, cls in partial_fit_classifiers.items():
tick = time.time()
## обновляем оценщик примерами из текущего мини - пакета
cls.partial_fit(X_train, y_train, classes=all_classes)
## накапливаем статистику по точности теста
cls_stats[cls_name]["total_fit_time"] += time.time() - tick
cls_stats[cls_name]["n_train"] += X_train.shape[0]
cls_stats[cls_name]["n_train_pos"] += sum(y_train)
tick = time.time()
cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
cls_stats[cls_name]["prediction_time"] = time.time() - tick
acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
cls_stats[cls_name]["accuracy_history"].append(acc_history)
run_history = (
cls_stats[cls_name]["accuracy"],
total_vect_time + cls_stats[cls_name]["total_fit_time"],
)
cls_stats[cls_name]["runtime_history"].append(run_history)
if i % 3 == 0:
print(progress(cls_name, cls_stats[cls_name]))
if i % 3 == 0:
print("\n")
Построение графиков результатов
## Построение графика изменения точности
plt.figure()
for _, stats in sorted(cls_stats.items()):
## Построение графика изменения точности в зависимости от количества примеров
accuracy, n_examples = zip(*stats["accuracy_history"])
plot_accuracy(n_examples, accuracy, "тренировочные примеры (#)")
ax = plt.gca()
ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")
plt.figure()
for _, stats in sorted(cls_stats.items()):
## Построение графика изменения точности в зависимости от времени выполнения
accuracy, runtime = zip(*stats["runtime_history"])
plot_accuracy(runtime, accuracy, "время выполнения (с)")
ax = plt.gca()
ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")
## Построение графиков времени обучения
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]
cls_runtime.append(total_vect_time)
cls_names.append("Векторизация")
bar_colors = ["b", "g", "r", "c", "m", "y"]
ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)
ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("время выполнения (с)")
ax.set_title("Время обучения")
def autolabel(rectangles):
"""прикрепляет некоторый текст к прямоугольникам с помощью autolabel."""
for rect in rectangles:
height = rect.get_height()
ax.text(
rect.get_x() + rect.get_width() / 2.0,
1.05 * height,
"%.4f" % height,
ha="center",
va="bottom",
)
plt.setp(plt.xticks()[1], rotation=30)
autolabel(rectangles)
plt.tight_layout()
plt.show()
## Построение графиков времени предсказания
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("Чтение/Разбор\n+Извлечение признаков")
cls_runtime.append(vectorizing_time)
cls_names.append("Хеширование\n+Векторизация")
ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)
ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("время выполнения (с)")
ax.set_title("Время предсказания (%d инстансов)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()
Резюме
В этом практическом занятии мы научились использовать scikit - learn для классификации текстов с использованием out - of - core обучения. Мы использовали онлайн - классификатор, поддерживающий метод partial_fit, которому подавались пакеты примеров. Также мы воспользовались HashingVectorizer, чтобы обеспечить неизменность пространства признаков во времени. Затем мы выделили тестовую выборку и итерировались по мини - пакетам примеров для обновления классификаторов. Наконец, мы построили графики результатов, чтобы визуализировать изменение точности и время обучения.