Classificação de Texto Usando Aprendizado Fora da Memória

Beginner

This tutorial is from open-source community. Access the source code

Introdução

Este laboratório fornece um exemplo de como usar o scikit-learn para classificação de texto utilizando aprendizado fora da memória principal. O objetivo é aprender com dados que não cabem na memória principal. Para isso, utilizamos um classificador online que suporta o método partial_fit, que será alimentado com lotes de exemplos. Para garantir que o espaço de características permaneça o mesmo ao longo do tempo, utilizamos um HashingVectorizer que projetará cada exemplo no mesmo espaço de características. Isto é especialmente útil no caso de classificação de texto, onde novas características (palavras) podem aparecer em cada lote.

Dicas da Máquina Virtual

Após o arranque da máquina virtual, clique no canto superior esquerdo para mudar para a aba Notebook para aceder ao Jupyter Notebook para praticar.

Por vezes, pode ser necessário esperar alguns segundos para o Jupyter Notebook terminar de carregar. A validação das operações não pode ser automatizada devido a limitações no Jupyter Notebook.

Se tiver problemas durante o aprendizado, não hesite em contactar o Labby. Forneça feedback após a sessão e resolveremos o problema rapidamente.

Importar bibliotecas e definir o analisador

import itertools
from pathlib import Path
from hashlib import sha256
import re
import tarfile
import time
import sys

import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams

from html.parser import HTMLParser
from urllib.request import urlretrieve
from sklearn.datasets import get_data_home
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.linear_model import SGDClassifier
from sklearn.linear_model import PassiveAggressiveClassifier
from sklearn.linear_model import Perceptron
from sklearn.naive_bayes import MultinomialNB


class ReutersParser(HTMLParser):
    """Classe utilitária para analisar um ficheiro SGML e gerar documentos um de cada vez."""

    def __init__(self, encoding="latin-1"):
        HTMLParser.__init__(self)
        self._reset()
        self.encoding = encoding

    def handle_starttag(self, tag, attrs):
        method = "start_" + tag
        getattr(self, method, lambda x: None)(attrs)

    def handle_endtag(self, tag):
        method = "end_" + tag
        getattr(self, method, lambda: None)()

    def _reset(self):
        self.in_title = 0
        self.in_body = 0
        self.in_topics = 0
        self.in_topic_d = 0
        self.title = ""
        self.body = ""
        self.topics = []
        self.topic_d = ""

    def parse(self, fd):
        self.docs = []
        for chunk in fd:
            self.feed(chunk.decode(self.encoding))
            for doc in self.docs:
                yield doc
            self.docs = []
        self.close()

    def handle_data(self, data):
        if self.in_body:
            self.body += data
        elif self.in_title:
            self.title += data
        elif self.in_topic_d:
            self.topic_d += data

    def start_reuters(self, attributes):
        pass

    def end_reuters(self):
        self.body = re.sub(r"\s+", r" ", self.body)
        self.docs.append(
            {"title": self.title, "body": self.body, "topics": self.topics}
        )
        self._reset()

    def start_title(self, attributes):
        self.in_title = 1

    def end_title(self):
        self.in_title = 0

    def start_body(self, attributes):
        self.in_body = 1

    def end_body(self):
        self.in_body = 0

    def start_topics(self, attributes):
        self.in_topics = 1

    def end_topics(self):
        self.in_topics = 0

    def start_d(self, attributes):
        self.in_topic_d = 1

    def end_d(self):
        self.in_topic_d = 0
        self.topics.append(self.topic_d)
        self.topic_d = ""

Definir o fluxo de documentos Reuters

def stream_reuters_documents(data_path=None):
    """Iterar sobre os documentos do conjunto de dados Reuters.

    O arquivo Reuters será automaticamente baixado e descompactado se o diretório
    `data_path` não existir.

    Os documentos são representados como dicionários com as chaves 'body' (str),
    'title' (str), 'topics' (lista(str)).

    """

    DOWNLOAD_URL = (
        "http://archive.ics.uci.edu/ml/machine-learning-databases/"
        "reuters21578-mld/reuters21578.tar.gz"
    )
    ARCHIVE_SHA256 = "3bae43c9b14e387f76a61b6d82bf98a4fb5d3ef99ef7e7075ff2ccbcf59f9d30"
    ARCHIVE_FILENAME = "reuters21578.tar.gz"

    if data_path is None:
        data_path = Path(get_data_home()) / "reuters"
    else:
        data_path = Path(data_path)
    if not data_path.exists():
        """Baixar o conjunto de dados."""
        print("baixando o conjunto de dados (uma vez por todas) em %s" % data_path)
        data_path.mkdir(parents=True, exist_ok=True)

        def progress(blocknum, bs, size):
            total_sz_mb = "%.2f MB" % (size / 1e6)
            current_sz_mb = "%.2f MB" % ((blocknum * bs) / 1e6)
            if _not_in_sphinx():
                sys.stdout.write("\rdownloaded %s / %s" % (current_sz_mb, total_sz_mb))

        archive_path = data_path / ARCHIVE_FILENAME

        urlretrieve(DOWNLOAD_URL, filename=archive_path, reporthook=progress)
        if _not_in_sphinx():
            sys.stdout.write("\r")

        ## Verificar se o arquivo não foi adulterado:
        assert sha256(archive_path.read_bytes()).hexdigest() == ARCHIVE_SHA256

        print("descompactando o conjunto de dados Reuters...")
        tarfile.open(archive_path, "r:gz").extractall(data_path)
        print("feito.")

    parser = ReutersParser()
    for filename in data_path.glob("*.sgm"):
        for doc in parser.parse(open(filename, "rb")):
            yield doc

Configurar o vetorizador e reservar um conjunto de teste

## Criar o vetorizador e limitar o número de recursos a um máximo razoável
vectorizer = HashingVectorizer(decode_error="ignore", n_features=2**18, alternate_sign=False)

## Iterador sobre arquivos SGML Reuters analisados.
data_stream = stream_reuters_documents()

## Aprendemos uma classificação binária entre a classe "acq" e todas as outras.
## "acq" foi escolhida porque está mais ou menos distribuída uniformemente nos arquivos Reuters.
## Para outros conjuntos de dados, deve-se ter cuidado para criar um conjunto de teste com uma porção realista de instâncias positivas.
all_classes = np.array([0, 1])
positive_class = "acq"

## Aqui estão alguns classificadores que suportam o método `partial_fit`
partial_fit_classifiers = {
    "SGD": SGDClassifier(max_iter=5),
    "Perceptron": Perceptron(),
    "NB Multinomial": MultinomialNB(alpha=0.01),
    "Passive-Aggressive": PassiveAggressiveClassifier(),
}

## Estatísticas dos dados de teste
test_stats = {"n_test": 0, "n_test_pos": 0}

## Primeiro, reservamos um número de exemplos para estimar a precisão
n_test_documents = 1000
X_test_text, y_test = get_minibatch(data_stream, 1000)
X_test = vectorizer.transform(X_test_text)
test_stats["n_test"] += len(y_test)
test_stats["n_test_pos"] += sum(y_test)
print("O conjunto de teste é de %d documentos (%d positivos)" % (len(y_test), sum(y_test)))

Definir uma função para obter um mini lote de exemplos

def get_minibatch(doc_iter, size, pos_class=positive_class):
    """Extrair um mini lote de exemplos, retornar uma tupla X_text, y.

    Nota: o tamanho é antes de excluir documentos inválidos sem tópicos atribuídos.

    """
    data = [
        ("{title}\n\n{body}".format(**doc), pos_class in doc["topics"])
        for doc in itertools.islice(doc_iter, size)
        if doc["topics"]
    ]
    if not len(data):
        return np.asarray([], dtype=int), np.asarray([], dtype=int)
    X_text, y = zip(*data)
    return X_text, np.asarray(y, dtype=int)

Definir uma função geradora para iterar sobre minilotes

def iter_minibatches(doc_iter, minibatch_size):
    """Gerador de minilotes."""
    X_text, y = get_minibatch(doc_iter, minibatch_size)
    while len(X_text):
        yield X_text, y
        X_text, y = get_minibatch(doc_iter, minibatch_size)

Iterar sobre mini-lotes de exemplos e atualizar os classificadores

## Vamos alimentar o classificador com mini-lotes de 1000 documentos; isso significa
## que temos no máximo 1000 documentos na memória em qualquer momento. Quanto menor o
## lote de documentos, maior a sobrecarga relativa dos métodos de ajuste parcial.
minibatch_size = 1000

## Criar o fluxo de dados que analisa arquivos SGML do Reuters e itera sobre
## documentos como um fluxo.
minibatch_iterators = iter_minibatches(data_stream, minibatch_size)
total_vect_time = 0.0

## Loop principal: iterar sobre mini-lotes de exemplos
for i, (X_train_text, y_train) in enumerate(minibatch_iterators):
    tick = time.time()
    X_train = vectorizer.transform(X_train_text)
    total_vect_time += time.time() - tick

    for cls_name, cls in partial_fit_classifiers.items():
        tick = time.time()
        ## atualizar o estimador com exemplos no mini-lote atual
        cls.partial_fit(X_train, y_train, classes=all_classes)

        ## acumular estatísticas de precisão de teste
        cls_stats[cls_name]["total_fit_time"] += time.time() - tick
        cls_stats[cls_name]["n_train"] += X_train.shape[0]
        cls_stats[cls_name]["n_train_pos"] += sum(y_train)
        tick = time.time()
        cls_stats[cls_name]["accuracy"] = cls.score(X_test, y_test)
        cls_stats[cls_name]["prediction_time"] = time.time() - tick
        acc_history = (cls_stats[cls_name]["accuracy"], cls_stats[cls_name]["n_train"])
        cls_stats[cls_name]["accuracy_history"].append(acc_history)
        run_history = (
            cls_stats[cls_name]["accuracy"],
            total_vect_time + cls_stats[cls_name]["total_fit_time"],
        )
        cls_stats[cls_name]["runtime_history"].append(run_history)

        if i % 3 == 0:
            print(progress(cls_name, cls_stats[cls_name]))
    if i % 3 == 0:
        print("\n")

Plotar os resultados

## Plotar a evolução da precisão
plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Plotar a evolução da precisão com o número de exemplos
    accuracy, n_examples = zip(*stats["accuracy_history"])
    plot_accuracy(n_examples, accuracy, "exemplos de treinamento (#)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

plt.figure()
for _, stats in sorted(cls_stats.items()):
    ## Plotar a evolução da precisão com o tempo de execução
    accuracy, runtime = zip(*stats["runtime_history"])
    plot_accuracy(runtime, accuracy, "tempo de execução (s)")
    ax = plt.gca()
    ax.set_ylim((0.8, 1))
plt.legend(cls_names, loc="best")

## Plotar os tempos de ajuste
plt.figure()
fig = plt.gcf()
cls_runtime = [stats["total_fit_time"] for cls_name, stats in sorted(cls_stats.items())]

cls_runtime.append(total_vect_time)
cls_names.append("Vectorização")
bar_colors = ["b", "g", "r", "c", "m", "y"]

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=10)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("tempo de execução (s)")
ax.set_title("Tempos de Treinamento")


def autolabel(rectangles):
    """adicionar algum texto via autolabel nos retângulos."""
    for rect in rectangles:
        height = rect.get_height()
        ax.text(
            rect.get_x() + rect.get_width() / 2.0,
            1.05 * height,
            "%.4f" % height,
            ha="center",
            va="bottom",
        )
        plt.setp(plt.xticks()[1], rotation=30)


autolabel(rectangles)
plt.tight_layout()
plt.show()

## Plotar os tempos de previsão
plt.figure()
cls_runtime = []
cls_names = list(sorted(cls_stats.keys()))
for cls_name, stats in sorted(cls_stats.items()):
    cls_runtime.append(stats["prediction_time"])
cls_runtime.append(parsing_time)
cls_names.append("Leitura/Parse\n+Extr. de Feat.")
cls_runtime.append(vectorizing_time)
cls_names.append("Hashing\n+Vet.")

ax = plt.subplot(111)
rectangles = plt.bar(range(len(cls_names)), cls_runtime, width=0.5, color=bar_colors)

ax.set_xticks(np.linspace(0, len(cls_names) - 1, len(cls_names)))
ax.set_xticklabels(cls_names, fontsize=8)
plt.setp(plt.xticks()[1], rotation=30)
ymax = max(cls_runtime) * 1.2
ax.set_ylim((0, ymax))
ax.set_ylabel("tempo de execução (s)")
ax.set_title("Tempos de Predição (%d instâncias)" % n_test_documents)
autolabel(rectangles)
plt.tight_layout()
plt.show()

Resumo

Neste laboratório, aprendemos a utilizar o scikit-learn para classificação de texto usando aprendizado fora da memória. Usamos um classificador online que suporta o método partial_fit, alimentado com lotes de exemplos. Também aproveitamos um HashingVectorizer para garantir que o espaço de características permanecesse o mesmo ao longo do tempo. Em seguida, reservamos um conjunto de teste e iteramos sobre mini-lotes de exemplos para atualizar os classificadores. Finalmente, plotamos os resultados para visualizar a evolução da precisão e os tempos de treinamento.