1515from ..base import BaseRaw
1616
1717# these will all get mapped to `misc`. Quaternion channels are handled separately.
18- _NON_DATA_CHS = ("buffer" , "ramp" , "loadcell" , "aux" )
18+ _CONTROL_CHS = ("buffer" , "ramp" )
19+ _AUX_CHS = ("loadcell" , "aux" )
20+
21+
22+ def _get_str (node , tag ):
23+ val = node .find (tag )
24+ if val is not None :
25+ return val .text
26+
27+
28+ def _get_int (node , tag ):
29+ return int (_get_str (node , tag ))
30+
31+
32+ def _get_float (node , tag , ** replacements ):
33+ # filter freqs may be "Unknown", can't blindly parse as floats
34+ val = _get_str (node , tag )
35+ return replacements [val ] if val in replacements else float (val )
1936
2037
2138def _parse_otb_plus_metadata (metadata , extras_metadata ):
@@ -75,9 +92,13 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
7592 # 4-6: translations
7693 if ch_id .startswith ("Quaternion" ):
7794 ch_type = "chpi" # TODO verify
78- scalings [gain_ix ] = 1e-3 # TODO CHPI is usually 1e-4
95+ scalings [gain_ix ] = 1e-3 # CHPI is usually 1e-4; limbs move more
96+ adc_ranges [gain_ix ] = 1.0
97+ elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _CONTROL_CHS ):
98+ ch_type = "stim"
99+ scalings [gain_ix ] = 1.0
79100 adc_ranges [gain_ix ] = 1.0
80- elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _NON_DATA_CHS ):
101+ elif any (ch_id .lower ().startswith (_ch .lower ()) for _ch in _AUX_CHS ):
81102 ch_type = "misc"
82103 scalings [gain_ix ] = 1.0
83104 adc_ranges [gain_ix ] = 1.0
@@ -88,11 +109,6 @@ def _parse_otb_plus_metadata(metadata, extras_metadata):
88109 ch_types .append (ch_type )
89110
90111 # parse subject info
91- def get_str (node , tag ):
92- val = node .find (tag )
93- if val is not None :
94- return val .text
95-
96112 def parse_date (dt ):
97113 return datetime .fromisoformat (dt ).date ()
98114
@@ -111,13 +127,13 @@ def parse_sex(sex):
111127 )
112128 subject_info = dict ()
113129 for source , target , func in subj_info_mapping :
114- value = get_str (extras_metadata , source )
130+ value = _get_str (extras_metadata , source )
115131 if value is not None :
116132 subject_info [target ] = func (value )
117133
118- meas_date = get_str (extras_metadata , "time" )
119- duration = get_str (extras_metadata , "duration" )
120- site = get_str (extras_metadata , "place" )
134+ meas_date = _get_str (extras_metadata , "time" )
135+ duration = _get_float (extras_metadata , "duration" )
136+ site = _get_str (extras_metadata , "place" )
121137
122138 return dict (
123139 adc_range = adc_ranges ,
@@ -141,22 +157,11 @@ def parse_sex(sex):
141157
142158
143159def _parse_otb_four_metadata (metadata , extras_metadata ):
144- def get_str (node , tag ):
145- return node .find (tag ).text
146-
147- def get_int (node , tag ):
148- return int (get_str (node , tag ))
149-
150- def get_float (node , tag , ** replacements ):
151- # filter freqs may be "Unknown", can't blindly parse as floats
152- val = get_str (node , tag )
153- return replacements [val ] if val in replacements else float (val )
154-
155160 assert metadata .tag == "DeviceParameters"
156161 # device-level metadata
157- bit_depth = get_int (metadata , "AdBits" ) # TODO use `SampleSize * 8` instead?
158- sfreq = get_float (metadata , "SamplingFrequency" )
159- device_gain = get_float (metadata , "Gain" )
162+ bit_depth = _get_int (metadata , "AdBits" ) # TODO use `SampleSize * 8` instead?
163+ sfreq = _get_float (metadata , "SamplingFrequency" )
164+ device_gain = _get_float (metadata , "Gain" )
160165 # containers
161166 gains = list ()
162167 ch_names = list ()
@@ -178,23 +183,26 @@ def get_float(node, tag, **replacements):
178183 for adapter in extras_metadata .iter ("TrackInfo" ):
179184 strings = adapter .find ("StringsDescriptions" )
180185 # expected to be same for all adapters
181- bit_depths .add (get_int (adapter , "ADC_Nbits" ))
182- device_names .add (get_str (adapter , "Device" ))
183- sfreqs .add (get_int (adapter , "SamplingFrequency" ))
184- durations .add (get_float (adapter , "TimeDuration" ))
186+ bit_depths .add (_get_int (adapter , "ADC_Nbits" ))
187+ device_names .add (_get_str (adapter , "Device" ))
188+ sfreqs .add (_get_int (adapter , "SamplingFrequency" ))
189+ durations .add (_get_float (adapter , "TimeDuration" ))
185190 # may be different for each adapter
186- adapter_adc_range = get_float (adapter , "ADC_Range" )
187- adapter_id = get_str (adapter , "SubTitle" )
188- adapter_gain = get_float (adapter , "Gain" )
189- adapter_scaling = 1.0 / get_float (adapter , "UnitOfMeasurementFactor" )
190- # ch_offset = get_int(adapter, "AcquisitionChannel")
191- n_chans .append (get_int (adapter , "NumberOfChannels" ))
192- paths .append (get_str (adapter , "SignalStreamPath" ))
193- units .append (get_str (adapter , "UnitOfMeasurement" ))
191+ adapter_adc_range = _get_float (adapter , "ADC_Range" )
192+ adapter_id = _get_str (adapter , "SubTitle" )
193+ adapter_gain = _get_float (adapter , "Gain" )
194+ if adapter_id .startswith ("Quaternion" ):
195+ adapter_scaling = 1e-3
196+ else :
197+ adapter_scaling = 1.0 / _get_float (adapter , "UnitOfMeasurementFactor" )
198+ # ch_offset = _get_int(adapter, "AcquisitionChannel")
199+ n_chans .append (_get_int (adapter , "NumberOfChannels" ))
200+ paths .append (_get_str (adapter , "SignalStreamPath" ))
201+ units .append (_get_str (adapter , "UnitOfMeasurement" ))
194202 # we only really care about lowpass/highpass on the data channels
195203 if adapter_id not in ("Quaternion" , "Buffer" , "Ramp" ):
196- hp = get_float (strings , "HighPassFilter" , Unknown = None )
197- lp = get_float (strings , "LowPassFilter" , Unknown = None )
204+ hp = _get_float (strings , "HighPassFilter" , Unknown = None )
205+ lp = _get_float (strings , "LowPassFilter" , Unknown = None )
198206 if hp is not None :
199207 highpass .append (hp )
200208 if lp is not None :
@@ -207,8 +215,8 @@ def get_float(node, tag, **replacements):
207215 # # FWIW in the example file: range for Buffer is 1-100,
208216 # # Ramp and Control are -32767-32768, and
209217 # # EMG chs are ±2.1237507098703645E-05
210- # rmin = get_float (adapter, "RangeMin")
211- # rmax = get_float (adapter, "RangeMax")
218+ # rmin = _get_float (adapter, "RangeMin")
219+ # rmax = _get_float (adapter, "RangeMax")
212220 # if rmin.is_integer() and rmax.is_integer():
213221 # rmin = int(rmin)
214222 # rmax = int(rmax)
@@ -231,15 +239,18 @@ def get_float(node, tag, **replacements):
231239 scalings .append (adapter_scaling )
232240 # channel types
233241 # TODO verify for quats & buffer channel
234- # ramp and control channels definitely "MISC"
242+ # ramp and control channels maybe "MISC", arguably "STIM"?
235243 # quats should maybe be FIFF.FIFFV_QUAT_{N} (N from 0-6), but need to verify
236244 # what quats should be, as there are only 4 quat channels. The FIFF quats:
237- # 0: obsolete
245+ # 0: obsolete (?)
238246 # 1-3: rotations
239247 # 4-6: translations
240248 if adapter_id .startswith ("Quaternion" ):
241249 ch_type = "chpi" # TODO verify
242- elif any (adapter_id .lower ().startswith (_ch ) for _ch in _NON_DATA_CHS ):
250+ # adc_ranges[gain_ix] = 1.0
251+ elif any (adapter_id .lower ().startswith (_ch ) for _ch in _CONTROL_CHS ):
252+ ch_type = "stim"
253+ elif any (adapter_id .lower ().startswith (_ch ) for _ch in _AUX_CHS ):
243254 ch_type = "misc"
244255 else :
245256 ch_type = "emg"
@@ -369,7 +380,7 @@ def __init__(self, fname, *, verbose=None):
369380 # bit_depth seems to be unreliable for some OTB4 files, so let's check:
370381 if duration is not None : # None for OTB+ files
371382 expected_n_samp = int (duration * sfreq * n_chan )
372- expected_bit_depth = int (8 * data_size_bytes / expected_n_samp )
383+ expected_bit_depth = int (np . rint ( 8 * data_size_bytes / expected_n_samp ) )
373384 if bit_depth != expected_bit_depth :
374385 warn (
375386 f"mismatch between file metadata `AdBits` ({ bit_depth } bit) and "
0 commit comments