Skip to content

Data containers

Generating mock data

fakeit_for_multiple_parameters(obsconfs, model, parameters, rng_key=0, apply_stat=True, sparsify_matrix=False)

Convenience function to simulate multiple spectra from a given model and a set of parameters. This is supposed to be somewhat optimized and can handle multiple parameters at once without blowing up the memory. The parameters should be passed as a dictionary with the parameter name as the key and the parameter values as the values, the value can be a scalar or a nd-array.

Example:

from jaxspec.data.util import fakeit_for_multiple_parameters
from numpy.random import default_rng

rng = default_rng(42)
size = (10, 30)

parameters = {
    "tbabs_1_nh": rng.uniform(0.1, 0.4, size=size),
    "powerlaw_1_alpha": rng.uniform(1, 3, size=size),
    "powerlaw_1_norm": rng.exponential(10 ** (-0.5), size=size),
    "blackbodyrad_1_kT": rng.uniform(0.1, 3.0, size=size),
    "blackbodyrad_1_norm": rng.exponential(10 ** (-3), size=size)
}

spectra = fakeit_for_multiple_parameters(obsconf, model, parameters)

Parameters:

Name Type Description Default
obsconfs ObsConfiguration | list[ObsConfiguration]

The observational setup(s).

required
model SpectralModel

The model to use.

required
parameters Mapping[K, V]

The parameters of the model.

required
rng_key int

The random number generator seed.

0
apply_stat bool

Whether to apply Poisson statistic on the folded spectra or not.

True
sparsify_matrix bool

Whether to sparsify the matrix or not.

False
Source code in src/jaxspec/data/util.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def fakeit_for_multiple_parameters(
    obsconfs: ObsConfiguration | list[ObsConfiguration],
    model: SpectralModel,
    parameters: Mapping[K, V],
    rng_key: int = 0,
    apply_stat: bool = True,
    sparsify_matrix: bool = False,
):
    """
    Convenience function to simulate multiple spectra from a given model and a set of parameters.
    This is supposed to be somewhat optimized and can handle multiple parameters at once without blowing
    up the memory. The parameters should be passed as a dictionary with the parameter name as the key and
    the parameter values as the values, the value can be a scalar or a nd-array.

    # Example:

    ``` python
    from jaxspec.data.util import fakeit_for_multiple_parameters
    from numpy.random import default_rng

    rng = default_rng(42)
    size = (10, 30)

    parameters = {
        "tbabs_1_nh": rng.uniform(0.1, 0.4, size=size),
        "powerlaw_1_alpha": rng.uniform(1, 3, size=size),
        "powerlaw_1_norm": rng.exponential(10 ** (-0.5), size=size),
        "blackbodyrad_1_kT": rng.uniform(0.1, 3.0, size=size),
        "blackbodyrad_1_norm": rng.exponential(10 ** (-3), size=size)
    }

    spectra = fakeit_for_multiple_parameters(obsconf, model, parameters)
    ```

    Parameters:
        obsconfs: The observational setup(s).
        model: The model to use.
        parameters: The parameters of the model.
        rng_key: The random number generator seed.
        apply_stat: Whether to apply Poisson statistic on the folded spectra or not.
        sparsify_matrix: Whether to sparsify the matrix or not.
    """

    obsconf_list = [obsconfs] if isinstance(obsconfs, ObsConfiguration) else obsconfs
    fakeits = []

    for i, obsconf in enumerate(obsconf_list):
        countrate = forward_model_with_multiple_inputs(
            model, parameters, obsconf, sparse=sparsify_matrix
        )

        if apply_stat:
            with handlers.seed(rng_seed=rng_key):
                spectrum = numpyro.sample(
                    f"likelihood_obs_{i}",
                    numpyro.distributions.Poisson(countrate),
                )

        else:
            spectrum = countrate

        fakeits.append(spectrum)

    return fakeits[0] if len(fakeits) == 1 else fakeits

Data containers

ObsConfiguration

Bases: Dataset

Class to store the data of a folding model, which is the link between the unfolded and folded spectra.

Source code in src/jaxspec/data/obsconf.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class ObsConfiguration(xr.Dataset):
    """
    Class to store the data of a folding model, which is the link between the unfolded and folded spectra.
    """

    transfer_matrix: xr.DataArray
    """The transfer matrix"""
    area: xr.DataArray
    """The effective area of the instrument"""
    exposure: xr.DataArray
    """The total exposure"""
    folded_counts: xr.DataArray
    """The observed counts, after grouping"""
    folded_background: xr.DataArray
    """The background counts, after grouping"""

    __slots__ = (
        "transfer_matrix",
        "area",
        "exposure",
        "folded_counts",
        "folded_background",
    )

    @property
    def in_energies(self):
        """
        The energy bounds of the unfolded bins in keV. The shape is (2, n_bins).
        """

        in_energies = np.stack(
            (
                np.asarray(self.coords["e_min_unfolded"], dtype=np.float64),
                np.asarray(self.coords["e_max_unfolded"], dtype=np.float64),
            )
        )

        return in_energies

    @property
    def out_energies(self):
        """
        The energy bounds of the folded bins in keV. The shape is (2, n_bins).
        """

        out_energies = np.stack(
            (
                np.asarray(self.coords["e_min_folded"].data, dtype=np.float64),
                np.asarray(self.coords["e_max_folded"].data, dtype=np.float64),
            )
        )

        return out_energies

    @classmethod
    def from_pha_file(
        cls,
        pha_path,
        rmf_path: str | None = None,
        arf_path: str | None = None,
        bkg_path: str | None = None,
        low_energy: float = 1e-20,
        high_energy: float = 1e20,
    ):
        r"""
        Build the observation configuration from a PHA file.

        Parameters:
            pha_path: The path to the PHA file.
            rmf_path: The path to the RMF file.
            arf_path: The path to the ARF file.
            bkg_path: The path to the background file.
            low_energy: The lower bound of the energy range to consider.
            high_energy: The upper bound of the energy range to consider.
        """

        from .util import data_path_finder

        arf_path_default, rmf_path_default, bkg_path_default = data_path_finder(pha_path)

        arf_path = arf_path_default if arf_path is None else arf_path
        rmf_path = rmf_path_default if rmf_path is None else rmf_path
        bkg_path = bkg_path_default if bkg_path is None else bkg_path

        instrument = Instrument.from_ogip_file(rmf_path, arf_path=arf_path)
        observation = Observation.from_pha_file(pha_path, bkg_path=bkg_path)

        return cls.from_instrument(
            instrument, observation, low_energy=low_energy, high_energy=high_energy
        )

    @classmethod
    def from_instrument(
        cls,
        instrument: Instrument,
        observation: Observation,
        low_energy: float = 1e-20,
        high_energy: float = 1e20,
    ):
        r"""
        Build the observation configuration from an [`Instrument`][jaxspec.data.Instrument] and an [`Observation`][jaxspec.data.Observation] object.

        Parameters:
            instrument: The instrument object.
            observation: The observation object.
            low_energy: The lower bound of the energy range to consider.
            high_energy: The upper bound of the energy range to consider.

        """
        # First we unpack all the xarray data to classical np array for efficiency
        # We also exclude the bins that are flagged with bad quality on the instrument
        quality_filter = observation.quality.data == 0
        grouping = (
            scipy.sparse.csr_array(observation.grouping.data.to_scipy_sparse()) * quality_filter
        )
        e_min_channel = instrument.coords["e_min_channel"].data
        e_max_channel = instrument.coords["e_max_channel"].data
        e_min_unfolded = instrument.coords["e_min_unfolded"].data
        e_max_unfolded = instrument.coords["e_max_unfolded"].data
        redistribution = scipy.sparse.csr_array(instrument.redistribution.data.to_scipy_sparse())
        area = instrument.area.data
        exposure = observation.exposure.data

        # Computing the lower and upper energies of the bins after grouping
        # This is just a trick to compute it without 10 lines of code
        grouping_nan = observation.grouping.data * quality_filter
        grouping_nan.fill_value = np.nan
        e_min = sparse.nanmin(grouping_nan * e_min_channel, axis=1).todense()
        e_max = sparse.nanmax(grouping_nan * e_max_channel, axis=1).todense()

        # Compute the transfer matrix
        transfer_matrix = grouping @ (redistribution * area * exposure)

        # Exclude bins out of the considered energy range, and bins without contribution from the RMF

        row_idx = (e_min > low_energy) & (e_max < high_energy) & (grouping.sum(axis=1) > 0)
        col_idx = (e_min_unfolded > 0) & (redistribution.sum(axis=0) > 0)

        # Apply this reduction to all the relevant arrays
        transfer_matrix = sparse.COO.from_scipy_sparse(transfer_matrix[row_idx][:, col_idx])
        folded_counts = observation.folded_counts.data[row_idx]
        folded_backratio = observation.folded_backratio.data[row_idx]
        area = instrument.area.data[col_idx]
        e_min_folded = e_min[row_idx]
        e_max_folded = e_max[row_idx]
        e_min_unfolded = e_min_unfolded[col_idx]
        e_max_unfolded = e_max_unfolded[col_idx]

        if observation.folded_background is not None:
            folded_background = observation.folded_background.data[row_idx]
            folded_background_unscaled = observation.folded_background_unscaled.data[row_idx]
        else:
            folded_background = np.zeros_like(folded_counts)
            folded_background_unscaled = np.zeros_like(folded_counts)

        data_dict = {
            "transfer_matrix": (
                ["folded_channel", "unfolded_channel"],
                transfer_matrix,
                {
                    "description": "Transfer matrix to use to fold the incoming spectrum. It is built and restricted using the grouping, redistribution matrix, effective area, quality flags and energy bands defined by the user."
                },
            ),
            "area": (
                ["unfolded_channel"],
                area,
                {
                    "description": "Effective area with the same restrictions as the transfer matrix.",
                    "units": "cm^2",
                },
            ),
            "exposure": ([], exposure, {"description": "Total exposure", "unit": "s"}),
            "folded_counts": (
                ["folded_channel"],
                folded_counts,
                {
                    "description": "Folded counts after grouping, with the same restrictions as the transfer matrix.",
                    "unit": "photons",
                },
            ),
            "folded_backratio": (
                ["folded_channel"],
                folded_backratio,
                {
                    "description": "Background scaling after grouping, with the same restrictions as the transfer matrix."
                },
            ),
            "folded_background": (
                ["folded_channel"],
                folded_background,
                {
                    "description": "Folded background counts after grouping, with the same restrictions as the transfer matrix.",
                    "unit": "photons",
                },
            ),
            "folded_background_unscaled": (
                ["folded_channel"],
                folded_background_unscaled,
                {
                    "description": "To be done",
                    "unit": "photons",
                },
            ),
        }

        return cls(
            data_dict,
            coords={
                "e_min_folded": (
                    ["folded_channel"],
                    e_min_folded,
                    {"description": "Low energy of folded channel"},
                ),
                "e_max_folded": (
                    ["folded_channel"],
                    e_max_folded,
                    {"description": "High energy of folded channel"},
                ),
                "e_min_unfolded": (
                    ["unfolded_channel"],
                    e_min_unfolded,
                    {"description": "Low energy of unfolded channel"},
                ),
                "e_max_unfolded": (
                    ["unfolded_channel"],
                    e_max_unfolded,
                    {"description": "High energy of unfolded channel"},
                ),
            },
            attrs=observation.attrs | instrument.attrs,
        )

    @classmethod
    def mock_from_instrument(
        cls,
        instrument: Instrument,
        exposure: float,
        low_energy: float = 1e-300,
        high_energy: float = 1e300,
    ):
        """
        Create a mock observation configuration from an instrument object. The fake observation will have zero counts.

        Parameters:
            instrument: The instrument object.
            exposure: The total exposure of the mock observation.
            low_energy: The lower bound of the energy range to consider.
            high_energy: The upper bound of the energy range to consider.
        """

        n_channels = len(instrument.coords["instrument_channel"])

        observation = Observation.from_matrix(
            np.zeros(n_channels),
            sparse.eye(n_channels),
            np.arange(n_channels),
            np.zeros(n_channels, dtype=bool),
            exposure,
            backratio=np.ones(n_channels),
            attributes={"description": "Mock observation"} | instrument.attrs,
        )

        return cls.from_instrument(
            instrument, observation, low_energy=low_energy, high_energy=high_energy
        )

    def plot_counts(self, **kwargs):
        return self.folded_counts.plot.step(x="e_min_folded", where="post", **kwargs)

area instance-attribute

The effective area of the instrument

exposure instance-attribute

The total exposure

folded_background instance-attribute

The background counts, after grouping

folded_counts instance-attribute

The observed counts, after grouping

in_energies property

The energy bounds of the unfolded bins in keV. The shape is (2, n_bins).

out_energies property

The energy bounds of the folded bins in keV. The shape is (2, n_bins).

transfer_matrix instance-attribute

The transfer matrix

from_instrument(instrument, observation, low_energy=1e-20, high_energy=1e+20) classmethod

Build the observation configuration from an Instrument and an Observation object.

Parameters:

Name Type Description Default
instrument Instrument

The instrument object.

required
observation Observation

The observation object.

required
low_energy float

The lower bound of the energy range to consider.

1e-20
high_energy float

The upper bound of the energy range to consider.

1e+20
Source code in src/jaxspec/data/obsconf.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
@classmethod
def from_instrument(
    cls,
    instrument: Instrument,
    observation: Observation,
    low_energy: float = 1e-20,
    high_energy: float = 1e20,
):
    r"""
    Build the observation configuration from an [`Instrument`][jaxspec.data.Instrument] and an [`Observation`][jaxspec.data.Observation] object.

    Parameters:
        instrument: The instrument object.
        observation: The observation object.
        low_energy: The lower bound of the energy range to consider.
        high_energy: The upper bound of the energy range to consider.

    """
    # First we unpack all the xarray data to classical np array for efficiency
    # We also exclude the bins that are flagged with bad quality on the instrument
    quality_filter = observation.quality.data == 0
    grouping = (
        scipy.sparse.csr_array(observation.grouping.data.to_scipy_sparse()) * quality_filter
    )
    e_min_channel = instrument.coords["e_min_channel"].data
    e_max_channel = instrument.coords["e_max_channel"].data
    e_min_unfolded = instrument.coords["e_min_unfolded"].data
    e_max_unfolded = instrument.coords["e_max_unfolded"].data
    redistribution = scipy.sparse.csr_array(instrument.redistribution.data.to_scipy_sparse())
    area = instrument.area.data
    exposure = observation.exposure.data

    # Computing the lower and upper energies of the bins after grouping
    # This is just a trick to compute it without 10 lines of code
    grouping_nan = observation.grouping.data * quality_filter
    grouping_nan.fill_value = np.nan
    e_min = sparse.nanmin(grouping_nan * e_min_channel, axis=1).todense()
    e_max = sparse.nanmax(grouping_nan * e_max_channel, axis=1).todense()

    # Compute the transfer matrix
    transfer_matrix = grouping @ (redistribution * area * exposure)

    # Exclude bins out of the considered energy range, and bins without contribution from the RMF

    row_idx = (e_min > low_energy) & (e_max < high_energy) & (grouping.sum(axis=1) > 0)
    col_idx = (e_min_unfolded > 0) & (redistribution.sum(axis=0) > 0)

    # Apply this reduction to all the relevant arrays
    transfer_matrix = sparse.COO.from_scipy_sparse(transfer_matrix[row_idx][:, col_idx])
    folded_counts = observation.folded_counts.data[row_idx]
    folded_backratio = observation.folded_backratio.data[row_idx]
    area = instrument.area.data[col_idx]
    e_min_folded = e_min[row_idx]
    e_max_folded = e_max[row_idx]
    e_min_unfolded = e_min_unfolded[col_idx]
    e_max_unfolded = e_max_unfolded[col_idx]

    if observation.folded_background is not None:
        folded_background = observation.folded_background.data[row_idx]
        folded_background_unscaled = observation.folded_background_unscaled.data[row_idx]
    else:
        folded_background = np.zeros_like(folded_counts)
        folded_background_unscaled = np.zeros_like(folded_counts)

    data_dict = {
        "transfer_matrix": (
            ["folded_channel", "unfolded_channel"],
            transfer_matrix,
            {
                "description": "Transfer matrix to use to fold the incoming spectrum. It is built and restricted using the grouping, redistribution matrix, effective area, quality flags and energy bands defined by the user."
            },
        ),
        "area": (
            ["unfolded_channel"],
            area,
            {
                "description": "Effective area with the same restrictions as the transfer matrix.",
                "units": "cm^2",
            },
        ),
        "exposure": ([], exposure, {"description": "Total exposure", "unit": "s"}),
        "folded_counts": (
            ["folded_channel"],
            folded_counts,
            {
                "description": "Folded counts after grouping, with the same restrictions as the transfer matrix.",
                "unit": "photons",
            },
        ),
        "folded_backratio": (
            ["folded_channel"],
            folded_backratio,
            {
                "description": "Background scaling after grouping, with the same restrictions as the transfer matrix."
            },
        ),
        "folded_background": (
            ["folded_channel"],
            folded_background,
            {
                "description": "Folded background counts after grouping, with the same restrictions as the transfer matrix.",
                "unit": "photons",
            },
        ),
        "folded_background_unscaled": (
            ["folded_channel"],
            folded_background_unscaled,
            {
                "description": "To be done",
                "unit": "photons",
            },
        ),
    }

    return cls(
        data_dict,
        coords={
            "e_min_folded": (
                ["folded_channel"],
                e_min_folded,
                {"description": "Low energy of folded channel"},
            ),
            "e_max_folded": (
                ["folded_channel"],
                e_max_folded,
                {"description": "High energy of folded channel"},
            ),
            "e_min_unfolded": (
                ["unfolded_channel"],
                e_min_unfolded,
                {"description": "Low energy of unfolded channel"},
            ),
            "e_max_unfolded": (
                ["unfolded_channel"],
                e_max_unfolded,
                {"description": "High energy of unfolded channel"},
            ),
        },
        attrs=observation.attrs | instrument.attrs,
    )

from_pha_file(pha_path, rmf_path=None, arf_path=None, bkg_path=None, low_energy=1e-20, high_energy=1e+20) classmethod

Build the observation configuration from a PHA file.

Parameters:

Name Type Description Default
pha_path

The path to the PHA file.

required
rmf_path str | None

The path to the RMF file.

None
arf_path str | None

The path to the ARF file.

None
bkg_path str | None

The path to the background file.

None
low_energy float

The lower bound of the energy range to consider.

1e-20
high_energy float

The upper bound of the energy range to consider.

1e+20
Source code in src/jaxspec/data/obsconf.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
def from_pha_file(
    cls,
    pha_path,
    rmf_path: str | None = None,
    arf_path: str | None = None,
    bkg_path: str | None = None,
    low_energy: float = 1e-20,
    high_energy: float = 1e20,
):
    r"""
    Build the observation configuration from a PHA file.

    Parameters:
        pha_path: The path to the PHA file.
        rmf_path: The path to the RMF file.
        arf_path: The path to the ARF file.
        bkg_path: The path to the background file.
        low_energy: The lower bound of the energy range to consider.
        high_energy: The upper bound of the energy range to consider.
    """

    from .util import data_path_finder

    arf_path_default, rmf_path_default, bkg_path_default = data_path_finder(pha_path)

    arf_path = arf_path_default if arf_path is None else arf_path
    rmf_path = rmf_path_default if rmf_path is None else rmf_path
    bkg_path = bkg_path_default if bkg_path is None else bkg_path

    instrument = Instrument.from_ogip_file(rmf_path, arf_path=arf_path)
    observation = Observation.from_pha_file(pha_path, bkg_path=bkg_path)

    return cls.from_instrument(
        instrument, observation, low_energy=low_energy, high_energy=high_energy
    )

mock_from_instrument(instrument, exposure, low_energy=1e-300, high_energy=1e+300) classmethod

Create a mock observation configuration from an instrument object. The fake observation will have zero counts.

Parameters:

Name Type Description Default
instrument Instrument

The instrument object.

required
exposure float

The total exposure of the mock observation.

required
low_energy float

The lower bound of the energy range to consider.

1e-300
high_energy float

The upper bound of the energy range to consider.

1e+300
Source code in src/jaxspec/data/obsconf.py
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
@classmethod
def mock_from_instrument(
    cls,
    instrument: Instrument,
    exposure: float,
    low_energy: float = 1e-300,
    high_energy: float = 1e300,
):
    """
    Create a mock observation configuration from an instrument object. The fake observation will have zero counts.

    Parameters:
        instrument: The instrument object.
        exposure: The total exposure of the mock observation.
        low_energy: The lower bound of the energy range to consider.
        high_energy: The upper bound of the energy range to consider.
    """

    n_channels = len(instrument.coords["instrument_channel"])

    observation = Observation.from_matrix(
        np.zeros(n_channels),
        sparse.eye(n_channels),
        np.arange(n_channels),
        np.zeros(n_channels, dtype=bool),
        exposure,
        backratio=np.ones(n_channels),
        attributes={"description": "Mock observation"} | instrument.attrs,
    )

    return cls.from_instrument(
        instrument, observation, low_energy=low_energy, high_energy=high_energy
    )

Instrument

Bases: Dataset

Class to store the data of an instrument

Source code in src/jaxspec/data/instrument.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
class Instrument(xr.Dataset):
    """
    Class to store the data of an instrument
    """

    redistribution: xr.DataArray
    """The photon redistribution probability matrix"""
    area: xr.DataArray
    """The effective area of the instrument"""

    __slots__ = (
        "redistribution",
        "area",
        "e_min_out",
        "e_max_out",
        "e_min_in",
        "e_max_in",
    )

    @classmethod
    def from_matrix(
        cls,
        redistribution_matrix,
        spectral_response,
        e_min_unfolded,
        e_max_unfolded,
        e_min_channel,
        e_max_channel,
    ):
        return cls(
            {
                "redistribution": (
                    ["instrument_channel", "unfolded_channel"],
                    redistribution_matrix,
                    {"description": "Redistribution matrix"},
                ),
                "area": (
                    ["unfolded_channel"],
                    np.array(spectral_response, dtype=np.float64),
                    {"description": "Effective area", "units": "cm^2"},
                ),
            },
            coords={
                "e_min_unfolded": (
                    ["unfolded_channel"],
                    np.array(e_min_unfolded, dtype=np.float64),
                    {"description": "Low bin energy for ingoing channels", "units": "keV"},
                ),
                "e_max_unfolded": (
                    ["unfolded_channel"],
                    np.array(e_max_unfolded, dtype=np.float64),
                    {"description": "High bin energy for ingoing channels", "units": "keV"},
                ),
                "e_min_channel": (
                    ["instrument_channel"],
                    np.array(e_min_channel, dtype=np.float64),
                    {"description": "Low bin energy for outgoing channels", "units": "keV"},
                ),
                "e_max_channel": (
                    ["instrument_channel"],
                    np.array(e_max_channel, dtype=np.float64),
                    {"description": "High bin energy for outgoing channels", "units": "keV"},
                ),
            },
            attrs={"description": "X-ray instrument response dataset"},
        )

    @classmethod
    def from_ogip_file(cls, rmf_path: str | os.PathLike, arf_path: str | os.PathLike | None = None):
        """
        Load the data from OGIP files.

        Parameters:
            rmf_path: The RMF file path.
            arf_path: The ARF file path.
        """

        rmf = DataRMF.from_file(rmf_path)

        if arf_path is not None:
            specresp = DataARF.from_file(arf_path).specresp

        else:
            specresp = rmf.matrix.sum(axis=0)
            rmf.sparse_matrix /= specresp

        return cls.from_matrix(
            rmf.sparse_matrix, specresp, rmf.energ_lo, rmf.energ_hi, rmf.e_min, rmf.e_max
        )

    def plot_redistribution(
        self,
        xscale: str = "log",
        yscale: str = "log",
        cmap=None,
        vmin: float = 1e-6,
        vmax: float = 1e0,
        add_labels: bool = True,
        **kwargs,
    ):
        """
        Plot the redistribution probability matrix

        Parameters:
            xscale : The scale of the x-axis.
            yscale : The scale of the y-axis.
            cmap : The colormap to use.
            vmin : The minimum value for the colormap.
            vmax : The maximum value for the colormap.
            add_labels : Whether to add labels to the plot.
            **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.plot.pcolormesh.html#xarray.plot.pcolormesh
        """

        import cmasher as cmr

        return xr.plot.pcolormesh(
            self.redistribution,
            x="e_max_unfolded",
            y="e_max_channel",
            xscale=xscale,
            yscale=yscale,
            cmap=cmr.ember_r if cmap is None else cmap,
            norm=colors.LogNorm(vmin=vmin, vmax=vmax),
            add_labels=add_labels,
            **kwargs,
        )

    def plot_area(self, xscale: str = "log", yscale: str = "log", where: str = "post", **kwargs):
        """
        Plot the effective area

        Parameters:
            xscale : The scale of the x-axis.
            yscale : The scale of the y-axis.
            where : The position of the steps.
            **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.line.html#xarray.DataArray.plot.line
        """

        return self.area.plot.step(
            x="e_min_unfolded", xscale=xscale, yscale=yscale, where=where, **kwargs
        )

area instance-attribute

The effective area of the instrument

redistribution instance-attribute

The photon redistribution probability matrix

from_ogip_file(rmf_path, arf_path=None) classmethod

Load the data from OGIP files.

Parameters:

Name Type Description Default
rmf_path str | PathLike

The RMF file path.

required
arf_path str | PathLike | None

The ARF file path.

None
Source code in src/jaxspec/data/instrument.py
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@classmethod
def from_ogip_file(cls, rmf_path: str | os.PathLike, arf_path: str | os.PathLike | None = None):
    """
    Load the data from OGIP files.

    Parameters:
        rmf_path: The RMF file path.
        arf_path: The ARF file path.
    """

    rmf = DataRMF.from_file(rmf_path)

    if arf_path is not None:
        specresp = DataARF.from_file(arf_path).specresp

    else:
        specresp = rmf.matrix.sum(axis=0)
        rmf.sparse_matrix /= specresp

    return cls.from_matrix(
        rmf.sparse_matrix, specresp, rmf.energ_lo, rmf.energ_hi, rmf.e_min, rmf.e_max
    )

plot_area(xscale='log', yscale='log', where='post', **kwargs)

Plot the effective area

Parameters:

Name Type Description Default
xscale

The scale of the x-axis.

'log'
yscale

The scale of the y-axis.

'log'
where

The position of the steps.

'post'
**kwargs

kwargs passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.line.html#xarray.DataArray.plot.line

{}
Source code in src/jaxspec/data/instrument.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
def plot_area(self, xscale: str = "log", yscale: str = "log", where: str = "post", **kwargs):
    """
    Plot the effective area

    Parameters:
        xscale : The scale of the x-axis.
        yscale : The scale of the y-axis.
        where : The position of the steps.
        **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.line.html#xarray.DataArray.plot.line
    """

    return self.area.plot.step(
        x="e_min_unfolded", xscale=xscale, yscale=yscale, where=where, **kwargs
    )

plot_redistribution(xscale='log', yscale='log', cmap=None, vmin=1e-06, vmax=1.0, add_labels=True, **kwargs)

Plot the redistribution probability matrix

Parameters:

Name Type Description Default
xscale

The scale of the x-axis.

'log'
yscale

The scale of the y-axis.

'log'
cmap

The colormap to use.

None
vmin

The minimum value for the colormap.

1e-06
vmax

The maximum value for the colormap.

1.0
add_labels

Whether to add labels to the plot.

True
**kwargs

kwargs passed to https://docs.xarray.dev/en/latest/generated/xarray.plot.pcolormesh.html#xarray.plot.pcolormesh

{}
Source code in src/jaxspec/data/instrument.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def plot_redistribution(
    self,
    xscale: str = "log",
    yscale: str = "log",
    cmap=None,
    vmin: float = 1e-6,
    vmax: float = 1e0,
    add_labels: bool = True,
    **kwargs,
):
    """
    Plot the redistribution probability matrix

    Parameters:
        xscale : The scale of the x-axis.
        yscale : The scale of the y-axis.
        cmap : The colormap to use.
        vmin : The minimum value for the colormap.
        vmax : The maximum value for the colormap.
        add_labels : Whether to add labels to the plot.
        **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.plot.pcolormesh.html#xarray.plot.pcolormesh
    """

    import cmasher as cmr

    return xr.plot.pcolormesh(
        self.redistribution,
        x="e_max_unfolded",
        y="e_max_channel",
        xscale=xscale,
        yscale=yscale,
        cmap=cmr.ember_r if cmap is None else cmap,
        norm=colors.LogNorm(vmin=vmin, vmax=vmax),
        add_labels=add_labels,
        **kwargs,
    )

Observation

Bases: Dataset

Class to store the data of an observation

Source code in src/jaxspec/data/observation.py
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
class Observation(xr.Dataset):
    """
    Class to store the data of an observation
    """

    counts: xr.DataArray
    """The observed counts"""
    folded_counts: xr.DataArray
    """The observed counts, after grouping"""
    grouping: xr.DataArray
    """The grouping matrix"""
    quality: xr.DataArray
    """The quality flag"""
    exposure: xr.DataArray
    """The total exposure"""
    background: xr.DataArray
    """The background counts if provided, otherwise 0"""
    folded_background: xr.DataArray
    """The background counts, after grouping"""

    __slots__ = (
        "grouping",
        "channel",
        "quality",
        "exposure",
        "background",
        "folded_background",
        "counts",
        "folded_counts",
    )

    _default_attributes = {"description": "X-ray observation dataset"}

    @classmethod
    def from_matrix(
        cls,
        counts,
        grouping,
        channel,
        quality,
        exposure,
        background=None,
        background_unscaled=None,
        backratio=1.0,
        attributes: dict | None = None,
    ):
        if attributes is None:
            attributes = {}

        if background is None or background_unscaled is None:
            background = np.zeros_like(counts, dtype=np.int64)
            background_unscaled = np.zeros_like(counts, dtype=np.int64)

        data_dict = {
            "counts": (
                ["instrument_channel"],
                np.asarray(counts, dtype=np.int64),
                {"description": "Counts", "unit": "photons"},
            ),
            "folded_counts": (
                ["folded_channel"],
                np.asarray(np.ma.filled(grouping @ counts), dtype=np.int64),
                {"description": "Folded counts, after grouping", "unit": "photons"},
            ),
            "grouping": (
                ["folded_channel", "instrument_channel"],
                grouping,
                {"description": "Grouping matrix."},
            ),
            "quality": (
                ["instrument_channel"],
                np.asarray(quality, dtype=np.int64),
                {"description": "Quality flag."},
            ),
            "exposure": ([], float(exposure), {"description": "Total exposure", "unit": "s"}),
            "backratio": (
                ["instrument_channel"],
                np.asarray(backratio, dtype=float),
                {"description": "Background scaling (SRC_BACKSCAL/BKG_BACKSCAL)"},
            ),
            "folded_backratio": (
                ["folded_channel"],
                np.asarray(np.ma.filled(grouping @ backratio), dtype=float),
                {"description": "Background scaling after grouping"},
            ),
            "background": (
                ["instrument_channel"],
                np.asarray(background, dtype=np.int64),
                {"description": "Background counts", "unit": "photons"},
            ),
            "folded_background_unscaled": (
                ["folded_channel"],
                np.asarray(np.ma.filled(grouping @ background_unscaled), dtype=np.int64),
                {"description": "Background counts", "unit": "photons"},
            ),
            "folded_background": (
                ["folded_channel"],
                np.asarray(np.ma.filled(grouping @ background), dtype=np.float64),
                {"description": "Background counts", "unit": "photons"},
            ),
        }

        return cls(
            data_dict,
            coords={
                "channel": (
                    ["instrument_channel"],
                    np.asarray(channel, dtype=np.int64),
                    {"description": "Channel number"},
                ),
                "grouped_channel": (
                    ["folded_channel"],
                    np.arange(len(grouping @ counts), dtype=np.int64),
                    {"description": "Channel number"},
                ),
            },
            attrs=cls._default_attributes
            if attributes is None
            else attributes | cls._default_attributes,
        )

    @classmethod
    def from_ogip_container(cls, pha: DataPHA, bkg: DataPHA | None = None, **metadata):
        if bkg is not None:
            backratio = np.nan_to_num(
                (pha.backscal * pha.exposure * pha.areascal)
                / (bkg.backscal * bkg.exposure * bkg.areascal)
            )
        else:
            backratio = np.ones_like(pha.counts)

        if (bkg is not None) and ("NET" in pha.flags):
            counts = pha.counts + bkg.counts * backratio
        else:
            counts = pha.counts

        return cls.from_matrix(
            counts,
            pha.grouping,
            pha.channel,
            pha.quality,
            pha.exposure,
            backratio=backratio,
            background=bkg.counts * backratio if bkg is not None else None,
            background_unscaled=bkg.counts if bkg is not None else None,
            attributes=metadata,
        )

    @classmethod
    def from_pha_file(cls, pha_path: str, bkg_path: str | None = None, **metadata):
        """
        Build an observation from a PHA file

        Parameters:
            pha_path : Path to the PHA file
            bkg_path : Path to the background file
            metadata : Additional metadata to add to the observation
        """
        from .util import data_path_finder

        arf_path, rmf_path, bkg_path_default = data_path_finder(pha_path)
        bkg_path = bkg_path_default if bkg_path is None else bkg_path

        pha = DataPHA.from_file(pha_path)
        bkg = DataPHA.from_file(bkg_path) if bkg_path is not None else None

        if metadata is None:
            metadata = {}

        metadata.update(
            observation_file=pha_path,
            background_file=bkg_path,
            response_matrix_file=rmf_path,
            ancillary_response_file=arf_path,
        )

        return cls.from_ogip_container(pha, bkg=bkg, **metadata)

    def plot_counts(self, **kwargs):
        """
        Plot the counts

        Parameters:
            **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.step.html#xarray.DataArray.plot.line
        """

        return self.counts.plot.step(x="instrument_channel", yscale="log", where="post", **kwargs)

    def plot_grouping(self):
        """
        Plot the grouping matrix and compare the grouped counts to the true counts
        in the original channels.
        """

        import matplotlib.pyplot as plt
        import seaborn as sns

        fig = plt.figure(figsize=(6, 6))
        gs = fig.add_gridspec(
            2,
            2,
            width_ratios=(4, 1),
            height_ratios=(1, 4),
            left=0.1,
            right=0.9,
            bottom=0.1,
            top=0.9,
            wspace=0.05,
            hspace=0.05,
        )
        ax = fig.add_subplot(gs[1, 0])
        ax_histx = fig.add_subplot(gs[0, 0], sharex=ax)
        ax_histy = fig.add_subplot(gs[1, 1], sharey=ax)
        sns.heatmap(self.grouping.data.todense().T, ax=ax, cbar=False)
        ax_histx.step(np.arange(len(self.folded_counts)), self.folded_counts, where="post")
        ax_histy.step(self.counts, np.arange(len(self.counts)), where="post")

        ax.set_xlabel("Grouped channels")
        ax.set_ylabel("Channels")
        ax_histx.set_ylabel("Grouped counts")
        ax_histy.set_xlabel("Counts")

        ax_histx.semilogy()
        ax_histy.semilogx()

        _ = [label.set_visible(False) for label in ax_histx.get_xticklabels()]
        _ = [label.set_visible(False) for label in ax_histy.get_yticklabels()]

background instance-attribute

The background counts if provided, otherwise 0

counts instance-attribute

The observed counts

exposure instance-attribute

The total exposure

folded_background instance-attribute

The background counts, after grouping

folded_counts instance-attribute

The observed counts, after grouping

grouping instance-attribute

The grouping matrix

quality instance-attribute

The quality flag

from_pha_file(pha_path, bkg_path=None, **metadata) classmethod

Build an observation from a PHA file

Parameters:

Name Type Description Default
pha_path

Path to the PHA file

required
bkg_path

Path to the background file

None
metadata

Additional metadata to add to the observation

{}
Source code in src/jaxspec/data/observation.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
@classmethod
def from_pha_file(cls, pha_path: str, bkg_path: str | None = None, **metadata):
    """
    Build an observation from a PHA file

    Parameters:
        pha_path : Path to the PHA file
        bkg_path : Path to the background file
        metadata : Additional metadata to add to the observation
    """
    from .util import data_path_finder

    arf_path, rmf_path, bkg_path_default = data_path_finder(pha_path)
    bkg_path = bkg_path_default if bkg_path is None else bkg_path

    pha = DataPHA.from_file(pha_path)
    bkg = DataPHA.from_file(bkg_path) if bkg_path is not None else None

    if metadata is None:
        metadata = {}

    metadata.update(
        observation_file=pha_path,
        background_file=bkg_path,
        response_matrix_file=rmf_path,
        ancillary_response_file=arf_path,
    )

    return cls.from_ogip_container(pha, bkg=bkg, **metadata)

plot_counts(**kwargs)

Plot the counts

Parameters:

Name Type Description Default
**kwargs

kwargs passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.step.html#xarray.DataArray.plot.line

{}
Source code in src/jaxspec/data/observation.py
185
186
187
188
189
190
191
192
193
def plot_counts(self, **kwargs):
    """
    Plot the counts

    Parameters:
        **kwargs : `kwargs` passed to https://docs.xarray.dev/en/latest/generated/xarray.DataArray.plot.step.html#xarray.DataArray.plot.line
    """

    return self.counts.plot.step(x="instrument_channel", yscale="log", where="post", **kwargs)

plot_grouping()

Plot the grouping matrix and compare the grouped counts to the true counts in the original channels.

Source code in src/jaxspec/data/observation.py
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
def plot_grouping(self):
    """
    Plot the grouping matrix and compare the grouped counts to the true counts
    in the original channels.
    """

    import matplotlib.pyplot as plt
    import seaborn as sns

    fig = plt.figure(figsize=(6, 6))
    gs = fig.add_gridspec(
        2,
        2,
        width_ratios=(4, 1),
        height_ratios=(1, 4),
        left=0.1,
        right=0.9,
        bottom=0.1,
        top=0.9,
        wspace=0.05,
        hspace=0.05,
    )
    ax = fig.add_subplot(gs[1, 0])
    ax_histx = fig.add_subplot(gs[0, 0], sharex=ax)
    ax_histy = fig.add_subplot(gs[1, 1], sharey=ax)
    sns.heatmap(self.grouping.data.todense().T, ax=ax, cbar=False)
    ax_histx.step(np.arange(len(self.folded_counts)), self.folded_counts, where="post")
    ax_histy.step(self.counts, np.arange(len(self.counts)), where="post")

    ax.set_xlabel("Grouped channels")
    ax.set_ylabel("Channels")
    ax_histx.set_ylabel("Grouped counts")
    ax_histy.set_xlabel("Counts")

    ax_histx.semilogy()
    ax_histy.semilogx()

    _ = [label.set_visible(False) for label in ax_histx.get_xticklabels()]
    _ = [label.set_visible(False) for label in ax_histy.get_yticklabels()]

OGIP data parsers

DataARF

Class to handle ARF data defined with OGIP standards.

References
Source code in src/jaxspec/data/ogip.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class DataARF:
    r"""
    Class to handle ARF data defined with OGIP standards.

    ??? info "References"
        * [The Calibration Requirements for Spectral Analysis (Definition of RMF and ARF file formats)](https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html)
        * [The Calibration Requirements for Spectral Analysis Addendum: Changes log](https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002a/cal_gen_92_002a.html)
    """

    def __init__(self, energ_lo, energ_hi, specresp):
        self.specresp = specresp
        self.energ_lo = energ_lo
        self.energ_hi = energ_hi

    @classmethod
    def from_file(cls, arf_file: str | os.PathLike):
        """
        Load the data from an ARF file.

        Parameters:
            arf_file: The ARF file path.
        """

        arf_table = QTable.read(arf_file)

        return cls(
            arf_table["ENERG_LO"],
            arf_table["ENERG_HI"],
            arf_table["SPECRESP"].to(u.cm**2).value,
        )

from_file(arf_file) classmethod

Load the data from an ARF file.

Parameters:

Name Type Description Default
arf_file str | PathLike

The ARF file path.

required
Source code in src/jaxspec/data/ogip.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@classmethod
def from_file(cls, arf_file: str | os.PathLike):
    """
    Load the data from an ARF file.

    Parameters:
        arf_file: The ARF file path.
    """

    arf_table = QTable.read(arf_file)

    return cls(
        arf_table["ENERG_LO"],
        arf_table["ENERG_HI"],
        arf_table["SPECRESP"].to(u.cm**2).value,
    )

DataPHA

Class to handle PHA data defined with OGIP standards.

References
Source code in src/jaxspec/data/ogip.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class DataPHA:
    r"""
    Class to handle PHA data defined with OGIP standards.
    ??? info "References"
        * [The OGIP standard PHA file format](https://heasarc.gsfc.nasa.gov/docs/heasarc/ofwg/docs/spectra/ogip_92_007/node5.html)
    """

    def __init__(
        self,
        channel,
        counts,
        exposure,
        grouping=None,
        quality=None,
        backfile=None,
        respfile=None,
        ancrfile=None,
        backscal=1.0,
        areascal=1.0,
        flags=None,
    ):
        self.channel = np.asarray(channel, dtype=int)
        self.counts = np.asarray(counts, dtype=int)
        self.exposure = float(exposure)

        self.quality = np.asarray(quality, dtype=int)
        self.backfile = backfile
        self.respfile = respfile
        self.ancrfile = ancrfile
        self.backscal = np.asarray(backscal, dtype=float)
        self.areascal = np.asarray(areascal, dtype=float)
        self.flags = flags

        if grouping is not None:
            # Indices array of the beginning of each group
            b_grp = np.where(grouping == 1)[0]
            # Indices array of the ending of each group
            e_grp = np.hstack((b_grp[1:], len(channel)))

            # Prepare data for sparse matrix
            rows = []
            cols = []
            data = []

            for i in range(len(b_grp)):
                for j in range(b_grp[i], e_grp[i]):
                    rows.append(i)
                    cols.append(j)
                    data.append(True)

            # Create a COO sparse matrix
            grp_matrix = sparse.COO(
                (data, (rows, cols)), shape=(len(b_grp), len(channel)), fill_value=0
            )

        else:
            # Identity matrix case, use sparse for efficiency
            grp_matrix = sparse.eye(len(channel), format="coo", dtype=bool)

        self.grouping = grp_matrix

    @classmethod
    def from_file(cls, pha_file: str | os.PathLike):
        """
        Load the data from a PHA file.

        Parameters:
            pha_file: The PHA file path.
        """

        data = QTable.read(pha_file, "SPECTRUM")
        header = fits.getheader(pha_file, "SPECTRUM")
        flags = []

        if header.get("HDUCLAS3") == "RATE":
            raise ValueError(
                f"The HDUCLAS3={header.get('HDUCLAS3')} keyword in the PHA file is not supported."
                f"Please open an issue if this is required."
            )

        if header.get("HDUCLAS4") == "TYPE:II":
            raise ValueError(
                f"The HDUCLAS4={header.get('HDUCLAS4')} keyword in the PHA file is not supported."
                f"Please open an issue if this is required."
            )

        if header.get("GROUPING") == 0:
            grouping = None
        elif "GROUPING" in data.colnames:
            grouping = data["GROUPING"]
        else:
            raise ValueError("No grouping column found in the PHA file.")

        if header.get("QUALITY") == 0:
            quality = np.zeros(len(data["CHANNEL"]), dtype=bool)
        elif "QUALITY" in data.colnames:
            quality = data["QUALITY"]
        else:
            raise ValueError("No QUALITY column found in the PHA file.")

        if "BACKSCAL" in header:
            backscal = header["BACKSCAL"] * np.ones_like(data["CHANNEL"])
        elif "BACKSCAL" in data.colnames:
            backscal = data["BACKSCAL"]
        else:
            raise ValueError("No BACKSCAL found in the PHA file.")

        backscal = np.where(backscal == 0, 1.0, backscal)

        if "AREASCAL" in header:
            areascal = header["AREASCAL"]
        elif "AREASCAL" in data.colnames:
            areascal = data["AREASCAL"]
        else:
            raise ValueError("No AREASCAL found in the PHA file.")

        if header.get("HDUCLAS2") == "NET":
            flags.append("NET")

        kwargs = {
            "grouping": grouping,
            "quality": quality,
            "backfile": header.get("BACKFILE"),
            "respfile": header.get("RESPFILE"),
            "ancrfile": header.get("ANCRFILE"),
            "backscal": backscal,
            "areascal": areascal,
            "flags": flags,
        }

        return cls(data["CHANNEL"], data["COUNTS"], header["EXPOSURE"], **kwargs)

from_file(pha_file) classmethod

Load the data from a PHA file.

Parameters:

Name Type Description Default
pha_file str | PathLike

The PHA file path.

required
Source code in src/jaxspec/data/ogip.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
@classmethod
def from_file(cls, pha_file: str | os.PathLike):
    """
    Load the data from a PHA file.

    Parameters:
        pha_file: The PHA file path.
    """

    data = QTable.read(pha_file, "SPECTRUM")
    header = fits.getheader(pha_file, "SPECTRUM")
    flags = []

    if header.get("HDUCLAS3") == "RATE":
        raise ValueError(
            f"The HDUCLAS3={header.get('HDUCLAS3')} keyword in the PHA file is not supported."
            f"Please open an issue if this is required."
        )

    if header.get("HDUCLAS4") == "TYPE:II":
        raise ValueError(
            f"The HDUCLAS4={header.get('HDUCLAS4')} keyword in the PHA file is not supported."
            f"Please open an issue if this is required."
        )

    if header.get("GROUPING") == 0:
        grouping = None
    elif "GROUPING" in data.colnames:
        grouping = data["GROUPING"]
    else:
        raise ValueError("No grouping column found in the PHA file.")

    if header.get("QUALITY") == 0:
        quality = np.zeros(len(data["CHANNEL"]), dtype=bool)
    elif "QUALITY" in data.colnames:
        quality = data["QUALITY"]
    else:
        raise ValueError("No QUALITY column found in the PHA file.")

    if "BACKSCAL" in header:
        backscal = header["BACKSCAL"] * np.ones_like(data["CHANNEL"])
    elif "BACKSCAL" in data.colnames:
        backscal = data["BACKSCAL"]
    else:
        raise ValueError("No BACKSCAL found in the PHA file.")

    backscal = np.where(backscal == 0, 1.0, backscal)

    if "AREASCAL" in header:
        areascal = header["AREASCAL"]
    elif "AREASCAL" in data.colnames:
        areascal = data["AREASCAL"]
    else:
        raise ValueError("No AREASCAL found in the PHA file.")

    if header.get("HDUCLAS2") == "NET":
        flags.append("NET")

    kwargs = {
        "grouping": grouping,
        "quality": quality,
        "backfile": header.get("BACKFILE"),
        "respfile": header.get("RESPFILE"),
        "ancrfile": header.get("ANCRFILE"),
        "backscal": backscal,
        "areascal": areascal,
        "flags": flags,
    }

    return cls(data["CHANNEL"], data["COUNTS"], header["EXPOSURE"], **kwargs)

DataRMF

Class to handle RMF data defined with OGIP standards.

References
Source code in src/jaxspec/data/ogip.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
class DataRMF:
    r"""
    Class to handle RMF data defined with OGIP standards.
    ??? info "References"
        * [The Calibration Requirements for Spectral Analysis (Definition of RMF and ARF file formats)](https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002/cal_gen_92_002.html)
        * [The Calibration Requirements for Spectral Analysis Addendum: Changes log](https://heasarc.gsfc.nasa.gov/docs/heasarc/caldb/docs/memos/cal_gen_92_002a/cal_gen_92_002a.html)
    """

    def __init__(
        self,
        energ_lo,
        energ_hi,
        n_grp,
        f_chan,
        n_chan,
        matrix,
        channel,
        e_min,
        e_max,
        low_threshold=0.0,
    ):
        # RMF stuff
        self.energ_lo = energ_lo  # "Entry" energies
        self.energ_hi = energ_hi  # "Entry" energies
        self.n_grp = n_grp
        self.f_chan = f_chan
        self.n_chan = n_chan
        self.matrix_entry = matrix

        # Detector channels
        self.channel = channel
        self.e_min = e_min
        self.e_max = e_max

        # Prepare data for sparse matrix
        rows = []
        cols = []
        data = []

        for i, n_grp_val in enumerate(self.n_grp):
            base = 0

            if np.size(self.f_chan[i]) == 1:
                low = int(self.f_chan[i].ravel()[0])
                high = min(
                    int(self.f_chan[i].ravel()[0] + self.n_chan[i].ravel()[0]),
                    len(self.channel),
                )

                rows.extend([i] * (high - low))
                cols.extend(range(low, high))
                data.extend(self.matrix_entry[i][0 : high - low])

            else:
                for j in range(n_grp_val):
                    low = self.f_chan[i][j]
                    high = min(self.f_chan[i][j] + self.n_chan[i][j], len(self.channel))

                    rows.extend([i] * (high - low))
                    cols.extend(range(low, high))
                    data.extend(self.matrix_entry[i][base : base + self.n_chan[i][j]])

                    base += self.n_chan[i][j]

        # Convert lists to numpy arrays
        rows = np.array(rows)
        cols = np.array(cols)
        data = np.array(data)

        # Sometimes, zero elements are given in the matrix rows, so we get rid of them
        idxs = data > low_threshold

        # Create a COO sparse matrix and then convert to CSR for efficiency
        coo = sparse.COO(
            [rows[idxs], cols[idxs]], data[idxs], shape=(len(self.energ_lo), len(self.channel))
        )
        self.sparse_matrix = coo.T  # .tocsr()

    @property
    def matrix(self):
        return np.asarray(self.sparse_matrix.todense())

    @classmethod
    def from_file(cls, rmf_file: str | os.PathLike):
        """
        Load the data from an RMF file.

        Parameters:
            rmf_file: The RMF file path.
        """
        extension_names = [hdu[1] for hdu in fits.info(rmf_file, output=False)]

        if "MATRIX" in extension_names:
            matrix_extension = "MATRIX"

        elif "SPECRESP MATRIX" in extension_names:
            matrix_extension = "SPECRESP MATRIX"
            # raise NotImplementedError("SPECRESP MATRIX extension is not yet supported")

        else:
            raise ValueError("No MATRIX or SPECRESP MATRIX extension found in the RMF file")

        matrix_table = QTable.read(rmf_file, matrix_extension)
        ebounds_table = QTable.read(rmf_file, "EBOUNDS")

        matrix_header = fits.getheader(rmf_file, matrix_extension)

        f_chan_column_pos = list(matrix_table.columns).index("F_CHAN") + 1
        tlmin_fchan = int(matrix_header[f"TLMIN{f_chan_column_pos}"])

        return cls(
            matrix_table["ENERG_LO"],
            matrix_table["ENERG_HI"],
            matrix_table["N_GRP"],
            matrix_table["F_CHAN"] - tlmin_fchan,
            matrix_table["N_CHAN"],
            matrix_table["MATRIX"],
            ebounds_table["CHANNEL"],
            ebounds_table["E_MIN"],
            ebounds_table["E_MAX"],
        )

from_file(rmf_file) classmethod

Load the data from an RMF file.

Parameters:

Name Type Description Default
rmf_file str | PathLike

The RMF file path.

required
Source code in src/jaxspec/data/ogip.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
@classmethod
def from_file(cls, rmf_file: str | os.PathLike):
    """
    Load the data from an RMF file.

    Parameters:
        rmf_file: The RMF file path.
    """
    extension_names = [hdu[1] for hdu in fits.info(rmf_file, output=False)]

    if "MATRIX" in extension_names:
        matrix_extension = "MATRIX"

    elif "SPECRESP MATRIX" in extension_names:
        matrix_extension = "SPECRESP MATRIX"
        # raise NotImplementedError("SPECRESP MATRIX extension is not yet supported")

    else:
        raise ValueError("No MATRIX or SPECRESP MATRIX extension found in the RMF file")

    matrix_table = QTable.read(rmf_file, matrix_extension)
    ebounds_table = QTable.read(rmf_file, "EBOUNDS")

    matrix_header = fits.getheader(rmf_file, matrix_extension)

    f_chan_column_pos = list(matrix_table.columns).index("F_CHAN") + 1
    tlmin_fchan = int(matrix_header[f"TLMIN{f_chan_column_pos}"])

    return cls(
        matrix_table["ENERG_LO"],
        matrix_table["ENERG_HI"],
        matrix_table["N_GRP"],
        matrix_table["F_CHAN"] - tlmin_fchan,
        matrix_table["N_CHAN"],
        matrix_table["MATRIX"],
        ebounds_table["CHANNEL"],
        ebounds_table["E_MIN"],
        ebounds_table["E_MAX"],
    )