import numpy as np
from scipy.fft import next_fast_len
from datetime import datetime
from dateutil.tz import tzlocal
from pynwb import NWBFile
[docs]def dtype(X, precision):
"""Return the type to cast to depending on precision and whether `X` is complex.
Parameters
----------
X : ndarray
Input data.
precision : str
Either `single` for float32/complex64 or `double` for float/complex.
"""
precision = precision.lower()
if precision not in ['single', 'double']:
raise ValueError(f'`precision` should be either `single` or `double`. Got {precision}.')
if np.iscomplexobj(X):
if precision == 'single':
return np.complex64
else:
return complex
else:
if precision == 'single':
return np.float32
else:
return float
[docs]def log_spaced_cfs(fmin, fmax, ncfs):
"""Return log-spaced center frequencies.
Parameters
----------
fmin : float
Minimum center frequency.
fmax : float
Maximum center frequency.
ncfs : int
Number of bands to generate.
"""
return np.logspace(np.log10(fmin), np.log10(fmax), ncfs)
[docs]def const_Q_sds(cfs, Q=8):
"""Compute constant Q bandwidths for a set of center frequencies.
Parameters
----------
cfs : ndarray
Center frequencies.
Q : float
Q value for the wavelet. Default = 8.
"""
return cfs / Q
[docs]def chang_sds(cfs):
"""Compute variable bandwidths for a set of center frequencies used by the Chang lab.
Parameters
----------
cfs : ndarray
Center frequencies.
Q : float
Q value for the wavelet. Default = 8.
"""
scale = 0.39
return 10. ** (np.log10(scale) + .5 * (np.log10(cfs))) * np.sqrt(2.)
"""
The following code is based on MNE-Python
Copyright © 2011-2019, authors of MNE-Python
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
def _npads(X, npad, ratio=1.):
"""Calculate padding parameters.
"""
n_time = X.shape[0]
bad_msg = 'npad must be "auto", "fast", or an integer'
if isinstance(npad, str):
if npad == 'auto':
# Figure out reasonable pad that gets us to a power of 2
min_add = min(n_time // 8, 100) * 2
npad = 2 ** int(np.ceil(np.log2(n_time + min_add))) - n_time
npad, extra = divmod(npad, 2)
npads = np.array([npad, npad + extra], int)
elif npad == 'fast':
npad = next_fast_len(n_time) - n_time
npad, extra = divmod(npad, 2)
npads = np.array([npad, npad + extra], int)
else:
raise ValueError(bad_msg)
else:
if npad != int(npad):
raise ValueError(bad_msg)
npads = np.array([npad, npad], int)
# prep for resampling now
orig_len = n_time + npads.sum() # length after padding
new_len = int(round(ratio * orig_len)) # length after resampling
final_len = int(round(ratio * n_time))
to_removes = [int(round(ratio * npads[0]))]
to_removes.append(new_len - final_len - to_removes[0])
to_removes = np.array(to_removes)
# This should hold:
# assert np.abs(to_removes[1] - to_removes[0]) <= int(np.ceil(ratio))
return npads, to_removes, new_len
def _trim(X, to_removes):
"""Trim the padding.
"""
if (to_removes > 0).any():
n_times = X.shape[0]
X = X[to_removes[0]:n_times - to_removes[1]]
return X
def _smart_pad(X, npads, pad='reflect_limited'):
"""Pad vector X.
"""
other_shape = X.shape[1:]
npads = np.asarray(npads)
assert npads.shape == (2,)
if (npads == 0).all():
return X
elif (npads < 0).any():
raise RuntimeError('npad must be non-negative')
if pad == 'reflect_limited':
# need to pad with zeros if len(x) <= npad
l_z_pad = np.zeros((max(npads[0] - len(X) + 1, 0),) + other_shape, dtype=X.dtype)
r_z_pad = np.zeros((max(npads[1] - len(X) + 1, 0),) + other_shape, dtype=X.dtype)
return np.concatenate([l_z_pad, 2 * X[[0]] - X[npads[0]:0:-1], X,
2 * X[[-1]] - X[-2:-npads[1] - 2:-1], r_z_pad], axis=0)
else:
return np.pad(X, (tuple(npads), 0), pad)
[docs]def generate_synthetic_data(duration, nchannels, rate, high_gamma=True,
linenoise=True, seed=0):
"""Generate synthetic data by smoothing white noise with a boxcar for
testing or tutorials. Optional high gamma and 60 Hz linenoise can be added.
Parameters
----------
duration : float
Data duration in seconds.
nchannels : int
Number of channels
rate : float
Sampling rate in Hz
high_gamma : bool
If True, include extra modulating power in the high gamma range.
linenoise : bool
If True, include 60 Hz linenoise.
Returns
-------
neural_data : ndarray (time, channels)
Synthetic neural data
"""
kernel_length = 50
rng = np.random.default_rng(seed=seed)
neural_data = rng.standard_normal((int(duration * rate), nchannels)) / 100.
kernel = np.ones(kernel_length) / kernel_length
for ch in range(nchannels):
neural_data[:, ch] = np.convolve(neural_data[:, ch], kernel, mode='same')
neural_data /= neural_data.std() * 2.
if high_gamma or linenoise:
t = np.linspace(0, duration, neural_data.shape[0])[:, np.newaxis]
if high_gamma:
# Add a high gamma signal at 100 Hz that modulates at 2 Hz
phase = 2 * np.pi * rng.random(nchannels)[np.newaxis]
high_gamma = np.sin(2 * np.pi * t * 100. + phase)
phase = 2 * np.pi * rng.random(nchannels)[np.newaxis]
high_gamma *= np.sin(2 * np.pi * t * 1. + phase)**2 + 0.2
neural_data += high_gamma
if linenoise:
# Add common noise but with different weights to each channel
weights = rng.standard_normal((1, nchannels))
if rate > 120.:
for ii, hz in enumerate(np.arange(60., rate, 60.)):
line_noise = np.sin(2 * np.pi * t * hz) / 2. ** (ii + 1)
neural_data += line_noise * weights
return neural_data
[docs]def generate_nwbfile(nchannels=4):
"""Generate an `NWBFile` object that an `ElectricalSeries` can be added to.
Returns
-------
nwbfile : NWBFile
NWBFile object
Device : Device
Device for the ElectricalSeries
electrode_group : ElectrodeGroup
ElectrodeGroup for the ElectricalSeries
electrodes : Electrodes
Electrodes for the ElectricalSeries
"""
start_time = datetime(2020, 12, 31, 11, 28, tzinfo=tzlocal())
nwbfile = NWBFile(session_description='Demonstrate `process_nwb` on an NWBFile',
identifier='NWB123',
session_start_time=start_time)
device = nwbfile.create_device(name='ECoG_grid')
electrode_group = nwbfile.create_electrode_group('Grid',
description='Grid',
location='cortex',
device=device)
for idx in range(nchannels):
nwbfile.add_electrode(id=idx,
x=1.0, y=2.0, z=3.0,
imp=float(-idx),
location='cortex', filtering='none',
group=electrode_group)
electrodes = nwbfile.create_electrode_table_region(list(range(nchannels)), 'Electrodes')
return nwbfile, device, electrode_group, electrodes