Skip to content

Pipelines API

This section details the high-level entry points for running the analysis workflows.

How to pass arguments: each class exposes run(...). Pass keyword arguments matching the parameters documented below. Typical flows:

  1. Call run(line_num=..., project_path=..., data_path=..., **other_kwargs) from Python.
  2. Optionally load per-line defaults from analysis_parameters.csv with load_analysis_params and merge: run(**{**csv_params, "line_num": n, ...}).
  3. CLI modules (python -m ace_neuro.pipelines.*) only expose a few flags; they merge CSV + defaults internally — see Getting started section 5a.

Miniscope Pipeline

ace_neuro.pipelines.miniscope.MiniscopePipeline

High-level API for calcium imaging analysis workflows.

Orchestrates the complete miniscope analysis pipeline from raw video through CNMF-E source extraction and post-processing. Designed for non-technical users: every parameter has a sensible default.

Attributes:

Name Type Description
miniscope_data_manager MiniscopeDataManager

Data manager populated after run().

preprocessor MiniscopePreprocessor

MiniscopePreprocessor instance.

processor MiniscopeProcessor

MiniscopeProcessor instance.

postprocessor MiniscopePostprocessor

MiniscopePostprocessor instance.

Source code in src/ace_neuro/pipelines/miniscope.py
class MiniscopePipeline:
    """High-level API for calcium imaging analysis workflows.

    Orchestrates the complete miniscope analysis pipeline from raw video
    through CNMF-E source extraction and post-processing. Designed for
    non-technical users: every parameter of run() has a default.

    Attributes:
        miniscope_data_manager: Data manager populated after run().
        preprocessor: MiniscopePreprocessor instance (assigned by run()).
        processor: MiniscopeProcessor instance (assigned by run()).
        postprocessor: MiniscopePostprocessor instance (assigned by run(),
            only when CNMF-E results are present on the data manager).
    """

    miniscope_data_manager: MiniscopeDataManager
    preprocessor: MiniscopePreprocessor
    processor: MiniscopeProcessor
    postprocessor: MiniscopePostprocessor

    def __init__(self) -> None:
        """Initialize the MiniscopePipeline.

        Nothing is set up here; the annotated attributes are assigned by run().
        """
        pass

    def run(
        self,
        line_num: int,
        project_path: Optional[Union[str, Path]] = None,
        data_path: Optional[Union[str, Path]] = None,
        filenames: Optional[List[str]] = None,

        # preprocessing parameters
        crop: bool = True,
        crop_coords: Optional[Union[List[int], Tuple[int, int, int, int]]] = None,
        detrend_method: Optional[str] = 'median',
        df_over_f: bool = False,
        # if df_over_f = True
        secs_window: float = 5,
        quantile_min: float = 8,
        df_over_f_method: str = 'delta_f_over_sqrt_f',

        # processing parameters
        parallel: bool = False,
        n_processes: int = 12,
        apply_motion_correction: bool = False,
        inspect_motion_correction: bool = False,
        plot_params: bool = False,
        run_CNMFE: bool = False,
        save_estimates: bool = True,
        save_CNMFE_estimates_filename: str = 'estimates.hdf5',
        save_CNMFE_params: bool = False,

        # post processing parameters
        remove_components_with_gui: bool = True,
        find_calcium_events: bool = True,
        derivative_for_estimates: str = 'first',
        event_height: float = 5,
        compute_miniscope_phase: bool = True,
        filter_miniscope_data: bool = True,
        n: int = 2,
        cut: Optional[List[float]] = None,
        ftype: str = 'butter',
        btype: str = 'bandpass',
        inline: bool = False,
        compute_miniscope_spectrogram: bool = True,
        window_length: float = 30,
        window_step: float = 3,
        freq_lims: Optional[List[float]] = None,
        time_bandwidth: float = 2,
        headless: bool = False
    ) -> None:
        """Run the complete miniscope analysis pipeline.

        Executes preprocessing (crop, detrend, DF/F), processing (motion
        correction, CNMF-E), and post-processing (component selection,
        event detection, spectral analysis) in sequence. Post-processing
        is skipped when the data manager has no CNMF-E object after the
        processing stage.

        Args:
            line_num: Experiment line number in experiments.csv.
            project_path: Optional project path, forwarded to
                MiniscopeDataManager.create.
            data_path: Optional data path, forwarded to
                MiniscopeDataManager.create.
            filenames: List of movie filenames to load (e.g., ['0.avi']).
                None (the default) is treated as an empty list.
            crop: If True, crop the movie.
            crop_coords: Crop coordinates as (x0, y0, x1, y1) tuple/list.
                If None, reads from analysis_parameters.csv or opens the GUI.
            detrend_method: 'median' or 'linear' for photobleaching correction.
            df_over_f: If True, compute DF/F normalization.
            secs_window: Window size for DF/F baseline estimation.
            quantile_min: Percentile for DF/F baseline.
            df_over_f_method: 'delta_f_over_sqrt_f' or 'delta_f_over_f'.
            parallel: If True, use multiprocessing.
            n_processes: Number of parallel processes.
            apply_motion_correction: If True, run motion correction.
            inspect_motion_correction: If True, show motion diagnostics.
            plot_params: If True, display CNMF-E parameter plots.
            run_CNMFE: If True, run CNMF-E source extraction.
            save_estimates: If True, save CNMF-E results to disk.
            save_CNMFE_estimates_filename: Filename for estimates.
            save_CNMFE_params: If True, save parameters to JSON.
            remove_components_with_gui: If True, open component curation GUI.
            find_calcium_events: If True, detect calcium transients.
            derivative_for_estimates: 'zeroth', 'first', or 'second'.
            event_height: Threshold for peak detection.
            compute_miniscope_phase: If True, compute Hilbert phase.
            filter_miniscope_data: If True, apply bandpass filter.
            n: Filter order.
            cut: [low, high] cutoff frequencies. None (the default) means
                [0.1, 1.5].
            ftype: Filter type ('butter').
            btype: Band type ('bandpass').
            inline: If True, replace data with filtered version.
            compute_miniscope_spectrogram: If True, compute spectrogram.
            window_length: Spectrogram window in seconds.
            window_step: Spectrogram step in seconds.
            freq_lims: [low, high] frequency range. None (the default)
                means [0, 15].
            time_bandwidth: Multitaper time-bandwidth product.
            headless: If True, disable all GUI interactions.

        Raises:
            ValueError: If crop_coords is given but does not hold exactly
                four values.
            DataNotFoundError: If creating the data manager raises
                FileNotFoundError.
            PipelineExecutionError: If any other error occurs while creating
                the data manager, or during the preprocessing, processing,
                or post-processing stage.
        """
        # List defaults are built per call so that no list object is shared
        # between invocations (they are handed to downstream stages).
        if filenames is None:
            filenames = []
        if cut is None:
            cut = [0.1, 1.5]
        if freq_lims is None:
            freq_lims = [0, 15]

        # Reject malformed crop coordinates before any data is imported,
        # instead of failing later with a bare IndexError.
        if crop_coords is not None and len(crop_coords) != 4:
            raise ValueError(
                "crop_coords must contain exactly 4 values (x0, y0, x1, y1); "
                f"got {len(crop_coords)}."
            )

        if headless:
            inspect_motion_correction = False
            remove_components_with_gui = False
            plot_params = False
            # NOTE(review): `inline` is documented above as a data option
            # (replace data with the filtered version), not a GUI step;
            # confirm that forcing it off in headless mode is intended.
            inline = False
            print("Running in HEADLESS mode. GUI steps disabled.", flush=True)

        try:
            self.miniscope_data_manager = MiniscopeDataManager.create(
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                filenames=filenames,
                auto_import_data=True,
            )
        except FileNotFoundError as e:
            raise DataNotFoundError(
                "Required miniscope input files were not found.",
                stage="create_data_manager",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Verify experiments.csv paths and ensure miniscope recordings exist under data_path.",
            ) from e
        except Exception as e:
            raise PipelineExecutionError(
                "Failed to initialize MiniscopeDataManager.",
                stage="create_data_manager",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check metadata row values and input filenames.",
            ) from e

        # Get cropping coordinates from the crop_coords argument or from
        # analysis_params. Errors from the lookup helper are not wrapped.
        if crop_coords is not None:
            coords_dict = {
                'x0': crop_coords[0], 'y0': crop_coords[1],
                'x1': crop_coords[2], 'y1': crop_coords[3]
            }
            crop_job_name = '_crop'
        else:
            coords_dict, crop_job_name = get_coords_dict_from_analysis_params(self.miniscope_data_manager)

        try:
            self.preprocessor = MiniscopePreprocessor(self.miniscope_data_manager)
            self.miniscope_data_manager = self.preprocessor.preprocess_calcium_movie(
                coords_dict,
                crop=crop,
                detrend_method=detrend_method,
                df_over_f=df_over_f,
                crop_job_name_for_file=crop_job_name,
                secs_window=secs_window,
                quantile_min=quantile_min,
                df_over_f_method=df_over_f_method,
                headless=headless,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Miniscope preprocessing failed.",
                stage="preprocess_calcium_movie",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Inspect crop/detrend/df_over_f parameters for this experiment row.",
            ) from e

        # Persist whatever coordinates the data manager ended up with back
        # into the project's analysis_parameters.csv for this line.
        if self.miniscope_data_manager.coords is not None:
            analysis_params_csv = self.miniscope_data_manager.project_path / "analysis_parameters.csv"
            print(f"updating {analysis_params_csv} with your cropping coordinates", flush=True)
            update_csv_cell(self.miniscope_data_manager.coords, 'crop_coords', line_num, analysis_params_csv)

        # Ensure self.miniscope_data_manager has 'movie' and
        # 'preprocessed_movie_filepath' filled in with the movie that you
        # want to process before you process.
        try:
            self.processor = MiniscopeProcessor(self.miniscope_data_manager)
            self.miniscope_data_manager = self.processor.process_calcium_movie(
                parallel,
                n_processes,
                apply_motion_correction,
                inspect_motion_correction,
                plot_params,
                run_CNMFE,
                save_estimates,
                save_CNMFE_estimates_filename,
                save_CNMFE_params,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Miniscope processing stage failed.",
                stage="process_calcium_movie",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check CNMF-E and motion-correction parameters and data integrity.",
            ) from e

        # Post-processing needs CNMF-E results; without them there is
        # nothing further to do.
        if self.miniscope_data_manager.CNMFE_obj is not None:
            if not headless:
                if hasattr(tkinter, '_default_root') and tkinter._default_root:  # Check if Tkinter root exists
                    tkinter._default_root.destroy()  # Force close any Tkinter root
                matplotlib.use('Qt5Agg')  # Switch to Qt backend so that we can use interactive plotting during estimate evaluation
            else:
                matplotlib.use('Agg')

            try:
                self.postprocessor = MiniscopePostprocessor(self.miniscope_data_manager)
                self.miniscope_data_manager = self.postprocessor.postprocess_calcium_movie(
                    remove_components_with_gui,
                    find_calcium_events,
                    derivative_for_estimates,
                    event_height,
                    compute_miniscope_phase,
                    filter_miniscope_data,
                    n,
                    cut,
                    ftype,
                    btype,
                    inline,
                    compute_miniscope_spectrogram,
                    window_length,
                    window_step,
                    freq_lims,
                    time_bandwidth,
                )
            except Exception as e:
                raise PipelineExecutionError(
                    "Miniscope postprocessing failed.",
                    stage="postprocess_calcium_movie",
                    line_num=line_num,
                    project_path=project_path,
                    data_path=data_path,
                    hint="Check event detection/filter/spectrogram parameters and CNMF-E outputs.",
                ) from e

__init__()

Initialize the MiniscopePipeline.

Source code in src/ace_neuro/pipelines/miniscope.py
def __init__(self) -> None:
    """Create a MiniscopePipeline; no setup work is performed here."""
    return None

run(line_num, project_path=None, data_path=None, filenames=[], crop=True, crop_coords=None, detrend_method='median', df_over_f=False, secs_window=5, quantile_min=8, df_over_f_method='delta_f_over_sqrt_f', parallel=False, n_processes=12, apply_motion_correction=False, inspect_motion_correction=False, plot_params=False, run_CNMFE=False, save_estimates=True, save_CNMFE_estimates_filename='estimates.hdf5', save_CNMFE_params=False, remove_components_with_gui=True, find_calcium_events=True, derivative_for_estimates='first', event_height=5, compute_miniscope_phase=True, filter_miniscope_data=True, n=2, cut=[0.1, 1.5], ftype='butter', btype='bandpass', inline=False, compute_miniscope_spectrogram=True, window_length=30, window_step=3, freq_lims=[0, 15], time_bandwidth=2, headless=False)

Run the complete miniscope analysis pipeline.

Executes preprocessing (crop, detrend, DF/F), processing (motion correction, CNMF-E), and post-processing (component selection, event detection, spectral analysis) in sequence.

Parameters:

Name Type Description Default
line_num int

Experiment line number in experiments.csv.

required
filenames List[str]

List of movie filenames to load (e.g., ['0.avi']).

[]
crop bool

If True, crop the movie.

True
crop_coords Optional[Union[List[int], Tuple[int, int, int, int]]]

Crop coordinates as (x0, y0, x1, y1) tuple/list. If None, reads from analysis_parameters.csv or opens the GUI.

None
detrend_method Optional[str]

'median' or 'linear' for photobleaching correction.

'median'
df_over_f bool

If True, compute DF/F normalization.

False
secs_window float

Window size for DF/F baseline estimation.

5
quantile_min float

Percentile for DF/F baseline.

8
df_over_f_method str

'delta_f_over_sqrt_f' or 'delta_f_over_f'.

'delta_f_over_sqrt_f'
parallel bool

If True, use multiprocessing.

False
n_processes int

Number of parallel processes.

12
apply_motion_correction bool

If True, run motion correction.

False
inspect_motion_correction bool

If True, show motion diagnostics.

False
plot_params bool

If True, display CNMF-E parameter plots.

False
run_CNMFE bool

If True, run CNMF-E source extraction.

False
save_estimates bool

If True, save CNMF-E results to disk.

True
save_CNMFE_estimates_filename str

Filename for estimates.

'estimates.hdf5'
save_CNMFE_params bool

If True, save parameters to JSON.

False
remove_components_with_gui bool

If True, open component curation GUI.

True
find_calcium_events bool

If True, detect calcium transients.

True
derivative_for_estimates str

'zeroth', 'first', or 'second'.

'first'
event_height float

Threshold for peak detection.

5
compute_miniscope_phase bool

If True, compute Hilbert phase.

True
filter_miniscope_data bool

If True, apply bandpass filter.

True
n int

Filter order.

2
cut List[float]

[low, high] cutoff frequencies.

[0.1, 1.5]
ftype str

Filter type ('butter').

'butter'
btype str

Band type ('bandpass').

'bandpass'
inline bool

If True, replace data with filtered version.

False
compute_miniscope_spectrogram bool

If True, compute spectrogram.

True
window_length float

Spectrogram window in seconds.

30
window_step float

Spectrogram step in seconds.

3
freq_lims List[float]

[low, high] frequency range.

[0, 15]
time_bandwidth float

Multitaper time-bandwidth product.

2
headless bool

If True, disable all GUI interactions.

False
Source code in src/ace_neuro/pipelines/miniscope.py
def run(
    self,
    line_num: int,
    project_path: Optional[Union[str, Path]] = None,
    data_path: Optional[Union[str, Path]] = None,
    filenames: Optional[List[str]] = None,

    # preprocessing parameters
    crop: bool = True,
    crop_coords: Optional[Union[List[int], Tuple[int, int, int, int]]] = None,
    detrend_method: Optional[str] = 'median',
    df_over_f: bool = False,
    # if df_over_f = True
    secs_window: float = 5,
    quantile_min: float = 8,
    df_over_f_method: str = 'delta_f_over_sqrt_f',

    # processing parameters
    parallel: bool = False,
    n_processes: int = 12,
    apply_motion_correction: bool = False,
    inspect_motion_correction: bool = False,
    plot_params: bool = False,
    run_CNMFE: bool = False,
    save_estimates: bool = True,
    save_CNMFE_estimates_filename: str = 'estimates.hdf5',
    save_CNMFE_params: bool = False,

    # post processing parameters
    remove_components_with_gui: bool = True,
    find_calcium_events: bool = True,
    derivative_for_estimates: str = 'first',
    event_height: float = 5,
    compute_miniscope_phase: bool = True,
    filter_miniscope_data: bool = True,
    n: int = 2,
    cut: Optional[List[float]] = None,
    ftype: str = 'butter',
    btype: str = 'bandpass',
    inline: bool = False,
    compute_miniscope_spectrogram: bool = True,
    window_length: float = 30,
    window_step: float = 3,
    freq_lims: Optional[List[float]] = None,
    time_bandwidth: float = 2,
    headless: bool = False
) -> None:
    """Run the complete miniscope analysis pipeline.

    Executes preprocessing (crop, detrend, DF/F), processing (motion
    correction, CNMF-E), and post-processing (component selection,
    event detection, spectral analysis) in sequence. Post-processing
    is skipped when the data manager has no CNMF-E object after the
    processing stage.

    Args:
        line_num: Experiment line number in experiments.csv.
        project_path: Optional project path, forwarded to
            MiniscopeDataManager.create.
        data_path: Optional data path, forwarded to
            MiniscopeDataManager.create.
        filenames: List of movie filenames to load (e.g., ['0.avi']).
            None (the default) is treated as an empty list.
        crop: If True, crop the movie.
        crop_coords: Crop coordinates as (x0, y0, x1, y1) tuple/list.
            If None, reads from analysis_parameters.csv or opens the GUI.
        detrend_method: 'median' or 'linear' for photobleaching correction.
        df_over_f: If True, compute DF/F normalization.
        secs_window: Window size for DF/F baseline estimation.
        quantile_min: Percentile for DF/F baseline.
        df_over_f_method: 'delta_f_over_sqrt_f' or 'delta_f_over_f'.
        parallel: If True, use multiprocessing.
        n_processes: Number of parallel processes.
        apply_motion_correction: If True, run motion correction.
        inspect_motion_correction: If True, show motion diagnostics.
        plot_params: If True, display CNMF-E parameter plots.
        run_CNMFE: If True, run CNMF-E source extraction.
        save_estimates: If True, save CNMF-E results to disk.
        save_CNMFE_estimates_filename: Filename for estimates.
        save_CNMFE_params: If True, save parameters to JSON.
        remove_components_with_gui: If True, open component curation GUI.
        find_calcium_events: If True, detect calcium transients.
        derivative_for_estimates: 'zeroth', 'first', or 'second'.
        event_height: Threshold for peak detection.
        compute_miniscope_phase: If True, compute Hilbert phase.
        filter_miniscope_data: If True, apply bandpass filter.
        n: Filter order.
        cut: [low, high] cutoff frequencies. None (the default) means
            [0.1, 1.5].
        ftype: Filter type ('butter').
        btype: Band type ('bandpass').
        inline: If True, replace data with filtered version.
        compute_miniscope_spectrogram: If True, compute spectrogram.
        window_length: Spectrogram window in seconds.
        window_step: Spectrogram step in seconds.
        freq_lims: [low, high] frequency range. None (the default)
            means [0, 15].
        time_bandwidth: Multitaper time-bandwidth product.
        headless: If True, disable all GUI interactions.

    Raises:
        ValueError: If crop_coords is given but does not hold exactly
            four values.
        DataNotFoundError: If creating the data manager raises
            FileNotFoundError.
        PipelineExecutionError: If any other error occurs while creating
            the data manager, or during the preprocessing, processing,
            or post-processing stage.
    """
    # List defaults are built per call so that no list object is shared
    # between invocations (they are handed to downstream stages).
    if filenames is None:
        filenames = []
    if cut is None:
        cut = [0.1, 1.5]
    if freq_lims is None:
        freq_lims = [0, 15]

    # Reject malformed crop coordinates before any data is imported,
    # instead of failing later with a bare IndexError.
    if crop_coords is not None and len(crop_coords) != 4:
        raise ValueError(
            "crop_coords must contain exactly 4 values (x0, y0, x1, y1); "
            f"got {len(crop_coords)}."
        )

    if headless:
        inspect_motion_correction = False
        remove_components_with_gui = False
        plot_params = False
        # NOTE(review): `inline` is documented above as a data option
        # (replace data with the filtered version), not a GUI step;
        # confirm that forcing it off in headless mode is intended.
        inline = False
        print("Running in HEADLESS mode. GUI steps disabled.", flush=True)

    try:
        self.miniscope_data_manager = MiniscopeDataManager.create(
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            filenames=filenames,
            auto_import_data=True,
        )
    except FileNotFoundError as e:
        raise DataNotFoundError(
            "Required miniscope input files were not found.",
            stage="create_data_manager",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Verify experiments.csv paths and ensure miniscope recordings exist under data_path.",
        ) from e
    except Exception as e:
        raise PipelineExecutionError(
            "Failed to initialize MiniscopeDataManager.",
            stage="create_data_manager",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Check metadata row values and input filenames.",
        ) from e

    # Get cropping coordinates from the crop_coords argument or from
    # analysis_params. Errors from the lookup helper are not wrapped.
    if crop_coords is not None:
        coords_dict = {
            'x0': crop_coords[0], 'y0': crop_coords[1],
            'x1': crop_coords[2], 'y1': crop_coords[3]
        }
        crop_job_name = '_crop'
    else:
        coords_dict, crop_job_name = get_coords_dict_from_analysis_params(self.miniscope_data_manager)

    try:
        self.preprocessor = MiniscopePreprocessor(self.miniscope_data_manager)
        self.miniscope_data_manager = self.preprocessor.preprocess_calcium_movie(
            coords_dict,
            crop=crop,
            detrend_method=detrend_method,
            df_over_f=df_over_f,
            crop_job_name_for_file=crop_job_name,
            secs_window=secs_window,
            quantile_min=quantile_min,
            df_over_f_method=df_over_f_method,
            headless=headless,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Miniscope preprocessing failed.",
            stage="preprocess_calcium_movie",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Inspect crop/detrend/df_over_f parameters for this experiment row.",
        ) from e

    # Persist whatever coordinates the data manager ended up with back
    # into the project's analysis_parameters.csv for this line.
    if self.miniscope_data_manager.coords is not None:
        analysis_params_csv = self.miniscope_data_manager.project_path / "analysis_parameters.csv"
        print(f"updating {analysis_params_csv} with your cropping coordinates", flush=True)
        update_csv_cell(self.miniscope_data_manager.coords, 'crop_coords', line_num, analysis_params_csv)

    # Ensure self.miniscope_data_manager has 'movie' and
    # 'preprocessed_movie_filepath' filled in with the movie that you
    # want to process before you process.
    try:
        self.processor = MiniscopeProcessor(self.miniscope_data_manager)
        self.miniscope_data_manager = self.processor.process_calcium_movie(
            parallel,
            n_processes,
            apply_motion_correction,
            inspect_motion_correction,
            plot_params,
            run_CNMFE,
            save_estimates,
            save_CNMFE_estimates_filename,
            save_CNMFE_params,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Miniscope processing stage failed.",
            stage="process_calcium_movie",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Check CNMF-E and motion-correction parameters and data integrity.",
        ) from e

    # Post-processing needs CNMF-E results; without them there is
    # nothing further to do.
    if self.miniscope_data_manager.CNMFE_obj is not None:
        if not headless:
            if hasattr(tkinter, '_default_root') and tkinter._default_root:  # Check if Tkinter root exists
                tkinter._default_root.destroy()  # Force close any Tkinter root
            matplotlib.use('Qt5Agg')  # Switch to Qt backend so that we can use interactive plotting during estimate evaluation
        else:
            matplotlib.use('Agg')

        try:
            self.postprocessor = MiniscopePostprocessor(self.miniscope_data_manager)
            self.miniscope_data_manager = self.postprocessor.postprocess_calcium_movie(
                remove_components_with_gui,
                find_calcium_events,
                derivative_for_estimates,
                event_height,
                compute_miniscope_phase,
                filter_miniscope_data,
                n,
                cut,
                ftype,
                btype,
                inline,
                compute_miniscope_spectrogram,
                window_length,
                window_step,
                freq_lims,
                time_bandwidth,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Miniscope postprocessing failed.",
                stage="postprocess_calcium_movie",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check event detection/filter/spectrogram parameters and CNMF-E outputs.",
            ) from e

Ephys Pipeline

ace_neuro.pipelines.ephys.EphysPipeline

High-level API for electrophysiology data analysis workflows.

Provides simplified methods for loading, filtering, and visualizing Neuralynx ephys data with configurable analysis parameters.

Attributes:

Name Type Description
ephys_data_manager EphysDataManager

EphysDataManager instance (set after run()).

Source code in src/ace_neuro/pipelines/ephys.py
class EphysPipeline:
    """High-level API for electrophysiology data analysis workflows.

    Provides simplified methods for loading, filtering, and visualizing
    Neuralynx ephys data with configurable analysis parameters.

    Attributes:
        ephys_data_manager: EphysDataManager instance (set after run()).
    """

    ephys_data_manager: EphysDataManager

    def __init__(self) -> None:
        """Initialize the EphysPipeline.

        Nothing is set up here; ephys_data_manager is assigned by run().
        """
        pass

    def run(
        self,
        line_num: int,
        project_path: Optional[Union[str, Path]] = None,
        data_path: Optional[Union[str, Path]] = None,
        channel_name: str = 'PFCLFPvsCBEEG',
        remove_artifacts: bool = False,
        filter_type: Optional[str] = None,  # If desired, enter the type, eg "butter"
        filter_range: Optional[List[float]] = None,
        compute_phases: bool = False,
        plot_channel: bool = False,
        plot_spectrogram: bool = False,
        plot_phases: bool = False,
        logging_level: Union[str, int] = "CRITICAL",
        headless: bool = False
    ) -> None:
        """Run the ephys analysis pipeline for a single channel.

        Loads ephys data, optionally filters and computes phases, and
        generates plots based on the provided parameters.

        Args:
            line_num: Experiment line number in experiments.csv.
            project_path: Optional explicit path to project repository.
            data_path: Optional explicit base path for raw experimental data.
            channel_name: Name of ephys channel to analyze.
            remove_artifacts: If True, apply artifact removal.
            filter_type: Filter type ('butter', 'fir') or None to skip.
            filter_range: [low, high] cutoff frequencies for bandpass.
                None (the default) means [0.5, 4].
            compute_phases: If True, compute instantaneous phase via Hilbert.
            plot_channel: If True, plot the time-domain signal.
            plot_spectrogram: If True, plot the multitaper spectrogram.
            plot_phases: If True, plot phase distribution histogram.
            logging_level: Logging verbosity ('DEBUG', 'INFO', 'CRITICAL').
            headless: If True, disable GUI and use Agg backend.

        Raises:
            DataNotFoundError: If loading the experiment metadata raises
                FileNotFoundError.
            PipelineExecutionError: If metadata loading, data verification,
                block import/processing, filtering, phase computation, or
                channel lookup fails for any other reason.
            ValueError: If the ephys directory cannot be determined from
                the experiment metadata.
        """
        # Build the list default per call so it is never shared between calls.
        if filter_range is None:
            filter_range = [0.5, 4]

        if headless:
            print("Running in HEADLESS mode. Plotting disabled.", flush=True)
            plot_channel = False
            plot_spectrogram = False
            plot_phases = False
            matplotlib.use('Agg')
        elif hasattr(tkinter, '_default_root') and tkinter._default_root:
            # The Qt backend is selected only when a Tk root existed and
            # had to be torn down; otherwise the backend is left untouched.
            tkinter._default_root.destroy()
            matplotlib.use('Qt5Agg')

        logger = logging.getLogger(__name__)
        logger.setLevel(logging_level)

        # Filtering is enabled exactly when a filter type was supplied.
        filter_bool = filter_type is not None

        try:
            experiment_data_manager = ExperimentDataManager(
                line_num,
                project_path=project_path,
                data_path=data_path,
                logging_level=logging_level,
            )
        except FileNotFoundError as e:
            raise DataNotFoundError(
                "Project metadata files were not found for ephys run.",
                stage="load_experiment_metadata",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Ensure project_path points to a directory containing experiments.csv.",
            ) from e
        except Exception as e:
            raise PipelineExecutionError(
                "Failed to initialize ExperimentDataManager for ephys run.",
                stage="load_experiment_metadata",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check metadata formatting and path configuration.",
            ) from e

        # Extract the one relevant piece of information that EphysDataManager
        # needs from metadata: the path to the ephys directory.
        ephys_directory = experiment_data_manager.get_ephys_directory()

        # Verify we downloaded the ephys data.
        experiments_csv = experiment_data_manager.project_path / "experiments.csv"
        try:
            file_downloader.verify_file_by_line(
                line_num=line_num,
                csv_path=experiments_csv,
                do_type="ephys",
                base_file_path=experiment_data_manager.data_path,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Ephys data verification failed.",
                stage="verify_ephys_data",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Confirm ephys files are present and accessible from metadata paths.",
            ) from e

        if ephys_directory is None:
            raise ValueError("Ephys directory could not be determined from experiment metadata.")

        # Create instance of EphysDataManager, process the block into channels.
        try:
            self.ephys_data_manager = EphysDataManager.create(
                ephys_directory=ephys_directory,
                auto_import_ephys_block=True,
                auto_process_block=False,
                auto_compute_phases=False,
            )
            self.ephys_data_manager.process_ephys_block_to_channels(
                remove_artifacts=remove_artifacts,
                channels=[channel_name],
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Failed to import/process ephys block into channels.",
                stage="process_ephys_block",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Verify ephys channel metadata and raw recording format compatibility.",
            ) from e

        logger.debug(self.ephys_data_manager.channels)

        # With a filter type given, filter the signal; replace_signal=False
        # asks the data manager not to overwrite the unfiltered signal.
        if filter_bool:
            print(f'Filtering ephys data with filter type "{filter_type}" and cut {filter_range}')
            try:
                self.ephys_data_manager.filter_ephys(
                    channel_name,
                    ftype=str(filter_type),
                    cut=filter_range,
                    replace_signal=False,
                )
            except Exception as e:
                raise PipelineExecutionError(
                    "Ephys filtering failed.",
                    stage="filter_ephys",
                    line_num=line_num,
                    project_path=project_path,
                    data_path=data_path,
                    hint="Check filter_type/filter_range for valid values.",
                ) from e

        if compute_phases:
            # Compute phases after filtering.
            try:
                self.ephys_data_manager.compute_phases_all_channels()
            except Exception as e:
                raise PipelineExecutionError(
                    "Phase computation failed for ephys data.",
                    stage="compute_phases",
                    line_num=line_num,
                    project_path=project_path,
                    data_path=data_path,
                    hint="Ensure filtered channel data is available before computing phases.",
                ) from e

        # Extract correct channel and visualize.
        logger.info("Visualizing channel: %s", channel_name)
        try:
            channel = self.ephys_data_manager.get_channel(channel_name)
        except Exception as e:
            raise PipelineExecutionError(
                f"Could not load requested channel '{channel_name}'.",
                stage="get_channel",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Confirm channel_name appears in experiment metadata and imported channels.",
            ) from e
        channel_worker = ChannelWorker(channel)

        if plot_channel:
            channel_worker.plot_channel(use_filtered=filter_bool)

        if plot_spectrogram:
            channel_worker.plot_spectrogram(use_filtered=filter_bool, plot_events=False)

        if plot_phases:
            channel_worker.plot_phases()

    def run_all_channels(
        self,
        line_num: int,
        remove_artifacts: bool = False,
        filter_type: Optional[str] = None,  # if desired, enter the type, eg "butter"
        filter_range: Optional[List[float]] = None,
        plot_channel: bool = False,
        plot_spectrogram: bool = False,
        logging_level: Union[str, int] = "CRITICAL"
    ) -> None:
        """Run ephys analysis pipeline for all channels in an experiment.

        Intended to iterate through all channels listed in the experiment
        metadata and perform the analysis workflow on each.

        NOTE: as written, this method only loads the experiment metadata,
        reads the channel list and resolves the ephys directory; no channel
        is imported, filtered or plotted yet, and most arguments are unused.

        Args:
            line_num: Experiment line number in experiments.csv.
            remove_artifacts: If True, apply artifact removal.
            filter_type: Filter type ('butter', 'fir') or None to skip.
            filter_range: [low, high] cutoff frequencies for bandpass.
                None (the default) means [0.5, 4].
            plot_channel: If True, plot time-domain signals.
            plot_spectrogram: If True, plot spectrograms.
            logging_level: Logging verbosity.

        Raises:
            ValueError: If the experiment metadata could not be loaded.
        """
        # Build the list default per call so it is never shared between calls.
        if filter_range is None:
            filter_range = [0.5, 4]

        logger = logging.getLogger(__name__)
        logger.setLevel(logging_level)

        # Filtering is enabled exactly when a filter type was supplied
        # (named so that it no longer shadows the builtin `filter`).
        apply_filter: bool = filter_type is not None

        experiment_data_manager = ExperimentDataManager(line_num, logging_level=logging_level)

        if experiment_data_manager.metadata is None:
            raise ValueError(f"Metadata could not be loaded for line {line_num}")
        channels_str = experiment_data_manager.metadata['LFP and EEG CSCs']
        # NOTE(review): if this metadata cell is a plain string, unpacking it
        # yields single characters rather than channel names; confirm the
        # expected type before this list is used.
        channels_list: List = [*channels_str]

        ephys_directory = experiment_data_manager.get_ephys_directory()

__init__()

Initialize the EphysPipeline.

Source code in src/ace_neuro/pipelines/ephys.py
def __init__(self) -> None:
    """Create an EphysPipeline; nothing is set up at construction time."""
    return None

run(line_num, project_path=None, data_path=None, channel_name='PFCLFPvsCBEEG', remove_artifacts=False, filter_type=None, filter_range=[0.5, 4], compute_phases=False, plot_channel=False, plot_spectrogram=False, plot_phases=False, logging_level='CRITICAL', headless=False)

Run the ephys analysis pipeline for a single channel.

Loads ephys data, optionally filters and computes phases, and generates plots based on the provided parameters.

Parameters:

Name Type Description Default
line_num int

Experiment line number in experiments.csv.

required
project_path Optional[Union[str, Path]]

Optional explicit path to project repository.

None
data_path Optional[Union[str, Path]]

Optional explicit base path for raw experimental data.

None
channel_name str

Name of ephys channel to analyze.

'PFCLFPvsCBEEG'
remove_artifacts bool

If True, apply artifact removal.

False
filter_type Optional[str]

Filter type ('butter', 'fir') or None to skip.

None
filter_range List[float]

[low, high] cutoff frequencies for bandpass.

[0.5, 4]
compute_phases bool

If True, compute instantaneous phase via Hilbert.

False
plot_channel bool

If True, plot the time-domain signal.

False
plot_spectrogram bool

If True, plot the multitaper spectrogram.

False
plot_phases bool

If True, plot phase distribution histogram.

False
logging_level Union[str, int]

Logging verbosity ('DEBUG', 'INFO', 'CRITICAL').

'CRITICAL'
headless bool

If True, disable GUI and use Agg backend.

False
Source code in src/ace_neuro/pipelines/ephys.py
def run(
    self,
    line_num: int,
    project_path: Optional[Union[str, Path]] = None,
    data_path: Optional[Union[str, Path]] = None,
    channel_name: str = 'PFCLFPvsCBEEG',
    remove_artifacts: bool = False,
    filter_type: Optional[str] = None,  # e.g. "butter"; None skips filtering
    filter_range: List[float] = [0.5, 4],
    compute_phases: bool = False,
    plot_channel: bool = False,
    plot_spectrogram: bool = False,
    plot_phases: bool = False,
    logging_level: Union[str, int] = "CRITICAL",
    headless: bool = False
) -> None:
    """Run the single-channel ephys workflow for one experiment line.

    Steps, in order: select the matplotlib backend, load the experiment
    metadata, verify the ephys files, import the recording and split it
    into channels, optionally filter, optionally compute phases, then
    fetch the requested channel and draw whichever plots were requested.
    The resulting data manager is stored on ``self.ephys_data_manager``.

    Args:
        line_num: Experiment line number in experiments.csv.
        project_path: Optional explicit path to project repository.
        data_path: Optional explicit base path for raw experimental data.
        channel_name: Name of ephys channel to analyze.
        remove_artifacts: If True, apply artifact removal.
        filter_type: Filter type ('butter', 'fir') or None to skip.
        filter_range: [low, high] cutoff frequencies for bandpass.
        compute_phases: If True, compute instantaneous phase via Hilbert.
        plot_channel: If True, plot the time-domain signal.
        plot_spectrogram: If True, plot the multitaper spectrogram.
        plot_phases: If True, plot phase distribution histogram.
        logging_level: Logging verbosity ('DEBUG', 'INFO', 'CRITICAL').
        headless: If True, force all three plot flags off and select the
            Agg backend.

    Raises:
        DataNotFoundError: If creating the ExperimentDataManager raises
            FileNotFoundError.
        PipelineExecutionError: If any other stage fails; the original
            exception is chained as the cause.
        ValueError: If the ephys directory resolves to None.
    """
    if headless:
        print("Running in HEADLESS mode. Plotting disabled.", flush=True)
        plot_channel = plot_spectrogram = plot_phases = False
        matplotlib.use('Agg')
    else:
        # A leftover Tk root is torn down before switching to the Qt backend.
        stale_root = getattr(tkinter, '_default_root', None)
        if stale_root:
            stale_root.destroy()
            matplotlib.use('Qt5Agg')

    log = logging.getLogger(__name__)
    log.setLevel(logging_level)

    # Identifying details attached to every error raised below.
    run_context = {
        "line_num": line_num,
        "project_path": project_path,
        "data_path": data_path,
    }

    def stage_failure(message: str, stage: str, hint: str) -> Exception:
        """Build the PipelineExecutionError for a failed stage."""
        return PipelineExecutionError(message, stage=stage, **run_context, hint=hint)

    # Filtering happens only when a filter type was supplied.
    should_filter = filter_type is not None

    try:
        experiment_manager = ExperimentDataManager(
            line_num,
            project_path=project_path,
            data_path=data_path,
            logging_level=logging_level,
        )
    except FileNotFoundError as err:
        raise DataNotFoundError(
            "Project metadata files were not found for ephys run.",
            stage="load_experiment_metadata",
            **run_context,
            hint="Ensure project_path points to a directory containing experiments.csv.",
        ) from err
    except Exception as err:
        raise stage_failure(
            "Failed to initialize ExperimentDataManager for ephys run.",
            "load_experiment_metadata",
            "Check metadata formatting and path configuration.",
        ) from err

    # The ephys directory is the only metadata item EphysDataManager needs.
    recording_dir = experiment_manager.get_ephys_directory()

    # Confirm the ephys data for this line is actually available.
    metadata_csv = experiment_manager.project_path / "experiments.csv"
    try:
        file_downloader.verify_file_by_line(
            line_num=line_num,
            csv_path=metadata_csv,
            do_type="ephys",
            base_file_path=experiment_manager.data_path,
        )
    except Exception as err:
        raise stage_failure(
            "Ephys data verification failed.",
            "verify_ephys_data",
            "Confirm ephys files are present and accessible from metadata paths.",
        ) from err

    if recording_dir is None:
        raise ValueError("Ephys directory could not be determined from experiment metadata.")

    # Import the block, then split out only the requested channel.
    try:
        self.ephys_data_manager = EphysDataManager.create(
            ephys_directory=recording_dir,
            auto_import_ephys_block=True,
            auto_process_block=False,
            auto_compute_phases=False,
        )
        self.ephys_data_manager.process_ephys_block_to_channels(
            remove_artifacts=remove_artifacts,
            channels=[channel_name],
        )
    except Exception as err:
        raise stage_failure(
            "Failed to import/process ephys block into channels.",
            "process_ephys_block",
            "Verify ephys channel metadata and raw recording format compatibility.",
        ) from err

    log.debug(self.ephys_data_manager.channels)

    if should_filter:
        # replace_signal=False keeps the raw signal alongside the filtered one.
        print(f'Filtering ephys data with filter type "{filter_type}" and cut {filter_range}')
        try:
            self.ephys_data_manager.filter_ephys(
                channel_name,
                ftype=str(filter_type),
                cut=filter_range,
                replace_signal=False,
            )
        except Exception as err:
            raise stage_failure(
                "Ephys filtering failed.",
                "filter_ephys",
                "Check filter_type/filter_range for valid values.",
            ) from err

    if compute_phases:
        # Phases are computed after any filtering has been applied.
        try:
            self.ephys_data_manager.compute_phases_all_channels()
        except Exception as err:
            raise stage_failure(
                "Phase computation failed for ephys data.",
                "compute_phases",
                "Ensure filtered channel data is available before computing phases.",
            ) from err

    # Fetch the requested channel and hand it to the plotting worker.
    log.info(f"Visualizing channel: {channel_name}")
    try:
        selected_channel = self.ephys_data_manager.get_channel(channel_name)
    except Exception as err:
        raise stage_failure(
            f"Could not load requested channel '{channel_name}'.",
            "get_channel",
            "Confirm channel_name appears in experiment metadata and imported channels.",
        ) from err
    worker = ChannelWorker(selected_channel)

    if plot_channel:
        worker.plot_channel(use_filtered=should_filter)

    if plot_spectrogram:
        worker.plot_spectrogram(use_filtered=should_filter, plot_events=False)

    if plot_phases:
        worker.plot_phases()

run_all_channels(line_num, remove_artifacts=False, filter_type=None, filter_range=[0.5, 4], plot_channel=False, plot_spectrogram=False, logging_level='CRITICAL')

Run ephys analysis pipeline for all channels in an experiment.

Iterates through all channels listed in the experiment metadata and performs the analysis workflow on each.

Parameters:

Name Type Description Default
line_num int

Experiment line number in experiments.csv.

required
remove_artifacts bool

If True, apply artifact removal.

False
filter_type Optional[str]

Filter type ('butter', 'fir') or None to skip.

None
filter_range List[float]

[low, high] cutoff frequencies for bandpass.

[0.5, 4]
plot_channel bool

If True, plot time-domain signals.

False
plot_spectrogram bool

If True, plot spectrograms.

False
logging_level Union[str, int]

Logging verbosity.

'CRITICAL'
Source code in src/ace_neuro/pipelines/ephys.py
def run_all_channels(
    self,
    line_num: int,
    remove_artifacts: bool = False,
    filter_type: Optional[str] = None,  # e.g. "butter"; None means no filtering
    filter_range: List[float] = [0.5, 4],
    plot_channel: bool = False,
    plot_spectrogram: bool = False,
    logging_level: Union[str, int] = "CRITICAL"
) -> None:
    """Resolve the channel list and ephys directory for one experiment.

    Intended as the entry point for running the ephys workflow over every
    channel listed in the experiment metadata. The body stops after the
    setup phase: it configures the module logger, loads the experiment
    metadata, expands the ``'LFP and EEG CSCs'`` entry into a list and
    looks up the ephys directory. No per-channel processing or plotting
    is performed here, so ``remove_artifacts``, ``filter_range``,
    ``plot_channel`` and ``plot_spectrogram`` are currently unused.

    Args:
        line_num: Experiment line number in experiments.csv.
        remove_artifacts: If True, apply artifact removal.
        filter_type: Filter type ('butter', 'fir') or None to skip.
        filter_range: [low, high] cutoff frequencies for bandpass.
        plot_channel: If True, plot time-domain signals.
        plot_spectrogram: If True, plot spectrograms.
        logging_level: Level applied to this module's logger and passed
            on to ExperimentDataManager.

    Raises:
        ValueError: If the experiment metadata is None.
    """
    log = logging.getLogger(__name__)
    log.setLevel(logging_level)

    # Filtering is considered requested as soon as a filter type is given.
    filtering_requested: bool = filter_type is not None

    manager = ExperimentDataManager(line_num, logging_level=logging_level)

    if manager.metadata is None:
        raise ValueError(f"Metadata could not be loaded for line {line_num}")

    # Expand the metadata entry into a plain list of its elements.
    channel_entry = manager.metadata['LFP and EEG CSCs']
    channel_names: List = list(channel_entry)

    recording_dir = manager.get_ephys_directory()

Multimodal Pipeline

ace_neuro.pipelines.multimodal.MultimodalPipeline

Orchestrates ephys + miniscope analysis and multimodal alignment.

After run() completes, these instance attributes are populated (None if a stage did not apply):

  • ephys_pipeline — EphysPipeline instance used for this run
  • miniscope_pipeline — MiniscopePipeline instance used
  • t_ca_im — aligned calcium frame times from TTL sync
  • low_confidence_periods — sync quality mask from alignment
  • ephys_idx_all_TTL_events — ephys sample indices for TTL events
  • ephys_idx_ca_events — ephys indices at calcium events (if ca_events)
  • ca_frame_num_of_ephys_idx — per-frame mapping (if TTL indices exist)
  • ca_events_phases_ephys — phase samples for CA events (ephys band)
  • ca_events_phases_miniscope — phase samples for CA events (miniscope)
  • phase_hist_ephys / phase_bin_edges_ephys — histogram of ephys phases
  • phase_hist_miniscope / phase_bin_edges_miniscope — histogram of miniscope phases
Source code in src/ace_neuro/pipelines/multimodal.py
class MultimodalPipeline:
    """Orchestrates ephys + miniscope analysis and multimodal alignment.

    After :meth:`run`, these instance attributes are populated (``None`` if a
    stage did not apply):

    - :attr:`ephys_pipeline` — :class:`EphysPipeline` instance used for this run
    - :attr:`miniscope_pipeline` — :class:`MiniscopePipeline` instance used
    - :attr:`t_ca_im` — aligned calcium frame times from TTL sync
    - :attr:`low_confidence_periods` — sync quality mask from alignment
    - :attr:`ephys_idx_all_TTL_events` — ephys sample indices for TTL events
    - :attr:`ephys_idx_ca_events` — ephys indices at calcium events (if ``ca_events``)
    - :attr:`ca_frame_num_of_ephys_idx` — per-frame mapping (if TTL indices exist)
    - :attr:`ca_events_phases_ephys` — phase samples for CA events (ephys band)
    - :attr:`ca_events_phases_miniscope` — phase samples for CA events (miniscope)
    - :attr:`phase_hist_ephys` / :attr:`phase_bin_edges_ephys` — histogram of ephys phases
    - :attr:`phase_hist_miniscope` / :attr:`phase_bin_edges_miniscope` — histogram of miniscope phases
    """

    def __init__(self) -> None:
        self.miniscope_pipeline = MiniscopePipeline()
        self.ephys_pipeline = EphysPipeline()
        self.t_ca_im: np.ndarray | None = None
        self.low_confidence_periods: Any = None
        self.ephys_idx_all_TTL_events: Any = None
        self.ephys_idx_ca_events: Any = None
        self.ca_frame_num_of_ephys_idx: Any = None
        self.ca_events_phases_ephys: Any = None
        self.ca_events_phases_miniscope: Any = None
        self.phase_hist_ephys: Any = None
        self.phase_bin_edges_ephys: Any = None
        self.phase_hist_miniscope: Any = None
        self.phase_bin_edges_miniscope: Any = None

    def run(
        self,
        line_num: int,
        project_path: str | Path | None = None,
        data_path: str | Path | None = None,
        # ephys parameters
        channel_name: str = 'PFCLFPvsCBEEG',
        remove_artifacts: bool = False,
        filter_type: str | None = None,
        filter_range: list[float] = [0.5, 4],
        plot_channel: bool = False,
        plot_spectrogram: bool = False,
        plot_phases: bool = False,
        logging_level: str = "CRITICAL",

        # miniscope parameters
        miniscope_filenames: list[str] = [],
        # preprocessing parameters
        crop: bool = True,
        crop_coords: list[int] | tuple[int, int, int, int] | None = None,
        detrend_method: str = 'median',
        df_over_f: bool = False,
        # if df_over_f = True
        secs_window: float = 5,
        quantile_min: float = 8,
        df_over_f_method: str = 'delta_f_over_sqrt_f',

        # processing parameters
        parallel: bool = False,
        n_processes: int = 6,
        apply_motion_correction: bool = True,
        inspect_motion_correction: bool = True,
        plot_params: bool = False,
        run_CNMFE: bool = True,
        save_estimates: bool = True,
        save_CNMFE_estimates_filename: str = 'estimates.hdf5',
        save_CNMFE_params: bool = False,

        # post-processing parameters
        remove_components_with_gui: bool = True,
        find_calcium_events: bool = True,
        derivative_for_estimates: str = 'first',
        event_height: float = 5,
        compute_miniscope_phase: bool = True,
        filter_miniscope_data: bool = True,
        n: int = 2,
        cut: list[float] = [0.1, 1.5],
        ftype: str = 'butter',
        btype: str = 'bandpass',
        inline: bool = False,
        compute_miniscope_spectrogram: bool = True,
        window_length: float = 30,
        window_step: float = 3,
        freq_lims: list[float] = [0, 15],
        time_bandwidth: float = 23,

        # multimodal parameters
        delete_TTLs: bool = True,
        fix_TTL_gaps: bool = False,
        only_experiment_events: bool = True,
        all_TTL_events: bool = True,
        ca_events: bool = False,
        time_range: list[float] | None = None,
        headless: bool = False
    ) -> None:
        """Run the complete multimodal analysis pipeline.

        Executes both ephys and miniscope pipelines, synchronizes their
        timestamps via TTL events, and performs phase-locked calcium event
        analysis.

        Args:
            line_num: Experiment line number in experiments.csv.
            channel_name: Ephys channel name to analyze.
            remove_artifacts: If True, remove ephys artifacts.
            filter_type: Ephys filter type ('butter', 'fir') or None.
            filter_range: [low, high] bandpass cutoffs for ephys.
            plot_channel: If True, plot ephys time series.
            plot_spectrogram: If True, plot ephys spectrogram.
            plot_phases: If True, plot phase histograms.
            logging_level: Verbosity level.
            miniscope_filenames: List of movie files to load.
            crop: If True, crop the movie.
            crop_coords: Crop coordinates as (x0, y0, x1, y1) tuple/list.
                If None, reads from analysis_parameters.csv or opens the GUI.
            detrend_method: 'median' or 'linear' detrending.
            df_over_f: If True, compute DF/F.
            parallel: If True, use multiprocessing.
            n_processes: Number of parallel processes.
            apply_motion_correction: If True, correct motion.
            run_CNMFE: If True, run source extraction.
            delete_TTLs: If True, remove dropped frame TTLs.
            fix_TTL_gaps: If True, interpolate missing TTLs.
            only_experiment_events: If True, keep only experiment events.
            all_TTL_events: If True, process all TTL events.
            ca_events: If True, include calcium event analysis.
            time_range: Optional [start, end] time range to analyze.
            headless: If True, disable all GUI interactions.
        """
        self.t_ca_im = None
        self.low_confidence_periods = None
        self.ephys_idx_all_TTL_events = None
        self.ephys_idx_ca_events = None
        self.ca_frame_num_of_ephys_idx = None
        self.ca_events_phases_ephys = None
        self.ca_events_phases_miniscope = None
        self.phase_hist_ephys = None
        self.phase_bin_edges_ephys = None
        self.phase_hist_miniscope = None
        self.phase_bin_edges_miniscope = None

        self.ephys_pipeline = EphysPipeline()
        try:
            self.ephys_pipeline.run(
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                channel_name=channel_name,
                remove_artifacts=remove_artifacts,
                filter_type=filter_type,
                filter_range=filter_range,
                plot_channel=plot_channel,
                plot_spectrogram=plot_spectrogram,
                plot_phases=plot_phases,
                logging_level=logging_level,
                headless=headless,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Multimodal run failed during ephys sub-pipeline.",
                stage="run_ephys_pipeline",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check ephys input paths and channel parameters before multimodal sync.",
            ) from e

        self.miniscope_pipeline = MiniscopePipeline()
        try:
            self.miniscope_pipeline.run(
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                filenames=miniscope_filenames,
                crop=crop,
                crop_coords=crop_coords,
                detrend_method=detrend_method,
                df_over_f=df_over_f,
                secs_window=secs_window,
                quantile_min=quantile_min,
                df_over_f_method=df_over_f_method,
                parallel=parallel,
                n_processes=n_processes,
                apply_motion_correction=apply_motion_correction,
                inspect_motion_correction=inspect_motion_correction,
                plot_params=plot_params,
                run_CNMFE=run_CNMFE,
                save_estimates=save_estimates,
                save_CNMFE_estimates_filename=save_CNMFE_estimates_filename,
                save_CNMFE_params=save_CNMFE_params,
                remove_components_with_gui=remove_components_with_gui,
                find_calcium_events=find_calcium_events,
                derivative_for_estimates=derivative_for_estimates,
                event_height=event_height,
                compute_miniscope_phase=compute_miniscope_phase,
                filter_miniscope_data=filter_miniscope_data,
                n=n,
                cut=cut,
                ftype=ftype,
                btype=btype,
                inline=inline,
                compute_miniscope_spectrogram=compute_miniscope_spectrogram,
                window_length=window_length,
                window_step=window_step,
                freq_lims=freq_lims,
                time_bandwidth=time_bandwidth,
                headless=headless,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Multimodal run failed during miniscope sub-pipeline.",
                stage="run_miniscope_pipeline",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check miniscope inputs and CNMF-E parameters before multimodal sync.",
            ) from e

        channel_object = self.ephys_pipeline.ephys_data_manager.get_channel(channel_name)
        frame_rate = self.miniscope_pipeline.miniscope_data_manager.fr
        ca_events_idx = self.miniscope_pipeline.miniscope_data_manager.ca_events_idx
        miniscope_phases = self.miniscope_pipeline.miniscope_data_manager.miniscope_phases

        try:
            t_ca_im, low_confidence_periods, channel_object, miniscope_dm = sync_neuralynx_miniscope_timestamps(
                channel_object,
                self.miniscope_pipeline.miniscope_data_manager,
                self.ephys_pipeline.ephys_data_manager,
                delete_TTLs=delete_TTLs,
                fix_TTL_gaps=fix_TTL_gaps,
                only_experiment_events=only_experiment_events,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Timestamp synchronization failed in multimodal pipeline.",
                stage="sync_timestamps",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Validate TTL events and ensure both pipelines produced aligned timing metadata.",
            ) from e

        print("\nSuccess! Setting changed variables.")
        self.miniscope_pipeline.miniscope_data_manager = miniscope_dm
        self.ephys_pipeline.ephys_data_manager.channels[channel_name] = channel_object

        self.t_ca_im = t_ca_im
        self.low_confidence_periods = low_confidence_periods

        try:
            ephys_idx_all_TTL_events, ephys_idx_ca_events = find_ephys_idx_of_TTL_events(
                t_ca_im,
                channel_object,
                frame_rate,
                all_TTL_events=all_TTL_events,
                ca_events_idx=ca_events_idx if ca_events else None,
            )
        except Exception as e:
            raise PipelineExecutionError(
                "Failed to map TTL events into ephys indices.",
                stage="map_ttl_to_ephys_indices",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Check frame-rate metadata and TTL event quality.",
            ) from e
        self.ephys_idx_all_TTL_events = ephys_idx_all_TTL_events
        self.ephys_idx_ca_events = ephys_idx_ca_events

        if ephys_idx_all_TTL_events is not None:
            self.ca_frame_num_of_ephys_idx = find_ca_movie_frame_num_of_ephys_idx(channel_object, ephys_idx_all_TTL_events)

        if ephys_idx_ca_events is not None:
            try:
                self.ca_events_phases_ephys = ephys_phase_ca_events(ephys_idx_ca_events, channel_object, neurons='all')
                self.ca_events_phases_miniscope = miniscope_phase_ca_events(ca_events_idx, miniscope_phases, neurons='all')
            except Exception as e:
                raise PipelineExecutionError(
                    "Failed to compute event-locked phases.",
                    stage="compute_phase_locked_events",
                    line_num=line_num,
                    project_path=project_path,
                    data_path=data_path,
                    hint="Ensure phase arrays and calcium event indices are valid and non-empty.",
                ) from e

        hist1, bin_edges1 = None, None
        hist2, bin_edges2 = None, None

        if ephys_idx_ca_events is not None:
            if self.ca_events_phases_ephys is not None:
                res1 = phase_ca_events_histogram(self.ca_events_phases_ephys)
                hist1, bin_edges1 = res1[0], res1[1]

            if self.ca_events_phases_miniscope is not None:
                res2 = phase_ca_events_histogram(self.ca_events_phases_miniscope)
                hist2, bin_edges2 = res2[0], res2[1]

        self.phase_hist_ephys = hist1
        self.phase_bin_edges_ephys = bin_edges1
        self.phase_hist_miniscope = hist2
        self.phase_bin_edges_miniscope = bin_edges2

run(line_num, project_path=None, data_path=None, channel_name='PFCLFPvsCBEEG', remove_artifacts=False, filter_type=None, filter_range=[0.5, 4], plot_channel=False, plot_spectrogram=False, plot_phases=False, logging_level='CRITICAL', miniscope_filenames=[], crop=True, crop_coords=None, detrend_method='median', df_over_f=False, secs_window=5, quantile_min=8, df_over_f_method='delta_f_over_sqrt_f', parallel=False, n_processes=6, apply_motion_correction=True, inspect_motion_correction=True, plot_params=False, run_CNMFE=True, save_estimates=True, save_CNMFE_estimates_filename='estimates.hdf5', save_CNMFE_params=False, remove_components_with_gui=True, find_calcium_events=True, derivative_for_estimates='first', event_height=5, compute_miniscope_phase=True, filter_miniscope_data=True, n=2, cut=[0.1, 1.5], ftype='butter', btype='bandpass', inline=False, compute_miniscope_spectrogram=True, window_length=30, window_step=3, freq_lims=[0, 15], time_bandwidth=23, delete_TTLs=True, fix_TTL_gaps=False, only_experiment_events=True, all_TTL_events=True, ca_events=False, time_range=None, headless=False)

Run the complete multimodal analysis pipeline.

Executes both ephys and miniscope pipelines, synchronizes their timestamps via TTL events, and performs phase-locked calcium event analysis.

Parameters:

Name Type Description Default
line_num int

Experiment line number in experiments.csv.

required
channel_name str

Ephys channel name to analyze.

'PFCLFPvsCBEEG'
remove_artifacts bool

If True, remove ephys artifacts.

False
filter_type str | None

Ephys filter type ('butter', 'fir') or None.

None
filter_range list[float]

[low, high] bandpass cutoffs for ephys.

[0.5, 4]
plot_channel bool

If True, plot ephys time series.

False
plot_spectrogram bool

If True, plot ephys spectrogram.

False
plot_phases bool

If True, plot phase histograms.

False
logging_level str

Verbosity level.

'CRITICAL'
miniscope_filenames list[str]

List of movie files to load.

[]
crop bool

If True, crop the movie.

True
crop_coords list[int] | tuple[int, int, int, int] | None

Crop coordinates as (x0, y0, x1, y1) tuple/list. If None, reads from analysis_parameters.csv or opens the GUI.

None
detrend_method str

'median' or 'linear' detrending.

'median'
df_over_f bool

If True, compute DF/F.

False
parallel bool

If True, use multiprocessing.

False
n_processes int

Number of parallel processes.

6
apply_motion_correction bool

If True, correct motion.

True
run_CNMFE bool

If True, run source extraction.

True
delete_TTLs bool

If True, remove dropped frame TTLs.

True
fix_TTL_gaps bool

If True, interpolate missing TTLs.

False
only_experiment_events bool

If True, keep only experiment events.

True
all_TTL_events bool

If True, process all TTL events.

True
ca_events bool

If True, include calcium event analysis.

False
time_range list[float] | None

Optional [start, end] time range to analyze.

None
headless bool

If True, disable all GUI interactions.

False
Source code in src/ace_neuro/pipelines/multimodal.py
def run(
    self,
    line_num: int,
    project_path: str | Path | None = None,
    data_path: str | Path | None = None,
    # ephys parameters
    channel_name: str = 'PFCLFPvsCBEEG',
    remove_artifacts: bool = False,
    filter_type: str | None = None,
    filter_range: list[float] = [0.5, 4],
    plot_channel: bool = False,
    plot_spectrogram: bool = False,
    plot_phases: bool = False,
    logging_level: str = "CRITICAL",

    # miniscope parameters
    miniscope_filenames: list[str] = [],
    # preprocessing parameters
    crop: bool = True,
    crop_coords: list[int] | tuple[int, int, int, int] | None = None,
    detrend_method: str = 'median',
    df_over_f: bool = False,
    # if df_over_f = True
    secs_window: float = 5,
    quantile_min: float = 8,
    df_over_f_method: str = 'delta_f_over_sqrt_f',

    # processing parameters
    parallel: bool = False,
    n_processes: int = 6,
    apply_motion_correction: bool = True,
    inspect_motion_correction: bool = True,
    plot_params: bool = False,
    run_CNMFE: bool = True,
    save_estimates: bool = True,
    save_CNMFE_estimates_filename: str = 'estimates.hdf5',
    save_CNMFE_params: bool = False,

    # post-processing parameters
    remove_components_with_gui: bool = True,
    find_calcium_events: bool = True,
    derivative_for_estimates: str = 'first',
    event_height: float = 5,
    compute_miniscope_phase: bool = True,
    filter_miniscope_data: bool = True,
    n: int = 2,
    cut: list[float] = [0.1, 1.5],
    ftype: str = 'butter',
    btype: str = 'bandpass',
    inline: bool = False,
    compute_miniscope_spectrogram: bool = True,
    window_length: float = 30,
    window_step: float = 3,
    freq_lims: list[float] = [0, 15],
    time_bandwidth: float = 23,

    # multimodal parameters
    delete_TTLs: bool = True,
    fix_TTL_gaps: bool = False,
    only_experiment_events: bool = True,
    all_TTL_events: bool = True,
    ca_events: bool = False,
    time_range: list[float] | None = None,
    headless: bool = False
) -> None:
    """Run the complete multimodal analysis pipeline.

    Executes both ephys and miniscope pipelines, synchronizes their
    timestamps via TTL events, and performs phase-locked calcium event
    analysis.

    The steps run in this order: ``EphysPipeline.run``,
    ``MiniscopePipeline.run``, ``sync_neuralynx_miniscope_timestamps``,
    ``find_ephys_idx_of_TTL_events``, then the phase lookups and phase
    histograms. Results are stored on the instance (``t_ca_im``,
    ``low_confidence_periods``, ``ephys_idx_all_TTL_events``,
    ``ephys_idx_ca_events``, ``ca_frame_num_of_ephys_idx``,
    ``ca_events_phases_ephys``, ``ca_events_phases_miniscope``,
    ``phase_hist_ephys``, ``phase_bin_edges_ephys``,
    ``phase_hist_miniscope``, ``phase_bin_edges_miniscope``); every one of
    them is reset to None at the start of each call.

    Args:
        line_num: Experiment line number in experiments.csv.
        project_path: Forwarded to both sub-pipelines and attached to any
            PipelineExecutionError raised by this method.
        data_path: Forwarded to both sub-pipelines and attached to any
            PipelineExecutionError raised by this method.
        channel_name: Ephys channel name to analyze.
        remove_artifacts: If True, remove ephys artifacts.
        filter_type: Ephys filter type ('butter', 'fir') or None.
        filter_range: [low, high] bandpass cutoffs for ephys.
        plot_channel: If True, plot ephys time series.
        plot_spectrogram: If True, plot ephys spectrogram.
        plot_phases: If True, plot phase histograms.
        logging_level: Verbosity level.
        miniscope_filenames: List of movie files to load. Passed to
            MiniscopePipeline.run as ``filenames``.
        crop: If True, crop the movie.
        crop_coords: Crop coordinates as (x0, y0, x1, y1) tuple/list.
            If None, reads from analysis_parameters.csv or opens the GUI.
        detrend_method: 'median' or 'linear' detrending.
        df_over_f: If True, compute DF/F.
        secs_window: Forwarded to MiniscopePipeline.run; grouped in the
            signature with the settings that apply when df_over_f is True.
        quantile_min: Forwarded to MiniscopePipeline.run; grouped in the
            signature with the settings that apply when df_over_f is True.
        df_over_f_method: Forwarded to MiniscopePipeline.run; grouped in
            the signature with the settings that apply when df_over_f is
            True.
        parallel: If True, use multiprocessing.
        n_processes: Number of parallel processes.
        apply_motion_correction: If True, correct motion.
        inspect_motion_correction: Forwarded to MiniscopePipeline.run.
        plot_params: Forwarded to MiniscopePipeline.run.
        run_CNMFE: If True, run source extraction.
        save_estimates: Forwarded to MiniscopePipeline.run.
        save_CNMFE_estimates_filename: Forwarded to MiniscopePipeline.run.
        save_CNMFE_params: Forwarded to MiniscopePipeline.run.
        remove_components_with_gui: Forwarded to MiniscopePipeline.run.
        find_calcium_events: Forwarded to MiniscopePipeline.run.
        derivative_for_estimates: Forwarded to MiniscopePipeline.run.
        event_height: Forwarded to MiniscopePipeline.run.
        compute_miniscope_phase: Forwarded to MiniscopePipeline.run.
        filter_miniscope_data: Forwarded to MiniscopePipeline.run.
        n: Forwarded to MiniscopePipeline.run.
        cut: Forwarded to MiniscopePipeline.run.
        ftype: Forwarded to MiniscopePipeline.run.
        btype: Forwarded to MiniscopePipeline.run.
        inline: Forwarded to MiniscopePipeline.run.
        compute_miniscope_spectrogram: Forwarded to MiniscopePipeline.run.
        window_length: Forwarded to MiniscopePipeline.run.
        window_step: Forwarded to MiniscopePipeline.run.
        freq_lims: Forwarded to MiniscopePipeline.run.
        time_bandwidth: Forwarded to MiniscopePipeline.run.
        delete_TTLs: If True, remove dropped frame TTLs.
        fix_TTL_gaps: If True, interpolate missing TTLs.
        only_experiment_events: If True, keep only experiment events.
        all_TTL_events: If True, process all TTL events.
        ca_events: If True, include calcium event analysis. When False,
            the calcium event indices are not passed to the TTL-to-ephys
            mapping step.
        time_range: Optional [start, end] time range to analyze.
        headless: If True, disable all GUI interactions.

    Raises:
        PipelineExecutionError: If the ephys sub-pipeline, the miniscope
            sub-pipeline, timestamp synchronization, TTL-to-ephys index
            mapping, or the event-locked phase computation fails. The
            original exception is chained as the cause. Errors from the
            frame-number lookup and the histogram step are not wrapped
            and propagate as raised.
    """
    # Clear results of any previous run so a failure part-way through
    # cannot leave stale values behind.
    self.t_ca_im = None
    self.low_confidence_periods = None
    self.ephys_idx_all_TTL_events = None
    self.ephys_idx_ca_events = None
    self.ca_frame_num_of_ephys_idx = None
    self.ca_events_phases_ephys = None
    self.ca_events_phases_miniscope = None
    self.phase_hist_ephys = None
    self.phase_bin_edges_ephys = None
    self.phase_hist_miniscope = None
    self.phase_bin_edges_miniscope = None

    # The list defaults in the signature are single shared objects. Hand the
    # sub-pipelines shallow copies so an in-place change downstream can never
    # leak into later calls (or into a list owned by the caller). Non-list
    # values are passed through untouched.
    filter_range, miniscope_filenames, cut, freq_lims = (
        list(value) if isinstance(value, list) else value
        for value in (filter_range, miniscope_filenames, cut, freq_lims)
    )

    self.ephys_pipeline = EphysPipeline()
    try:
        self.ephys_pipeline.run(
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            channel_name=channel_name,
            remove_artifacts=remove_artifacts,
            filter_type=filter_type,
            filter_range=filter_range,
            plot_channel=plot_channel,
            plot_spectrogram=plot_spectrogram,
            plot_phases=plot_phases,
            logging_level=logging_level,
            headless=headless,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Multimodal run failed during ephys sub-pipeline.",
            stage="run_ephys_pipeline",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Check ephys input paths and channel parameters before multimodal sync.",
        ) from e

    self.miniscope_pipeline = MiniscopePipeline()
    try:
        self.miniscope_pipeline.run(
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            filenames=miniscope_filenames,
            crop=crop,
            crop_coords=crop_coords,
            detrend_method=detrend_method,
            df_over_f=df_over_f,
            secs_window=secs_window,
            quantile_min=quantile_min,
            df_over_f_method=df_over_f_method,
            parallel=parallel,
            n_processes=n_processes,
            apply_motion_correction=apply_motion_correction,
            inspect_motion_correction=inspect_motion_correction,
            plot_params=plot_params,
            run_CNMFE=run_CNMFE,
            save_estimates=save_estimates,
            save_CNMFE_estimates_filename=save_CNMFE_estimates_filename,
            save_CNMFE_params=save_CNMFE_params,
            remove_components_with_gui=remove_components_with_gui,
            find_calcium_events=find_calcium_events,
            derivative_for_estimates=derivative_for_estimates,
            event_height=event_height,
            compute_miniscope_phase=compute_miniscope_phase,
            filter_miniscope_data=filter_miniscope_data,
            n=n,
            cut=cut,
            ftype=ftype,
            btype=btype,
            inline=inline,
            compute_miniscope_spectrogram=compute_miniscope_spectrogram,
            window_length=window_length,
            window_step=window_step,
            freq_lims=freq_lims,
            time_bandwidth=time_bandwidth,
            headless=headless,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Multimodal run failed during miniscope sub-pipeline.",
            stage="run_miniscope_pipeline",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Check miniscope inputs and CNMF-E parameters before multimodal sync.",
        ) from e

    # Read these from the miniscope data manager *before* synchronization:
    # the sync step returns a data manager that replaces the current one.
    channel_object = self.ephys_pipeline.ephys_data_manager.get_channel(channel_name)
    frame_rate = self.miniscope_pipeline.miniscope_data_manager.fr
    ca_events_idx = self.miniscope_pipeline.miniscope_data_manager.ca_events_idx
    miniscope_phases = self.miniscope_pipeline.miniscope_data_manager.miniscope_phases

    try:
        t_ca_im, low_confidence_periods, channel_object, miniscope_dm = sync_neuralynx_miniscope_timestamps(
            channel_object,
            self.miniscope_pipeline.miniscope_data_manager,
            self.ephys_pipeline.ephys_data_manager,
            delete_TTLs=delete_TTLs,
            fix_TTL_gaps=fix_TTL_gaps,
            only_experiment_events=only_experiment_events,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Timestamp synchronization failed in multimodal pipeline.",
            stage="sync_timestamps",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Validate TTL events and ensure both pipelines produced aligned timing metadata.",
        ) from e

    print("\nSuccess! Setting changed variables.")
    # Store the objects returned by the sync step back on the sub-pipelines.
    self.miniscope_pipeline.miniscope_data_manager = miniscope_dm
    self.ephys_pipeline.ephys_data_manager.channels[channel_name] = channel_object

    self.t_ca_im = t_ca_im
    self.low_confidence_periods = low_confidence_periods

    try:
        ephys_idx_all_TTL_events, ephys_idx_ca_events = find_ephys_idx_of_TTL_events(
            t_ca_im,
            channel_object,
            frame_rate,
            all_TTL_events=all_TTL_events,
            # Calcium event indices are only handed over when requested.
            ca_events_idx=ca_events_idx if ca_events else None,
        )
    except Exception as e:
        raise PipelineExecutionError(
            "Failed to map TTL events into ephys indices.",
            stage="map_ttl_to_ephys_indices",
            line_num=line_num,
            project_path=project_path,
            data_path=data_path,
            hint="Check frame-rate metadata and TTL event quality.",
        ) from e
    self.ephys_idx_all_TTL_events = ephys_idx_all_TTL_events
    self.ephys_idx_ca_events = ephys_idx_ca_events

    if ephys_idx_all_TTL_events is not None:
        self.ca_frame_num_of_ephys_idx = find_ca_movie_frame_num_of_ephys_idx(channel_object, ephys_idx_all_TTL_events)

    if ephys_idx_ca_events is not None:
        try:
            self.ca_events_phases_ephys = ephys_phase_ca_events(ephys_idx_ca_events, channel_object, neurons='all')
            self.ca_events_phases_miniscope = miniscope_phase_ca_events(ca_events_idx, miniscope_phases, neurons='all')
        except Exception as e:
            raise PipelineExecutionError(
                "Failed to compute event-locked phases.",
                stage="compute_phase_locked_events",
                line_num=line_num,
                project_path=project_path,
                data_path=data_path,
                hint="Ensure phase arrays and calcium event indices are valid and non-empty.",
            ) from e

    hist1, bin_edges1 = None, None
    hist2, bin_edges2 = None, None

    # The phase attributes were reset to None above and are only assigned
    # when ephys_idx_ca_events is not None, so checking them directly is
    # sufficient; no separate guard on ephys_idx_ca_events is needed.
    if self.ca_events_phases_ephys is not None:
        res1 = phase_ca_events_histogram(self.ca_events_phases_ephys)
        hist1, bin_edges1 = res1[0], res1[1]

    if self.ca_events_phases_miniscope is not None:
        res2 = phase_ca_events_histogram(self.ca_events_phases_miniscope)
        hist2, bin_edges2 = res2[0], res2[1]

    self.phase_hist_ephys = hist1
    self.phase_bin_edges_ephys = bin_edges1
    self.phase_hist_miniscope = hist2
    self.phase_bin_edges_miniscope = bin_edges2