Source code for fastkde.plot

import numpy as np
import pylab as PP
import fastkde.fastKDE as fastKDE


[docs] def cumulative_integral(pdf, axes, integration_axes=None, reverse_axes=None): """ Calculates the cumulative integral of the pdf, given the axis values input: ------ pdf : a PDF (presumably from the .pdf member of a fastKDE object) axes : a list of PDF axes (presumably from the .axes member of a fastKDE object) integration_axes : the axes along which to integrate (default is all). These have the same ordering as axes in the axes input variable. reverse_axes : axes along which to reverse the direction of the cumulative calculation, if any output: ------- returns an array with the same shape as PDF, but with the integral calculated cumulatively """ # Set the default value for integration_axes if integration_axes is None: integration_axes = range(len(np.shape(pdf))) # Set the rank of the pdf rank = len(np.shape(pdf)) # Set the default value for reverse_axes if reverse_axes is None: reverse_axes = [] # Make sure that axes is an iterable of iterables try: axes[0][0] except IndexError: axes = [axes] # Make sure that reverse_axes is an iterable try: len(reverse_axes) except TypeError: reverse_axes = [reverse_axes] # Make sure that integration_axes is an iterable try: len(integration_axes) except TypeError: integration_axes = [integration_axes] # Copy the pdf array cpdf = np.array(pdf) # Loop over the axes for which we are calculating the cumulative for i in integration_axes: # Calculate the grid spacing; assume it is uniform dx = np.diff(axes[i]) dx = np.concatenate([dx, [dx[-1]]]) # Calculate the index of pdf corresponding to axis i iprime = rank - i - 1 # broadcast dx to the ~same shape as cpdf broadcast_tuple = len(cpdf.shape) * [np.newaxis] broadcast_tuple[iprime] = slice(None, None, None) broadcast_tuple = tuple(broadcast_tuple) dx = dx[broadcast_tuple] # Determine if we are reversing cumulative orientation if i in reverse_axes: # Set a slice that reversese only axis i in the PDF reverseSlice = rank * [slice(None, None, None)] reverseSlice[iprime] = slice(None, None, -1) reverseSlice = tuple(reverseSlice) # Do the cumulative calculation on the reversed PDF (and reverse # back to the original orientation) cpdf = np.cumsum(cpdf[reverseSlice] * dx, axis=iprime)[reverseSlice] else: # Do the cumulative calculation cpdf = np.cumsum(cpdf * dx, axis=iprime) # Return the cumulative PDF return cpdf
[docs] def calculate_probability_contour(pdf, axes, pvals, axis=0): """Calculates the probability level below which the PDF integrates to pval. input: ------ pdf : a conditional PDF from fastKDE axes : a list of axis values returned from fastKDE pvals : the integrated amount of PDF that should be outside a contour of constant PDF (scalar or array-like) output: ------- an array of PDF values for which the integral of the PDF outside the PDF value should integrate to pval. Each value in the array corresponds to a value within axes[axis] (nominally, axis should be the variable on which the PDF is conditioned). """ # calculate the dx values diff_axes = [] for i in range(len(axes)): if i != axis: dx = np.diff(axes[i]) dx = np.concatenate((dx, [dx[-1]])) diff_axes.append(dx) # create a meshgrid of dx values dxmesh = np.meshgrid(*diff_axes) # create the dx product dxgrid = np.prod(dxmesh, axis=0) # broadcast the dxgrid to the shape of the PDF broadcast_shape = len(axes) * [slice(None, None, None)] broadcast_shape[axis] = np.newaxis dxgrid = dxgrid[tuple(broadcast_shape[::-1])] # multiply the pdf by dx dxgrid * pdf # reshape the PDF to unravel the summation dimensions new_shape = [-1, len(axes[axis])] pdf_unraveled = np.reshape(pdf, new_shape) # get the indices to sort by probability isort_inds = np.ma.argsort(pdf_unraveled, axis=0) # generate an index array suitable for use in a numpy array isort = ( [np.array(i) for i in zip(*isort_inds.T)], np.arange(len(axes[axis]), dtype=int)[np.newaxis, :], ) # sort by probability pdf_dx_unraveled = np.reshape(pdf * dxgrid, new_shape)[isort] # take the cumulative sum pdf_csum = np.ma.cumsum(pdf_dx_unraveled, axis=0) # find the location along each axis nearest to the given PDF value ind_closest = np.ma.argmin(abs(pdf_csum - pvals), axis=0) ind_closest = (ind_closest, np.arange(len(axes[axis]), dtype=int)) # get the probability value at that location pdf_vals = np.reshape(pdf, new_shape)[isort][ind_closest] # return the value return pdf_vals
[docs] def pair_plot( var_list, conditional=False, fig_size=None, var_names=None, draw_scatter=None, axis_limits=None, log_scale=False, auto_show=True, cdf_levels=[0.1, 0.25, 0.5, 0.75, 0.9], axis_lengths=None, ): """Generate a multivariate pair plot of the input data. input: ------ var_list : a list of input variables (all must be vectors of the same length) with at least two variables conditional : flags whether to plot the bivariate PDFs as conditionals. var_names : a list of strings corresponding to variable names fig_size : The size of the figure (if none, it is set to 10,10) draw_scatter : flag whether to draw scatter plots on top of the PDFs. If None, this turns on/off automatically, depending on the number of points (1000 is the upper cutoff) axis_limits : limits for all variables (applies to all axes) auto_show : flags whether to automatically call PP.show() (no values are returned if this flag is on) cdf_levels : a set of CDF levels (betwen 0 and 1 exclusive) for which to plot PDF contours (i.e., a value of 0.5 means that 50% of the PDF falls outside the corresponding contour of constant PDF that will be plotted). If None is given, matplotlib will choose PDF levels. log_scale : flags whether to use the log_axes argument for fastKDE. If a single bool value, it applies to all variables; if a list, each item in the list corresponds to a variable axis_lengths : The length of axis variables. If axis_lengths is None, then fastKDE automatically sets axis lengths output: ------- **NOTE**: no values are returned if auto_show is True fig, axs, marginal_vals, marginal_pdfs, bivariate_pdfs fig, axs : a matplotlib figure and axis matrix object marginal_vals : the values at which the PDFs (both marginal and bivariate) are defined marginal_pdfs : the marginal PDFs of the input variables bivariate_pdfs : the bivariate PDFs """ # convert the list to an array try: input_var_array = np.array(var_list) assert len(input_var_array) != 0, "var_list must be a non-empty list" except TypeError or AssertionError: raise ValueError( "var_list could not be converted to a numpy array; are all vectors " "of the same length?" ) # get the array rank rank = len(input_var_array.shape) if rank != 2: raise ValueError("input variables in var_list must be vectors") # get the array shape num_vars = input_var_array.shape[0] if num_vars < 2: raise ValueError("pair_plot requires at least two variables in var_list") if log_scale is False or log_scale is True: log_scale = num_vars * [log_scale] marginal_pdfs = num_vars * [None] marginal_vals = num_vars * [None] var_limits = num_vars * [None] if draw_scatter is None: if input_var_array.shape[1] > 1000: draw_scatter = False else: draw_scatter = True # calculate the marginals and define limits for n in range(num_vars): # calculate PDFs marginal_pdfs[n], marginal_vals[n] = fastKDE.pdf( input_var_array[n, :], log_axes=log_scale[n], use_xarray=False, ) # define axis limits if axis_limits is None: var_limits[n] = [input_var_array[n, :].min(), input_var_array[n, :].max()] else: var_limits[n] = axis_limits bivariate_pdfs = {} # calculate the bivariate PDFs for n1 in range(num_vars): for n2 in range(n1, num_vars): if n1 != n2: if not conditional: bivariate_pdfs[n1, n2], _ = fastKDE.pdf( input_var_array[n1, :], input_var_array[n2, :], log_axes=[log_scale[n1], log_scale[n2]], use_xarray=False, ) else: bivariate_pdfs[n1, n2], _ = fastKDE.conditional( input_var_array[n2, :], input_var_array[n1, :], log_axes=[log_scale[n1], log_scale[n2]], use_xarray=False, ) bivariate_pdfs[n2, n1], _ = fastKDE.conditional( input_var_array[n1, :], input_var_array[n2, :], log_axes=[log_scale[n2], log_scale[n1]], use_xarray=False, ) # set variable labels if needed if var_names is None: var_names = ["var {}".format(n) for n in range(num_vars)] # set the figure size if needed if fig_size is None: fig_size = (10, 10) # create the figure fig, axs = PP.subplots(num_vars, num_vars, figsize=fig_size) # plot the marginals for n in range(num_vars): axs[n, n].plot(marginal_vals[n], marginal_pdfs[n]) axs[n, n].set_xlim(var_limits[n]) axs[n, n].tick_params(labelleft=False) if n < num_vars - 1: axs[n, n].tick_params(labelbottom=False) else: axs[n, n].set_xlabel(var_names[n]) if log_scale[n]: axs[n, n].set_xscale("log") # plot the bivariate PDFs for n1 in range(num_vars): for n2 in range(n1, num_vars): if n1 != n2: if not conditional: # get the PDF levels if cdf_levels is None: levels = None else: levels = np.sort( np.array( [ calculate_probability_contour( bivariate_pdfs[n1, n2][..., np.newaxis], [[0], marginal_vals[n1], marginal_vals[n2]], c, ) for c in cdf_levels ] ) ).squeeze() # plot the PDF pdf_to_plot = bivariate_pdfs[n1, n2] if any([log_scale[n1], log_scale[n2]]): pdf_to_plot = np.ma.log( np.ma.masked_less_equal(bivariate_pdfs[n1, n2], 0) ) levels = np.log(levels) else: levels = cdf_levels pdf_to_plot = cumulative_integral( bivariate_pdfs[n1, n2], [marginal_vals[n1], marginal_vals[n2]], integration_axes=1, ) # mask CDF parts that don't normalize to something close to 1 pdf_to_plot = np.ma.masked_where( np.logical_not( np.isclose( np.ma.ones(pdf_to_plot.shape) * pdf_to_plot[-1, :][np.newaxis, :], 1.0, ) ), pdf_to_plot, ) axs[n2, n1].contour( marginal_vals[n1], marginal_vals[n2], pdf_to_plot, levels=levels ) if log_scale[n1]: axs[n2, n1].set_xscale("log") if log_scale[n2]: axs[n2, n1].set_yscale("log") # plot the scatter if draw_scatter: axs[n2, n1].plot( input_var_array[n1, :], input_var_array[n2, :], "k.", alpha=0.3 ) # set axis limits axs[n2, n1].set_xlim(var_limits[n1]) axs[n2, n1].set_ylim(var_limits[n2]) if not conditional: # turn of axes for the other part of the triangle axs[n1, n2].axis("off") else: levels = cdf_levels pdf_to_plot = cumulative_integral( bivariate_pdfs[n2, n1], [marginal_vals[n2], marginal_vals[n1]], integration_axes=1, ) # mask CDF parts that don't normalize to something close to # 1 pdf_to_plot = np.ma.masked_where( np.logical_not( np.isclose( np.ma.ones(pdf_to_plot.shape) * pdf_to_plot[-1, :][np.newaxis, :], 1.0, ) ), pdf_to_plot, ) axs[n1, n2].contour( marginal_vals[n2], marginal_vals[n1], pdf_to_plot, levels=levels ) if log_scale[n2]: axs[n1, n2].set_xscale("log") if log_scale[n1]: axs[n1, n2].set_yscale("log") # plot the scatter if draw_scatter: axs[n1, n2].plot( input_var_array[n2, :], input_var_array[n1, :], "k.", alpha=0.3, ) # set axis limits axs[n1, n2].set_xlim(var_limits[n2]) axs[n1, n2].set_ylim(var_limits[n1]) # turn off axis limits and set axis labels for n1 in range(num_vars): for n2 in range(n1, num_vars): if n1 != n2: if n1 == 0: axs[n2, n1].set_ylabel(var_names[n2]) axs[n2, n1].tick_params(labelleft=True) else: axs[n2, n1].tick_params(labelleft=False) if n2 == num_vars - 1: axs[n2, n1].set_xlabel(var_names[n1]) axs[n2, n1].tick_params(labelbottom=True) else: axs[n2, n1].tick_params(labelbottom=False) if conditional: axs[n1, n2].tick_params(labelleft=False) axs[n1, n2].tick_params(labelbottom=False) if n2 == num_vars - 1: axs[n1, n2].tick_params(labelright=True) if auto_show: PP.show() return return fig, axs, marginal_vals, marginal_pdfs, bivariate_pdfs