Textklassifizierung mit out-of-core Learning

Beginner

This tutorial is from open-source community. Access the source code

Einführung

Dieses Labor bietet ein Beispiel dafür, wie man scikit-learn für die Textklassifikation mit out-of-core-Lernen verwendet. Ziel ist es, aus Daten zu lernen, die nicht in den Hauptspeicher passen. Dazu verwenden wir einen Online-Klassifizierer, der die partial_fit-Methode unterstützt und mit Batches von Beispielen gefüttert wird. Um sicherzustellen, dass der Merkmalsraum über die Zeit gleich bleibt, nutzen wir einen HashingVectorizer, der jedes Beispiel in den gleichen Merkmalsraum projiziert. Dies ist besonders nützlich bei der Textklassifikation, wenn in jedem Batch neue Merkmale (Wörter) auftauchen können.

Tipps für die VM

Nachdem der Start der VM abgeschlossen ist, klicken Sie in der oberen linken Ecke, um zur Registerkarte Notebook zu wechseln und Jupyter Notebook für die Übung zu nutzen.

Manchmal müssen Sie einige Sekunden warten, bis Jupyter Notebook vollständig geladen ist. Die Validierung von Vorgängen kann aufgrund von Einschränkungen in Jupyter Notebook nicht automatisiert werden.

Wenn Sie bei der Lernphase Probleme haben, können Sie Labby gerne fragen. Geben Sie nach der Sitzung Feedback, und wir werden das Problem für Sie prompt beheben.

Bibliotheken importieren und den Parser definieren

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """Hilfsklasse, um eine SGML-Datei zu analysieren und Dokumente nacheinander zu liefern."""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

Definieren des Streams von Reuters-Dokumenten

def stream_reuters_documents(data_path=None):
    """Iteriere über die Dokumente des Reuters-Datasets.

    Der Reuters-Archiv wird automatisch heruntergeladen und entpackt, wenn
    das Verzeichnis `data_path` nicht existiert.

    Dokumente werden als Dictionaries mit den Schlüsseln 'body' (str),
    'title' (str), 'topics' (list(str)) repräsentiert.

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """Lade das Dataset herunter."""
        print("lade Dataset (einmal für alle) in %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rdownloaded %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## Überprüfe, dass das Archiv nicht manipuliert wurde:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("entpacke Reuters Dataset...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("fertig.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

Einrichten des Vektorisierers und Zurückhalten eines Testsets

## Erzeuge den Vektorisierer und begrenze die Anzahl der Merkmale auf eine vernünftige
## Maximalzahl
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## Iterator über die analysierten Reuters-SGML-Dateien.
data_stream = stream_reuters_documents()

## Wir lernen eine binäre Klassifikation zwischen der Klasse "acq" und allen anderen.
## "acq" wurde gewählt, da sie in den Reuters-Dateien mehr oder weniger gleichmäßig verteilt ist.
## Für andere Datensätze sollte man sich darum bemühen, ein Testset mit einem realistischen Anteil
## positiver Instanzen zu erstellen.
all_classes = np.array([0, 1])
positive_class = "acq"

## Hier sind einige Klassifizierer, die die `partial_fit`-Methode unterstützen
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive-Aggressive": PassiveAggressiveClassifier(),
}

## Testdatenstatistiken
test_stats = {"n_test": 0, "n_test_pos": 0}

## Zunächst halten wir eine Anzahl von Beispielen zurück, um die Genauigkeit zu schätzen
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("Testset ist %d Dokumente (%d positiv)" % (len(y_test), sum(y_test)))

Definiere eine Funktion, um einen Minibatch von Beispielen zu erhalten

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """Extrahiere einen Minibatch von Beispielen, gib ein Tupel X_text, y zurück.

    Hinweis: size ist vor dem Ausschließen von ungültigen Dokumenten ohne zugewiesene Themen.

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

Definiere eine Generatorfunktion, um über Minibatches zu iterieren

def iter_minibatches(doc_iter, minibatch_size):
    """Generator von Minibatches."""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

Iteriere über Minibatches von Beispielen und aktualisiere die Klassifizierer

## Wir werden den Klassifizierer mit Minibatches von 1000 Dokumenten versorgen; das bedeutet,
## dass wir zu jedem Zeitpunkt maximal 1000 Dokumente im Speicher haben. Je kleiner die Dokumentenmenge
## pro Batch ist, desto größer ist der relative Aufwand der partielle Anpassungsmethoden.
minibatch_size = 1000

## Erstelle den data_stream, der die Reuters-SGML-Dateien analysiert und über die Dokumente als Stream iteriert.
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## Hauptschleife: Iteriere über Minibatches von Beispielen
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## Aktualisiere den Schätzer mit den Beispielen im aktuellen Minibatch
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## Akkumuliere die Testgenauigkeitsstatistiken
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

Zeichne die Ergebnisse

## Zeichne die Genauigkeitsentwicklung
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Zeichne die Genauigkeitsentwicklung mit der Anzahl der Beispiele
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "training examples (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Zeichne die Genauigkeitsentwicklung mit der Laufzeit
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "runtime (s)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

## Zeichne die Anpassungszeiten
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("Vectorization")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("runtime (s)")
ax.set_title("Training Times")


def autolabel(rectangles):
    """Füge einigen Text via autolabel zu den Rechtecken hinzu."""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## Zeichne die Vorhersagezeiten
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("Read/Parse\n+Feat.Extr.")
cls_runtime.append(vectorizing_time)
cls_names.append("Hashing\n+Vect.")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("runtime (s)")
ax.set_title("Prediction Times (%d instances)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

Zusammenfassung

In diesem Lab haben wir gelernt, wie man scikit-learn für die Textklassifizierung mit out-of-core Learning verwendet. Wir haben einen Online-Klassifizierer verwendet, der die partial_fit-Methode unterstützt und mit Batches von Beispielen versorgt wurde. Wir haben auch einen HashingVectorizer genutzt, um sicherzustellen, dass der Merkmalsraum über die Zeit gleich blieb. Anschließend haben wir einen Testsatz zurückgehalten und über Minibatches von Beispielen iteriert, um die Klassifizierer zu aktualisieren. Schließlich haben wir die Ergebnisse geplottet, um die Genauigkeitsentwicklung und die Trainingszeiten zu visualisieren.