import numpy as np
from scipy.signal import convolve2d
from functools import reduce
from itertools import groupby
from operator import itemgetter
def array_thresh(mcthresh, az_volc, az_diff, mdccm, az, vel):
r"""
Find array processing values above multiple set thresholds for MCCM,
back-azimuth, and trace velocity. Uses default 0.25–0.45 km/s for trace
velocity thresholds. Also finds consecutive segments that meet thresholds,
but these values are not currently returned.
Args:
mcthresh (float): MCCM or MdCCM threshold (0–1)
az_volc (float): Back-azimuth to target volcano or source (0–359)
az_diff (float): Tolerance for back-azimuth from `az_volc`
mdccm: MdCCM or MCCM values from array processing (0–1)
az: Back-azimuth values (0–359)
vel: Trace-velocity values (km/s)
Returns:
Indices to time segments that meet set thresholds
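Example:
A minimal sketch with synthetic values (all numbers below are
illustrative, not real array processing output)::

    import numpy as np
    mdccm = np.array([0.30, 0.70, 0.80, 0.90, 0.40])
    az = np.array([120., 251., 250., 249., 10.])
    vel = np.array([0.20, 0.34, 0.35, 0.33, 0.60])
    igood = array_thresh(0.6, 250., 5., mdccm, az, vel)
    # igood -> array([1, 2, 3]); prints "3 above threshold, 3 consecutive"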
"""
# Use numpy to find where thresholds are exceeded
mc_good = np.where(mdccm > mcthresh)[0]
az_good = np.where((az >= az_volc - az_diff) & (az <= az_volc + az_diff))[0]
vel_good = np.where((vel >= .25) & (vel <= .45))[0]
igood = reduce(np.intersect1d, (mc_good, az_good, vel_good))
# Find the number of consecutive values that exceed the thresholds.
ranges = []
nconsec = []
for k, g in groupby(enumerate(igood), lambda x: x[0]-x[1]):
group = list(map(itemgetter(1), g))
ranges.append((group[0], group[-1]))
nconsec.append(group[-1]-group[0]+1)
if len(nconsec) > 0:
consecmax = max(nconsec)
else:
consecmax = 0
print('%d above threshold, %d consecutive\n' % (len(igood), consecmax))
return igood
def phaseAlignData(data, delays, wgt, refTrace, M, Moffset, plotFlag=False):
r"""
Embeds ``n`` phase aligned traces in a data matrix.
Args:
data: ``(m, n)`` array; time series with ``m`` samples from ``n``
traces as columns
delays: ``(n, )`` array; vector of shifts as indices for embedding
traces in an array, such that trace ``i`` will begin at index
``out[i]``
wgt: Vector of relative weights of length ``n`` (0 == exclude trace by
setting to padding value, see `plotFlag`)
refTrace (int): Reference sensor for TDOA information
M (int): Length of best beam result in samples (use ``m`` to let beam
be same length as input traces)
Moffset: Individual trace offsets from arrival model shifts (use ``[0
for i in range(nTraces)]`` to skip this effect)
plotFlag (bool): Flag to indicate output array will be used for
plotting purposes. Default is `False` (pads shifts with zeros; pads
with :data:`numpy.nan` if `True`)
Returns:
``(M, n)`` array of shifted traces as columns
Notes:
The output of :func:`phaseAlignIdx` is used to calculate the input
`delays`.
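Example:
A minimal sketch, assuming ``tau`` holds TDOA estimates (in seconds)
for the unique sensor pairs of ``data`` sampled at ``Hz``::

    nTraces = data.shape[1]
    wgt = [1] * nTraces
    delays = phaseAlignIdx(tau, Hz, wgt, 0)
    aligned = phaseAlignData(data, delays, wgt, 0, data.shape[0],
                             [0 for i in range(nTraces)])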
"""
# (c) 2017 Curt A. L. Szuberla
# University of Alaska Fairbanks, all rights reserved
#
# -- this is low level code w/out error checks or defaults, designed
# -- to be called by wrappers that make use of the indices provided
# size up data
m, nTraces = data.shape
# if plotting, embed in array of np.nan
if plotFlag:
nanOrOne = np.nan
else:
nanOrOne = 1
# correct for negative Moffset elements
# subtract this to ensure corrected delays are positive, semi-definite
# and have (at least) one zero element
maxNegMoffset = min(np.array(Moffset)[np.array(Moffset) <= 0])
# apply Moffset & correction for negative elements of Moffset
delays = delays + Moffset - maxNegMoffset
# start with everything in the window as a default (trim||pad later)
data_align = np.zeros((max(delays) + m, nTraces)) * nanOrOne
# embed shifted traces in array
for k in range(nTraces):
if wgt[k]:
data_align[delays[k]:delays[k]+m, k] = data[:, k] * wgt[k]
# truncate or pad data_align if M differs from mp, centered on refTrace
mp = data_align.shape[0] # new value for m
if M is not None and M != mp:
alignBounds = [delays[refTrace] + m//2 - M//2,
delays[refTrace] + m//2 + M//2]
# trap round-off errors and force (M, nTraces) data_align
if alignBounds[1]-alignBounds[0] != M:
alignBounds[1] += 1
if not (alignBounds[1]-alignBounds[0])%2:
alignBounds[0] -= 1
# -- LHS (graphically, but actually topside in array-land!)
if alignBounds[0] < 0:
# pad LHS of traces w zeros or np.nans
data_align = np.vstack((np.zeros((-alignBounds[0], nTraces)) * nanOrOne,
data_align))
elif alignBounds[0] > 0:
data_align = data_align[alignBounds[0]:]
# -- RHS (graphically, but actually bottom in array-land!)
if alignBounds[1] > mp:
# pad RHS of traces w zeros or np.nans
data_align = np.vstack((data_align, np.zeros((alignBounds[1] - mp,
nTraces)) * nanOrOne))
elif alignBounds[1] < mp:
data_align = data_align[:M]
return data_align
def phaseAlignIdx(tau, Hz, wgt, refTrace):
r"""
Calculate shifts required to phase align ``n`` traces in a data matrix.
Args:
tau: ``(n(n-1)//2, )`` array; time delays of relative signal arrivals
(TDOA) for all unique sensor pairings
Hz (int or float): Sample rate
wgt: Vector of relative weights of length ``n`` (0 = exclude trace)
refTrace (int): Reference sensor for TDOA information
Returns:
``(n, )`` array; vector of shifts as indices for embedding traces in an
array, such that trace ``i`` will begin at index ``out[i]``
Notes:
The output of this function is compatible with the inputs of
:func:`phaseAlignData`.
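Example:
A minimal sketch for ``n = 3`` traces sampled at 100 Hz (the TDOA
values below are illustrative)::

    import numpy as np
    tau = np.array([0.01, -0.02, 0.005])  # pairs (0, 1), (0, 2), (1, 2)
    delays = phaseAlignIdx(tau, 100, [1, 1, 1], 0)
    # delays -> array([1, 0, 3])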
"""
# -- this is low level code w/out error checks or defaults, designed
# -- to be called by wrappers that make use of the indices provided
# solve for number of traces from pairings in tau
nTraces = int(1 + np.sqrt(1 + 8 * len(tau))) // 2
# calculate delays (samples) relative to refTrace for each trace
# -- first pass grabs delays starting with refTrace as i in ij
delayIdx = (nTraces*refTrace - refTrace*(refTrace+1)//2,
nTraces*(refTrace+1) - (refTrace+1)*(refTrace+2)//2)
delays = np.hstack((0, (tau[range(delayIdx[0], delayIdx[1])] * Hz))).astype(int)
# the std. rij list comprehension for unique inter-trace pairs
tau_ij = [(i, j) for i in range(nTraces) for j in range(i+1, nTraces)]
# -- second pass grabs delays with refTrace as j in ij
preRefTau_idx = [k for k in range(len(tau)) if tau_ij[k][1] == refTrace]
delays = np.hstack((-tau[preRefTau_idx] * Hz, delays)).astype(int)
# re-shift delays such that the first sample of the trace requiring the
# largest shift left relative to the refTrace (hence, largest positive,
# semi-definite element of delays) has an index of zero; i.e., all traces
# have a valid starting index for embedding into an array (no padding)
return -delays + max(delays)
def tauCalcPW(vel, azPhi, rij):
r"""
Calculates theoretical tau vector for a plane wave moving across an array
of ``n`` elements.
Args:
vel (float): Signal velocity across array
azPhi: Back azimuth (float) from co-array coordinate origin (° CW from
N); back azimuth and elevation angle (array) from co-array
coordinate origin (° CW from N, ° from N-E plane)
rij: ``(d, n)`` array; ``n`` element coordinates as [easting, northing,
{elevation}] column vectors in ``d`` dimensions
Returns:
``(n(n-1)//2, )`` array; time delays of relative signal arrivals (TDOA)
for all unique sensor pairings
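Example:
A minimal sketch for a 3-element array (coordinates in km, velocity
in km/s; all values are illustrative)::

    import numpy as np
    rij = np.array([[0.0, 0.1, 0.0],   # easting
                    [0.0, 0.0, 0.1]])  # northing
    tau = tauCalcPW(0.34, 90.0, rij)   # plane wave arriving from due east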
"""
dim, nTraces = rij.shape
if dim == 2:
rij = np.vstack((rij, np.zeros((1, nTraces))))
idx = [(i, j) for i in range(rij.shape[1]-1)
for j in range(i+1, rij.shape[1])]
X = rij[:, [i[0] for i in idx]] - rij[:, [j[1] for j in idx]]
if np.isscalar(azPhi):
phi = 0
az = azPhi
else:
phi = azPhi[1] / 180 * np.pi
az = azPhi[0]
az = np.pi * (-az / 180 + 0.5)
s = np.array([np.cos(az), np.sin(az), np.sin(phi)])
s[:-1] *= np.cos(phi)
return X.T@(s/vel)
def tauCalcSW(vel, rAzPhi, rij):
r"""
Calculates theoretical tau vector for a spherical wave moving across an
array of ``n`` elements.
Args:
vel (float): Signal velocity across array
rAzPhi: Range to source and back azimuth from co-array coordinate
origin (° CW from N); range to source, back azimuth and elevation
angle from co-array coordinate origin (° CW from N, ° from N-E
plane)
rij: ``(d, n)`` array; ``n`` element coordinates as [easting, northing,
{elevation}] column vectors in ``d`` dimensions
Returns:
``(n(n-1)//2, )`` array; time delays of relative signal arrivals (TDOA)
for all unique sensor pairings
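Example:
A minimal sketch for a 3-element array with a source given by range and
back azimuth (km, degrees; all values are illustrative)::

    import numpy as np
    rij = np.array([[0.0, 0.1, 0.0],   # easting
                    [0.0, 0.0, 0.1]])  # northing
    tau = tauCalcSW(0.34, [5.0, 90.0], rij)  # source 5 km away, due east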
"""
dim, nTraces = rij.shape
if dim == 2:
rij = np.vstack((rij, np.zeros((1, nTraces))))
if len(rAzPhi) == 3:
phi = rAzPhi[2] / 180 * np.pi
else:
phi = 0
idx = [(i, j) for i in range(rij.shape[1]-1)
for j in range(i+1, rij.shape[1])]
# aw, this is so convolutedly elegant that it must be saved in a
# comment for posterity, but the line below it is "simpler"
# az = -( (rAzPhi[1]/180*pi - 2*pi)%(2*pi) - pi/2 )%(2*pi)
az = np.pi * (-rAzPhi[1] / 180 + 0.5)
source = rAzPhi[0] * np.array([np.cos(az), np.sin(az), np.sin(phi)])
source[:-1] *= np.cos(phi)
tau2sensor = np.linalg.norm(rij - np.tile(source, nTraces).reshape(nTraces, 3).T,
2, axis=0)/vel
return tau2sensor[[j[1] for j in idx]] - tau2sensor[[i[0] for i in idx]]
def tauCalcSWxy(vel, xy, rij):
r"""
Calculates theoretical tau vector for a spherical wave moving across an
array of ``n`` elements.
Args:
vel (float): Signal velocity across array
xy: ``(d, )`` array; source location as 2-D [easting, northing] or 3-D
[easting, northing, elevation] coordinates
rij: ``(d, n)`` array; ``n`` element coordinates as [easting, northing,
{elevation}] column vectors in ``d`` dimensions
Returns:
``(n(n-1)//2, )`` array; time delays of relative signal arrivals (TDOA)
for all unique sensor pairings
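Example:
A minimal sketch with the source given directly in Cartesian
coordinates (km; all values are illustrative)::

    import numpy as np
    rij = np.array([[0.0, 0.1, 0.0],   # easting
                    [0.0, 0.0, 0.1]])  # northing
    tau = tauCalcSWxy(0.34, [5.0, 0.0], rij)  # source 5 km east of the origin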
"""
dim, nTraces = len(rij), len(rij[0])
if dim == 2:
rij = np.vstack((rij, [0] * nTraces))
else:
rij = np.vstack((rij,))
if len(xy) == 2:
xy0 = 0
else:
xy0 = []
source = np.hstack((xy, xy0))
idx = [(i, j) for i in range(rij.shape[1]-1)
for j in range(i+1, rij.shape[1])]
tau2sensor = np.linalg.norm(rij - np.tile(source, nTraces).reshape(nTraces, 3).T,
2, axis=0)/vel
return tau2sensor[[j[1] for j in idx]] - tau2sensor[[i[0] for i in idx]]
def randc(N, beta=0.0):
r"""
Colored noise generator. This function generates pseudo-random colored
noise (power spectrum proportional to a power of frequency) via fast
Fourier inversion of the appropriate amplitudes and complex phases.
Args:
N (int or tuple): Shape of output array
beta (float): Spectrum of output will be proportional to ``f**(-beta)``
Returns:
Colored noise sequences as columns with shape `N`, each normalized to
zero-mean and unit-variance. Result is always real valued
Notes:
Spectrum of output will be :math:`\sim1/f^\beta`. White noise is the
default (:math:`\beta = 0`); others are pink (:math:`\beta = 1`) or
brown/surf (:math:`\beta = 2`). Since the output is zero-mean, the DC
spectral component(s) will be identically zero.
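Example:
A minimal sketch generating four channels of 1000-sample pink noise
(:math:`\beta = 1`)::

    x = randc((1000, 4), beta=1.0)  # shape (1000, 4), zero-mean columns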
"""
# catch scalar input & form tuple
if type(N) is int:
N = (N,)
# ensure DC component of output will be zero (unless beta == 0, see below)
if beta < 0:
c0 = 0
else:
c0 = np.inf
# catch the case of a 1D array in python, so dimensions act like a matrix
if len(N) == 1:
M = (N[0], 1) # use M[1] any time # of columns is called for
else:
M = N
# phase array with size (# of unique complex Fourier components,
# columns of original data)
n = int(np.floor((N[0] - 1) / 2)) # works for odd/even cases
cPhase = np.random.random_sample((n, M[1])) * 2 * np.pi
# Nyquist placeholders
if N[0]%2:
# odd case: Nyquist is 1/2 freq step between highest components
# so it is empty
cFiller = np.empty((0,))
pFiller = np.empty((0, M[1]))
else:
# even case: we have a Nyquist component
cFiller = N[0]/2
pFiller = np.zeros((1, M[1]))
# noise amplitudes are just indices (unit-offset!!) to be normalized
# later, phases are arranged as Fourier conjugates
r = np.hstack((c0, np.arange(1, n + 1), cFiller, np.arange(n, 0, -1)))
phasor = np.exp(np.vstack((np.zeros((1, M[1])), 1j * cPhase, pFiller,
-1j * np.flipud(cPhase))))
# this is like my cols.m function in MATLAB
r = np.tile(r, M[1]).reshape(M[1], N[0]).T ** (-beta / 2)
# catch beta = 0 case here to ensure zero DC component
if not beta:
r[0] = 0
# inverse transform to get time series as columns, ensuring real output
X = r*phasor
r = np.real(np.fft.ifft(X, axis=0)*X.shape[0])
# renormalize r such that mean = 0 & std = 1 (MATLAB dof default used)
# and return it in its original shape (i.e., a 1D vector, if req'd)
return r.reshape(N) / np.std(r, ddof=1)
def psf(x, p=2.0, w=3, n=3.0, window=None):
r"""
Pure-state filter a data matrix. This function uses a generalized coherence
estimator to enhance coherent channels in an ensemble of time series.
Args:
x: ``(m, d)`` array of real-valued time series data as columns
p (float): Level of contrast in filter
w (int): Width of smoothing window in frequency domain
n (float): Number of times to smooth in frequency domain
window: Type of smoothing window in frequency domain. Default is
`None`, which results in a triangular window
Returns:
tuple: Tuple containing:
- **x_psf** – ``(m, d)`` array; real-valued, pure state-filtered
version of `x`
- **P** – ``(m//2+1, )`` array; degree of polarization (generalized
coherence estimate) in frequency components of `x` from DC to the
Nyquist
Notes:
See any of Samson & Olson's early 1980s papers, or Szuberla's 1997 PhD
thesis, for a full description of the underlying theory. The code
implementation's defaults reflect historical values for the smoothing
window — a more realistic `w` would be of order :math:`\sqrt{m}`
combined with a smoother window, such as :func:`numpy.hanning`. Letting
`n=3` is a reasonable choice for all window types to ensure confidence
in the spectral estimates used to construct the filter.
For :math:`m` samples of :math:`d` channels of data, a ``(d, d)``
spectral matrix :math:`\mathbf{S}[f]` can be formed at each of the
``m//2+1`` real frequency components from DC to the Nyquist. The
generalized coherence among all of the :math:`d` channels at each
frequency is estimated by
.. math::
P[f] = \frac{d \left(\text{Tr}\,\mathbf{S}^2[f]\right) -
\left(\text{Tr}\,\mathbf{S}[f]\right)^2}
{\left(d-1\right)\left(\text{Tr}\,\mathbf{S}[f]\right)^2},
where :math:`\text{Tr}\,\mathbf{S}[f]` is the trace of the spectral
matrix at frequency :math:`f`. The filter is constructed by applying
the following multiplication in the frequency domain
.. math::
\hat{\mathbf{X}}[f] = P[f]^p\mathbf{X}[f],
where :math:`\mathbf{X}[f]` is the Fourier transform component of
all channels at frequency :math:`f` and :math:`p` is the level of
contrast. The inverse Fourier transform of the matrix
:math:`\hat{\mathbf{X}}` gives the filtered time series.
The estimator :math:`P[f] = 1`, identically, without smoothing
in the spectral domain (a consequence of the variance in the raw
Fourier components), but it is bounded by
:math:`P[f]\in[0,1]` even with smoothing, hence its
utility as a multiplicative filter in the frequency domain. Similarly,
this bound allows the contrast between channels to be enhanced based on
their generalized coherence if :math:`p>1`.
Data channels should be pre-processed to have unit-variance, since
unlike the traditional two-channel magnitude squared coherence
estimators, the generalized coherence estimate can be biased by
relative amplitude variations among the channels. To mitigate the
effects of smoothing complex values into the DC and Nyquist components,
they are set to zero before computing the inverse transform of
:math:`\hat{\mathbf{X}}`.
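Example:
A minimal sketch, filtering a 2 Hz tone buried in white noise across
four channels (uses :func:`randc` from this module; all values are
illustrative)::

    import numpy as np
    t = np.arange(2000) / 100.0
    sig = np.sin(2 * np.pi * 2.0 * t)[:, None]
    x = sig + 0.5 * randc((2000, 4))
    x = x / np.std(x, axis=0, ddof=1)  # unit-variance channels
    x_psf, P = psf(x, p=2.0, w=3, n=3)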
"""
# private functions up front
def Ssmooth(S, w, n, window):
# smooth special format of spectral matrices as vectors
for k in range(int(n)):  # cast since n may be passed as a float (see default)
# f@#$ing MATLAB treats odd/even differently with mode='full'
# but the behavior below now matches conv2 exactly
S = convolve2d(S, window(w).reshape(-1, 1),
mode='full')[w//2:-w//2+1, :]
return S
def triang(N):
# for historical reasons, the default window shape
return np.bartlett(N + 2)[1:-1]
# size up input data
N, d = x.shape
# Fourier transform of data matrix by time series columns, retain only
# the diagonal & above (unique spectral components)
Nx = x.shape
X = np.fft.fft(x, axis=0)/Nx[0]
X = X[:N//2+1, :]
# form spectral matrix stack in reduced vector form (**significant**
# speed improvement due to memory problems with 3D tensor format -- what
# was too slow in 1995 is still too slow in 2017!)
# -- pre-allocate stack & fill it in
Sidx = [(i, j) for i in range(d) for j in range(i, d)]
S = np.empty((X.shape[0], d * (d + 1) // 2), dtype=complex)
for i in range(X.shape[0]):
# at each freq w, S_w is outer product of raw Fourier transforms
# of each column at that freq; select unique components to store
S_w = np.outer(X[i, :], X[i, :].conj())
S[i, :] = S_w[[i[0] for i in Sidx], [j[1] for j in Sidx]]
# smooth each column of S (i.e., in the freq. domain)
if not window:
# use default window
window = triang
S = Ssmooth(S, w, n, window)
# trace calculations (notation consistent w traceCalc.m in MATLAB), but
# since results are positive, semi-definite, real -- return as such
# -- diagonal elements
didx = [i for i in range(len(Sidx)) if Sidx[i][0] == Sidx[i][1]]
# -- traceS**2 of each flapjack (really a vector here)
trS = sum(S[:, didx].real.T)**2
# -- trace of each flapjack (ditto, vector), here we recognize that
# trace(S@S.T) is just sum square magnitudes of all the
# non-redundant components of S, doubling squares of the non-diagonal
# elements
S = (S*(S.conj())*2).real
S[:, didx] /= 2
trS2 = sum(S.T)
# Samson-esque polarization estimate (if d == 2, same as Fowler)
P = (d*trS2 - trS)/((d-1)*trS)
# a little trick here to handle odd/even number of samples and zero out
# both the DC & Nyquist (they're both complex-contaminated due to Ssmooth)
P[0] = 0
if N%2:
# odd case: Nyquist is 1/2 freq step between highest components
fudgeIdx = 0
else:
# even case: we have a Nyquist component
fudgeIdx = 1
P[-1] = 0
# apply P as contrast agent to frequency series
X *= np.tile(P ** p, d).reshape(X.shape[::-1]).T
# inverse transform X and ensure real output
XX = np.vstack((X[list(range(N // 2 + 1))],
X[list(range(N//2-fudgeIdx, 0, -1))].conj()))
x_psf = np.real(np.fft.ifft(XX, axis=0)*XX.shape[0])
return x_psf, P