Skip to content

Commit

Permalink
Merge pull request #781 from StingraySoftware/colors
Browse files Browse the repository at this point in the history
Colors (as in color-color or hardness-intensity diagrams)
  • Loading branch information
matteobachetti committed Jan 15, 2024
2 parents 1fc9b90 + 115b9e6 commit a4cafa4
Show file tree
Hide file tree
Showing 7 changed files with 609 additions and 194 deletions.
1 change: 1 addition & 0 deletions docs/changes/781.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Calculate colors and intensities on a segment-by-segment basis in event lists
273 changes: 261 additions & 12 deletions stingray/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pickle
import warnings
import copy
import os
import logging

import numpy as np
Expand All @@ -33,8 +32,9 @@
gti_border_bins,
get_btis,
merge_gtis,
check_separate,
append_gtis,
get_total_gti_length,
bin_intervals_from_gtis,
time_intervals_from_gtis,
)

from typing import TYPE_CHECKING, Type, TypeVar, Union
Expand Down Expand Up @@ -247,8 +247,8 @@ def pretty_print(self, func_to_apply=None, attrs_to_apply=[], attrs_to_discard=[
This is useful for debugging, and for interactive use.
Optional parameters
-------------------
Other parameters
----------------
func_to_apply : function
A function that modifies the attributes listed in ``attrs_to_apply``.
It must return the modified attributes and a label to be printed.
Expand Down Expand Up @@ -1053,6 +1053,7 @@ def __getitem__(self, index):
raise IndexError("The index must be either an integer or a slice " "object !")

new_ts = type(self)()

for attr in self.meta_attrs():
setattr(new_ts, attr, copy.deepcopy(getattr(self, attr)))

Expand All @@ -1062,6 +1063,17 @@ def __getitem__(self, index):
return new_ts


def _ts_sum(ts):
"""Sum the number of values of a time series object.
If it has a ``counts`` attribute, sum the counts. Otherwise, count the number
of time samples. Masked values are ignored.
"""
if hasattr(ts, "counts"):
return np.sum(ts.counts[ts.mask])
return np.count_nonzero(ts.mask)


class StingrayTimeseries(StingrayObject):
"""Basic class for time series data.
Expand Down Expand Up @@ -1128,7 +1140,10 @@ class StingrayTimeseries(StingrayObject):
dt: float
The time resolution of the measurements. Can be a scalar or an array attribute (useful
for non-evenly sampled data or events from different instruments)
for non-evenly sampled data or events from different instruments). It can also be 0, which
means that the time series is not evenly sampled and the effects of the time resolution are
considered negligible for the analysis. This is sometimes the case for events from
high-energy telescopes.
mjdref : float
The MJD used as a reference for the time array.
Expand Down Expand Up @@ -1216,8 +1231,12 @@ def gti(self, value):

@property
def mask(self):
if self._mask is None:
self._mask = create_gti_mask(self.time, self.gti, dt=self.dt)
if self._mask is not None:
return self._mask
if self._gti is not None:
self._mask = create_gti_mask(self.time, self._gti, dt=self.dt)
else:
self._mask = np.ones_like(self.time, dtype=bool)
return self._mask

@property
Expand Down Expand Up @@ -1291,6 +1310,19 @@ def _validate_and_format(self, value, attr_name, compare_to_attr):
)
return value

@property
def exposure(self):
"""
Return the total exposure of the time series, i.e. the sum of the GTIs.
Returns
-------
total_exposure : float
The total exposure of the time series, in seconds.
"""

return get_total_gti_length(self.gti)

def __eq__(self, other_ts):
return super().__eq__(other_ts)

Expand Down Expand Up @@ -1704,12 +1736,10 @@ def truncate(self, start=0, stop=None, method="index"):
>>> ts = StingrayTimeseries(time, array_attrs={"counts": count}, dt=1)
>>> ts_new = ts.truncate(start=2, stop=8)
>>> assert np.allclose(ts_new.counts, [30, 40, 50, 60, 70, 80])
>>> ts_new.time
array([3, 4, 5, 6, 7, 8])
>>> assert np.allclose(ts_new.time, [3, 4, 5, 6, 7, 8])
>>> # Truncation can also be done by time values
>>> ts_new = ts.truncate(start=6, method='time')
>>> ts_new.time
array([6, 7, 8, 9])
>>> assert np.allclose(ts_new.time, [6, 7, 8, 9])
>>> assert np.allclose(ts_new.counts, [60, 70, 80, 90])
"""

Expand Down Expand Up @@ -2405,6 +2435,225 @@ def plot(
)
return ax

def estimate_segment_size(self, min_counts=None, min_samples=None, even_sampling=None):
"""Estimate a reasonable segment length for segment-by-segment analysis.
The user has to specify a criterion based on a minimum number of counts (if
the time series has a ``counts`` attribute) or a minimum number of time samples.
At least one between ``min_counts`` and ``min_samples`` must be specified.
In the special case of a time series with ``dt=0`` (event list-like, where each time
stamp correspond to a single count), the two definitions are equivalent.
Other Parameters
----------------
min_counts : int
Minimum number of counts for each chunk. Optional (but needs ``min_samples``
if left unspecified). Only makes sense if the series has a ``counts`` attribute and
it is evenly sampled.
min_samples : int
Minimum number of time bins. Optional (but needs ``min_counts`` if left unspecified).
even_sampling : bool
Force the treatment of the data as evenly sampled or not. If None, the data are
considered evenly sampled if ``self.dt`` is larger than zero and the median
separation between subsequent times is within 1% of ``self.dt``.
Returns
-------
segment_size : float
The length of the light curve chunks that satisfies the conditions
Examples
--------
>>> import numpy as np
>>> time = np.arange(150)
>>> counts = np.zeros_like(time) + 3
>>> ts = StingrayTimeseries(time, counts=counts, dt=1)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=10, min_samples=3), 4.0)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=10, min_samples=5), 5.0)
>>> counts[2:4] = 1
>>> ts = StingrayTimeseries(time, counts=counts, dt=1)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=3, min_samples=1), 3.0)
>>> # A slightly more complex example
>>> dt=0.2
>>> time = np.arange(0, 1000, dt)
>>> counts = np.random.poisson(100, size=len(time))
>>> ts = StingrayTimeseries(time, counts=counts, dt=dt)
>>> assert np.isclose(ts.estimate_segment_size(100, 2), 0.4)
>>> min_total_bins = 40
>>> assert np.isclose(ts.estimate_segment_size(100, 40), 8.0)
"""
if min_counts is None and min_samples is None:
raise ValueError("You have to specify at least one of min_counts or min_samples")

mean_data_separation = np.median(np.diff(self.time))

if even_sampling is None:
# The time series is considered evenly sampled if the median separation between
# subsequent times is within 1% of the time resolution
even_sampling = False
if (
self.dt is not None
and self.dt > 0
and np.isclose(mean_data_separation, self.dt, rtol=0.01)
):
even_sampling = True
logging.info(f"Data are {'not' if not even_sampling else ''} evenly sampled")

if min_counts is None:
if even_sampling and hasattr(self, "counts"):
min_counts = 0
else:
min_counts = min_samples

mean_ctrate = _ts_sum(self) / self.exposure

rough_estimate = np.ceil(min_counts / mean_ctrate)

# If data are evenly sampled, even sampling make the segment an integer multiple of dt.
# Otherwise, just use steps of 1 second.
if even_sampling:
step = self.dt
else:
step = 1.0

rough_estimate = np.ceil(min_counts / mean_ctrate / step) * step

segment_size = np.max([rough_estimate, min_samples * step])

keep_searching = True

while keep_searching:
start_times, stop_times, results = self.analyze_segments(_ts_sum, segment_size)
mincounts = np.min(results)
if mincounts >= min_counts:
keep_searching = False
else:
segment_size += step

return segment_size

def analyze_segments(self, func, segment_size, fraction_step=1, **kwargs):
"""Analyze segments of the light curve with any function.
Parameters
----------
func : function
Function accepting a :class:`StingrayTimeseries` object as single argument, plus
possible additional keyword arguments, and returning a number or a
tuple - e.g., ``(result, error)`` where both ``result`` and ``error`` are
numbers.
segment_size : float
Length in seconds of the light curve segments. If None, the full GTIs are considered
instead as segments.
Other parameters
----------------
fraction_step : float
If the step is not a full ``segment_size`` but less (e.g. a moving window),
this indicates the ratio between step step and ``segment_size`` (e.g.
0.5 means that the window shifts of half ``segment_size``)
kwargs : keyword arguments
These additional keyword arguments, if present, they will be passed
to ``func``
Returns
-------
start_times : array
Lower time boundaries of all time segments.
stop_times : array
upper time boundaries of all segments.
result : array of N elements
The result of ``func`` for each segment of the light curve
Examples
--------
>>> import numpy as np
>>> time = np.arange(0, 10, 0.1)
>>> counts = np.zeros_like(time) + 10
>>> ts = StingrayTimeseries(time, counts=counts, dt=0.1)
>>> # Define a function that calculates the mean
>>> mean_func = lambda ts: np.mean(ts.counts)
>>> # Calculate the mean in segments of 5 seconds
>>> start, stop, res = ts.analyze_segments(mean_func, 5)
>>> len(res) == 2
True
>>> np.allclose(res, 10)
True
"""

if segment_size is None:
start_times = self.gti[:, 0]
stop_times = self.gti[:, 1]
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)
elif self.dt > 0:
start, stop = bin_intervals_from_gtis(
self.gti, segment_size, self.time, fraction_step=fraction_step, dt=self.dt
)
start_times = self.time[start] - 0.5 * self.dt
# Remember that stop is one element above the last element, because
# it's defined to be used in intervals start:stop
stop_times = self.time[stop - 1] + self.dt * 1.5
else:
start_times, stop_times = time_intervals_from_gtis(
self.gti, segment_size, fraction_step=fraction_step
)
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)

results = []
for i, (st, sp, tst, tsp) in enumerate(zip(start, stop, start_times, stop_times)):
if sp - st <= 1:
warnings.warn(
f"Segment {i} ({tst}--{tsp}) has one data point or less. Skipping it."
)
res = np.nan
else:
lc_filt = self[st:sp]
lc_filt.gti = np.asarray([[tst, tsp]])

res = func(lc_filt, **kwargs)
results.append(res)

results = np.array(results)

if len(results.shape) == 2:
results = [results[:, i] for i in range(results.shape[1])]
return start_times, stop_times, results

def analyze_by_gti(self, func, fraction_step=1, **kwargs):
"""Analyze the light curve with any function, on a GTI-by-GTI base.
Parameters
----------
func : function
Function accepting a :class:`StingrayTimeseries` object as single argument, plus
possible additional keyword arguments, and returning a number or a
tuple - e.g., ``(result, error)`` where both ``result`` and ``error`` are
numbers.
Other parameters
----------------
fraction_step : float
By default, segments do not overlap (``fraction_step`` = 1). If ``fraction_step`` < 1,
then the start points of consecutive segments are ``fraction_step * segment_size``
apart, and consecutive segments overlap. For example, for ``fraction_step`` = 0.5,
the window shifts one half of ``segment_size``)
kwargs : keyword arguments
These additional keyword arguments, if present, they will be passed
to ``func``
Returns
-------
start_times : array
Lower time boundaries of all time segments.
stop_times : array
upper time boundaries of all segments.
result : array of N elements
The result of ``func`` for each segment of the light curve
"""
return self.analyze_segments(func, segment_size=None, fraction_step=fraction_step, **kwargs)


def interpret_times(time: TTime, mjdref: float = 0) -> tuple[npt.ArrayLike, float]:
"""Understand the format of input times, and return seconds from MJDREF
Expand Down

0 comments on commit a4cafa4

Please sign in to comment.