Source code for fastkde.empiricalCharacteristicFunction

#!/usr/bin/env python
import numpy as npy
import fastkde.nufft as nufft


[docs] class ECF: """ Calculates the empirical characteristic function of arbitrary sets of variables. Uses either the direct Fourier transform or nuFFT method (described by O'Brien et al. (2014, J. Roy. Stat. Soc. C) to calculate the Fourier transform of the data to yield the ECF. input: ------ input_data : The input data. Array like with shape = (nvariables,npoints). tgrids : The frequency-space grids to which to transform the data A list of frequency arrays for each variable dimension. use_fft_approximation : Flag whether to use the nuFFT approximation to the DFT be_verbose : Flags whether to be verbose output: ------- An ECF object. The ECF itself is stored in self.ECF """ def __init__( self, input_data, tgrids, precision=2, use_fft_approximation=True, be_verbose=False, ): # Set whether we use the nuFFT approximation self.use_fft_approximation = use_fft_approximation # Get the data shape (nvariables,ndatapoints) dshape = npy.shape(input_data) rank = len(dshape) if rank != 2: raise ValueError( "input_data must be a rank-2 array of shape [nvariables,ndatapoints]; " "got rank = {}".format(rank) ) # Extract the number of variables self.nvariables = dshape[0] # Extract the number of data points self.ndatapoints = dshape[1] # Set the frequency points self.tgrids = list(tgrids) try: gridRank = len(self.tgrids) except TypeError: raise ValueError("Could not determine the number of tgrids") if gridRank != self.nvariables: raise ValueError( "The rank of tgrids should be {}. It is {}".format( gridRank, self.nvariables ) ) # Check for regularity if we are doing nuFFT if self.use_fft_approximation: for n in range(self.nvariables): tpoints = tgrids[n] # Get the spacing of the first two points dt = tpoints[1] - tpoints[0] # Get the spacing of all points deltaT = tpoints[1:] - tpoints[:-1] # Get the difference between these spacings deltaT_diff = deltaT - dt tolerance = dt / 1e6 # Check that all these differences are less than 1/1e6 if not all(abs(deltaT_diff < tolerance)): raise ValueError( "All grids in tgrids must be regularly spaced if " "use_fft_approximation is True" ) # Set verbosity self.be_verbose = be_verbose # Set the precision self.precision = precision # Set the fill value for the frequency grids fill_value = -1e20 # Get the maximum frequency grid length ntmax = npy.amax([len(tgrid) for tgrid in tgrids]) # Create the frequency grids array frequency_grids = fill_value * npy.ones([self.nvariables, ntmax]) # Fill the frequency grids array for v in range(self.nvariables): frequency_grids[v, : len(tgrids[v])] = tgrids[v] # Simply pass in the input data as provided # Calculate the ECF if self.use_fft_approximation: # Calculate the ECF using the fast method myECF = nufft.nuifft( abscissas=input_data, ordinates=npy.ones([input_data.shape[1]], dtype=npy.complex128), frequency_grids=frequency_grids, missing_freq_val=fill_value, precision=precision, be_verbose=int(be_verbose), ) else: # Calculate the ECF using the slow (direct, but exact) method myECF = nufft.idft( abscissas=input_data, ordinates=npy.ones([input_data.shape[1]], dtype=npy.complex128), frequency_grids=frequency_grids, missing_freq_val=fill_value, ) # Ensure that the ECF is normalized mid_point_accessor = tuple([int((len(tgrid) - 1) / 2) for tgrid in tgrids]) if myECF[mid_point_accessor] > 0.0: # Save the ECF in the object self.ECF = myECF / myECF[mid_point_accessor] else: raise RuntimeError( "Midpoint of ECF is 0.0. min(ECF) = {}, max(ECF) = {}".format( npy.amin(myECF), npy.amax(myECF) ) ) return
# ******************************************************************************* # ******************************************************************************* # ******************** Unit testing code **************************************** # ******************************************************************************* # ******************************************************************************* if __name__ == "__main__": # Set the random seed to 0 so the results are repetable npy.random.seed(0) # Flag whether to do the 1-D test doOneDimensionalTest = False if doOneDimensionalTest: import matplotlib.pyplot as plt def mySTDGaus1D(x): return 1.0 / npy.sqrt(2 * npy.pi) * npy.exp(-(x**2) / 2) # Set the real-space/frequency points (Hermitian FFT-friendly) numXPoints = 513 xpoints = npy.linspace(-20, 20, numXPoints) tpoints = nufft.calcTfromX(xpoints) # Calculate the FFT of an actual gaussian; use # this as the empirical characteristic function standard mygaus1d = mySTDGaus1D(xpoints) mygauscf = npy.fft.fftshift(npy.fft.ifftn(npy.fft.ifftshift(mygaus1d))) nh = (len(tpoints) - 1) / 2 mygauscf /= mygauscf[nh] # Set the number of data points ndatapoints = 2**10 # Set the number of variables nvariables = 1 # Randomly sample from a normal distribution xyrand = npy.random.normal(loc=0.0, scale=1.0, size=[nvariables, ndatapoints]) # Calculat the ECF using the fast method ecfFFT = ECF(xyrand, tpoints[npy.newaxis, :], use_fft_approximation=True).ECF # Calculat the ECF using the slow method ecfDFT = ECF(xyrand, tpoints[npy.newaxis, :], use_fft_approximation=False).ECF # Print the 0-frequencies (should be 1 for all) print(ecfFFT[nh], ecfDFT[nh], mygauscf[nh]) plt.subplot(121, xscale="log", yscale="log") # Plot the magnitude of the fast and slow ECFs # (these should overlap for all but the highest half of the frequencies) plt.plot(tpoints, abs(ecfFFT), "r-") plt.plot(tpoints, abs(ecfDFT), "b-") # Plot the gaussian characteristic function standard plt.plot(tpoints, abs(mygauscf), "k-") plt.subplot(122, xscale="log", yscale="log") ihalf = len(ecfDFT) / 2 ithreequarters = ihalf + ihalf / 2 sh = slice(ihalf, ithreequarters) plt.plot(tpoints[sh], abs(ecfDFT[sh] - ecfFFT[sh]), "k-") plt.show() doTwoDimensionalTest = True # Flag whether to do 2D tests if doTwoDimensionalTest: import matplotlib.pyplot as plt def std_norm_2d(x, y): return 1.0 / (2 * npy.pi) * npy.exp(-(x**2 + y**2) / 2) # Set the frequency points (Hermitian FFT-friendly) numXPoints = 127 xpoints = npy.linspace(-20, 20, numXPoints) tpoints = nufft.calcTfromX(xpoints) # Calculate points from a 2D gaussian, and take their 2D FFT # to estimate the characteristic function standard xp2d, yp2d = npy.meshgrid(xpoints, xpoints) mygaus2d = std_norm_2d(xp2d, yp2d) mygauscf = npy.fft.fftshift(npy.fft.ifftn(npy.fft.ifftshift(mygaus2d))) nh = (len(tpoints) - 1) / 2 mid_point_accessor = tuple(2 * [nh]) mygauscf /= mygauscf[mid_point_accessor] # Sample points from a gaussian distribution ndatapoints = 2**5 nvariables = 2 xyrand = npy.random.normal(loc=0.0, scale=1.0, size=[nvariables, ndatapoints]) tpointgrids = npy.concatenate(2 * (tpoints[npy.newaxis, :],), axis=0) # Calculate the ECF using the fast method CecfFFT = ECF(xyrand, tpointgrids, use_fft_approximation=True, be_verbose=True) ecfFFT = CecfFFT.ECF # Calculate the ECF using the slow method CecfDFT = ECF(xyrand, tpointgrids, use_fft_approximation=False, be_verbose=True) ecfDFT = CecfDFT.ECF # Use meshgrid to generate 2D arrays of the frequency points tp2d, wp2d = npy.meshgrid(tpoints, tpoints) # Create a figure fig = plt.figure() # Create a 3D set of axes ax = fig.add_subplot(221, projection="3d") # ax.plot_wireframe(tp2d[::4,::4],wp2d[::4,::4],(abs(mygauscf)**2)[::4,::4],color='k') # plot the fast and slow ECFs using a wireframe (they should overlap to the eye) ax.plot_wireframe( tp2d[::4, ::4], wp2d[::4, ::4], (abs(ecfFFT) ** 2)[::4, ::4], color="r" ) ax.plot_wireframe( tp2d[::4, ::4], wp2d[::4, ::4], (abs(ecfDFT) ** 2)[::4, ::4], color="b" ) # Create a 2D set of axes ax2 = fig.add_subplot(222, xscale="log", yscale="log") # Print the normalization constants (should be 1) print( ecfFFT[mid_point_accessor], ecfDFT[mid_point_accessor], mygauscf[mid_point_accessor], ) # plot the magnitudes of the fast and slow ECFs along # an aribtrary slice (they should overlap except in the high frequency range) ax2.plot(tpoints, abs(ecfFFT[nh + 5, :]), "r-") ax2.plot(tpoints, abs(ecfDFT[nh + 5, :]), "b-") # Plot the magnitude of the gaussian characteristic function ax2.plot(tpoints, abs(mygauscf[nh + 5, :]), "k-") # Plot the average difference between the slow and fast ECFs # (will be relatively high because I use a coarse X grid, so that # the slow calculation will finish in my lifetime) errorK = npy.average(abs(ecfFFT - ecfDFT), 0) ax3 = fig.add_subplot(223, xscale="log", yscale="log") ax3.plot( tpoints[len(tpoints) / 2 : 3 * len(tpoints) / 4], errorK[len(tpoints) / 2 : 3 * len(tpoints) / 4], "k-", ) plt.show()