Skip to content

Shared Modules API

This section details the shared data structures and configuration utilities of the pipeline.

Experiment Data Manager

ace_neuro.shared.experiment_data_manager.ExperimentDataManager

Manages experiment metadata and analysis parameters.

This class loads data from two CSV files in the project repository: - experiments.csv: Experiment metadata (subject, date, directories, etc.) - analysis_parameters.csv: Processing settings for each experiment

Both project_path and data_path must be supplied explicitly — either directly or via CLI flags / notebook variables. There is no automatic fallback to environment variables or .env files.

Attributes:

Name Type Description
line_num int

The experiment line number.

metadata Optional[Dict[str, Any]]

Dict of experiment metadata from experiments.csv.

analysis_params Optional[Dict[str, Any]]

Dict of analysis parameters from analysis_parameters.csv.

Example

edm = ExperimentDataManager( ... 96, ... project_path="/home/user/projects/my_project", ... data_path="/data/raw" ... ) print(edm.metadata['id']) 'R230706B'

Source code in src/ace_neuro/shared/experiment_data_manager.py
class ExperimentDataManager:
    """Manages experiment metadata and analysis parameters.

    This class loads data from two CSV files in the project repository:
    - experiments.csv: Experiment metadata (subject, date, directories, etc.)
    - analysis_parameters.csv: Processing settings for each experiment

    Both ``project_path`` and ``data_path`` must be supplied explicitly —
    either directly or via CLI flags / notebook variables.  There is no
    automatic fallback to environment variables or .env files.

    Attributes:
        line_num: The experiment line number.
        metadata: Dict of experiment metadata from experiments.csv.
        analysis_params: Dict of analysis parameters from analysis_parameters.csv.

    Example:
        >>> edm = ExperimentDataManager(
        ...     96,
        ...     project_path="/home/user/projects/my_project",
        ...     data_path="/data/raw"
        ... )
        >>> print(edm.metadata['id'])
        'R230706B'
    """

    def __init__(
        self, 
        line_num: int,
        project_path: Optional[Union[str, Path]] = None,
        data_path: Optional[Union[str, Path]] = None,
        auto_import_metadata: bool = True,
        auto_import_analysis_params: bool = True,
        logging_level: Union[str, int] = logging.CRITICAL
    ) -> None:
        """Initialize the data manager for a specific experiment.

        Args:
            line_num: Experiment line number (matches 'line number' column in CSVs).
            project_path: Path to project directory containing metadata CSVs.
                Required for most workflows. Defaults to PROJECT_ROOT/data if
                not provided.
            data_path: Base path for raw experimental data.
                Defaults to PROJECT_ROOT/data/downloaded_data if not provided.
            auto_import_metadata: If True, load metadata from experiments.csv on init.
            auto_import_analysis_params: If True, load analysis params on init.
            logging_level: Logging verbosity ('DEBUG', 'INFO', 'WARNING', 'CRITICAL').
        """
        self.line_num: int = line_num
        self.project_path: Path = Path(project_path) if project_path else PROJECT_ROOT / "data"
        self.data_path: Path = Path(data_path) if data_path else PROJECT_ROOT / "data" / "downloaded_data"

        self.metadata: Optional[Dict[str, Any]] = None
        self.analysis_params: Optional[Dict[str, Any]] = None
        self.logger: logging.Logger = logging.getLogger(__name__)
        self.logger.setLevel(logging_level)

        if auto_import_metadata:
            self.import_metadata()

        if auto_import_analysis_params:
            self.import_analysis_parameters()

    def import_metadata(self) -> None:
        """Load experiment metadata from project_path/experiments.csv.

        Populates self.metadata with converted data types. Directory paths
        are resolved relative to self.data_path.
        """
        experiments_csv = self.project_path / "experiments.csv"

        if not experiments_csv.exists():
            raise FileNotFoundError(
                f"Experiments file not found: {experiments_csv}\n"
                f"Did you forget to initialize your project data folder?\n"
                f"Please copy the `experiments_template.csv` from `ace_neuro.shared.metadata_templates` "
                f"into your project directory. See docs/guides/data_management.md for details."
            )

        metadata_unconverted = CSVWorker.csv_row_to_dict(experiments_csv, self.line_num)
        if metadata_unconverted is None:
            raise ValueError(f"Line {self.line_num} not found in {experiments_csv}")

        metadata_converted = CSVWorker.convert_data_types(metadata_unconverted)
        self.metadata = metadata_converted

        # Resolve directory paths
        if self.metadata.get('ephys directory'):
            self.metadata['ephys directory'] = self.data_path / Path(str(self.metadata['ephys directory']))
        if self.metadata.get('calcium imaging directory'):
            self.metadata['calcium imaging directory'] = self.data_path / Path(str(self.metadata['calcium imaging directory']))

    def import_analysis_parameters(self) -> None:
        """Load analysis parameters from project_path/analysis_parameters.csv.

        Populates self.analysis_params. If the file doesn't exist or the line
        number isn't found, sets analysis_params to an empty dict (allowing
        pipeline defaults to be used).
        """
        analysis_params_csv = self.project_path / "analysis_parameters.csv"

        if not analysis_params_csv.exists():
            self.logger.warning(
                f"No analysis_parameters.csv found at {analysis_params_csv}.\n"
                f"For full reproducible control, copy the `analysis_parameters_template.csv` "
                f"from `ace_neuro.shared.metadata_templates` into your project directory.\n"
                f"Falling back to built-in pipeline defaults..."
            )
            self.analysis_params = {}
            return

        analysis_params_unconverted = CSVWorker.csv_row_to_dict(analysis_params_csv, self.line_num)
        if analysis_params_unconverted is None:
            self.logger.info(
                f"Line {self.line_num} not found in {analysis_params_csv}. "
                f"Using pipeline defaults."
            )
            self.analysis_params = {}
            return

        analysis_params_converted = CSVWorker.convert_data_types(analysis_params_unconverted)
        self.analysis_params = analysis_params_converted

    def get_pipeline_params(self) -> Dict[str, Any]:
        """Return analysis parameters formatted for pipeline.run().

        Converts the raw analysis_params dict to kwargs compatible with
        MiniscopePipeline.run() and EphysPipeline.run().

        Returns:
            Dict of kwargs to pass to pipeline.run()
        """
        from ace_neuro.shared.config_utils import parse_analysis_params
        return parse_analysis_params(self.analysis_params or {})

    def get_ephys_directory(self) -> Optional[Path]:
        """Return the ephys directory path from metadata."""
        if self.metadata:
            val = self.metadata.get('ephys directory')
            return Path(val) if val else None
        return None

    def get_miniscope_directory(self) -> Optional[Path]:
        """Return the miniscope/calcium imaging directory path from metadata."""
        if self.metadata:
            val = self.metadata.get('calcium imaging directory')
            return Path(val) if val else None
        return None

__init__(line_num, project_path=None, data_path=None, auto_import_metadata=True, auto_import_analysis_params=True, logging_level=logging.CRITICAL)

Initialize the data manager for a specific experiment.

Parameters:

Name Type Description Default
line_num int

Experiment line number (matches 'line number' column in CSVs).

required
project_path Optional[Union[str, Path]]

Path to project directory containing metadata CSVs. Required for most workflows. Defaults to PROJECT_ROOT/data if not provided.

None
data_path Optional[Union[str, Path]]

Base path for raw experimental data. Defaults to PROJECT_ROOT/data/downloaded_data if not provided.

None
auto_import_metadata bool

If True, load metadata from experiments.csv on init.

True
auto_import_analysis_params bool

If True, load analysis params on init.

True
logging_level Union[str, int]

Logging verbosity ('DEBUG', 'INFO', 'WARNING', 'CRITICAL').

CRITICAL
Source code in src/ace_neuro/shared/experiment_data_manager.py
def __init__(
    self, 
    line_num: int,
    project_path: Optional[Union[str, Path]] = None,
    data_path: Optional[Union[str, Path]] = None,
    auto_import_metadata: bool = True,
    auto_import_analysis_params: bool = True,
    logging_level: Union[str, int] = logging.CRITICAL
) -> None:
    """Initialize the data manager for a specific experiment.

    Args:
        line_num: Experiment line number (matches 'line number' column in CSVs).
        project_path: Path to project directory containing metadata CSVs.
            Required for most workflows. Defaults to PROJECT_ROOT/data if
            not provided.
        data_path: Base path for raw experimental data.
            Defaults to PROJECT_ROOT/data/downloaded_data if not provided.
        auto_import_metadata: If True, load metadata from experiments.csv on init.
        auto_import_analysis_params: If True, load analysis params on init.
        logging_level: Logging verbosity ('DEBUG', 'INFO', 'WARNING', 'CRITICAL').
    """
    self.line_num: int = line_num
    self.project_path: Path = Path(project_path) if project_path else PROJECT_ROOT / "data"
    self.data_path: Path = Path(data_path) if data_path else PROJECT_ROOT / "data" / "downloaded_data"

    self.metadata: Optional[Dict[str, Any]] = None
    self.analysis_params: Optional[Dict[str, Any]] = None
    self.logger: logging.Logger = logging.getLogger(__name__)
    self.logger.setLevel(logging_level)

    if auto_import_metadata:
        self.import_metadata()

    if auto_import_analysis_params:
        self.import_analysis_parameters()

get_ephys_directory()

Return the ephys directory path from metadata.

Source code in src/ace_neuro/shared/experiment_data_manager.py
def get_ephys_directory(self) -> Optional[Path]:
    """Return the ephys directory path from metadata."""
    if self.metadata:
        val = self.metadata.get('ephys directory')
        return Path(val) if val else None
    return None

get_miniscope_directory()

Return the miniscope/calcium imaging directory path from metadata.

Source code in src/ace_neuro/shared/experiment_data_manager.py
def get_miniscope_directory(self) -> Optional[Path]:
    """Return the miniscope/calcium imaging directory path from metadata."""
    if self.metadata:
        val = self.metadata.get('calcium imaging directory')
        return Path(val) if val else None
    return None

get_pipeline_params()

Return analysis parameters formatted for pipeline.run().

Converts the raw analysis_params dict to kwargs compatible with MiniscopePipeline.run() and EphysPipeline.run().

Returns:

Type Description
Dict[str, Any]

Dict of kwargs to pass to pipeline.run()

Source code in src/ace_neuro/shared/experiment_data_manager.py
def get_pipeline_params(self) -> Dict[str, Any]:
    """Return analysis parameters formatted for pipeline.run().

    Converts the raw analysis_params dict to kwargs compatible with
    MiniscopePipeline.run() and EphysPipeline.run().

    Returns:
        Dict of kwargs to pass to pipeline.run()
    """
    from ace_neuro.shared.config_utils import parse_analysis_params
    return parse_analysis_params(self.analysis_params or {})

import_analysis_parameters()

Load analysis parameters from project_path/analysis_parameters.csv.

Populates self.analysis_params. If the file doesn't exist or the line number isn't found, sets analysis_params to an empty dict (allowing pipeline defaults to be used).

Source code in src/ace_neuro/shared/experiment_data_manager.py
def import_analysis_parameters(self) -> None:
    """Load analysis parameters from project_path/analysis_parameters.csv.

    Populates self.analysis_params. If the file doesn't exist or the line
    number isn't found, sets analysis_params to an empty dict (allowing
    pipeline defaults to be used).
    """
    analysis_params_csv = self.project_path / "analysis_parameters.csv"

    if not analysis_params_csv.exists():
        self.logger.warning(
            f"No analysis_parameters.csv found at {analysis_params_csv}.\n"
            f"For full reproducible control, copy the `analysis_parameters_template.csv` "
            f"from `ace_neuro.shared.metadata_templates` into your project directory.\n"
            f"Falling back to built-in pipeline defaults..."
        )
        self.analysis_params = {}
        return

    analysis_params_unconverted = CSVWorker.csv_row_to_dict(analysis_params_csv, self.line_num)
    if analysis_params_unconverted is None:
        self.logger.info(
            f"Line {self.line_num} not found in {analysis_params_csv}. "
            f"Using pipeline defaults."
        )
        self.analysis_params = {}
        return

    analysis_params_converted = CSVWorker.convert_data_types(analysis_params_unconverted)
    self.analysis_params = analysis_params_converted

import_metadata()

Load experiment metadata from project_path/experiments.csv.

Populates self.metadata with converted data types. Directory paths are resolved relative to self.data_path.

Source code in src/ace_neuro/shared/experiment_data_manager.py
def import_metadata(self) -> None:
    """Load experiment metadata from project_path/experiments.csv.

    Populates self.metadata with converted data types. Directory paths
    are resolved relative to self.data_path.
    """
    experiments_csv = self.project_path / "experiments.csv"

    if not experiments_csv.exists():
        raise FileNotFoundError(
            f"Experiments file not found: {experiments_csv}\n"
            f"Did you forget to initialize your project data folder?\n"
            f"Please copy the `experiments_template.csv` from `ace_neuro.shared.metadata_templates` "
            f"into your project directory. See docs/guides/data_management.md for details."
        )

    metadata_unconverted = CSVWorker.csv_row_to_dict(experiments_csv, self.line_num)
    if metadata_unconverted is None:
        raise ValueError(f"Line {self.line_num} not found in {experiments_csv}")

    metadata_converted = CSVWorker.convert_data_types(metadata_unconverted)
    self.metadata = metadata_converted

    # Resolve directory paths
    if self.metadata.get('ephys directory'):
        self.metadata['ephys directory'] = self.data_path / Path(str(self.metadata['ephys directory']))
    if self.metadata.get('calcium imaging directory'):
        self.metadata['calcium imaging directory'] = self.data_path / Path(str(self.metadata['calcium imaging directory']))

Configuration helpers (analysis_parameters.csv)

ace_neuro.shared.config_utils.load_analysis_params(line_num, project_path=None)

Load analysis parameters for an experiment from the project directory.

Reads from project_path/analysis_parameters.csv.

Parameters:

Name Type Description Default
line_num int

Experiment line number (matches 'line number' column in CSV)

required
project_path Optional[Path]

Path to project directory containing analysis_parameters.csv. Required — raises ValueError if not provided.

None

Returns:

Type Description
Dict[str, Any]

Dict of parameters ready to pass to pipeline.run()

Raises:

Type Description
ValueError

If project_path is not provided

FileNotFoundError

If analysis_parameters.csv doesn't exist

ValueError

If line_num not found in CSV

Example

params = load_analysis_params(96, project_path=Path("/path/to/project")) api = MiniscopePipeline() api.run(line_num=96, project_path="/path/to/project", **params)

Source code in src/ace_neuro/shared/config_utils.py
def load_analysis_params(line_num: int, project_path: Optional[Path] = None) -> Dict[str, Any]:
    """Load analysis parameters for an experiment from the project directory.

    Reads from ``project_path/analysis_parameters.csv``.

    Args:
        line_num: Experiment line number (matches 'line number' column in CSV)
        project_path: Path to project directory containing analysis_parameters.csv.
            Required — raises ValueError if not provided.

    Returns:
        Dict of parameters ready to pass to pipeline.run()

    Raises:
        ValueError: If project_path is not provided
        FileNotFoundError: If analysis_parameters.csv doesn't exist
        ValueError: If line_num not found in CSV

    Example:
        >>> params = load_analysis_params(96, project_path=Path("/path/to/project"))
        >>> api = MiniscopePipeline()
        >>> api.run(line_num=96, project_path="/path/to/project", **params)
    """
    from ace_neuro.shared.csv_worker import CSVWorker

    if project_path is None:
        raise ValueError(
            "project_path is required. Pass the path to the directory "
            "containing your analysis_parameters.csv."
        )

    active_project = Path(project_path)
    target_csv = active_project / "analysis_parameters.csv"

    if not target_csv.exists():
        raise FileNotFoundError(
            f"Analysis parameters not found: {target_csv}\n"
            f"Make sure project_path is correct (currently: {active_project})"
        )

    raw = CSVWorker.csv_row_to_dict(target_csv, line_num)
    if raw is None:
        raise ValueError(f"Line {line_num} not found in {target_csv}")

    converted = CSVWorker.convert_data_types(raw)
    return parse_analysis_params(converted)

ace_neuro.shared.config_utils.parse_analysis_params(params)

Convert CSV column values to pipeline kwargs.

Maps column names from analysis_parameters.csv to the exact argument names expected by MiniscopePipeline.run() and EphysPipeline.run(). Empty/None values are skipped, allowing pipeline defaults to apply.

Parameters:

Name Type Description Default
params Dict[str, Any]

Dict from CSVWorker.csv_row_to_dict()

required

Returns:

Type Description
Dict[str, Any]

Dict with keys matching pipeline.run() arguments

Source code in src/ace_neuro/shared/config_utils.py
def parse_analysis_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Convert CSV column values to pipeline kwargs.

    Maps column names from analysis_parameters.csv to the exact argument
    names expected by MiniscopePipeline.run() and EphysPipeline.run().
    Empty/None values are skipped, allowing pipeline defaults to apply.

    Args:
        params: Dict from CSVWorker.csv_row_to_dict()

    Returns:
        Dict with keys matching pipeline.run() arguments
    """
    # Columns that map directly (CSV column name == kwarg name)
    DIRECT_KEYS: List[str] = [
        # Miniscope preprocessing
        'filenames', 'crop_coords',
        'detrend_method', 'df_over_f', 'secs_window', 'quantile_min',
        # Miniscope processing
        'parallel', 'n_processes', 'apply_motion_correction',
        'inspect_motion_correction', 'plot_params',
        'run_CNMFE', 'save_estimates', 'save_CNMFE_estimates_filename',
        'save_CNMFE_params',
        # Miniscope postprocessing
        'remove_components_with_gui', 'find_calcium_events',
        'derivative_for_estimates', 'event_height',
        'compute_miniscope_phase', 'n', 'cut', 'ftype', 'btype', 'inline',
        'window_length', 'window_step', 'freq_lims', 'time_bandwidth',
        # Ephys
        'channel_name', 'remove_artifacts', 'filter_type', 'filter_range',
        'compute_phases', 'plot_channel', 'plot_spectrogram', 'plot_phases',
        'logging_level'
    ]

    # Columns with different names in CSV vs kwargs
    RENAMED_KEYS: Dict[str, str] = {
        'filter_data': 'filter_miniscope_data',
        'spectrogram': 'compute_miniscope_spectrogram',
        'method': 'df_over_f_method',
    }

    args: Dict[str, Any] = {}

    for key in DIRECT_KEYS:
        if key in params and params[key] is not None:
            args[key] = params[key]

    for csv_key, kwarg_key in RENAMED_KEYS.items():
        if csv_key in params and params[csv_key] is not None:
            args[kwarg_key] = params[csv_key]

    return args

Path Resolution

ace_neuro.shared.paths

Path configuration for the experiment_analysis package.

All data paths (project_path, data_path) must be provided explicitly by the user — either as arguments to Pipeline/DataManager constructors, or as CLI flags (--project-path, --data-path).

There is no hidden state: no .env files, no environment variable lookups.

Misc Functions

ace_neuro.shared.misc_functions

append_row_csv(data, filename)

Appends a new row to a CSV file. Args: data: Dictionary of data to be added to the csv file filename: Name of the CSV file to write to.

Source code in src/ace_neuro/shared/misc_functions.py
def append_row_csv(data: Dict[str, Any], filename: Union[str, Path]) -> None:
    """Appends a new row to a CSV file.
    Args:
        data: Dictionary of data to be added to the csv file
        filename: Name of the CSV file to write to.
    """
    file_exists = os.path.exists(filename)
    with open(filename, 'a', newline='') as file:
        writer = csv.DictWriter(file, fieldnames=list(data.keys()))
        if not file_exists:
            writer.writeheader()
        writer.writerow(data)

conv_quat_to_euler(line)

Convert a CSV line of quaternion data to Euler angles.

Parameters:

Name Type Description Default
line List[Any]

List of [time, qw, qx, qy, qz].

required

Returns:

Type Description
Optional[List[Any]]

List of [time, roll, pitch, yaw].

Source code in src/ace_neuro/shared/misc_functions.py
def conv_quat_to_euler(line: List[Any]) -> Optional[List[Any]]:
    """Convert a CSV line of quaternion data to Euler angles.

    Args:
        line: List of [time, qw, qx, qy, qz].

    Returns:
        List of [time, roll, pitch, yaw].
    """
    if len(line) != 5:
        print('!!! ERROR: Invalid file')  # FIXME
        return
    time = line[0]
    qw = line[1]
    qx = line[2]
    qy = line[3]
    qz = line[4]
    eulerAngles = list(quat_to_euler(qw, qx, qy, qz, degrees=False))
    eulerAngles.insert(0, time)  # prepend time
    return eulerAngles

denoise_movie(dataDir, dataFilePrefix='', showVideo=False, startingFileNum=0, framesPerFile=1000, fs=30, frameStep=10, goodRadius=2000, notchHalfWidth=3, centerHalfHeightToLeave=90, cutoff=3.0, butterOrder=6, mode='display', compressionCodec='FFV1', jobID='')

Remove horizontal bands and slow flicker from miniscope movies.

Applies 2D FFT-based denoising to remove traveling horizontal bands and whole-image flicker artifacts. Based on Daniel Aharoni's denoising notebook: https://github.com/Aharoni-Lab/Miniscope-v4/tree/master/Miniscope-v4-Denoising-Notebook

Parameters:

Name Type Description Default
dataDir Union[str, List[str]]

Directory containing movie files to denoise.

required
dataFilePrefix str

Prefix before file numbers (e.g., 'msCam' for 'msCam0.avi').

''
showVideo bool

If True, display movie before analysis.

False
startingFileNum int

First file number to process; all subsequent files included.

0
framesPerFile int

Number of frames per file (set by Miniscope software).

1000
fs float

Frame acquisition rate in Hz.

30
frameStep int

Step size for 2D FFT generation (skip frames to speed up).

10
goodRadius int

Radius parameter for FFT filtering.

2000
notchHalfWidth int

Half-width of notch filter.

3
centerHalfHeightToLeave int

Half-height of pass frequencies in 2D FFT.

90
cutoff float

Cutoff frequency for filtering.

3.0
butterOrder int

Butterworth filter order (4-9 recommended to avoid artifacts).

6
mode str

'save' to write output or 'display' to show denoised movie.

'display'
compressionCodec str

Video codec for saving ('FFV1' or 'GREY').

'FFV1'
jobID str

Optional job identifier string.

''
Source code in src/ace_neuro/shared/misc_functions.py
def denoise_movie(
    dataDir: Union[str, List[str]], 
    dataFilePrefix: str = '', 
    showVideo: bool = False, 
    startingFileNum: int = 0,
    framesPerFile: int = 1000, 
    fs: float = 30, 
    frameStep: int = 10, 
    goodRadius: int = 2000,
    notchHalfWidth: int = 3, 
    centerHalfHeightToLeave: int = 90, 
    cutoff: float = 3.0,
    butterOrder: int = 6, 
    mode: str = 'display', 
    compressionCodec: str = 'FFV1', 
    jobID: str = ''
) -> None:
    """Remove horizontal bands and slow flicker from miniscope movies.

    Applies 2D FFT-based denoising to remove traveling horizontal bands and
    whole-image flicker artifacts. Based on Daniel Aharoni's denoising notebook:
    https://github.com/Aharoni-Lab/Miniscope-v4/tree/master/Miniscope-v4-Denoising-Notebook

    Args:
        dataDir: Directory containing movie files to denoise.
        dataFilePrefix: Prefix before file numbers (e.g., 'msCam' for 'msCam0.avi').
        showVideo: If True, display movie before analysis.
        startingFileNum: First file number to process; all subsequent files included.
        framesPerFile: Number of frames per file (set by Miniscope software).
        fs: Frame acquisition rate in Hz.
        frameStep: Step size for 2D FFT generation (skip frames to speed up).
        goodRadius: Radius parameter for FFT filtering.
        notchHalfWidth: Half-width of notch filter.
        centerHalfHeightToLeave: Half-height of pass frequencies in 2D FFT.
        cutoff: Cutoff frequency for filtering.
        butterOrder: Butterworth filter order (4-9 recommended to avoid artifacts).
        mode: 'save' to write output or 'display' to show denoised movie.
        compressionCodec: Video codec for saving ('FFV1' or 'GREY').
        jobID: Optional job identifier string.
    """
    difVideos = []

    if not isinstance(dataDir, list):
        dataDir = [dataDir]

    print(f"Processing directories: {dataDir}")

    for filePath in dataDir:
        # Skip already-denoised directories
        if 'Denoised' in filePath or (filePath + '\\Denoised') in dataDir:
            print(f"Skipping denoised directory: {filePath}")
            continue

        # Ensure path ends with /
        if filePath[-1] != "/":
            filePath = filePath + "/"
        print(f"Processing: {filePath}")

        # Step 1: Compute mean FFT across all frames
        sumFFT, rows, cols, vignette = _compute_mean_fft(
            filePath, dataFilePrefix, startingFileNum, framesPerFile, 
            frameStep, applyVignette=True, showVideo=showVideo
        )

        if sumFFT is None:
            print(f"No video files found in {filePath}")
            continue

        # Step 2: Create FFT spatial filter mask
        maskFFT = _create_fft_mask(rows, cols, goodRadius, notchHalfWidth, centerHalfHeightToLeave)

        # Step 3: Optional preview of filtered video
        if showVideo:
            _preview_filtered_video(filePath, dataFilePrefix, startingFileNum, 
                                   framesPerFile, frameStep, maskFFT)

        # Step 4: Calculate mean fluorescence per frame
        meanFrame = _compute_mean_fluorescence(filePath, dataFilePrefix, startingFileNum,
                                                framesPerFile, maskFFT)

        # Step 5: Apply temporal lowpass filter
        try:
            meanFiltered = _create_lowpass_filter(meanFrame, fs, cutoff, butterOrder)
        except (ValueError, RuntimeError) as e:
            print(f"ERROR filtering {filePath}: {e}")
            difVideos.append(filePath)
            continue

        # Step 6: Process and save/display final output
        _process_and_save_frames(filePath, dataFilePrefix, startingFileNum, framesPerFile,
                                  maskFFT, meanFiltered, mode, compressionCodec, jobID,
                                  rows, cols)

    if difVideos:
        print(f"ERRORS with: {difVideos}")
        print("Consider investigating")

filter_data(data, n, cut, ftype, btype, fs, bodePlot=False)

Use ftype to indicate FIR or Butterworth filter.

For the FIR filter indicate a LowPass, HighPass, or BandPass with btype = lowpass, highpass, or bandpass, respectively. n is the length of the filter (number of coefficients, i.e. the filter order + 1). numtaps must be odd if a passband includes the Nyquist frequency. A good value for n is 10000. Channel should be set to desired .ncs file

The Butterworth filters have a more linear phase response in the pass-band than other types and is able to provide better group delay performance, and also a lower level of overshoot. Indicate the filter type by setting btype = 'low', 'high', or 'band'. The default for n is n = 2 For a bandpass filter indicate the lowstop and the highstop by using an array. example: wn= ([10, 30])

Source code in src/ace_neuro/shared/misc_functions.py
def filter_data(
    data: np.ndarray, 
    n: int, 
    cut: Union[float, List[float], np.ndarray], 
    ftype: str, 
    btype: str, 
    fs: float, 
    bodePlot: bool = False
) -> np.ndarray:
    """ Use ftype to indicate FIR or Butterworth filter.

    For the FIR filter indicate a LowPass, HighPass, or BandPass with btype = lowpass, highpass, or bandpass, respectively. 
    n is the length of the filter (number of coefficients, i.e. the filter order + 1). numtaps must be odd if a passband includes the Nyquist frequency.
    A good value for n is 10000.
    Channel should be set to desired .ncs file

    The Butterworth filters have a more linear phase response in the pass-band than other types and is able to provide better group delay performance, and also a lower level of overshoot.
    Indicate the filter type by setting btype = 'low', 'high', or 'band'.
    The default for n is n = 2
    For a bandpass filter indicate the lowstop and the highstop by using an array. example: wn= ([10, 30])"""

    filteredData: np.ndarray
    if ftype.lower() == 'fir':
        h = firwin(n, cut, pass_zero=btype, fs=fs)  # Build the FIR filter
        filteredData = filtfilt(h, 1, data)  # Zero-phase filter the data
        if bodePlot:
            w, a = freqz(h, worN=10000, fs=fs if fs else 2000)
            plt.figure()
            plt.semilogx(w, abs(a))

            w_b, mag, phase = bode((h, 1), w=2 * np.pi * w)
            plt.figure()
            plt.semilogx(w_b, mag)
            plt.figure()
            plt.semilogx(w_b, phase)

    elif ftype.lower() in ('butterworth', 'butter'):
        b, a_filt = butter(n, cut, btype=btype, fs=fs)
        filteredData = filtfilt(b, a_filt, data)

        if bodePlot:
            w, h_resp = freqz(b, a_filt, worN=10000, fs=fs if fs else 2000)
            plt.figure()
            plt.semilogx(w, abs(h_resp))

            w_b, mag, phase = bode((b, a_filt), w=2 * np.pi * w)
            plt.figure()
            plt.semilogx(w_b, mag)
            plt.figure()
            plt.semilogx(w_b, phase)
    else:
        raise ValueError(f"Unknown filter type: {ftype}")

    return filteredData

get_coords_dict_from_analysis_params(miniscope_data_manager)

Extract crop coordinates from analysis parameters.

Reads the 'crop_coords' column from analysis_params and returns a dict with x0, y0, x1, y1 keys suitable for cropping.

Parameters:

Name Type Description Default
miniscope_data_manager Any

Data manager with analysis_params.

required

Returns:

Type Description
Optional[Dict[str, int]]

Tuple of (coords_dict, crop_job_name). coords_dict is None

str

if no crop coordinates are found.

Source code in src/ace_neuro/shared/misc_functions.py
def get_coords_dict_from_analysis_params(miniscope_data_manager: Any) -> Tuple[Optional[Dict[str, int]], str]:
    """Extract crop coordinates from analysis parameters.

    Reads the 'crop_coords' column from analysis_params and returns
    a dict with x0, y0, x1, y1 keys suitable for cropping.

    Args:
        miniscope_data_manager: Data manager with analysis_params.

    Returns:
        Tuple of (coords_dict, crop_job_name). coords_dict is None
        if no crop coordinates are found.
    """
    coords_dict: Optional[Dict[str, int]] = None
    crop_job_name: str = ''
    try:
        if miniscope_data_manager.analysis_params:
            previous_coords = miniscope_data_manager.analysis_params.get('crop_coords')
            if previous_coords and len(previous_coords) >= 4:
                coords_dict = {
                    'x0': int(previous_coords[0]),
                    'y0': int(previous_coords[1]),
                    'x1': int(previous_coords[2]),
                    'y1': int(previous_coords[3])
                }
                crop_job_name = '_crop'
    except (KeyError, TypeError, IndexError):
        print("Did not find valid crop coordinates in analysis_params['crop_coords']")

    return coords_dict, crop_job_name

import_video_as_numpy_array(filename, frames='all', displayFrame=False, frameToDisplay=10)

Import a video file directly into a NumPy array.

This function leverages OpenCV to read video frames sequentially and load them into a preallocated 4D NumPy array (frames, height, width, channels).

Credit: Adapted from https://stackoverflow.com/questions/42163058/how-to-turn-a-video-into-numpy-array

Parameters:

Name Type Description Default
filename str

The absolute or relative path to the video file.

required
frames int or all

Number of frames to read. Defaults to 'all'.

'all'
displayFrame bool

If True, displays a specific frame after loading. Defaults to False.

False
frameToDisplay int

The 1-indexed frame number to display if displayFrame is True. Defaults to 10.

10

Returns:

Type Description
ndarray

np.ndarray: A 4D uint8 array containing the video data (frames, height, width, 3).

Source code in src/ace_neuro/shared/misc_functions.py
def import_video_as_numpy_array(
    filename: str, 
    frames: Union[int, str] = 'all', 
    displayFrame: bool = False, 
    frameToDisplay: int = 10
) -> np.ndarray:
    """Import a video file directly into a NumPy array.

    This function leverages OpenCV to read video frames sequentially and load them
    into a preallocated 4D NumPy array `(frames, height, width, channels)`.

    *Credit: Adapted from https://stackoverflow.com/questions/42163058/how-to-turn-a-video-into-numpy-array*

    Args:
        filename (str): The absolute or relative path to the video file.
        frames (int or 'all', optional): Number of frames to read. Defaults to 'all'.
        displayFrame (bool, optional): If True, displays a specific frame after loading. Defaults to False.
        frameToDisplay (int, optional): The 1-indexed frame number to display if `displayFrame` is True. Defaults to 10.

    Returns:
        np.ndarray: A 4D uint8 array containing the video data `(frames, height, width, 3)`.
    """
    cap = cv2.VideoCapture(filename)
    frameCount = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    frameWidth = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frameHeight = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if frames != 'all':
        frameCount = int(frames)
    buf = np.empty((int(frameCount), int(frameHeight), int(frameWidth), 3), np.dtype('uint8'))
    fc = 0
    ret = True
    while (fc < frameCount and ret):
        ret, buf[fc] = cap.read()
        fc += 1
    cap.release()
    if displayFrame:
        cv2.namedWindow('frame ' + str(frameToDisplay))
        cv2.imshow('frame ' + str(frameToDisplay), buf[frameToDisplay - 1])
    return buf

load_obj(filename)

Load a pickled object from disk.

Useful for loading previously saved class instances.

Parameters:

Name Type Description Default
filename Union[str, Path]

Path to the pickle file.

required

Returns:

Type Description
Any

The unpickled Python object.

Source code in src/ace_neuro/shared/misc_functions.py
def load_obj(filename: Union[str, Path]) -> Any:
    """Load a pickled object from disk.

    Useful for loading previously saved class instances.

    Args:
        filename: Path to the pickle file.

    Returns:
        The unpickled Python object.
    """
    with open(filename, 'rb') as fileToRead:
        loadedObject = pickle.load(fileToRead)
    return loadedObject

mark_events(axisHandle, eventTimes)

Draw vertical event markers on a plot at specified times.

Parameters:

Name Type Description Default
axisHandle Axes

Matplotlib axis to draw on.

required
eventTimes Union[float, List[float], ndarray]

Single time or list of times to mark.

required
Source code in src/ace_neuro/shared/misc_functions.py
def mark_events(axisHandle: plt.Axes, eventTimes: Union[float, List[float], np.ndarray]) -> None:
    """Draw vertical event markers on a plot at specified times.

    Args:
        axisHandle: Matplotlib axis to draw on.
        eventTimes: Single time or list of times to mark.
    """
    # Mark Neuralynx events on a given plot
    yLimits = axisHandle.get_ylim()
    xLimits = axisHandle.get_xlim()
    lineLength = np.diff(yLimits)
    lineOffset = yLimits[0] + (lineLength / 2)
    if not isinstance(eventTimes, (list, np.ndarray)):
        eventPoints = [eventTimes]
    else:
        eventPoints = eventTimes

    axisHandle.eventplot(eventPoints, lineoffsets=float(lineOffset), linelengths=float(lineLength), colors='k')
    axisHandle.axis((xLimits[0], xLimits[1], yLimits[0], yLimits[1]))

quat_to_euler(qw, qx, qy, qz, degrees=False)

Convert quaternion to Euler angles (roll, pitch, yaw).

Parameters:

Name Type Description Default
qw float

Quaternion w component.

required
qx float

Quaternion x component.

required
qy float

Quaternion y component.

required
qz float

Quaternion z component.

required
degrees bool

If True, return angles in degrees; otherwise radians.

False

Returns:

Type Description
List[float]

List of [roll, pitch, yaw] angles.

Source code in src/ace_neuro/shared/misc_functions.py
def quat_to_euler(qw: float, qx: float, qy: float, qz: float, degrees: bool = False) -> List[float]:
    """Convert quaternion to Euler angles (roll, pitch, yaw).

    Args:
        qw: Quaternion w component.
        qx: Quaternion x component.
        qy: Quaternion y component.
        qz: Quaternion z component.
        degrees: If True, return angles in degrees; otherwise radians.

    Returns:
        List of [roll, pitch, yaw] angles.
    """
    m00 = 1.0 - 2.0 * qy * qy - 2.0 * qz * qz
    m01 = 2.0 * qx * qy + 2.0 * qz * qw
    m02 = 2.0 * qx * qz - 2.0 * qy * qw
    m10 = 2.0 * qx * qy - 2.0 * qz * qw
    m11 = 1 - 2.0 * qx * qx - 2.0 * qz * qz
    m12 = 2.0 * qy * qz + 2.0 * qx * qw
    m20 = 2.0 * qx * qz + 2.0 * qy * qw
    m21 = 2.0 * qy * qz - 2.0 * qx * qw
    m22 = 1.0 - 2.0 * qx * qx - 2.0 * qy * qy

    eulerAngles = []

    R = np.arctan2(m12, m22)  # Roll
    eulerAngles.append(R)
    c2 = np.sqrt(m00 * m00 + m01 * m01)
    P = np.arctan2(-m02, c2)  # Pitch
    eulerAngles.append(P)
    s1 = np.sin(R)
    c1 = np.cos(R)
    Y = np.arctan2(s1 * m20 - c1 * m10, c1 * m11 - s1 * m21)  # Yaw
    eulerAngles.append(Y)
    if degrees == True:
        eulerAngles = [math.degrees(R), math.degrees(P), math.degrees(Y)]
    return eulerAngles

spectrogram(tVec, freqVec, specData, cBarPercentLims=[5.0, 95.0], xLabel='Time (s)', yLabel='Frequency (Hz)', cLabel='Power (dB)')

Plots a spectrogram that has already been computed. TVEC is a vector of the x-axis time points or a time vector consisting of just [min, max]. FREQVEC is a vector of the y-axis frequency points, or a frequency vector consisting of just [min, max]. SPECDATA is the matrix of spectral power. CBARPERCENTLIMS sets the bounds on the color bar by finding the specified percentages of the power in specData.

Source code in src/ace_neuro/shared/misc_functions.py
def spectrogram(
    tVec: np.ndarray, 
    freqVec: np.ndarray, 
    specData: np.ndarray, 
    cBarPercentLims: List[float] = [5., 95.], 
    xLabel: str = 'Time (s)', 
    yLabel: str = 'Frequency (Hz)',
    cLabel: str = 'Power (dB)'
) -> Tuple[plt.Figure, plt.Axes]:
    """
    Plots a spectrogram that has already been computed.
    TVEC is a vector of the x-axis time points or a time vector consisting of just [min, max].
    FREQVEC is a vector of the y-axis frequency points, or a frequency vector consisting of just [min, max].
    SPECDATA is the matrix of spectral power.
    CBARPERCENTLIMS sets the bounds on the color bar by finding the specified percentages of the power in specData.
    """
    h, ax = _prep_axes(xLabel=xLabel, yLabel=yLabel)
    if isinstance(ax, list):
        ax = ax[0]

    cBarMin = np.percentile(specData, cBarPercentLims[0])
    cBarMax = np.percentile(specData, cBarPercentLims[1])
    spectrogramPlot = ax.imshow(specData, interpolation='none', extent=(tVec[0], tVec[-1], freqVec[0], freqVec[-1]),
                                aspect='auto', vmin=cBarMin, vmax=cBarMax, origin='lower')
    cbar = h.colorbar(spectrogramPlot, ax=ax)
    cbar.set_label(cLabel)
    return h, ax

spike_trig_avg(eventArray, dataArray, framesb, framesa)

Compute the average spike values starting 'framesb' before the event and ending 'framesa' after the event.

Parameters:

Name Type Description Default
eventArray ndarray

A numpy array of when and/or where events occur. Can either be in the format of [[component, frame],...] or [[frame],...]

required
dataArray ndarray

A numpy array of the signal values at each frame.

required
framesb int

Number of frames before the event to include.

required
framesa int

Number of frames after the event to include.

required

Returns: avgEventDict: a dictionary dictionary where the keys represent the component number from the dataArray and the value is a numpy array of the average values at each frame of the designated window around the event

Source code in src/ace_neuro/shared/misc_functions.py
def spike_trig_avg(eventArray: np.ndarray, dataArray: np.ndarray, framesb: int, framesa: int) -> Dict[int, np.ndarray]:       
    """
    Compute the average spike values starting 'framesb' before the event
    and ending 'framesa' after the event.

    Args:
        eventArray: A numpy array of when and/or where events occur. Can either
                    be in the format of [[component, frame],...] or
                    [[frame],...]
        dataArray: A numpy array of the signal values at each frame.
        framesb: Number of frames before the event to include.
        framesa: Number of frames after the event to include.
    Returns:
        avgEventDict: a dictionary dictionary where the keys represent the
                      component number from the dataArray and the value is
                      a numpy array of the average values at each frame
                      of the designated window around the event
    """
    avgEventDict: Dict[int, np.ndarray] = {}
    if dataArray.ndim == 1:
        valid_events = 0
        for event in eventArray:
            idx = int(event[0])
            if idx >= framesb and idx <= dataArray.size - framesa - 1:
                chunk = dataArray[idx - framesb:idx + framesa + 1]
                if 0 in avgEventDict:
                    avgEventDict[0] = avgEventDict[0] + chunk
                else:
                    avgEventDict[0] = chunk.astype(float)
                valid_events += 1
        if 0 in avgEventDict and valid_events > 0:
            avgEventDict[0] /= valid_events
    else:
        for event in eventArray:
            comp = int(event[0])
            idx = int(event[1])
            if idx >= framesb and idx <= dataArray[comp].size - framesa - 1:
                chunk = dataArray[comp][idx - framesb:idx + framesa + 1]
                if comp in avgEventDict:
                    avgEventDict[comp] = avgEventDict[comp] + chunk
                else:
                    avgEventDict[comp] = chunk.astype(float)

        for component in avgEventDict:
            num_events = len(np.argwhere(eventArray[:, 0] == component))
            if num_events > 0:
                avgEventDict[component] /= num_events
    return avgEventDict

thresh_func(dataArray, threshVal)

Find indices where data crosses above a threshold.

Parameters:

Name Type Description Default
dataArray ndarray

Input data array.

required
threshVal float

Threshold value.

required

Returns:

Type Description
ndarray

Array of indices where threshold crossings occur.

Source code in src/ace_neuro/shared/misc_functions.py
def thresh_func(dataArray: np.ndarray, threshVal: float) -> np.ndarray:
    """Find indices where data crosses above a threshold.

    Args:
        dataArray: Input data array.
        threshVal: Threshold value.

    Returns:
        Array of indices where threshold crossings occur.
    """
    binary_array = np.where(dataArray >= threshVal, 1, 0)
    indices = np.argwhere(np.diff(binary_array) == 1)
    addArr = np.zeros(np.shape(indices))
    if indices.size > 0:
        addArr[..., -1] = 1
    return indices + addArr

update_csv_cell(data, columnTitle, lineNum, csvFile)

Update a single cell in a CSV file.

Parameters:

Name Type Description Default
data Any

New value to write.

required
columnTitle str

Column header name.

required
lineNum int

Line number to update.

required
csvFile Union[str, Path]

Path to CSV file.

required
Source code in src/ace_neuro/shared/misc_functions.py
def update_csv_cell(data: Any, columnTitle: str, lineNum: int, csvFile: Union[str, Path]) -> None:
    """Update a single cell in a CSV file.

    Args:
        data: New value to write.
        columnTitle: Column header name.
        lineNum: Line number to update.
        csvFile: Path to CSV file.
    """
    csvData: List[Dict[str, str]] = []
    fieldnames: Optional[List[str]] = None

    with open(csvFile, 'r') as file:
        reader = csv.DictReader(file)
        if reader.fieldnames is not None:
            fieldnames = list(reader.fieldnames)
        for row in reader:
            if row.get('line number') == str(lineNum):
                row[columnTitle] = str(data)
            csvData.append(row)

    if fieldnames:
        with open(csvFile, 'w', newline='') as writeFile:
            writer = csv.DictWriter(writeFile, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(csvData)

z_score(dataArray, frameWindow=1000)

Compute the z-score of the data array values every designated frame window length based on the values within that frame window

Parameters:

Name Type Description Default
dataArray ndarray

A numpy array of values where the row represents the component and the column represents the frame number

required
frameWindow int

An integer value that determines the length of the window which the function z-scores across. Defaults to 1000 frames

1000

Returns:

Name Type Description
zScoreArray ndarray

A numpy array of the same shape as dataArray containing the z-score values of each frame

Source code in src/ace_neuro/shared/misc_functions.py
def z_score(dataArray: np.ndarray, frameWindow: int = 1000) -> np.ndarray:
    """
    Compute the z-score of the data array values every designated frame window
    length based on the values within that frame window

    Args:
        dataArray: A numpy array of values where the row represents the component
                   and the column represents the frame number
        frameWindow: An integer value that determines the length of the window
                     which the function z-scores across. Defaults to 1000 frames

    Returns:
        zScoreArray: A numpy array of the same shape as dataArray containing the 
                     z-score values of each frame
    """

    zScoreArray = np.zeros_like(dataArray, dtype=float)
    num_components, num_frames = dataArray.shape if dataArray.ndim > 1 else (1, dataArray.size)

    for i in range(0, math.ceil(num_frames / frameWindow)):
        start = i * frameWindow
        end = min((i + 1) * frameWindow, num_frames)
        if start >= num_frames:
            break

        if dataArray.ndim == 1:
            zScoreArray[start:end] = stats.zscore(dataArray[start:end])
        else:
            zScoreArray[:, start:end] = stats.zscore(dataArray[:, start:end], axis=1)

    return np.nan_to_num(zScoreArray)