Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Colors (as in color-color or hardness-intensity diagrams) #781

Merged
merged 28 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
b615af5
Move functions to analyze chunks to base and make them generic
matteobachetti Dec 7, 2023
b6322fb
Make more precise start and stop times for events
matteobachetti Dec 7, 2023
e9e4575
Calculate colors
matteobachetti Dec 7, 2023
4be5673
Add intensity function
matteobachetti Dec 7, 2023
d321602
Test properly
matteobachetti Dec 7, 2023
4e2c74e
Add changelog
matteobachetti Dec 7, 2023
68d3419
Refactor analyze_chunks to analyze_segments. Allow segment_size=None …
matteobachetti Dec 7, 2023
7e4caa8
Fix doctest
matteobachetti Dec 8, 2023
cf81ed7
Fix doctest
matteobachetti Dec 27, 2023
d280db7
Avoid side effects when creating the mask
matteobachetti Jan 2, 2024
ecbf775
Test properly
matteobachetti Jan 2, 2024
4f0071f
Verify that estimate_segment_size works properly for events
matteobachetti Jan 2, 2024
4e3e94c
Fix docstring
matteobachetti Jan 2, 2024
9dc8f76
Treat invalid data as such
matteobachetti Jan 2, 2024
7188b28
Add analyze_by_gti function; make func and segment_size required argu…
matteobachetti Jan 11, 2024
2c5410e
Update tests
matteobachetti Jan 11, 2024
5365311
Deprecate analyze_lc_chunks
matteobachetti Jan 11, 2024
1207a3f
Fix confusion between min_time_bins and min_total_counts
matteobachetti Jan 11, 2024
6849f69
Fix get_color_evolution docstring
matteobachetti Jan 11, 2024
69be5b9
Make logic more intuitive
matteobachetti Jan 11, 2024
61dc87d
Fix rascal doctest changing formatting
matteobachetti Jan 11, 2024
92bd5b9
Eliminate local imports
matteobachetti Jan 11, 2024
f12330b
Add note about dt in docstring
matteobachetti Jan 11, 2024
46c0633
Fix docstring
matteobachetti Jan 12, 2024
a2d292e
Fix and cleanup imports
matteobachetti Jan 15, 2024
04830ad
Fix and test analyze_by_gti
matteobachetti Jan 15, 2024
76c0822
Code cleanup
matteobachetti Jan 15, 2024
115b9e6
Fix h5py installation condition
matteobachetti Jan 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/changes/781.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Calculate colors and intensities on a segment-by-segment basis in event lists
273 changes: 261 additions & 12 deletions stingray/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import pickle
import warnings
import copy
import os
import logging

import numpy as np
Expand All @@ -33,8 +32,9 @@
gti_border_bins,
get_btis,
merge_gtis,
check_separate,
append_gtis,
get_total_gti_length,
bin_intervals_from_gtis,
time_intervals_from_gtis,
)

from typing import TYPE_CHECKING, Type, TypeVar, Union
Expand Down Expand Up @@ -247,8 +247,8 @@ def pretty_print(self, func_to_apply=None, attrs_to_apply=[], attrs_to_discard=[

This is useful for debugging, and for interactive use.

Optional parameters
-------------------
Other parameters
----------------
func_to_apply : function
A function that modifies the attributes listed in ``attrs_to_apply``.
It must return the modified attributes and a label to be printed.
Expand Down Expand Up @@ -1053,6 +1053,7 @@ def __getitem__(self, index):
raise IndexError("The index must be either an integer or a slice " "object !")

new_ts = type(self)()

for attr in self.meta_attrs():
setattr(new_ts, attr, copy.deepcopy(getattr(self, attr)))

Expand All @@ -1062,6 +1063,17 @@ def __getitem__(self, index):
return new_ts


def _ts_sum(ts):
"""Sum the number of values of a time series object.

If it has a ``counts`` attribute, sum the counts. Otherwise, count the number
of time samples. Masked values are ignored.
"""
if hasattr(ts, "counts"):
return np.sum(ts.counts[ts.mask])
return np.count_nonzero(ts.mask)


class StingrayTimeseries(StingrayObject):
"""Basic class for time series data.

Expand Down Expand Up @@ -1128,7 +1140,10 @@ class StingrayTimeseries(StingrayObject):

dt: float
The time resolution of the measurements. Can be a scalar or an array attribute (useful
for non-evenly sampled data or events from different instruments)
for non-evenly sampled data or events from different instruments). It can also be 0, which
means that the time series is not evenly sampled and the effects of the time resolution are
considered negligible for the analysis. This is sometimes the case for events from
high-energy telescopes.

mjdref : float
The MJD used as a reference for the time array.
Expand Down Expand Up @@ -1216,8 +1231,12 @@ def gti(self, value):

@property
def mask(self):
if self._mask is None:
self._mask = create_gti_mask(self.time, self.gti, dt=self.dt)
if self._mask is not None:
return self._mask
if self._gti is not None:
self._mask = create_gti_mask(self.time, self._gti, dt=self.dt)
else:
self._mask = np.ones_like(self.time, dtype=bool)
return self._mask

@property
Expand Down Expand Up @@ -1291,6 +1310,19 @@ def _validate_and_format(self, value, attr_name, compare_to_attr):
)
return value

@property
def exposure(self):
"""
Return the total exposure of the time series, i.e. the sum of the GTIs.

Returns
-------
total_exposure : float
The total exposure of the time series, in seconds.
"""

return get_total_gti_length(self.gti)

def __eq__(self, other_ts):
return super().__eq__(other_ts)

Expand Down Expand Up @@ -1704,12 +1736,10 @@ def truncate(self, start=0, stop=None, method="index"):
>>> ts = StingrayTimeseries(time, array_attrs={"counts": count}, dt=1)
>>> ts_new = ts.truncate(start=2, stop=8)
>>> assert np.allclose(ts_new.counts, [30, 40, 50, 60, 70, 80])
>>> ts_new.time
array([3, 4, 5, 6, 7, 8])
>>> assert np.allclose(ts_new.time, [3, 4, 5, 6, 7, 8])
>>> # Truncation can also be done by time values
>>> ts_new = ts.truncate(start=6, method='time')
>>> ts_new.time
array([6, 7, 8, 9])
>>> assert np.allclose(ts_new.time, [6, 7, 8, 9])
>>> assert np.allclose(ts_new.counts, [60, 70, 80, 90])
"""

Expand Down Expand Up @@ -2405,6 +2435,225 @@ def plot(
)
return ax

def estimate_segment_size(self, min_counts=None, min_samples=None, even_sampling=None):
"""Estimate a reasonable segment length for segment-by-segment analysis.

The user has to specify a criterion based on a minimum number of counts (if
the time series has a ``counts`` attribute) or a minimum number of time samples.
At least one between ``min_counts`` and ``min_samples`` must be specified.
In the special case of a time series with ``dt=0`` (event list-like, where each time
stamp correspond to a single count), the two definitions are equivalent.

Other Parameters
----------------
min_counts : int
Minimum number of counts for each chunk. Optional (but needs ``min_samples``
if left unspecified). Only makes sense if the series has a ``counts`` attribute and
it is evenly sampled.
min_samples : int
Minimum number of time bins. Optional (but needs ``min_counts`` if left unspecified).
even_sampling : bool
Force the treatment of the data as evenly sampled or not. If None, the data are
considered evenly sampled if ``self.dt`` is larger than zero and the median
separation between subsequent times is within 1% of ``self.dt``.

Returns
-------
segment_size : float
The length of the light curve chunks that satisfies the conditions

Examples
--------
>>> import numpy as np
>>> time = np.arange(150)
>>> counts = np.zeros_like(time) + 3
>>> ts = StingrayTimeseries(time, counts=counts, dt=1)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=10, min_samples=3), 4.0)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=10, min_samples=5), 5.0)
>>> counts[2:4] = 1
>>> ts = StingrayTimeseries(time, counts=counts, dt=1)
>>> assert np.isclose(ts.estimate_segment_size(min_counts=3, min_samples=1), 3.0)
>>> # A slightly more complex example
>>> dt=0.2
>>> time = np.arange(0, 1000, dt)
>>> counts = np.random.poisson(100, size=len(time))
>>> ts = StingrayTimeseries(time, counts=counts, dt=dt)
>>> assert np.isclose(ts.estimate_segment_size(100, 2), 0.4)
>>> min_total_bins = 40
>>> assert np.isclose(ts.estimate_segment_size(100, 40), 8.0)
"""
if min_counts is None and min_samples is None:
raise ValueError("You have to specify at least one of min_counts or min_samples")

mean_data_separation = np.median(np.diff(self.time))

if even_sampling is None:
# The time series is considered evenly sampled if the median separation between
# subsequent times is within 1% of the time resolution
even_sampling = False
if (
self.dt is not None
and self.dt > 0
and np.isclose(mean_data_separation, self.dt, rtol=0.01)
):
even_sampling = True
logging.info(f"Data are {'not' if not even_sampling else ''} evenly sampled")

if min_counts is None:
if even_sampling and hasattr(self, "counts"):
min_counts = 0
else:
min_counts = min_samples

mean_ctrate = _ts_sum(self) / self.exposure

rough_estimate = np.ceil(min_counts / mean_ctrate)

# If data are evenly sampled, even sampling make the segment an integer multiple of dt.
# Otherwise, just use steps of 1 second.
if even_sampling:
step = self.dt
else:
step = 1.0

rough_estimate = np.ceil(min_counts / mean_ctrate / step) * step

segment_size = np.max([rough_estimate, min_samples * step])

keep_searching = True

while keep_searching:
start_times, stop_times, results = self.analyze_segments(_ts_sum, segment_size)
mincounts = np.min(results)
if mincounts >= min_counts:
keep_searching = False
else:
segment_size += step

return segment_size

def analyze_segments(self, func, segment_size, fraction_step=1, **kwargs):
"""Analyze segments of the light curve with any function.

Parameters
----------
func : function
Function accepting a :class:`StingrayTimeseries` object as single argument, plus
possible additional keyword arguments, and returning a number or a
tuple - e.g., ``(result, error)`` where both ``result`` and ``error`` are
numbers.
segment_size : float
Length in seconds of the light curve segments. If None, the full GTIs are considered
instead as segments.

Other parameters
----------------
fraction_step : float
If the step is not a full ``segment_size`` but less (e.g. a moving window),
this indicates the ratio between step step and ``segment_size`` (e.g.
0.5 means that the window shifts of half ``segment_size``)
matteobachetti marked this conversation as resolved.
Show resolved Hide resolved
kwargs : keyword arguments
These additional keyword arguments, if present, they will be passed
to ``func``

Returns
-------
start_times : array
Lower time boundaries of all time segments.
stop_times : array
upper time boundaries of all segments.
result : array of N elements
The result of ``func`` for each segment of the light curve

Examples
--------
>>> import numpy as np
>>> time = np.arange(0, 10, 0.1)
>>> counts = np.zeros_like(time) + 10
>>> ts = StingrayTimeseries(time, counts=counts, dt=0.1)
>>> # Define a function that calculates the mean
>>> mean_func = lambda ts: np.mean(ts.counts)
>>> # Calculate the mean in segments of 5 seconds
>>> start, stop, res = ts.analyze_segments(mean_func, 5)
>>> len(res) == 2
True
>>> np.allclose(res, 10)
True
"""

if segment_size is None:
start_times = self.gti[:, 0]
stop_times = self.gti[:, 1]
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)
elif self.dt > 0:
matteobachetti marked this conversation as resolved.
Show resolved Hide resolved
start, stop = bin_intervals_from_gtis(
self.gti, segment_size, self.time, fraction_step=fraction_step, dt=self.dt
)
start_times = self.time[start] - 0.5 * self.dt
# Remember that stop is one element above the last element, because
# it's defined to be used in intervals start:stop
stop_times = self.time[stop - 1] + self.dt * 1.5
else:
start_times, stop_times = time_intervals_from_gtis(
self.gti, segment_size, fraction_step=fraction_step
)
start = np.searchsorted(self.time, start_times)
stop = np.searchsorted(self.time, stop_times)

results = []
for i, (st, sp, tst, tsp) in enumerate(zip(start, stop, start_times, stop_times)):
if sp - st <= 1:
warnings.warn(
f"Segment {i} ({tst}--{tsp}) has one data point or less. Skipping it."
)
res = np.nan
else:
lc_filt = self[st:sp]
lc_filt.gti = np.asarray([[tst, tsp]])

res = func(lc_filt, **kwargs)
results.append(res)

results = np.array(results)

if len(results.shape) == 2:
results = [results[:, i] for i in range(results.shape[1])]
return start_times, stop_times, results

def analyze_by_gti(self, func, fraction_step=1, **kwargs):
"""Analyze the light curve with any function, on a GTI-by-GTI base.

Parameters
----------
func : function
Function accepting a :class:`StingrayTimeseries` object as single argument, plus
possible additional keyword arguments, and returning a number or a
tuple - e.g., ``(result, error)`` where both ``result`` and ``error`` are
numbers.

Other parameters
----------------
fraction_step : float
By default, segments do not overlap (``fraction_step`` = 1). If ``fraction_step`` < 1,
then the start points of consecutive segments are ``fraction_step * segment_size``
apart, and consecutive segments overlap. For example, for ``fraction_step`` = 0.5,
the window shifts one half of ``segment_size``)
kwargs : keyword arguments
These additional keyword arguments, if present, they will be passed
to ``func``

Returns
-------
start_times : array
Lower time boundaries of all time segments.
stop_times : array
upper time boundaries of all segments.
result : array of N elements
The result of ``func`` for each segment of the light curve
"""
return self.analyze_segments(func, segment_size=None, fraction_step=fraction_step, **kwargs)


def interpret_times(time: TTime, mjdref: float = 0) -> tuple[npt.ArrayLike, float]:
"""Understand the format of input times, and return seconds from MJDREF
Expand Down