Source code for huracanpy.calc._translation

"""
Utils related to translation distance and time
"""

from datetime import timedelta
import shapely
import warnings

from cartopy.crs import Geodetic, Orthographic
from haversine import haversine_vector
from metpy.units import units
from metpy.xarray import preprocess_and_wrap
import nvector
import numpy as np
import pandas as pd
from pint.errors import UnitStrippedWarning
import pyproj


from ._rates import delta, _dummy_track_id, _align_array
from .._metpy import dequantify_results


def _get_distance_azimuth_geod(lon1, lat1, lon2, lat2, ellps="WGS84"):
    # initialize Geod object
    geodesic = pyproj.Geod(ellps=ellps)

    # Compute distance for all data
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            category=UnitStrippedWarning,
            message="The unit of the quantity is stripped when downcasting to ndarray.",
        )
        fwd_azimuth, back_azimuth, dist = geodesic.inv(lon1, lat1, lon2, lat2)

    return dist * units("m"), fwd_azimuth * units("degrees")


def _get_distance_haversine(lon1, lat1, lon2, lat2):
    # Convert longitudes beyond 180° (necessary for haversine to work)
    lon1 = ((lon1 + 180) % 360) - 180
    lon2 = ((lon2 + 180) % 360) - 180

    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            category=UnitStrippedWarning,
            message="The unit of the quantity is stripped when downcasting to ndarray.",
        )
        yx1 = np.asarray([lat1, lon1]).T
        yx2 = np.asarray([lat2, lon2]).T

    return haversine_vector(yx1, yx2, unit="m") * units("m")


[docs] @dequantify_results @preprocess_and_wrap(wrap_like="lon") def azimuth(lon, lat, track_id=None, ellps="WGS84", centering="forward"): """Compute azimuth between points using geodesic calculation. Parameters ---------- lon, lat : xarray.DataArray Longitude and latitude points track_id : array_like, optional Track ID at each point ellps : str, optional The definition of the globe to use for the geodesic calculation (see `pyproj.Geod`). Default is "WGS84". centering: str, optional - "forward" gives the angle based on the track point and the following track point. The last point of each track will be NaN - "backward" gives the angle based on the track point and the previous track point. The first point of each track will be NaN - "centre" gives the angle based on the centred difference of track points. The first and last points of each track will be NaN - "adaptive" gives the same as centred, but fills in the first point of each track with the forward difference, and the last point of each track with the backward difference Returns ------- xarray.DataArray Azimuth in degrees. 0° corresponds to northward (or stagnating); 90° corresponds to eastward; 180° corresponds to southward; -90° corresponds to westwards. """ if track_id is None: track_id = _dummy_track_id(lon) # Compute azimuth _, fwd_azimuth = _get_distance_azimuth_geod( lon[:-1], lat[:-1], lon[1:], lat[1:], ellps=ellps ) if centering in ["forward", "backward"]: return _align_array(fwd_azimuth, track_id, centering) else: # Compute angle in steps of two _, centred_azimuth = _get_distance_azimuth_geod( lon[:-2], lat[:-2], lon[2:], lat[2:], ellps=ellps ) centred_azimuth[track_id[2:] != track_id[:-2]] = np.nan * centred_azimuth[0] return _align_array(fwd_azimuth, track_id, centering, centred_azimuth)
[docs] @dequantify_results @preprocess_and_wrap(wrap_like="lon") def distance( lon, lat, *args, track_id=None, method="geod", ellps="WGS84", centering="forward" ): """Compute distance between longitude/latitude coordinates using geodesic or haversine calculation >>> distance(lon, lat, track_id) Computes the distance between successive lon, lat points, without including differences between the end and start points of different tracks >>> distance(lon1, lat1, lon2, lat2) Computes the distance between each point in (lon1, lat1) and each point in (lon2, lat2) Parameters ---------- lon, lat : xarray.DataArray Longitude and latitude points *args : xarray.DataArray - 0 arguments. Leave empty to calculate distance between successive points - 1 argument, track_id. Same as 0 arguments but inserts NaNs where successive points are from different tracks - 2 arguments, lon and lat arrays. Calculate distances between two tracks, e.g. radius of maximum wind speed, using storm centre locations and maximum wind speed locations track_id : array_like, optional Track ID at each point method : str, optional The method of computing distances, either geodesic (`"geod"`/`"geodesic"`) or haversine (`"haversine"`) ellps : str, optional The definition of the globe to use for the geodesic calculation (see `pyproj.Geod`). Default is "WGS84". centering: str, optional - "forward" gives the distance based on the track point and the following track point. The last point of each track will be NaN - "backward" gives the distance based on the track point and the previous track point. The first point of each track will be NaN - "centre" gives the distance based on the centred difference of track points. The first and last points of each track will be NaN - "adaptive" gives the same as centred, but fills in the first point of each track with the forward difference, and the last point of each track with the backward difference Returns ------- xarray.DataArray """ # Curate input # If track_id is not provided, all points are considered to belong to the same track if len(args) < 2: lon1 = lon[:-1] lat1 = lat[:-1] lon2 = lon[1:] lat2 = lat[1:] if len(args) == 1: if track_id is None: track_id = args[0] else: raise ValueError( "Distance either takes 2 arrays (lon/lat) or 4 arrays 2x(lon/lat)" ) if track_id is None: track_id = _dummy_track_id(lon) elif len(args) == 2: lon1 = lon lat1 = lat lon2, lat2 = args else: raise ValueError( "Distance either takes 2 arrays (lon/lat) or 4 arrays 2x(lon/lat)" ) if method in ["geod", "geodesic"]: dist, _ = _get_distance_azimuth_geod(lon1, lat1, lon2, lat2, ellps=ellps) elif method == "haversine": dist = _get_distance_haversine(lon1, lat1, lon2, lat2) else: raise ValueError( f"Method {method} for distance calculation not recognised, use one of" f"('geod', 'haversine')" ) if len(args) < 2 and track_id is not None: dist = _align_array(dist, track_id, centering) return dist
[docs] @dequantify_results @preprocess_and_wrap(wrap_like="lon") def translation_speed( lon, lat, time, track_id=None, method="geod", ellps="WGS84", centering="forward" ): """ Compute translation speed along tracks Parameters ---------- lon, lat : xarray.DataArray Longitude and latitude points time : xarray.DataArray Time for each point track_id : array_like, optional Track ID at each points method : str, optional The method of computing distances, either geodesic (`"geod"`) or haversine (`"haversine"`) ellps : str, optional The definition of the globe to use for the geodesic calculation (see `pyproj.Geod`). Default is "WGS84". centering: str, optional - "forward" gives the speed based on the track point and the following track point. The last point of each track will be NaN - "backward" gives the speed based on the track point and the previous track point. The first point of each track will be NaN - "centre" gives the speed based on the centred difference of track points. The first and last points of each track will be NaN - "adaptive" gives the same as centred, but fills in the first point of each track with the forward difference, and the last point of each track with the backward difference Returns ------- xarray.DataArray """ # Curate input # If track_id is not provided, all points are considered to belong to the same track if track_id is None: track_id = _dummy_track_id(lon) # Distance between each points dx = distance( lon, lat, track_id=track_id, method=method, ellps=ellps, centering=centering ) # time between each points dt = delta(time, track_id, centering=centering) return dx / dt
[docs] @dequantify_results @preprocess_and_wrap(wrap_like="lon") def corral_radius(lon, lat, time=None, track_id=None, *, window=None, min_points=None): """Find the minimum radius encircling a set of points By default, calling corral radius with a set of lons/lats will return the corral radius for all these points. The returned array with have the same length as lons/lats, but the same value for all points >>> corral_radius(lons, lats) If you also pass a track_id, the corral radius is calculated separately for each unique track. The returned array still has the same length as lons/lats so the value for each track is repeated >>> corral_radius(lons, lats, track_id=track_id) Passing a time and window will calculate the corral radius in a rolling window. The window is by default in hours (you can explicitly pass a datetime.timedelta to use a more specific window). The code below will calculate the corral radius for each lon/lat include the lons/lats withing +/- 36 hours. Points where the window is outside the times are given NaNs >>> corral_radius(lons, lats, time=time, window=36) Including both a track_id and a time/window will ensure the corral radius is calculated separately for each track, leaving NaNs at the start and end of each track where the time window is outside the track times >>> corral_radius(lons, lats, time=time, track_id=track_id, window=36) Parameters ---------- lon, lat : array_like Longitude and latitude points time : array_like, optional Time at each point track_id : array_like, optional Track ID at each point window : scalar or datetime.timedelta, optional Half-width of the window. i.e. include all times within +/- window min_points : int, optional Minimum number of points required in the window. Returns ------- np.ndarray Corral radii in metres (NaN if not computed) """ if track_id is None: track_id = _dummy_track_id(lon) # Convert time to pandas # Comparison below fails when using xarray. It seems to interpret a numpy timedelta # of zero as an integer and throws a TypeError if time is not None: with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UnitStrippedWarning) time = pd.to_datetime(time) if np.isscalar(window): window = timedelta(hours=window) corral_radii = np.full(len(lon), np.nan) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UnitStrippedWarning) track_id, indices, counts = np.unique( np.asarray(track_id), return_index=True, return_counts=True ) with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=UnitStrippedWarning) for idx, count in zip(indices, counts): if time is not None and window is not None: times = time[idx : idx + count] for n, centre_time in enumerate(times): if (centre_time - times[0] >= window) and ( times[-1] - centre_time >= window ): subset = np.where(np.abs(times - centre_time) <= window)[0] if min_points is None or len(subset) >= min_points: radius = _make_circle( np.asarray(lon[idx + subset]), np.asarray(lat[idx + subset]), ) corral_radii[idx + n] = radius # else: leave as np.nan else: corral_radii[idx : idx + count] = _make_circle( np.asarray(lon[idx : idx + count]), np.asarray(lat[idx : idx + count]), ) return corral_radii * units("m")
def _make_circle(lons, lats): xyz = _latlon_to_xy(lons, lats) line = shapely.LineString(xyz[:, :2]) return shapely.minimum_bounding_radius(line) def _latlon_to_xy(lon, lat): # Convert lat/lon to x/y using Orthographic projection for small areas # For more accuracy, use spherical geometry, but for <2000km, this is fine # Center for projection lon0, lat0 = _centre_point(lon, lat) projection = Orthographic(central_longitude=lon0, central_latitude=lat0) return projection.transform_points(Geodetic(), lon, lat) def _centre_point(lons, lats): points = nvector.GeoPoint.from_degrees(longitude=lons, latitude=lats) centre = points.to_nvector().mean().to_geo_point() return centre.longitude_deg, centre.latitude_deg