Source code for MEArec.drift_tools

import numpy as np
import scipy.signal


[docs]def generate_drift_dict_from_params( drift_fs, duration, template_locations, external_drift_vector_um=None, external_drift_times=None, t_start_drift=None, t_end_drift=None, drift_mode_probe="rigid", drift_mode_speed="slow", non_rigid_gradient_mode="linear", non_rigid_linear_direction=1, non_rigid_linear_min_factor=0.5, non_rigid_step_depth_boundary=None, non_rigid_step_factors=None, preferred_dir=[0, 0, 1], slow_drift_velocity=5, slow_drift_waveform="triangluar", slow_drift_amplitude=None, fast_drift_period=10.0, fast_drift_max_jump=20.0, fast_drift_min_jump=5.0, external_drift_factors=None, ): """ Generate for each units a drift position vector. The output vector handle of indexes for drift steps along time. Parameters ---------- drift_fs: float Sampling frequency in Hz duration: float Duration drift vector in s template_locations: np.array 3d Shape: (num_cells, drift_steps, 3) Template locations for every drift steps external_drift_vector_um : 1d array or None External drift signal to apply. It needs to be consistent with drift_fs and duration (or external_drift_times) external_drift_times : 1d array or None External drift times to use for external_drift_signal_um. t_start_drift: float When the drift start in second drift_mode_probe: 'rigid' or 'non-rigid' Global drift or per cell. drift_mode_speed: str 'slow', 'fast', 'slow+fast' Drift mode speed non_rigid_gradient_mode: 'linear' | 'step' Mode to generate non rigid gradient over units on velocity. If 'linear', drift factors (0.5-1) are assigned to each cells linearly with depth (deeper cells have more drift). non_rigid_linear_direction : int Direction of linear gradient. Only used if 'drift_mode_probe' is 'non-rigid' and 'non_rigid_gradient_mode' is 'linear' If positive, deeper cells drift more than surface cells. If negative, deeper cells drift more than surface cells. Zero is not allowed, by default 1 non_rigid_linear_min_factor: float (default 0.5) When non_rigid_gradient_mode='linear' this control the minimum factor. The max is always 1. non_rigid_step_depth_boundary : float or None Depth boundary in the preferred_dir dimension to apply the drift step, by default is half of the template locations extensions. Only used if 'drift_mode_probe' is 'non-rigid' and 'non_rigid_gradient_mode' is 'step' non_rigid_step_factors : None or tuple/list of two elements Factors to apply to cells above (non_rigid_step_factors[0]) and below (non_rigid_step_factors[1]) step depth boundary. Default is (1, 0). Only used if 'drift_mode_probe' is 'non-rigid' and 'non_rigid_gradient_mode' is 'step' preferred_dir: list of int Use for gradient when non rigid mode. slow_drift_velocity: float Slow drift velocity in um/min. slow_drift_waveform: 'trianglar' or 'sine' Waveform shape for slow drift fast_drift_period: float Period between fast drift events fast_drift_max_jump: float Maximum 'jump' in um for fast drifts fast_drift_min_jump: float Minimum 'jump' in um for fast drifts external_drift_factors: None If templates are selected externally (and template_ids) is used in the generate_recordings() function, this vector specifies the factor (0-1) for each cell. Returns ------- drift_list: list of dicts Each dict contains info for a drift signal: * "drift_times" (1d array): times for different drift vectors. (Only for external times) * "drift_fs" (float): sampling frequency * "drift_vector_um" (1d array): drift signals in um * "drift_vector_idxs" (1d array): drift signals in template idxs (with respect to middle step) * "drift_factor" (1d array): array with gradient for each unit for the drift signal vector. For rigid motion, all values are 1. """ num_cells, drift_steps, _ = template_locations.shape n_samples = int(drift_fs * duration) drift_distances = np.linalg.norm(template_locations[:, -1] - template_locations[:, 0]) mean_distance = np.mean(drift_distances) assert np.max(drift_distances - mean_distance) < 0.05 * mean_distance, ( "Drift distances are not constant across templates. You can control this by setting " "'max_drift' and 'min_drift' to the same value and 'drift_zlim = [desired_drift, desired_drift]' " "in the template parameters." ) # velocity and jump are computed on bigger drift over cell loc0 = template_locations[:, 0, :] loc1 = template_locations[:, -1, :] dist = np.sum((loc0 - loc1) ** 2, axis=1) ** 0.5 dist_boundary = np.min(dist) step = dist_boundary / drift_steps if external_drift_vector_um is None: assert drift_mode_speed in ["slow", "fast"] t_start_drift = 0.0 if t_start_drift is None else t_start_drift t_end_drift = duration if t_end_drift is None else t_end_drift assert t_start_drift < duration, f"'t_start_drift' must preceed 'duration'!" assert t_end_drift <= duration, f"'t_end_drift' must preceed 'duration'!" start_drift_index = int(t_start_drift * drift_fs) end_drift_index = int(t_end_drift * drift_fs) drift_vector_um = np.zeros(n_samples, dtype="float32") # drift times is only used for external drift that are not defined over the entire recording drift_times = None if drift_mode_speed == "slow": if slow_drift_amplitude is None: slow_drift_amplitude = dist_boundary # compute half period for a regular drift speed half_period = slow_drift_amplitude / (slow_drift_velocity / 60) # triangle / sine frequency depends on the velocity freq = 1.0 / (2 * half_period) drift_times = np.arange(end_drift_index - start_drift_index) / drift_fs if slow_drift_waveform == "triangluar": triangle = np.abs(scipy.signal.sawtooth(2 * np.pi * freq * drift_times + np.pi / 2)) triangle *= slow_drift_amplitude triangle -= slow_drift_amplitude / 2.0 drift_vector_um[start_drift_index:end_drift_index] = triangle drift_vector_um[end_drift_index:] = triangle[-1] elif slow_drift_waveform == "sine": sine = np.cos(2 * np.pi * freq * drift_times + np.pi / 2) sine *= slow_drift_amplitude / 2.0 drift_vector_um[start_drift_index:end_drift_index] = sine drift_vector_um[end_drift_index:] = sine[-1] else: raise NotImplementedError("slow_drift_waveform") else: # 'fast' in drift_mode_speed: period_samples = int(fast_drift_period * drift_fs) n = int(np.round((end_drift_index - start_drift_index) / period_samples)) t_idx = start_drift_index + period_samples for i in range(1, n): jump = np.random.rand() * (fast_drift_max_jump - fast_drift_min_jump) + fast_drift_min_jump if np.random.rand() > 0.5: jump = -jump # protect from boundaries if jump > 0: if (np.max(drift_vector_um[t_idx : t_idx + period_samples]) + jump) >= dist_boundary / 2.0: jump = -jump # protect from boundaries if jump < 0: if (np.min(drift_vector_um[t_idx : t_idx + period_samples]) + jump) <= -dist_boundary / 2.0: jump = -jump drift_vector_um[t_idx:] += jump t_idx += period_samples else: if external_drift_times is None: assert ( len(external_drift_vector_um) == n_samples ), f"'external_drift_times' must have a length of {n_samples} (duration * drift_fs)." else: assert all(np.diff(external_drift_times) > 0), "'external_drift_times' should be monotonically increasing" assert external_drift_times[-1] <= duration, "'external_drift_times' cannot exceed duration" assert len(external_drift_times) == len( external_drift_vector_um ), "'external_drift_times' and 'external_drift_vector_um' must have the same length" drift_vector_um = external_drift_vector_um drift_vector_um *= 0.99999 drift_times = external_drift_times if external_drift_factors is None: if drift_mode_probe == "rigid": drift_factors = np.ones(num_cells) elif drift_mode_probe == "non-rigid": assert non_rigid_gradient_mode in ("linear", "step") # project locations over preferred direction preferred_dir = np.array(preferred_dir).reshape(-1, 1) locs = template_locations[:, drift_steps // 2, :] proj = np.dot(locs, preferred_dir).squeeze() if non_rigid_gradient_mode == "linear": assert 0 <= non_rigid_linear_min_factor < 1, "non_rigid_linear_min_factor must between 0 and 1" assert non_rigid_linear_direction in (1, -1, 1.0, -1.0), "non_rigid_linear_direction must be 1 or -1" # vector with shape (num_cells, ) and value between 0.5 and 1 which is a factor of the velocity # the upper units on the 'preferred_dir' vector get 0.5 the lower get 1 print(f"Linear gradient with Min: {non_rigid_linear_min_factor} Dir: {non_rigid_linear_direction}") drift_factors = (proj - np.min(proj)) / (np.max(proj) - np.min(proj)) if non_rigid_linear_direction > 0: drift_factors = 1 - drift_factors f = non_rigid_linear_min_factor drift_factors = drift_factors * (1 - f) + f elif non_rigid_gradient_mode == "step": if non_rigid_step_depth_boundary is None: non_rigid_step_depth_boundary = np.mean(proj) else: assert ( np.min(proj) < non_rigid_step_depth_boundary < np.max(proj) ), f"'non_rigid_step_depth_boundary' needs to be in between {np.min(proj)} and {np.max(proj)}" if non_rigid_step_factors is None: non_rigid_step_factors = (1, 0) else: assert len(non_rigid_step_factors) == 2, "'non_rigid_step_factors' needs two values" assert ( 0 <= non_rigid_step_factors[0] <= 1 ), "'non_rigid_step_factors' needs to be in the [0, 1] range" assert ( 0 <= non_rigid_step_factors[1] <= 1 ), "'non_rigid_step_factors' needs to be in the [0, 1] range" drift_factors = non_rigid_step_factors[0] * np.ones(num_cells) drift_factors[proj < non_rigid_step_depth_boundary] = non_rigid_step_factors[1] else: assert ( len(external_drift_factors) == num_cells ), f"'external_drift_factors' must be defined for all {num_cells} cells" assert all(0 <= d <= 1 for d in external_drift_factors), "'external_drift_factors' must be in the [0, 1] range" drift_factors = external_drift_factors # avoid possible clipping drift_vector_um = np.clip(drift_vector_um, -dist_boundary / 2, dist_boundary / 2) # avoid boundary effect drift_vector_um *= 0.99999 # shift with respect to mid point in int16 drift_vector_idxs = (np.floor(drift_vector_um / step)).astype("int16") drift_dict = {} drift_dict["drift_vector_um"] = drift_vector_um drift_dict["drift_vector_idxs"] = drift_vector_idxs drift_dict["drift_fs"] = drift_fs drift_dict["drift_times"] = drift_times drift_dict["drift_factors"] = drift_factors drift_dict["drift_steps"] = drift_steps return drift_dict
def test_generate_drift_position_vector(): fs = 2.0 duration = 600 num_cells = 4 drift_steps = 21 template_locations = np.zeros((num_cells, drift_steps, 3)) # one cells every 30 um template_locations[:, :, 2] = np.arange(num_cells).reshape(-1, 1) * 25.0 # drift step 1.5 um of Z template_locations[:, :, 2] += np.arange(drift_steps) * 1.5 drift_dict = generate_drift_dict_from_params( drift_fs=fs, template_locations=template_locations, duration=duration, t_start_drift=10.0, # drift_mode_probe='rigid', drift_mode_probe="non-rigid", # drift_mode_speed='slow+fast', # drift_mode_speed='fast', drift_mode_speed="slow", non_rigid_gradient_mode="linear", preferred_dir=[0, 0, 1], # slow_drift_velocity=5., # slow_drift_waveform='triangluar', slow_drift_velocity=17, slow_drift_waveform="sine", fast_drift_period=120.0, fast_drift_max_jump=5, fast_drift_min_jump=1.0, ) drift_vectors = drift_dict["drift_vectors"] drift_fs = drift_dict["drift_fs"] times = np.arange(int(fs * duration)) / drift_fs print(drift_vectors.shape) import matplotlib.pyplot as plt fig, ax = plt.subplots() ax.plot(times, drift_vectors) ax.axhline(drift_steps - 1, ls="--", color="k") plt.ion() plt.show() if __name__ == "__main__": test_generate_drift_position_vector()