psykoda package

Submodules

psykoda.detection module

Anomaly Detection and Explanation.

class psykoda.detection.DeepSAD(config: psykoda.detection.DeepSAD.Config)[source]

Bases: object

Deep SAD Semi-supervised Anomaly Detector.

Translated from paper author Lukas Ruff’s PyTorch implementation into TensorFlow.

Todo

more detailed description, including comparison with PyTorch version.

dim_hidden

from Config

eta

from Config

lam

from Config

path_pretrained_model

from Config

dim_input

number of features

history

training history, set by train()

detector

anomaly detector model, set by train() or load_detector()

Original License

MIT License

Copyright (c) 2019 lukasruff

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

class Config(dim_hidden: List[int] = <factory>, eta: float = 1.0, lam: float = 1e-06, path_pretrained_model: Optional[str] = None)[source]

Bases: object

Configuration for DeepSAD model

Parameters
  • dim_hidden (List[int]) – number of units in hidden layers

  • eta (float) – Deep SAD regularization hyperparameter eta (must be positive), balancing the loss between labeled and unlabeled samples

  • lam (float) – regularization parameter on L2-norm of weights

  • path_pretrained_model (Optional[str]) – path to pretrained model (currently unused)

dim_hidden: List[int]
eta: float = 1.0
lam: float = 1e-06
path_pretrained_model: Optional[str] = None
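
A minimal construction sketch (only names documented here; the layer sizes are illustrative):

    from psykoda.detection import DeepSAD

    # Sketch: three hidden layers; eta and lam keep their defaults.
    config = DeepSAD.Config(dim_hidden=[128, 64, 32])
    model = DeepSAD(config)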
class TrainConfig(epochs_pretrain: int = 10, epochs_train: int = 20, learning_rate: float = 0.001, batch_size: int = 64)[source]

Bases: object

Configuration of training process.

Parameters
  • epochs_pretrain (int) – epochs for pretraining (center initialization)

  • epochs_train (int) – epochs for training of detector

  • learning_rate (float) – learning rate of optimizer

  • batch_size (int) – batch size

batch_size: int = 64
epochs_pretrain: int = 10
epochs_train: int = 20
learning_rate: float = 0.001
compute_anomaly_score(X: Union[numpy.ndarray, scipy.sparse.csr.csr_matrix], scale=True) → numpy.ndarray[source]

Compute anomaly score.

Parameters
  • X
    shape: (n_samples, n_features)

  • scale – scale anomaly scores

Returns

score – anomaly scores
  shape: (n_samples, )

Return type

ndarray
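
A usage sketch, assuming model is a trained DeepSAD instance and X_test is a (n_samples, n_features) ndarray or CSR matrix:

    # Higher score = more anomalous.
    scores = model.compute_anomaly_score(X_test, scale=True)

    # Indices of the ten highest-scoring samples.
    top10 = scores.argsort()[::-1][:10]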

compute_embeddings(X: Union[numpy.ndarray, scipy.sparse.csr.csr_matrix]) → Optional[numpy.ndarray][source]

Compute input embeddings (latent representation/output of bottleneck layer).

Parameters

X
  shape: (n_samples, n_features)

Returns

feature – embedding for each input
  shape: (n_samples, dim_embedding)

Return type

ndarray

explain_anomaly(X_anomaly: Union[numpy.ndarray, scipy.sparse.csr.csr_matrix], background_samples: Union[numpy.ndarray, scipy.sparse.csr.csr_matrix], zero_correction=True, shapvalue_scale=True)[source]

Compute Shapley values (degree of contribution to anomaly) of each feature for anomaly samples

Parameters
  • X_anomaly – feature matrix of anomaly samples
    shape: (n_anomaly_samples, n_features)

  • background_samples – background samples used to compute Shapley values, typically randomly sampled from the training set
    shape: (n_background_samples, n_features)

  • zero_correction (bool) – set Shapley value to zero if the corresponding feature is zero

  • shapvalue_scale (bool) – scale Shapley values into [1, Inf) (just for simplicity)

Returns
  shape: (n_anomaly_samples, n_features)

Return type

Shapley values

Notes

Uses SHAP by Scott Lundberg.
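
A sketch of a typical call; model, X_train and X_anomaly are assumed to exist, and the background size of 100 is an arbitrary illustrative choice:

    import numpy as np

    # Random background drawn from the training set, as suggested above.
    rng = np.random.default_rng(0)
    background = X_train[rng.choice(X_train.shape[0], size=100, replace=False)]

    # shap_values has shape (n_anomaly_samples, n_features).
    shap_values = model.explain_anomaly(X_anomaly, background_samples=background)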

load_detector(path_model: str)[source]

Load pre-trained anomaly detector

train(X: Union[numpy.ndarray, scipy.sparse.csr.csr_matrix], y: numpy.ndarray, path_model: str, config: psykoda.detection.DeepSAD.TrainConfig, verbose: int = 1)[source]

Train anomaly detector (self.detector) with encoder (local variable).

Set self.detector, self.dim_input and self.history. Save encoder to path_model and loss-epoch plot next to it.

Parameters
  • X – feature matrix
    shape: (n_samples, n_features)

  • y – label
    0: not labeled as normal
    1: labeled as normal
    shape: (n_samples, )

  • path_model – path ‘**.h5’ to save trained model

  • config – training process configuration (TrainConfig)

  • verbose – verbosity of logging/output
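
An end-to-end training sketch under the label convention above; X is assumed to be a prepared (n_samples, n_features) matrix and known_normal_rows a hypothetical index of known-normal samples:

    import numpy as np
    from psykoda.detection import DeepSAD

    model = DeepSAD(DeepSAD.Config(dim_hidden=[64, 32, 16]))

    # 0 = not labeled as normal, 1 = labeled as normal.
    y = np.zeros(X.shape[0])
    y[known_normal_rows] = 1  # hypothetical known-normal row positions

    model.train(X, y, path_model="out/encoder.h5",
                config=DeepSAD.TrainConfig(epochs_train=20), verbose=1)
    # model.detector, model.dim_input and model.history are now set.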

psykoda.detection.dense_block(inputs: tensorflow.python.framework.ops.Tensor, units: int, lam: float, name: str) → tensorflow.python.keras.engine.base_layer.Layer[source]

Basic block (Dense-LeakyReLU layers) of a multilayer perceptron.

Parameters
  • inputs – input of block

  • units – number of units in the Dense layer

  • lam – regularization parameter on the weights in the Dense layer

  • name – name of block; “_dense” and “_LeakyReLU” are appended for the layers

Returns

Dense-LeakyReLU layers

Return type

output
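
A stacking sketch with the Keras functional API; the documentation above does not pin down whether the return value is a Layer object or its output tensor, so this assumes it can be fed directly into the next block:

    import tensorflow as tf
    from psykoda.detection import dense_block

    inputs = tf.keras.Input(shape=(1000,))  # n_features = 1000, illustrative
    x = dense_block(inputs, units=64, lam=1e-6, name="block1")
    x = dense_block(x, units=16, lam=1e-6, name="block2")
    encoder = tf.keras.Model(inputs=inputs, outputs=x)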

psykoda.detection.detection_report(score_sorted: pandas.core.series.Series, shap_value_idx_sorted: pandas.core.frame.DataFrame, shap_top_k: int = 5) → pandas.core.frame.DataFrame[source]

Compile a detection report from anomaly scores and Shapley values.

Parameters
  • score_sorted – anomaly scores, sorted in descending order
    index: (datetime_rounded, src_ip)

  • shap_value_idx_sorted – Shapley values of anomaly samples, sorted in descending order by anomaly score
    index: (datetime_rounded, src_ip), top-n of score_sorted
    columns: features

  • shap_top_k – number of Shapley values to include per (datetime_rounded, src_ip)

Returns
  index: (datetime_rounded, src_ip)
  columns: anomaly_score, shap_top_{i}, top_{i}_shap_value for 0 < i <= shap_top_k

Return type

detection_report
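
A call sketch, assuming score_sorted and shap_values follow the index and sorting conventions documented above:

    from psykoda.detection import detection_report

    report = detection_report(score_sorted, shap_values, shap_top_k=3)
    # columns: anomaly_score,
    #   shap_top_1 .. shap_top_3 (feature names) and
    #   top_1_shap_value .. top_3_shap_value (their Shapley values)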

class psykoda.detection.generator_autoencoder_training(X: scipy.sparse.csr.csr_matrix, batch_size: int)[source]

Bases: tensorflow.python.keras.utils.data_utils.Sequence

Sparse matrix as batches of dense arrays

on_epoch_end()[source]

Method called at the end of every epoch.

psykoda.detection.loss_sad(c, eta=1.0)[source]

Loss function for Deep SAD

References

[1] L. Ruff, R. A. Vandermeulen, N. Görnitz, A. Binder, E. Müller, K.-R. Müller, M. Kloft, “Deep Semi-Supervised Anomaly Detection”, https://arxiv.org/abs/1906.02694
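
For reference, the objective of [1], which this loss follows up to the weight-decay term (handled here by the lam regularizer on the layers), is, with n unlabeled samples x_i, m labeled samples x̃_j and labels ỹ_j ∈ {−1, +1}:

    \min_{\mathcal{W}} \frac{1}{n+m} \sum_{i=1}^{n} \lVert \phi(x_i; \mathcal{W}) - \mathbf{c} \rVert^2
    + \frac{\eta}{n+m} \sum_{j=1}^{m} \left( \lVert \phi(\tilde{x}_j; \mathcal{W}) - \mathbf{c} \rVert^2 \right)^{\tilde{y}_j}
    + \frac{\lambda}{2} \sum_{\ell=1}^{L} \lVert \mathbf{W}^{\ell} \rVert_F^2

Under the 0/1 label convention of train() above, only normal samples are labeled, corresponding to ỹ_j = +1 in the paper's notation.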

psykoda.feature_extraction module

Extract and manage feature values

class psykoda.feature_extraction.FeatureExtractionConfig(idf: Dict[str, psykoda.feature_extraction.IDFConfig], address_to_location: Optional[str])[source]

Bases: object

Settings for feature_extraction.

address_to_location: Optional[str]
idf: Dict[str, psykoda.feature_extraction.IDFConfig]
class psykoda.feature_extraction.FeatureLabel(*, feature: scipy.sparse.csr.csr_matrix, index: List[Tuple[datetime.datetime, str]], columns: list, label: Optional[numpy.ndarray] = None, idf_sid: pandas.core.series.Series, idf_dport: pandas.core.series.Series)[source]

Bases: object

Feature matrix with label values

Parameters
  • feature – scipy sparse feature matrix
    shape: (n_samples, n_features)

  • index – index of feature matrix
    length: n_samples

  • columns – columns of feature matrix
    length: n_features

  • label – labels
    shape: (n_samples, )

  • idf_sid – IDF (Inverse Document Frequency) for, and indexed by, sid

  • idf_dport – IDF for, and indexed by, dest_port

extract_nonzeros()[source]

Restructure feature matrix by excluding all-zero rows and columns.

extract_nonzeros_cols()[source]

Exclude columns whose elements are all zero.

extract_nonzeros_rows()[source]

Exclude rows whose elements are all zero.

loc(sample: Tuple[datetime.datetime, str]) → pandas.core.series.Series[source]

Corresponds to DataFrame.loc[sample], e.g. sample = (pandas.Timestamp(“2021-04-01 14:00:00”), “10.1.1.1”).

Parameters

sample

Returns

A series of features corresponding to sample.

Return type

Series

put_labels(labeled_samples: pandas.core.series.Series)[source]

Assign label value 1 to known normal samples

Parameters

labeled_samples – all-1 vector whose indexes are known normal samples
  index: Index[datetime_rounded: datetime, src_ip: str]

split_train_test(date_to_training: datetime.datetime) → Tuple[scipy.sparse.csr.csr_matrix, pandas.core.series.Series, scipy.sparse.csr.csr_matrix, pandas.core.indexes.base.Index][source]

Split feature matrix and return training and test sets.

Parameters

date_to_training – samples (and their features) earlier than date_to_training are used for training.

Returns
  • X_train – feature matrix for training
    shape: (n_samples_train, n_features)

  • y_train – labels for training
    length: n_samples_train

  • X_test – feature matrix for anomaly detection
    shape: (n_samples_test, n_features)

  • index_test (Index[datetime_rounded: datetime, src_ip: str]) – row index for anomaly detection
    length: n_samples_test

Notes

date_to_training is compared against datetime_rounded.replace(hour=0); samples with equality are included in the training set.
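
A usage sketch, assuming fl is a populated FeatureLabel:

    from datetime import datetime

    # Samples with datetime_rounded.replace(hour=0) <= 2021-04-01 go to training.
    X_train, y_train, X_test, index_test = fl.split_train_test(datetime(2021, 4, 1))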

class psykoda.feature_extraction.IDFConfig(min_count: int, num_feature: int)[source]

Bases: object

Settings for IDF (Inverse Document Frequency).

min_count: int
num_feature: int
psykoda.feature_extraction.calculate_idf(log: pandas.core.frame.DataFrame, column: str, num_idf_feature: int, min_count: int) → Tuple[pandas.core.series.Series, List[Tuple[Any, pandas.core.frame.DataFrame]]][source]

Calculate Inverse Document Frequency (IDF) of given column.

Every unique index is considered a document. A value of given column is considered to appear in the document if and only if there is at least one row with the index and column values.

Parameters
  • log

  • column – name of column to calculate IDF values on.

  • num_idf_feature – (soft) maximum number of unique values of log[column] to keep IDF for.

  • min_count – minimum number of appearances of a log[column] value to calculate IDF for.

Returns
  • idf – log(1 + raw_idf), indexed by column values, e.g. Series([5.3, 3.2, 2.8], index=[22, 80, 3389], name=dest_port_idf)

  • groups – list of (column_value, matching_dataframe), indexed the same as idf
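
A small worked example of the document convention above (each unique index value is one document); the concrete IDF numbers depend on the implementation, so the comments only describe structure:

    import pandas as pd
    from psykoda.feature_extraction import calculate_idf

    log = pd.DataFrame(
        {"dest_port": [22, 22, 80, 3389]},
        index=pd.Index(["10.0.0.1", "10.0.0.1", "10.0.0.2", "10.0.0.3"], name="src_ip"),
    )
    idf, groups = calculate_idf(log, column="dest_port",
                                num_idf_feature=3, min_count=1)
    # idf: Series of log(1 + raw_idf) values indexed by 22, 80, 3389
    # groups: [(22, matching rows), (80, matching rows), (3389, matching rows)]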

psykoda.feature_extraction.feature_extraction_all(log: pandas.core.frame.DataFrame, idf_config: Dict[str, psykoda.feature_extraction.IDFConfig], iptable: pandas.core.frame.DataFrame) → Optional[psykoda.feature_extraction.FeatureLabel][source]

Compute feature matrix from preprocessed log for each sample.

Parameters
  • log – data to construct feature matrix from.
    index: (datetime_rounded, src_ip) (exact match)
    assumed: (dest_ip, dest_port, sid, src_port) (included)

  • idf_config – configuration for IDF
    key: column
    value: configuration
    Refer to calculate_idf and its unittests.

  • iptable – IP locations definition table

    Todo

    example

psykoda.feature_extraction.find_ip_location(ip_list: List[str], iptable: pandas.core.frame.DataFrame) → pandas.core.series.Series[source]

Find location information for IP addresses

Current implementation returns first matches, but this is not part of specification.

Parameters
  • ip_list

  • iptable – location information table with at least two columns
    IP_TABLE_SUBNET: network address in CIDR format
    IP_TABLE_LOCATION: location name

Returns

location – location information
  index: ip_address: str
  value: location_name: str

Return type

Series

psykoda.feature_extraction.location_matcher(subnets: Iterable[str], locations: Iterable[str])[source]

Generate a function that returns the location for a given IP address.

Parameters
  • subnets – Subnet addresses in CIDR format.

  • locations (Iterable[str]) – Location names for each subnet address.

Returns

A function that returns the location for a given IP address.

Return type

matcher
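
A hedged sketch; behaviour for addresses that match no subnet is not documented here:

    from psykoda.feature_extraction import location_matcher

    matcher = location_matcher(
        subnets=["10.1.0.0/16", "192.168.0.0/24"],
        locations=["tokyo", "osaka"],
    )
    matcher("10.1.2.3")  # -> "tokyo" (first matching subnet, per find_ip_location)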

psykoda.utils module

Miscellaneous utilities

class psykoda.utils.DateRange(*, start_inclusive: Optional[datetime.datetime] = None, start_exclusive: Optional[datetime.datetime] = None, end_inclusive: Optional[datetime.datetime] = None, end_exclusive: Optional[datetime.datetime] = None, length: Optional[int] = None)[source]

Bases: object

psykoda.utils.daterange2list(start_inclusive: datetime.datetime, end_inclusive: datetime.datetime) → List[datetime.datetime][source]

Construct list from range of dates.

Todo

Replace with date range object with iterator?

psykoda.utils.dmap(f: Callable[[psykoda.utils.K, psykoda.utils.V], psykoda.utils.R], d: Dict[psykoda.utils.K, psykoda.utils.V]) → Dict[psykoda.utils.K, psykoda.utils.R][source]

map over dict items.

psykoda.utils.first(t: Tuple[psykoda.utils.F, psykoda.utils.S]) → psykoda.utils.F[source]

First item of 2-tuple.

psykoda.utils.flip(t: Tuple[psykoda.utils.F, psykoda.utils.S]) → Tuple[psykoda.utils.S, psykoda.utils.F][source]

Swap first and second items of 2-tuple.

psykoda.utils.get_series(index: pandas.core.indexes.base.Index, level: Union[int, str]) → pandas.core.series.Series[source]

get_level_values as Series, indexed by itself.

psykoda.utils.index_from_sorted(ls: List[psykoda.utils.V]) → Dict[psykoda.utils.V, int][source]

Minimal perfect (non-cryptographic) hash from unique values.

Also works for an unsorted list of unique values, despite the name.

psykoda.utils.index_from_unsorted(it: Iterable[psykoda.utils.V]) → Dict[psykoda.utils.V, int][source]

Minimal perfect (non-cryptographic) hash from values.

psykoda.utils.load_json(path: str) → dict[source]

Load object from .json file.

json.load(object_hook) is used to construct pandas.Timestamp from object like {type: datetime, value: 2021-04-01}.

psykoda.utils.replace_match(d: Dict[psykoda.utils.V, psykoda.utils.V], v: psykoda.utils.V) → psykoda.utils.V[source]

Replace value, if match is found.

Parameters
  • d – replacements

  • v – replacee

psykoda.utils.save_json(obj: dict, path: str)[source]

Save object to .json file.

json.dump(default) is used to serialize datetime as object like {type: datetime, value: 2021-04-01}.
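
A round-trip sketch of the datetime encoding described above; the file path is illustrative:

    import pandas as pd
    from psykoda.utils import load_json, save_json

    save_json({"since": pd.Timestamp("2021-04-01")}, "settings.json")
    restored = load_json("settings.json")
    # restored["since"] is reconstructed as a pandas.Timestamp.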

psykoda.utils.second(t: Tuple[psykoda.utils.F, psykoda.utils.S]) → psykoda.utils.S[source]

Second item of 2-tuple.

psykoda.utils.vmap(f: Callable[[psykoda.utils.V], psykoda.utils.R], d: Dict[psykoda.utils.K, psykoda.utils.V]) → Dict[psykoda.utils.K, psykoda.utils.R][source]

map over dict values.
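
A quick sketch of the tuple and dict helpers above:

    from psykoda.utils import dmap, first, flip, second, vmap

    d = {"a": 1, "b": 2}
    vmap(lambda v: v * 10, d)         # {'a': 10, 'b': 20}
    dmap(lambda k, v: f"{k}={v}", d)  # {'a': 'a=1', 'b': 'b=2'}
    flip(("x", "y"))                  # ('y', 'x')
    first(("x", "y"))                 # 'x'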
