"""
Adaptor for reading/writing SLEAP datasets as HDF5 (including `.slp`).
Note that this is not the adaptor for reading/writing the "analysis" HDF5
format.
"""
from sleap.io import format
from . import labels_json
from sleap.instance import (
PointArray,
PredictedPointArray,
Instance,
PredictedInstance,
LabeledFrame,
PredictedPoint,
Point,
)
from sleap.util import json_loads, json_dumps
from sleap import Labels, Video
import h5py
import numpy as np
import os
from typing import Optional, Callable, List, Text, Union
class LabelsV1Adaptor(format.adaptor.Adaptor):
FORMAT_ID = 1.1
    # 1.0: points in gridline coordinates, top-left corner at (0, 0)
    # 1.1: points in midpixel coordinates, top-left corner at (-0.5, -0.5)
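    # (So a user-labeled point stored at (10.0, 10.0) by a 1.0 file is
    # shifted to (9.5, 9.5) on read; see the format_id check in `read` below.)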
@property
def handles(self):
return format.adaptor.SleapObjectType.labels
@property
def default_ext(self):
return "slp"
@property
def all_exts(self):
return ["slp", "h5", "hdf5"]
@property
def name(self):
return "Labels HDF5"
    def can_read_file(self, file: format.filehandle.FileHandle):
if not self.does_match_ext(file.filename):
return False
if not file.is_hdf5:
return False
if file.format_id is not None and file.format_id >= 2:
return False
if "metadata" not in file.file:
return False
return True
    def can_write_filename(self, filename: str):
return self.does_match_ext(filename)
    def does_read(self) -> bool:
return True
    def does_write(self) -> bool:
return True
@classmethod
def read_headers(
cls,
file: format.filehandle.FileHandle,
video_search: Union[Callable, List[Text], None] = None,
match_to: Optional[Labels] = None,
):
        """Reads and returns the Labels metadata (no frames) from `file`."""
        f = file.file
        # Extract the Labels JSON metadata and create a Labels object with
        # just this metadata.
dicts = json_loads(
f.require_group("metadata").attrs["json"].tostring().decode()
)
        # These items are stored in separate datasets because the metadata
        # group grew too large.
for key in ("videos", "tracks", "suggestions"):
hdf5_key = f"{key}_json"
if hdf5_key in f:
items = [json_loads(item_json) for item_json in f[hdf5_key]]
dicts[key] = items
# Video path "." means the video is saved in same file as labels, so replace
# these paths.
for video_item in dicts["videos"]:
if video_item["backend"]["filename"] == ".":
video_item["backend"]["filename"] = file.filename
# Use the video_callback for finding videos with broken paths:
# 1. Accept single string as video search path
if isinstance(video_search, str):
video_search = [video_search]
        # 2. Accept a list of strings as video search paths
        if hasattr(video_search, "__iter__"):
            # If video_search is an iterable, we expect a list of strings and
            # build a non-GUI callback with those as the search paths.
            search_paths = list(video_search)
            # Make the search function from the list of paths
            video_search = Labels.make_video_callback(search_paths)
        # 3. Use the callback function (either given as an argument or built
        # from the search paths)
if callable(video_search):
video_search(dicts["videos"])
# Create the Labels object with the header data we've loaded
labels = labels_json.LabelsJsonAdaptor.from_json_data(dicts, match_to=match_to)
return labels
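    # A sketch of passing video search paths to `read_headers` (the path is
    # hypothetical; a callable can be passed as `video_search` instead):
    #
    #     labels = LabelsV1Adaptor.read_headers(
    #         file_handle, video_search=["/path/to/videos"]
    #     )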
    @classmethod
def read(
cls,
file: format.filehandle.FileHandle,
video_search: Union[Callable, List[Text], None] = None,
match_to: Optional[Labels] = None,
*args,
**kwargs,
):
        """Reads a complete Labels dataset from an HDF5 file handle."""
        f = file.file
labels = cls.read_headers(file, video_search, match_to)
frames_dset = f["frames"][:]
instances_dset = f["instances"][:]
points_dset = f["points"][:]
pred_points_dset = f["pred_points"][:]
# Shift the *non-predicted* points since these used to be saved with a gridline
# coordinate system.
if (file.format_id or 0) < 1.1:
points_dset[:]["x"] -= 0.5
points_dset[:]["y"] -= 0.5
        # Rather than instantiate a bunch of Point/PredictedPoint objects, we
        # use in-place numpy recarrays. This saves a lot of time and memory
        # when reading things in.
points = PointArray(buf=points_dset, shape=len(points_dset))
pred_points = PredictedPointArray(
buf=pred_points_dset, shape=len(pred_points_dset)
)
        # Extend the tracks list with a None track. We signify this with a -1
        # in the data, which maps to the last element of tracks.
        tracks = labels.tracks.copy()
        tracks.append(None)
        # A dict to keep track of instances that have a from_predicted link.
        # The key is the instance and the value is the index of the instance
        # it was predicted from.
from_predicted_lookup = {}
# Create the instances
instances = []
for i in instances_dset:
track = tracks[i["track"]]
skeleton = labels.skeletons[i["skeleton"]]
if i["instance_type"] == 0: # Instance
instance = Instance(
skeleton=skeleton,
track=track,
points=points[i["point_id_start"] : i["point_id_end"]],
)
else: # PredictedInstance
instance = PredictedInstance(
skeleton=skeleton,
track=track,
points=pred_points[i["point_id_start"] : i["point_id_end"]],
score=i["score"],
)
instances.append(instance)
if i["from_predicted"] != -1:
from_predicted_lookup[instance] = i["from_predicted"]
# Make a second pass to add any from_predicted links
for instance, from_predicted_idx in from_predicted_lookup.items():
instance.from_predicted = instances[from_predicted_idx]
# Create the labeled frames
frames = [
LabeledFrame(
video=labels.videos[frame["video"]],
frame_idx=frame["frame_idx"],
instances=instances[
frame["instance_id_start"] : frame["instance_id_end"]
],
)
            for frame in frames_dset
]
labels.labeled_frames = frames
        # Update the caches now that we have labeled frames
labels.update_cache()
return labels
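    # On-disk layout, as read above: each row of the "frames" dataset selects
    # a slice of "instances" via [instance_id_start, instance_id_end), and
    # each instance row selects a slice of "points" or "pred_points"
    # (depending on instance_type) via [point_id_start, point_id_end).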
    @classmethod
def write(
cls,
filename: str,
source_object: object,
append: bool = False,
save_frame_data: bool = False,
frame_data_format: str = "png",
all_labeled: bool = False,
suggested: bool = False,
):
        """Writes the Labels object to an HDF5 file, optionally appending."""
        labels = source_object
        # Delete the file if it exists: we want to start from scratch, since
        # h5py truncates the file in a way that does not seem to actually
        # delete data from it. Skip this if we are appending, of course.
if os.path.exists(filename) and not append:
os.unlink(filename)
        # Serialize all the metadata to JSON.
d = labels.to_dict(skip_labels=True)
if save_frame_data:
new_videos = labels.save_frame_data_hdf5(
filename,
format=frame_data_format,
user_labeled=True,
all_labeled=all_labeled,
suggested=suggested,
)
# Replace path to video file with "." (which indicates that the
# video is in the same file as the HDF5 labels dataset).
# Otherwise, the video paths will break if the HDF5 labels
# dataset file is moved.
for vid in new_videos:
vid.backend.filename = "."
d["videos"] = Video.cattr().unstructure(new_videos)
with h5py.File(filename, "a") as f:
# Add all the JSON metadata
meta_group = f.require_group("metadata")
meta_group.attrs["format_id"] = cls.FORMAT_ID
            # If we are appending and JSON metadata already exists, read the
            # old JSON so we can append to its lists
            if append and "json" in meta_group.attrs:
old_labels = labels_json.LabelsJsonAdaptor.from_json_data(
meta_group.attrs["json"].tostring().decode()
)
                # A function to join two lists, including only new
                # non-duplicate entries from the right-hand list.
def append_unique(old, new):
unique = []
for x in new:
try:
matches = [y.matches(x) for y in old]
except AttributeError:
matches = [x == y for y in old]
# If there were no matches, this is a unique object.
if sum(matches) == 0:
unique.append(x)
else:
                            # If we have an object that matches, replace the
                            # instance with the one from the new list. This
                            # makes sure objects on the Instances are the same
                            # as those in the Labels lists.
for i, match in enumerate(matches):
if match:
old[i] = x
return old + unique
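                # For example, append_unique([a, b], [b, c]) returns
                # [a, b, c], with the stored b swapped for the incoming one so
                # identities stay consistent with the new Labels.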
# Append the lists
labels.tracks = append_unique(old_labels.tracks, labels.tracks)
labels.skeletons = append_unique(old_labels.skeletons, labels.skeletons)
labels.videos = append_unique(old_labels.videos, labels.videos)
labels.nodes = append_unique(old_labels.nodes, labels.nodes)
# FIXME: Do something for suggestions and negative_anchors
# Get the dict for JSON and save it over the old data
d = labels.to_dict(skip_labels=True)
if not append:
                # These items are stored in separate datasets because the
                # metadata group grew too large.
for key in ("videos", "tracks", "suggestions"):
# Convert for saving in hdf5 dataset
data = [np.string_(json_dumps(item)) for item in d[key]]
hdf5_key = f"{key}_json"
# Save in its own dataset (e.g., videos_json)
f.create_dataset(hdf5_key, data=data, maxshape=(None,))
                    # Clear from the dict since we don't want to save these in
                    # the attribute
d[key] = []
# Output the dict to JSON
meta_group.attrs["json"] = np.string_(json_dumps(d))
# FIXME: We can probably construct these from attrs fields
            # We will store Instances and PredictedInstances in the same
            # table: instance_type=0 for Instance and instance_type=1 for
            # PredictedInstance; score is ignored for Instances.
instance_dtype = np.dtype(
[
("instance_id", "i8"),
("instance_type", "u1"),
("frame_id", "u8"),
("skeleton", "u4"),
("track", "i4"),
("from_predicted", "i8"),
("score", "f4"),
("point_id_start", "u8"),
("point_id_end", "u8"),
]
)
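            # One row, as a sketch: a PredictedInstance (instance_type=1) with
            # instance_id=7 on frame_id=3, skeleton 0, track 2, no
            # from_predicted link, score 0.9, owning pred_points rows
            # [140, 155):
            #     (7, 1, 3, 0, 2, -1, 0.9, 140, 155)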
frame_dtype = np.dtype(
[
("frame_id", "u8"),
("video", "u4"),
("frame_idx", "u8"),
("instance_id_start", "u8"),
("instance_id_end", "u8"),
]
)
num_instances = len(labels.all_instances)
max_skeleton_size = max([len(s.nodes) for s in labels.skeletons], default=0)
# Initialize data arrays for serialization
points = np.zeros(num_instances * max_skeleton_size, dtype=Point.dtype)
pred_points = np.zeros(
num_instances * max_skeleton_size, dtype=PredictedPoint.dtype
)
instances = np.zeros(num_instances, dtype=instance_dtype)
frames = np.zeros(len(labels), dtype=frame_dtype)
            # Precompute some structures to make serialization faster
skeleton_to_idx = {
skeleton: labels.skeletons.index(skeleton)
for skeleton in labels.skeletons
}
track_to_idx = {
track: labels.tracks.index(track) for track in labels.tracks
}
track_to_idx[None] = -1
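            # (Track -1 round-trips with `read`, which appends a None track to
            # the end of the tracks list so that index -1 resolves to None.)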
video_to_idx = {
video: labels.videos.index(video) for video in labels.videos
}
instance_type_to_idx = {Instance: 0, PredictedInstance: 1}
            # Each instance we create will have an index in the dataset; keep
            # track of these so we can quickly add from_predicted links on a
            # second pass.
instance_to_idx = {}
instances_with_from_predicted = []
instances_from_predicted = []
            # If we are appending, we need to look inside to see what frame,
            # instance, and point ids we need to start from. This gives us
            # offsets to use.
if append and "points" in f:
point_id_offset = f["points"].shape[0]
pred_point_id_offset = f["pred_points"].shape[0]
instance_id_offset = f["instances"][-1]["instance_id"] + 1
frame_id_offset = int(f["frames"][-1]["frame_id"]) + 1
else:
point_id_offset = 0
pred_point_id_offset = 0
instance_id_offset = 0
frame_id_offset = 0
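            # For example, if the file already holds 500 points and frames
            # 0..19, new point ids start at 500 and new frame ids at 20.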
point_id = 0
pred_point_id = 0
instance_id = 0
for frame_id, label in enumerate(labels):
frames[frame_id] = (
frame_id + frame_id_offset,
video_to_idx[label.video],
label.frame_idx,
instance_id + instance_id_offset,
instance_id + instance_id_offset + len(label.instances),
)
for instance in label.instances:
# Add this instance to our lookup structure we will need for
# from_predicted links
instance_to_idx[instance] = instance_id
parray = instance.get_points_array(copy=False, full=True)
instance_type = type(instance)
# Check whether we are working with a PredictedInstance or an
# Instance.
if instance_type is PredictedInstance:
score = instance.score
pid = pred_point_id + pred_point_id_offset
else:
score = np.nan
pid = point_id + point_id_offset
                    # Keep track of any from_predicted instance links; we will
                    # insert the correct instance_id in the dataset after we
                    # are done.
if instance.from_predicted:
instances_with_from_predicted.append(instance_id)
instances_from_predicted.append(instance.from_predicted)
# Copy all the data
instances[instance_id] = (
instance_id + instance_id_offset,
instance_type_to_idx[instance_type],
frame_id,
skeleton_to_idx[instance.skeleton],
track_to_idx[instance.track],
-1,
score,
pid,
pid + len(parray),
)
                    # If these are predicted points, copy them to the
                    # predicted point array; otherwise, use the normal point
                    # array.
if type(parray) is PredictedPointArray:
pred_points[
pred_point_id : (pred_point_id + len(parray))
] = parray
pred_point_id = pred_point_id + len(parray)
else:
points[point_id : (point_id + len(parray))] = parray
point_id = point_id + len(parray)
instance_id = instance_id + 1
# Add from_predicted links
for instance_id, from_predicted in zip(
instances_with_from_predicted, instances_from_predicted
):
try:
instances[instance_id]["from_predicted"] = instance_to_idx[
from_predicted
]
except KeyError:
# If we haven't encountered the from_predicted instance yet then
# don't save the link. It's possible for a user to create a regular
# instance from a predicted instance and then delete all predicted
                    # instances from the file, but in this case I don't think there's
# any reason to remember which predicted instance the regular
# instance came from.
pass
            # We pre-allocated the points arrays at the max possible size
            # given the max skeleton size; drop any unused points.
points = points[0:point_id]
pred_points = pred_points[0:pred_point_id]
            # Append to the existing datasets when appending; otherwise create
            # them
if append and "points" in f:
f["points"].resize((f["points"].shape[0] + points.shape[0]), axis=0)
f["points"][-points.shape[0] :] = points
f["pred_points"].resize(
(f["pred_points"].shape[0] + pred_points.shape[0]), axis=0
)
f["pred_points"][-pred_points.shape[0] :] = pred_points
f["instances"].resize(
(f["instances"].shape[0] + instances.shape[0]), axis=0
)
f["instances"][-instances.shape[0] :] = instances
f["frames"].resize((f["frames"].shape[0] + frames.shape[0]), axis=0)
f["frames"][-frames.shape[0] :] = frames
else:
f.create_dataset(
"points", data=points, maxshape=(None,), dtype=Point.dtype
)
f.create_dataset(
"pred_points",
data=pred_points,
maxshape=(None,),
dtype=PredictedPoint.dtype,
)
f.create_dataset(
"instances", data=instances, maxshape=(None,), dtype=instance_dtype
)
f.create_dataset(
"frames", data=frames, maxshape=(None,), dtype=frame_dtype
)
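
# A sketch of writing with this adaptor (filenames and the `labels` object are
# hypothetical; saving through `Labels.save_file` or the `sleap.io.format`
# dispatch functions is the usual route):
#
#     LabelsV1Adaptor.write("labels.slp", labels)
#     LabelsV1Adaptor.write("labels.slp", more_labels, append=True)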