Source code for psykoda.detection

"""Anomaly Detection and Explanation."""

import dataclasses
import os
import random
from logging import getLogger
from typing import List, Optional, Tuple, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import shap
import tensorflow as tf
from pandas import DataFrame, Series
from scipy.sparse import csr_matrix
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.layers import Dense, Input, Layer, LeakyReLU
from tensorflow.keras.models import load_model
from tensorflow.keras.utils import Sequence

logger = getLogger(__name__)

KERNEL_INITIALIZER = "he_normal"
LAYERNAME_ENCODER_OUTPUT = "encoder_output"
REGULARIZER_L2 = (
    tf.keras.regularizers.L2 if tf.__version__ == "2.3.0" else tf.keras.regularizers.l2
)


class generator_autoencoder_training(Sequence):
    """Sparse matrix as batches of dense arrays"""

    def __init__(self, X: csr_matrix, batch_size: int):
        self.X = X
        self.batch_size = batch_size
        self.num_samples: int = X.shape[0]
        self.steps_per_epoch = int(np.ceil(self.num_samples / self.batch_size))

    def __len__(self) -> int:
        return self.steps_per_epoch

    def __getitem__(self, idx: int) -> Tuple[np.ndarray, np.ndarray]:
        batch_X = self.X[idx * self.batch_size : (idx + 1) * self.batch_size].toarray()
        # autoencoder
        return batch_X, batch_X

    def on_epoch_end(self):
        # add ops of shuffling all samples, if available
        pass
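
# Usage sketch (illustrative, not part of the original module): feeding a sparse
# feature matrix to Keras through generator_autoencoder_training.  The toy data
# below are made up.
def _example_generator_autoencoder_training():
    X = csr_matrix(np.random.rand(10, 4))
    gen = generator_autoencoder_training(X, batch_size=4)
    # 10 samples with batch_size 4 -> ceil(10 / 4) = 3 batches per epoch
    assert len(gen) == 3
    batch_in, batch_target = gen[0]
    # autoencoder targets equal the (densified) inputs
    assert batch_in.shape == (4, 4)
    assert np.array_equal(batch_in, batch_target)
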
def loss_sad(c, eta=1.0):
    """Loss function for Deep SAD

    References
    ----------
    [1] L. Ruff, R. A. Vandermeulen, N. Görnitz, A. Binder, E. Müller,
        K.-R. Müller, M. Kloft,
        "Deep Semi-Supervised Anomaly Detection",
        https://arxiv.org/abs/1906.02694
    """

    def loss_function(labels: tf.Tensor, embeddings: tf.Tensor):
        """Loss function for Deep SAD

        Parameters
        ----------
        labels
            ground truth labels

            :shape: (batch_size, 1)
        embeddings
            outputs of encoder phi

            :shape: (batch_size, dim_embedding)
        """
        labels = tf.reshape(labels, [-1])  # flatten GT
        loss_nolabeled = tf.reduce_sum(
            (embeddings - c) ** 2 + 1e-6, axis=1
        )  # loss for not labeled samples
        loss_labeled = eta * tf.pow(
            loss_nolabeled, labels
        )  # loss for labeled samples
        mask = tf.equal(labels, tf.zeros_like(labels))
        loss_total = tf.where(mask, loss_nolabeled, loss_labeled)
        loss = tf.reduce_mean(loss_total)
        return loss

    return loss_function
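
# Illustrative sketch (assumed toy values, not from the original module): with
# label y = 0 the per-sample Deep SAD loss is ||phi(x) - c||^2; with y = 1 it is
# eta * ||phi(x) - c||^2, so samples labeled as normal are pulled toward the
# center c with weight eta.
def _example_loss_sad():
    center = tf.constant([0.0, 0.0])
    loss_fn = loss_sad(c=center, eta=2.0)
    embeddings = tf.constant([[1.0, 0.0], [0.0, 2.0]])
    labels = tf.constant([[0.0], [1.0]])  # first sample unlabeled, second labeled normal
    # mean of [1.0 (unlabeled), 2.0 * 4.0 (labeled normal)], up to the 1e-6 term
    return float(loss_fn(labels, embeddings))
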
def dense_block(inputs: tf.Tensor, units: int, lam: float, name: str) -> Layer:
    """Basic block (Dense-LeakyReLU layers) of multi layer perceptron.

    Parameters
    ----------
    inputs
        input of block
    units
        number of the units in the Dense layer
    lam
        regularization parameter on the weights in Dense layer
    name
        name of block; "_dense" and "_LeakyReLU" are appended for the layers

    Returns
    -------
    output
        Dense-LeakyReLU layers
    """
    output = Dense(
        units=units,
        use_bias=False,
        activation="linear",
        name=name + "_dense",
        kernel_regularizer=REGULARIZER_L2(lam),
        kernel_initializer=KERNEL_INITIALIZER,
    )(inputs)
    return LeakyReLU(name=name + "_LeakyReLU")(output)
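
# Minimal sketch (hypothetical layer sizes): stacking dense_block to build a small
# encoder, similar to what DeepSAD._build_encoder does below.
def _example_dense_block_stack(dim_input: int = 8) -> tf.keras.Model:
    inputs = Input(shape=(dim_input,), name="example_input")
    hidden = dense_block(inputs, units=4, lam=1e-6, name="example_block1")
    hidden = dense_block(hidden, units=2, lam=1e-6, name="example_block2")
    return tf.keras.Model(inputs=inputs, outputs=hidden)
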
class DeepSAD:
    """Deep SAD Semi-supervised Anomaly Detector.

    Translated from `paper author Lukas Ruff's PyTorch implementation
    <https://github.com/lukasruff/Deep-SAD-PyTorch>`_ into TensorFlow.

    .. todo:: more detailed description, including comparison with PyTorch version.

    Attributes
    ----------
    dim_hidden
        from Config
    eta
        from Config
    lam
        from Config
    path_pretrained_model
        from Config
    dim_input
        number of features
    history
    detector

    Original License
    ----------------
    MIT License

    Copyright (c) 2019 lukasruff

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"), to deal
    in the Software without restriction, including without limitation the rights
    to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    copies of the Software, and to permit persons to whom the Software is
    furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included in all
    copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    SOFTWARE.
    """

    @dataclasses.dataclass
    class Config:
        """Configuration for DeepSAD model

        Parameters
        ----------
        dim_hidden
            number of units in hidden layers
        eta
            Deep SAD regularization hyperparameter eta (must be 0 < eta)
            balancing the loss for labeled and unlabeled samples
        lam
            regularization parameter on L2-norm of weights
        path_pretrained_model
            path to pretrained model (currently unused)
        """

        dim_hidden: List[int] = dataclasses.field(
            default_factory=lambda: [128, 128, 64, 64, 32]
        )
        eta: float = 1.0
        lam: float = 1e-6
        path_pretrained_model: Optional[str] = None

    def __init__(
        self,
        config: Config,
    ):
        self.dim_hidden = config.dim_hidden
        self.eta = config.eta
        self.lam = config.lam
        self.path_pretrained_model = config.path_pretrained_model
        self.dim_input: Optional[int] = None
        self.history = None
        self.detector: Optional[tf.keras.Model] = None

    def _build_encoder(self) -> tf.keras.Model:
        """Build encoder model (Multi Layer Perceptron)

        Returns
        -------
        encoder
            input: (dim_input, ), output: (dim_hidden[-1], )
        """
        inputs = Input(shape=(self.dim_input,), sparse=False, name="encoder_input")
        outputs = inputs
        for i, dim in enumerate(self.dim_hidden[:-1]):
            outputs = dense_block(
                outputs, units=dim, lam=self.lam, name="encoder_block" + str(i + 1)
            )
        outputs = Dense(
            units=self.dim_hidden[-1],
            use_bias=False,
            activation="linear",
            name=LAYERNAME_ENCODER_OUTPUT,
            kernel_regularizer=REGULARIZER_L2(self.lam),
            kernel_initializer=KERNEL_INITIALIZER,
        )(outputs)
        encoder = tf.keras.Model(inputs=inputs, outputs=outputs)
        return encoder

    def _build_autoencoder(
        self, encoder_input: tf.Tensor, encoder_output: tf.Tensor
    ) -> tf.keras.Model:
        """Build autoencoder model

        Parameters
        ----------
        encoder_input
            input of the first layer of the encoder
        encoder_output
            output of the last layer of the encoder (bottleneck layer)

        Returns
        -------
        autoencoder
            input: (dim_input, ), output: (dim_input, )
        """
        outputs = LeakyReLU(name="decoder_1st_LeakyReLU")(encoder_output)
        for i, dim in enumerate(self.dim_hidden[::-1][1:]):
            outputs = dense_block(
                outputs, units=dim, lam=self.lam, name="decoder_block" + str(i + 1)
            )
        outputs = Dense(
            units=self.dim_input,
            use_bias=False,
            activation="linear",
            name="decoder_output",
            kernel_regularizer=REGULARIZER_L2(self.lam),
            kernel_initializer=KERNEL_INITIALIZER,
        )(outputs)
        autoencoder = tf.keras.Model(inputs=encoder_input, outputs=outputs)
        return autoencoder

    def _build_detector(
        self, encoder: tf.keras.Model, center: np.ndarray
    ) -> tf.keras.Model:
        r"""Build anomaly detector model

        Parameters
        ----------
        encoder
        center
            center of embeddings (encoded feature) ("c" in the paper)

            :shape: (dim_embedding, )

        Returns
        -------
        detector
            anomaly detector

            detector(x) = \| encoder(x) - c \|^2

            the higher, the more anomalous
        """
        assert center.ndim == 1
        score = tf.reduce_sum((encoder.output - tf.constant(center)) ** 2, axis=1)
        detector = tf.keras.Model(inputs=encoder.input, outputs=score)
        return detector

    @dataclasses.dataclass
    class TrainConfig:
        """Configuration of training process.

        Parameters
        ----------
        epochs_pretrain
            epochs for pretraining (center initialization)
        epochs_train
            epochs for training of detector
        learning_rate
            learning rate of optimizer
        batch_size
            batch size
        """

        epochs_pretrain: int = 10
        epochs_train: int = 20
        learning_rate: float = 1e-3
        batch_size: int = 64

    def train(
        self,
        X: Union[np.ndarray, csr_matrix],
        y: np.ndarray,
        path_model: str,
        config: TrainConfig,
        verbose: int = 1,
    ):
        r"""Train anomaly detector (self.detector) with encoder (local variable).

        Set self.detector, self.dim_input and self.history.
        Save encoder to path_model and loss-epoch plot next to it.

        Parameters
        ----------
        X
            feature matrix

            :shape: (n_samples, n_features)
        y
            label

            0
                not labeled as normal
            1
                labeled as normal

            :shape: (n_samples, )
        path_model
            path '\*\*.h5' to save trained model
        config
            training configuration
        verbose
            verbosity of logging/output
        """
        assert X.ndim == 2
        assert y.ndim == 1
        assert X.shape[0] == y.shape[0]

        os.makedirs(os.path.dirname(path_model), exist_ok=True)
        self.dim_input = X.shape[-1]
        optimizer = tf.keras.optimizers.Adam(learning_rate=config.learning_rate)

        # training autoencoder for the weight and center initialization
        if self.path_pretrained_model is None:
            logger.info(
                "start detector weight initialization with epochs %s",
                config.epochs_pretrain,
            )
            encoder = self._build_encoder()
            autoencoder = self._build_autoencoder(
                encoder.input, encoder.layers[-1].output
            )
            autoencoder.compile(optimizer=optimizer, loss="mse")
            if isinstance(X, csr_matrix):
                generator = generator_autoencoder_training(X, config.batch_size)
                autoencoder.fit(
                    generator, epochs=config.epochs_pretrain, verbose=verbose
                )
            elif isinstance(X, np.ndarray):
                autoencoder.fit(X, X, epochs=config.epochs_pretrain, verbose=verbose)
        else:
            logger.info("load pre-trained detector from %s", self.path_pretrained_model)
            detector = tf.keras.models.load_model(self.path_pretrained_model)
            encoder = tf.keras.Model(
                inputs=detector.input,
                outputs=detector.get_layer(LAYERNAME_ENCODER_OUTPUT).output,
            )

        center = encoder.predict(X).mean(axis=0)
        loss = loss_sad(c=center, eta=self.eta)
        callbacks = [
            ModelCheckpoint(
                filepath=path_model,
                monitor="loss",
                verbose=0,
                save_best_only=True,
                mode="auto",
            )
        ]

        # encoder training
        logger.info("start detector training with epochs %s", config.epochs_train)
        encoder.compile(optimizer=optimizer, loss=loss)
        self.history = encoder.fit(
            X,
            y,
            epochs=config.epochs_train,
            initial_epoch=0,
            callbacks=callbacks,
            verbose=verbose,
        )
        encoder.load_weights(path_model)

        # build anomaly detector from the trained encoder and center
        self.detector = self._build_detector(encoder, center)
        self.detector.save(path_model)
        logger.info("save detector on %s", path_model)

        # plot and save training process for debugging
        plt.plot(self.history.history["loss"], marker="o", label="training loss")
        # plt.plot(epochs, val_loss, 'b', label='validation loss')
        plt.title("detector training process")
        plt.xlabel("epoch")
        plt.ylabel("loss")
        plt.legend()
        plt.savefig(os.path.join(os.path.dirname(path_model), "training_process.png"))
        plt.close()

    def load_detector(self, path_model: str):
        """Load pre-trained anomaly detector"""
        self.detector = load_model(path_model)

    def compute_anomaly_score(
        self, X: Union[np.ndarray, csr_matrix], scale=True
    ) -> np.ndarray:
        """Compute anomaly score

        Parameters
        ----------
        X
            :shape: (n_samples, n_features)
        scale
            scale anomaly scores

        Returns
        -------
        score : ndarray
            anomaly scores

            :shape: (n_samples, )
        """
        # Without type annotation ": ndarray" after score, sphinx treats "score" as type.
        # Some text and a blank line is needed before :shape: too.
        score = self.detector.predict(X)

        if not scale:
            return score

        # scale anomaly score
        med = np.median(score)
        var = np.median(np.abs(score - med))
        if var == 0:
            var = med
        return (score - med) / var
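
    # Worked example of the scaling above (illustrative numbers): for raw scores
    # [1.0, 2.0, 4.0, 10.0], the median is 3.0, the median absolute deviation is
    # median([2.0, 1.0, 1.0, 7.0]) = 1.5, and the scaled scores are
    # (score - 3.0) / 1.5 = [-1.33, -0.67, 0.67, 4.67].
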
    def compute_embeddings(
        self, X: Union[np.ndarray, csr_matrix]
    ) -> Optional[np.ndarray]:
        """Compute input embeddings (latent representation/output of bottleneck layer)

        Parameters
        ----------
        X
            :shape: (n_samples, n_features)

        Returns
        -------
        feature : ndarray
            embedding for each input

            :shape: (n_samples, dim_embedding)
        """
        if X.shape[0] == 0:
            return None
        encoder = tf.keras.Model(
            inputs=self.detector.input,
            outputs=self.detector.get_layer(LAYERNAME_ENCODER_OUTPUT).output,
        )
        return encoder.predict(X)

    def explain_anomaly(
        self,
        X_anomaly: Union[np.ndarray, csr_matrix],
        background_samples: Union[np.ndarray, csr_matrix],
        zero_correction=True,
        shapvalue_scale=True,
    ):
        """Compute Shapley values (degree of contribution to anomaly) of each feature for anomaly samples

        Parameters
        ----------
        X_anomaly
            feature matrix of anomaly samples

            :shape: (n_anomaly_samples, n_features)
        background_samples
            background samples used to compute Shapley values,
            typically randomly sampled from training set

            :shape: (n_background_samples, n_features)
        zero_correction : bool
            set Shapley value to zero if the corresponding feature is zero
        shapvalue_scale : bool
            scale Shapley values into [1, Inf) (just for simplicity)

        Returns
        -------
        Shapley values
            :shape: (n_anomaly_samples, n_features)

        Notes
        -----
        Uses `SHAP by Scott Lundberg <https://github.com/slundberg/shap>`_.
        """
        if isinstance(X_anomaly, csr_matrix):
            X_anomaly = X_anomaly.toarray()

        num_background_samples = background_samples.shape[0]
        if num_background_samples >= 100:
            idx = random.sample(range(num_background_samples), 100)
            background_samples = background_samples[idx]
        if isinstance(background_samples, csr_matrix):
            background_samples = background_samples.toarray()

        explainer = shap.GradientExplainer(self.detector, background_samples)
        # explainer = shap.DeepExplainer(self.detector, background_samples)
        # DeepExplainer is not available
        shap_values = explainer.shap_values(X_anomaly)

        if zero_correction:
            shap_values[X_anomaly == 0] = 0

        if shapvalue_scale:
            zero_mask = X_anomaly != 0
            shap_mins = (shap_values * zero_mask).min(axis=-1, keepdims=True)
            shap_values = shap_values - shap_mins + 1
            shap_values[~zero_mask] = 0

        return shap_values
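
# End-to-end usage sketch for DeepSAD (hypothetical data, sizes and paths; not part
# of the original module).  y == 1 marks samples labeled as normal; the trained
# detector is saved under "example_output" and then used to score and explain the
# most anomalous rows.
def _example_deepsad_workflow():
    X = np.random.rand(200, 16).astype("float32")
    y = np.zeros(200)
    y[:20] = 1  # first 20 samples labeled as normal
    model = DeepSAD(DeepSAD.Config(dim_hidden=[8, 4]))
    model.train(
        X,
        y,
        path_model=os.path.join("example_output", "deepsad.h5"),
        config=DeepSAD.TrainConfig(epochs_pretrain=1, epochs_train=1),
        verbose=0,
    )
    score = model.compute_anomaly_score(X, scale=True)
    idx_anomaly = np.argsort(score)[::-1][:5]  # five highest anomaly scores
    shap_values = model.explain_anomaly(X[idx_anomaly], background_samples=X)
    return score, shap_values
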
def detection_report(
    score_sorted: Series,
    shap_value_idx_sorted: DataFrame,
    shap_top_k: int = 5,
) -> DataFrame:
    """detection report

    Parameters
    ----------
    score_sorted
        anomaly score, sorted in descending order

        :index: (datetime_rounded, src_ip)
    shap_value_idx_sorted
        Shapley values of anomaly samples, sorted in descending order by anomaly score

        :index: (datetime_rounded, src_ip), top-n of score_sorted
        :columns: features
    shap_top_k
        number of Shapley values to include per (datetime_rounded, src_ip)

    Returns
    -------
    detection_report
        :index: (datetime_rounded, src_ip)
        :columns: anomaly_score, shap_top_{i}, top_{i}_shap_value for 0 < i <= shap_top_k
    """
    logger.info(score_sorted.index)
    logger.info(shap_value_idx_sorted.index)

    shap_top_k = min(shap_top_k, shap_value_idx_sorted.shape[-1])
    columns = [
        ["shap_top_" + str(k + 1), "top_" + str(k + 1) + "_shap_value"]
        for k in range(shap_top_k)
    ]
    columns = sum(columns, [])
    dtypes = dict(zip(columns, ["str", "float"] * shap_top_k))
    df_shap = pd.DataFrame(
        0,
        index=score_sorted.index,
        columns=columns,
    ).astype(dtypes)

    for i, sample in enumerate(shap_value_idx_sorted.index):
        shap_values = shap_value_idx_sorted.loc[sample].sort_values(ascending=False)
        fe = ["__".join(l) for l in list(shap_values.index[:shap_top_k])]
        value = list(shap_values.iloc[:shap_top_k])
        for k in range(shap_top_k):
            if value[k] == 0:
                fe[k] = 0
        df_shap.iloc[i, np.arange(0, shap_top_k * 2, 2)] = fe
        df_shap.iloc[i, np.arange(1, shap_top_k * 2, 2)] = value

    df_shap.insert(0, "anomaly_score", score_sorted)
    return df_shap
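
# Illustrative sketch (made-up index, feature names and values): assembling a
# detection report from sorted anomaly scores and their Shapley values.
def _example_detection_report() -> DataFrame:
    index = pd.MultiIndex.from_tuples(
        [("2021-04-01 10:00", "10.0.0.1"), ("2021-04-01 11:00", "10.0.0.2")],
        names=["datetime_rounded", "src_ip"],
    )
    feature_columns = pd.MultiIndex.from_tuples(
        [("dest_port", "22"), ("dest_port", "80"), ("proto", "tcp")]
    )
    score_sorted = pd.Series([9.5, 4.2], index=index, name="anomaly_score")
    shap_value_idx_sorted = pd.DataFrame(
        [[3.0, 0.5, 1.5], [0.5, 2.0, 0.1]], index=index, columns=feature_columns
    )
    # resulting columns: anomaly_score, shap_top_1, top_1_shap_value, shap_top_2, top_2_shap_value
    return detection_report(score_sorted, shap_value_idx_sorted, shap_top_k=2)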