# Source code for psykoda.io.labeled.file

"""File-based Previous Log Loader and Saver"""

from __future__ import annotations

import logging
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import pandas

from psykoda.constants import col
from psykoda.io.internal import load_csv_optional_zip
from psykoda.io.labeled.loader import Loader
from psykoda.io.labeled.saver import Saver

logger = logging.getLogger(__name__)


@dataclass
class FileStorageBaseConfig:
    """Settings shared by FileLoader and FileSaver."""

    # Directory in which the labeled-log files are read and written.
    dir: str
    # strftime pattern producing the datetime part of a file's basename.
    labeled_basename_format_datetime: str = "%Y-%m-%d-%H"
@dataclass
class FileStorageConfig:
    """Aggregate configuration handed to the factory."""

    # Settings common to both the loader and the saver.
    base: FileStorageBaseConfig
    # Loader-specific settings.
    load: FileLoader.Config
    # Saver-specific settings.
    save: FileSaver.Config
class FileLoader(Loader):
    """Loader that reads previously labeled logs from files."""

    @dataclass
    class Config:
        """Settings specific to FileLoader (no fields at present)."""

    def __init__(self, base_config: FileStorageBaseConfig, config: Config):
        self.base_config = base_config
        self.config = config

    def load_previous_log(self, entries: pandas.MultiIndex) -> pandas.DataFrame:
        """Load and concatenate the stored logs for every entry.

        Parameters
        ----------
        entries
            Index whose items are unpacked as ``(datetime, src_ip)`` keys.

        Returns
        -------
        pandas.DataFrame
            Concatenation of the rows selected for each entry whose file
            exists; an empty DataFrame when none could be loaded.
        """
        found = []
        for key in entries:
            frame = self._load_previous_log(*key)
            if frame is None:
                # Missing file: already reported by the helper, skip it.
                continue
            found.append(frame.loc[key])
        return pandas.concat(found) if found else pandas.DataFrame()

    def _load_previous_log(
        self, dt: datetime, src_ip: str
    ) -> Optional[pandas.DataFrame]:
        """Read the file for one ``(dt, src_ip)`` pair.

        Returns the loaded frame indexed by (rounded datetime, source IP),
        or ``None`` (after logging a warning) when the file is not found.
        """
        directory = self.base_config.dir
        # The source IP is appended to the datetime pattern before formatting.
        name_pattern = f"{self.base_config.labeled_basename_format_datetime}__{src_ip}"
        base_file_name = os.path.join(directory, dt.strftime(name_pattern))
        try:
            raw = load_csv_optional_zip(
                base_file_name,
                parse_dates=[col.DATETIME_ROUNDED, col.DATETIME_FULL],
            )
        except FileNotFoundError as ex:
            logger.warning(
                "labeled[%s] does not exist in %s",
                (dt, src_ip),
                directory,
                exc_info=ex,
            )
            return None
        return raw.set_index([col.DATETIME_ROUNDED, col.SRC_IP])
class FileSaver(Saver):
    """Saver that writes labeled logs to per-entry CSV (or zipped CSV) files."""

    @dataclass
    class Config:
        """Configuration of FileSaver"""

        # When False, only rows whose rounded datetime equals the entry's
        # datetime are written; when True, every row for the source IP is.
        all: bool = False
        # When True, write a zip archive instead of a plain .csv file.
        compression: bool = False

    def __init__(self, base_config: FileStorageBaseConfig, config: Config):
        """Store the configuration and make sure the target directory exists.

        ``base_config`` is the shared base configuration: only its ``dir``
        and ``labeled_basename_format_datetime`` attributes are read here.
        """
        self.base_config = base_config
        self.config = config
        # exist_ok avoids the check-then-create race of isdir() + makedirs().
        os.makedirs(self.base_config.dir, exist_ok=True)

    def save_previous_log(self, df: pandas.DataFrame, entries: pandas.MultiIndex):
        """Write one file per ``(datetime, src_ip)`` entry.

        Parameters
        ----------
        df
            Frame whose index has the source-IP and rounded-datetime levels
            used for filtering.
        entries
            Index whose items unpack as ``(datetime, src_ip)``.
        """
        for dt, src_ip in entries:
            self._save_previous_log(df, dt, src_ip)

    def _save_previous_log(
        self,
        df: pandas.DataFrame,
        dt: datetime,
        src_ip: str,
    ) -> str:
        """Filter ``df`` for one entry, write it, and return the path written."""
        basename = dt.strftime(
            f"{self.base_config.labeled_basename_format_datetime}__{src_ip}"
        )
        if self.config.compression:
            kwargs = {
                "path_or_buf": os.path.join(self.base_config.dir, basename + ".zip"),
                # The archive member is named after the basename, no extension.
                "compression": {
                    "method": "zip",
                    "archive_name": basename,
                },
            }
        else:
            kwargs = {
                "path_or_buf": os.path.join(self.base_config.dir, basename + ".csv"),
            }

        logger.debug("len(df) = %s", len(df))
        df = df[df.index.get_level_values(col.SRC_IP) == src_ip]
        logger.debug("len(df|%s) = %s", src_ip, len(df))
        if not self.config.all:
            # Narrow further to the single rounded-datetime bucket.
            df = df[df.index.get_level_values(col.DATETIME_ROUNDED) == dt]
            logger.debug("len(df|(%s,%s)) = %s", dt, src_ip, len(df))

        df.to_csv(**kwargs)
        return kwargs["path_or_buf"]