"""Matching functions"""
from itertools import combinations, groupby
import warnings
import numpy as np
import pandas as pd
from pint.errors import UnitStrippedWarning
from ..info import timestep
from ..calc import distance
[docs]
def match(
tracksets,
names=None,
max_dist=300,
mean_dist=None,
min_overlap=0,
consecutive_overlap=False,
tracks1_is_ref=False,
distance_method="haversine",
):
"""
Match the provided track sets between them.
Parameters
----------
tracksets : list[xarray.Dataset]
list of track datasets to match together. Must be of length two or more.
names : list, optional
list of track datasets names. Must be the same size as tracksets. The default is
['1','2', ...].
max_dist : float, optional
Threshold for maximum distance between two tracks. The default is 300.
mean_dist : float, optional
Threshold for mean distance between two tracks, taken over all timesteps that
include both tracks.
min_overlap : int, optional
Minimum number of overlapping time steps for matching. The default is 0.
consecutive_overlap: bool, optional
If min_overlap > 1, require that min_overlap points also need to be consective
tracks1_is_ref: bool, optional
If True, treat the first set of tracks as the reference set. If one track
matches multiple reference tracks, only keep the longest match
distance_method: str, optional
The method to use to calculate distance between track points.
One of "haversine", "geod"
Returns
-------
pandas.DataFrame
Dataframe containing the matching tracks with
the id from corresponding datasets
the number of matching time steps (if only two datasets provided)
the distance between two tracks (if only two datasets provided)
"""
# Check input
if len(tracksets) < 2:
raise ValueError("You must provide at least two datasets to match")
if names is None:
names = [str(n) for n in range(1, len(tracksets) + 1)]
if len(names) != len(tracksets):
raise ValueError(
"Number of names provided do not correspond to the number of track sets"
)
# Two track sets
if len(tracksets) == 2:
return _match_pair(
*tracksets,
*names,
max_dist=max_dist,
mean_dist=mean_dist,
min_overlap=min_overlap,
consecutive_overlap=consecutive_overlap,
tracks1_is_ref=tracks1_is_ref,
distance_method=distance_method,
)
# More than two track sets
else:
return _match_multiple(
tracksets,
names,
max_dist=max_dist,
mean_dist=mean_dist,
min_overlap=min_overlap,
consecutive_overlap=consecutive_overlap,
tracks1_is_ref=tracks1_is_ref,
distance_method=distance_method,
)
def _match_pair(
tracks1,
tracks2,
name1="1",
name2="2",
max_dist=300,
mean_dist=None,
min_overlap=0,
consecutive_overlap=False,
tracks1_is_ref=False,
distance_method="haversine",
):
# Prepare data
tracks1, tracks2 = (
tracks1[["track_id", "lon", "lat", "time"]].to_dataframe(),
tracks2[["track_id", "lon", "lat", "time"]].to_dataframe(),
)
tracks1["lon"] = np.where(tracks1.lon > 180, tracks1.lon - 360, tracks1.lon)
tracks2["lon"] = np.where(tracks2.lon > 180, tracks2.lon - 360, tracks2.lon)
# Find corresponding points (same time step, less than max_dist km)
merged = pd.merge(tracks1, tracks2, on="time").drop_duplicates()
if len(merged) == 0:
# if there exist no matching points, return empty dataframe
return pd.DataFrame(columns=["id_" + name1, "id_" + name2, "temp", "dist"])
# if there exist matching points, continue
# Calculate the distance between all points paired in time and subset points
# that are within the threshold distance specified
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UnitStrippedWarning)
merged["dist"] = (
distance(
merged.lon_x,
merged.lat_x,
merged.lon_y,
merged.lat_y,
method=distance_method,
)
* 1e-3
)
# Only consider points within the maximum distance
if max_dist is not None:
merged = merged[merged.dist <= max_dist]
# Precompute groupby
track_groups = merged.groupby(["track_id_x", "track_id_y"])
# Compute temporal overlap
temp = track_groups[["dist"]].count().rename(columns={"dist": "temp"})
# Only considering tracks within the mean distance
if mean_dist is not None:
temp = temp[track_groups[["dist"]].mean().dist < mean_dist]
if min_overlap >= 2:
if consecutive_overlap:
# Replace temporal overlap with longest consecutive set of merged points
# for each track
dt = timestep(tracks1.time, tracks1.track_id)
for index, _ in temp.iterrows():
track = track_groups.get_group(index)
if len(track) >= min_overlap:
nconsecutive = (
max(
[
sum([1 for _ in grouper]) if value else 0
for value, grouper in groupby(np.diff(track.time) == dt)
]
)
+ 1
)
temp.loc[index].temp = nconsecutive
# Subset by tracks sharing min_overlap points
temp = temp[temp.temp >= min_overlap]
# if there exist no matching points, return empty dataframe
if len(temp) == 0:
return pd.DataFrame(columns=["id_" + name1, "id_" + name2, "temp", "dist"])
# Build a table of all pairs of tracks still present in temp
matches = (
merged[["track_id_x", "track_id_y"]]
.drop_duplicates()
.join(temp, on=["track_id_x", "track_id_y"])
).dropna()
# Add average distance to the output
dist = track_groups[["dist"]].mean()
matches = matches.merge(dist, on=["track_id_x", "track_id_y"])
# Treat duplicates if required
if tracks1_is_ref:
# Treat the duplicates where one tracks2 track has several corresponding
# tracks1:
# Keep the couple with the longest overlap
matches = (
matches.sort_values("temp", ascending=False)
.groupby("track_id_y")
.first()
.reset_index()
)
# Rename columns before output
matches = matches.rename(
columns={"track_id_x": "id_" + name1, "track_id_y": "id_" + name2}
)
return matches
def _match_multiple(
datasets,
names,
max_dist=300,
mean_dist=None,
min_overlap=0,
consecutive_overlap=False,
tracks1_is_ref=False,
distance_method="haversine",
):
"""
match any number of tracks sets
"""
matches = pd.DataFrame(columns=["id_" + n for n in names[:2]])
for names_pair, dataset_pair in zip(
combinations(names, 2), combinations(datasets, 2)
):
m = _match_pair(
*dataset_pair,
*names_pair,
max_dist=max_dist,
mean_dist=mean_dist,
min_overlap=min_overlap,
consecutive_overlap=consecutive_overlap,
tracks1_is_ref=tracks1_is_ref * (names_pair[0] == names[0]),
distance_method=distance_method,
)
if len(m) == 0:
raise NotImplementedError(
"For the moment, the case where two datasets have no match is not"
"handled. Problem raised by datasets " + str(names_pair) # TODO
)
matches = matches.merge(
m[["id_" + names_pair[0], "id_" + names_pair[1]]], how="outer"
)
return matches
# TODO: Deal with duplicates: merge, max...?