"""
Adaptor for reading DeepLabCut datasets.
This can either read a CSV file with labeled frames for a single video,
or a YAML file which potentially contains multiple videos.
The adaptor was created by manually inspecting DeepLabCut files and there's no
guarantee that it will perfectly import all data (especially metadata).
If the adaptor can find full video files for the annotated frames, then the
full videos will be used in the resulting SLEAP dataset. Otherwise, we'll
create a video object which wraps the individual frame images.
"""
import os
import re
import yaml
import numpy as np
import pandas as pd
from typing import List, Optional
from sleap import Labels, Video, Skeleton
from sleap.instance import Instance, LabeledFrame, Point
from sleap.util import find_files_by_suffix
from .adaptor import Adaptor, SleapObjectType
from .filehandle import FileHandle
class LabelsDeepLabCutCsvAdaptor(Adaptor):
    """
    Reads DeepLabCut csv file with labeled frames for single video.

    Two column layouts are handled: the one parsed with header rows
    ``[1, 2]`` (node, coordinate) and the multi-animal one, detected by the
    first column being labeled "individuals", which is parsed with header
    rows ``[1, 2, 3]`` (animal, node, coordinate).
    """

    # Extracts "0123" from "path/img0123.png"; compiled once and reused for
    # every row when frames are indexed against a full video.
    _FRAME_IDX_PATTERN = re.compile(r"(?<=img)(\d+)(?=\.png)")

    @property
    def handles(self):
        """Type of SLEAP object this adaptor handles (labels)."""
        return SleapObjectType.labels

    @property
    def default_ext(self):
        """Default file extension (without the dot)."""
        return "csv"

    @property
    def all_exts(self):
        """All file extensions this adaptor matches."""
        return ["csv"]

    @property
    def name(self):
        """Human-readable name of the file format."""
        return "DeepLabCut Dataset CSV"

    def can_read_file(self, file: FileHandle):
        """Returns whether the file extension matches; contents aren't checked."""
        if not self.does_match_ext(file.filename):
            return False

        # TODO: add checks for valid deeplabcut csv
        return True

    def can_write_filename(self, filename: str):
        """Always False; this adaptor is read-only."""
        return False

    def does_read(self) -> bool:
        """Always True; this adaptor reads files."""
        return True

    def does_write(self) -> bool:
        """Always False; this adaptor doesn't write files."""
        return False

    @classmethod
    def read(
        cls, file: FileHandle, full_video: Optional[Video] = None, *args, **kwargs,
    ) -> Labels:
        """Reads the csv file and wraps the labeled frames in a Labels object.

        Args:
            file: Handle for the csv file.
            full_video: Optional video to attach the frames to, instead of
                a video built from the individual frame images.
            *args: Forwarded to `read_frames`.
            **kwargs: Forwarded to `read_frames`. May include `skeleton`.

        Returns:
            Labels containing the frames returned by `read_frames`.
        """
        # `read_frames` takes (file, skeleton, full_video, ...). Passing
        # `full_video` as the second positional argument would bind it to
        # `skeleton`, so both are passed explicitly in the right order.
        skeleton = kwargs.pop("skeleton", None)
        return Labels(
            labeled_frames=cls.read_frames(
                file, skeleton, full_video, *args, **kwargs
            )
        )

    @classmethod
    def make_video_for_image_list(cls, image_dir, filenames) -> Video:
        """Creates a Video object from frame images.

        Args:
            image_dir: Directory in which the images are expected to be.
            filenames: Image paths as listed in the csv file.

        Returns:
            Video built from the images, with each path re-rooted in
            `image_dir`.
        """

        # the image filenames in the csv may not match where the user has them
        # so we'll change the directory to match where the user has the csv
        def fix_img_path(img_dir, img_filename):
            # Normalize Windows separators so basename works on any platform.
            img_filename = img_filename.replace("\\", "/")
            img_filename = os.path.basename(img_filename)
            return os.path.join(img_dir, img_filename)

        filenames = [fix_img_path(image_dir, f) for f in filenames]

        return Video.from_image_filenames(filenames)

    @classmethod
    def read_frames(
        cls,
        file: FileHandle,
        skeleton: Optional[Skeleton] = None,
        full_video: Optional[Video] = None,
        *args,
        **kwargs,
    ) -> List[LabeledFrame]:
        """Reads the labeled frames from a DeepLabCut csv file.

        Args:
            file: Handle for the csv file.
            skeleton: Skeleton to use for the instances. If None, a new
                skeleton is created with the node names found in the csv.
            full_video: If given, frames are attached to this video and the
                frame index is parsed from the image filename
                ("img0123.png" -> 123). Otherwise a video is built from the
                frame images located next to the csv file, and frames are
                indexed 0..N-1 in csv row order.

        Returns:
            List with one LabeledFrame per csv row.

        Raises:
            ValueError: If `full_video` is given and a frame index cannot be
                parsed from an image filename.
        """
        filename = file.filename

        # Read CSV file.
        data = pd.read_csv(filename, header=[1, 2])

        # Check if this is in the new multi-animal format.
        is_multianimal = data.columns[0][0] == "individuals"

        animal_names = []
        if is_multianimal:
            # Reload with additional header rows if using new format.
            data = pd.read_csv(filename, header=[1, 2, 3])

            # Pull out animal and node names from the columns, keeping the
            # order of first appearance. Every other column is taken since
            # each node has an x and a y column.
            node_names = []
            for animal_name, node_name, _ in data.columns[1:][::2]:
                if animal_name not in animal_names:
                    animal_names.append(animal_name)
                if node_name not in node_names:
                    node_names.append(node_name)
        else:
            # Create the skeleton from the list of nodes in the csv file.
            # Note that DeepLabCut doesn't have edges, so these will need to be
            # added by user later.
            node_names = [n[0] for n in list(data)[1::2]]

        if skeleton is None:
            skeleton = Skeleton()
            skeleton.add_nodes(node_names)

        # Get list of all images filenames.
        img_files = data.iloc[:, 0]

        if full_video:
            video = full_video
            index_frames_by_original_index = True
        else:
            # Create the Video object
            img_dir = os.path.dirname(filename)
            video = cls.make_video_for_image_list(img_dir, img_files)

            # The frames in the video we created will be indexed from 0 to N
            # rather than having their index from the original source video.
            index_frames_by_original_index = False

        lfs = []
        for i, img_file in enumerate(img_files):
            # Figure out frame index to use.
            if index_frames_by_original_index:
                # Extract "0123" from "path/img0123.png" as original frame index.
                frame_idx_match = cls._FRAME_IDX_PATTERN.search(img_file)

                if frame_idx_match is None:
                    raise ValueError(
                        f"Unable to determine frame index for image {img_file}"
                    )
                frame_idx = int(frame_idx_match.group(0))
            else:
                frame_idx = i

            instances = []
            if is_multianimal:
                for animal_name in animal_names:
                    any_not_missing = False

                    # Get points for each node.
                    instance_points = dict()
                    for node in node_names:
                        x, y = (
                            data[(animal_name, node, "x")][i],
                            data[(animal_name, node, "y")][i],
                        )
                        instance_points[node] = Point(x, y)

                        # Logical `not` rather than bitwise `~`, which is
                        # only a negation for numpy bools.
                        if not (np.isnan(x) and np.isnan(y)):
                            any_not_missing = True

                    # Skip animals with no labeled coordinates in this frame.
                    if any_not_missing:
                        # Create instance with points.
                        instances.append(
                            Instance(skeleton=skeleton, points=instance_points)
                        )
            else:
                # Get points for each node.
                instance_points = dict()
                for node in node_names:
                    x, y = data[(node, "x")][i], data[(node, "y")][i]
                    instance_points[node] = Point(x, y)

                # Create instance with points assuming there's a single
                # instance per frame.
                instances.append(Instance(skeleton=skeleton, points=instance_points))

            # Create LabeledFrame and add it to list.
            lfs.append(
                LabeledFrame(video=video, frame_idx=frame_idx, instances=instances)
            )

        return lfs
class LabelsDeepLabCutYamlAdaptor(Adaptor):
    """
    Reads a DeepLabCut yaml file together with the labeled data next to it.

    Each subdirectory of "labeled-data" (located in the same directory as
    the yaml file) is treated as one video and its "CollectedData" csv file
    is imported with `LabelsDeepLabCutCsvAdaptor`.
    """

    @property
    def handles(self):
        """Type of SLEAP object this adaptor handles (labels)."""
        return SleapObjectType.labels

    @property
    def default_ext(self):
        """Default file extension (without the dot)."""
        return "yaml"

    @property
    def all_exts(self):
        """All file extensions this adaptor matches."""
        return ["yaml", "yml"]

    @property
    def name(self):
        """Human-readable name of the file format."""
        return "DeepLabCut Dataset YAML"

    def can_read_file(self, file: FileHandle):
        """Returns True if extension matches and text mentions "video_sets"."""
        if not self.does_match_ext(file.filename):
            return False
        return "video_sets" in file.text

    def can_write_filename(self, filename: str):
        """Always False; this adaptor is read-only."""
        return False

    def does_read(self) -> bool:
        """Always True; this adaptor reads files."""
        return True

    def does_write(self) -> bool:
        """Always False; this adaptor doesn't write files."""
        return False

    @classmethod
    def read(cls, file: FileHandle, *args, **kwargs,) -> Labels:
        """Reads every labeled-data subdirectory referenced by the yaml file.

        Args:
            file: Handle for the yaml file. The "videos" and "labeled-data"
                directories are looked up in the directory of this file.

        Returns:
            Labels with the labeled frames of all subdirectories that contain
            a csv file. All frames share one skeleton whose nodes come from
            the "bodyparts" entry of the yaml file.
        """
        yaml_path = file.filename

        # Parse the yaml contents.
        project = yaml.load(file.text, Loader=yaml.SafeLoader)

        # A single skeleton is shared by the frames of every video.
        skeleton = Skeleton()
        skeleton.add_nodes(project["bodyparts"])

        # Locate the directories holding the videos and the labeled data.
        project_dir = os.path.dirname(yaml_path)
        videos_dir = os.path.join(project_dir, "videos")
        labeled_data_dir = os.path.join(project_dir, "labeled-data")

        # One subdirectory of labeled data per video.
        video_dirs = []
        with os.scandir(labeled_data_dir) as entries:
            for entry in entries:
                if entry.is_dir():
                    video_dirs.append(entry.path)

        all_frames = []
        for video_dir in video_dirs:
            csv_files = find_files_by_suffix(
                video_dir, prefix="CollectedData", suffix=".csv"
            )

            if not csv_files:
                print(f"No csv data file found in {video_dir}")
                continue

            # For a subdirectory named foo, look for foo.mp4 in videos dir.
            shortname = os.path.basename(video_dir)
            video_path = os.path.join(videos_dir, f"{shortname}.mp4")

            if os.path.exists(video_path):
                video = Video.from_filename(video_path)
            else:
                # Without a full video, the csv adaptor falls back to the
                # individual frame images stored in the subdirectory.
                print(
                    f"Unable to find {video_path} so using individual frame images."
                )
                video = None

            # Import the labeled frames from the first csv file found.
            frames = LabelsDeepLabCutCsvAdaptor.read_frames(
                FileHandle(csv_files[0]), full_video=video, skeleton=skeleton
            )
            all_frames.extend(frames)

        return Labels(labeled_frames=all_frames)