diff --git a/docs/changes/300.feature.rst b/docs/changes/300.feature.rst new file mode 100644 index 000000000..2afd68774 --- /dev/null +++ b/docs/changes/300.feature.rst @@ -0,0 +1,11 @@ +Add functions to calculate asymmetric energy dispersion and energy migration matrices to ``pyirf.irf.energy_dispersion``, supporting both binning in ``true_source_fov_offset`` / ``true_source_fov_position_angle`` and ``true_source_fov_lon`` / ``true_source_fov_lat``. +These functions are +* ``energy_dispersion_asymmetric_polar`` +* ``energy_dispersion_asymmetric_lonlat`` +* ``energy_migration_matrix_asymmetric_polar`` +* ``energy_migration_matrix_asymmetric_lonlat`` +* ``energy_dispersion_to_migration_asymmetric`` + +Also adds functions to create fits table HDUs in a format to the existing GADF definition for the energy dispersion to ``pyirf.io.gadf``. +* ``create_energy_dispersion_asymmetric_polar_hdu`` +* ``create_energy_dispersion_asymmetric_lonlat_hdu`` diff --git a/pyirf/io/__init__.py b/pyirf/io/__init__.py index aeefad535..e6c0a26b2 100644 --- a/pyirf/io/__init__.py +++ b/pyirf/io/__init__.py @@ -2,6 +2,8 @@ from .gadf import ( create_aeff2d_hdu, create_energy_dispersion_hdu, + create_energy_dispersion_3d_polar_hdu, + create_energy_dispersion_3d_lonlat_hdu, create_psf_table_hdu, create_rad_max_hdu, create_background_2d_hdu, @@ -13,6 +15,8 @@ "create_psf_table_hdu", "create_aeff2d_hdu", "create_energy_dispersion_hdu", + "create_energy_dispersion_3d_polar_hdu", + "create_energy_dispersion_3d_lonlat_hdu", "create_psf_table_hdu", "create_rad_max_hdu", "create_background_2d_hdu", diff --git a/pyirf/io/gadf.py b/pyirf/io/gadf.py index e14c20670..181f7d169 100644 --- a/pyirf/io/gadf.py +++ b/pyirf/io/gadf.py @@ -11,6 +11,8 @@ __all__ = [ "create_aeff2d_hdu", "create_energy_dispersion_hdu", + "create_energy_dispersion_3d_polar_hdu", + "create_energy_dispersion_3d_lonlat_hdu", "create_psf_table_hdu", "create_rad_max_hdu", ] @@ -202,6 +204,130 @@ def create_energy_dispersion_hdu( return BinTableHDU(edisp, header=header, name=extname) +def create_energy_dispersion_3d_polar_hdu( + energy_dispersion, + true_energy_bins, + fov_offset_bins, + fov_position_angle_bins, + migration_bins, + point_like=True, + extname="EDISP", + **header_cards, +): + """ + Create a fits binary table HDU in a format similar to the GADF definition of the energy dispersion, but using two-axis polar FoV coordinates. + See the similar GADF specification at + https://gamma-astro-data-formats.readthedocs.io/en/latest/irfs/full_enclosure/edisp/index.html + + Parameters + ---------- + energy_dispersion: numpy.ndarray + Energy dispersion array, must have shape + (n_energy_bins, n_migra_bins, n_source_offset_bins) + true_energy_bins: astropy.units.Quantity[energy] + Bin edges in true energy + fov_offset_bins: astropy.units.Quantity[angle] + Bin edges in the field of view offset. + For Point-Like IRFs, only giving a single bin is appropriate. + fov_position_angle_bins: astropy.units.Quantity[angle] + Bin edges in the field of view position angle. + For Point-Like IRFs, only giving a single bin is appropriate. + migration_bins: numpy.ndarray + Bin edges for the relative energy migration (``reco_energy / true_energy``) + point_like: bool + If the provided effective area was calculated after applying a direction cut, + pass ``True``, else ``False`` for a full-enclosure effective area. + extname: str + Name for BinTableHDU + **header_cards + Additional metadata to add to the header, use this to set e.g. TELESCOP or + INSTRUME. + """ + + edisp = QTable() + edisp["ENERG_LO"], edisp["ENERG_HI"] = binning.split_bin_lo_hi(true_energy_bins[np.newaxis, :].to(u.TeV)) + edisp["THETA_LO"], edisp["THETA_HI"] = binning.split_bin_lo_hi(fov_offset_bins[np.newaxis, :].to(u.deg)) + edisp["PHI_LO"], edisp["PHI_HI"] = binning.split_bin_lo_hi(fov_position_angle_bins[np.newaxis, :].to(u.deg)) + edisp["MIGRA_LO"], edisp["MIGRA_HI"] = binning.split_bin_lo_hi(migration_bins[np.newaxis, :]) + # transpose as FITS uses opposite dimension order + edisp["MATRIX"] = u.Quantity(energy_dispersion.T[np.newaxis, ...]).to(u.one) + + # required header keywords + header = DEFAULT_HEADER.copy() + header["HDUCLAS1"] = "RESPONSE" + header["HDUCLAS2"] = "EDISP" + header["HDUCLAS3"] = "POINT-LIKE" if point_like else "FULL-ENCLOSURE" + header["HDUCLAS4"] = "EDISP_3D" + header["DATE"] = Time.now().utc.iso + idx = edisp.colnames.index("MATRIX") + 1 + header[f"CREF{idx}"] = "(ENERG_LO:ENERG_HI,THETA_LO:THETA_HI,PHI_LO:PHI_HI,MIGRA_LO:MIGRA_HI)" + _add_header_cards(header, **header_cards) + + return BinTableHDU(edisp, header=header, name=extname) + + +def create_energy_dispersion_3d_lonlat_hdu( + energy_dispersion, + true_energy_bins, + fov_longitude_bins, + fov_latitude_bins, + migration_bins, + point_like=True, + extname="EDISP", + **header_cards, +): + """ + Create a fits binary table HDU in a format similar to the GADF definition of the energy dispersion, but using two-axis longitude-latitude FoV coordinates. + See the similar GADF specification at + https://gamma-astro-data-formats.readthedocs.io/en/latest/irfs/full_enclosure/edisp/index.html + + Parameters + ---------- + energy_dispersion: numpy.ndarray + Energy dispersion array, must have shape + (n_energy_bins, n_migra_bins, n_source_offset_bins) + true_energy_bins: astropy.units.Quantity[energy] + Bin edges in true energy + fov_longitude_bins: astropy.units.Quantity[angle] + Bin edges in the field of view longitude. + For Point-Like IRFs, only giving a single bin is appropriate. + fov_latitiude_bins: astropy.units.Quantity[angle] + Bin edges in the field of view latitude. + For Point-Like IRFs, only giving a single bin is appropriate. + migration_bins: numpy.ndarray + Bin edges for the relative energy migration (``reco_energy / true_energy``) + point_like: bool + If the provided effective area was calculated after applying a direction cut, + pass ``True``, else ``False`` for a full-enclosure effective area. + extname: str + Name for BinTableHDU + **header_cards + Additional metadata to add to the header, use this to set e.g. TELESCOP or + INSTRUME. + """ + + edisp = QTable() + edisp["ENERG_LO"], edisp["ENERG_HI"] = binning.split_bin_lo_hi(true_energy_bins[np.newaxis, :].to(u.TeV)) + edisp["DETX_LO"], edisp["DETX_HI"] = binning.split_bin_lo_hi(fov_longitude_bins[np.newaxis, :].to(u.deg)) + edisp["DETY_LO"], edisp["DETY_HI"] = binning.split_bin_lo_hi(fov_latitude_bins[np.newaxis, :].to(u.deg)) + edisp["MIGRA_LO"], edisp["MIGRA_HI"] = binning.split_bin_lo_hi(migration_bins[np.newaxis, :]) + # transpose as FITS uses opposite dimension order + edisp["MATRIX"] = u.Quantity(energy_dispersion.T[np.newaxis, ...]).to(u.one) + + # required header keywords + header = DEFAULT_HEADER.copy() + header["HDUCLAS1"] = "RESPONSE" + header["HDUCLAS2"] = "EDISP" + header["HDUCLAS3"] = "POINT-LIKE" if point_like else "FULL-ENCLOSURE" + header["HDUCLAS4"] = "EDISP_3D" + header["DATE"] = Time.now().utc.iso + idx = edisp.colnames.index("MATRIX") + 1 + header[f"CREF{idx}"] = "(ENERG_LO:ENERG_HI,DETX_LO:DETX_HI,DETY_LO:DETY_HI,MIGRA_LO:MIGRA_HI)" + _add_header_cards(header, **header_cards) + + return BinTableHDU(edisp, header=header, name=extname) + + #: Unit to store background rate in GADF format #: #: see https://github.com/open-gamma-ray-astro/gamma-astro-data-formats/issues/153 diff --git a/pyirf/io/tests/test_gadf.py b/pyirf/io/tests/test_gadf.py index 653834d8b..feaafe438 100644 --- a/pyirf/io/tests/test_gadf.py +++ b/pyirf/io/tests/test_gadf.py @@ -45,6 +45,44 @@ def edisp_hdus(): return edisp, hdus +@pytest.fixture +def edisp_3d_polar_hdus(): + from pyirf.io import create_energy_dispersion_3d_polar_hdu + + edisp = np.zeros( + (len(e_bins) - 1, len(fov_bins) - 1, len(fov_bins) - 1, len(migra_bins) - 1) + ) + edisp[..., 50] = 1.0 + + hdus = [ + create_energy_dispersion_3d_polar_hdu( + edisp, e_bins, fov_bins, fov_bins, migra_bins, point_like=point_like + ) + for point_like in [True, False] + ] + + return edisp, hdus + + +@pytest.fixture +def edisp_3d_lonlat_hdus(): + from pyirf.io import create_energy_dispersion_3d_lonlat_hdu + + edisp = np.zeros( + (len(e_bins) - 1, len(fov_bins) - 1, len(fov_bins) - 1, len(migra_bins) - 1) + ) + edisp[..., 50] = 1.0 + + hdus = [ + create_energy_dispersion_3d_lonlat_hdu( + edisp, e_bins, fov_bins, fov_bins, migra_bins, point_like=point_like + ) + for point_like in [True, False] + ] + + return edisp, hdus + + @pytest.fixture def psf_hdu(): from pyirf.io import create_psf_table_hdu @@ -133,6 +171,26 @@ def test_energy_dispersion_schema(edisp_hdus): EDISP_2D.validate_hdu(hdu) +@pytest.mark.xfail(reason="EDISP_3D not implemented yet", raises=ImportError) +def test_energy_dispersion_schema_3d_polar(edisp_3d_polar_hdus): + from ogadf_schema.irfs import EDISP_3D + + _, hdus = edisp_3d_polar_hdus + + for hdu in hdus: + EDISP_3D.validate_hdu(hdu) + + +@pytest.mark.xfail(reason="EDISP_3D not implemented yet", raises=ImportError) +def test_energy_dispersion_schema_3d_lonlat(edisp_3d_lonlat_hdus): + from ogadf_schema.irfs import EDISP_3D + + _, hdus = edisp_3d_lonlat_hdus + + for hdu in hdus: + EDISP_3D.validate_hdu(hdu) + + def test_psf_table_gammapy(psf_hdu): '''Test our psf is readable by gammapy''' from gammapy.irf import PSF3D diff --git a/pyirf/irf/__init__.py b/pyirf/irf/__init__.py index f6eb1608d..fed50ca68 100644 --- a/pyirf/irf/__init__.py +++ b/pyirf/irf/__init__.py @@ -5,7 +5,11 @@ effective_area_3d_polar, effective_area_3d_lonlat, ) -from .energy_dispersion import energy_dispersion +from .energy_dispersion import ( + energy_dispersion, + energy_dispersion_3d_polar, + energy_dispersion_3d_lonlat, +) from .psf import psf_table from .background import background_2d @@ -16,6 +20,8 @@ "effective_area_3d_polar", "effective_area_3d_lonlat", "energy_dispersion", + "energy_dispersion_3d_polar", + "energy_dispersion_3d_lonlat", "psf_table", "background_2d", ] diff --git a/pyirf/irf/energy_dispersion.py b/pyirf/irf/energy_dispersion.py index f34c3546d..db3565092 100644 --- a/pyirf/irf/energy_dispersion.py +++ b/pyirf/irf/energy_dispersion.py @@ -6,8 +6,13 @@ __all__ = [ "energy_dispersion", + "energy_dispersion_3d_polar", + "energy_dispersion_3d_lonlat", "energy_migration_matrix", + "energy_migration_matrix_3d_polar", + "energy_migration_matrix_3d_lonlat", "energy_dispersion_to_migration", + "energy_dispersion_to_migration_3d", ] @@ -18,14 +23,19 @@ def _normalize_hist(hist, migration_bins): # calculate number of events along the N_MIGRA axis to get events # per energy per fov - norm = hist.sum(axis=1) with np.errstate(invalid="ignore"): - # hist shape is (N_E, N_MIGRA, N_FOV), norm shape is (N_E, N_FOV) - # so we need to add a new axis in the middle to get (N_E, 1, N_FOV) - # bin_width is 1d, so we need newaxis, use the values, newaxis - hist = hist / norm[:, np.newaxis, :] / bin_width[np.newaxis, :, np.newaxis] - + if hist.ndim == 3: + # hist shape is (N_E, N_MIGRA, N_FOV), norm shape is (N_E, N_FOV) + # so we need to add a new axis in the middle to get (N_E, 1, N_FOV) + # bin_width is 1d, so we need newaxis, use the values, newaxis + norm = hist.sum(axis=1) + hist = hist / norm[:, np.newaxis, :] / bin_width[np.newaxis, :, np.newaxis] + else: + # this handles the asymmetric case where hist shape is (N_E, N_FOV_1, N_FOV_2, N_MIGRA) + norm = hist.sum(axis=-1) + hist /= norm[..., np.newaxis] + hist /= bin_width[np.newaxis, np.newaxis, np.newaxis, :] return np.nan_to_num(hist) @@ -84,6 +94,133 @@ def energy_dispersion( return energy_dispersion +def energy_dispersion_3d_polar( + selected_events, + true_energy_bins, + fov_offset_bins, + fov_position_angle_bins, + migration_bins, +): + """ + Calculate energy dispersion for the given DL2 event list. + Energy dispersion is defined as the probability of finding an event + at a given relative deviation ``(reco_energy / true_energy)`` for a given + true energy. + + Parameters + ---------- + selected_events: astropy.table.QTable + Table of the DL2 events. + Required columns: ``reco_energy``, ``true_energy``, ``true_source_fov_offset``, + ``true_source_fov_position_angle``. + true_energy_bins: astropy.units.Quantity[energy] + Bin edges in true energy + fov_offset_bins: astropy.units.Quantity[angle] + Bin edges in the field of view offset. + For Point-Like IRFs, only giving a single bin is appropriate. + fov_position_angle_bins: astropy.units.Quantity[angle] + Bin edges in the field of view position angle. + For Point-Like IRFs or when only considering offset, only giving a single bin + is apporpriate. + migration_bins: astropy.units.Quantity[energy] + Bin edges in relative deviation, recommended range: [0.2, 5] + + Returns + ------- + energy_dispersion: numpy.ndarray + Energy dispersion matrix + with shape (n_true_energy_bins, n_migration_bins, n_fov_offset_bins, + n_fov_position_angle_bins) + """ + mu = (selected_events["reco_energy"] / selected_events["true_energy"]).to_value( + u.one + ) + + energy_dispersion, _ = np.histogramdd( + np.column_stack( + [ + selected_events["true_energy"].to_value(u.TeV), + selected_events["true_source_fov_offset"].to_value(u.deg), + selected_events["true_source_fov_position_angle"].to_value(u.deg), + mu, + ] + ), + bins=[ + true_energy_bins.to_value(u.TeV), + fov_offset_bins.to_value(u.deg), + fov_position_angle_bins.to_value(u.deg), + migration_bins, + ], + ) + + energy_dispersion = _normalize_hist(energy_dispersion, migration_bins) + + return energy_dispersion + + +def energy_dispersion_3d_lonlat( + selected_events, + true_energy_bins, + fov_longitude_bins, + fov_latitude_bins, + migration_bins, +): + """ + Calculate energy dispersion for the given DL2 event list. + Energy dispersion is defined as the probability of finding an event + at a given relative deviation ``(reco_energy / true_energy)`` for a given + true energy. + + Parameters + ---------- + selected_events: astropy.table.QTable + Table of the DL2 events. + Required columns: ``reco_energy``, ``true_energy``, ``true_source_fov_lon``, + ``true_source_fov_lat``. + true_energy_bins: astropy.units.Quantity[energy] + Bin edges in true energy + fov_longitude_bins: astropy.units.Quantity[angle] + Bin edges in the field of view longitude. + For Point-Like IRFs, only giving a single bin is appropriate. + fov_latitude_bins: astropy.units.Quantity[angle] + Bin edges in the field of view latitude. + For Point-Like IRFs, only giving a single bin is apporpriate. + migration_bins: astropy.units.Quantity[energy] + Bin edges in relative deviation, recommended range: [0.2, 5] + + Returns + ------- + energy_dispersion: numpy.ndarray + Energy dispersion matrix + with shape (n_true_energy_bins, n_migration_bins, n_fov_longitude_bins, + n_fov_latitude_bins) + """ + mu = (selected_events["reco_energy"] / selected_events["true_energy"]).to_value( + u.one + ) + + energy_dispersion, _ = np.histogramdd( + np.column_stack( + [ + selected_events["true_energy"].to_value(u.TeV), + selected_events["true_source_fov_lon"].to_value(u.deg), + selected_events["true_source_fov_lat"].to_value(u.deg), + mu, + ] + ), + bins=[ + true_energy_bins.to_value(u.TeV), + fov_longitude_bins.to_value(u.deg), + fov_latitude_bins.to_value(u.deg), + migration_bins, + ], + ) + + energy_dispersion = _normalize_hist(energy_dispersion, migration_bins) + + return energy_dispersion + + @u.quantity_input(true_energy_bins=u.TeV, reco_energy_bins=u.TeV, fov_offset_bins=u.deg) def energy_migration_matrix( events, true_energy_bins, reco_energy_bins, fov_offset_bins @@ -134,6 +271,126 @@ def energy_migration_matrix( return hist +@u.quantity_input( + true_energy_bins=u.TeV, + reco_energy_bins=u.TeV, + fov_offset_bins=u.deg, + fov_position_angle_bins=u.deg, +) +def energy_migration_matrix_3d_polar( + events, true_energy_bins, reco_energy_bins, fov_offset_bins, fov_position_angle_bins +): + """Compute the energy migration matrix directly from the events in + offset and position angle binning. + + Parameters + ---------- + events : `~astropy.table.QTable` + Table of the DL2 events. + Required columns: ``reco_energy``, ``true_energy``, ``true_source_fov_offset``, + ``true_source_fov_position_angle``. + true_energy_bins : `~astropy.units.Quantity` + Bin edges in true energy. + reco_energy_bins : `~astropy.units.Quantity` + Bin edges in reconstructed energy. + + Returns + ------- + matrix : array-like + Migration matrix as probabilities along the reconstructed energy axis. + energy axis with shape + (n_true_energy_bins, n_reco_energy_bins, n_fov_offset_bins, + n_fov_position_angle_bins) + containing energies in TeV. + """ + + hist, _ = np.histogramdd( + np.column_stack( + [ + events["true_energy"].to_value(u.TeV), + events["reco_energy"].to_value(u.TeV), + events["true_source_fov_offset"].to_value(u.deg), + events["true_source_fov_position_angle"].to_value(u.deg), + ] + ), + bins=[ + true_energy_bins.to_value(u.TeV), + reco_energy_bins.to_value(u.TeV), + fov_offset_bins.to_value(u.deg), + fov_position_angle_bins.to_value(u.deg), + ], + ) + + with np.errstate(invalid="ignore"): + hist /= hist.sum(axis=1)[:, np.newaxis, ...] + # the nans come from the fact that the sum along the reconstructed energy axis + # might sometimes be 0 when there are no events in that given true energy bin + # and fov offset bin + hist[np.isnan(hist)] = 0 + + return hist + + +@u.quantity_input( + true_energy_bins=u.TeV, + reco_energy_bins=u.TeV, + fov_longitude_bins=u.deg, + fov_latitude_bins=u.deg, +) +def energy_migration_matrix_3d_lonlat( + events, true_energy_bins, reco_energy_bins, fov_longitude_bins, fov_latitude_bins +): + """Compute the energy migration matrix directly from the events in + longitude and latitude binning. + + Parameters + ---------- + events : `~astropy.table.QTable` + Table of the DL2 events. + Required columns: ``reco_energy``, ``true_energy``, ``true_source_fov_lon``, + ``true_source_fov_lat``. + true_energy_bins : `~astropy.units.Quantity` + Bin edges in true energy. + reco_energy_bins : `~astropy.units.Quantity` + Bin edges in reconstructed energy. + + Returns + ------- + matrix : array-like + Migration matrix as probabilities along the reconstructed energy axis. + energy axis with shape + (n_true_energy_bins, n_reco_energy_bins, n_fov_longitude_bins, + n_fov_latitude_bins) + containing energies in TeV. + """ + + hist, _ = np.histogramdd( + np.column_stack( + [ + events["true_energy"].to_value(u.TeV), + events["reco_energy"].to_value(u.TeV), + events["true_source_fov_lon"].to_value(u.deg), + events["true_source_fov_lat"].to_value(u.deg), + ] + ), + bins=[ + true_energy_bins.to_value(u.TeV), + reco_energy_bins.to_value(u.TeV), + fov_longitude_bins.to_value(u.deg), + fov_latitude_bins.to_value(u.deg), + ], + ) + + with np.errstate(invalid="ignore"): + hist /= hist.sum(axis=1)[:, np.newaxis, ...] + # the nans come from the fact that the sum along the reconstructed energy axis + # might sometimes be 0 when there are no events in that given true energy bin + # and fov offset bin + hist[np.isnan(hist)] = 0 + + return hist + + def energy_dispersion_to_migration( dispersion_matrix, disp_true_energy_edges, @@ -215,3 +472,86 @@ def energy_dispersion_to_migration( migration_matrix[idx, :, :] = y return migration_matrix + + +def energy_dispersion_to_migration_3d( + dispersion_matrix, + disp_true_energy_edges, + disp_migration_edges, + new_true_energy_edges, + new_reco_energy_edges, +): + """ + Construct a energy migration matrix from an energy dispersion matrix. + + Depending on the new energy ranges, the sum over the first axis + can be smaller than 1. + The new true energy bins need to be a subset of the old range, + extrapolation is not supported. + New reconstruction bins outside of the old migration range are filled with + zeros. + + Parameters + ---------- + dispersion_matrix: numpy.ndarray + Energy dispersion_matrix + disp_true_energy_edges: astropy.units.Quantity[energy] + True energy edges matching the first dimension of the dispersion matrix + disp_migration_edges: numpy.ndarray + Migration edges matching the second dimension of the dispersion matrix + new_true_energy_edges: astropy.units.Quantity[energy] + True energy edges matching the first dimension of the output + new_reco_energy_edges: astropy.units.Quantity[energy] + Reco energy edges matching the second dimension of the output + + Returns + ------- + migration_matrix: numpy.ndarray + Four-dimensional energy migration matrix. The third and fourth + dimension equal the fov dimensions of the energy dispersion matrix. + """ + migration_matrix = np.zeros( + ( + len(new_true_energy_edges) - 1, + len(new_reco_energy_edges) - 1, + *dispersion_matrix.shape[1:3], + ) + ) + + migra_width = np.diff(disp_migration_edges) + probability = dispersion_matrix * migra_width[np.newaxis, np.newaxis, np.newaxis, :] + + true_energy_interpolation = resample_histogram1d( + probability, + disp_true_energy_edges, + new_true_energy_edges, + axis=0, + ) + + norm = np.sum(true_energy_interpolation, axis=-1, keepdims=True) + norm[norm == 0] = 1 + true_energy_interpolation /= norm + + for idx, e_true in enumerate( + (new_true_energy_edges[1:] + new_true_energy_edges[:-1]) / 2 + ): + # get migration for the new true energy bin + e_true_dispersion = true_energy_interpolation[idx] + + with warnings.catch_warnings(): + # silence inf/inf division warning + warnings.filterwarnings( + "ignore", "invalid value encountered in true_divide" + ) + interpolation_edges = new_reco_energy_edges / e_true + + y = resample_histogram1d( + e_true_dispersion, + disp_migration_edges, + interpolation_edges, + axis=-1, + ).swapaxes(0, -1) + + migration_matrix[idx, ...] = y + + return migration_matrix diff --git a/pyirf/irf/tests/test_energy_dispersion.py b/pyirf/irf/tests/test_energy_dispersion.py index 451511ffc..5213a823f 100644 --- a/pyirf/irf/tests/test_energy_dispersion.py +++ b/pyirf/irf/tests/test_energy_dispersion.py @@ -91,6 +91,238 @@ def ppf(cdf, bins, value): ) +def test_energy_dispersion_3d_polar(): + from pyirf.irf import energy_dispersion_3d_polar + + np.random.seed(0) + + N = 10000 + TRUE_SIGMA_1 = 0.20 + TRUE_SIGMA_2 = 0.10 + TRUE_SIGMA_3 = 0.05 + + selected_events = QTable( + { + "reco_energy": np.concatenate( + [ + np.random.normal(1.0, TRUE_SIGMA_1, size=N) * 0.5, + np.random.normal(1.0, TRUE_SIGMA_2, size=N) * 5, + np.random.normal(1.0, TRUE_SIGMA_3, size=N) * 50, + ] + ) + * u.TeV, + "true_energy": np.concatenate( + [np.full(N, 0.5), np.full(N, 5.0), np.full(N, 50.0)] + ) + * u.TeV, + "true_source_fov_offset": np.concatenate( + [ + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + ] + ) + * u.deg, + "true_source_fov_position_angle": np.concatenate( + [ + np.full(N // 4, 90), + np.full(N // 4, 270), + np.full(N // 4, 270), + np.full(N // 4, 90), + np.full(N // 4, 90), + np.full(N // 4, 270), + np.full(N // 4, 270), + np.full(N // 4, 90), + np.full(N // 4, 90), + np.full(N // 4, 270), + np.full(N // 4, 270), + np.full(N // 4, 90), + ] + ) + * u.deg, + } + ) + + true_energy_bins = np.array([0.1, 1.0, 10.0, 100]) * u.TeV + fov_offset_bins = np.array([0, 1, 2]) * u.deg + fov_position_angle_bins = np.array([0, 180, 360]) * u.deg + migration_bins = np.linspace(0, 2, 1001) + + result = energy_dispersion_3d_polar( + selected_events, + true_energy_bins, + fov_offset_bins, + fov_position_angle_bins, + migration_bins, + ) + + assert result.shape == (3, 2, 2, 1000) + + bin_width = np.diff(migration_bins) + bin_centers = 0.5 * (migration_bins[1:] + migration_bins[:-1]) + # edisp shape is (N_E, N_MIGRA, N_FOV_1, N_FOV_2), + # we need to integrate over migration axis + integral = (result * bin_width[np.newaxis, np.newaxis, np.newaxis, :]).sum(axis=-1) + + np.testing.assert_allclose(integral, 1.0) + + cdf = np.cumsum(result * bin_width[np.newaxis, np.newaxis, np.newaxis, :], axis=-1) + + def ppf(cdf, bins, value): + return np.interp(value, cdf, bins[1:]) + + assert np.isclose( + TRUE_SIGMA_1, + 0.5 + * ( + ppf(cdf[0, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[0, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + assert np.isclose( + TRUE_SIGMA_2, + 0.5 + * ( + ppf(cdf[1, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[1, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + assert np.isclose( + TRUE_SIGMA_3, + 0.5 + * ( + ppf(cdf[2, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[2, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + + +def test_energy_dispersion_3d_lonlat(): + from pyirf.irf import energy_dispersion_3d_lonlat + + np.random.seed(0) + + N = 10000 + TRUE_SIGMA_1 = 0.20 + TRUE_SIGMA_2 = 0.10 + TRUE_SIGMA_3 = 0.05 + + selected_events = QTable( + { + "reco_energy": np.concatenate( + [ + np.random.normal(1.0, TRUE_SIGMA_1, size=N) * 0.5, + np.random.normal(1.0, TRUE_SIGMA_2, size=N) * 5, + np.random.normal(1.0, TRUE_SIGMA_3, size=N) * 50, + ] + ) + * u.TeV, + "true_energy": np.concatenate( + [np.full(N, 0.5), np.full(N, 5.0), np.full(N, 50.0)] + ) + * u.TeV, + "true_source_fov_lon": np.concatenate( + [ + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + ] + ) + * u.deg, + "true_source_fov_lat": np.concatenate( + [ + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + ] + ) + * u.deg, + } + ) + + true_energy_bins = np.array([0.1, 1.0, 10.0, 100]) * u.TeV + fov_lon_bins = np.array([-1, 0, 1]) * u.deg + fov_lat_bins = np.array([-1, 0, 1]) * u.deg + migration_bins = np.linspace(0, 2, 1001) + + result = energy_dispersion_3d_lonlat( + selected_events, true_energy_bins, fov_lon_bins, fov_lat_bins, migration_bins + ) + + assert result.shape == (3, 2, 2, 1000) + + bin_width = np.diff(migration_bins) + bin_centers = 0.5 * (migration_bins[1:] + migration_bins[:-1]) + # edisp shape is (N_E, N_MIGRA, N_FOV_1, N_FOV_2), + # we need to integrate over migration axis + integral = (result * bin_width[np.newaxis, np.newaxis, np.newaxis, :]).sum(axis=-1) + + np.testing.assert_allclose(integral, 1.0) + + cdf = np.cumsum(result * bin_width[np.newaxis, np.newaxis, np.newaxis, :], axis=-1) + + def ppf(cdf, bins, value): + return np.interp(value, cdf, bins[1:]) + + assert np.isclose( + TRUE_SIGMA_1, + 0.5 + * ( + ppf(cdf[0, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[0, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + assert np.isclose( + TRUE_SIGMA_2, + 0.5 + * ( + ppf(cdf[1, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[1, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + assert np.isclose( + TRUE_SIGMA_3, + 0.5 + * ( + ppf(cdf[2, 0, 0, :], migration_bins, 0.84) + - ppf(cdf[2, 0, 0, :], migration_bins, 0.16) + ), + rtol=0.1, + ) + + def test_energy_dispersion_to_migration(): from pyirf.irf import energy_dispersion from pyirf.irf.energy_dispersion import energy_dispersion_to_migration @@ -153,6 +385,164 @@ def test_energy_dispersion_to_migration(): assert np.all(np.isfinite(migration_matrix)) +def test_energy_dispersion_to_migration_3d_polar(): + from pyirf.irf import energy_dispersion_3d_polar + from pyirf.irf.energy_dispersion import energy_dispersion_to_migration_3d + + np.random.seed(0) + N = 10000 + true_energy_bins = 10 ** np.arange(np.log10(0.2), np.log10(200), 1 / 10) * u.TeV + + fov_offset_bins = np.array([0, 1, 2]) * u.deg + fov_position_angle_bins = np.array([0, 180, 360]) * u.deg + migration_bins = np.linspace(0, 2, 101) + + true_energy = ( + np.random.uniform(true_energy_bins[0].value, true_energy_bins[-1].value, size=N) + * u.TeV + ) + reco_energy = true_energy * np.random.uniform(0.5, 1.5, size=N) + + selected_events = QTable( + { + "reco_energy": reco_energy, + "true_energy": true_energy, + "true_source_fov_offset": np.concatenate( + [ + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + ] + ) + * u.deg, + "true_source_fov_position_angle": np.concatenate( + [ + np.full(N // 4, 90), + np.full(N // 4, 90), + np.full(N // 4, 270), + np.full(N // 4, 270), + ] + ) + * u.deg, + } + ) + + dispersion_matrix = energy_dispersion_3d_polar( + selected_events, + true_energy_bins, + fov_offset_bins, + fov_position_angle_bins, + migration_bins, + ) + + # migration matrix selecting a limited energy band with different binsizes + new_true_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + new_reco_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + migration_matrix = energy_dispersion_to_migration_3d( + dispersion_matrix, + true_energy_bins, + migration_bins, + new_true_energy_bins, + new_reco_energy_bins, + ) + + # test dimension + assert migration_matrix.shape[0] == len(new_true_energy_bins) - 1 + assert migration_matrix.shape[1] == len(new_reco_energy_bins) - 1 + assert migration_matrix.shape[2] == dispersion_matrix.shape[1] + assert migration_matrix.shape[3] == dispersion_matrix.shape[2] + + # test that all migrations are included for central energies + assert np.isclose(migration_matrix.sum(axis=1).max(), 1, rtol=0.01) + + # test that migrations dont always sum to 1 (since some energies are + # not included in the matrix) + assert migration_matrix.sum() < (len(new_true_energy_bins) - 1) * ( + len(fov_offset_bins) - 1 + ) * (len(fov_position_angle_bins) - 1) + assert np.all(np.isfinite(migration_matrix)) + + +def test_energy_dispersion_to_migration_3d_lonlat(): + from pyirf.irf import energy_dispersion_3d_lonlat + from pyirf.irf.energy_dispersion import energy_dispersion_to_migration_3d + + np.random.seed(0) + N = 10000 + true_energy_bins = 10 ** np.arange(np.log10(0.2), np.log10(200), 1 / 10) * u.TeV + + fov_longitude_bins = np.array([-1, 0, 1]) * u.deg + fov_latitude_bins = np.array([-1, 0, 1]) * u.deg + migration_bins = np.linspace(0, 2, 101) + + true_energy = ( + np.random.uniform(true_energy_bins[0].value, true_energy_bins[-1].value, size=N) + * u.TeV + ) + reco_energy = true_energy * np.random.uniform(0.5, 1.5, size=N) + + selected_events = QTable( + { + "reco_energy": reco_energy, + "true_energy": true_energy, + "true_source_fov_lon": np.concatenate( + [ + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + ] + ) + * u.deg, + "true_source_fov_lat": np.concatenate( + [ + np.full(N // 4, -0.5), + np.full(N // 4, -0.5), + np.full(N // 4, 0.5), + np.full(N // 4, 0.5), + ] + ) + * u.deg, + } + ) + + dispersion_matrix = energy_dispersion_3d_lonlat( + selected_events, + true_energy_bins, + fov_longitude_bins, + fov_latitude_bins, + migration_bins, + ) + + # migration matrix selecting a limited energy band with different binsizes + new_true_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + new_reco_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + migration_matrix = energy_dispersion_to_migration_3d( + dispersion_matrix, + true_energy_bins, + migration_bins, + new_true_energy_bins, + new_reco_energy_bins, + ) + + # test dimension + assert migration_matrix.shape[0] == len(new_true_energy_bins) - 1 + assert migration_matrix.shape[1] == len(new_reco_energy_bins) - 1 + assert migration_matrix.shape[2] == dispersion_matrix.shape[1] + assert migration_matrix.shape[3] == dispersion_matrix.shape[2] + + # test that all migrations are included for central energies + assert np.isclose(migration_matrix.sum(axis=1).max(), 1, rtol=0.01) + + # test that migrations dont always sum to 1 (since some energies are + # not included in the matrix) + assert migration_matrix.sum() < (len(new_true_energy_bins) - 1) * ( + len(fov_longitude_bins) - 1 + ) * (len(fov_latitude_bins) - 1) + assert np.all(np.isfinite(migration_matrix)) + + def test_energy_migration_matrix_from_events(): from pyirf.irf.energy_dispersion import energy_migration_matrix @@ -194,6 +584,122 @@ def test_energy_migration_matrix_from_events(): assert np.allclose(matrix.sum(axis=1).max(), 1, rtol=0.1) +def test_energy_migration_matrix_3d_polar_from_events(): + from pyirf.irf.energy_dispersion import energy_migration_matrix_3d_polar + + np.random.seed(0) + N = 10000 + true_energy_bins = 10 ** np.arange(np.log10(0.2), np.log10(200), 1 / 10) * u.TeV + reco_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + fov_offset_bins = np.array([0, 1, 2]) * u.deg + fov_position_angle_bins = np.array([0, 180, 360]) * u.deg + + true_energy = ( + np.random.uniform(true_energy_bins[0].value, true_energy_bins[-1].value, size=N) + * u.TeV + ) + reco_energy = true_energy * np.random.uniform(0.5, 1.5, size=N) + + events = QTable( + { + "reco_energy": reco_energy, + "true_energy": true_energy, + "true_source_fov_offset": np.concatenate( + [ + np.full(N // 4, 0.2), + np.full(N // 4, 0.2), + np.full(N // 4, 1.5), + np.full(N // 4, 1.5), + ] + ) + * u.deg, + "true_source_fov_position_angle": np.concatenate( + [ + np.full(N // 4, 90), + np.full(N // 4, 270), + np.full(N // 4, 90), + np.full(N // 4, 270), + ] + ) + * u.deg, + } + ) + + matrix = energy_migration_matrix_3d_polar( + events, + true_energy_bins, + reco_energy_bins, + fov_offset_bins, + fov_position_angle_bins, + ) + + assert matrix.shape == ( + len(true_energy_bins) - 1, + len(reco_energy_bins) - 1, + len(fov_offset_bins) - 1, + len(fov_position_angle_bins) - 1, + ) + assert np.allclose(matrix.sum(axis=1).max(), 1, rtol=0.1) + + +def test_energy_migration_matrix_3d_lonlat_from_events(): + from pyirf.irf.energy_dispersion import energy_migration_matrix_3d_lonlat + + np.random.seed(0) + N = 10000 + true_energy_bins = 10 ** np.arange(np.log10(0.2), np.log10(200), 1 / 10) * u.TeV + reco_energy_bins = 10 ** np.arange(np.log10(2), np.log10(20), 1 / 5) * u.TeV + fov_longitude_bins = np.array([-1, 0, 1]) * u.deg + fov_latitude_bins = np.array([-1, 0, 1]) * u.deg + + true_energy = ( + np.random.uniform(true_energy_bins[0].value, true_energy_bins[-1].value, size=N) + * u.TeV + ) + reco_energy = true_energy * np.random.uniform(0.5, 1.5, size=N) + + events = QTable( + { + "reco_energy": reco_energy, + "true_energy": true_energy, + "true_source_fov_lon": np.concatenate( + [ + np.full(N // 4, 0.2), + np.full(N // 4, 0.2), + np.full(N // 4, -0.2), + np.full(N // 4, -0.2), + ] + ) + * u.deg, + "true_source_fov_lat": np.concatenate( + [ + np.full(N // 4, 0.2), + np.full(N // 4, -0.2), + np.full(N // 4, 0.2), + np.full(N // 4, -0.2), + ] + ) + * u.deg, + } + ) + + matrix = energy_migration_matrix_3d_lonlat( + events, + true_energy_bins, + reco_energy_bins, + fov_longitude_bins, + fov_latitude_bins, + ) + + assert matrix.shape == ( + len(true_energy_bins) - 1, + len(reco_energy_bins) - 1, + len(fov_longitude_bins) - 1, + len(fov_latitude_bins) - 1, + ) + assert np.allclose(matrix.sum(axis=1).max(), 1, rtol=0.1) + + def test_overflow_bins_migration_matrix(): from pyirf.irf import energy_dispersion from pyirf.irf.energy_dispersion import energy_dispersion_to_migration