Классификация текстов с использованием out - of - core обучения

Beginner

This tutorial is from open-source community. Access the source code

Введение

Этот лаба предоставляет пример использования scikit - learn для классификации текста с использованием out - of - core обучения. Цель - научиться работать с данными, которые не помещаются в оперативную память. Для этого мы используем онлайн - классификатор, поддерживающий метод partial_fit, которому будут подаваться пакеты примеров. Чтобы обеспечить постоянство пространства признаков во времени, мы используем HashingVectorizer, который проектирует каждый пример в то же пространство признаков. Это особенно полезно в случае классификации текста, когда в каждом пакете могут появляться новые признаки (слова).

Советы по работе с ВМ

После запуска ВМ нажмите в левом верхнем углу, чтобы переключиться на вкладку Notebook и получить доступ к Jupyter Notebook для практики.

Иногда вам может потребоваться подождать несколько секунд, пока Jupyter Notebook загрузится. Валидация операций не может быть автоматизирована из - за ограничений Jupyter Notebook.

Если вы сталкиваетесь с проблемами во время обучения, не стесняйтесь обращаться к Labby. Оставьте отзыв после занятия, и мы оперативно решим проблему для вас.

Импортируем библиотеки и определяем парсер

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """Вспомогательный класс для разбора файла в формате SGML и выдачи документов по одному."""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

Определяем поток документов Reuters

def stream_reuters_documents(data_path=None):
    """Перебираем документы датасета Reuters.

    Архив Reuters будет автоматически скачан и распакован, если
    директория `data_path` не существует.

    Документы представляются в виде словарей с ключами 'body' (str),
    'title' (str), 'topics' (list(str)).

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """Скачиваем датасет."""
        print("скачиваем датасет (один раз навсегда) в %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rскачано %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## Проверяем, что архив не был подделан:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("распаковываем датасет Reuters...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("готово.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

Настраиваем векторизатор и выделяем тестовую выборку

## Создаем векторизатор и ограничиваем количество признаков до разумного
## максимума
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## Итератор по разобранным файлам Reuters в формате SGML.
data_stream = stream_reuters_documents()

## Мы обучаем бинарную классификацию между классом "acq" и всеми остальными.
## "acq" был выбран, так как он более - менее равномерно распределен в файлах Reuters.
## Для других датасетов нужно убедиться, что тестовая выборка содержит
## реальный процент положительных инстансов.
all_classes = np.array([0, 1])
positive_class = "acq"

## Вот некоторые классификаторы, которые поддерживают метод `partial_fit`
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive - Aggressive": PassiveAggressiveClassifier(),
}

## Статистика по тестовым данным
test_stats = {"n_test": 0, "n_test_pos": 0}

## Во - первых, мы выделяем некоторое количество примеров для оценки точности
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Тестовая выборка состоит из %d документов (%d положительных)" % (len(y_test), sum(y_test)))

Определяем функцию для получения мини - пакета примеров

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """Извлекает мини - пакет примеров, возвращает кортеж X_text, y.

    Примечание: размер указывается до исключения недействительных документов без назначенных тем.

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

Определяем генераторную функцию для перебора мини - пакетов

def iter_minibatches(doc_iter, minibatch_size):
    """Генератор мини - пакетов."""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

Перебираем мини - пакеты примеров и обновляем классификаторы

## Мы будем подавать классификатору мини - пакеты по 1000 документам; это означает,
## что в памяти будет в любом случае не более 1000 документов. Чем меньше размер
## пакета документов, тем больше относительный накладной расход методов partial fit.
minibatch_size = 1000

## Создаем data_stream, который разбирает файлы Reuters в формате SGML и
## итерируется по документам как по потоку.
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## Основной цикл: итерируемся по мини - пакетам примеров
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## обновляем оценщик примерами из текущего мини - пакета
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## накапливаем статистику по точности теста
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

Построение графиков результатов

## Построение графика изменения точности
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Построение графика изменения точности в зависимости от количества примеров
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "тренировочные примеры (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Построение графика изменения точности в зависимости от времени выполнения
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "время выполнения (с)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

## Построение графиков времени обучения
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("Векторизация")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("время выполнения (с)")
ax.set_title("Время обучения")


def autolabel(rectangles):
    """прикрепляет некоторый текст к прямоугольникам с помощью autolabel."""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## Построение графиков времени предсказания
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("Чтение/Разбор\n+Извлечение признаков")
cls_runtime.append(vectorizing_time)
cls_names.append("Хеширование\n+Векторизация")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("время выполнения (с)")
ax.set_title("Время предсказания (%d инстансов)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

Резюме

В этом практическом занятии мы научились использовать scikit - learn для классификации текстов с использованием out - of - core обучения. Мы использовали онлайн - классификатор, поддерживающий метод partial_fit, которому подавались пакеты примеров. Также мы воспользовались HashingVectorizer, чтобы обеспечить неизменность пространства признаков во времени. Затем мы выделили тестовую выборку и итерировались по мини - пакетам примеров для обновления классификаторов. Наконец, мы построили графики результатов, чтобы визуализировать изменение точности и время обучения.