Description
I am trying to use the kl_divergence score to compare distributions from two sets of spiking data produced by two simulators (NEST and PyNN). Since the model I am investigating has multiple distinct populations, I want to compare the distributions of these populations separately. For this, I am using the joint_test example together with a label_test that checks whether spiketrains come from the same population, plus a conventional test (e.g. firing rate, CV ISI, ...). However, this currently fails with kl_divergence, even though the joint_test produces only a single score and not multiple ones; the same setup works when using wasserstein_distance.
A first attempt at rectifying this was to add code that ensures that layer_prediction and layer_observation are 1d (see below), but this resulted in the same error, which I append at the end.
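For reference, each spiketrain is expected to carry its population label as a Neo annotation, for example like this (the spike times and labels here are made up purely for illustration):

import quantities as pq
from neo import SpikeTrain

# illustrative only: two spiketrains tagged with a 'source_population' annotation
st1 = SpikeTrain([0.1, 0.5, 1.2] * pq.s, t_stop=2 * pq.s, source_population='L4E')
st2 = SpikeTrain([0.3, 0.9, 1.7] * pq.s, t_stop=2 * pq.s, source_population='L5I')
st1.annotations['source_population']  # -> 'L4E'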
Here is a reduced code example (sorry, still quite long), assuming one has some spiking data at hand that can be separated according to such annotations:
import os
from copy import copy

import numpy as np
import pandas as pd

from networkunit import models, scores, tests


class fr_test(tests.firing_rate_test):
    name = 'FR'

class label_test(tests.two_sample_test):
    name = 'Label Test'
    params = {'label_key': 'source_population'}

    def generate_prediction(self, model, **kwargs):
        labels = self.get_prediction(model)
        if labels is None:
            if kwargs:
                self.params.update(kwargs)
            spiketrains = model.produce_spiketrains(**self.params)
            labels = np.array([st.annotations[self.params['label_key']]
                               for st in spiketrains])
            label_ids = np.zeros((len(labels)))
            if hasattr(self, "label_mapping"):
                max_id = max(self.label_mapping.values())
            else:
                self.label_mapping = {}
                max_id = -1
            for label in np.unique(labels):
                if label in self.label_mapping:
                    id = self.label_mapping[label]
                else:
                    id = max_id + 1
                    max_id += 1
                    self.label_mapping[label] = id
                label_ids[labels == label] = id
            self.set_prediction(model, label_ids)
        return label_ids

class joint_test(tests.TestM2M, tests.joint_test):
    score_type = scores.kl_divergence
    test_list = [fr_test,
                 label_test
                 ]
    test_params = [{},
                   {'label_key': 'source_population'}
                   ]

    def compute_score(self, observation, prediction):
        """Generates a score given the observations provided in the constructor
        and the prediction generated by generate_prediction.
        Must generate a score of score_type.
        No default implementation.
        """
        if not hasattr(self, 'score_type') or \
                not hasattr(self.score_type, 'compute'):
            raise NotImplementedError(("Test %s either implements no "
                                       "compute_score method or provides no "
                                       "score_type with a compute method.")
                                      % self.name)
        # separate predictions according to labels
        label_test_list = [i for i, t in enumerate(self.test_list)
                           if t.name == 'Label Test']
        if len(label_test_list) == 1:
            label_test_idx = label_test_list[0]
            pred_labels = prediction[label_test_idx]
            prediction = np.delete(prediction, label_test_idx, axis=0)
            obsv_labels = observation[label_test_idx]
            observation = np.delete(observation, label_test_idx, axis=0)
            # compute score for each label separately
            scores = []
            label_intersection = np.intersect1d(pred_labels, obsv_labels)
            for label in np.unique(label_intersection):
                layer_prediction = prediction[:,
                                              np.where(pred_labels == label)[0]]
                layer_observation = observation[:,
                                                np.where(obsv_labels == label)[0]]
                if layer_prediction.shape[0] == 1:
                    layer_prediction = layer_prediction[0]
                if layer_observation.shape[0] == 1:
                    layer_observation = layer_observation[0]
                scores.append(self.score_type.compute(layer_observation,
                                                      layer_prediction))
            # merge scores
            mean_score = np.mean([s.score for s in scores])
            score = self.score_type(mean_score)
            for attribute in scores[0].__dict__:
                if attribute != 'score':
                    score_attr = [getattr(s, attribute) for s in scores]
                    setattr(score, attribute, score_attr)
            mapping = self.test_inst[label_test_idx].label_mapping
            inverse_mapping = dict([(value, key)
                                    for key, value in mapping.items()])
            score.labels = [inverse_mapping[i]
                            for i in np.unique(label_intersection)]
        else:
            # no label test present: compute the score on the full samples
            score = self.score_type.compute(observation, prediction)
        return score

class LoadNest(models.spiketrain_data):
    file_path = ''
    params = {}

    def load(self, file_path, simulator, **kwargs):
        return list(pd.read_hdf(file_path))


class LoadPyNN(models.spiketrain_data):
    file_path = ''
    params = {}

    def load(self, file_path, simulator, **kwargs):
        return list(pd.read_hdf(file_path))


class NestSim(LoadNest):
    file_path = os.path.join(PyNEST_path, 'some_file.h5')
    params = copy(LoadNest.params)
    params.update(color='#ee7733', simulator='Nest')


class PyNNSim(LoadPyNN):
    file_path = os.path.join(PyNN_path, 'some_file.h5')
    params = copy(LoadPyNN.params)
    params.update(color='#01796f', simulator='PyNN')

Nest = NestSim(name='Nest')
PyNN = PyNNSim(name='PyNN')
joint = joint_test()
judge = joint.judge(models=[PyNN, Nest])
Error thrown:
judge = joint.judge(models=[PyNN, Nest])
/p/home/jusers/albers2/jureca/.local/lib/python3.8/site-packages/elephant/statistics.py:334: UserWarning: The input size is too small. Please providean input with more than 1 entry. Returning `NaN`since the argument `with_nan` is `True`
warnings.warn("The input size is too small. Please provide"
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/p/project/cjinb33/albers2/projects/mam_benchmarking/PyNN/validation/validate.py in <module>
----> 1 judge = joint.judge(models=[PyNN, Nest])
~/.local/lib/python3.8/site-packages/sciunit/tests.py in judge(self, models, skip_incapable, stop_on_error, deep_error, only_lower_triangle)
861 scores[i][j] = scores[j][i]
862 else:
--> 863 scores[i][j] = self._judge(
864 predictions[i], predictions[j], model1, model2
865 )
~/.local/lib/python3.8/site-packages/sciunit/tests.py in _judge(self, prediction1, prediction2, model1, model2)
706
707 # 6.
--> 708 score = self.compute_score(prediction1, prediction2)
709 if self.converter:
710 score = self.converter.convert(score)
/p/project/cjinb33/albers2/projects/mam_benchmarking/PyNN/validation/validate.py in compute_score(self, observation, prediction)
177 layer_observation = observation[:,
178 np.where(obsv_labels == label)[0]]
--> 179 scores.append(self.score_type.compute(layer_observation,
180 layer_prediction))
181
/p/project/cjinb33/albers2/repositories/NetworkUnit/networkunit/scores/kl_divergence.py in compute(self, data_sample_1, data_sample_2, kl_binsize, **kwargs)
41 min_value = min([min(sample1),min(sample2)])
42 bins = (max_value - min_value) / kl_binsize
---> 43 edges = np.linspace(min_value, max_value, bins)
44
45 P, edges = np.histogram(sample1, bins=edges, density=True)
<__array_function__ internals> in linspace(*args, **kwargs)
/p/software/jurecadc/stages/2020/software/SciPy-Stack/2020-gcccoremkl-9.3.0-2020.2.254-Python-3.8.5/lib/python3.8/site-packages/numpy-1.19.1-py3.8-linux-x86_64.egg/numpy/core/function_base.py in linspace(start, stop, num, endpoint, retstep, dtype, axis)
111
112 """
--> 113 num = operator.index(num)
114 if num < 0:
115 raise ValueError("Number of samples, %s, must be non-negative." % num)
TypeError: 'numpy.float64' object cannot be interpreted as an integer
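From the traceback it looks like the number of bins in kl_divergence.compute is calculated as a float ((max_value - min_value) / kl_binsize) and then passed to np.linspace, which only accepts an integer num. Below is a minimal standalone sketch of the failure and a possible workaround; the samples and kl_binsize value are arbitrary, and this is not a tested patch against the NetworkUnit source:

import numpy as np

# toy stand-ins for layer_observation / layer_prediction
sample1 = np.array([1.2, 3.4, 2.2, 5.1])
sample2 = np.array([0.8, 2.9, 4.0, 3.3])
kl_binsize = 0.005  # arbitrary bin width, chosen only for this sketch

max_value = max([max(sample1), max(sample2)])
min_value = min([min(sample1), min(sample2)])

bins = (max_value - min_value) / kl_binsize  # numpy.float64, not an int
# edges = np.linspace(min_value, max_value, bins)  # -> TypeError as shown above

# casting the bin count to an integer avoids the TypeError
edges = np.linspace(min_value, max_value, int(np.ceil(bins)) + 1)
P, edges = np.histogram(sample1, bins=edges, density=True)
Q, edges = np.histogram(sample2, bins=edges, density=True)

If a cast like this is indeed the right fix, it would presumably belong in kl_divergence.compute itself rather than in the user-defined test.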