GP2023_chirp_detection/code/behavior.py
2023-04-11 15:33:07 +02:00

323 lines
10 KiB
Python

import os
import os
import numpy as np
import matplotlib.pyplot as plt
from IPython import embed
from pandas import read_csv
from modules.logger import makeLogger
from scipy.ndimage import gaussian_filter1d
logger = makeLogger(__name__)
class Behavior:
"""Load behavior data from csv file as class attributes
Attributes
----------
behavior: 0: chasing onset, 1: chasing offset, 2: physical contact
behavior_type:
behavioral_category:
comment_start:
comment_stop:
dataframe: pandas dataframe with all the data
duration_s:
media_file:
observation_date:
observation_id:
start_s: start time of the event in seconds
stop_s: stop time of the event in seconds
total_length:
"""
def __init__(self, folder_path: str) -> None:
LED_on_time_BORIS = np.load(
os.path.join(folder_path, "LED_on_time.npy"), allow_pickle=True
)
self.time = np.load(
os.path.join(folder_path, "times.npy"), allow_pickle=True
)
csv_filename = [
f for f in os.listdir(folder_path) if f.endswith(".csv")
][
0
] # check if there are more than one csv file
self.dataframe = read_csv(os.path.join(folder_path, csv_filename))
self.chirps = np.load(
os.path.join(folder_path, "chirps.npy"), allow_pickle=True
)
self.chirps_ids = np.load(
os.path.join(folder_path, "chirps_ids.npy"), allow_pickle=True
)
for k, key in enumerate(self.dataframe.keys()):
key = key.lower()
if " " in key:
key = key.replace(" ", "_")
if "(" in key:
key = key.replace("(", "")
key = key.replace(")", "")
setattr(
self, key, np.array(self.dataframe[self.dataframe.keys()[k]])
)
last_LED_t_BORIS = LED_on_time_BORIS[-1]
real_time_range = self.time[-1] - self.time[0]
factor = 1.034141
shift = last_LED_t_BORIS - real_time_range * factor
self.start_s = (self.start_s - shift) / factor
self.stop_s = (self.stop_s - shift) / factor
"""
1 - chasing onset
2 - chasing offset
3 - physical contact event
temporal encpding needs to be corrected ... not exactly 25FPS.
### correspinding python code ###
factor = 1.034141
LED_on_time_BORIS = np.load(os.path.join(folder_path, 'LED_on_time.npy'), allow_pickle=True)
last_LED_t_BORIS = LED_on_time_BORIS[-1]
real_time_range = times[-1] - times[0]
shift = last_LED_t_BORIS - real_time_range * factor
data = pd.read_csv(os.path.join(folder_path, file[1:-7] + '.csv'))
boris_times = data['Start (s)']
data_times = []
for Cevent_t in boris_times:
Cevent_boris_times = (Cevent_t - shift) / factor
data_times.append(Cevent_boris_times)
data_times = np.array(data_times)
behavior = data['Behavior']
"""
def correct_chasing_events(
category: np.ndarray, timestamps: np.ndarray
) -> tuple[np.ndarray, np.ndarray]:
onset_ids = np.arange(len(category))[category == 0]
offset_ids = np.arange(len(category))[category == 1]
# Check whether on- or offset is longer and calculate length difference
if len(onset_ids) > len(offset_ids):
len_diff = len(onset_ids) - len(offset_ids)
longer_array = onset_ids
shorter_array = offset_ids
logger.info(f"Onsets are greater than offsets by {len_diff}")
elif len(onset_ids) < len(offset_ids):
len_diff = len(offset_ids) - len(onset_ids)
longer_array = offset_ids
shorter_array = onset_ids
logger.info(f"Offsets are greater than offsets by {len_diff}")
elif len(onset_ids) == len(offset_ids):
logger.info("Chasing events are equal")
return category, timestamps
# Correct the wrong chasing events; delete double events
wrong_ids = []
for i in range(len(longer_array) - (len_diff + 1)):
if (shorter_array[i] > longer_array[i]) & (
shorter_array[i] < longer_array[i + 1]
):
pass
else:
wrong_ids.append(longer_array[i])
longer_array = np.delete(longer_array, i)
category = np.delete(category, wrong_ids)
timestamps = np.delete(timestamps, wrong_ids)
return category, timestamps
def event_triggered_chirps(
event: np.ndarray,
chirps: np.ndarray,
time_before_event: int,
time_after_event: int,
) -> tuple[np.ndarray, np.ndarray]:
event_chirps = [] # chirps that are in specified window around event
centered_chirps = (
[]
) # timestamps of chirps around event centered on the event timepoint
for event_timestamp in event:
start = event_timestamp - time_before_event # timepoint of window start
stop = event_timestamp + time_after_event # timepoint of window ending
chirps_around_event = [
c for c in chirps if (c >= start) & (c <= stop)
] # get chirps that are in a -5 to +5 sec window around event
event_chirps.append(chirps_around_event)
if len(chirps_around_event) == 0:
continue
else:
centered_chirps.append(chirps_around_event - event_timestamp)
centered_chirps = np.concatenate(
centered_chirps, axis=0
) # convert list of arrays to one array for plotting
return event_chirps, centered_chirps
def main(datapath: str):
# behavior is pandas dataframe with all the data
bh = Behavior(datapath)
# chirps are not sorted in time (presumably due to prior groupings)
# get and sort chirps and corresponding fish_ids of the chirps
chirps = bh.chirps[np.argsort(bh.chirps)]
chirps_fish_ids = bh.chirps_ids[np.argsort(bh.chirps)]
category = bh.behavior
timestamps = bh.start_s
# Correct for doubles in chasing on- and offsets to get the right on-/offset pairs
# Get rid of tracking faults (two onsets or two offsets after another)
category, timestamps = correct_chasing_events(category, timestamps)
# split categories
chasing_onset = timestamps[category == 0]
chasing_offset = timestamps[category == 1]
physical_contact = timestamps[category == 2]
# First overview plot
fig1, ax1 = plt.subplots()
ax1.scatter(
chirps,
np.ones_like(chirps),
marker="*",
color="royalblue",
label="Chirps",
)
ax1.scatter(
chasing_onset,
np.ones_like(chasing_onset) * 2,
marker=".",
color="forestgreen",
label="Chasing onset",
)
ax1.scatter(
chasing_offset,
np.ones_like(chasing_offset) * 2.5,
marker=".",
color="firebrick",
label="Chasing offset",
)
ax1.scatter(
physical_contact,
np.ones_like(physical_contact) * 3,
marker="x",
color="black",
label="Physical contact",
)
plt.legend()
# plt.show()
plt.close()
# Get fish ids
fish_ids = np.unique(chirps_fish_ids)
##### Chasing triggered chirps CTC #####
# Evaluate how many chirps were emitted in specific time window around the chasing onset events
# Iterate over chasing onsets (later over fish)
time_around_event = 5 # time window around the event in which chirps are counted, 5 = -5 to +5 sec around event
#### Loop crashes at concatenate in function ####
# for i in range(len(fish_ids)):
# fish = fish_ids[i]
# chirps = chirps[chirps_fish_ids == fish]
# print(fish)
chasing_chirps, centered_chasing_chirps = event_triggered_chirps(
chasing_onset, chirps, time_around_event, time_around_event
)
physical_chirps, centered_physical_chirps = event_triggered_chirps(
physical_contact, chirps, time_around_event, time_around_event
)
# Kernel density estimation ???
# centered_chasing_chirps_convolved = gaussian_filter1d(centered_chasing_chirps, 5)
# centered_chasing = chasing_onset[0] - chasing_onset[0] ## get the 0 timepoint for plotting; set one chasing event to 0
offsets = [0.5, 1]
fig4, ax4 = plt.subplots(
figsize=(20 / 2.54, 12 / 2.54), constrained_layout=True
)
ax4.eventplot(
np.array([centered_chasing_chirps, centered_physical_chirps]),
lineoffsets=offsets,
linelengths=0.25,
colors=["g", "r"],
)
ax4.vlines(0, 0, 1.5, "tab:grey", "dashed", "Timepoint of event")
# ax4.plot(centered_chasing_chirps_convolved)
ax4.set_yticks(offsets)
ax4.set_yticklabels(["Chasings", "Physical \n contacts"])
ax4.set_xlabel("Time[s]")
ax4.set_ylabel("Type of event")
plt.show()
# Associate chirps to inidividual fish
fish1 = chirps[chirps_fish_ids == fish_ids[0]]
fish2 = chirps[chirps_fish_ids == fish_ids[1]]
fish = [len(fish1), len(fish2)]
### Plots:
# 1. All recordings, all fish, all chirps
# One CTC, one PTC
# 2. All recordings, only winners
# One CTC, one PTC
# 3. All recordings, all losers
# One CTC, one PTC
#### Chirp counts per fish general #####
fig2, ax2 = plt.subplots()
x = ["Fish1", "Fish2"]
width = 0.35
ax2.bar(x, fish, width=width)
ax2.set_ylabel("Chirp count")
# plt.show()
plt.close()
##### Count chirps emitted during chasing events and chirps emitted out of chasing events #####
chirps_in_chasings = []
for onset, offset in zip(chasing_onset, chasing_offset):
chirps_in_chasing = [c for c in chirps if (c > onset) & (c < offset)]
chirps_in_chasings.append(chirps_in_chasing)
# chirps out of chasing events
counts_chirps_chasings = 0
chasings_without_chirps = 0
for i in chirps_in_chasings:
if i:
chasings_without_chirps += 1
else:
counts_chirps_chasings += 1
# chirps in chasing events
fig3, ax3 = plt.subplots()
ax3.bar(
["Chirps in chasing events", "Chasing events without Chirps"],
[counts_chirps_chasings, chasings_without_chirps],
width=width,
)
plt.ylabel("Count")
# plt.show()
plt.close()
# comparison between chasing events with and without chirps
embed()
exit()
if __name__ == "__main__":
# Path to the data
datapath = "../data/mount_data/2020-05-13-10_00/"
datapath = "../data/mount_data/2020-05-13-10_00/"
main(datapath)