From bc67f981e1ff783575a0b574229e45387647dd16 Mon Sep 17 00:00:00 2001 From: mikael Date: Thu, 20 Jun 2019 17:36:58 +0200 Subject: [PATCH] Build example identifiers with several processes #59 --- dl1_data_handler/reader.py | 261 ++++++++++++++++++++++--------------- 1 file changed, 158 insertions(+), 103 deletions(-) diff --git a/dl1_data_handler/reader.py b/dl1_data_handler/reader.py index b7d48059..01716003 100644 --- a/dl1_data_handler/reader.py +++ b/dl1_data_handler/reader.py @@ -1,6 +1,7 @@ from collections import OrderedDict import random import threading +import multiprocessing as mp import numpy as np import tables @@ -34,12 +35,12 @@ def __init__(self, shuffle=False, seed=None, image_channels=None, - mapping_method=None, mapping_settings=None, array_info=None, event_info=None, transforms=None, - validate_processor=False + validate_processor=False, + num_workers=1 ): # Construct dict of filename:file_handle pairs @@ -60,110 +61,59 @@ def __init__(self, self.example_identifiers = None self.telescopes = {} - if selected_telescope_ids is None: - selected_telescope_ids = {} + self.tel_type = None - if event_selection is None: - event_selection = {} + if selected_telescope_ids is not None: + self.selected_telescope_ids = selected_telescope_ids + else: + self.selected_telescope_ids = {} + + self.selected_telescope_type = selected_telescope_type + + if event_selection is not None: + self.event_selection = event_selection + else: + self.event_selection = {} - if image_selection is None: - image_selection = {} + if image_selection is not None: + self.image_selection = image_selection + else: + self.image_selection = {} + + self.selection_string = selection_string if mapping_settings is None: mapping_settings = {} - # Loop over the files to assemble the selected event identifiers - for filename, f in self.files.items(): - example_identifiers = [] - - # Get dict of all the tel_types in the file mapped to their tel_ids - telescopes = {} - for row in f.root.Array_Information: - tel_type = row['type'].decode() - if tel_type not in telescopes: - telescopes[tel_type] = [] - telescopes[tel_type].append(row['id']) - - # Enforce an automatic minimal telescope selection cut: - # there must be at least one triggered telescope of a - # selected type in the event - # Users can include stricter cuts in the selection string - if self.mode in ['mono', 'stereo']: - if selected_telescope_type is None: - # Default: use the first tel type in the file - default = f.root.Array_Information[0]['type'].decode() - selected_telescope_type = default - self.tel_type = selected_telescope_type - selected_tel_types = [selected_telescope_type] - elif self.mode == 'multi-stereo': - if selected_telescope_type is None: - # Default: use all tel types - selected_telescope_type = list(telescopes) - self.tel_type = None - selected_tel_types = selected_telescope_type - multiplicity_conditions = ['(' + tel_type + '_multiplicity > 0)' - for tel_type in selected_tel_types] - tel_cut_string = '(' + ' | '.join(multiplicity_conditions) + ')' - # Combine minimal telescope cut with explicit selection cuts - if selection_string: - cut_condition = selection_string + ' & ' + tel_cut_string - else: - cut_condition = tel_cut_string - - # Select which telescopes from the full dataset to include in each - # event by a telescope type and an optional list of telescope ids. - selected_telescopes = {} - for tel_type in selected_tel_types: - available_tel_ids = telescopes[tel_type] - # Keep only the selected tel ids for the tel type - if tel_type in selected_telescope_ids: - # Check all requested telescopes are available to select - requested_tel_ids = selected_telescope_ids[tel_type] - invalid_tel_ids = (set(requested_tel_ids) - - set(available_tel_ids)) - if invalid_tel_ids: - raise ValueError("Tel ids {} are not a valid selection" - "for tel type '{}'".format( - invalid_tel_ids, tel_type)) - selected_telescopes[tel_type] = requested_tel_ids - else: - selected_telescopes[tel_type] = available_tel_ids - - selected_nrows = set([row.nrow for row - in f.root.Events.where(cut_condition)]) - selected_nrows &= self._select_event(f, event_selection) - selected_nrows = list(selected_nrows) - - # Make list of identifiers of all examples passing event selection - if self.mode in ['stereo', 'multi-stereo']: - example_identifiers = [(filename, nrow) for nrow - in selected_nrows] - elif self.mode == 'mono': - example_identifiers = [] - field = '{}_indices'.format(self.tel_type) - selected_indices = f.root.Events.read_coordinates(selected_nrows, field=field) - for tel_id in selected_telescopes[self.tel_type]: - img_ids = set(selected_indices[:, telescopes[self.tel_type].index(tel_id)]) - img_ids.remove(0) - img_ids = list(img_ids) - # TODO handle all selected channels - mask = self._select_image(f.root[self.tel_type][img_ids]['charge'], image_selection) - img_ids = np.array(img_ids)[mask] - for index in img_ids: - example_identifiers.append((filename, index, tel_id)) - - # Confirm that the files are consistent and merge them - if not self.telescopes: - self.telescopes = telescopes - if self.telescopes != telescopes: - raise ValueError("Inconsistent telescope definition in " - "{}".format(filename)) - self.selected_telescopes = selected_telescopes + # Load telescope information from first file + first_file = list(self.files)[0] + self.telescopes, self.tel_type, self.selected_telescopes = self._load_telescope_data(self.files[first_file], + self.selected_telescope_type, + self.selected_telescope_ids) - if self.example_identifiers is None: - self.example_identifiers = example_identifiers - else: - self.example_identifiers.extend(example_identifiers) + file_queue = mp.Queue() + # Loop over the files to assemble the selected event identifiers + for filename in list(self.files): + file_queue.put(filename) + # Create shared variables + manager = mp.Manager() + ex_identifiers_mp = manager.list() + # Run processes + if num_workers > 0: + num_workers = num_workers + else: + num_workers = 1 + workers = [mp.Process(target=self._create_example_identifiers, + args=(file_queue, + ex_identifiers_mp, + )) for _ in range(num_workers)] + for w in workers: + w.start() + file_queue.close() + for w in workers: + w.join() + + self.example_identifiers = list(ex_identifiers_mp) # Shuffle the examples if shuffle: @@ -196,7 +146,7 @@ def __init__(self, } ] for col_name in self.array_info: - col = f.root.Array_Information.cols._f_col(col_name) + col = self.files[first_file].root.Array_Information.cols._f_col(col_name) self.unprocessed_example_description.append( { 'name': col_name, @@ -226,7 +176,7 @@ def __init__(self, } ] for col_name in self.array_info: - col = f.root.Array_Information.cols._f_col(col_name) + col = self.files[first_file].root.Array_Information.cols._f_col(col_name) self.unprocessed_example_description.append( { 'name': col_name, @@ -258,7 +208,7 @@ def __init__(self, } ]) for col_name in self.array_info: - col = f.root.Array_Information.cols._f_col(col_name) + col = self.files[first_file].root.Array_Information.cols._f_col(col_name) self.unprocessed_example_description.append( { 'name': tel_type + '_' + col_name, @@ -270,7 +220,7 @@ def __init__(self, ) # Add event info to description for col_name in self.event_info: - col = f.root.Events.cols._f_col(col_name) + col = self.files[first_file].root.Events.cols._f_col(col_name) self.unprocessed_example_description.append( { 'name': col_name, @@ -291,6 +241,111 @@ def __init__(self, # Definition of preprocessed example self.example_description = self.processor.output_description + def _load_telescope_data(self, file, selected_telescope_type, selected_telescope_ids): + # Get dict of all the tel_types in the file mapped to their tel_ids + telescopes = {} + for row in file.root.Array_Information: + t_type = row['type'].decode() + if t_type not in telescopes: + telescopes[t_type] = [] + telescopes[t_type].append(row['id']) + + tel_type = None + if self.mode in ['mono', 'stereo']: + if selected_telescope_type is None: + # Default: use the first tel type in the file + default = file.root.Array_Information[0]['type'].decode() + tel_type = default + else: + tel_type = selected_telescope_type + selected_tel_types = [tel_type] + elif self.mode == 'multi-stereo': + if selected_telescope_type is None: + # Default: use all tel types + selected_tel_types = list(telescopes) + else: + selected_tel_types = selected_telescope_type + tel_type = None + + # Select which telescopes from the full dataset to include in each + # event by a telescope type and an optional list of telescope ids. + selected_telescopes = {} + for tel_type in selected_tel_types: + available_tel_ids = telescopes[tel_type] + # Keep only the selected tel ids for the tel type + if tel_type in selected_telescope_ids: + # Check all requested telescopes are available to select + requested_tel_ids = selected_telescope_ids[tel_type] + invalid_tel_ids = (set(requested_tel_ids) + - set(available_tel_ids)) + if invalid_tel_ids: + raise ValueError("Tel ids {} are not a valid selection" + "for tel type '{}'".format( + invalid_tel_ids, tel_type)) + selected_telescopes[tel_type] = requested_tel_ids + else: + selected_telescopes[tel_type] = available_tel_ids + + return telescopes, tel_type, selected_telescopes + + def _check_telescope_consistency(self, telescopes, tel_type, selected_telescopes): + assert self.telescopes == telescopes, 'Files inconsistent' + assert self.tel_type == tel_type, 'Files inconsistent' + assert self.selected_telescopes == selected_telescopes, 'Files inconsistent' + + def _create_example_identifiers(self, file_queue, ex_identifiers_mp): + + while True: + if file_queue.empty(): + break + else: + filename = file_queue.get() + f = self.files[filename] + example_identifiers = [] + telescopes, tel_type, selected_telescopes = self._load_telescope_data(f, + self.selected_telescope_type, + self.selected_telescope_ids + ) + + # Enforce an automatic minimal telescope selection cut: + # there must be at least one triggered telescope of a + # selected type in the event + # Users can include stricter cuts in the selection string + multiplicity_conditions = ['(' + tel_type + '_multiplicity > 0)' + for tel_type in list(selected_telescopes)] + tel_cut_string = '(' + ' | '.join(multiplicity_conditions) + ')' + # Select events + # Combine minimal telescope cut with explicit selection cuts + if self.selection_string is not None: + cut_condition = self.selection_string + ' & ' + tel_cut_string + else: + cut_condition = tel_cut_string + selected_nrows = set([row.nrow for row + in f.root.Events.where(cut_condition)]) + selected_nrows &= self._select_event(f, self.event_selection) + selected_nrows = list(selected_nrows) + + # Make list of identifiers of all examples passing event selection + if self.mode in ['stereo', 'multi-stereo']: + example_identifiers = [(filename, nrow) for nrow + in selected_nrows] + elif self.mode == 'mono': + example_identifiers = [] + field = '{}_indices'.format(self.tel_type) + # Select images + selected_indices = f.root.Events.read_coordinates(selected_nrows, field=field) + for tel_id in selected_telescopes[self.tel_type]: + img_ids = set(selected_indices[:, telescopes[self.tel_type].index(tel_id)]) + img_ids.remove(0) + img_ids = list(img_ids) + # TODO handle all selected channels + mask = self._select_image(f.root[self.tel_type][img_ids]['charge'], self.image_selection) + img_ids = np.array(img_ids)[mask] + for index in img_ids: + example_identifiers.append((filename, index, tel_id)) + + ex_identifiers_mp.extend(example_identifiers) + def _select_event(self, file, filters): """ Filter the data event wise.