Source code for huracanpy.calc._rates

"""
Module containing functions to compute rates.
"""

import warnings

import numpy as np
from metpy.units import units
from metpy.xarray import preprocess_and_wrap

from .._metpy import dequantify_results


def _dummy_track_id(var):
    """Build a placeholder track ID array for when none is provided

    Emits a warning, then returns an array of zeros with the same shape as var, so
    that every point carries the same track ID.

    Parameters
    ----------
    var : array_like
        Variable whose shape the returned track IDs should match

    Returns
    -------
    numpy.ndarray
        Zeros with shape ``np.shape(var)``
    """
    warnings.warn(
        "track_id is not provided, all points are considered to come from the same"
        " track"
    )
    return np.zeros(np.shape(var))


@dequantify_results
@preprocess_and_wrap(wrap_like="var")
def delta(var, track_ids=None, centering="forward"):
    """Difference between consecutive points of var, kept within each track

    Differences that would span the end of one track and the start of the next are
    not included in the result.

    Parameters
    ----------
    var : xarray.DataArray
        The variable to calculate the delta from
    track_ids : array_like, optional
        Track ID for each point. If omitted, a warning is issued and all points are
        treated as a single track
    centering : str, optional
        - "forward" gives the delta based on the track point and the following
          track point. The last point of each track will be NaN
        - "backward" gives the delta based on the track point and the previous
          track point. The first point of each track will be NaN
        - "centre" gives the delta based on the centred difference of track
          points. The first and last points of each track will be NaN
        - "adaptive" gives the same as centred, but fills in the first point of
          each track with the forward difference, and the last point of each track
          with the backward difference

    Returns
    -------
    xarray.DataArray

    """
    # Without explicit IDs, fall back to one dummy ID shared by every point
    if track_ids is None:
        track_ids = _dummy_track_id(var)

    # Point-to-point differences, masked at track boundaries and padded so that
    # there is one value per input point
    aligned = _align_array(var[1:] - var[:-1], track_ids, centering)

    # Anything other than a timedelta needs no further treatment
    if not np.issubdtype(aligned.magnitude.dtype, np.timedelta64):
        return aligned

    # Timedeltas are converted to a plain number of seconds carrying the unit "s"
    in_seconds = aligned / np.timedelta64(1, "s")
    return in_seconds.magnitude * units("s")
@dequantify_results
@preprocess_and_wrap(wrap_like="var")
def rate(var, time, track_ids=None, centering="forward"):
    """Rate of change of var with respect to time, kept within each track

    Computed as the delta of var divided by the delta of time, so differences
    between the end of one track and the start of the next are not included.

    Parameters
    ----------
    var : xarray.DataArray
        The variable used to calculate the rate
    time : xarray.DataArray
        Time at each point
    track_ids : array_like, optional
        Track ID at each point. If omitted, a warning is issued and all points are
        treated as a single track
    centering : str, optional
        - "forward" gives the rate based on the track point and the following
          track point. The last point of each track will be NaN
        - "backward" gives the rate based on the track point and the previous
          track point. The first point of each track will be NaN
        - "centre" gives the rate based on the centred difference of track
          points. The first and last points of each track will be NaN
        - "adaptive" gives the same as centred, but fills in the first point of
          each track with the forward difference, and the last point of each track
          with the backward difference

    Returns
    -------
    xarray.DataArray

    """
    # Resolve the IDs once here so that both deltas below share the same array
    if track_ids is None:
        track_ids = _dummy_track_id(var)

    # NOTE: the inputs are not sorted here; differences are taken between
    # consecutive points in the order given
    var_change, time_change = (
        delta(series, track_ids, centering=centering) for series in (var, time)
    )

    return var_change / time_change
def _align_array(array, track_id, centering, centred=None):
    """Mask differences that span two tracks and pad to one value per point

    Parameters
    ----------
    array : array_like
        Differences between consecutive points, one element shorter than track_id.
        Element n is the difference between points n and n+1. Modified in place:
        elements that span a change of track ID are set to NaN
    track_id : array_like
        Track ID for each point
    centering : str
        One of "forward", "backward", "centre"/"center" or "adaptive"
    centred : array_like, optional
        Precomputed centred values for the interior points (two elements shorter
        than track_id). By default, the mean of adjacent elements of array

    Returns
    -------
    array_like
        One value per point, NaN where the requested difference is not available
        within the track

    Raises
    ------
    ValueError
        If centering is not one of the recognised options
    """
    # Reject unknown options before the input array is modified
    if centering not in ("forward", "backward", "centre", "center", "adaptive"):
        raise ValueError(
            f"Option centering='{centering}' not recognised. Use one of"
            f" ['forward', 'backward', 'centre'/'center', 'adaptive']"
        )

    # Index n, where the track ID changes between n and n+1
    # array is already a difference, so index n in array is the difference that
    # spans the two tracks
    transition_points = np.where(track_id[1:] != track_id[:-1])[0]

    # Mask points where track_id changes
    # Multiplying np.nan by an array element gives us the correct type of nan for both
    # np.timedelta and pint.Quantity
    array[transition_points] = np.nan * array[0]
    nan_pad = [np.nan * array[0]]

    if centering == "forward":
        return np.concatenate([array, nan_pad])
    if centering == "backward":
        return np.concatenate([nan_pad, array])

    if centred is None:
        centred = 0.5 * (array[1:] + array[:-1])
    centred = np.concatenate([nan_pad, centred, nan_pad])

    if centering == "adaptive":
        # Replace the start/end point of each track with the forward/backward delta.
        # Indexing the padded forward/backward arrays (rather than array itself)
        # keeps every index in range, so a single-point track at either end of the
        # data gets NaN instead of wrapping around to array[-1] or running past the
        # end of array
        forward = np.concatenate([array, nan_pad])
        backward = np.concatenate([nan_pad, array])
        first_points = np.concatenate([[0], transition_points + 1])
        last_points = np.concatenate([transition_points, [len(array)]])
        centred[first_points] = forward[first_points]
        centred[last_points] = backward[last_points]

    return centred