Open RicardoMBorges opened 9 months ago
I've attempted to refactor the entire code, but due to time constraints, I haven't been able to do so. However, this should still function properly. Can you confirm this @RicardoMBorges ?
from __future__ import division, print_function
import numpy
import scipy as sp
from scipy.sparse import dia_matrix
from numpy import nanmean, nanmedian
from copy import copy
import sys
import logging
try:
basestring
except NameError:
basestring = str
def is_number(s):
try:
if isinstance(s, basestring):
if s.isdigit():
return True
else:
float(s)
return True
elif isinstance(s, int):
return True
elif isinstance(s, list) and len(s) != 0:
if(all(isinstance(x, (int, float)) for x in s)):
# l = [n for n in s if n.isdigit()]
# if len(l) == len(s):
return True
else:
float(s)
return True
except ValueError:
return False
def cat(dim, *args):
# type: (object, object) -> object
try:
return numpy.concatenate([r for r in args if r.shape[0] > 0], axis=dim)
except:
return numpy.stack([r for r in args if r.shape[0]>0], axis=-1)
def sortrows(a, i):
i = numpy.argsort(a[:, i])
b = a[i, :]
return b
def nans(r, c):
a = numpy.empty((r, c))
a[:] = numpy.nan
return a
def min_with_indices(d):
d = d.flatten()
ml = numpy.min(d)
mi = numpy.array(list(d).index(ml))
return ml, mi
def max_with_indices(d):
d = d.flatten()
ml = numpy.max(d)
mi = numpy.array(list(d).index(ml))
return ml, mi
def icoshift(xt, xp, inter='whole', n='f', scale=None, coshift_preprocessing=False,
coshift_preprocessing_max_shift=None, fill_with_previous=True, average2_multiplier=3):
# RETURNS [xcs, ints, ind, target]
# Take a copy of the xp vector since we mangle it somewhat
xp = copy(xp)
if scale is None:
using_custom_scale = False
scale = numpy.array(range(0, xp.shape[1]))
else:
using_custom_scale = True
dec_scale = numpy.diff(scale)
inc_scale = scale[0] - scale[1]
flag_scale_dir = inc_scale < 0
flag_di_scale = numpy.any(abs(dec_scale) > 2 * numpy.min(abs(dec_scale)))
if len(scale) != max(scale.shape):
raise(Exception, 'scale must be a vector')
if max(scale.shape) != xp.shape[1]:
raise(Exception, 'x and scale are not of compatible length %d vs. %d' % (max(scale.shape), xp.shape[1]))
if inc_scale == 0 or not numpy.all(numpy.sign(dec_scale) == - numpy.sign(inc_scale)):
raise(Exception, 'scale must be strictly monotonic')
if coshift_preprocessing_max_shift is None:
coshift_preprocessing_max_shift = n
block_size = 2 ** 25
max_flag = False
avg2_flag = False
xt_basis = xt
if isinstance(xt, str):
if xt == 'average':
xt = nanmean(xp, axis=0).reshape(1, -1)
elif xt == 'median':
xt = nanmedian(xp, axis=0).reshape(1, -1)
elif xt == 'average2':
xt = nanmean(xp, axis=0).reshape(1,-1)
avg2_flag = True
elif xt == 'max':
xt = numpy.zeros((1, xp.shape[1]))
max_flag = True
nt, mt = xt.shape
np, mp = xp.shape
if mt != mp:
raise(Exception, 'Target "xt" and sample "xp" must have the same number of columns')
if is_number(inter):
try:
if isinstance(inter, basestring):
if inter.isdigit():
inter = int(inter)
else:
inter = float(inter)
if inter > mp:
raise (Exception, 'Number of intervals "inter" must be smaller than number of variables in xp')
except:
if (all(a >= mp for a in inter)):
raise(Exception, 'Number of intervals "inter" must be smaller than number of variables in xp')
# Set defaults if the settings are not set
# options = [options[oi] if oi < len(options) else d for oi, d in enumerate([1, 1, 0, 0, 0]) ]
if using_custom_scale:
prec = abs(numpy.min(numpy.unique(dec_scale)))
if flag_di_scale:
logging.warn('Scale vector is not continuous, the defined intervals might not reflect actual ranges')
flag_coshift = (not inter == 'whole') and coshift_preprocessing
if flag_coshift:
if using_custom_scale:
coshift_preprocessing_max_shift = dscal2dpts(coshift_preprocessing_max_shift, scale, prec)
if max_flag:
xt = nanmean(xp, axis=0).reshape(1,-1)
co_shift_scale = scale if using_custom_scale else None
xp, nil, wint, _ = icoshift(xt, xp, 'whole',
coshift_preprocessing=False,
coshift_preprocessing_max_shift=coshift_preprocessing_max_shift,
scale=co_shift_scale,
fill_with_previous=True,
average2_multiplier=average2_multiplier )
if xt_basis == 'average' or xt_basis == 'average2':
xt = nanmean(xp).reshape(1, -1)
elif xt_basis == 'median':
xt = nanmedian(xp).reshape(1, -1)
else: # max?
xt = xt.reshape(1,-1)
whole = False
flag2 = False
if isinstance(inter, basestring):
if inter == 'whole':
inter = numpy.array([0, mp - 1]).reshape(1, -1)
whole = True
elif '-' in inter:
interv = regexp(inter, '(-{0,1}\\d*\\.{0,1}\\d*)-(-{0,1}\\d*\\.{0,1}\\d*)', 'tokens')
interv = sort(scal2pts(float(cat(0, interv[:])), scale, prec))
if interv.size != 2:
raise(Exception, 'Invalid range for reference signal')
inter = range(interv[0], (interv[1] + 1))
flag2 = True
else:
interv = float(inter)
if using_custom_scale:
interv = dscal2dpts(interv, scale, prec)
else:
interv = int(round(interv))
inter = defints(xp, interv)
elif isinstance(inter, int):
remain = mp % inter
step = int( float(mp) / inter )
segments = []
o = 0
while o < mp:
segments.extend([o, o])
if remain > 0:
o += 1
remain -=1
o += step
# Chop of duplicate zero
segments = segments[1:]
segments.append(mp) # Add on final step
inter = numpy.array(segments, dtype=int).reshape(1, -1)
logging.info("Calculated intervals: %s" % inter)
elif isinstance(inter, (list, tuple)): # if is a list of tuples ; add else
inter = numpy.asarray(inter)
flag2 = numpy.array_equal(numpy.fix(inter), inter) and max(inter.shape) > 1 and numpy.array_equal(
numpy.array([1, numpy.max(inter) - numpy.min(inter) + 1]).reshape(1, -1), inter.shape) and numpy.array_equal(numpy.unique(numpy.diff(inter, 1, 2)), 1)
if not flag2 and using_custom_scale:
inter = scal2pts(inter, scale, prec)
if numpy.any(inter[0:2:] > inter[1:2:]) and not flag_scale_dir:
inter = numpy.flipud(numpy.reshape(inter, 2, max(inter.shape) / 2))
inter = inter[:].T
else:
raise(Exception, 'The number of intervals must be "whole", an integer, or a list of tuples of integers')
if(len(inter.shape) > 1):
nint, mint = inter.shape
else:
nint = 1
mint = inter.shape[0]
inter = inter.reshape(1,-1)
scfl = numpy.array_equal(numpy.fix(scale), scale) and not using_custom_scale
if isinstance(inter, basestring) and n not in ['b', 'f']:
raise(Exception, '"n" must be a scalar b or f')
elif isinstance(n, int) or isinstance(n, float):
if n <= 0:
raise(Exception, 'Shift(s) "n" must be larger than zero')
if scfl and not isinstance(n, int):
logging.warn('"n" must be an integer if scale is ignored; first element (i.e. %d) used' % n)
n = numpy.round(n)
else:
if using_custom_scale:
n = dscal2dpts(n, scale, prec)
if not flag2 and numpy.any(numpy.diff(numpy.reshape(inter, (2, mint // 2)), 1, 0) < n):
raise(Exception, 'Shift "n" must be not larger than the size of the smallest interval')
flag = numpy.isnan(cat(0, xt.reshape(1, -1), xp))
frag = False
ref = lambda e: numpy.reshape(e, (2, max(e.shape) // 2))
vec = lambda a: a.flatten()
mi, pmi = min_with_indices(inter)
ma, pma = max_with_indices(inter)
# There are missing values in the dataset; so remove them before starting
# if they line up between datasets
if vec(flag).any():
if numpy.array_equal(flag[numpy.ones((np, 1), dtype=int), :], flag[1:,:]):
select = numpy.any
else:
select = numpy.all
if flag2:
intern_ = remove_nan(
numpy.array([0, pma - pmi]).reshape(1, -1), cat(0, xt[:, inter], xp[:, inter]), select)
if intern_.shape[0] != 1:
raise(Exception, 'Reference region contains a pattern of missing that cannot be handled consistently')
elif not numpy.array_equal(intern_, numpy.array([1, inter[-2] - inter[0] + 1]).reshape(1, -1)):
logging.warn('The missing values at the boundaries of the reference region will be ignored')
intern_ = range(inter[0] + intern_[0], (inter[0] + intern_[1] + 1))
else:
intern_, flag_nan = remove_nan(
ref(inter), cat(0, xt, xp), select, flags=True)
intern_ = intern_.flatten()
if 0 in intern_.shape:
raise(Exception, 'Cannot handle this pattern of missing values.')
if max(intern_.shape) != max(inter.shape) and not flag2:
if whole:
if max(intern_.shape) > 2:
xseg, in_or = extract_segments(cat(0, xt, xp), ref(intern_))
InOrf = in_or.flatten()
inter = numpy.array([InOrf[0], InOrf[-1]]).reshape(1, -1) #check this
in_or = cat(1, ref(intern_), in_or)
xp = xseg[1:, :]
xt = xseg[0, :].reshape(1, -1)
frag = True
else:
logging.warn('To handle the pattern of missing values, %d segments are created/removed' % (abs(max(intern_.shape) - max(inter.shape)) / 2) )
inter = intern_
nint, mint = inter.shape
xcs = xp
mi, pmi = min_with_indices(inter)
ma, pma = max_with_indices(inter)
flag = max(inter.shape) > 1 and numpy.array_equal(numpy.array([1, pma - pmi + 1]).reshape(1, -1), numpy.array(inter.shape).reshape(1, -1)) \
and numpy.array_equal(numpy.unique(numpy.diff(inter.reshape(1,-1), 1, 1)),numpy.array([1]))
if flag:
if n == 'b':
logging.info('Automatic searching for the best "n" for the reference window "ref_w" enabled. That can take a longer time.')
elif n == 'f':
logging.info('Fast automatic searching for the best "n" for the reference window "ref_w" enabled.')
if max_flag:
amax, bmax = max_with_indices( numpy.sum(xp) )
xt[mi:ma] = xp[bmax, mi:ma]
ind = nans(np, 1)
missind = numpy.logical_not(numpy.all(numpy.isnan(xp), axis=1))
xcs[missind, :], ind[missind], _ = coshifta(xt, xp[missind,:], inter, n, fill_with_previous=fill_with_previous,
block_size=block_size)
ints = numpy.array([1, mi, ma]).reshape(1, -1)
else:
if mint > 1:
if mint % 2:
raise(Exception, 'Wrong definition of intervals ("inter")')
if ma > mp:
raise(Exception, 'Intervals ("inter") exceed samples matrix dimension')
# allint=[(1:round(mint/2))' inter(1:2:mint)' inter(2:2:mint)'];
# allint =
# 1 1 6555
# 2 6555 13109
# 3 13109 19663
# 4 19663 26216
# 5 26216 32768
# ans =
# 5 3
inter_list = list(inter.flatten())
allint = numpy.array([
range(mint//2),
inter_list[0::2],
inter_list[1::2],
])
allint = allint.T
sinter = numpy.sort(allint, axis=0)
intdif = numpy.diff(sinter)
if numpy.any(intdif[1:2:max(intdif.shape)] < 0):
logging.warn('The user-defined intervals are overlapping: is that intentional?')
ints = allint
ints = numpy.append(ints, ints[:, 2] - ints[:, 1])
ind = numpy.zeros((np, allint.shape[0]))
if n == 'b':
logging.info('Automatic searching for the best "n" for each interval enabled. This can take a long time...')
elif n == 'f':
logging.info('Fast automatic searching for the best "n" for each interval enabled')
for i in range(0, allint.shape[0]):
if whole:
logging.info('Co-shifting the whole %s samples...' % np)
else:
logging.info('Co-shifting interval no. %s of %s...' % (i, allint.shape[0]) )
# FIXME? 0:2, or 1:2?
intervalnow = xp[:, allint[i, 1]:allint[i, 2]]
if max_flag:
amax, bmax = max_with_indices(numpy.sum(intervalnow, axis=1))
target = intervalnow[bmax, :]
xt[0, allint[i, 1]:allint[i, 2]] = target
else:
target = xt[:, allint[i, 1]:allint[i, 2]]
missind = ~numpy.all(numpy.isnan(intervalnow), axis=1)
if not numpy.all(numpy.isnan(target)) and numpy.any(missind):
cosh_interval, loc_ind, _ = coshifta(target, intervalnow[missind, :], numpy.array([0]), n,
fill_with_previous=fill_with_previous, block_size=block_size)
xcs[missind, allint[i, 1]:allint[i, 2]] = cosh_interval
ind[missind, i] = loc_ind.flatten()
else:
xcs[:, allint[i, 1]:allint[i, 1]] = intervalnow
if avg2_flag:
for i in range(0, allint.shape[0]):
if whole:
logging.info('Co-shifting again the whole %d samples... ' % np)
else:
logging.info('Co-shifting again interval no. %d of %d... ' % (i, allint.shape[0]))
intervalnow = xp[:, allint[i, 1]:allint[i, 2]]
target1 = numpy.mean(xcs[:, allint[i, 1]:allint[i, 2]], axis=0)
min_interv = numpy.min(target1)
target = (target1 - min_interv) * average2_multiplier
missind = ~numpy.all(numpy.isnan(intervalnow), 1)
if (not numpy.all(numpy.isnan(target))) and (numpy.sum(missind) != 0):
cosh_interval, loc_ind, _ = coshifta(target, intervalnow[missind, :], 0, n,
fill_with_previous=fill_with_previous, block_size=block_size)
xcs[missind, allint[i, 1]:allint[i, 2]] = cosh_interval
xt[0, allint[i, 1]:allint[i, 2]] = target
ind[missind, i] = loc_ind.T
else:
xcs[:, allint[i, 1]:allint[i, 2]] = intervalnow
if frag:
xn = nans(np, mp)
for i_sam in range(0, np):
for i_seg in range(0, in_or.shape[0]):
xn[i_sam, in_or[i_seg, 0]:in_or[i_seg, 1]
+ 1] = xcs[i_sam, in_or[i_seg, 2]:in_or[i_seg, 3] + 1]
if loc_ind[i_sam] < 0:
if flag_nan[i_seg, 0, i_sam]:
xn[i_sam, in_or[i_seg, 0]:in_or[i_seg, 0]
- loc_ind[i_sam, 0] + 1] = numpy.nan
else:
if loc_ind[i_sam] > 0:
if flag_nan[i_seg, 1, i_sam]:
xn[i_sam, (int(in_or[i_seg, 1] - loc_ind[i_sam, 0] + 1)):in_or[i_seg, 1]+1] = numpy.nan
xcs = xn
target = xt
if flag_coshift:
ind = ind + wint * numpy.ones( (1, ind.shape[1]) )
return xcs, ints, ind, target
def coshifta(xt, xp, ref_w=0, n=numpy.array([1, 2, 3]), fill_with_previous=True, block_size=(2 ** 25)):
if len(ref_w) == 0 or ref_w.shape[0] == 0:
ref_w = numpy.array([0])
if numpy.all(ref_w >= 0):
rw = max(ref_w.shape)
else:
rw = 1
if fill_with_previous:
filling = -numpy.inf
else:
filling = numpy.nan
if isinstance(xt, str) and xt == 'average':
xt = nanmean(xp, axis=0)
# Make two dimensional
xt = xt.reshape(1, -1)
nt, mt = xt.shape
np, mp = xp.shape
if len(ref_w.shape) > 1:
nr, mr = ref_w.shape
else:
nr, mr = ref_w.shape[0], 0
logging.info('mt=%d, mp=%d' % (mt, mp))
if mt != mp:
raise(Exception, 'Target "xt" and sample "xp" must be of compatible size (%d, %d)' % (mt, mp) )
if not isinstance(n, str) and numpy.any(n <= 0):
raise(Exception, 'shift(s) "n" must be larger than zero')
if nr != 1:
raise(Exception, 'Reference windows "ref_w" must be either a single vector or 0')
if rw > 1 and (numpy.min(ref_w) < 1) or (numpy.max(ref_w) > mt):
raise(Exception, 'Reference window "ref_w" must be a subset of xp')
if nt != 1:
raise(Exception, 'Target "xt" must be a single row spectrum/chromatogram')
auto = 0
if n == 'b':
auto = 1
if rw != 1:
n = int(0.05 * mr)
n = 10 if n < 10 else n
src_step = int(mr * 0.05)
else:
n = int(0.05 * mp)
n = 10 if n < 10 else n
src_step = int(mp * 0.05)
try_last = 0
elif n == 'f':
auto = 1
if rw != 1:
n = mr - 1
src_step = numpy.round(mr / 2) - 1
else:
n = mp - 1
src_step = numpy.round(mp / 2) - 1
try_last = 1
if nt != 1:
raise(Exception, 'ERROR: Target "xt" must be a single row spectrum/chromatogram')
xw = nans(np, mp)
ind = numpy.zeros((1, np))
n_blocks = int(numpy.ceil(sys.getsizeof(xp) / block_size))
sam_xblock = numpy.array([int(np / n_blocks)])
sam_xblock = sam_xblock.T
ind_blocks = sam_xblock[numpy.ones(n_blocks, dtype=bool)]
ind_blocks[0:int(np % sam_xblock)] = sam_xblock + 1
ind_blocks = numpy.array([numpy.array([0]),numpy.cumsum(ind_blocks, 0)], dtype=ind_blocks.dtype).flatten()
if auto == 1:
while auto == 1:
if filling == -numpy.inf:
xtemp = cat(1, numpy.tile(xp[:, :1], (1, n)),
xp, numpy.tile(xp[:, -1:, ], (1, n)))
elif numpy.isnan(filling):
# FIXME
xtemp = cat(1,
nans(np, n), xp, nans(np, n))
if rw == 1:
ref_w = numpy.arange(0, mp).reshape(1,-1)
ind = nans(np, 1)
r = False
for i_block in range(0, n_blocks):
block_indices = numpy.array(range(ind_blocks[i_block], ind_blocks[i_block + 1]))
xpTemp = xp[:, ref_w[0,:]]
xpTemp = xpTemp[block_indices,:]
# xpTemp = xpTemp.take(ref_w, axis=1)
# xpTemp = xpTemp.reshape(max(block_indices.shape),max(ref_w.shape))
_, ind[block_indices], ri = cc_fft_shift(xt[0, ref_w].reshape(1,-1), xpTemp,
numpy.array([-n, n, 2, 1, -99999], dtype=int) )
if not r:
r = numpy.empty((0, ri.shape[1]))
r = cat(0, r, ri).T
temp_index = range(-n, n+1)
for i_sam in range(0, np):
index = numpy.flatnonzero(temp_index == ind[i_sam])[0]
t_index = range(index, index+mp)
xw[i_sam, :] = [xtemp[i_sam, j] for j in t_index]
if (numpy.max(abs(ind)) == n) and try_last != 1:
if n + src_step >= ref_w.shape[0]:
try_last = 1
continue
n += src_step
continue
else:
if (numpy.max(abs(ind)) < n) and n + src_step < len(ref_w) and try_last != 1:
n += src_step
try_last = 1
continue
else:
auto = 0
logging.info('Best shift allowed for this interval = %d' % n)
else:
if filling == -numpy.inf:
xtemp = cat(1, numpy.tile(xp[:, :1], (1., n)),
xp, numpy.tile(xp[:, -1:, ], (1., n)))
elif numpy.isnan(filling):
xtemp = cat(1,
nans(np, n), xp, nans(np, n))
if rw == 1:
ref_w = numpy.arange(0, mp) #.reshape(1,-1)
ind = nans(np, 1)
r = numpy.array([])
for i_block in range(n_blocks):
block_indices = range(ind_blocks[i_block], ind_blocks[i_block + 1])
dummy, ind[block_indices], ri = cc_fft_shift(xt[0, ref_w].reshape(1,-1), xp[block_indices, :][:, ref_w],
numpy.array([-n, n, 2, 1, filling]))
r = cat(0, r, ri)
temp_index = numpy.arange(-n, n+1)
for i_sam in range(0, np):
index = numpy.flatnonzero(temp_index == ind[i_sam])
xw[i_sam, :] = xtemp[i_sam, index:index + mp]
if numpy.max(abs(ind)) == n:
logging.warn('Scrolling window size "n" may not be enough wide because extreme limit has been reached')
return xw, ind, r
def defints(xp, interv):
np, mp = xp.shape
sizechk = mp / interv - round(mp / interv)
plus = (mp / interv - round(mp / interv)) * interv
logging.warn('The last interval will not fulfill the selected intervals size "inter" = %f' % interv)
if plus >= 0:
logging.warn('Size of the last interval = %d ' % plus)
else:
logging.warn('Size of the last interval = %d' % (interv + plus))
if sizechk != 0:
logging.info('The last interval will not fulfill the selected intervals size "inter"=%f.' % interv)
logging.info('Size of the last interval = %f ' % plus)
t = range(0, (mp + 1), interv)
t.extend([mp])
# t = cat(1, numpy.array(range(0, (mp + 1), interv)), numpy.array(mp))
if t[-1] == t[-2]:
t[-1] = numpy.array([])
t = cat(1, numpy.array(t[0: - 1]), numpy.array(t[1:])-1)
inter = t.reshape(-1,1)[:,0]
return inter
def cc_fft_shift(t, x=False, options=numpy.array([])):
dim_x = numpy.array(x.shape)
dim_t = numpy.array(t.shape)
options_default = numpy.array([-numpy.fix(dim_t[-1] * 0.5), numpy.fix(dim_t[-1] * 0.5), len(t.shape) -1, 1, numpy.nan])
options = numpy.array([options[oi] if oi < len(options) else d for oi, d in enumerate(options_default)], dtype=int)
options[numpy.isnan(options)] = options_default[numpy.isnan(options)]
if options[0] > options[1]:
raise(Exception, 'Lower bound for shift is larger than upper bound')
time_dim = int(options[2] - 1) #why???????????
if dim_x[time_dim] != dim_t[time_dim]:
raise(Exception, 'Target and signals do not have compatible dimensions')
ord_ = numpy.array(
[time_dim] +
list(range(1, time_dim)) +
list(range(time_dim, len(x.shape) - 1)) +
[0]
).T
x_fft = numpy.transpose(x, ord_) # permute
x_fft = numpy.reshape(x_fft, (dim_x[time_dim], numpy.prod(dim_x[ord_[1:]])))
# FIXME? Sparse/dense switchg
p = numpy.arange(0, numpy.prod(dim_x[ ord_[1:] ] ) )
s = numpy.max(p) + 1
b = dia_matrix( (1.0/numpy.sqrt(numpy.nansum(x_fft ** 2, axis=0)), [0]), shape=(s,s) ).todense()
x_fft = numpy.dot(x_fft, b)
t = numpy.transpose(t, ord_)
t = numpy.reshape(t, (dim_t[time_dim], numpy.prod(dim_t[ord_[1:]])))
t = normalise(t)
np, mp = x_fft.shape
nt = t.shape[0]
flag_miss = numpy.any(numpy.isnan(x_fft)) or numpy.any(numpy.isnan(t))
if flag_miss:
if len(x.shape) > 2:
raise(Exception, 'Multidimensional handling of missing not implemented, yet')
miss_off = nans(1, mp)
for i_signal in range(0, mp):
limits = remove_nan(
numpy.array([0, np - 1]).reshape(1, -1), x_fft[:, i_signal].reshape(1, -1), numpy.all)
if limits.shape != (2, 1):
raise(Exception, 'Missing values can be handled only if leading or trailing')
if numpy.any(cat(1, limits[0], mp - limits[1]) > numpy.max(abs(options[0:2]))):
raise(Exception, 'Missing values band larger than largest admitted shift')
miss_off[i_signal] = limits[0]
if numpy.any(miss_off[i_signal-1] > 1):
x_fft[0:limits[1] - limits[0] + 1,
i_signal] = x_fft[limits[0]:limits[1], i_signal]
if limits[1] < np:
x_fft[(limits[1] - limits[0] + 1):np, i_signal] = 0
limits = remove_nan(numpy.array([0, nt - 1]), t.T, numpy.all)
t[0:limits[1] - limits[0] + 1, :] = t[limits[0]:limits[1],:]
t[limits[1] - limits[0] + 1:np, :] = 0
miss_off = miss_off[0:mp] - limits[0]
x_fft = cat(0, x_fft, numpy.zeros(
(numpy.max(numpy.abs(options[0:2])), numpy.prod(dim_x[int(ord_[1:])], axis=0))
))
t = cat(0, t, numpy.zeros(
(numpy.max(numpy.abs(options[0:2])), numpy.prod(dim_t[ord_[1:]], axis=0))
))
len_fft = max(x_fft.shape[0], t.shape[0])
shift = numpy.arange(options[0], options[1] + 1)
if (options[0] < 0) and (options[1] > 0):
ind = list(range(int(len_fft + options[0]), int(len_fft))) + \
list(range(0, int(options[1] + 1)))
elif (options[0] < 0) and (options[1] < 0):
ind = list(range(len_fft + options[0], (len_fft + options[1] + 1)))
elif (options[0] < 0) and (options[1] == 0):
ind = list(range(int(len_fft + options[0]),
int(len_fft + options[1] + 1))) + [1]
else:
# ind = Options(1) + 1:Options(2) + 1
ind = range(int(options[0]), int(options[1] + 1))
# Pad to next ^2 for performance on the FFT
fft_pad = int( 2**numpy.ceil( numpy.log2(len_fft) ) )
x_fft = numpy.fft.fft(x_fft, fft_pad, axis=0)
t_fft = numpy.fft.fft(t, fft_pad, axis=0)
t_fft = numpy.conj(t_fft)
t_fft = numpy.tile(t_fft, (1, dim_x[0]))
dt = x_fft * t_fft
cc = numpy.fft.ifft(dt, fft_pad, axis=0)
if len(ord_[1:-1]) == 0:
k = 1
else:
k = numpy.prod(dim_x[ord_[1:-1]])
cc = numpy.reshape(cc[ind, :], ( options[1]-options[0]+1, k, dim_x[0]) )
if options[3] == 0:
cc = numpy.squeeze(numpy.mean(cc, axis=1))
else:
if options[3] == 1:
cc = numpy.squeeze(numpy.prod(cc, axis=1))
else:
raise(Exception, 'Invalid options for correlation of multivariate signals')
pos = cc.argmax(axis=0)
values = cat(1, numpy.reshape(shift, (len(shift), 1)), cc)
shift = shift[pos]
if flag_miss:
shift = shift + miss_off
x_warp = nans(*[dim_x[0]] + list(dim_t[1:]))
ind = numpy.tile(numpy.nan, (len(x.shape), 18))
indw = ind
time_dim = numpy.array([time_dim])
for i_X in range(0, dim_x[0]):
ind_c = i_X
if shift[i_X] >= 0:
ind = numpy.arange(shift[i_X], dim_x[time_dim]).reshape(1, -1)
indw = numpy.arange(0, dim_x[time_dim] - shift[i_X]).reshape(1, -1)
if options[4] == - numpy.inf:
o = numpy.zeros(abs(shift[i_X])).astype(int)
if len(o) > 0:
ind = cat(1,
ind,
numpy.array(dim_x[time_dim[o]] - 1).reshape(1, -1)
)
indw = cat(1,
indw,
numpy.arange(dim_x[time_dim] - shift[i_X],
dim_x[time_dim]).reshape(1, -1)
)
elif shift[i_X] < 0:
ind = numpy.arange(0, dim_x[time_dim] + shift[i_X]).reshape(1, -1)
indw = numpy.arange(-shift[i_X], dim_x[time_dim]).reshape(1, -1)
if options[4] == - numpy.inf:
ind = cat(1, numpy.zeros((1, -shift[i_X])), ind)
indw = cat( 1, numpy.arange(0, -shift[i_X]).reshape(1, -1), indw)
x_warp[ind_c, indw.astype(int)] = x[ind_c, ind.astype(int)]
shift = numpy.reshape(shift, (len(shift), 1))
return x_warp, shift, values
def remove_nan(b, signal, select=numpy.any, flags=False):
c = nans(b.shape[0], b.shape[1] if len(b.shape) > 1 else 1)
b = b.reshape(1, -1)
count = 0
signal = numpy.isnan(signal)
for i_el in range(0, b.shape[0]):
ind = numpy.arange(b[i_el, 0], b[i_el, 1] + 1)
in_ = select(signal[:, ind], axis=0)
if numpy.any(in_):
p = numpy.diff(numpy.array([0] + in_).reshape(1, -1), 1, axis=1)
a = numpy.flatnonzero(p < 0) + 1
b = numpy.flatnonzero(p > 0)
if numpy.any(~in_[0]):
a = cat(1, numpy.array([0]), a)
else:
b = b[1:]
if numpy.any(~in_[-1]):
b = cat(1, b, numpy.array([max(ind.shape) - 1]))
a = numpy.unique(a)
b = numpy.unique(b)
# d = ind[cat(1, a, b)]
d = numpy.stack((a, b), axis=-1)
c.resize(d.shape, refcheck=False)
c[count:count + max(a.shape) + 1] = d
count = count + max(a.shape)
else:
c[count, :] = b[i_el,:]
count += 1
c = c.astype(int)
an = c
if flags:
flag = numpy.empty((c.shape[0], 2, signal.shape[0]), dtype=bool)
flag[:] = False
c_inds = c[:, 0] > 1
c_inds = c_inds.astype(bool)
c_inde = c[:, 1] < (signal.shape[1] -1)
c_inde = c_inde.astype(bool)
flag[c_inds, 0, :] = signal[:, c[c_inds, 0] - 1].T
flag[c_inde, 1, :] = signal[:, c[c_inde, 1] + 1].T
return an, flag
else:
return an
def normalise(x, flag=False):
if not flag:
p_att = ~numpy.isnan(x)
flag = numpy.any(~p_att[:])
else:
p_att = ~numpy.isnan(x)
m, n = x.shape
xn = nans(m, n)
if flag:
for i_n in range(0, n):
n = numpy.linalg.norm(x[p_att[:, i_n], i_n])
if not n:
n = 1
xn[p_att[:, i_n], i_n] = x[p_att[:, i_n], i_n] / n
else:
for i_n in range(0, n):
n = numpy.linalg.norm(x[:, i_n])
if not n:
n = 1
xn[:, i_n] = x[:, i_n] / n
return xn
def extract_segments(x, segments):
n, p = x.shape
# segments = segments.T
Sd = numpy.diff(segments, axis=1)
q = numpy.sum(Sd + 1)
s, t = segments.shape
flag_si = t != 2
flag_in = numpy.any(segments[:] != numpy.fix(segments[:]))
flag_ob = numpy.any(segments[:, 0] < 0) or numpy.any(segments[:, 1] > p-1)
flag_ni = numpy.any(numpy.diff(segments[:, 0]) < 0) or numpy.any(
numpy.diff(segments[:, 1]) < 0)
flag_ab = numpy.any(Sd < 2)
if flag_si:
raise(Exception, 'Segment boundary matrix must have two columns')
if flag_in:
raise(Exception, 'Segment boundaries must be integers')
if flag_ob:
raise(Exception, 'Segment boundaries outside of segment')
if flag_ni:
raise(Exception, 'segments boundaries must be monotonically increasing')
if flag_ab:
raise(Exception, 'segments must be at least two points long')
xseg = nans(n, q)
origin = 0
segnew = []
for seg in segments:
data = x[:, seg[0]:seg[1] + 1]
segment_size = data.shape[1]
xseg[:, origin:origin + segment_size] = data
segnew.append([origin, origin + segment_size - 1])
origin = origin + segment_size
segnew = numpy.array(segnew)
return xseg, segnew
def find_nearest(array, value):
idx = (numpy.abs(array-value)).argmin()
return array[idx], idx
def scal2pts(ppmi, ppm=[], prec=None):
rev = ppm[0] > ppm[1]
if prec is None:
prec = min(abs(numpy.unique(numpy.diff(ppm))))
pts = []
for i in ppmi:
nearest_v, idx = find_nearest(ppm, i)
if abs(nearest_v-i) > prec:
pts.append(numpy.nan)
else:
pts.append( idx )
return numpy.array(pts)
def dscal2dpts(d, ppm, prec=None):
if d == 0:
return 0
if d <= 0:
raise(Exception, 'delta must be positive')
if ppm[0] < ppm[1]: # Scale in order
i = scal2pts(numpy.array([ppm[0] + d]), ppm, prec) -1
else:
i = max(ppm.shape) - scal2pts(numpy.array([ppm[-1] + d]), ppm, prec) +1
return i[0]
Helloi @mfitzp, this is to ask you if this script is still alive/valid. I´m trying it but I´m finding an issue probably related to versions... It would be great if you could help me. Thanks
My plan is to use it to align chromatography data (vectors).
ImportError Traceback (most recent call last) Input In [22], in <cell line: 1>() ----> 1 import icoshift
File ~\AppData\Roaming\Python\Python39\site-packages\icoshift__init__.py:1, in
----> 1 from .icoshift import *
File ~\AppData\Roaming\Python\Python39\site-packages\icoshift\icoshift.py:4, in
2 import numpy
3 import scipy as sp
----> 4 from scipy.stats import nanmean, nanmedian
5 from copy import copy
6 import sys
ImportError: cannot import name 'nanmean' from 'scipy.stats' (C:\ProgramData\Anaconda3\lib\site-packages\scipy\stats__init__.py)