import numpy as np
from scipy import stats
from scipy import signal
from scipy.optimize import curve_fit
import glob
import yaml
import re
import matplotlib.pyplot as plt
import pickle
[docs]
def save_pkl(obj, path):
'''Function saving an object as a pickle file.
:param obj: python object (list, dictionary...) to be saved
:type obj: generic
:param path: path to the object to be saved
:type path: string
'''
with open(path, 'wb') as f:
pickle.dump(obj, f, pickle.HIGHEST_PROTOCOL)
[docs]
def load_pkl(path):
'''Function loading an object from a pickle file.
:param path: path to the object to be loaded
:type path: string
'''
with open(path, 'rb') as f:
return pickle.load(f)
[docs]
def readSpikes(file):
'''Function reading spike times in the format produced by the simulation
:param file: path to the file with spike times
:type file: string
'''
l = []
with open(file) as in_file:
for line in in_file.readlines():
n_list = [float(i) for i in line.split()]
if len(l) <= round(n_list[0]):
l.extend( [ [] for i in range( (round(n_list[0]) - len(l) + 1) ) ] )
l[round(n_list[0])].extend(n_list[1:])
for i in range(len(l)):
l[i] = np.array(l[i])
return l
[docs]
def newReadSpikes(file, n):
'''Function reading spike times in the format produced by the simulation
:param file: path to the file with spike times
:type file: string
'''
l = [ [] for i in range(n) ]
with open(file) as in_file:
for line in in_file.readlines():
n_list = [float(i) for i in line.split()]
if n <= round(n_list[0]):
print(f'ERROR: non coherent n: {n_list[0]}>={n}')
exit()
# print(round(n_list[0]), n, l)
l[round(n_list[0])].extend(n_list[1:])
for i in range(len(l)):
l[i] = np.array(l[i])
return l
[docs]
class SpikeSim:
'''Class loading and parsing files given by a simulation.
The main attributes are the simulation parameters and results:
* end_t: end time of simulation
* dt: time resolution of the simulation
* input_mode: external input mode:
- 0 (base mode): each neuron receives an indipendent poisson signal with mean frequency = SubNetwork::ext_in_rate
- 1 (not implemented)
* data: dictionary with spike times corresponding to each population; data['pop'] is a list of np.arrays each containing the activity of a neuron
* subnets: a list of the SubNetworks in the simulation
'''
def __init__(self, path, sim_fname, neglect_t, neglect_t_end=-1, config_fname=''):
'''Class constructor:
:param path: path to the directory with output simulation files and configuration files
:type path: string
:param sim_fname: name of the simulation configuration file (inside the directory matched by path)
:type sim_fname: string
:param neglect_t: time (in ms) to be neglected at the beginning of the simulation
:type sim_fname: float
:param neglect_t_end: time (in ms) to be neglected at the end of the simulation
:type sim_fname: float
:param config_fname: name of the subnets_config_yaml configuration file (inside the directory matched by path)
:type sim_fname: string
'''
self.input_dir = path
self.sim_filename = sim_fname
self.t_start = neglect_t
self.t_end = neglect_t_end
self.dt = 0
self.input_mode = 0
self.data = dict()
self.subnets = []
self.omegas = dict()
self.Ns = dict()
self.getParameterValues()
if config_fname!='':
with open(self.input_dir + '/' +config_fname) as file:
in_dict = yaml.load(file, Loader=yaml.FullLoader)
for d in in_dict:
self.omegas[d['name']] = d['osc_omega']
self.Ns[d['name']] = d['N']
self.loadData()
self.necglectTime()
[docs]
def getParameterValues(self):
'''Method initializing the simulation parameters'''
with open(self.input_dir + '/' +self.sim_filename) as file:
in_dict = yaml.load(file, Loader=yaml.FullLoader)
dict_t_end = in_dict['t_end']
if self.t_end < 0.:
self.t_end = dict_t_end
elif self.t_end > dict_t_end:
print(f'ERROR: t_end is too big: max = {dict_t_end}, passed = {self.t_end}')
exit()
self.dt = in_dict['dt']
self.input_mode = in_dict['input_mode']
if self.input_mode != 0:
paper_configfile = in_dict['input_mode_config']
if ( len(paper_configfile.split('/')) !=1 ):
paper_configfile = paper_configfile.split('/')[-1]
with open(self.input_dir + '/'+paper_configfile) as file_paper:
paper_dict = yaml.load(file_paper, Loader=yaml.FullLoader)
[docs]
def loadData(self):
'''Method loading spike times for each SubNetwork'''
subnets_files = glob.glob(self.input_dir + '/*_spikes.txt')
for f in subnets_files:
pop = re.split('/|_', f)[-2]
self.data[pop] = newReadSpikes(f, self.Ns[pop])
self.subnets.append(pop)
self.subnets = sorted(self.subnets)
[docs]
def necglectTime(self):
'''Method removing the spikes occurring before t_start and after t_end (if > 0)'''
for i in self.data:
for j in range( len(self.data[i]) ):
if self.t_end < 0.:
self.data[i][j] = self.data[i][j][self.data[i][j]>self.t_start] - self.t_start
else:
self.data[i][j] = self.data[i][j][ np.logical_and( self.data[i][j]>self.t_start, self.data[i][j]<self.t_end ) ] - self.t_start
[docs]
def info(self):
'''Method printing the simulation parameters'''
print('Simulation data from: ' + self.input_dir)
print('\t simulation config file: ' + self.sim_filename)
print('\t subnets in the network: ', self.subnets)
print(f'\t t_start = {self.t_start} ms')
print(f'\t t_end = {self.t_end} ms')
print(f'\t dt = {self.dt} ms')
print(f'\t input_mode = {self.input_mode}')
def saveData(self, path):
save_pkl(self.data, path)
[docs]
def histogram(self, pop = '', res=1., save_img=''):
'''Method showing or saving the spiking activity of a given subnet
:param pop: desidered population; if 'all' is passed all population are showed.
:type pop: string
:param res: time width of each bin in the histogram
:type pop: float
:param save_img: path and name of the file to be saved
:type save_img: string
'''
pop_passed = True
if pop == '':
pop_passed = False
if pop.lower() == 'all':
plt.figure()
for i,p in enumerate(sorted(self.subnets)):
print(p)
l = len(self.subnets)
cols = 1 if l==1 else 2
rows = round(l/cols) if (l%2==0 or l==1) else int(l/cols)+1
try:
plt.subplot(rows,cols, i+1)
plt.ylabel(p)
plt.xlabel('t [ms]')
plt.hist( np.concatenate(self.data[p]), bins=int((self.t_end - self.t_start)/res) )
except Exception as e: print(e)
# plt.tight_layout()
# plt.savefig(self.input_dir+'/activity.png', dpi=500)
plt.show()
else:
while True:
if pop == '':
pop = input('histogram: enter subnetwork: ')
if pop.lower() == 'stop':
break
if not pop in self.subnets:
print(f'No subnet with name "{pop}", try again...')
pop = ''
continue
plt.hist(np.concatenate(self.data[pop]), bins=int((self.t_end - self.t_start)/res))
if save_img == '':
plt.show()
else:
plt.savefig(save_img, dpi = 500)
plt.close()
if pop_passed: break
else: pop = ''
[docs]
def MeanActivity(self):
'''Method computing the mean spiking activity of the subnets'''
MAct = []
Ns = []
MActPerN = []
for s in self.subnets:
if len(self.data[s]) >0:
counts = len( np.concatenate(self.data[s]) )
else: counts = 0
MAct.append( counts/(self.t_end - self.t_start) )
n = len(self.data[s])
Ns.append(n)
if n > 0:
MActPerN.append(counts/n/(self.t_end - self.t_start))
else:
MActPerN.append(0)
return {self.subnets[i] : [ MAct[i], Ns[i], MActPerN[i] ] for i in range(len(self.subnets))}
def cartesian_to_cilindric(x,y,z, xv=0.,yv=-50.):
r = np.sqrt((x-xv)**2+(y-yv)**2)
if y>yv: # I/II quad (I:[0,pi/2], II:[pi/2,pi])
if abs(x-xv) < 1e-8:
theta = 1/2*np.pi
else:
theta = np.arctan((y-yv)/(x-xv))
if theta<0:
theta = np.pi+theta
return r,theta,z
else: # III/IV quad (III:[pi,3pi/2], IV:[-pi/2,0])
if abs(x-xv) < 1e-8:
theta = 3/2*np.pi
else:
theta = np.arctan((y-yv)/(x-xv))
if theta>0:
theta = np.pi+theta
return r,theta,z