Source code for sleap.io.legacy

"""
Module for legacy LEAP dataset.
"""
import json
import os
import numpy as np
import pandas as pd

from typing import List

from sleap.util import json_loads
from sleap.io.video import Video

from sleap.instance import (
    LabeledFrame,
    PredictedPoint,
    PredictedInstance,
    Track,
    Point,
    Instance,
)
from sleap.skeleton import Skeleton
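
# For orientation, here is a minimal sketch of the legacy JSON layout that the
# loaders below expect, inferred from the fields they access. The key names
# are the ones actually read; the values shown are hypothetical:
#
#     {
#         "videos": [
#             {"id": 0, "filepath": "video.mp4", "format": "media",
#              "dataset": null}
#         ],
#         "skeleton": {"nodeNames": ["head", "thorax"], "edges": [[1, 2]]},
#         "instances": [{"frameIdx": 1, ...}],
#         "points": [
#             {"instanceId": 0, "videoId": 0, "frameIdx": 1, "node": 1,
#              "x": 10.0, "y": 20.0, "visible": true}
#         ],
#         "predicted_instances": [
#             {"id": 0, "trackId": 0, "frameIdx": 1,
#              "matching_score": 0.9, "tracking_score": 0.8}
#         ],
#         "predicted_points": [
#             {"instanceId": 0, "videoId": 0, "frameIdx": 1, "node": 1,
#              "x": 10.0, "y": 20.0, "visible": true, "confidence": 0.95}
#         ]
#     }
#
# Frame, node, and pixel coordinates are 1-based (MATLAB convention) in the
# file; adjust_matlab_indexing=True shifts them to 0-based.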


def load_predicted_labels_json_old(
    data_path: str,
    parsed_json: dict = None,
    adjust_matlab_indexing: bool = True,
    fix_rel_paths: bool = True,
) -> List[LabeledFrame]:
    """
    Load predicted instances from Talmo's old JSON format.

    Args:
        data_path: The path to the JSON file.
        parsed_json: The parsed JSON, if already loaded, so we can skip
            reparsing.
        adjust_matlab_indexing: Whether to adjust indexing from MATLAB.
        fix_rel_paths: Whether to fix relative video paths to absolute paths.

    Returns:
        List of :class:`LabeledFrame` objects.
    """
    if parsed_json is None:
        with open(data_path) as f:
            data = json.loads(f.read())
    else:
        data = parsed_json

    videos = pd.DataFrame(data["videos"])
    predicted_instances = pd.DataFrame(data["predicted_instances"])
    predicted_points = pd.DataFrame(data["predicted_points"])

    if adjust_matlab_indexing:
        # Convert MATLAB 1-based frame, node, and pixel indices to 0-based.
        predicted_instances.frameIdx -= 1
        predicted_points.frameIdx -= 1
        predicted_points.node -= 1
        predicted_points.x -= 1
        predicted_points.y -= 1

    skeleton = Skeleton()
    skeleton.add_nodes(data["skeleton"]["nodeNames"])

    edges = data["skeleton"]["edges"]
    if adjust_matlab_indexing:
        edges = np.array(edges) - 1
    for src_idx, dst_idx in edges:
        skeleton.add_edge(
            data["skeleton"]["nodeNames"][src_idx],
            data["skeleton"]["nodeNames"][dst_idx],
        )

    if fix_rel_paths:
        for i, row in videos.iterrows():
            p = row.filepath
            if not os.path.exists(p):
                # Try interpreting the path as relative to the JSON file.
                p = os.path.join(os.path.dirname(data_path), p)
                if os.path.exists(p):
                    videos.at[i, "filepath"] = p

    # Make the video objects.
    video_objects = {}
    for i, row in videos.iterrows():
        if videos.at[i, "format"] == "media":
            vid = Video.from_media(videos.at[i, "filepath"])
        else:
            vid = Video.from_hdf5(
                filename=videos.at[i, "filepath"], dataset=videos.at[i, "dataset"]
            )

        video_objects[videos.at[i, "id"]] = vid

    # Each track is spawned on the first frame in which it appears.
    unique_track_ids = np.unique(predicted_instances["trackId"].values)
    spawned_on = {
        track_id: predicted_instances.loc[predicted_instances["trackId"] == track_id][
            "frameIdx"
        ].values[0]
        for track_id in unique_track_ids
    }
    tracks = {
        i: Track(name=str(i), spawned_on=spawned_on[i])
        for i in unique_track_ids.tolist()
    }

    # A function to get all the instances for a particular video frame.
    def get_frame_predicted_instances(video_id, frame_idx):
        points = predicted_points
        is_in_frame = (points["videoId"] == video_id) & (
            points["frameIdx"] == frame_idx
        )
        if not is_in_frame.any():
            return []

        instances = []
        frame_instance_ids = np.unique(points["instanceId"][is_in_frame])
        for instance_id in frame_instance_ids:
            is_instance = is_in_frame & (points["instanceId"] == instance_id)
            track_id = predicted_instances.loc[
                predicted_instances["id"] == instance_id
            ]["trackId"].values[0]
            match_score = predicted_instances.loc[
                predicted_instances["id"] == instance_id
            ]["matching_score"].values[0]
            # tracking_score is parsed here but not attached to the instance.
            track_score = predicted_instances.loc[
                predicted_instances["id"] == instance_id
            ]["tracking_score"].values[0]

            instance_points = {
                data["skeleton"]["nodeNames"][n]: PredictedPoint(
                    x, y, visible=v, score=confidence
                )
                for x, y, n, v, confidence in zip(
                    *[
                        points[k][is_instance]
                        for k in ["x", "y", "node", "visible", "confidence"]
                    ]
                )
            }

            instance = PredictedInstance(
                skeleton=skeleton,
                points=instance_points,
                track=tracks[track_id],
                score=match_score,
            )
            instances.append(instance)

        return instances

    # Get the unique labeled frames and construct a LabeledFrame for each.
    frame_keys = sorted(
        {
            (videoId, frameIdx)
            for videoId, frameIdx in zip(
                predicted_points["videoId"], predicted_points["frameIdx"]
            )
        }
    )

    labels = []
    for videoId, frameIdx in frame_keys:
        label = LabeledFrame(
            video=video_objects[videoId],
            frame_idx=frameIdx,
            instances=get_frame_predicted_instances(videoId, frameIdx),
        )
        labels.append(label)

    return labels
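
# Usage sketch for the loader above; the filename is hypothetical and must
# point at a real legacy-format predictions file:
#
#     frames = load_predicted_labels_json_old("old_format.predictions.json")
#     for frame in frames:
#         for inst in frame.instances:  # PredictedInstance objects
#             print(frame.frame_idx, inst.track.name, inst.score)
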
def load_labels_json_old(
    data_path: str,
    parsed_json: dict = None,
    adjust_matlab_indexing: bool = True,
    fix_rel_paths: bool = True,
) -> List[LabeledFrame]:
    """
    Load user-labeled instances from Talmo's old JSON format.

    Args:
        data_path: The path to the JSON file.
        parsed_json: The parsed JSON, if already loaded, so we can skip
            reparsing.
        adjust_matlab_indexing: Whether to adjust indexing from MATLAB.
        fix_rel_paths: Whether to fix relative video paths to absolute paths.

    Returns:
        List of :class:`LabeledFrame` objects.
    """
    if parsed_json is None:
        with open(data_path) as f:
            data = json_loads(f.read())
    else:
        data = parsed_json

    videos = pd.DataFrame(data["videos"])
    instances = pd.DataFrame(data["instances"])
    points = pd.DataFrame(data["points"])
    predicted_instances = pd.DataFrame(data["predicted_instances"])
    predicted_points = pd.DataFrame(data["predicted_points"])

    if adjust_matlab_indexing:
        # Convert MATLAB 1-based frame, node, and pixel indices to 0-based.
        instances.frameIdx -= 1
        points.frameIdx -= 1
        predicted_instances.frameIdx -= 1
        predicted_points.frameIdx -= 1

        points.node -= 1
        predicted_points.node -= 1

        points.x -= 1
        predicted_points.x -= 1

        points.y -= 1
        predicted_points.y -= 1

    skeleton = Skeleton()
    skeleton.add_nodes(data["skeleton"]["nodeNames"])

    edges = data["skeleton"]["edges"]
    if adjust_matlab_indexing:
        edges = np.array(edges) - 1
    for src_idx, dst_idx in edges:
        skeleton.add_edge(
            data["skeleton"]["nodeNames"][src_idx],
            data["skeleton"]["nodeNames"][dst_idx],
        )

    if fix_rel_paths:
        for i, row in videos.iterrows():
            p = row.filepath
            if not os.path.exists(p):
                # Try interpreting the path as relative to the JSON file.
                p = os.path.join(os.path.dirname(data_path), p)
                if os.path.exists(p):
                    videos.at[i, "filepath"] = p

    # Make the video objects.
    video_objects = {}
    for i, row in videos.iterrows():
        if videos.at[i, "format"] == "media":
            vid = Video.from_media(videos.at[i, "filepath"])
        else:
            vid = Video.from_hdf5(
                filename=videos.at[i, "filepath"], dataset=videos.at[i, "dataset"]
            )

        video_objects[videos.at[i, "id"]] = vid

    # A function to get all the instances for a particular video frame. Note
    # that only the user-labeled points are converted to instances below; the
    # predicted data above is parsed and index-adjusted but not used here.
    def get_frame_instances(video_id, frame_idx):
        """Return a list of Instance objects for a given video frame."""
        is_in_frame = (points["videoId"] == video_id) & (
            points["frameIdx"] == frame_idx
        )
        if not is_in_frame.any():
            return []

        frame_instances = []
        frame_instance_ids = np.unique(points["instanceId"][is_in_frame])
        for instance_id in frame_instance_ids:
            is_instance = is_in_frame & (points["instanceId"] == instance_id)
            instance_points = {
                data["skeleton"]["nodeNames"][n]: Point(x, y, visible=v)
                for x, y, n, v in zip(
                    *[points[k][is_instance] for k in ["x", "y", "node", "visible"]]
                )
            }

            instance = Instance(skeleton=skeleton, points=instance_points)
            frame_instances.append(instance)

        return frame_instances

    # Get the unique labeled frames and construct a LabeledFrame for each.
    frame_keys = sorted(
        {
            (videoId, frameIdx)
            for videoId, frameIdx in zip(points["videoId"], points["frameIdx"])
        }
    )

    labels = []
    for videoId, frameIdx in frame_keys:
        label = LabeledFrame(
            video=video_objects[videoId],
            frame_idx=frameIdx,
            instances=get_frame_instances(videoId, frameIdx),
        )
        labels.append(label)

    return labels
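
# Illustrative smoke test; the default path below is hypothetical and must be
# replaced with a real legacy-format labels file.
if __name__ == "__main__":
    import sys

    path = sys.argv[1] if len(sys.argv) > 1 else "old_format.labels.json"
    frames = load_labels_json_old(path)
    print(f"Loaded {len(frames)} labeled frames from {path}")
    if frames:
        print(f"First frame has {len(frames[0].instances)} instance(s)")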