Détection d'anomalies à l'aide d'algorithmes Scikit-Learn

Machine LearningMachine LearningBeginner
Pratiquer maintenant

This tutorial is from open-source community. Access the source code

💡 Ce tutoriel est traduit par l'IA à partir de la version anglaise. Pour voir la version originale, vous pouvez cliquer ici

Introduction

Ce laboratoire montre comment utiliser Scikit-Learn pour effectuer la détection d'anomalies sur des jeux de données de détection d'anomalies classiques en utilisant les algorithmes de facteur d'anomalie local (LOF) et forêt d'isolation (IForest). Les performances des algorithmes sont évaluées dans un contexte de détection d'anomalies, et les courbes ROC sont utilisées pour tracer les résultats.

Conseils sur la machine virtuelle

Une fois le démarrage de la machine virtuelle terminé, cliquez dans le coin supérieur gauche pour basculer vers l'onglet Notebook pour accéder à Jupyter Notebook pour la pratique.

Parfois, vous devrez peut-être attendre quelques secondes pour que Jupyter Notebook ait fini de charger. La validation des opérations ne peut pas être automatisée en raison des limitations de Jupyter Notebook.

Si vous rencontrez des problèmes pendant l'apprentissage, n'hésitez pas à demander à Labby. Donnez votre feedback après la session, et nous résoudrons rapidement le problème pour vous.


Skills Graph

%%%%{init: {'theme':'neutral'}}%%%% flowchart RL sklearn(("Sklearn")) -.-> sklearn/CoreModelsandAlgorithmsGroup(["Core Models and Algorithms"]) sklearn(("Sklearn")) -.-> sklearn/DataPreprocessingandFeatureEngineeringGroup(["Data Preprocessing and Feature Engineering"]) sklearn(("Sklearn")) -.-> sklearn/ModelSelectionandEvaluationGroup(["Model Selection and Evaluation"]) sklearn(("Sklearn")) -.-> sklearn/UtilitiesandDatasetsGroup(["Utilities and Datasets"]) ml(("Machine Learning")) -.-> ml/FrameworkandSoftwareGroup(["Framework and Software"]) sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/neighbors("Nearest Neighbors") sklearn/CoreModelsandAlgorithmsGroup -.-> sklearn/ensemble("Ensemble Methods") sklearn/DataPreprocessingandFeatureEngineeringGroup -.-> sklearn/preprocessing("Preprocessing and Normalization") sklearn/ModelSelectionandEvaluationGroup -.-> sklearn/metrics("Metrics") sklearn/UtilitiesandDatasetsGroup -.-> sklearn/datasets("Datasets") ml/FrameworkandSoftwareGroup -.-> ml/sklearn("scikit-learn") subgraph Lab Skills sklearn/neighbors -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} sklearn/ensemble -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} sklearn/preprocessing -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} sklearn/metrics -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} sklearn/datasets -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} ml/sklearn -.-> lab-49236{{"Détection d'anomalies à l'aide d'algorithmes Scikit-Learn"}} end

Prétraitement des données

La première étape consiste à prétraiter le jeu de données. Dans cet exemple, nous utilisons des jeux de données du monde réel disponibles dans le module datasets de Scikit-Learn. La taille d'échantillonnage de certains jeux de données est réduite pour accélérer les calculs. Après le prétraitement des données, les cibles des jeux de données auront deux classes, 0 représentant les données normales et 1 représentant les anomalies. La fonction preprocess_dataset renvoie les données et la cible.

import numpy as np
from sklearn.datasets import fetch_kddcup99, fetch_covtype, fetch_openml
from sklearn.preprocessing import LabelBinarizer
import pandas as pd

rng = np.random.RandomState(42)

def preprocess_dataset(dataset_name):
    ## chargement et vectorisation
    print(f"Chargement des données {dataset_name}")
    if dataset_name in ["http", "smtp", "SA", "SF"]:
        dataset = fetch_kddcup99(subset=dataset_name, percent10=True, random_state=rng)
        X = dataset.data
        y = dataset.target
        lb = LabelBinarizer()

        if dataset_name == "SF":
            idx = rng.choice(X.shape[0], int(X.shape[0] * 0.1), replace=False)
            X = X[idx]  ## réduire la taille d'échantillonnage
            y = y[idx]
            x1 = lb.fit_transform(X[:, 1].astype(str))
            X = np.c_[X[:, :1], x1, X[:, 2:]]
        elif dataset_name == "SA":
            idx = rng.choice(X.shape[0], int(X.shape[0] * 0.1), replace=False)
            X = X[idx]  ## réduire la taille d'échantillonnage
            y = y[idx]
            x1 = lb.fit_transform(X[:, 1].astype(str))
            x2 = lb.fit_transform(X[:, 2].astype(str))
            x3 = lb.fit_transform(X[:, 3].astype(str))
            X = np.c_[X[:, :1], x1, x2, x3, X[:, 4:]]
        y = (y!= b"normal.").astype(int)
    if dataset_name == "forestcover":
        dataset = fetch_covtype()
        X = dataset.data
        y = dataset.target
        idx = rng.choice(X.shape[0], int(X.shape[0] * 0.1), replace=False)
        X = X[idx]  ## réduire la taille d'échantillonnage
        y = y[idx]

        ## les données normales sont celles avec l'attribut 2
        ## les anomalies sont celles avec l'attribut 4
        s = (y == 2) + (y == 4)
        X = X[s, :]
        y = y[s]
        y = (y!= 2).astype(int)
    if dataset_name in ["glass", "wdbc", "cardiotocography"]:
        dataset = fetch_openml(
            name=dataset_name, version=1, as_frame=False, parser="pandas"
        )
        X = dataset.data
        y = dataset.target

        if dataset_name == "glass":
            s = y == "tableware"
            y = s.astype(int)
        if dataset_name == "wdbc":
            s = y == "2"
            y = s.astype(int)
            X_mal, y_mal = X[s], y[s]
            X_ben, y_ben = X[~s], y[~s]

            ## sous-échantillonnage à 39 points (9,8% d'anomalies)
            idx = rng.choice(y_mal.shape[0], 39, replace=False)
            X_mal2 = X_mal[idx]
            y_mal2 = y_mal[idx]
            X = np.concatenate((X_ben, X_mal2), axis=0)
            y = np.concatenate((y_ben, y_mal2), axis=0)
        if dataset_name == "cardiotocography":
            s = y == "3"
            y = s.astype(int)
    ## 0 représente les données normales, et 1 représente les anomalies
    y = pd.Series(y, dtype="category")
    return (X, y)

Fonction de prédiction d'anomalies

L'étape suivante consiste à définir une fonction de prédiction d'anomalies. Dans cet exemple, nous utilisons les algorithmes LocalOutlierFactor et IsolationForest. La fonction compute_prediction renvoie la note moyenne d'anomalie de X.

from sklearn.neighbors import LocalOutlierFactor
from sklearn.ensemble import IsolationForest

def compute_prediction(X, model_name):
    print(f"Calcul de la prédiction {model_name}...")
    if model_name == "LOF":
        clf = LocalOutlierFactor(n_neighbors=20, contamination="auto")
        clf.fit(X)
        y_pred = clf.negative_outlier_factor_
    if model_name == "IForest":
        clf = IsolationForest(random_state=rng, contamination="auto")
        y_pred = clf.fit(X).decision_function(X)
    return y_pred

Tracer et interpréter les résultats

La dernière étape consiste à tracer et à interpréter les résultats. Les performances de l'algorithme dépendent de la qualité du taux de vrais positifs (TPR) pour de faibles valeurs du taux de faux positifs (FPR). Les meilleurs algorithmes ont une courbe dans le coin supérieur gauche du graphique et une aire sous la courbe (AUC) proche de 1. La ligne pointillée diagonale représente une classification aléatoire d'anomalies et de données normales.

import math
import matplotlib.pyplot as plt
from sklearn.metrics import RocCurveDisplay

datasets_name = [
    "http",
    "smtp",
    "SA",
    "SF",
    "forestcover",
    "glass",
    "wdbc",
    "cardiotocography",
]

models_name = [
    "LOF",
    "IForest",
]

## paramètres de tracé
cols = 2
linewidth = 1
pos_label = 0  ## signifie que 0 appartient à la classe positive
rows = math.ceil(len(datasets_name) / cols)

fig, axs = plt.subplots(rows, cols, figsize=(10, rows * 3), sharex=True, sharey=True)

for i, dataset_name in enumerate(datasets_name):
    (X, y) = preprocess_dataset(dataset_name=dataset_name)

    for model_idx, model_name in enumerate(models_name):
        y_pred = compute_prediction(X, model_name=model_name)
        display = RocCurveDisplay.from_predictions(
            y,
            y_pred,
            pos_label=pos_label,
            name=model_name,
            linewidth=linewidth,
            ax=axs[i // cols, i % cols],
            plot_chance_level=(model_idx == len(models_name) - 1),
            chance_level_kw={
                "linewidth": linewidth,
                "linestyle": ":",
            },
        )
    axs[i // cols, i % cols].set_title(dataset_name)
plt.tight_layout(pad=2.0)  ## espacement entre les sous-graphiques
plt.show()

Sommaire

Ce laboratoire a montré comment utiliser Scikit-Learn pour effectuer la détection d'anomalies sur des jeux de données de détection d'anomalies classiques en utilisant les algorithmes de facteur d'anomalie local (LOF) et forêt d'isolation (IForest). Les performances des algorithmes ont été évaluées dans un contexte de détection d'anomalies, et les courbes ROC ont été utilisées pour tracer les résultats.