Skip to content

Ephys Modules API

This section details the pipeline for processing electrophysiological data (Neuralynx/ONIX).

Ephys Data Manager

ace_neuro.ephys.ephys_data_manager.EphysDataManager

Bases: ABC

Abstract base class for ephys data managers. Manages the import of raw ephys data and processes it into channels. Stores the processed channels in self.channels, where the key is the channel name and the value is a Channel object.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
class EphysDataManager(ABC):
    """
    Abstract base class for ephys data managers.

    Manages the import of raw ephys data and processes it into channels.
    Processed channels are stored in ``self.channels``, where the key is the
    channel name and the value is a Channel object.

    Every subclass is appended to ``_registry`` when it is defined, and
    ``create()`` instantiates the first registered subclass whose
    ``can_handle()`` accepts the given directory.
    """

    # Subclasses in definition order; populated by __init_subclass__.
    _registry: List[Type['EphysDataManager']] = []
    logger: logging.Logger
    channels: Dict[str, Channel]
    ephys_block: Any

    def __init_subclass__(cls, **kwargs: Any) -> None:
        """Register each newly defined subclass so ``create()`` can find it."""
        super().__init_subclass__(**kwargs)
        if cls not in cls._registry:
            cls._registry.append(cls)

    @classmethod
    def create(cls: Type[T], ephys_directory: Union[str, Path], **kwargs: Any) -> T:
        """Factory method: build an instance of the first subclass that handles the directory.

        Args:
            ephys_directory: Directory offered to each registered subclass's
                ``can_handle()``, then passed to the chosen constructor.
            **kwargs: Forwarded unchanged to the chosen subclass constructor.

        Returns:
            An instance of the first registered subclass that accepts the directory.

        Raises:
            ValueError: If ``ephys_directory`` is None, or no registered
                subclass can handle it.
        """
        if ephys_directory is None:
            raise ValueError("ephys_directory must be provided to create() factory.")

        for subclass in cls._registry:
            if subclass.can_handle(ephys_directory):
                return cast(T, subclass(ephys_directory=ephys_directory, **kwargs))

        raise ValueError(f"No EphysDataManager subclass found that can handle directory: {ephys_directory}")

    @classmethod
    @abstractmethod
    def can_handle(cls, directory: Union[str, Path]) -> bool:
        """Return True if this class can handle the format in the given directory."""
        pass

    def __init__(
        self,
        ephys_directory: Optional[Union[str, Path]] = None,
        auto_import_ephys_block: bool = True,
        auto_process_block: bool = True,
        auto_compute_phases: bool = True,
        level: Union[str, int] = "CRITICAL",
        channels: Optional[List[str]] = None,
        remove_artifacts: bool = False
    ) -> None:
        """Initialize the EphysDataManager and optionally load data.

        Args:
            ephys_directory: Path to directory containing ephys data.
            auto_import_ephys_block: If True, automatically import raw ephys data.
            auto_process_block: If True, automatically process block into channels.
            auto_compute_phases: If True, automatically compute phase for all channels.
            level: Logging level name or numeric level, passed to ``Logger.setLevel``.
            channels: Channel names to process (optional).
            remove_artifacts: If True, apply artifact removal during processing.

        Raises:
            ValueError: If ``auto_import_ephys_block`` is True but no
                ``ephys_directory`` was given.
        """
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.setLevel(level)

        self.channels = {}  # Processed channels
        self.ephys_block = None  # Raw data storage

        if auto_import_ephys_block:
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            if ephys_directory is None:
                raise ValueError(
                    "ephys_directory must be provided when auto_import_ephys_block is True."
                )
            self.import_ephys_block(ephys_directory)

        if auto_process_block:
            self.process_ephys_block_to_channels(channels=channels, remove_artifacts=remove_artifacts)

        if auto_compute_phases:
            self.compute_phases_all_channels()

    @abstractmethod
    def import_ephys_block(self, ephys_directory: Union[str, Path]) -> None:
        """Load raw ephys data from disk."""
        pass

    @abstractmethod
    def process_ephys_block_to_channels(self, channels: Optional[List[str]] = None, remove_artifacts: bool = False) -> None:
        """Process raw ephys data into Channel objects."""
        pass

    @abstractmethod
    def get_sync_timestamps(self, channel_name: Optional[str] = None) -> np.ndarray:
        """
        Extract raw hardware sync timestamps from an ephys channel.
        To be overridden by subclasses.
        """
        pass

    def compute_phases_all_channels(self) -> None:
        """Compute instantaneous phase for all loaded channels."""
        # Only values of existing keys are reassigned, so iterating items() is safe.
        for key, value in self.channels.items():
            self.channels[key] = self.compute_phase(value)

    def compute_phase(self, channel: Channel) -> Channel:
        """Compute instantaneous phase using Hilbert transform.

        Args:
            channel: Channel object with signal data.

        Returns:
            The same Channel object with its ``phases`` attribute populated.
        """
        print(f"Computing phase for {channel.name}")
        analytic_signal = hilbert(channel.signal)
        channel.phases = np.angle(analytic_signal)
        return channel

    def filter_ephys(
        self,
        channel_name: str,
        n: int = 2,
        cut: Union[float, List[float], np.ndarray] = [0.5, 4],
        ftype: str = 'butter',
        btype: str = 'bandpass',
        replace_signal: bool = True
    ) -> np.ndarray:
        """Apply a frequency filter to a channel's signal.

        Supports FIR and Butterworth filter types with configurable
        cutoff frequencies and band types.

        Args:
            channel_name: Name of the channel to filter.
            n: Filter order (Butterworth) or number of taps (FIR).
            cut: Cutoff frequency or [low, high] for bandpass. The default
                list is only read, never mutated.
            ftype: Filter type ('butter', 'butterworth', or 'fir').
            btype: Band type ('low', 'high', 'band', 'bandpass').
            replace_signal: If True, overwrite signal; else store in signal_filtered.

        Returns:
            Filtered signal as 1D numpy array.

        Raises:
            ValueError: If channel is not found in loaded channels, or if
                ``ftype`` is not a supported filter type.
        """
        try:
            channel: Channel = self.channels[channel_name]
        except KeyError:
            raise ValueError("Channel not found in data_manager. Please import the data first.")

        print(f"Filtering the ephys signal: {channel_name}")

        filtered_data = self._filter_data(
            channel.signal,
            n=n,
            cut=cut,
            ftype=ftype,
            btype=btype,
            fs=channel.sampling_rate
        )

        if replace_signal:
            self.channels[channel_name].signal = filtered_data
        else:
            self.channels[channel_name].signal_filtered = filtered_data

        return filtered_data

    def get_channels(self) -> Dict[str, Channel]:
        """Return dictionary of all processed channels."""
        return self.channels

    def get_channel(self, channel_name: str) -> Channel:
        """Return a single channel by name.

        Args:
            channel_name: Name of the channel to retrieve.

        Raises:
            KeyError: If no channel with that name has been loaded.
        """
        return self.channels[channel_name]

    @staticmethod
    def _plot_filter_response(b: Any, a: Any, fs: float) -> None:
        """Plot the frequency response and Bode magnitude/phase of a filter.

        Args:
            b: Numerator coefficients of the filter.
            a: Denominator coefficients (``1`` for an FIR filter).
            fs: Sampling frequency in Hz used for the frequency axis.
        """
        from scipy.signal import freqz, bode  # type: ignore

        w, response = freqz(b, a, worN=10000, fs=fs)
        plt.figure()
        plt.semilogx(w, abs(response))

        w, mag, phase = bode((b, a), w=2 * np.pi * w)
        plt.figure()
        plt.semilogx(w, mag)
        plt.figure()
        plt.semilogx(w, phase)

    @staticmethod
    def _filter_data(
        data: np.ndarray,
        n: int,
        cut: Union[float, List[float], np.ndarray],
        ftype: str,
        btype: str,
        fs: float,
        bodePlot: bool = False
    ) -> np.ndarray:
        """Apply FIR or Butterworth filter to signal data.

        Both filter types are applied with ``filtfilt`` (forward-backward,
        zero-phase filtering).

        For FIR, ``btype`` is forwarded to ``firwin`` as ``pass_zero``
        ('lowpass', 'highpass', 'bandpass'), and ``n`` is the number of taps
        (filter order + 1); it must be odd if a passband includes the Nyquist
        frequency. The original author's note suggests n = 10000 as a good
        FIR length. For Butterworth, ``btype`` is 'low', 'high' or 'band'
        and ``cut`` is a two-element sequence for a band filter, e.g. [10, 30].

        Args:
            data: 1D numpy array of signal values.
            n: Filter order (Butterworth) or number of taps (FIR).
            cut: Cutoff frequency or [low, high] for bandpass.
            ftype: Filter type ('fir', 'butter', or 'butterworth'), case-insensitive.
            btype: Band type ('low', 'high', 'band', 'bandpass', etc.).
            fs: Sampling frequency in Hz.
            bodePlot: If True, plot Bode diagram of filter response.

        Returns:
            Filtered signal as 1D numpy array.

        Raises:
            ValueError: If ``ftype`` is not one of the supported filter types.
        """
        from scipy.signal import butter, filtfilt, firwin  # type: ignore

        # NOTE(review): configuring the root logger from library code is unusual;
        # kept so existing global logging behaviour is unchanged.
        logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(levelname)s - %(message)s') # turn to DEBUG for more info

        # Log input variables
        logging.info("Input variables:")
        logging.info("- data: %s", data)
        logging.info("- n: %s", n)
        logging.info("- cut: %s", cut)
        logging.info("- ftype: %s", ftype)
        logging.info("- btype: %s", btype)
        logging.info("- fs: %s", fs)
        logging.info("- bodePlot: %s", bodePlot)

        kind = ftype.lower()

        # One if/elif/else chain: previously the FIR branch fell through to the
        # Butterworth `else` and raised "Unknown filter type" after filtering.
        if kind == 'fir':
            taps = firwin(n, cut, pass_zero=btype, fs=fs)  # Build the FIR filter
            b, a = taps, 1
        elif kind in ('butterworth', 'butter'):
            b, a = butter(n, cut, btype=btype, fs=fs)
        else:
            raise ValueError(f"Unknown filter type: {ftype}")

        filteredData = filtfilt(b, a, data)  # Zero-phase filter the data

        if bodePlot:
            # Use the real sampling rate for the plot axis (was hard-coded to 2000 Hz).
            EphysDataManager._plot_filter_response(b, a, fs)

        return filteredData

__init__(ephys_directory=None, auto_import_ephys_block=True, auto_process_block=True, auto_compute_phases=True, level='CRITICAL', channels=None, remove_artifacts=False)

Initialize the EphysDataManager and optionally load data.

Parameters:

Name Type Description Default
ephys_directory Optional[Union[str, Path]]

Path to directory containing ephys data.

None
auto_import_ephys_block bool

If True, automatically import raw ephys data.

True
auto_process_block bool

If True, automatically process block into channels.

True
auto_compute_phases bool

If True, automatically compute phase for all channels.

True
level Union[str, int]

Logging level name (e.g. 'CRITICAL') or numeric level.

'CRITICAL'
channels Optional[List[str]]

Channel names to process (optional).

None
remove_artifacts bool

If True, apply artifact removal during processing.

False
Source code in src/ace_neuro/ephys/ephys_data_manager.py
def __init__(
    self, 
    ephys_directory: Optional[Union[str, Path]] = None,
    auto_import_ephys_block: bool = True, 
    auto_process_block: bool = True, 
    auto_compute_phases: bool = True, 
    level: Union[str, int] = "CRITICAL", 
    channels: Optional[List[str]] = None, 
    remove_artifacts: bool = False
) -> None:
    """Initialize the EphysDataManager and optionally load data.

    The three ``auto_*`` stages run in order: import, process, compute phases.

    Args:
        ephys_directory: Path to directory containing ephys data.
        auto_import_ephys_block: If True, automatically import raw ephys data.
        auto_process_block: If True, automatically process block into channels.
        auto_compute_phases: If True, automatically compute phase for all channels.
        level: Logging level name or numeric level, passed to ``Logger.setLevel``.
        channels: Channel names to process (optional).
        remove_artifacts: If True, apply artifact removal during processing.

    Raises:
        ValueError: If ``auto_import_ephys_block`` is True but no
            ``ephys_directory`` was given.
    """
    self.logger = logging.getLogger(self.__class__.__name__)
    self.logger.setLevel(level)

    self.channels = {}  # Processed channels
    self.ephys_block = None  # Raw data storage

    if auto_import_ephys_block:
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if ephys_directory is None:
            raise ValueError(
                "ephys_directory must be provided when auto_import_ephys_block is True."
            )
        self.import_ephys_block(ephys_directory)

    if auto_process_block:
        self.process_ephys_block_to_channels(channels=channels, remove_artifacts=remove_artifacts)

    if auto_compute_phases:
        self.compute_phases_all_channels()

can_handle(directory) abstractmethod classmethod

Return True if this class can handle the format in the given directory.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
@classmethod
@abstractmethod
def can_handle(cls, directory: Union[str, Path]) -> bool:
    """Report whether this manager class supports the data format found in ``directory``.

    Abstract: the base implementation does nothing and returns None;
    concrete subclasses must override it.
    """

compute_phase(channel)

Compute instantaneous phase using Hilbert transform.

Parameters:

Name Type Description Default
channel Channel

Channel object with signal data.

required

Returns:

Type Description
Channel

Channel object with phases attribute populated.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
def compute_phase(self, channel: Channel) -> Channel:
    """Populate ``channel.phases`` with the instantaneous phase of its signal.

    The phase is the angle of the analytic signal returned by ``hilbert``.

    Args:
        channel: Channel whose ``signal`` is read and whose ``phases`` is written.

    Returns:
        The same Channel object that was passed in.
    """
    print(f"Computing phase for {channel.name}")
    channel.phases = np.angle(hilbert(channel.signal))
    return channel

compute_phases_all_channels()

Compute instantaneous phase for all loaded channels.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
def compute_phases_all_channels(self) -> None:
    """Run ``compute_phase`` on every entry of ``self.channels`` and store the result back."""
    for name in list(self.channels):
        self.channels[name] = self.compute_phase(self.channels[name])

create(ephys_directory, **kwargs) classmethod

Factory method that creates an instance of the first registered subclass able to handle the directory.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
@classmethod
def create(cls: Type[T], ephys_directory: Union[str, Path], **kwargs: Any) -> T:
    """Instantiate the first registered subclass whose ``can_handle`` accepts the directory.

    Args:
        ephys_directory: Directory offered to each registered subclass and
            then passed to the chosen constructor.
        **kwargs: Forwarded unchanged to the chosen constructor.

    Raises:
        ValueError: If ``ephys_directory`` is None or no subclass accepts it.
    """
    if ephys_directory is None:
        raise ValueError("ephys_directory must be provided to create() factory.")

    # Registry order decides priority: the first match wins.
    handler = next(
        (candidate for candidate in cls._registry if candidate.can_handle(ephys_directory)),
        None,
    )
    if handler is None:
        raise ValueError(f"No EphysDataManager subclass found that can handle directory: {ephys_directory}")

    return cast(T, handler(ephys_directory=ephys_directory, **kwargs))

filter_ephys(channel_name, n=2, cut=[0.5, 4], ftype='butter', btype='bandpass', replace_signal=True)

Apply a frequency filter to a channel's signal.

Supports FIR and Butterworth filter types with configurable cutoff frequencies and band types.

Parameters:

Name Type Description Default
channel_name str

Name of the channel to filter.

required
n int

Filter order (Butterworth) or number of taps (FIR).

2
cut Union[float, List[float], ndarray]

Cutoff frequency or [low, high] for bandpass.

[0.5, 4]
ftype str

Filter type ('butter', 'butterworth', or 'fir').

'butter'
btype str

Band type ('low', 'high', 'band', 'bandpass').

'bandpass'
replace_signal bool

If True, overwrite signal; else store in signal_filtered.

True

Returns:

Type Description
ndarray

Filtered signal as 1D numpy array.

Raises:

Type Description
ValueError

If channel is not found in loaded channels.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
def filter_ephys(
    self, 
    channel_name: str, 
    n: int = 2, 
    cut: Union[float, List[float], np.ndarray] = [0.5, 4], 
    ftype: str = 'butter', 
    btype: str = 'bandpass', 
    replace_signal: bool = True
) -> np.ndarray:
    """Filter one loaded channel and store the result on that channel.

    The channel's ``signal`` and ``sampling_rate`` are handed to
    ``self._filter_data`` together with the filter settings.

    Args:
        channel_name: Key of the channel in ``self.channels``.
        n: Filter order (Butterworth) or number of taps (FIR).
        cut: Cutoff frequency, or [low, high] for a band filter.
        ftype: Filter type ('butter', 'butterworth', or 'fir').
        btype: Band type ('low', 'high', 'band', 'bandpass').
        replace_signal: When True the channel's ``signal`` is overwritten;
            otherwise the result is stored in ``signal_filtered``.

    Returns:
        The filtered signal returned by ``self._filter_data``.

    Raises:
        ValueError: If ``channel_name`` is not a loaded channel.
    """
    try:
        target: Channel = self.channels[channel_name]
    except KeyError:
        raise ValueError("Channel not found in data_manager. Please import the data first.")

    print(f"Filtering the ephys signal: {channel_name}")

    settings = dict(n=n, cut=cut, ftype=ftype, btype=btype, fs=target.sampling_rate)
    result = self._filter_data(target.signal, **settings)

    # Pick the destination attribute once, then write the result there.
    destination = "signal" if replace_signal else "signal_filtered"
    setattr(self.channels[channel_name], destination, result)

    return result

get_channel(channel_name)

Return a single channel by name.

Parameters:

Name Type Description Default
channel_name str

Name of the channel to retrieve.

required
Source code in src/ace_neuro/ephys/ephys_data_manager.py
def get_channel(self, channel_name: str) -> Channel:
    """Look up one processed channel in ``self.channels``.

    Args:
        channel_name: Key of the channel to fetch.

    Returns:
        The Channel stored under ``channel_name``.
    """
    requested = self.channels[channel_name]
    return requested

get_channels()

Return dictionary of all processed channels.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
def get_channels(self) -> Dict[str, Channel]:
    """Return ``self.channels`` itself (not a copy): the mapping of processed channels."""
    all_channels = self.channels
    return all_channels

get_sync_timestamps(channel_name=None) abstractmethod

Extract raw hardware sync timestamps from an ephys channel. To be overridden by subclasses.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
@abstractmethod
def get_sync_timestamps(self, channel_name: Optional[str] = None) -> np.ndarray:
    """Extract raw hardware sync timestamps from an ephys channel.

    Abstract: the base implementation does nothing and returns None;
    subclasses must override it.
    """

import_ephys_block(ephys_directory) abstractmethod

Load raw ephys data from disk.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
@abstractmethod
def import_ephys_block(self, ephys_directory: Union[str, Path]) -> None:
    """Load raw ephys data from ``ephys_directory``; subclasses must override this."""

process_ephys_block_to_channels(channels=None, remove_artifacts=False) abstractmethod

Process raw ephys data into Channel objects.

Source code in src/ace_neuro/ephys/ephys_data_manager.py
@abstractmethod
def process_ephys_block_to_channels(self, channels: Optional[List[str]] = None, remove_artifacts: bool = False) -> None:
    """Turn the raw ephys data into Channel objects; subclasses must override this."""

Block Processor

ace_neuro.ephys.block_processor.BlockProcessor

Processes a Neo Block containing raw ephys data into Channel objects.

Handles the conversion of segmented Neuralynx recordings into continuous signal arrays, including artifact removal and event extraction.

Attributes:

Name Type Description
logger Logger

Logger instance for debug output.

ephys_block Block

Neo Block object containing raw ephys segments.

Source code in src/ace_neuro/ephys/block_processor.py
class BlockProcessor:
    """Processes a Neo Block containing raw ephys data into Channel objects.

    Handles the conversion of segmented Neuralynx recordings into continuous
    signal arrays, including artifact removal and event extraction.

    Attributes:
        logger: Logger instance for debug output.
        ephys_block: Neo Block object containing raw ephys segments.
    """

    logger: logging.Logger
    ephys_block: Block

    def __init__(self, ephys_block: Block, logger: logging.Logger):
        """Initialize a BlockProcessor with an ephys Block.

        Args:
            ephys_block: Neo Block object containing raw ephys data.
            logger: Logger instance for debug/info messages.
        """
        self.logger = logger
        self.ephys_block = ephys_block

    def process_raw_ephys(
        self,
        channels: Union[str, List[str]],
        remove_artifacts: bool = False
    ) -> Dict[str, Channel]:
        """Convert raw ephys data into processed Channel objects.

        Iterates through requested channel names, extracts signal data from
        all segments, and optionally removes artifacts.

        Args:
            channels: Channel name string or list of channel names to process.
            remove_artifacts: If True, apply artifact removal to each channel.

        Returns:
            Dict mapping channel names to Channel objects.

        Raises:
            ValueError: If ephys_block has not been loaded, or a requested
                channel is missing from the first or last segment.
        """
        if not self.ephys_block:
            raise ValueError("Load raw data first using EphysDataManager.import_ephys_block()")

        # Accept a bare name (including str subclasses) as a one-element list.
        if isinstance(channels, str):
            channels = [channels]

        print('Processing raw ephys data into channels...')

        channels_dict = {}
        print(f"channels: {channels}")

        for channel_name in channels:
            print(f"channel_name = {channel_name}")
            assert isinstance(channel_name, str)
            new_channel = self._process_single_channel(channel_name)

            if remove_artifacts:
                self.remove_artifacts(new_channel)

            channels_dict[channel_name] = new_channel

        return channels_dict

    def remove_artifacts(
        self,
        channel: Channel,
        volt_threshold: float = 1500,
        time_threshold: float = 60,
        hannNum: int = 75
    ) -> None:
        """Remove high-amplitude artifacts from a channel using Hann window smoothing.

        Subtracts the signal mean, marks samples whose absolute value exceeds
        the voltage threshold, fills short gaps between marked regions, and
        multiplies the signal around every marked sample by an inverted Hann
        window.

        Args:
            channel: Channel object to process (modified in-place).
            volt_threshold: Voltage threshold in µV for artifact detection.
            time_threshold: Maximum gap duration (seconds) to fill between artifacts.
            hannNum: Size of the Hann window for smoothing artifact edges.
        """
        print('Removing artifacts from ' + channel.name + '...')
        # Sample interval taken from the first two time stamps.
        dt = channel.time_vector[1] - channel.time_vector[0]
        mean = np.mean(channel.signal)
        channel.signal = channel.signal - mean

        mask = np.abs(channel.signal) > volt_threshold
        mask = self._fill_gaps(mask, dt, time_threshold)
        han_window = self._create_hann_window(hannNum)

        self._apply_hann_window(channel, mask, han_window, dt)

    def _process_single_channel(self, channel_name: str) -> Channel:
        """Process a single channel from raw segment data.

        Extracts signal data across all segments, builds continuous time vector,
        and collects associated events.

        Args:
            channel_name: Name of the channel to process (e.g., 'PFCLFPvsCBEEG').

        Returns:
            Channel object with signal, timing, and event data.

        Raises:
            ValueError: If the channel is not found in the first or last segment.
        """
        print(f"Channel name: {channel_name}")
        # Get the first and last segments
        first_segment = self.ephys_block.segments[0].analogsignals
        last_segment = self.ephys_block.segments[-1].analogsignals

        # Find the channel in the first segment by name, keeping its index so
        # the same position can be read from every other segment.
        try:
            channel_index, channel = next(
                (i, c) for i, c in enumerate(first_segment)
                if c.name == channel_name
            )
        except StopIteration:
            raise ValueError(f"Channel '{channel_name}' not found in the first segment.")

        # Find the corresponding channel in the last segment by name
        try:
            last_channel = next(c for c in last_segment if c.name == channel_name)
        except StopIteration:
            raise ValueError(f"Channel '{channel_name}' not found in the last segment.")

        # Extract timing details
        sampling_rate = channel.sampling_rate.magnitude.item()
        dt = 1 / sampling_rate
        t_start = channel.t_start.magnitude
        t_stop = last_channel.t_stop.magnitude  # Use last_segment's channel

        # Generate time vector. When the span's remainder modulo dt is at most
        # half a sample, the stop time is pulled back by 0.51*dt before
        # np.arange — presumably to avoid a spurious final sample from
        # floating-point rounding (TODO confirm).
        if (t_stop - t_start) % dt <= dt / 2:
            t_stop -= 0.51 * dt
        time_vector = np.arange(t_start, t_stop, dt)

        # Build signal array using the index from the first segment, and extract events
        signal, events = self._scan_segments(channel_index, channel_name, time_vector)

        # Return processed channel
        return Channel(channel_name, signal, sampling_rate, time_vector, events)

    def _scan_segments(
        self,
        channel_index: int,
        channel_name: str,
        time_vector: np.ndarray
    ) -> Tuple[np.ndarray, Dict[str, Any]]:
        """Construct a continuous signal array from multiple Neo segments.

        Iterates through all segments in the ephys block, copies each
        segment's samples into place on the full time axis, linearly
        interpolates any NaN gap left in front of a segment, and gathers all
        events sorted by timestamp.

        Args:
            channel_index: Index of the channel within each segment's analogsignals.
            channel_name: Name of the channel being processed.
            time_vector: Pre-computed time vector for the full recording.

        Returns:
            Tuple of (signal, events) where signal is a 1D numpy array and
            events is a dict with 'labels' and 'timestamps' keys.
        """
        n_points = len(time_vector)
        signal = np.full(n_points, np.nan)  # NaN marks samples not yet filled

        unsorted_labels = []
        unsorted_timestamps = []

        for seg in self.ephys_block.segments:
            sig = seg.analogsignals[channel_index]
            signal_data = sig.magnitude.squeeze()  # Flatten to 1D

            # Place the segment at the time-vector sample closest to its start,
            # clipping so it never runs past the end of the array.
            start_idx = int(np.argmin(np.abs(time_vector - sig.t_start.magnitude)))
            end_idx = min(start_idx + signal_data.size, n_points)

            gap_before = start_idx > 0 and np.isnan(signal[start_idx - 1])

            signal[start_idx:end_idx] = signal_data[:end_idx - start_idx]

            # Interpolate only after the segment is written: the interpolation
            # needs the segment's first sample as its right-hand anchor.
            if gap_before:
                self._interpolate_missing_data(
                    channel_name, signal, start_idx, time_vector, sig.t_start.magnitude
                )

            for event in seg.events:
                for k, label in enumerate(event.labels.astype(str)):
                    unsorted_labels.append(label)
                    unsorted_timestamps.append(event.times[k].magnitude)

        # Sort all of the events by timestamp
        np_unsorted_labels = np.array(unsorted_labels)
        np_unsorted_timestamps = np.array(unsorted_timestamps)
        reordered_indices = np.argsort(np_unsorted_timestamps)

        events = {
            'labels': np_unsorted_labels[reordered_indices],
            'timestamps': np_unsorted_timestamps[reordered_indices]
        }

        return signal, events

    def _interpolate_missing_data(
        self,
        channel_name: str,
        signal: np.ndarray,
        start_idx: int,
        time_vector: np.ndarray,
        t_start: float
    ) -> None:
        """Fill the NaN gap directly before ``start_idx`` with linear interpolation.

        The gap is bridged between the last non-NaN sample before
        ``start_idx`` and ``signal[start_idx]``, so the new segment's first
        sample must already be written when this is called. If either anchor
        is missing, the signal is left untouched.

        Args:
            channel_name: Name of channel (currently unused).
            signal: Signal array to modify (in-place).
            start_idx: Index where the new segment starts.
            time_vector: Full recording time vector (currently unused).
            t_start: Start time of the new segment (currently unused).
        """
        valid_before = np.flatnonzero(~np.isnan(signal[:start_idx]))
        if valid_before.size == 0:
            return  # no left anchor to interpolate from

        left_idx = int(valid_before[-1])
        gap_start = left_idx + 1
        right_value = signal[start_idx]
        if gap_start >= start_idx or np.isnan(right_value):
            return  # no gap, or no right anchor yet

        # linspace includes both anchors; drop them and keep the interior points.
        gap_length = start_idx - gap_start
        ramp = np.linspace(signal[left_idx], right_value, gap_length + 2)
        signal[gap_start:start_idx] = ramp[1:-1]

    def _fill_gaps(self, mask: np.ndarray, dt: float, threshold: float) -> np.ndarray:
        """Extend artifact mask to fill short gaps between detected artifacts.

        Prevents fragmented artifact detection by connecting nearby regions.

        Args:
            mask: Boolean array marking artifact samples (modified in-place).
            dt: Sample interval in seconds.
            threshold: Maximum gap duration (seconds) to fill.

        Returns:
            The same mask array with short gaps filled.
        """
        diff = np.diff(mask.astype(int))
        # diff == -1 at i means mask[i] is True and mask[i + 1] is False,
        # i.e. an artifact region ends at sample i.
        starts = np.where(diff == -1)[0]

        for start in starts:
            # Offset to the next False -> True transition after this region.
            end = np.where(diff[start:] == 1)[0]
            if end.size > 0 and (end[0] * dt) < threshold:
                mask[start:start + end[0] + 1] = True
        return mask

    def _create_hann_window(self, size: int) -> np.ndarray:
        """Create an inverted Hann window for artifact smoothing.

        The window is inverted (|hann - 1|) so that artifact regions are
        attenuated while preserving surrounding signal.

        Args:
            size: Number of samples in the window.

        Returns:
            1D numpy array containing the inverted Hann window.
        """
        window = hann(size)
        return np.abs(window - 1)

    def _apply_hann_window(self, channel: Channel, mask: np.ndarray, window: np.ndarray, dt: float) -> None:
        """Apply Hann window smoothing to artifact regions in the signal.

        For every masked sample, the signal around it is multiplied by the
        window, positioned so that ``window[len(window) // 2]`` lands on the
        masked sample. Where the window would extend past either end of the
        signal, the matching part of the window is used, so the alignment is
        preserved at the edges. Overlapping windows multiply cumulatively.

        Args:
            channel: Channel object with signal to modify (in-place).
            mask: Boolean array marking artifact samples.
            window: Pre-computed Hann window array.
            dt: Sample interval in seconds (currently unused).
        """
        half_len = len(window) // 2
        n_samples = len(channel.signal)

        for idx in np.where(mask)[0]:
            win_origin = idx - half_len  # signal index that lines up with window[0]
            start = max(0, win_origin)
            end = min(n_samples, win_origin + len(window))
            # Slice the window relative to its own origin, so clipping at the
            # start of the signal (or an even-sized window) keeps it aligned.
            channel.signal[start:end] = (
                channel.signal[start:end] * window[start - win_origin:end - win_origin]
            )

__init__(ephys_block, logger)

Initialize a BlockProcessor with an ephys Block.

Parameters:

Name Type Description Default
ephys_block Block

Neo Block object containing raw ephys data.

required
logger Logger

Logger instance for debug/info messages.

required
Source code in src/ace_neuro/ephys/block_processor.py
def __init__(self, ephys_block: Block, logger: logging.Logger):
    """Initialize a BlockProcessor with an ephys Block.

    Args:
        ephys_block: Neo Block object containing raw ephys data.
        logger: Logger instance for debug/info messages.
    """
    self.logger = logger
    self.ephys_block = ephys_block

process_raw_ephys(channels, remove_artifacts=False)

Convert raw ephys data into processed Channel objects.

Iterates through requested channel names, extracts signal data from all segments, and optionally removes artifacts.

Parameters:

Name Type Description Default
channels Union[str, List[str]]

Channel name string or list of channel names to process.

required
remove_artifacts bool

If True, apply artifact removal to each channel.

False

Returns:

Type Description
Dict[str, Channel]

Dict mapping channel names to Channel objects.

Raises:

Type Description
ValueError

If ephys_block has not been loaded.

Source code in src/ace_neuro/ephys/block_processor.py
def process_raw_ephys(
    self,
    channels: Union[str, List[str]],
    remove_artifacts: bool = False
) -> Dict[str, Channel]:
    """Convert raw ephys data into processed Channel objects.

    Iterates through the requested channel names, builds a Channel for
    each one via ``_process_single_channel`` and, when requested, runs
    ``remove_artifacts`` on it before storing it in the result.

    Args:
        channels: Channel name string or list of channel names to process.
            A single string (including ``str`` subclasses) is treated as
            one channel name, never as a sequence of characters.
        remove_artifacts: If True, apply artifact removal to each channel.

    Returns:
        Dict mapping channel names to Channel objects.

    Raises:
        ValueError: If ephys_block has not been loaded.
        TypeError: If an entry of ``channels`` is not a string.
    """
    if not self.ephys_block:
        raise ValueError("Load raw data first using EphysDataManager.import_ephys_data()")

    # isinstance rather than an exact type comparison: a str subclass must
    # be wrapped too, otherwise it would be iterated character by character.
    if isinstance(channels, str):
        channels = [channels]

    print('Processing raw ephys data into channels...')

    channels_dict = {}
    print(f"channels: {channels}")

    for channel_name in channels:
        print(f"channel_name = {channel_name}")
        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(channel_name, str):
            raise TypeError(
                f"Channel names must be strings, got {type(channel_name).__name__}"
            )
        new_channel = self._process_single_channel(channel_name)

        # `remove_artifacts` the flag shadows the method name locally, so the
        # method has to be reached through `self`.
        if remove_artifacts:
            self.remove_artifacts(new_channel)

        channels_dict[channel_name] = new_channel

    return channels_dict

remove_artifacts(channel, volt_threshold=1500, time_threshold=60, hannNum=75)

Remove high-amplitude artifacts from a channel using Hann window smoothing.

Identifies samples exceeding the voltage threshold, fills short gaps between artifact regions, and applies a Hann window to smooth transitions.

Parameters:

Name Type Description Default
channel Channel

Channel object to process (modified in-place).

required
volt_threshold float

Voltage threshold in µV for artifact detection.

1500
time_threshold float

Maximum gap duration (seconds) to fill between artifacts.

60
hannNum int

Size of the Hann window for smoothing artifact edges.

75
Source code in src/ace_neuro/ephys/block_processor.py
def remove_artifacts(
    self,
    channel: Channel,
    volt_threshold: float = 1500,
    time_threshold: float = 60,
    hannNum: int = 75
) -> None:
    """Suppress high-amplitude artifacts in a channel with a Hann taper.

    The signal is first mean-subtracted (``channel.signal`` is rebound to
    the centred array). Samples whose absolute value exceeds
    ``volt_threshold`` are flagged, the flag mask is passed through
    ``_fill_gaps``, and an inverted Hann window of ``hannNum`` samples is
    applied around every flagged sample.

    Args:
        channel: Channel object to process (modified in-place).
        volt_threshold: Voltage threshold in µV for artifact detection,
            compared against the absolute mean-subtracted signal.
        time_threshold: Maximum gap duration (seconds) to fill between artifacts.
        hannNum: Size of the Hann window for smoothing artifact edges.
    """
    print('Removing artifacts from ' + channel.name + '...')

    # The sample interval is read off the first two timestamps.
    timestamps = channel.time_vector
    sample_interval = timestamps[1] - timestamps[0]

    # Centre the signal on zero so the threshold is symmetric around it.
    channel.signal = channel.signal - np.mean(channel.signal)

    artifact_mask = self._fill_gaps(
        np.abs(channel.signal) > volt_threshold,
        sample_interval,
        time_threshold,
    )
    taper = self._create_hann_window(hannNum)

    self._apply_hann_window(channel, artifact_mask, taper, sample_interval)

Channel

ace_neuro.ephys.channel.Channel

Represents a single electrophysiology recording channel.

Stores signal data, timing information, and associated events for one channel of an ephys recording.

Attributes:

Name Type Description
name str

Channel identifier (e.g., "PFCLFPvsCBEEG").

signal ndarray

Raw signal data as numpy array.

sampling_rate float

Sampling frequency in Hz.

time_vector ndarray

Timestamps corresponding to each sample.

events Dict[str, Any]

Dict containing event labels and timestamps.

signal_filtered Optional[ndarray]

Filtered signal data (set after filtering).

phases Optional[ndarray]

Instantaneous phase values (set after phase computation).

Source code in src/ace_neuro/ephys/channel.py
class Channel:
    """Container for one electrophysiology recording channel.

    Bundles the sampled signal with its timing information and event
    markers, plus two slots for results that later processing steps
    fill in.

    Attributes:
        name: Channel identifier (e.g., "PFCLFPvsCBEEG").
        signal: Raw signal data as numpy array.
        sampling_rate: Sampling frequency in Hz.
        time_vector: Timestamps corresponding to each sample.
        events: Dict containing event labels and timestamps.
        signal_filtered: Filtered signal data; None until filtering is run.
        phases: Instantaneous phase values; None until phases are computed.
    """

    name: str
    signal: np.ndarray
    sampling_rate: float
    time_vector: np.ndarray
    events: Dict[str, Any]
    signal_filtered: Optional[np.ndarray]
    phases: Optional[np.ndarray]

    def __init__(
        self,
        name: str,
        signal: np.ndarray,
        sampling_rate: float,
        time_vector: np.ndarray,
        events: Dict[str, Any],
    ):
        """Store the recording data and metadata on a new Channel.

        The arguments are kept by reference; nothing is copied or validated.

        Args:
            name: Channel identifier string.
            signal: 1D numpy array of signal values.
            sampling_rate: Sampling frequency in Hz.
            time_vector: 1D numpy array of timestamps (same length as signal).
            events: Dict with 'labels' and 'timestamps' arrays for event markers.
        """
        # Recording data and metadata supplied by the caller.
        self.name, self.signal = name, signal
        self.sampling_rate, self.time_vector = sampling_rate, time_vector
        self.events = events

        # Derived products; populated by later processing steps.
        self.signal_filtered = self.phases = None

__init__(name, signal, sampling_rate, time_vector, events)

Initialize a Channel with signal data and metadata.

Parameters:

Name Type Description Default
name str

Channel identifier string.

required
signal ndarray

1D numpy array of signal values.

required
sampling_rate float

Sampling frequency in Hz.

required
time_vector ndarray

1D numpy array of timestamps (same length as signal).

required
events Dict[str, Any]

Dict with 'labels' and 'timestamps' arrays for event markers.

required
Source code in src/ace_neuro/ephys/channel.py
def __init__(self, name: str, signal: np.ndarray, sampling_rate: float, time_vector: np.ndarray, events: Dict[str, Any]):
    """Initialize a Channel with signal data and metadata.

    Args:
        name: Channel identifier string.
        signal: 1D numpy array of signal values.
        sampling_rate: Sampling frequency in Hz.
        time_vector: 1D numpy array of timestamps (same length as signal).
        events: Dict with 'labels' and 'timestamps' arrays for event markers.
    """
    self.name = name
    self.signal = signal
    self.sampling_rate = sampling_rate
    self.time_vector = time_vector
    self.events = events
    self.signal_filtered = None
    self.phases = None