"""
A SLEAP dataset collects labeled video frames, together with required metadata.
This contains labeled frame data (user annotations and/or predictions),
together with all the other data that is saved for a SLEAP project
(videos, skeletons, etc.).
The most convenient way to load SLEAP labels files is to use the high level loader:
> import sleap
> labels = sleap.load_file(filename)
The Labels class provides additional functionality for loading SLEAP labels files. To
load a labels dataset file from disk:
> labels = Labels.load_file(filename)
If you're opening a dataset file created on a different computer (or if you've
moved the video files), it's likely that the paths to the original videos will
not work. We automatically check for the videos in the same directory as the
labels file, but if the videos aren't there, you can tell `load_file` where
to search for the videos. There are various ways to do this:
> Labels.load_file(filename, single_path_to_search)
> Labels.load_file(filename, [path_a, path_b])
> Labels.load_file(filename, callback_function)
> Labels.load_file(filename, video_search=...)
The callback_function can be created via `make_video_callback()` and has the
option to make a callback with a GUI window so the user can locate the videos.
To save a labels dataset file, run:
> Labels.save_file(labels, filename)
If the filename has a supported extension (e.g., ".slp", ".h5", ".json") then
the file will be saved in the corresponding format. You can also specify the
default extension to use if none is provided in the filename.
"""
import itertools
import os

# The ABC aliases in `collections` were removed in Python 3.10; the abstract
# base classes live in `collections.abc`.
from collections.abc import MutableSequence
from typing import (
    Callable,
    List,
    Union,
    Dict,
    Optional,
    Tuple,
    Text,
    Iterable,
    Any,
    Set,
)

import attr
import cattr
import h5py as h5
import numpy as np

# Fall back to the private name only when the public one cannot be imported;
# any other error should surface instead of being swallowed.
try:
    from typing import ForwardRef
except ImportError:
    from typing import _ForwardRef as ForwardRef

from sleap.skeleton import Skeleton, Node
from sleap.instance import (
    Instance,
    LabeledFrame,
    Track,
    make_instance_cattr,
    PredictedInstance,
)
from sleap.io import pathutils
from sleap.io.video import Video, ImgStoreVideo, HDF5Video
from sleap.gui.suggestions import SuggestionFrame
from sleap.gui.dialogs.missingfiles import MissingFilesDialog
from sleap.rangelist import RangeList
from sleap.util import uniquify, json_dumps

# The version number to put in the Labels JSON format.
LABELS_JSON_FILE_VERSION = "2.0.0"

# For debugging, we can replace missing video files with a "dummy" video
USE_DUMMY_FOR_MISSING_VIDEOS = os.getenv("SLEAP_USE_DUMMY_VIDEOS", default="")
[docs]@attr.s(auto_attribs=True)
class LabelsDataCache:
"""Class for maintaining cache of data in labels dataset."""
labels: "Labels"
def __attrs_post_init__(self):
    """Build all caches as soon as attrs has finished initializing the object."""
    self.update()
def update(self, new_frame: Optional[LabeledFrame] = None):
    """Rebuild every cache from scratch, or register a single new frame.

    Args:
        new_frame: If None, all caches are discarded and rebuilt from
            `self.labels`. Otherwise only the per-video frame list and the
            frame index lookup are extended with this frame; the track
            occupancy and frame count caches are not touched.
    """
    if new_frame is not None:
        # Incremental path: just file the new frame under its video.
        vid = new_frame.video
        self._lf_by_video.setdefault(vid, [])
        self._frame_idx_map.setdefault(vid, dict())
        self._lf_by_video[vid].append(new_frame)
        self._frame_idx_map[vid][new_frame.frame_idx] = new_frame
        return

    # Full rebuild: start from empty containers.
    self._lf_by_video = dict()
    self._frame_idx_map = dict()
    self._track_occupancy = dict()
    self._frame_count_cache = dict()

    for video in self.labels.videos:
        video_frames = [lf for lf in self.labels if lf.video == video]
        self._lf_by_video[video] = video_frames
        self._frame_idx_map[video] = {lf.frame_idx: lf for lf in video_frames}
        # Occupancy is derived from the frame index map filled in just above.
        self._track_occupancy[video] = self._make_track_occupancy(video)
def find_frames(
    self, video: Video, frame_idx: Optional[Union[int, Iterable[int]]] = None
) -> Optional[List[LabeledFrame]]:
    """Look up cached labeled frames for a video.

    Args:
        video: The video to search.
        frame_idx: A single frame index, an iterable of frame indices, or
            None to get every cached frame of the video.

    Returns:
        - `frame_idx` is None: the cached frame list for the video.
        - `frame_idx` is iterable: the frames found for those indices, in
          the order given (possibly an empty list).
        - `frame_idx` is a scalar: a one-element list with the frame.
        None if the video is not cached, or if a scalar index is not found.
    """
    if frame_idx is None:
        return self._lf_by_video[video] if video in self._lf_by_video else None

    if video not in self._frame_idx_map:
        return None
    frames_by_idx = self._frame_idx_map[video]

    if isinstance(frame_idx, Iterable):
        # Indices without a labeled frame are silently skipped.
        return [frames_by_idx[idx] for idx in frame_idx if idx in frames_by_idx]

    if frame_idx in frames_by_idx:
        return [frames_by_idx[frame_idx]]
    return None
def find_fancy_frame_idxs(self, video, from_frame_idx, reverse):
    """Return the video's labeled frame indices rotated to a start position.

    The cached frame indices are sorted in ascending order and then rotated
    so the list starts at the first index after `from_frame_idx` (or, when
    `reverse` is true, at the last index before it). If there is no such
    index the search wraps around to the first (last) index. The rotated
    list stays in ascending order in both cases; `reverse` only changes
    which index comes first.

    Args:
        video: The video whose frame indices are wanted.
        from_frame_idx: Frame index to start searching from (exclusive).
        reverse: If true, start at the closest index before `from_frame_idx`
            instead of the closest index after it.

    Returns:
        The rotated list of frame indices, an empty list if the video is
        cached but has no labeled frames, or None if the video is not in the
        cache.
    """
    if video not in self._frame_idx_map:
        return None

    frame_idxs = sorted(self._frame_idx_map[video])

    # A video can be cached without any labeled frames; there is nothing to
    # rotate and indexing the empty list below would raise IndexError.
    if not frame_idxs:
        return []

    if not reverse:
        start_idx = min(
            (idx for idx in frame_idxs if idx > from_frame_idx),
            default=frame_idxs[0],
        )
    else:
        start_idx = max(
            (idx for idx in frame_idxs if idx < from_frame_idx),
            default=frame_idxs[-1],
        )

    # Rotate the sorted list so that it begins at the chosen index.
    cut = frame_idxs.index(start_idx)
    return frame_idxs[cut:] + frame_idxs[:cut]
def _make_track_occupancy(self, video: Video) -> Dict[Track, RangeList]:
    """Build the track occupancy mapping for a single video.

    Args:
        video: Video whose frames are read from `self._frame_idx_map`; it
            must already have an entry there.

    Returns:
        Dictionary keyed by the `track` attribute of every instance found
        (which is None for instances without a track), whose values are
        `RangeList` objects to which each occupied frame index was added.
    """
    frame_idx_map = self._frame_idx_map[video]
    tracks = dict()

    # Frame indices are visited in ascending order.
    for frame_idx in sorted(frame_idx_map):
        # Iterating the cached frame object yields its instances.
        labeled_frame = frame_idx_map[frame_idx]
        for instance in labeled_frame:
            if instance.track not in tracks:
                tracks[instance.track] = RangeList()
            tracks[instance.track].add(frame_idx)

    return tracks
def get_track_occupancy(self, video: Video, track: Track) -> RangeList:
    """Return the occupancy ranges for a track, creating entries on demand.

    If the video or the track has no entry in the occupancy cache yet, an
    empty one is created and stored, so the returned object is always the one
    held by the cache.
    """
    per_video = self._track_occupancy.setdefault(video, dict())
    if track not in per_video:
        per_video[track] = RangeList()
    return per_video[track]
def get_video_track_occupancy(self, video: Video) -> Dict[Track, RangeList]:
    """Return the track -> occupancy mapping for a video.

    An empty mapping is created and stored in the cache if the video has no
    entry yet.
    """
    return self._track_occupancy.setdefault(video, dict())
def remove_frame(self, frame: LabeledFrame):
    """Remove a frame from the per-video list and the frame index lookup.

    Raises:
        KeyError: If the frame's video has no cached frame list.
        ValueError: If the frame is not in the cached list for its video.
    """
    video = frame.video
    self._lf_by_video[video].remove(frame)

    # This assumes a single LabeledFrame per (video, frame_idx), so the index
    # entry can be dropped without checking which frame it points to.
    if video in self._frame_idx_map:
        self._frame_idx_map[video].pop(frame.frame_idx, None)
def remove_video(self, video: Video):
    """Drop a video from the frame list and frame index caches.

    Videos that are not cached are ignored. The track occupancy and frame
    count caches are not modified here.
    """
    for lookup in (self._lf_by_video, self._frame_idx_map):
        lookup.pop(video, None)
[docs] def track_swap(
self,
video: Video,
new_track: Track,
old_track: Optional[Track],
frame_range: tuple,
):
"""Swap the cached occupancy of two tracks within a frame range.

Only the track occupancy cache is updated here; the `track` attribute of
the instances themselves is not touched by this method.

Args:
    video: Video whose occupancy entries are updated.
    new_track: One of the two tracks being swapped.
    old_track: The other track, or None.
    frame_range: Range of frames to swap, passed unchanged to the
        `cut_range` and `remove` methods of the occupancy objects.
"""
# Get ranges in track occupancy cache
# `get_track_occupancy` creates empty entries for the video/track if they
# are missing, so the direct `self._track_occupancy[video][...]` lookups
# below find an entry for both tracks (including a None `old_track`).
# Only the middle element of each `cut_range` result is kept; presumably
# this is the part of the occupancy inside `frame_range` -- TODO confirm
# against RangeList.
_, within_old, _ = self.get_track_occupancy(video, old_track).cut_range(
frame_range
)
_, within_new, _ = self.get_track_occupancy(video, new_track).cut_range(
frame_range
)
# NOTE(review): indentation was lost in this copy of the file, so it cannot
# be told from here which of the statements below are guarded by this `if`.
# Confirm against the upstream source before restructuring this method.
if old_track is not None:
# Instances that didn't already have track can't be handled here.
# See track_set_instance for this case.
self._track_occupancy[video][old_track].remove(frame_range)
self._track_occupancy[video][new_track].remove(frame_range)
# Each track receives the ranges that were cut from the other one above.
self._track_occupancy[video][old_track].insert_list(within_new)
self._track_occupancy[video][new_track].insert_list(within_old)
def add_track(self, video: Video, track: Track):
    """Add a track to the occupancy cache for a video.

    The track gets a fresh, empty `RangeList`; if the track already had an
    entry for this video it is replaced.

    Args:
        video: Video to which the track belongs.
        track: Track to register.
    """
    # The per-video mapping only exists for videos seen when the cache was
    # last rebuilt (or touched by `add_instance`), so create it on demand
    # rather than raising KeyError for a newly added video.
    self._track_occupancy.setdefault(video, dict())[track] = RangeList()
def add_instance(self, frame: LabeledFrame, instance: Instance):
    """Record a newly added instance in the occupancy and count caches.

    Marks the instance's track as occupied on the frame (creating the video
    and track entries if they are missing) and then refreshes the cached
    frame counts for the frame.
    """
    video, track = frame.video, instance.track

    if video not in self._track_occupancy:
        self._track_occupancy[video] = dict()
    video_occupancy = self._track_occupancy[video]

    # Add the track if it is not already present for this video.
    if track not in video_occupancy:
        video_occupancy[track] = RangeList()

    # Single-frame range: (start, end) with an exclusive end.
    single_frame = (frame.frame_idx, frame.frame_idx + 1)
    video_occupancy[track].insert(single_frame)

    self.update_counts_for_frame(frame)
def remove_instance(self, frame: LabeledFrame, instance: Instance):
    """Update the occupancy and count caches for a removed instance.

    Nothing is done if the frame's video or the instance's track has no entry
    in the occupancy cache.

    Args:
        frame: Frame the instance belongs (or belonged) to.
        instance: The instance being removed.
    """
    # Videos added after the cache was built may have no occupancy entry yet;
    # treat that like an unknown track instead of raising KeyError.
    video_occupancy = self._track_occupancy.get(frame.video)
    if video_occupancy is None or instance.track not in video_occupancy:
        return

    # If this is the only instance in the track on this frame, the frame no
    # longer belongs to the track.
    if len(frame.find(track=instance.track)) == 1:
        video_occupancy[instance.track].remove(
            (frame.frame_idx, frame.frame_idx + 1)
        )

    self.update_counts_for_frame(frame)
def get_frame_count(self, video: Optional[Video] = None, filter: Text = "") -> int:
    """Return the number of frames matching video/filter, caching the result.

    Args:
        video: Video to count frames for, or None for all videos.
        filter: One of "" (all frames), "user" or "predicted".

    Raises:
        ValueError: If `filter` is not one of the accepted values.
    """
    if filter not in ("", "user", "predicted"):
        raise ValueError(
            f"Labels.get_labeled_frame_count() invalid filter: {filter}"
        )

    per_video = self._frame_count_cache.setdefault(video, dict())

    # Compute the matching (video_idx, frame_idx) set on first use only.
    if per_video.get(filter) is None:
        per_video[filter] = self.get_filtered_frame_idxs(video, filter)

    return len(per_video[filter])
def get_filtered_frame_idxs(
    self, video: Optional[Video] = None, filter: Text = ""
) -> Set[Tuple[int, int]]:
    """Return the set of (video_idx, frame_idx) tuples matching video/filter.

    Args:
        video: Only include frames from this video; None means all videos.
        filter: "" for every frame, "user" for frames with user instances,
            "predicted" for frames with predicted instances.

    Raises:
        ValueError: If `filter` is invalid, or if a video is not in the
            labels' video list.
    """
    if filter == "":
        required_attr = None
    elif filter == "user":
        required_attr = "has_user_instances"
    elif filter == "predicted":
        required_attr = "has_predicted_instances"
    else:
        raise ValueError(f"Invalid filter: {filter}")

    def keep(lf):
        """Check the video first, then the instance-type requirement."""
        if not (video is None or lf.video == video):
            return False
        return required_attr is None or getattr(lf, required_attr)

    # A set is used since it's fast to remove items from, and the video index
    # is part of each item so that frames from distinct videos with the same
    # frame index are counted separately.
    if video is not None:
        video_idx = self.labels.videos.index(video)
        return {(video_idx, lf.frame_idx) for lf in self.labels if keep(lf)}

    return {
        (self.labels.videos.index(lf.video), lf.frame_idx)
        for lf in self.labels
        if keep(lf)
    }
def update_counts_for_frame(self, frame: LabeledFrame):
    """Refresh the cached frame counts after a frame has been modified.

    Does nothing if the frame has no video or if no counts have been cached
    for its video yet.
    """
    video = frame.video
    if video is None or video not in self._frame_count_cache:
        return

    frame_idx = frame.frame_idx
    video_idx = self.labels.videos.index(video)

    # (cache key, test deciding whether the frame belongs under that key),
    # evaluated in this order: user, predicted, then all labeled frames.
    membership_tests = (
        ("user", lambda: frame.has_user_instances),
        ("predicted", lambda: frame.has_predicted_instances),
        ("", lambda: len(frame.instances)),
    )
    for type_key, belongs in membership_tests:
        if belongs():
            self._add_count_cache(video, video_idx, frame_idx, type_key)
        else:
            self._del_count_cache(video, video_idx, frame_idx, type_key)
def _add_count_cache(self, video, video_idx, frame_idx, type_key: str):
    """Add (video_idx, frame_idx) to the cached sets stored under `type_key`.

    Both the per-video entry and, if present, the all-videos entry (stored
    under the key None) are updated; a set is only touched if it has already
    been cached.
    """
    idx_pair = (video_idx, frame_idx)

    # Count for this specific video.
    per_video = self._frame_count_cache[video]
    if type_key in per_video:
        per_video[type_key].add(idx_pair)

    # Total over all videos.
    if None in self._frame_count_cache:
        all_videos = self._frame_count_cache[None]
        if type_key in all_videos:
            all_videos[type_key].add(idx_pair)
def _del_count_cache(self, video, video_idx, frame_idx, type_key: str):
    """Discard (video_idx, frame_idx) from the cached sets under `type_key`.

    Both the per-video entry and, if present, the all-videos entry (stored
    under the key None) are updated; a pair that is not in a set is ignored.
    """
    idx_pair = (video_idx, frame_idx)

    # Count for this specific video.
    per_video = self._frame_count_cache[video]
    if type_key in per_video:
        per_video[type_key].discard(idx_pair)

    # Total over all videos.
    if None in self._frame_count_cache:
        all_videos = self._frame_count_cache[None]
        if type_key in all_videos:
            all_videos[type_key].discard(idx_pair)
[docs]@attr.s(auto_attribs=True)
class Labels(MutableSequence):
"""
The :class:`Labels` class collects the data for a SLEAP project.
This class is the front-end for all interactions with loading, writing,
and modifying these labels. The actual storage backend for the data
is mostly abstracted away from the main interface.
Attributes:
labeled_frames: A list of :class:`LabeledFrame` objects
videos: A list of :class:`Video` objects that these labels may or may
not reference. The video for every `LabeledFrame` will be
stored in `videos` attribute, but some videos in
this list may not have any associated labeled frames.
skeletons: A list of :class:`Skeleton` objects (again, that may or may
not be referenced by an :class:`Instance` in labeled frame).
tracks: A list of :class:`Track` that instances can belong to.
suggestions: List that stores "suggested" frames for
videos in project. These can be suggested frames for user
to label or suggested frames for user to review.
negative_anchors: Dictionary that stores center-points around
which to crop as negative samples when training.
Dictionary key is :class:`Video`, value is list of
(frame index, x, y) tuples.
"""
labeled_frames: List[LabeledFrame] = attr.ib(default=attr.Factory(list))
videos: List[Video] = attr.ib(default=attr.Factory(list))
skeletons: List[Skeleton] = attr.ib(default=attr.Factory(list))
nodes: List[Node] = attr.ib(default=attr.Factory(list))
tracks: List[Track] = attr.ib(default=attr.Factory(list))
suggestions: List["SuggestionFrame"] = attr.ib(default=attr.Factory(list))
negative_anchors: Dict[Video, list] = attr.ib(default=attr.Factory(dict))
provenance: Dict[Text, Union[str, int, float, bool]] = attr.ib(
default=attr.Factory(dict)
)
def __attrs_post_init__(self):
    """Finish initialization after attrs has set the attributes.

    Fills the top-level containers (videos, skeletons, etc.) from the data
    in the labeled frames and then builds the lookup caches.
    """
    # Pick up any videos/skeletons/nodes/tracks that the labeled frames
    # reference but that are missing from the lists on this object.
    self._update_from_labels()

    # Caches used to find frames by video and frame index.
    self._cache = LabelsDataCache(self)

    # Holds the temporary storage directory used when we unzip.
    self.__temp_dir = None
def _update_from_labels(self, merge: bool = False):
    """Update top-level attributes with data from the labeled frames.

    Each container (videos, skeletons, nodes, tracks) is only updated when it
    is empty or when `merge` is true. Items are always added in the order in
    which they are first encountered, and items that are already present keep
    their position, so the result does not depend on set iteration order.

    Args:
        merge: If True, then update even if there's already data.

    Returns:
        None.
    """
    # `dict.fromkeys` de-duplicates by hash/equality, exactly like a set,
    # but keeps first-seen order.

    # Add any videos that are referenced by labeled frames or suggestions
    # but are missing from the video list.
    if merge or len(self.videos) == 0:
        known_videos = set(self.videos)
        referenced_videos = itertools.chain(
            (label.video for label in self.labels),
            (sug.video for sug in self.suggestions),
        )
        # Only extend, so the current list is never re-ordered.
        self.videos.extend(
            video
            for video in dict.fromkeys(referenced_videos)
            if video not in known_videos
        )

    # Ditto for skeletons (existing ones first, then newly found ones).
    if merge or len(self.skeletons) == 0:
        referenced_skeletons = (
            instance.skeleton
            for label in self.labels
            for instance in label.instances
        )
        self.skeletons = list(
            dict.fromkeys(itertools.chain(self.skeletons, referenced_skeletons))
        )

    # Ditto for nodes.
    if merge or len(self.nodes) == 0:
        skeleton_nodes = (
            node for skeleton in self.skeletons for node in skeleton.nodes
        )
        self.nodes = list(
            dict.fromkeys(itertools.chain(self.nodes, skeleton_nodes))
        )

    # Ditto for tracks.
    if merge or len(self.tracks) == 0:
        known_tracks = set(self.tracks)
        # Tracks set directly on any instance in a frame.
        instance_tracks = (
            instance.track
            for frame in self.labels
            for instance in frame.instances
            if instance.track
        )
        # Tracks of predicted instances referenced through `from_predicted`;
        # this covers referenced predictions that are no longer in the frame.
        referenced_tracks = (
            instance.from_predicted.track
            for frame in self.labels
            for instance in frame.instances
            if instance.from_predicted and instance.from_predicted.track
        )
        new_tracks = [
            track
            for track in dict.fromkeys(
                itertools.chain(instance_tracks, referenced_tracks)
            )
            if track not in known_tracks
        ]
        # Sort the new tracks by spawned on and then name.
        new_tracks.sort(key=lambda t: (t.spawned_on, t.name))
        self.tracks.extend(new_tracks)
def _update_containers(self, new_label: LabeledFrame):
    """Keep the top-level containers in sync with a newly added frame.

    Registers the frame's video, the skeletons (and their nodes) and the
    tracks of its instances if they are not known yet, re-sorts the track
    list and finally adds the frame to the lookup cache.
    """
    video = new_label.video
    if video not in self.videos:
        self.videos.append(video)

    frame_skeletons = {instance.skeleton for instance in new_label}
    for skeleton in frame_skeletons:
        if skeleton in self.skeletons:
            continue
        self.skeletons.append(skeleton)
        # Nodes are only registered for skeletons that were just added.
        self.nodes.extend(
            node for node in skeleton.nodes if node not in self.nodes
        )

    # Add any new Tracks as well.
    for instance in new_label.instances:
        track = instance.track
        if track and track not in self.tracks:
            self.tracks.append(track)

    # Sort the tracks again.
    self.tracks.sort(key=lambda t: (t.spawned_on, t.name))

    # Update cache datastructures.
    self._cache.update(new_label)
def update_cache(self):
    """Rebuild all lookup caches from the current labeled frames."""
    self._cache.update()
# Below are convenience methods for working with Labels as a list.
# Maybe we should just inherit from list? Maybe this class shouldn't
# exist, since it is really just a list with some class methods. I
# think more stuff might appear in this class later down the line
# though.
@property
def labels(self):
    """Return the list of labeled frames (alias for `labeled_frames`)."""
    return self.labeled_frames
@property
def skeleton(self) -> Skeleton:
    """Return the skeleton when the labels contain exactly one skeleton.

    Raises:
        ValueError: If there are no skeletons or more than one.
    """
    if len(self.skeletons) != 1:
        raise ValueError(
            "Labels.skeleton can only be used when there is only a single skeleton "
            "saved in the labels. Use Labels.skeletons instead."
        )
    return self.skeletons[0]
@property
def video(self) -> Video:
    """Return the video when the labels contain exactly one video.

    Raises:
        ValueError: If there are no videos or more than one.
    """
    n_videos = len(self.videos)
    if n_videos == 1:
        return self.videos[0]
    if n_videos == 0:
        raise ValueError("There are no videos in the labels.")
    raise ValueError(
        "Labels.video can only be used when there is only a single video saved "
        "in the labels. Use Labels.videos instead."
    )
@property
def has_missing_videos(self) -> bool:
    """Return True if at least one video reports `is_missing`."""
    for video in self.videos:
        if video.is_missing:
            return True
    return False
def __len__(self) -> int:
    """Return how many labeled frames the dataset holds."""
    frames = self.labeled_frames
    return len(frames)
def index(self, value) -> int:
    """Return the position of a labeled frame in the list of labeled frames.

    Raises:
        ValueError: If the frame is not in the list.
    """
    frames = self.labeled_frames
    return frames.index(value)
def __contains__(self, item) -> bool:
    """Check whether the labels contain the given item.

    Args:
        item: The item to look for within `Labels`.
            This can be :class:`LabeledFrame`,
            :class:`Video`, :class:`Skeleton`,
            :class:`Node`, or (:class:`Video`, frame idx) tuple.

    Returns:
        True if item is found.

    Raises:
        ValueError: If the item is of none of the supported types.
    """
    # Simple cases: membership in the container that matches the item type.
    containers_by_type = (
        (LabeledFrame, self.labeled_frames),
        (Video, self.videos),
        (Skeleton, self.skeletons),
        (Node, self.nodes),
    )
    for item_type, container in containers_by_type:
        if isinstance(item, item_type):
            return item in container

    # (video, frame index) pair: look for a labeled frame at that position.
    if isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], Video):
        video, frame_idx = item
        if isinstance(frame_idx, int):
            return self.find_first(video, frame_idx) is not None
        if isinstance(frame_idx, np.integer):
            # Convert the numpy scalar to a plain Python int first.
            return self.find_first(video, frame_idx.tolist()) is not None

    raise ValueError("Item is not an object type contained in labels.")
def __getitem__(self, key, *args) -> Union[LabeledFrame, List[LabeledFrame]]:
    """Return labeled frames matching key.

    Args:
        key: Indexing argument to match against. If `key` is a `Video` or tuple of
            `(Video, frame_index)`, frames that match the criteria will be searched
            for. If a scalar, list, range or array of integers are provided, the
            labels with those linear indices will be returned.

    Raises:
        KeyError: If the specified key could not be found.

    Returns:
        A list with the matching `LabeledFrame`s, or a single `LabeledFrame` if a
        scalar key was provided.
    """
    if len(args) > 0:
        key = key + tuple(args)

    # Plain integer: linear index into the list of labeled frames.
    if isinstance(key, int):
        return self.labels[key]

    # A video: all of its labeled frames.
    if isinstance(key, Video):
        if key not in self.videos:
            raise KeyError("Video not found in labels.")
        return self.find(video=key)

    # (video, frame index or indices).
    if isinstance(key, tuple) and len(key) == 2 and isinstance(key[0], Video):
        video, frame_key = key
        if video not in self.videos:
            raise KeyError("Video not found in labels.")

        if isinstance(frame_key, int):
            _hit = self.find_first(video=video, frame_idx=frame_key)
            if _hit is None:
                raise KeyError(
                    f"No label found for specified video at frame {key[1]}."
                )
            return _hit

        if isinstance(frame_key, (np.integer, np.ndarray)):
            # Convert numpy values to plain Python ones and dispatch again.
            return self.__getitem__((video, frame_key.tolist()))

        if isinstance(frame_key, (list, range)):
            return self.find(video=video, frame_idx=frame_key)

        raise KeyError("Invalid label indexing arguments.")

    # Several linear indices: resolve each one separately.
    if isinstance(key, (list, range)):
        return [self.__getitem__(i) for i in key]

    if isinstance(key, (np.integer, np.ndarray)):
        return self.__getitem__(key.tolist())

    raise KeyError("Invalid label indexing arguments.")
def __setitem__(self, index, value: LabeledFrame):
    """Replace the labeled frame at the given index.

    The top-level containers and the cache are updated for the new frame.
    """
    # TODO: Maybe we should remove this method altogether?
    self.labeled_frames[index] = value
    self._update_containers(value)
def insert(self, index, value: LabeledFrame):
    """Insert a labeled frame at the given index.

    Nothing happens if the frame is already in the labels, or if the labels
    already contain a frame for the same video and frame index.
    """
    already_present = value in self or (value.video, value.frame_idx) in self
    if not already_present:
        self.labeled_frames.insert(index, value)
        self._update_containers(value)
def append(self, value: LabeledFrame):
    """Add a labeled frame to the end of the list of labeled frames.

    This goes through `insert`, so frames that are already present are
    skipped.
    """
    past_the_end = len(self) + 1
    self.insert(past_the_end, value)
def __delitem__(self, key):
    """Remove the labeled frame(s) at the given index or slice.

    The frame is removed by position, not by equality with other frames, and
    the lookup cache is updated so that the removed frame can no longer be
    found.

    Args:
        key: Integer index or slice into the list of labeled frames.

    Raises:
        IndexError: If an integer index is out of range.
    """
    if isinstance(key, slice):
        # Several frames may go at once, so rebuild the cache a single time.
        del self.labeled_frames[key]
        self.update_cache()
        return

    removed = self.labeled_frames[key]
    del self.labeled_frames[key]
    try:
        self._cache.remove_frame(removed)
    except (KeyError, ValueError):
        # The cache did not know this frame (it was out of sync with the
        # list), so rebuild it from the current frames.
        self.update_cache()
def remove(self, value: LabeledFrame):
    """Remove the given labeled frame (delegates to `remove_frame`)."""
    self.remove_frame(value)
def remove_frame(self, lf: LabeledFrame, update_cache: bool = True):
    """Remove a given labeled frame.

    Args:
        lf: Labeled frame instance to remove.
        update_cache: If True, update the internal frame cache. If False, cache
            update can be postponed (useful when removing many frames).
    """
    self.labeled_frames.remove(lf)
    if not update_cache:
        return
    self._cache.remove_frame(lf)
def remove_frames(self, lfs: List[LabeledFrame]):
    """Remove a list of frames from the labels.

    The list of labeled frames is replaced by a new list without the given
    frames, and the cache is rebuilt once afterwards.

    Args:
        lfs: A sequence of labeled frames to remove.
    """
    doomed = set(lfs)
    survivors = []
    for lf in self.labeled_frames:
        if lf not in doomed:
            survivors.append(lf)
    self.labeled_frames = survivors
    self.update_cache()
def find(
    self,
    video: Video,
    frame_idx: Optional[Union[int, Iterable[int]]] = None,
    return_new: bool = False,
) -> List[LabeledFrame]:
    """Search for labeled frames given video and/or frame index.

    Args:
        video: A :class:`Video` that is associated with the project.
        frame_idx: The frame index (or indices) which we want to
            find in the video. If a range is specified, we'll return
            all frames with indices in that range. If not specified,
            then we'll return all labeled frames for video.
        return_new: Whether to return singleton of new and empty
            :class:`LabeledFrame` if none is found in project.

    Returns:
        List of `LabeledFrame` objects that match the criteria.
        Empty if no matches found, unless return_new is True,
        in which case it contains a new `LabeledFrame` with
        `video` and `frame_index` set.
    """
    # Result to use when the cache reports no match (None).
    if return_new:
        fallback = [LabeledFrame(video=video, frame_idx=frame_idx)]
    else:
        fallback = []

    matches = self._cache.find_frames(video, frame_idx)
    if matches is None:
        return fallback
    return matches
def frames(self, video: Video, from_frame_idx: int = -1, reverse=False):
    """Return an iterator over all labeled frames in a video.

    The frame order is the one produced by the cache's
    `find_fancy_frame_idxs(video, from_frame_idx, reverse)`.

    Args:
        video: A :class:`Video` that is associated with the project.
        from_frame_idx: The frame index from which we want to start.
            Defaults to the first frame of video.
        reverse: Passed on to the cache to select the starting frame before
            (rather than after) `from_frame_idx`.

    Yields:
        :class:`LabeledFrame`. Nothing is yielded if the video is not in the
        cache or has no labeled frames.
    """
    frame_idxs = self._cache.find_fancy_frame_idxs(video, from_frame_idx, reverse)

    # The cache returns None for a video it does not know; iterating over
    # that would raise TypeError, so treat it like "no frames".
    if not frame_idxs:
        return

    # Yield the frames
    for idx in frame_idxs:
        yield self._cache._frame_idx_map[video][idx]
def find_first(
    self, video: Video, frame_idx: Optional[int] = None
) -> Optional[LabeledFrame]:
    """Find the first occurrence of a matching labeled frame.

    Frames are scanned in list order and matched on the video and, if
    given, the frame index.

    Args:
        video: a `Video` instance that is associated with the
            labeled frames
        frame_idx: an integer specifying the frame index within
            the video

    Returns:
        First `LabeledFrame` that matches the criteria
        or None if none were found.
    """
    if video not in self.videos:
        return None

    candidates = (
        label
        for label in self.labels
        if label.video == video
        and (frame_idx is None or label.frame_idx == frame_idx)
    )
    return next(candidates, None)
def find_last(
    self, video: Video, frame_idx: Optional[int] = None
) -> Optional[LabeledFrame]:
    """Find the last occurrence of a matching labeled frame.

    Frames are scanned in reverse list order and matched on the video and,
    if given, the frame index.

    Args:
        video: a `Video` instance that is associated with the
            labeled frames
        frame_idx: an integer specifying the frame index within
            the video

    Returns:
        Last `LabeledFrame` that matches the criteria
        or None if none were found.
    """
    if video not in self.videos:
        return None

    candidates = (
        label
        for label in reversed(self.labels)
        if label.video == video
        and (frame_idx is None or label.frame_idx == frame_idx)
    )
    return next(candidates, None)
@property
def user_labeled_frames(self):
    """Return the labeled frames that have user (non-predicted) instances."""
    selected = []
    for lf in self.labeled_frames:
        if lf.has_user_instances:
            selected.append(lf)
    return selected
def get_labeled_frame_count(self, video: Optional[Video] = None, filter: Text = ""):
    """Return the (cached) number of labeled frames matching video/filter.

    Args:
        video: Video to count frames for, or None for all videos.
        filter: "" for all frames, "user" or "predicted".
    """
    count = self._cache.get_frame_count(video, filter)
    return count
def instance_count(self, video: Video, frame_idx: int) -> int:
    """Return the number of instances on the frame at video/frame index.

    Only the first matching labeled frame is considered; 0 is returned if
    there is none.
    """
    labeled_frame = self.find_first(video, frame_idx)
    if labeled_frame is None:
        return 0
    return sum(1 for inst in labeled_frame.instances if isinstance(inst, Instance))
@property
def all_instances(self) -> List[Instance]:
    """Return a list of every instance in every labeled frame."""
    return [instance for instance in self.instances()]
@property
def user_instances(self) -> List[Instance]:
    """Return list of all user (non-predicted) instances.

    Predicted instances are excluded explicitly rather than relying on the
    `Instance` check alone.
    """
    # NOTE(review): this assumes PredictedInstance derives from Instance (the
    # exact-type check `type(inst) == Instance` in `add_instance` suggests
    # so), in which case `isinstance(inst, Instance)` alone would also let
    # predictions through. If the classes are unrelated, the extra test
    # changes nothing.
    return [
        inst
        for inst in self.all_instances
        if isinstance(inst, Instance) and not isinstance(inst, PredictedInstance)
    ]
@property
def predicted_instances(self) -> List[PredictedInstance]:
    """Return a list of every instance that is a `PredictedInstance`."""
    predicted = []
    for inst in self.all_instances:
        if isinstance(inst, PredictedInstance):
            predicted.append(inst)
    return predicted
def describe(self):
    """Print basic statistics about the labels dataset.

    Three lines are printed: the number of videos, the number of user and
    predicted instances, and the number of frames split by whether they
    contain user instances, predicted instances, or both.
    """
    print(f"Videos: {len(self.videos)}")

    n_user_inst = len(self.user_instances)
    n_predicted_inst = len(self.predicted_instances)
    print(
        f"Instances: {n_user_inst:,} (user-labeled), "
        f"{n_predicted_inst:,} (predicted), "
        f"{n_user_inst + n_predicted_inst:,} (total)"
    )

    # Tally frames by (has user instances, has predicted instances).
    tally = {}
    for lf in self.labeled_frames:
        kind = (bool(lf.has_user_instances), bool(lf.has_predicted_instances))
        tally[kind] = tally.get(kind, 0) + 1

    n_user_only = tally.get((True, False), 0)
    n_pred_only = tally.get((False, True), 0)
    n_both = tally.get((True, True), 0)
    # Frames with neither kind of instance only show up in the total.
    n_total = len(self.labeled_frames)
    print(
        f"Frames: {n_user_only:,} (user-labeled), "
        f"{n_pred_only:,} (predicted), "
        f"{n_both:,} (both), "
        f"{n_total:,} (total)"
    )
def instances(self, video: Video = None, skeleton: Skeleton = None):
    """Iterate over instances in the labels, optionally with filters.

    Args:
        video: Only iterate through instances in this video
        skeleton: Only iterate through instances with this skeleton

    Yields:
        Instance: The next labeled instance
    """
    for label in self.labels:
        # Skip frames from other videos when a video filter is given.
        if not (video is None or label.video == video):
            continue
        for instance in label.instances:
            if skeleton is None or instance.skeleton == skeleton:
                yield instance
def get_template_instance_points(self, skeleton: Skeleton):
    """Return template point positions for a skeleton, cached per skeleton.

    The template is recomputed when there are fewer than 100 labeled frames,
    when nothing is cached for the skeleton yet, or when the skeleton's nodes
    differ from the cached ones. It comes from
    `align.get_template_points_array` over up to 1000 instances with this
    skeleton if the labels contain any instances, and from a networkx spring
    layout of the skeleton graph otherwise.

    Args:
        skeleton: The skeleton for which a template is wanted.

    Returns:
        The cached "points" value for the skeleton.
    """
    if not hasattr(self, "_template_instance_points"):
        self._template_instance_points = dict()
    cached = self._template_instance_points

    # Use cache unless there are a small number of labeled frames so far, or
    # we don't have a cached template instance yet or the skeleton has changed.
    rebuild_template = (
        len(self.labeled_frames) < 100
        or skeleton not in cached
        or skeleton.nodes != cached[skeleton]["nodes"]
    )

    if rebuild_template:
        # Make sure there are some labeled frames
        if self.labeled_frames and any(self.instances()):
            from sleap.info import align

            first_n_instances = itertools.islice(
                self.instances(skeleton=skeleton), 1000
            )
            template_points = align.get_template_points_array(first_n_instances)
        else:
            # No labeled frames so use force-directed graph layout
            import networkx as nx

            node_positions = nx.spring_layout(G=skeleton.graph, scale=50)

            # Nodes missing from the layout get a random position.
            template_points = np.stack(
                [
                    node_positions[node]
                    if node in node_positions
                    else np.random.randint(0, 50, size=2)
                    for node in skeleton.nodes
                ]
            )

        cached[skeleton] = dict(points=template_points, nodes=skeleton.nodes)

    return cached[skeleton]["points"]
def get_track_count(self, video: Video) -> int:
    """Return the number of tracks in the occupancy cache for a video."""
    occupancy = self.get_track_occupancy(video)
    return len(occupancy)
def get_track_occupancy(self, video: Video) -> List:
    """Return the cached track occupancy for the given video.

    The value comes straight from the cache's `get_video_track_occupancy`.
    """
    occupancy = self._cache.get_video_track_occupancy(video=video)
    return occupancy
def add_track(self, video: Video, track: Track):
    """Append a track to the track list and register it in the cache.

    Args:
        video: Video for which the track's occupancy is tracked.
        track: The track to add.
    """
    self.tracks.append(track)
    self._cache.add_track(video, track)
def track_set_instance(
    self, frame: LabeledFrame, instance: Instance, new_track: Track
):
    """Set the track on a single instance, updating occupancy.

    A track swap restricted to the instance's frame is performed first. If
    the instance still has no track afterwards, the cache's
    `remove_instance` is called before the new track is assigned.
    """
    single_frame = (frame.frame_idx, frame.frame_idx + 1)
    self.track_swap(frame.video, new_track, instance.track, single_frame)

    if instance.track is None:
        self._cache.remove_instance(frame, instance)  # FIXME

    instance.track = new_track
def track_swap(
    self,
    video: Video,
    new_track: Track,
    old_track: Optional[Track],
    frame_range: tuple,
):
    """Swap track assignment for instances in two tracks.

    If you need to change the track to or from None, you'll need
    to use :meth:`track_set_instance` for each specific
    instance you want to modify.

    Args:
        video: The :class:`Video` for which we want to swap tracks.
        new_track: A :class:`Track` for which we want to swap
            instances with another track.
        old_track: The other :class:`Track` for swapping.
        frame_range: Tuple of (start, end) frame indexes.
            If you want to swap tracks on a single frame, use
            (frame index, frame index + 1).
    """
    # The occupancy cache is swapped first; the instances follow below.
    self._cache.track_swap(video, new_track, old_track, frame_range)

    # Collect the instances of both tracks before changing any of them.
    # Note that this won't match on None track.
    old_track_instances = self.find_track_occupancy(video, old_track, frame_range)
    new_track_instances = self.find_track_occupancy(video, new_track, frame_range)

    reassignments = [(old_track_instances, new_track)]

    # old_track can be `Track` or int
    # If int, it's index in instance list which we'll use as a pseudo-track,
    # but we won't set instances currently on new_track to old_track.
    if type(old_track) == Track:
        reassignments.append((new_track_instances, old_track))

    for group, target_track in reassignments:
        for instance in group:
            instance.track = target_track
def remove_instance(
    self, frame: LabeledFrame, instance: Instance, in_transaction: bool = False
):
    """Remove an instance from a frame, updating track occupancy.

    Args:
        frame: Frame that contains the instance.
        instance: The instance to remove.
        in_transaction: If True, the cache is not updated by this call.
    """
    frame.instances.remove(instance)
    if in_transaction:
        return
    self._cache.remove_instance(frame, instance)
def add_instance(self, frame: LabeledFrame, instance: Instance):
    """Add an instance to a frame, updating track occupancy.

    If an object of exactly type `Instance` in the frame already uses the
    new instance's track, the new instance's track is cleared first.
    """
    # Tracks already taken in this frame by objects that are exactly of type
    # Instance (the check is on the exact type, so subclasses are ignored).
    tracks_in_frame = []
    for inst in frame:
        if type(inst) == Instance and inst.track is not None:
            tracks_in_frame.append(inst.track)

    if instance.track in tracks_in_frame:
        instance.track = None

    frame.instances.append(instance)
    self._cache.add_instance(frame, instance)
def find_track_occupancy(
    self, video: Video, track: Union[Track, int], frame_range=None
) -> List[Instance]:
    """Get instances for a given video, track, and range of frames.

    Args:
        video: the `Video`
        track: the `Track` or int ("pseudo-track" index to instance list)
        frame_range (optional):
            If specified, only return instances on frames in range.
            A tuple is expanded with `range(*frame_range)`.
            If None, return all instances for given track.

    Returns:
        List of :class:`Instance` objects.
    """
    if type(frame_range) == tuple:
        frame_range = range(*frame_range)

    track_type = type(track)
    matches = []
    for lf in self.find(video):
        if frame_range is not None and lf.frame_idx not in frame_range:
            continue
        for inst in lf.instances:
            if track_type == Track:
                # Real track: the instance must carry this very track object.
                hit = inst.track is track
            elif track_type == int:
                # Pseudo-track: position in the frame's instance list, valid
                # only for instances that have no track.
                hit = lf.instances.index(inst) == track and inst.track is None
            else:
                hit = False
            if hit:
                matches.append(inst)
    return matches
def get_video_suggestions(self, video: Video) -> List[int]:
    """Return the suggested frame indices for one video.

    Args:
        video: Video to get suggestions for.

    Returns:
        Frame indices of all suggestions whose video equals `video`, in the
        order they appear in the suggestions list.
    """
    frame_indices = []
    for suggestion in self.suggestions:
        if suggestion.video == video:
            frame_indices.append(suggestion.frame_idx)
    return frame_indices
def get_suggestions(self) -> List[SuggestionFrame]:
    """Return the list of suggestion items stored on these labels.

    The stored list itself is returned, not a copy.
    """
    all_suggestions = self.suggestions
    return all_suggestions
def find_suggestion(self, video, frame_idx):
    """Look up a suggestion by video and frame index.

    Args:
        video: Video the suggestion must belong to.
        frame_idx: Frame index the suggestion must have.

    Returns:
        The first suggestion matching both values, or `None` if there is none.
    """
    for candidate in self.suggestions:
        if candidate.video == video and candidate.frame_idx == frame_idx:
            return candidate
    return None
def get_next_suggestion(self, video, frame_idx, seek_direction=1):
    """Return the suggestion to move to when seeking from a given frame.

    If the starting position is itself a suggestion, the neighbouring entry
    in the suggestions list is returned (wrapping around at the ends).
    Otherwise the closest suggestion after (or before) `frame_idx` in the
    same video is returned. If there is none, the following (or preceding)
    videos are tried in order, wrapping around, until one with suggestions
    is found.

    Args:
        video: Video the seek starts from.
        frame_idx: Frame index the seek starts from.
        seek_direction: 1 to seek forward, -1 to seek backward.

    Returns:
        The suggestion item found (as returned by `find_suggestion`), or
        `None` if `video` is not in these labels or nothing was found.

    Raises:
        ValueError: If `seek_direction` is neither -1 nor 1.
    """
    # make sure we have valid seek_direction
    if seek_direction not in (-1, 1):
        raise ValueError("seek_direction should be -1 or 1.")

    # make sure the video belongs to this Labels object
    if video not in self.videos:
        return None

    all_suggestions = self.get_suggestions()

    # If we're currently on a suggestion, then follow order of list
    match = self.find_suggestion(video, frame_idx)
    if match is not None:
        suggestion_idx = all_suggestions.index(match)
        new_idx = (suggestion_idx + seek_direction) % len(all_suggestions)
        return all_suggestions[new_idx]

    # Otherwise, find the prev/next suggestion sorted by frame order.
    forward = seek_direction == 1
    # Forward seeks want the earliest candidate, backward seeks the latest.
    pick = min if forward else max

    # Look for next (or previous) suggestion in current video.
    if forward:
        candidates = (i for i in self.get_video_suggestions(video) if i > frame_idx)
    else:
        candidates = (i for i in self.get_video_suggestions(video) if i < frame_idx)
    frame_suggestion = pick(candidates, default=None)
    if frame_suggestion is not None:
        return self.find_suggestion(video, frame_suggestion)

    # Nothing left in the current video: walk through the other videos in
    # seek order and stop at the first one which has suggestions. The last
    # step wraps back to the starting video itself.
    start_idx = self.videos.index(video)
    n_videos = len(self.videos)
    for offset in range(1, n_videos + 1):
        other = self.videos[(start_idx + seek_direction * offset) % n_videos]
        frame_suggestion = pick(self.get_video_suggestions(other), default=None)
        if frame_suggestion is not None:
            return self.find_suggestion(other, frame_suggestion)

    return None
def set_suggestions(self, suggestions: List[SuggestionFrame]):
    """Replace the stored suggestions.

    Args:
        suggestions: The new list of suggestion items. The list is stored
            as given, without copying.
    """
    self.suggestions = suggestions
def delete_suggestions(self, video):
    """Drop every suggestion that belongs to the given video.

    Args:
        video: Video whose suggestions are discarded. The suggestions
            attribute is rebound to a new list holding the remaining items.
    """
    remaining = []
    for suggestion in self.suggestions:
        if suggestion.video != video:
            remaining.append(suggestion)
    self.suggestions = remaining
def add_video(self, video: Video):
    """Register a video with the labels unless it is already present.

    This makes it possible to add videos to the labels before any labeled
    frames exist for them.

    Args:
        video: `Video` instance
    """
    if video in self.videos:
        return
    self.videos.append(video)
def remove_video(self, video: Video):
    """Remove a video from the labels and all associated labeled frames.

    Suggestions and negative anchors that are indexed by the video are
    deleted as well, and the lookup cache is told about the removal.

    Args:
        video: `Video` instance to be removed.

    Raises:
        KeyError: If `video` is not in these labels.
    """
    if video not in self.videos:
        raise KeyError("Video is not in labels.")

    # Delete all associated labeled frames in a single pass. Slice assignment
    # keeps the same list object, as the previous element-wise removal did,
    # while avoiding a linear `list.remove` scan per deleted frame.
    self.labeled_frames[:] = [
        lf for lf in self.labeled_frames if not (lf.video == video)
    ]

    # Delete data that's indexed by video
    self.delete_suggestions(video)
    if video in self.negative_anchors:
        del self.negative_anchors[video]

    # Delete video
    self.videos.remove(video)
    self._cache.remove_video(video)
@classmethod
def from_json(cls, *args, **kwargs):
    """Build labels from JSON data via `LabelsJsonAdaptor.from_json_data`.

    All positional and keyword arguments are forwarded unchanged.
    """
    # Imported here rather than at module level.
    from sleap.io.format.labels_json import LabelsJsonAdaptor

    adaptor = LabelsJsonAdaptor
    return adaptor.from_json_data(*args, **kwargs)
def extend_from(
    self, new_frames: Union["Labels", List[LabeledFrame]], unify: bool = False
):
    """Merge data from another `Labels` object or `LabeledFrame` list.

    Args:
        new_frames: the object from which to copy data
        unify: whether to replace objects in new frames with
            corresponding objects from current `Labels` data

    Returns:
        bool, True if we added frames, False otherwise
    """
    # Accept a whole Labels object by taking its frames.
    if isinstance(new_frames, Labels):
        new_frames = new_frames.labeled_frames

    # Nothing to do unless we were given a non-empty list of labeled frames
    # (only the first element is type-checked).
    is_nonempty_list = isinstance(new_frames, list) and len(new_frames) > 0
    if not is_nonempty_list or not isinstance(new_frames[0], LabeledFrame):
        return False

    if unify:
        # Round-trip the incoming frames through the dict representation with
        # `match_to` so that their objects are replaced by the corresponding
        # objects of the current labels.
        serialized = Labels(labeled_frames=new_frames).to_dict()
        new_frames = Labels.from_json(serialized, match_to=self).labeled_frames

    # Copy the frames, then fold together frames for the same video/frame idx.
    self.labeled_frames.extend(new_frames)
    self.merge_matching_frames()

    # Refresh top level videos/nodes/skeletons/tracks and the cache.
    self._update_from_labels(merge=True)
    self._cache.update()

    return True
def has_frame(
    self,
    lf: Optional[LabeledFrame] = None,
    video: Optional[Video] = None,
    frame_idx: Optional[int] = None,
    use_cache: bool = True,
) -> bool:
    """Check if the labels contain a specified frame.

    Args:
        lf: `LabeledFrame` to search for. If given, its video and frame index
            take precedence over the `video` and `frame_idx` arguments.
        video: `Video` of the frame. Not necessary if `lf` is given.
        frame_idx: Integer frame index of the frame. Not necessary if `lf` is
            given.
        use_cache: If `True` (the default), search through `self.find`. If
            `False`, compare against every labeled frame directly.

    Returns:
        `True` if a frame with the same video and frame index exists in the
        labels, even if it contains different instances; `False` otherwise.

    Raises:
        ValueError: If neither `lf` nor both `video` and `frame_idx` are given.

    Notes:
        The `Video` instance must be the same as the ones in these labels, so if
        comparing to `Video`s loaded from another file, be sure to load those
        labels with matching, i.e.:
        `sleap.Labels.load_file(..., match_to=labels)`.
    """
    if lf is not None:
        video, frame_idx = lf.video, lf.frame_idx

    if video is None or frame_idx is None:
        raise ValueError("Either lf or video and frame_idx must be provided.")

    if not use_cache:
        # Exhaustive scan over all labeled frames.
        if video not in self.videos:
            return False
        return any(
            frame.video == video and frame.frame_idx == frame_idx
            for frame in self.labeled_frames
        )

    matches = self.find(video, frame_idx=frame_idx, return_new=False)
    return len(matches) > 0
def remove_user_instances(self, new_labels: Optional["Labels"] = None):
    """Clear user instances from the labels.

    Useful prior to merging operations to prevent overlapping instances from
    new labels.

    Args:
        new_labels: If not `None`, frames for which `new_labels.has_frame`
            is false are kept without modification and only the remaining
            frames are cleared. If not provided (the default), every frame is
            cleared.

    Notes:
        If providing `new_labels`, it must have been loaded using
        `sleap.Labels.load_file(..., match_to=labels)` to ensure that
        conflicting frames can be detected.

        A cleared frame keeps only its predicted instances; cleared frames
        without any predicted instances are dropped from the dataset.
    """
    kept_frames = []
    for frame in self.labeled_frames:
        # Frames that the new labels don't cover are left alone.
        untouched = new_labels is not None and not new_labels.has_frame(frame)
        if untouched:
            kept_frames.append(frame)
        elif frame.has_predicted_instances:
            # Strip user instances by keeping only the predictions.
            frame.instances = frame.predicted_instances
            kept_frames.append(frame)

    # Frames left with nothing after clearing are not carried over.
    self.labeled_frames = kept_frames
def remove_predictions(self, new_labels: Optional["Labels"] = None):
    """Clear predicted instances from the labels.

    Useful prior to merging operations to prevent overlapping instances from
    new predictions.

    Args:
        new_labels: If not `None`, frames for which `new_labels.has_frame`
            is false are kept without modification and only the remaining
            frames are cleared. If not provided (the default), every frame is
            cleared.

    Notes:
        If providing `new_labels`, it must have been loaded using
        `sleap.Labels.load_file(..., match_to=labels)` to ensure that
        conflicting frames can be detected.

        A cleared frame keeps only its user instances; cleared frames without
        any user instances are dropped from the dataset.
    """
    kept_frames = []
    for frame in self.labeled_frames:
        # Frames that the new labels don't cover are left alone.
        untouched = new_labels is not None and not new_labels.has_frame(frame)
        if untouched:
            kept_frames.append(frame)
        elif frame.has_user_instances:
            # Strip predictions by keeping only the user instances.
            frame.instances = frame.user_instances
            kept_frames.append(frame)

    # Frames left with nothing after clearing are not carried over.
    self.labeled_frames = kept_frames
@classmethod
def complex_merge_between(
    cls, base_labels: "Labels", new_labels: "Labels", unify: bool = True
) -> tuple:
    """Merge frames and other data from one dataset into another.

    Anything that can be merged cleanly is merged into base_labels.

    Frames conflict just in case each labels object has a matching
    frame (same video and frame idx) with instances not in other.

    Frames can be merged cleanly if:

    * the frame is in only one of the labels, or
    * the frame is in both labels, but all instances perfectly match
      (which means they are redundant), or
    * the frame is in both labels, maybe there are some redundant
      instances, but only one version of the frame has additional
      instances not in the other.

    Args:
        base_labels: the `Labels` that we're merging into
        new_labels: the `Labels` that we're merging from
        unify: whether to replace objects (e.g., `Video`) in
            new_labels with *matching* objects from base

    Returns:
        tuple of three items:

        * Dictionary, keys are :class:`Video`, values are
          dictionary in which keys are frame index (int)
          and value is list of :class:`Instance` objects
        * list of conflicting :class:`Instance` objects from base
        * list of conflicting :class:`Instance` objects from new
    """
    if unify:
        # Round-trip the new labels through the dict representation with
        # `match_to` so their objects are replaced by the matching objects
        # from the base labels.
        new_labels = cls.from_json(new_labels.to_dict(), match_to=base_labels)

    # Merge whatever merges cleanly and collect the conflicts.
    merge_result = LabeledFrame.complex_merge_between(
        base_labels=base_labels, new_frames=new_labels.labeled_frames
    )
    merged, extra_base, extra_new = merge_result

    has_conflicts = bool(extra_base) or bool(extra_new)
    if not has_conflicts:
        # Clean merge: finish now by bringing the base object up to date,
        # first its top level lists (videos etc.), then its caches.
        base_labels._update_from_labels(merge=True)
        base_labels.update_cache()

    # Suggestions and negative anchors are merged in either case.
    base_labels.suggestions.extend(new_labels.suggestions)
    cls.merge_container_dicts(
        base_labels.negative_anchors, new_labels.negative_anchors
    )

    return merged, extra_base, extra_new
@staticmethod
def finish_complex_merge(
    base_labels: "Labels", resolved_frames: List[LabeledFrame]
):
    """Complete a conflicted merge started by `complex_merge_between`.

    Args:
        base_labels: the `Labels` that we're merging into
        resolved_frames: the list of frames to add into base_labels
    """
    target = base_labels

    # Step 1: bring the resolved frames into the base dataset.
    target.labeled_frames.extend(resolved_frames)

    # Step 2: collapse any pair of LabeledFrames that now refer to the same
    # video and frame index.
    target.merge_matching_frames()

    # Step 3: refresh the top level lists (videos etc.) and the caches.
    target._update_from_labels(merge=True)
    target.update_cache()
@staticmethod
def merge_container_dicts(dict_a: Dict, dict_b: Dict) -> Dict:
    """Merge list-valued entries of `dict_b` into `dict_a` in place.

    For a key present in both dicts, the items from `dict_b` are appended to
    the list in `dict_a` and duplicates are dropped, keeping the first
    occurrence of each item. For a key only present in `dict_b`, a copy of
    its list is stored in `dict_a`, so later changes to `dict_a` do not leak
    back into `dict_b`.

    Args:
        dict_a: Dictionary to merge into; it is modified.
        dict_b: Dictionary to merge from; it is not modified.

    Returns:
        `dict_a`, for convenience.

    Notes:
        Items must be hashable for the duplicate check.
    """
    for key, new_items in dict_b.items():
        if key not in dict_a:
            dict_a[key] = list(new_items)
            continue

        # De-duplicate while preserving order. The result is written back
        # explicitly instead of relying on a helper to mutate the list.
        seen = set()
        merged = []
        for item in list(dict_a[key]) + list(new_items):
            if item not in seen:
                seen.add(item)
                merged.append(item)
        # Slice assignment keeps the list object already held by dict_a.
        dict_a[key][:] = merged

    return dict_a
def merge_matching_frames(self, video: Optional[Video] = None):
    """Merge `LabeledFrame` objects that are for the same video frame.

    Args:
        video: combine for this video; if None, do all videos
    """
    if video is not None:
        merged_frames = LabeledFrame.merge_frames(self.labeled_frames, video=video)
        self.labeled_frames = merged_frames
        return

    # No video given: process each distinct video that has labeled frames.
    videos_with_frames = {frame.video for frame in self.labeled_frames}
    for vid in videos_with_frames:
        self.merge_matching_frames(video=vid)
def to_dict(self, skip_labels: bool = False) -> Dict[str, Any]:
    """Serialize all labels to dicts.

    Serializes the labels in the underlying list of LabeledFrames to a dict
    structure. It is used to create JSON and HDF5 serialized datasets.

    Note that calling this also rebuilds `self.nodes` so that it includes
    every node of every skeleton.

    Args:
        skip_labels: If True, skip labels serialization and just do the
            metadata.

    Returns:
        A dict containing the following top level keys:

        * version - The version of the dict/json serialization format.
        * skeletons - The skeletons associated with these underlying
          instances.
        * nodes - The nodes that the skeletons represent.
        * videos - The videos that the instances occur on.
        * tracks - The tracks associated with each instance.
        * suggestions - The suggested frames.
        * negative_anchors - The negative training sample anchors.
        * provenance - The provenance data stored on the labels.
        * labels - The labeled frames (omitted if `skip_labels` is True).
    """
    # FIXME: Update list of nodes
    # We shouldn't have to do this here, but for some reason we're missing nodes
    # which are in the skeleton but don't have points (in the first instance?).
    skeleton_nodes = {node for skeleton in self.skeletons for node in skeleton.nodes}
    self.nodes = list(set(self.nodes).union(skeleton_nodes))

    # Inside labels, videos/skeletons/nodes/tracks are not serialized in full.
    # Each is written as the (stringified) index into the corresponding top
    # level list, which limits redundant data in the output.
    label_cattr = make_instance_cattr()
    label_cattr.register_unstructure_hook(
        Skeleton, lambda x: str(self.skeletons.index(x))
    )
    label_cattr.register_unstructure_hook(
        Video, lambda x: str(self.videos.index(x))
    )
    label_cattr.register_unstructure_hook(Node, lambda x: str(self.nodes.index(x)))
    label_cattr.register_unstructure_hook(
        Track, lambda x: str(self.tracks.index(x))
    )

    # Converter for the top level skeletons list.
    idx_to_node = dict(enumerate(self.nodes))
    skeleton_cattr = Skeleton.make_cattr(idx_to_node)

    # Tracks are saved as tuples rather than dicts; this can save a lot of
    # space when there are lots of tracks.
    track_cattr = cattr.Converter(unstruct_strat=cattr.UnstructureStrategy.AS_TUPLE)

    # Serialize the metadata first, then (optionally) the labeled frames.
    serialized = {
        "version": LABELS_JSON_FILE_VERSION,
        "skeletons": skeleton_cattr.unstructure(self.skeletons),
        "nodes": cattr.unstructure(self.nodes),
        "videos": Video.cattr().unstructure(self.videos),
        "tracks": track_cattr.unstructure(self.tracks),
        "suggestions": label_cattr.unstructure(self.suggestions),
        "negative_anchors": label_cattr.unstructure(self.negative_anchors),
        "provenance": label_cattr.unstructure(self.provenance),
    }

    if skip_labels:
        return serialized

    serialized["labels"] = label_cattr.unstructure(self.labeled_frames)
    return serialized
def to_json(self):
    """Serialize the labels to JSON.

    Returns:
        The result of passing the `to_dict()` structure to `json_dumps`.
    """
    as_dict = self.to_dict()
    return json_dumps(as_dict)
@classmethod
def load_file(
    cls,
    filename: str,
    video_search: Union[Callable, List[Text], None] = None,
    *args,
    **kwargs,
):
    """Load a labels file through the generic format reader.

    Args:
        filename: Path of the file to load.
        video_search: Forwarded to the reader as `video_search`; a callback
            or list of paths per the type annotation.
        *args: Extra positional arguments forwarded to the reader.
        **kwargs: Extra keyword arguments forwarded to the reader.

    Returns:
        Whatever the reader returns for `for_object="labels"`.
    """
    from .format import read

    reader_kwargs = dict(for_object="labels", video_search=video_search)
    return read(filename, *args, **reader_kwargs, **kwargs)
@classmethod
def save_file(
    cls, labels: "Labels", filename: str, default_suffix: str = "", *args, **kwargs
):
    """Save file, detecting format from filename.

    Args:
        labels: The dataset to save.
        filename: Path where we'll save it. The format is detected by the
            writer from the suffix (e.g., ".json").
        default_suffix: Suffix appended to `filename` when it has no file
            extension at all. Doesn't need to have "." before file extension.
            A filename that already has some extension is left unchanged.
        *args: Extra positional arguments forwarded to the writer.
        **kwargs: Extra keyword arguments forwarded to the writer.

    Raises:
        ValueError: If cannot detect valid filetype.
            NOTE(review): presumably raised by the writer; confirm there.
    """
    # Honor the documented default suffix, which was previously ignored.
    if default_suffix and not os.path.splitext(filename)[1]:
        filename = f"{filename}.{default_suffix.lstrip('.')}"

    # Convert to full (absolute) path
    filename = os.path.abspath(filename)

    # Make sure that all directories for path exist
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    from .format import write

    write(filename, labels, *args, **kwargs)
def save(
    self,
    filename: Text,
    with_images: bool = False,
    embed_all_labeled: bool = False,
    embed_suggested: bool = False,
):
    """Save the labels to a `.slp` file.

    Args:
        filename: Path to save the labels to ending in `.slp`. If the filename
            does not end in `.slp`, the extension will be automatically
            appended.
        with_images: If `True`, the image data for frames with labels will be
            embedded in the saved labels. This is useful for generating a
            single file to be used when training remotely. Defaults to `False`.
        embed_all_labeled: If `True`, save image data for labeled frames without
            user-labeled instances (defaults to `False`). This is useful for
            selecting arbitrary frames to save by adding empty `LabeledFrame`s
            to the dataset. Labeled frame metadata will be saved regardless.
        embed_suggested: If `True`, save image data for frames in the
            suggestions (defaults to `False`). Useful for predicting on
            remaining suggestions after training. Suggestions metadata will be
            saved regardless.

    Notes:
        This is an instance-level wrapper for the `Labels.save_file` class
        method.
    """
    _, extension = os.path.splitext(filename)
    if extension.lower() != ".slp":
        filename += ".slp"

    # Translate this method's option names to those used by `save_file`.
    embedding_options = dict(
        save_frame_data=with_images,
        all_labeled=embed_all_labeled,
        suggested=embed_suggested,
    )
    Labels.save_file(self, filename, **embedding_options)
@classmethod
def load_json(cls, filename: str, *args, **kwargs) -> "Labels":
    """Load labels with the reader forced to the "json" format.

    Extra positional and keyword arguments are forwarded to the reader.
    """
    from .format import read

    return read(filename, *args, for_object="labels", as_format="json", **kwargs)
@classmethod
def save_json(cls, labels: "Labels", filename: str, *args, **kwargs):
    """Save labels with the writer forced to the "json" format.

    Extra positional and keyword arguments are forwarded to the writer.
    """
    from .format import write

    write(filename, labels, *args, as_format="json", **kwargs)
@classmethod
def load_hdf5(cls, filename, *args, **kwargs):
    """Load labels with the reader forced to the "hdf5_v1" format.

    Extra positional and keyword arguments are forwarded to the reader.
    """
    from .format import read

    return read(
        filename, *args, for_object="labels", as_format="hdf5_v1", **kwargs
    )
@classmethod
def save_hdf5(cls, labels, filename, *args, **kwargs):
    """Save labels with the writer forced to the "hdf5_v1" format.

    Extra positional and keyword arguments are forwarded to the writer.
    """
    from .format import write

    write(filename, labels, *args, as_format="hdf5_v1", **kwargs)
@classmethod
def load_leap_matlab(cls, filename, *args, **kwargs):
    """Load labels with the reader forced to the "leap" format.

    Extra positional and keyword arguments are forwarded to the reader.
    """
    from .format import read

    return read(filename, *args, for_object="labels", as_format="leap", **kwargs)
@classmethod
def load_deeplabcut(cls, filename: str) -> "Labels":
    """Load labels with the reader forced to the "deeplabcut" format."""
    from .format import read

    reader_options = dict(for_object="labels", as_format="deeplabcut")
    return read(filename, **reader_options)
@classmethod
def load_coco(
    cls, filename: str, img_dir: str, use_missing_gui: bool = False
) -> "Labels":
    """Load labels through `LabelsCocoAdaptor.read`.

    Args:
        filename: Path of the file to read; wrapped in a `FileHandle`.
        img_dir: Passed through to the adaptor as its second argument.
        use_missing_gui: Passed through to the adaptor as its third argument.

    Returns:
        Whatever the adaptor returns.
    """
    from sleap.io.format.coco import LabelsCocoAdaptor
    from sleap.io.format.filehandle import FileHandle

    handle = FileHandle(filename)
    return LabelsCocoAdaptor.read(handle, img_dir, use_missing_gui)
@classmethod
def from_deepposekit(
    cls, filename: str, video_path: str, skeleton_path: str
) -> "Labels":
    """Load labels through `LabelsDeepPoseKitAdaptor.read`.

    Args:
        filename: Path of the file to read; wrapped in a `FileHandle`.
        video_path: Passed through to the adaptor as its second argument.
        skeleton_path: Passed through to the adaptor as its third argument.

    Returns:
        Whatever the adaptor returns.
    """
    from sleap.io.format.deepposekit import LabelsDeepPoseKitAdaptor
    from sleap.io.format.filehandle import FileHandle

    handle = FileHandle(filename)
    return LabelsDeepPoseKitAdaptor.read(handle, video_path, skeleton_path)
def save_frame_data_imgstore(
    self, output_dir: str = "./", format: str = "png", all_labels: bool = False
) -> List[ImgStoreVideo]:
    """Write images for labeled frames from all videos to imgstore datasets.

    One imgstore is created per video in `self.videos`; a video with no
    selected frames is passed an empty list of frame numbers.

    Args:
        output_dir: Path to directory which will contain imgstores.
        format: The image format to use for the data.
            Use "png" for lossless, "jpg" for lossy.
            Other imgstore formats will probably work as well but
            have not been tested.
        all_labels: Include any labeled frames, not just the frames
            with user instances.

    Returns:
        A list of :class:`ImgStoreVideo` objects with the stored
        frames.
    """
    stored_videos = []
    for v_idx, v in enumerate(self.videos):
        # Pick the frames of this video which should be written out.
        frame_nums = []
        for lf in self.labeled_frames:
            if v == lf.video and (all_labels or lf.has_user_instances):
                frame_nums.append(lf.frame_idx)

        # Join with "/" instead of os.path.join() since we want
        # path to work on Windows and Posix systems
        frames_filename = output_dir + f"/frame_data_vid{v_idx}"
        imgstore_video = v.to_imgstore(
            path=frames_filename, frame_numbers=frame_nums, format=format
        )

        # Close the video for now
        imgstore_video.close()
        stored_videos.append(imgstore_video)

    return stored_videos
def save_frame_data_hdf5(
    self,
    output_path: str,
    format: str = "png",
    user_labeled: bool = True,
    all_labeled: bool = False,
    suggested: bool = False,
) -> List[HDF5Video]:
    """Write images for labeled frames from all videos to hdf5 file.

    Note that this will make an HDF5 video, not an HDF5 labels dataset.
    Each video is written to the dataset named `video{index}`, where the
    index is the position of the video in `self.videos`.

    Args:
        output_path: Path to HDF5 file.
        format: The image format to use for the data. Defaults to png.
        user_labeled: Include labeled frames with user instances. Defaults to
            `True`.
        all_labeled: Include all labeled frames, including those with
            user-labeled instances, predicted instances or labeled frames with
            no instances. Defaults to `False`.
        suggested: Include suggested frames even if they do not have instances.
            Useful for inference after training. Defaults to `False`.

    Returns:
        A list of :class:`HDF5Video` objects with the stored frames.
    """
    stored_videos = []
    for v_idx, video in enumerate(self.videos):
        selected = []
        for lf in self.find(video):
            if all_labeled or (user_labeled and lf.has_user_instances):
                selected.append(lf.frame_idx)

        if suggested:
            for suggestion in self.suggestions:
                if suggestion.video == video:
                    selected.append(suggestion.frame_idx)

        # Each frame is written once, in ascending frame order.
        frame_nums = sorted(set(selected))

        hdf5_video = video.to_hdf5(
            path=output_path,
            dataset=f"video{v_idx}",
            format=format,
            frame_numbers=frame_nums,
        )
        hdf5_video.close()
        stored_videos.append(hdf5_video)

    return stored_videos
@classmethod
def make_gui_video_callback(cls, search_paths: Optional[List] = None) -> Callable:
    """Create a missing-video callback with the GUI prompt enabled.

    Shorthand for `make_video_callback(search_paths=..., use_gui=True)`.
    """
    callback = cls.make_video_callback(search_paths=search_paths, use_gui=True)
    return callback
@classmethod
def make_video_callback(
    cls, search_paths: Optional[List] = None, use_gui: bool = False
) -> Callable:
    """Create a callback for finding missing videos.

    The callback can be used while loading a saved project and
    allows the user to find videos which have been moved (or have
    paths from a different system).

    The callback receives the list of serialized videos and updates the
    `["backend"]["filename"]` entries in place. It returns True to signal
    "abort" (the user dismissed the dialog) and None otherwise.

    Args:
        search_paths: If specified, this is a list of paths where
            we'll automatically try to find the missing videos.
        use_gui: If True, show a dialog so the user can locate videos
            which are still missing after the automatic search.

    Returns:
        The callback function.
    """
    search_paths = search_paths or []

    def video_callback(video_list, new_paths=search_paths):
        filenames = [item["backend"]["filename"] for item in video_list]
        missing = pathutils.list_file_missing(filenames)

        # Try changing the prefix using saved patterns
        # NOTE(review): presumably updates `filenames`/`missing` in place,
        # since the return value is not used -- confirm in pathutils.
        if sum(missing):
            pathutils.fix_paths_with_saved_prefix(filenames, missing)

        # Check for file in search_path directories
        if sum(missing) and new_paths:
            for i, filename in enumerate(filenames):
                fixed_path = find_path_using_paths(filename, new_paths)
                if fixed_path != filename:
                    filenames[i] = fixed_path
                    missing[i] = False

        if use_gui:
            # If there are still missing paths, prompt user
            if sum(missing):
                # If we are using dummy for any video not found by user
                # then don't require user to find everything.
                allow_incomplete = USE_DUMMY_FOR_MISSING_VIDEOS

                okay = MissingFilesDialog(
                    filenames, missing, allow_incomplete=allow_incomplete
                ).exec_()

                if not okay:
                    return True  # True for stop

        if not use_gui and sum(missing):
            # If we got the same number of paths as there are videos
            if len(filenames) == len(new_paths):
                # and the file extensions match
                exts_match = all(
                    (
                        old.split(".")[-1] == new.split(".")[-1]
                        for old, new in zip(filenames, new_paths)
                    )
                )

                if exts_match:
                    # then the search paths should be a list of all the
                    # video paths, so we can get the new path for the missing
                    # old path.
                    for i, filename in enumerate(filenames):
                        if missing[i]:
                            filenames[i] = new_paths[i]

        # Replace the video filenames with changes by user
        for i, item in enumerate(video_list):
            item["backend"]["filename"] = filenames[i]

        if USE_DUMMY_FOR_MISSING_VIDEOS and sum(missing):
            from sleap.io.video import DummyVideo

            # Replace only the videos which are still missing with a "dummy"
            # video; videos that were found must keep their real backend.
            for is_missing, item in zip(missing, video_list):
                if not is_missing:
                    continue
                vid = DummyVideo(filename=item["backend"]["filename"])
                item["backend"] = cattr.unstructure(vid)

    return video_callback
def find_path_using_paths(missing_path: Text, search_paths: List[Text]) -> Text:
    """Find a path to a missing file given a set of paths to search in.

    Args:
        missing_path: Path to the missing filename.
        search_paths: List of paths to search in. An entry that is an existing
            file stands for the directory containing it.

    Returns:
        The corrected path if it was found, or the original missing path if it
        was not.
    """
    # File name without directories, according to the current OS path format.
    target_name = os.path.basename(missing_path)

    # The path may come from another OS, so also strip anything up to the last
    # unix or windows style separator.
    for separator in ("/", "\\"):
        if separator in target_name:
            target_name = target_name.split(separator)[-1]

    # Return the first search location that contains a file with that name.
    for location in search_paths:
        folder = os.path.dirname(location) if os.path.isfile(location) else location
        candidate = os.path.join(folder, target_name)
        if os.path.exists(candidate):
            return candidate

    return missing_path
def load_file(
    filename: Text,
    detect_videos: bool = True,
    search_paths: Optional[Union[List[Text], Text]] = None,
) -> Labels:
    """Load a SLEAP labels file.

    Args:
        filename: Path to a SLEAP labels (.slp) file.
        detect_videos: If True, will attempt to detect missing videos by
            searching for their filenames in the search paths. This is useful
            when loading SLEAP labels files that were generated on another
            computer with different paths. If False, `search_paths` is not
            used.
        search_paths: A path or list of paths to search for the missing videos.
            This can be the direct path to the video file or its containing
            folder. If not specified, defaults to searching for the videos in
            the same folder as the labels.

    Returns:
        The loaded `Labels` instance.

    Notes:
        This is a convenience method to call `sleap.Labels.load_file`. See that
        class method for more functionality in the loading process.

        The video files do not need to be accessible in order to load the
        labels, for example, when only the predicted instances or user labels
        are required.
    """
    if not detect_videos:
        return Labels.load_file(filename)

    if search_paths is None:
        # Default to the folder which contains the labels file.
        search_paths = os.path.dirname(filename)

    return Labels.load_file(filename, search_paths)