Source code for sleap.info.summary

"""
Module for getting a series which gives some statistic based on labeling
data for each frame of some labeled video.
"""

import attr
import numpy as np

from typing import Callable, Dict

from sleap.io.dataset import Labels
from sleap.io.video import Video


[docs]@attr.s(auto_attribs=True) class StatisticSeries: """ Class to calculate various statistical series for labeled frames. Each method returns a series which is a dictionary in which keys are frame index and value are some numerical value for the frame. Args: labels: The :class:`Labels` for which to calculate series. """ labels: Labels
[docs] def get_point_count_series(self, video: Video) -> Dict[int, float]: """Get series with total number of labeled points in each frame.""" series = dict() for lf in self.labels.find(video): val = sum(len(inst.points) for inst in lf if hasattr(inst, "score")) series[lf.frame_idx] = val return series
[docs] def get_point_score_series( self, video: Video, reduction: str = "sum" ) -> Dict[int, float]: """Get series with statistic of point scores in each frame. Args: video: The :class:`Video` for which to calculate statistic. reduction: name of function applied to scores: * sum * min Returns: The series dictionary (see class docs for details) """ reduce_funct = dict(sum=sum, min=lambda x: min(x, default=0))[reduction] series = dict() for lf in self.labels.find(video): val = reduce_funct( point.score for inst in lf for point in inst.points if hasattr(inst, "score") ) series[lf.frame_idx] = val return series
[docs] def get_instance_score_series(self, video, reduction="sum") -> Dict[int, float]: """Get series with statistic of instance scores in each frame. Args: video: The :class:`Video` for which to calculate statistic. reduction: name of function applied to scores: * sum * min Returns: The series dictionary (see class docs for details) """ reduce_funct = dict(sum=sum, min=lambda x: min(x, default=0))[reduction] series = dict() for lf in self.labels.find(video): val = reduce_funct(inst.score for inst in lf if hasattr(inst, "score")) series[lf.frame_idx] = val return series
[docs] def get_point_displacement_series(self, video, reduction="sum") -> Dict[int, float]: """ Get series with statistic of point displacement in each frame. Point displacement is the distance between the point location in frame and the location of the corresponding point (same node, same track) from the closest earlier labeled frame. Args: video: The :class:`Video` for which to calculate statistic. reduction: name of function applied to point scores: * sum * mean * max Returns: The series dictionary (see class docs for details) """ reduce_funct = dict(sum=np.sum, mean=np.nanmean, max=np.max)[reduction] series = dict() last_lf = None for lf in self.labels.find(video): val = self._calculate_frame_velocity(lf, last_lf, reduce_funct) last_lf = lf if not np.isnan(val): series[lf.frame_idx] = val # len(lf.instances) return series
[docs] def get_primary_point_displacement_series( self, video, reduction="sum", primary_node=None ): """ Get sum of displacement for single node of each instance per frame. Args: video: The :class:`Video` for which to calculate statistic. reduction: name of function applied to point scores: * sum * mean * max primary_node: The node for which we'll calculate displacement. This can be name of node or `Node` object. If not specified, then defaults to first node. Returns: The series dictionary (see class docs for details) """ reduce_funct = dict(sum=np.sum, mean=np.nanmean, max=np.max)[reduction] track_count = self.labels.get_track_count(video) try: primary_node_idx = self.labels.skeletons[0].node_to_index(primary_node) except ValueError: print(f"Unable to locate node {primary_node} so using node 0") primary_node_idx = 0 last_frame_idx = video.num_frames - 1 location_matrix = np.full( (last_frame_idx + 1, track_count, 2), np.nan, dtype=np.float ) last_track_pos = np.full((track_count, 2), 0, dtype=np.float) has_seen_track_idx = set() for frame_idx in range(last_frame_idx + 1): lfs = self.labels.find(video, frame_idx) # Start by setting all track positions to where they were last, # so that we won't get "jumps" when an instance is missing for # some frames. location_matrix[frame_idx] = last_track_pos # Now update any positions we do have for the frame if lfs: lf = lfs[0] for inst in lf.instances: if inst.track is not None: track_idx = self.labels.tracks.index(inst.track) if track_idx < track_count: point = inst.points_array[primary_node_idx, :2] location_matrix[frame_idx, track_idx] = point if not np.all(np.isnan(point)): # Keep track of where this track was last. last_track_pos[track_idx] = point # If this is the first time we've seen this # track, then use initial location for all # previous frames so first occurrence doesn't # have high displacement. if track_idx not in has_seen_track_idx: location_matrix[:frame_idx, track_idx] = point has_seen_track_idx.add(track_idx) # Calculate the displacements. Note these will be offset by 1 frame # since we're starting from frame 1 rather than 0. displacement = location_matrix[1:, ...] - location_matrix[:-1, ...] displacement_distances = np.linalg.norm(displacement, axis=2) result = reduce_funct(displacement_distances, axis=1) result[np.isnan(result)] = 0 # Shift back by 1 frame so offsets line up with frame index. result[1:] = result[:-1] return result
def get_min_centroid_proximity_series(self, video): series = dict() def min_centroid_dist(instances): if len(instances) < 2: return np.nan # centroids for all instances in frame centroids = np.array([inst.centroid for inst in instances]) # calculate distance between each pair of instance centroids distances = np.linalg.norm( centroids[np.newaxis, :, :] - centroids[:, np.newaxis, :], axis=-1 ) # clear distance from each instance to itself np.fill_diagonal(distances, np.nan) # return the min return np.nanmin(distances) for lf in self.labels.find(video): val = min_centroid_dist(lf.instances) if not np.isnan(val): series[lf.frame_idx] = val return series @staticmethod def _calculate_frame_velocity( lf: "LabeledFrame", last_lf: "LabeledFrame", reduce_function: Callable ) -> float: """ Calculate total point displacement between two given frames. Args: lf: The :class:`LabeledFrame` for which we want velocity last_lf: The frame from which to calculate displacement. reduce_function: Numpy function (e.g., np.sum, np.nanmean) is applied to *point* displacement, and then those instance values are summed for the whole frame. Returns: The total velocity for instances in frame. """ val = 0 for inst in lf: if last_lf is not None: last_inst = last_lf.find(track=inst.track) if last_inst: points_a = inst.points_array points_b = last_inst[0].points_array point_dist = np.linalg.norm(points_a - points_b, axis=1) inst_dist = reduce_function(point_dist) val += inst_dist if not np.isnan(inst_dist) else 0 return val