Clasificación de texto utilizando aprendizaje fuera de núcleo

Beginner

This tutorial is from open-source community. Access the source code

Introducción

Esta práctica proporciona un ejemplo de cómo utilizar scikit-learn para la clasificación de texto mediante aprendizaje fuera de núcleo. El objetivo es aprender a partir de datos que no caben en la memoria principal. Para lograr esto, utilizamos un clasificador en línea que admite el método partial_fit, al que se alimentarán lotes de ejemplos. Para garantizar que el espacio de características permanezca constante con el tiempo, aprovechamos un HashingVectorizer que proyectará cada ejemplo al mismo espacio de características. Esto es especialmente útil en el caso de la clasificación de texto donde pueden aparecer nuevas características (palabras) en cada lote.

Consejos sobre la VM

Una vez finalizada la inicialización de la VM, haga clic en la esquina superior izquierda para cambiar a la pestaña Cuaderno y acceder a Jupyter Notebook para practicar.

A veces, es posible que tenga que esperar unos segundos a que Jupyter Notebook termine de cargarse. La validación de las operaciones no se puede automatizar debido a las limitaciones de Jupyter Notebook.

Si tiene problemas durante el aprendizaje, no dude en preguntar a Labby. Deje su retroalimentación después de la sesión y resolveremos rápidamente el problema para usted.

Importar bibliotecas y definir el analizador

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """Clase de utilidad para analizar un archivo SGML y generar documentos uno por uno."""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

Definir el flujo de documentos de Reuters

def stream_reuters_documents(data_path=None):
    """Iterar sobre los documentos del conjunto de datos de Reuters.

    El archivo de Reuters se descargará y descomprimirá automáticamente si
    el directorio `data_path` no existe.

    Los documentos se representan como diccionarios con las claves 'body' (str),
    'title' (str), 'topics' (list(str)).

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """Descargar el conjunto de datos."""
        print("descargando el conjunto de datos (una sola vez) en %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rdescargado %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## Comprobar que el archivo no ha sido alterado:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("descomprimiendo el conjunto de datos de Reuters...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("hecho.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

Configurar el vectorizador y reservar un conjunto de prueba

## Crear el vectorizador y limitar el número de características a un máximo razonable
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## Iterador sobre los archivos SGML de Reuters analizados.
data_stream = stream_reuters_documents()

## Aprendemos una clasificación binaria entre la clase "acq" y todas las demás.
## "acq" fue elegido porque está más o menos equitativamente distribuida en los archivos de Reuters.
## Para otros conjuntos de datos, se debe tener cuidado de crear un conjunto de prueba con una proporción realista de instancias positivas.
all_classes = np.array([0, 1])
positive_class = "acq"

## Aquí hay algunos clasificadores que admiten el método `partial_fit`
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive-Aggressive": PassiveAggressiveClassifier(),
}

## Estadísticas de los datos de prueba
test_stats = {"n_test": 0, "n_test_pos": 0}

## Primero, reservamos un número de ejemplos para estimar la precisión
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("El conjunto de prueba es %d documentos (%d positivos)" % (len(y_test), sum(y_test)))

Definir una función para obtener un lote pequeño de ejemplos

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """Extraer un lote pequeño de ejemplos, devolver una tupla X_text, y.

    Nota: size es antes de excluir los documentos no válidos sin temas asignados.

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

Definir una función generadora para iterar sobre lotes pequeños

def iter_minibatches(doc_iter, minibatch_size):
    """Generador de lotes pequeños."""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

Iterar sobre lotes pequeños de ejemplos y actualizar los clasificadores

## Le alimentaremos el clasificador con lotes pequeños de 1000 documentos; esto significa
## que tendremos como máximo 1000 documentos en memoria en cualquier momento.
## Cuanto más pequeño sea el lote de documentos, mayor será el costo relativo
## de los métodos de ajuste parcial.
minibatch_size = 1000

## Crear el data_stream que analiza los archivos SGML de Reuters y itera sobre
## los documentos como un flujo.
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## Bucle principal: iterar sobre lotes pequeños de ejemplos
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## actualizar el estimador con los ejemplos del lote pequeño actual
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## acumular estadísticas de precisión en el test
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

Graficar los resultados

## Graficar la evolución de la precisión
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Graficar la evolución de la precisión con el número de ejemplos
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "ejemplos de entrenamiento (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Graficar la evolución de la precisión con el tiempo de ejecución
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "tiempo de ejecución (s)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

## Graficar los tiempos de ajuste
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("Vectorización")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("tiempo de ejecución (s)")
ax.set_title("Tiempos de entrenamiento")


def autolabel(rectangles):
    """adjuntar un texto mediante autolabel en los rectángulos."""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## Graficar los tiempos de predicción
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("Lectura/Analizado\n+Extracción de Características")
cls_runtime.append(vectorizing_time)
cls_names.append("Hash\n+Vectorización")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("tiempo de ejecución (s)")
ax.set_title("Tiempos de predicción (%d instancias)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

Resumen

En este laboratorio, aprendimos cómo usar scikit-learn para la clasificación de texto utilizando aprendizaje fuera de núcleo. Utilizamos un clasificador en línea que soporta el método partial_fit, el cual fue alimentado con lotes de ejemplos. También aprovechamos un HashingVectorizer para asegurar que el espacio de características permaneciera el mismo con el tiempo. Luego, reservamos un conjunto de prueba y iteramos sobre lotes pequeños de ejemplos para actualizar los clasificadores. Finalmente, graficamos los resultados para visualizar la evolución de la precisión y los tiempos de entrenamiento.