psykoda.cli package¶
Submodules¶
psykoda.cli.internal module¶
- class psykoda.cli.internal.AnomalyDetectionConfig(required_srcip: psykoda.cli.internal.SkipDetectionConfig, deepsad: psykoda.detection.DeepSAD.Config, train: psykoda.detection.DeepSAD.TrainConfig, threshold: psykoda.cli.internal.ThresholdConfig)[source]¶
Bases:
object
- deepsad: psykoda.detection.DeepSAD.Config¶
- required_srcip: psykoda.cli.internal.SkipDetectionConfig¶
- threshold: psykoda.cli.internal.ThresholdConfig¶
- class psykoda.cli.internal.ArgumentsConfig(target_period: psykoda.cli.internal.TargetPeriod)[source]¶
Bases:
object
Arguments modification configuration
- Parameters
target_period (psykoda.cli.internal.TargetPeriod) – default target period used to determine date_from and date_to values if missing.
- target_period: psykoda.cli.internal.TargetPeriod¶
- class psykoda.cli.internal.DetectConfig(arguments: psykoda.cli.internal.ArgumentsConfig, detection_units: psykoda.cli.internal.DetectionUnitConfig, io: psykoda.cli.internal.IOConfig, preprocess: psykoda.cli.internal.PreprocessConfig, feature_extraction: psykoda.feature_extraction.FeatureExtractionConfig, anomaly_detection: psykoda.cli.internal.AnomalyDetectionConfig)[source]¶
Bases:
object
- anomaly_detection: psykoda.cli.internal.AnomalyDetectionConfig¶
- arguments: psykoda.cli.internal.ArgumentsConfig¶
- detection_units: psykoda.cli.internal.DetectionUnitConfig¶
- feature_extraction: psykoda.feature_extraction.FeatureExtractionConfig¶
- preprocess: psykoda.cli.internal.PreprocessConfig¶
- class psykoda.cli.internal.DetectionUnitConfig(services: Dict[str, psykoda.cli.internal.Service], subnets: Dict[str, psykoda.cli.internal.Subnet])[source]¶
Bases:
object
Detection unit configuration
- Parameters
services (Dict[str, psykoda.cli.internal.Service]) – map from names of service to service definitions
subnets (Dict[str, psykoda.cli.internal.Subnet]) – map from names of subnet to subnet configurations
- services: Dict[str, psykoda.cli.internal.Service]¶
- subnets: Dict[str, psykoda.cli.internal.Subnet]¶
- class psykoda.cli.internal.IOConfig(input: psykoda.cli.internal.InputConfig, previous: psykoda.cli.internal.PreviousConfig, output: psykoda.cli.internal.OutputConfig)[source]¶
Bases:
object
- previous: psykoda.cli.internal.PreviousConfig¶
- class psykoda.cli.internal.LoadPreviousConfig(known_normal: Optional[psykoda.cli.internal.LoadPreviousConfigItem], known_anomaly: Optional[psykoda.cli.internal.LoadPreviousConfigItem], unknown: Optional[psykoda.cli.internal.LoadPreviousConfigItem])[source]¶
Bases:
object
Log loading settings.
- Parameters
list – path to CSV file in which labeled IP addresses are listed
ndate – time range for labeled IP addresses, in days
- known_anomaly: Optional[psykoda.cli.internal.LoadPreviousConfigItem]¶
- known_normal: Optional[psykoda.cli.internal.LoadPreviousConfigItem]¶
- unknown: Optional[psykoda.cli.internal.LoadPreviousConfigItem]¶
- class psykoda.cli.internal.LoadPreviousConfigItem(list: Union[str, NoneType], ndate: int = 730)[source]¶
Bases:
object
- list: Optional[str]¶
- ndate: int = 730¶
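The ndate field describes a time range in days. As a minimal sketch of how such a window could be computed, assuming it ends at a reference date and reaches ndate days back (the helper name and the boundary handling are assumptions, not part of psykoda):

```python
from datetime import datetime, timedelta
from typing import Tuple


def labeled_window(date_to: datetime, ndate: int = 730) -> Tuple[datetime, datetime]:
    """Hypothetical helper: return (start, end) of a window of `ndate` days
    that ends at `date_to`. The default mirrors the documented ndate = 730."""
    return date_to - timedelta(days=ndate), date_to


# A 30-day window ending on 2021-06-01 starts on 2021-05-02.
start, end = labeled_window(datetime(2021, 6, 1), ndate=30)
print(start.date(), end.date())
```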
- class psykoda.cli.internal.OutputConfig(dir: str, share_dir: Union[str, NoneType], subdir: Union[str, NoneType])[source]¶
Bases:
object
- dir: str¶
- subdir: Optional[str]¶
- class psykoda.cli.internal.PreprocessConfig(exclude_lists: Union[str, NoneType], screening: psykoda.preprocess.ScreeningConfig)[source]¶
Bases:
object
- exclude_lists: Optional[str]¶
- screening: psykoda.preprocess.ScreeningConfig¶
- class psykoda.cli.internal.PreviousConfig(load: psykoda.cli.internal.LoadPreviousConfig, log: psykoda.io.labeled.file.FileStorageConfig)[source]¶
Bases:
object
- class psykoda.cli.internal.Service(include: Optional[List[int]], exclude: Optional[List[int]])[source]¶
Bases:
object
Service definition: set of destination port numbers
Examples
>>> all = Service()
>>> ssh = Service(include=[22])
>>> all_but_ssh = Service(exclude=[22])
>>> ssh_or_https = Service(include=[22, 443])
- exclude: Optional[List[int]]¶
- include: Optional[List[int]]¶
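The include and exclude fields can be read as a port filter. The sketch below uses a stand-in dataclass rather than the real Service class, and the matching rule (include of None means any port, exclude removes ports) is an assumption inferred from the examples above, not confirmed behavior:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ServiceSketch:
    """Stand-in mirroring the documented fields of Service (not the real class)."""

    include: Optional[List[int]] = None
    exclude: Optional[List[int]] = None


def port_matches(service: ServiceSketch, port: int) -> bool:
    """Hypothetical helper: True if destination `port` belongs to `service`.

    Assumed semantics: include=None accepts any port; exclude removes ports.
    """
    if service.include is not None and port not in service.include:
        return False
    if service.exclude is not None and port in service.exclude:
        return False
    return True


ssh_or_https = ServiceSketch(include=[22, 443])
all_but_ssh = ServiceSketch(exclude=[22])
print(port_matches(ssh_or_https, 443), port_matches(all_but_ssh, 22))
```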
- class psykoda.cli.internal.SkipDetectionConfig(train: int, test: int)[source]¶
Bases:
object
- test: int¶
- train: int¶
- class psykoda.cli.internal.Subnet(cidrs: List[str], services: List[str])[source]¶
Bases:
object
Subnet configuration: set of CIDR-formatted IP addresses with services to analyze
Examples
>>> private_A = Subnet(["10.0.0.0/8"], get_names_of_services_from_config())
>>> private = Subnet(["private-A", "private-B", "private-C"], get_names_of_services_from_config())  # these constants are available for convenience and readability
>>> my_network = Subnet(["10.0.0.0/16", "10.1.1.0/24"], get_names_of_services_from_config())
- cidrs: List[str]¶
- services: List[str]¶
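To illustrate what a list of CIDRs selects, here is a minimal membership check built on the standard ipaddress module. The helper name is hypothetical, and the expansion of "private-A", "private-B", and "private-C" to the RFC 1918 blocks is an assumption; the documentation above only says that these constants exist.

```python
import ipaddress
from typing import List

# Assumed expansion of the named constants. The names come from the example
# above; mapping them to the RFC 1918 private blocks is a guess.
NAMED_CIDRS = {
    "private-A": "10.0.0.0/8",
    "private-B": "172.16.0.0/12",
    "private-C": "192.168.0.0/16",
}


def ip_in_cidrs(ip: str, cidrs: List[str]) -> bool:
    """Hypothetical helper: True if `ip` falls inside any entry of `cidrs`."""
    address = ipaddress.ip_address(ip)
    networks = [ipaddress.ip_network(NAMED_CIDRS.get(c, c)) for c in cidrs]
    return any(address in network for network in networks)


my_network = ["10.0.0.0/16", "10.1.1.0/24"]
print(ip_in_cidrs("10.1.1.7", my_network), ip_in_cidrs("10.2.0.1", my_network))
```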
- class psykoda.cli.internal.ThresholdConfig(num_anomaly: int, min_score: float)[source]¶
Bases:
object
- min_score: float¶
- num_anomaly: int¶
- psykoda.cli.internal.apply_exclude_lists(log: pandas.core.frame.DataFrame, dir_exclude_lists: Optional[str]) → pandas.core.frame.DataFrame[source]¶
Exclude log entries according to the exclude lists in dir_exclude_lists.
- Parameters
log – Source log.
dir_exclude_lists – Path of the directory containing the exclude-list CSV files.
- Returns
Log after applying exclude list.
- Return type
pandas.core.frame.DataFrame
- psykoda.cli.internal.configure_logging(debug: bool)[source]¶
Configure execution log settings.
- Parameters
debug – Whether to log debug-level messages.
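A debug switch of this kind is commonly mapped to a logging level. The sketch below uses only the standard logging module; the choice of INFO as the non-debug level and the message format are assumptions, not taken from psykoda:

```python
import logging


def configure_logging_sketch(debug: bool) -> None:
    """Hypothetical stand-in: DEBUG when `debug` is set, INFO otherwise."""
    level = logging.DEBUG if debug else logging.INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        force=True,  # replace handlers installed by an earlier call (Python 3.8+)
    )


configure_logging_sketch(debug=True)
logging.getLogger("example").debug("visible only when debug is enabled")
```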
- psykoda.cli.internal.detect_per_unit(config: psykoda.cli.internal.DetectConfig, service_name: str, log_all: pandas.core.frame.DataFrame, subnet: Tuple[str, psykoda.cli.internal.Subnet], args)[source]¶
- psykoda.cli.internal.load_log(dir_IDS_log: str, date_from: datetime.datetime, date_to: datetime.datetime, nrows_read: Optional[int] = None) → pandas.core.frame.DataFrame[source]¶
Load IDS logs for the dates in [date_from, date_to].
- Parameters
dir_IDS_log – Path of the directory containing the logs to be loaded.
date_from – Start date.
date_to – End date.
nrows_read – Maximum number of rows to load, by default None
- Returns
IDS log.
- Return type
pandas.core.frame.DataFrame
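The closed interval [date_from, date_to] means both end dates are included. The sketch below enumerates the days in such a range and builds one candidate path per day. The file naming pattern is purely hypothetical; the layout of the log directory is not described in this page.

```python
import os
from datetime import datetime, timedelta
from typing import Iterator, List


def dates_in_range(date_from: datetime, date_to: datetime) -> Iterator[datetime]:
    """Yield every day in the closed interval [date_from, date_to]."""
    current = date_from
    while current <= date_to:
        yield current
        current += timedelta(days=1)


def candidate_log_paths(dir_IDS_log: str, date_from: datetime, date_to: datetime) -> List[str]:
    """Hypothetical helper: one path per day, using an assumed file name pattern."""
    return [
        os.path.join(dir_IDS_log, f"log_{day:%Y-%m-%d}.csv")  # naming is an assumption
        for day in dates_in_range(date_from, date_to)
    ]


paths = candidate_log_paths("logs", datetime(2021, 4, 29), datetime(2021, 5, 1))
print(paths)
```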
- psykoda.cli.internal.load_preprocess_log(args, config: psykoda.cli.internal.DetectConfig)[source]¶
Load and preprocess log.
Warning
Sets config.io.output.subdir
- psykoda.cli.internal.load_previous(config: psykoda.cli.internal.LoadPreviousConfigItem, date_to: datetime.datetime, label_value: float) → pandas.core.series.Series[source]¶
- psykoda.cli.internal.main_detection(args, config: psykoda.cli.internal.DetectConfig, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series)[source]¶
- Parameters
args –
config –
log –
- index
- columns
label – filled with 1 :index:
- psykoda.cli.internal.main_detection_after_prepare_data(args, label: pandas.core.series.Series, feature_label: psykoda.feature_extraction.FeatureLabel)[source]¶
Split data and construct labeled training feature.
- psykoda.cli.internal.main_detection_prepare_data(args, config: psykoda.feature_extraction.FeatureExtractionConfig, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series) → Optional[psykoda.feature_extraction.FeatureLabel][source]¶
Feature extraction
- psykoda.cli.internal.main_detection_skip_or_detect(args, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series, dir_report: str, feature_label: psykoda.feature_extraction.FeatureLabel, train_test_splitted, x_train_labeled: scipy.sparse.csr.csr_matrix, anomaly_detection_config: psykoda.cli.internal.AnomalyDetectionConfig, previous_config: psykoda.io.labeled.file.FileStorageConfig) → dict[source]¶
Run anomaly detection and output the result.
- psykoda.cli.internal.main_preproc_and_detection(args, config: psykoda.cli.internal.DetectConfig)[source]¶
Data preprocessing and anomaly detection.
- psykoda.cli.internal.output_result(args, log: pandas.core.frame.DataFrame, label: pandas.core.series.Series, dir_report: str, *, x_train_labeled_embeddings, x_test_embeddings, idx_anomaly, shap_value_idx_sorted, anomaly_score_sorted, stats: dict, previous_config: psykoda.io.labeled.file.FileStorageConfig)[source]¶
Plot the detection result and output the report.
- psykoda.cli.internal.report_all(path_list_stats: List[str], path_save: str)[source]¶
Summarize all reports and save the summary.
- Parameters
path_list_stats (list) – List of stats file paths
path_save (str) – File path where the report will be saved
- psykoda.cli.internal.report_transfer(path: str, dir_to: str)[source]¶
Copy report files to the directory.
- Parameters
path (str) – File path of the report to copy.
dir_to (str) – Path of the destination directory. If the directory does not exist, it is created.
- Raises
TypeError – Destination directory not specified.
TypeError – Report file does not exist.
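The documented behavior (create the destination if missing, raise TypeError when the destination is unspecified or the report file is absent) can be sketched with the standard library. This is an illustration under those stated rules, not the psykoda implementation; the function name and the return value are assumptions.

```python
import os
import shutil
import tempfile
from typing import Optional


def report_transfer_sketch(path: str, dir_to: Optional[str]) -> str:
    """Hypothetical stand-in: copy `path` into `dir_to`, creating it if needed."""
    if dir_to is None:
        raise TypeError("destination directory not specified")
    if not os.path.isfile(path):
        raise TypeError(f"report file does not exist: {path}")
    os.makedirs(dir_to, exist_ok=True)  # create a missing destination directory
    return shutil.copy(path, dir_to)    # returns the path of the copied file


with tempfile.TemporaryDirectory() as workdir:
    report = os.path.join(workdir, "report.csv")
    with open(report, "w") as handle:
        handle.write("src_ip,score\n")
    copied = report_transfer_sketch(report, os.path.join(workdir, "shared", "reports"))
    copied_exists = os.path.isfile(copied)
```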
- psykoda.cli.internal.set_default_date_detect(args, config: psykoda.cli.internal.ArgumentsConfig)[source]¶
Configure training from/to dates according to args and config.
- Parameters
args – Command line args.
config – Settings for arguments.
- Returns
Command line args with training from/to dates added.
- Return type
args
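Filling in missing from/to dates from a default period could look like the sketch below. The fields of TargetPeriod are not listed in this page, so the single `length_days` field, the stand-in class, and the rule "date_to defaults to a reference time, date_from to date_to minus the period" are all assumptions made for illustration.

```python
import argparse
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class TargetPeriodSketch:
    """Stand-in for TargetPeriod; `length_days` is a hypothetical field."""

    length_days: int = 1


def fill_default_dates(
    args: argparse.Namespace, period: TargetPeriodSketch, now: datetime
) -> argparse.Namespace:
    """Hypothetical helper: set date_to and date_from only when they are missing."""
    if args.date_to is None:
        args.date_to = now
    if args.date_from is None:
        args.date_from = args.date_to - timedelta(days=period.length_days)
    return args


args = argparse.Namespace(date_from=None, date_to=None)
filled = fill_default_dates(args, TargetPeriodSketch(length_days=7), datetime(2021, 6, 8))
print(filled.date_from, filled.date_to)
```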
Module contents¶
Command line interface
- psykoda.cli.main()[source]¶
Parse command line arguments and call main routine.
- Raises
ValueError – Command line arguments are invalid.
- psykoda.cli.main_detect(args, config: psykoda.cli.internal.DetectConfig)[source]¶
Main routine for anomaly detection.
- Parameters
args – Command line arguments.
config – Settings for this command.