psykoda.cli package

Submodules

psykoda.cli.internal module

class psykoda.cli.internal.AnomalyDetectionConfig(required_srcip: psykoda.cli.internal.SkipDetectionConfig, deepsad: psykoda.detection.DeepSAD.Config, train: psykoda.detection.DeepSAD.TrainConfig, threshold: psykoda.cli.internal.ThresholdConfig)[source]

Bases: object

deepsad: psykoda.detection.DeepSAD.Config
required_srcip: psykoda.cli.internal.SkipDetectionConfig
threshold: psykoda.cli.internal.ThresholdConfig
train: psykoda.detection.DeepSAD.TrainConfig
class psykoda.cli.internal.ArgumentsConfig(target_period: psykoda.cli.internal.TargetPeriod)[source]

Bases: object

Arguments modification configuration

Parameters

target_period (psykoda.cli.internal.TargetPeriod) – default target period used to determine date_from and date_to values if missing.

target_period: psykoda.cli.internal.TargetPeriod
class psykoda.cli.internal.DetectConfig(arguments: psykoda.cli.internal.ArgumentsConfig, detection_units: psykoda.cli.internal.DetectionUnitConfig, io: psykoda.cli.internal.IOConfig, preprocess: psykoda.cli.internal.PreprocessConfig, feature_extraction: psykoda.feature_extraction.FeatureExtractionConfig, anomaly_detection: psykoda.cli.internal.AnomalyDetectionConfig)[source]

Bases: object

anomaly_detection: psykoda.cli.internal.AnomalyDetectionConfig
arguments: psykoda.cli.internal.ArgumentsConfig
detection_units: psykoda.cli.internal.DetectionUnitConfig
feature_extraction: psykoda.feature_extraction.FeatureExtractionConfig
io: psykoda.cli.internal.IOConfig
preprocess: psykoda.cli.internal.PreprocessConfig
class psykoda.cli.internal.DetectionUnitConfig(services: Dict[str, psykoda.cli.internal.Service], subnets: Dict[str, psykoda.cli.internal.Subnet])[source]

Bases: object

Detection unit configuration

Parameters
services: Dict[str, psykoda.cli.internal.Service]
subnets: Dict[str, psykoda.cli.internal.Subnet]
class psykoda.cli.internal.IOConfig(input: psykoda.cli.internal.InputConfig, previous: psykoda.cli.internal.PreviousConfig, output: psykoda.cli.internal.OutputConfig)[source]

Bases: object

input: psykoda.cli.internal.InputConfig
output: psykoda.cli.internal.OutputConfig
previous: psykoda.cli.internal.PreviousConfig
exception psykoda.cli.internal.Incomplete_Args_Exception[source]

Bases: Exception

class psykoda.cli.internal.InputConfig(dir: str)[source]

Bases: object

dir: str
class psykoda.cli.internal.LoadPreviousConfig(known_normal: Optional[psykoda.cli.internal.LoadPreviousConfigItem], known_anomaly: Optional[psykoda.cli.internal.LoadPreviousConfigItem], unknown: Optional[psykoda.cli.internal.LoadPreviousConfigItem])[source]

Bases: object

Log loading settings.

Parameters
  • list – path to CSV file in which labeled IP addresses are listed

  • ndate – time range for labeled IP addresses, in days

known_anomaly: Optional[psykoda.cli.internal.LoadPreviousConfigItem]
known_normal: Optional[psykoda.cli.internal.LoadPreviousConfigItem]
unknown: Optional[psykoda.cli.internal.LoadPreviousConfigItem]
class psykoda.cli.internal.LoadPreviousConfigItem(list: Union[str, NoneType], ndate: int = 730)[source]

Bases: object

list: Optional[str]
ndate: int = 730
class psykoda.cli.internal.OutputConfig(dir: str, share_dir: Union[str, NoneType], subdir: Union[str, NoneType])[source]

Bases: object

dir: str
share_dir: Optional[str]
subdir: Optional[str]
class psykoda.cli.internal.PreprocessConfig(exclude_lists: Union[str, NoneType], screening: psykoda.preprocess.ScreeningConfig)[source]

Bases: object

exclude_lists: Optional[str]
screening: psykoda.preprocess.ScreeningConfig
class psykoda.cli.internal.PreviousConfig(load: psykoda.cli.internal.LoadPreviousConfig, log: psykoda.io.labeled.file.FileStorageConfig)[source]

Bases: object

load: psykoda.cli.internal.LoadPreviousConfig
log: psykoda.io.labeled.file.FileStorageConfig
class psykoda.cli.internal.Service(include: Optional[List[int]], exclude: Optional[List[int]])[source]

Bases: object

Service definition: set of destination port numbers

Examples

>>> all = Service()
>>> ssh = Service(include=[22])
>>> all_but_ssh = Service(exclude=[22])
>>> ssh_or_https = Service(include=[22, 443])
exclude: Optional[List[int]]
include: Optional[List[int]]
class psykoda.cli.internal.SkipDetectionConfig(train: int, test: int)[source]

Bases: object

test: int
train: int
class psykoda.cli.internal.Subnet(cidrs: List[str], services: List[str])[source]

Bases: object

Subnet configuration: set of CIDR-formatted IP addresses with services to analyze

Examples

>>> private_A = Subnet(["10.0.0.0/8"], get_names_of_services_from_config())
>>> private = Subnet(["private-A", "private-B", "private-C"], get_names_of_services_from_config())  # these constants are available for convenience and readability
>>> my_network = Subnet(["10.0.0.0/16", "10.1.1.0/24"], get_names_of_services_from_config())
cidrs: List[str]
services: List[str]
class psykoda.cli.internal.TargetPeriod(days: int = 30)[source]

Bases: object

days: int = 30
class psykoda.cli.internal.ThresholdConfig(num_anomaly: int, min_score: float)[source]

Bases: object

min_score: float
num_anomaly: int
psykoda.cli.internal.apply_exclude_lists(log: pandas.core.frame.DataFrame, dir_exclude_lists: Optional[str])pandas.core.frame.DataFrame[source]

exclude logs according to exclude lists in dir_exclude_lists

Parameters
  • log – Source log.

  • dir_exclude_lists – The path of directory containing exclude list csv files.

Returns

Log after applying exclude list.

Return type

log

psykoda.cli.internal.configure_logging(debug: bool)[source]

Configure execution log settings.

Parameters

debug – Whether to log “debug levels”.

psykoda.cli.internal.detect_per_unit(config: psykoda.cli.internal.DetectConfig, service_name: str, log_all: pandas.core.frame.DataFrame, subnet: Tuple[str, psykoda.cli.internal.Subnet], args)[source]
psykoda.cli.internal.load_log(dir_IDS_log: str, date_from: datetime.datetime, date_to: datetime.datetime, nrows_read: Optional[int] = None)pandas.core.frame.DataFrame[source]

load IDS logs of the dates in [ date_from, date_to ]

Parameters
  • dir_IDS_log – The path of the directory containing logs to be load.

  • date_from – Start date.

  • date_to – End date.

  • nrows_read – Maximum number of rows to load, by default None

Returns

IDS log.

Return type

log

psykoda.cli.internal.load_preprocess_log(args, config: psykoda.cli.internal.DetectConfig)[source]

Load and preprocess log.

Warning

Sets config.io.output.subdir

psykoda.cli.internal.load_previous(config: psykoda.cli.internal.LoadPreviousConfigItem, date_to: datetime.datetime, label_value: float)pandas.core.series.Series[source]
psykoda.cli.internal.main_detection(args, config: psykoda.cli.internal.DetectConfig, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series)[source]
Parameters
  • args

  • config

  • log

    index

    columns

  • label – filled with 1 :index:

psykoda.cli.internal.main_detection_after_prepare_data(args, label: pandas.core.series.Series, feature_label: psykoda.feature_extraction.FeatureLabel)[source]

Split data and construct labeled training feature.

psykoda.cli.internal.main_detection_prepare_data(args, config: psykoda.feature_extraction.FeatureExtractionConfig, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series)Optional[psykoda.feature_extraction.FeatureLabel][source]

Feature extraction

psykoda.cli.internal.main_detection_skip_or_detect(args, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series, dir_report: str, feature_label: psykoda.feature_extraction.FeatureLabel, train_test_splitted, x_train_labeled: scipy.sparse.csr.csr_matrix, anomaly_detection_config: psykoda.cli.internal.AnomalyDetectionConfig, previous_config: psykoda.io.labeled.file.FileStorageConfig)dict[source]

Anomaly detection and output the result.

psykoda.cli.internal.main_preproc_and_detection(args, config: psykoda.cli.internal.DetectConfig)[source]

Data preprocessing and anomaly detection.

psykoda.cli.internal.output_result(args, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series, dir_report: str, *, x_train_labeled_embeddings, x_test_embeddings, idx_anomaly, shap_value_idx_sorted, anomaly_score_sorted, stats: dict, previous_config: psykoda.io.labeled.file.FileStorageConfig)[source]

Plot the detection result and output the report.

psykoda.cli.internal.report_all(path_list_stats: List[str], path_save: str)[source]

Summarizing all reports and save it.

Parameters
  • path_list_stats (list) – List of stats file paths

  • path_save (str) – File path where the report will be saved

psykoda.cli.internal.report_transfer(path: str, dir_to: str)[source]

Copy Report Files to the Directory

Parameters
  • path (str) – File path of the report to copy.

  • dir_to (str) – Directory Path of destination directory. If you specify a directory that does not exist, a new directory is created.

Raises
  • TypeError – Destination directory not specified.

  • TypeError – Report file does not exist.

psykoda.cli.internal.set_default_date_detect(args, config: psykoda.cli.internal.ArgumentsConfig)[source]

Configure training from/to dates according to args and config.

Parameters
  • args – Command line args.

  • config – Settings for arguments.

Returns

Command line args with training from/to dates added.

Return type

args

psykoda.cli.internal.validate_patterns(patterns: pandas.core.indexes.base.Index)[source]

Strip whitespaces in Index from left and right sides.

Parameters

patterns – Index.

Returns

Index with whitespace removed.

Return type

ret

Module contents

Command line interface

psykoda.cli.main()[source]

Parse command line arguments and call main routine.

Raises

ValueError – Command line arguments are invalid.

psykoda.cli.main_detect(args, config: psykoda.cli.internal.DetectConfig)[source]

Main routine for anmaly detection.

Parameters
  • args – Command line arguments.

  • config – Settings for this command.

psykoda.cli.strptime(date_string: str)[source]

Convert a string to datetime in COMMANDLINE_DATE_FORMAT format.

Parameters

date_string – Date string

Returns

Return type

datetime