Skip to content

megatop.utils

megatop.utils.Timer

Timer class to measure elapsed time.

A thread-safe singleton class that provides functionality to measure and log elapsed time. Can be used either as a context manager or with explicit start/stop calls.

Examples:

Using as context manager:

>>> with Timer(thread="my_operation"):
>>>     # code to time
>>>     ...

Using with explicit start/stop:

>>> timer = Timer(thread="my_operation")
>>> timer.start()
>>> # code to time
>>> timer.stop()
Source code in src/megatop/utils/timer.py
class Timer(metaclass=_SingletonReInit):
    """Timer class to measure elapsed time.

    A thread-safe singleton class that provides functionality to measure and log elapsed time.
    Can be used either as a context manager or with explicit start/stop calls.

    Examples:
        Using as context manager:

        >>> with Timer(thread="my_operation"):
        >>>     # code to time
        >>>     ...

        Using with explicit start/stop:

        >>> timer = Timer(thread="my_operation")
        >>> timer.start()
        >>> # code to time
        >>> timer.stop()

    """

    __slots__ = [
        "__dict__",
        "_context_latest_thread",
        "_context_manager_threads",
        "_thread_starts",
    ]

    _lock_init: bool = False

    def __init__(self, thread: str = "default") -> None:
        """Initialize a new Timer instance.

        Args:
            thread: Name to identify this timer instance.
        """
        if not self._lock_init:
            # initialize dict and queue only once
            self._thread_starts: dict[str, int] = {}
            self._context_threads: LifoQueue[str] = LifoQueue()
            self._lock_init = True
        self._context_latest_thread = thread

    def __enter__(self) -> "Timer":
        """Enter the Timer context manager and start timing.

        This method is automatically called when entering a context manager block ('with' statement).
        It adds the current thread to the context threads queue and starts timing.

        Returns:
            The Timer instance for use in the context.
        """
        self._context_threads.put(self._context_latest_thread)
        self.start(self._context_latest_thread)
        return self

    def __exit__(self, *_exc_info) -> None:
        """Exit the Timer context manager and stop timing.

        This method is automatically called when exiting a context manager block ('with' statement).
        It stops the timer for the current thread and logs the elapsed time.
        """
        last_context_manager_thread = self._context_threads.get()
        self.stop(last_context_manager_thread)

    def start(self, thread: str = "default") -> None:
        """Start the Timer with given name.

        Should always be followed by a stop() call later in the code.

        Args:
            thread: Name of the timer to start.

        Raises:
            ValueError: when timer with given name already exists.
        """
        if thread in self._thread_starts:
            msg = f"timer {thread!r} already exists"
            raise ValueError(msg)

        self._thread_starts[thread] = time.perf_counter_ns()

    def stop(self, thread: str = "default") -> None:
        """Stop the Timer with given name and log the time elapsed since the start() call.

        Args:
            thread: Name of the timer to stop.

        Raises:
            ValueError: when timer with given name does not exist.
        """
        if thread not in self._thread_starts:
            msg = f"timer {thread!r} does not exist"
            raise ValueError(msg)

        start_time_ns = self._thread_starts.pop(thread)
        elapsed_ns = time.perf_counter_ns() - start_time_ns
        msg = f"Elapsed time [{thread}]: {self._format_nanoseconds(elapsed_ns)}"
        logger.info(msg)

    @classmethod
    def _format_nanoseconds(cls, ns: int) -> str:
        # when possible, bypass divmod for performance
        us, ns = divmod(ns, 1000) if ns > 1000 else (0, ns)
        ms, us = divmod(us, 1000) if us > 1000 else (0, us)
        ss, ms = divmod(ms, 1000) if ms > 1000 else (0, ms)
        mm, ss = divmod(ss, 60) if ss > 60 else (0, ss)
        hh, mm = divmod(mm, 60) if mm > 60 else (0, mm)
        if hh > 0:
            # 1 h 02 m 03 s
            return f"{hh} h {mm:02} m {ss:02} s"
        if mm > 0:
            # 2 m 03 s
            return f"{mm} m {ss:02} s"
        if ss > 0:
            # 3.045 s
            return f"{ss}.{ms:03} s"
        if ms > 0:
            # 45.006 ms
            return f"{ms}.{us:03} ms"
        if us > 0:
            # 6.078 us
            return f"{us}.{ns:03} us"
        # sub-microsecond, just print nanoseconds
        return f"{ns} ns"

__enter__()

Enter the Timer context manager and start timing.

This method is automatically called when entering a context manager block ('with' statement). It adds the current thread to the context threads queue and starts timing.

Returns:

  • Timer

    The Timer instance for use in the context.

Source code in src/megatop/utils/timer.py
def __enter__(self) -> "Timer":
    """Enter the Timer context manager and start timing.

    This method is automatically called when entering a context manager block ('with' statement).
    It adds the current thread to the context threads queue and starts timing.

    Returns:
        The Timer instance for use in the context.
    """
    self._context_threads.put(self._context_latest_thread)
    self.start(self._context_latest_thread)
    return self

__exit__(*_exc_info)

Exit the Timer context manager and stop timing.

This method is automatically called when exiting a context manager block ('with' statement). It stops the timer for the current thread and logs the elapsed time.

Source code in src/megatop/utils/timer.py
def __exit__(self, *_exc_info) -> None:
    """Exit the Timer context manager and stop timing.

    This method is automatically called when exiting a context manager block ('with' statement).
    It stops the timer for the current thread and logs the elapsed time.
    """
    last_context_manager_thread = self._context_threads.get()
    self.stop(last_context_manager_thread)

__init__(thread='default')

Initialize a new Timer instance.

Parameters:

  • thread (str, default: 'default' ) –

    Name to identify this timer instance.

Source code in src/megatop/utils/timer.py
def __init__(self, thread: str = "default") -> None:
    """Initialize a new Timer instance.

    Args:
        thread: Name to identify this timer instance.
    """
    if not self._lock_init:
        # initialize dict and queue only once
        self._thread_starts: dict[str, int] = {}
        self._context_threads: LifoQueue[str] = LifoQueue()
        self._lock_init = True
    self._context_latest_thread = thread

start(thread='default')

Start the Timer with given name.

Should always be followed by a stop() call later in the code.

Parameters:

  • thread (str, default: 'default' ) –

    Name of the timer to start.

Raises:

  • ValueError

    when timer with given name already exists.

Source code in src/megatop/utils/timer.py
def start(self, thread: str = "default") -> None:
    """Start the Timer with given name.

    Should always be followed by a stop() call later in the code.

    Args:
        thread: Name of the timer to start.

    Raises:
        ValueError: when timer with given name already exists.
    """
    if thread in self._thread_starts:
        msg = f"timer {thread!r} already exists"
        raise ValueError(msg)

    self._thread_starts[thread] = time.perf_counter_ns()

stop(thread='default')

Stop the Timer with given name and log the time elapsed since the start() call.

Parameters:

  • thread (str, default: 'default' ) –

    Name of the timer to stop.

Raises:

  • ValueError

    when timer with given name does not exist.

Source code in src/megatop/utils/timer.py
def stop(self, thread: str = "default") -> None:
    """Stop the Timer with given name and log the time elapsed since the start() call.

    Args:
        thread: Name of the timer to stop.

    Raises:
        ValueError: when timer with given name does not exist.
    """
    if thread not in self._thread_starts:
        msg = f"timer {thread!r} does not exist"
        raise ValueError(msg)

    start_time_ns = self._thread_starts.pop(thread)
    elapsed_ns = time.perf_counter_ns() - start_time_ns
    msg = f"Elapsed time [{thread}]: {self._format_nanoseconds(elapsed_ns)}"
    logger.info(msg)

megatop.utils.function_timer(thread=None)

Function decorator to time a function.

Wraps a function with a Timer context manager to measure and log its execution time.

Parameters:

  • thread (str | None, default: None ) –

    Optional thread name to distinguish timer. If None, uses function name with arguments as thread name.

Returns:

  • Callable

    A wrapped function that logs timing information when called.

Source code in src/megatop/utils/timer.py
def function_timer(thread: str | None = None):
    """Function decorator to time a function.

    Wraps a function with a Timer context manager to measure and log its execution time.

    Args:
        thread (str | None): Optional thread name to distinguish timer.
            If None, uses function name with arguments as thread name.

    Returns:
        Callable: A wrapped function that logs timing information when called.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            thread_name = thread or _get_function_with_arguments_as_thread_name(func, args, kwargs)
            with Timer(thread=thread_name):
                return func(*args, **kwargs)

        return wrapper

    return decorator

megatop.utils.TF_utils

get_alm_ordering(fields='TEB', components=None)

Build an iterator that yields the ordering of the alm components given fields and components (e.g. frequencies). Expected behavior for T,E,B fields and f1, f2 is : almT_f1, almE_f1, almB_f1, almT_f2, almE_f2, almB_f2

Parameters

fields : str The fields to consider. Default is "TEB". components : list The components to consider. Default is None.

Source code in src/megatop/utils/TF_utils.py
def get_alm_ordering(fields="TEB", components=None):
    """
    Build an iterator that yields the ordering of the alm components
    given fields and components (e.g. frequencies).
    Expected behavior for T,E,B fields and f1, f2 is :
        almT_f1, almE_f1, almB_f1, almT_f2, almE_f2, almB_f2

    Parameters
    ----------
    fields : str
        The fields to consider. Default is "TEB".
    components : list
        The components to consider. Default is None.
    """
    if components is not None:
        for c in components:
            for f in fields:
                yield c, f
    else:
        for f in fields:
            yield f

get_alms_from_cls(ps_array, lmax, seed=None)

Generate alms from power spectra.

Parameters

ps_array : np.ndarray A 2D array containing the power spectra with shape [fields, lmax]. With fields ordered with NEW ordering from healpy: TT, EE, BB, TE, EB, TB or TT, EE, BB, TE lmax : int The maximum multipole to consider.

fields : str

The fields to consider. Default is "TEB".

components : list

The components to consider. Default is None.

seed : int The seed for the random number generator. Default is None.

Source code in src/megatop/utils/TF_utils.py
def get_alms_from_cls(
    ps_array,
    lmax,
    #   fields="TEB", components=None,
    seed=None,
):
    """
    Generate alms from power spectra.

    Parameters
    ----------
    ps_array : np.ndarray
        A 2D array containing the power spectra with shape [fields, lmax].
        With fields ordered with NEW ordering from healpy:
        TT, EE, BB, TE, EB, TB or TT, EE, BB, TE
    lmax : int
        The maximum multipole to consider.
    # fields : str
    #     The fields to consider. Default is "TEB".
    # components : list
    #     The components to consider. Default is None.
    seed : int
        The seed for the random number generator. Default is None.
    """
    # ps_matrix = get_ps_matrix_for_sim(ps_dict,
    #                                   lmax,
    #                                   components=components,
    #                                   fields=fields)
    np.random.seed(seed)  # noqa: NPY002
    return hp.synalm(
        ps_array,
        lmax=lmax,
        new=True,
    )

get_map_from_alms(alms, nside)

Get a map from alms and a template.

Parameters

alms : np.ndarray The alms to use. nside : int

Returns

map : np.ndarray Returns the corresponding map.

Source code in src/megatop/utils/TF_utils.py
def get_map_from_alms(alms, nside):
    """
    Get a map from alms and a template.

    Parameters
    ----------
    alms : np.ndarray
        The alms to use.
    nside : int

    Returns
    -------
    map : np.ndarray
        Returns the corresponding map.
    """

    return hu.alm2map(alms, spin=[0, 2], nside=nside)

get_ps_matrix_for_sim(ps_dict, lmax, components=None, fields='TEB')

Build a matrix of power spectra for simulation of correlated alms.

Parameters

ps_dict : dict A dictionary containing the power spectra. lmax : int The maximum multipole to consider. components : list The components to consider. Default is None. fields : str The fields to consider. Default is "TEB".

Returns

ps_matrix : np.ndarray A 3D array containing the power spectra with dimension (nfields * ncomponents, nfields * ncomponents, lmax).

Source code in src/megatop/utils/TF_utils.py
def get_ps_matrix_for_sim(ps_dict, lmax, components=None, fields="TEB"):
    """
    Build a matrix of power spectra for simulation of correlated alms.

    Parameters
    ----------
    ps_dict : dict
        A dictionary containing the power spectra.
    lmax : int
        The maximum multipole to consider.
    components : list
        The components to consider. Default is None.
    fields : str
        The fields to consider. Default is "TEB".

    Returns
    -------
    ps_matrix : np.ndarray
        A 3D array containing the power spectra with dimension
        (nfields * ncomponents, nfields * ncomponents, lmax).
    """
    ordering = list(get_alm_ordering(fields=fields, components=components))

    nalms = len(components) * len(fields) if components is not None else len(fields)
    ps_matrix = np.zeros((nalms, nalms, lmax))

    for i, lab1 in enumerate(ordering):
        for j, lab2 in enumerate(ordering):
            if components is not None:
                c1, f1 = lab1
                c2, f2 = lab2
                ps_matrix[i, j, :] = ps_dict[c1, c2][f1 + f2][:lmax]
            else:
                f1 = lab1
                f2 = lab2
                ps_matrix[i, j, :] = ps_dict[f1 + f2][:lmax]
    return ps_matrix

power_law_cl(ell, amp, delta_ell, power_law_index)

Source code in src/megatop/utils/TF_utils.py
def power_law_cl(ell, amp, delta_ell, power_law_index):
    """ """
    pl_ps = {}
    for spec in ["TT", "TE", "TB", "EE", "EB", "BB"]:
        A = amp[spec] if isinstance(amp, dict) else amp
        #    A = amp[spec]
        # else:
        #    A = amp
        # A is power spectrum amplitude at pivot ell == 1 - delta_ell
        pl_ps[spec] = A / (ell + delta_ell) ** power_law_index
        if spec != spec[::-1]:
            pl_ps[spec[::-1]] = pl_ps[spec]

    return pl_ps

megatop.utils.V3p1calc

Simons Observatory LAT Noise Model

This is release v3.1.2.

Version 3.1.0 brings an update to the LAT T noise model, relative to v3.0. Version 3.1.1 brings the SAT noise model into the same file and framework, for convenience (but the SAT noise is the same as for 3.0).

This code includes one SO SAT noise model:

SOSatV3point1

This code includes two SO LAT noise models:

SOLatV3 - the original V3 noise model, but expanded to include elevation dependence. This should reproduced original V3 results at the reference elevation of 50 degrees.

SOLatV3point1 - like SOLatV3 but with an updated atmospheric 1/f model.

This code is based on the original SO LAT Noise Model released with the SO forecasts paper, but with some functional changes:

  • Object-oriented organization to make it easier to swap in different noise models for the same calculation.
  • Beam deconvolution is optional.
  • Plotting code is not included, and instead found separately in this repo.

Version 3.1.2 fixes the noise elevation scaling for the LAT MF bands. The MF noise elevation scalings were using an average of the feedhorn and lenslet values, but we are only using feedhorns at MF now.

SOLatV3

Bases: SOTel

Source code in src/megatop/utils/V3p1calc.py
class SOLatV3(SOTel):
    has_T = True
    has_P = True
    atm_version = 0
    Tatmos_band_corr = 0.9
    Patmos_band_corr = 0.9

    def __init__(
        self,
        sensitivity_mode=None,
        N_tubes=None,
        survey_years=5.0,
        survey_efficiency=0.2 * 0.85,
        el=None,
    ):
        """Arguments:

        sensitivity_mode (int or string): Should be 'threshold',
          'baseline', or 'goal'.  Alternately you can pass 0, 1, or
          2.

        N_tubes: A list of tuples giving the survey-averaged number
          of each LAT tube in operation.  For example, the default
          is [('LF', 1), ('MF', 4), ('UHF', 2)], populating a total
          of 7 tubes in this LAT.  Fractional tubes are acceptable
          (imagine a tube were swapped out part way through the
          survey).

        survey_years: Total calendar years that the survey operates.

        survey_efficiency: Fraction of calendar time that may be
          used to compute map depth.

        el: Elevation, in degrees.  This affects white noise and red
          noise, through separate scalings.

        """
        # Define the instrument.
        self.bands = np.array([27.0, 39.0, 93.0, 145.0, 225.0, 280.0])
        self.beams = np.array([7.4, 5.1, 2.2, 1.4, 1.0, 0.9])

        # Set defaults for survey area, time, efficiency
        self.survey_years = survey_years
        self.survey_efficiency = survey_efficiency

        # Translate integer to string mode; check it.
        sens_modes = {0: "threshold", 1: "baseline", 2: "goal"}
        if sensitivity_mode is None:
            sensitivity_mode = "baseline"
        elif sensitivity_mode in sens_modes:
            sensitivity_mode = sens_modes.get(sensitivity_mode)

        assert sensitivity_mode in sens_modes.values()  # threshold,baseline,goal? 0,1,2?
        self.sensitivity_mode = sensitivity_mode

        # Sensitivities of each kind of optics tube, in uK rtsec, by
        # band.  0 represents 0 weight, not 0 noise!  Note the weird
        # scalings for MF (4**.5) and UHF (2**.5) are to convert to
        # per-tube values from the reference design sensitivity values
        # that included 4 MF and 2 UHF tubes.
        nar = np.array
        self.tube_configs = {
            "threshold": {
                "LF": nar([61.0, 30.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 6.5, 8.1, 0, 0]) * 4**0.5,
                "UHF": nar([0, 0, 0, 0, 17.0, 42.0]) * 2**0.5,
            },
            "baseline": {
                "LF": nar([48.0, 24.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 5.4, 6.7, 0, 0]) * 4**0.5,
                "UHF": nar([0, 0, 0, 0, 15.0, 36.0]) * 2**0.5,
            },
            "goal": {
                "LF": nar([35.0, 18.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 3.9, 4.2, 0, 0]) * 4**0.5,
                "UHF": nar([0, 0, 0, 0, 10.0, 25.0]) * 2**0.5,
            },
        }[sensitivity_mode]

        # The reference tube config.
        ref_tubes = [("LF", 1), ("MF", 4), ("UHF", 2)]

        if N_tubes is None:
            N_tubes = ref_tubes
        else:
            N_tubes = [(b, x) for (b, n), x in zip(ref_tubes, N_tubes)]

        ##
        ## T noise
        ##

        # Elevation stuff
        self.el_noise_params = SO_el_noise_func_params[sensitivity_mode]
        self.el = el

        ##
        ## P noise
        ##
        # "atmos" just means low-ell here.  We do not claim it
        # originates from atmospheric sources.
        self.Patmos_ell = 700.0 + np.zeros(self.n_bands)
        self.Patmos_alpha = -1.4 + np.zeros(self.n_bands)

        # Do general computations.
        self.precompute(N_tubes)

__init__(sensitivity_mode=None, N_tubes=None, survey_years=5.0, survey_efficiency=0.2 * 0.85, el=None)

Arguments:

sensitivity_mode (int or string): Should be 'threshold', 'baseline', or 'goal'. Alternately you can pass 0, 1, or 2.

A list of tuples giving the survey-averaged number

of each LAT tube in operation. For example, the default is [('LF', 1), ('MF', 4), ('UHF', 2)], populating a total of 7 tubes in this LAT. Fractional tubes are acceptable (imagine a tube were swapped out part way through the survey).

survey_years: Total calendar years that the survey operates.

Fraction of calendar time that may be

used to compute map depth.

Elevation, in degrees. This affects white noise and red

noise, through separate scalings.

Source code in src/megatop/utils/V3p1calc.py
def __init__(
    self,
    sensitivity_mode=None,
    N_tubes=None,
    survey_years=5.0,
    survey_efficiency=0.2 * 0.85,
    el=None,
):
    """Arguments:

    sensitivity_mode (int or string): Should be 'threshold',
      'baseline', or 'goal'.  Alternately you can pass 0, 1, or
      2.

    N_tubes: A list of tuples giving the survey-averaged number
      of each LAT tube in operation.  For example, the default
      is [('LF', 1), ('MF', 4), ('UHF', 2)], populating a total
      of 7 tubes in this LAT.  Fractional tubes are acceptable
      (imagine a tube were swapped out part way through the
      survey).

    survey_years: Total calendar years that the survey operates.

    survey_efficiency: Fraction of calendar time that may be
      used to compute map depth.

    el: Elevation, in degrees.  This affects white noise and red
      noise, through separate scalings.

    """
    # Define the instrument.
    self.bands = np.array([27.0, 39.0, 93.0, 145.0, 225.0, 280.0])
    self.beams = np.array([7.4, 5.1, 2.2, 1.4, 1.0, 0.9])

    # Set defaults for survey area, time, efficiency
    self.survey_years = survey_years
    self.survey_efficiency = survey_efficiency

    # Translate integer to string mode; check it.
    sens_modes = {0: "threshold", 1: "baseline", 2: "goal"}
    if sensitivity_mode is None:
        sensitivity_mode = "baseline"
    elif sensitivity_mode in sens_modes:
        sensitivity_mode = sens_modes.get(sensitivity_mode)

    assert sensitivity_mode in sens_modes.values()  # threshold,baseline,goal? 0,1,2?
    self.sensitivity_mode = sensitivity_mode

    # Sensitivities of each kind of optics tube, in uK rtsec, by
    # band.  0 represents 0 weight, not 0 noise!  Note the weird
    # scalings for MF (4**.5) and UHF (2**.5) are to convert to
    # per-tube values from the reference design sensitivity values
    # that included 4 MF and 2 UHF tubes.
    nar = np.array
    self.tube_configs = {
        "threshold": {
            "LF": nar([61.0, 30.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 6.5, 8.1, 0, 0]) * 4**0.5,
            "UHF": nar([0, 0, 0, 0, 17.0, 42.0]) * 2**0.5,
        },
        "baseline": {
            "LF": nar([48.0, 24.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 5.4, 6.7, 0, 0]) * 4**0.5,
            "UHF": nar([0, 0, 0, 0, 15.0, 36.0]) * 2**0.5,
        },
        "goal": {
            "LF": nar([35.0, 18.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 3.9, 4.2, 0, 0]) * 4**0.5,
            "UHF": nar([0, 0, 0, 0, 10.0, 25.0]) * 2**0.5,
        },
    }[sensitivity_mode]

    # The reference tube config.
    ref_tubes = [("LF", 1), ("MF", 4), ("UHF", 2)]

    if N_tubes is None:
        N_tubes = ref_tubes
    else:
        N_tubes = [(b, x) for (b, n), x in zip(ref_tubes, N_tubes)]

    ##
    ## T noise
    ##

    # Elevation stuff
    self.el_noise_params = SO_el_noise_func_params[sensitivity_mode]
    self.el = el

    ##
    ## P noise
    ##
    # "atmos" just means low-ell here.  We do not claim it
    # originates from atmospheric sources.
    self.Patmos_ell = 700.0 + np.zeros(self.n_bands)
    self.Patmos_alpha = -1.4 + np.zeros(self.n_bands)

    # Do general computations.
    self.precompute(N_tubes)

SOSatV3point1

Bases: SOTel

Source code in src/megatop/utils/V3p1calc.py
class SOSatV3point1(SOTel):
    has_T = False
    has_P = True
    atm_version = 1

    def __init__(
        self,
        sensitivity_mode=None,
        N_tubes=None,
        survey_years=5.0,
        survey_efficiency=0.2 * 0.85,
        el=None,
        one_over_f_mode=0,
    ):
        """Arguments:

        sensitivity_mode (int or string): Should be 'threshold',
          'baseline', or 'goal'.  Alternately you can pass 0, 1, or
          2.

        N_tubes: A list of tuples giving the survey-averaged number
          of each SAT type in operation.  For example, the default
          is [('LF', .4), ('MF', 1.6), ('UHF', 1)]

        survey_years: Total calendar years that the survey operates.

        survey_efficiency: Fraction of calendar time that may be
          used to compute map depth.

        el: Elevation, in degrees.  The present SAT model does not
          support this parameter.

        one_over_f_mode: 0 or 1 to select 'pessimistic' or
          'optimistic' red-noise behavior, respectively.
        """
        # Define the instrument.
        self.bands = np.array([27.0, 39.0, 93.0, 145.0, 225.0, 280.0])
        self.beams = np.array([91.0, 63.0, 30.0, 17.0, 11.0, 9.0])

        # Set defaults for survey area, time, efficiency
        self.survey_years = survey_years
        self.survey_efficiency = survey_efficiency

        # Translate integer to string mode; check it.
        sens_modes = {0: "threshold", 1: "baseline", 2: "goal"}
        if sensitivity_mode is None:
            sensitivity_mode = "baseline"
        elif sensitivity_mode in sens_modes:
            sensitivity_mode = sens_modes.get(sensitivity_mode)

        assert sensitivity_mode in sens_modes.values()  # threshold,baseline,goal? 0,1,2?
        self.sensitivity_mode = sensitivity_mode

        # Sensitivities of each kind of optics tube, in uK rtsec, by
        # band.  0 represents 0 weight, not 0 noise!  Note the weird
        # scaling for MF (2**.5) is to convert to per-tube values from
        # the reference design sensitivity values that included 2 MF
        # instruments.
        nar = np.array
        self.tube_configs = {
            "threshold": {
                "LF": nar([32.0, 17.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 4.6, 5.5, 0, 0]) * 2**0.5,
                "UHF": nar([0, 0, 0, 0, 11.0, 26.0]),
            },
            "baseline": {
                "LF": nar([21.0, 13.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 3.4, 4.3, 0, 0]) * 2**0.5,
                "UHF": nar([0, 0, 0, 0, 8.6, 22.0]),
            },
            "goal": {
                "LF": nar([15.0, 10.0, 0, 0, 0, 0]),
                "MF": nar([0, 0, 2.4, 2.7, 0, 0]) * 2**0.5,
                "UHF": nar([0, 0, 0, 0, 5.7, 14.0]),
            },
        }[sensitivity_mode]

        # Save the elevation request.
        assert el is None  # Sorry, no SAT elevation function!
        self.el = el

        # The reference tube config.
        ref_tubes = [("LF", 0.4), ("MF", 1.6), ("UHF", 1)]

        if N_tubes is None:
            N_tubes = ref_tubes
        else:
            N_tubes = [(b, x) for (b, n), x in zip(ref_tubes, N_tubes)]

        assert el is None
        self.el = el

        self.Patmos_alpha = np.array([-2.4, -2.4, -2.5, -3, -3, -3])
        if one_over_f_mode == 0:
            self.Patmos_ell = np.array([30.0, 30, 50, 50, 70, 100])
        elif one_over_f_mode == 1:
            self.Patmos_ell = np.array([15.0, 15, 25, 25, 35, 40])
        else:
            raise ValueError("Invalid one_over_f_mode")

        self.precompute(N_tubes)

__init__(sensitivity_mode=None, N_tubes=None, survey_years=5.0, survey_efficiency=0.2 * 0.85, el=None, one_over_f_mode=0)

Arguments:

sensitivity_mode (int or string): Should be 'threshold', 'baseline', or 'goal'. Alternately you can pass 0, 1, or 2.

A list of tuples giving the survey-averaged number

of each SAT type in operation. For example, the default is [('LF', .4), ('MF', 1.6), ('UHF', 1)]

survey_years: Total calendar years that the survey operates.

Fraction of calendar time that may be

used to compute map depth.

Elevation, in degrees. The present SAT model does not

support this parameter.

0 or 1 to select 'pessimistic' or

'optimistic' red-noise behavior, respectively.

Source code in src/megatop/utils/V3p1calc.py
def __init__(
    self,
    sensitivity_mode=None,
    N_tubes=None,
    survey_years=5.0,
    survey_efficiency=0.2 * 0.85,
    el=None,
    one_over_f_mode=0,
):
    """Arguments:

    sensitivity_mode (int or string): Should be 'threshold',
      'baseline', or 'goal'.  Alternately you can pass 0, 1, or
      2.

    N_tubes: A list of tuples giving the survey-averaged number
      of each SAT type in operation.  For example, the default
      is [('LF', .4), ('MF', 1.6), ('UHF', 1)]

    survey_years: Total calendar years that the survey operates.

    survey_efficiency: Fraction of calendar time that may be
      used to compute map depth.

    el: Elevation, in degrees.  The present SAT model does not
      support this parameter.

    one_over_f_mode: 0 or 1 to select 'pessimistic' or
      'optimistic' red-noise behavior, respectively.
    """
    # Define the instrument.
    self.bands = np.array([27.0, 39.0, 93.0, 145.0, 225.0, 280.0])
    self.beams = np.array([91.0, 63.0, 30.0, 17.0, 11.0, 9.0])

    # Set defaults for survey area, time, efficiency
    self.survey_years = survey_years
    self.survey_efficiency = survey_efficiency

    # Translate integer to string mode; check it.
    sens_modes = {0: "threshold", 1: "baseline", 2: "goal"}
    if sensitivity_mode is None:
        sensitivity_mode = "baseline"
    elif sensitivity_mode in sens_modes:
        sensitivity_mode = sens_modes.get(sensitivity_mode)

    assert sensitivity_mode in sens_modes.values()  # threshold,baseline,goal? 0,1,2?
    self.sensitivity_mode = sensitivity_mode

    # Sensitivities of each kind of optics tube, in uK rtsec, by
    # band.  0 represents 0 weight, not 0 noise!  Note the weird
    # scaling for MF (2**.5) is to convert to per-tube values from
    # the reference design sensitivity values that included 2 MF
    # instruments.
    nar = np.array
    self.tube_configs = {
        "threshold": {
            "LF": nar([32.0, 17.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 4.6, 5.5, 0, 0]) * 2**0.5,
            "UHF": nar([0, 0, 0, 0, 11.0, 26.0]),
        },
        "baseline": {
            "LF": nar([21.0, 13.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 3.4, 4.3, 0, 0]) * 2**0.5,
            "UHF": nar([0, 0, 0, 0, 8.6, 22.0]),
        },
        "goal": {
            "LF": nar([15.0, 10.0, 0, 0, 0, 0]),
            "MF": nar([0, 0, 2.4, 2.7, 0, 0]) * 2**0.5,
            "UHF": nar([0, 0, 0, 0, 5.7, 14.0]),
        },
    }[sensitivity_mode]

    # Save the elevation request.
    assert el is None  # Sorry, no SAT elevation function!
    self.el = el

    # The reference tube config.
    ref_tubes = [("LF", 0.4), ("MF", 1.6), ("UHF", 1)]

    if N_tubes is None:
        N_tubes = ref_tubes
    else:
        N_tubes = [(b, x) for (b, n), x in zip(ref_tubes, N_tubes)]

    assert el is None
    self.el = el

    self.Patmos_alpha = np.array([-2.4, -2.4, -2.5, -3, -3, -3])
    if one_over_f_mode == 0:
        self.Patmos_ell = np.array([30.0, 30, 50, 50, 70, 100])
    elif one_over_f_mode == 1:
        self.Patmos_ell = np.array([15.0, 15, 25, 25, 35, 40])
    else:
        raise ValueError("Invalid one_over_f_mode")

    self.precompute(N_tubes)

SOTel

Base class for SO SAT and LAT noise models. The sub-class constructors should set up all the internal variables and then call precompute(). Then the noise levels can be obtained with get_white_noise / get_noise_curves.

Source code in src/megatop/utils/V3p1calc.py
class SOTel:
    """Base class for SO SAT and LAT noise models.  The sub-class
    constructors should set up all the internal variables and then
    call precompute().  Then the noise levels can be obtained with
    get_white_noise / get_noise_curves.

    """

    # Switches that tell get_noise_curves what to return.
    has_T = False
    has_P = False

    # In-tube, cross-band correlation coefficients for red ("atmos").
    Tatmos_band_corr = 0.0
    Patmos_band_corr = 0.0

    # Factor by which to attenuate LAT atmospheric power, given FOV
    # relative to ACT?
    Tatmos_FOV_mod = 0.5

    def __init__(self, *args, **kwargs):
        raise RuntimeError("You should subclass this.")

    @property
    def n_bands(self):
        return len(self.bands) if self.bands is not None else 0

    def get_bands(self):
        return self.bands.copy()

    def get_beams(self):
        return self.beams.copy()

    def precompute(self, N_tubes, N_tels=1):
        white_noise_el_rescale = np.array([1.0] * len(self.bands))
        if self.el is not None:
            el_data = self.el_noise_params
            el_lims = el_data.get("valid")
            if el_lims[0] == "only":
                assert self.el == el_lims[1]  # noise model only valid at one elevation...
            else:
                assert (el_lims[0] <= self.el) and (self.el <= el_lims[1])
                band_idx = np.array([np.argmin(abs(el_data["bands"] - b)) for b in self.bands])
                assert np.all(abs(np.array(el_data["bands"])[band_idx] - self.bands) < 5)
                coeffs = el_data["coeffs"]
                white_noise_el_rescale = np.array(
                    [
                        el_noise_func(coeffs[i], self.el) / el_noise_func(coeffs[i], 50.0)
                        for i in band_idx
                    ]
                )

        # Accumulate total white noise level and atmospheric
        # covariance matrix for this configuration.

        band_weights = np.zeros(self.n_bands)
        for tube_name, tube_count in N_tubes:
            tube_noise = self.tube_configs[tube_name] * white_noise_el_rescale
            s = tube_noise != 0
            band_weights[s] += tube_count * N_tels * tube_noise[s] ** -2

        self.band_sens = np.zeros(self.n_bands) + 1e9
        s = band_weights > 0
        self.band_sens[s] = band_weights[s] ** -0.5

        # Special for atmospheric noise model.
        C, alpha = get_atmosphere_params(self.bands, self.atm_version, el=self.el)
        self.Tatmos_C = C * self.Tatmos_FOV_mod
        self.Tatmos_alpha = alpha
        self.Tatmos_ell = 1000.0 + np.zeros(self.n_bands)

        # Compute covariant weight matrix (atmosphere parameters).
        cov_weight = np.zeros((self.n_bands, self.n_bands))
        pcov_weight = np.zeros((self.n_bands, self.n_bands))
        for tube_name, tube_count in N_tubes:
            # Get the list of coupled bands; e.g. [1,2] for MF.
            nonz = self.tube_configs[tube_name].nonzero()[0]
            for i in nonz:
                for j in nonz:
                    assert cov_weight[i, j] == 0.0  # Can't do overlapping
                    # tubes without weights.
                    T_corr = {True: 1.0, False: self.Tatmos_band_corr}[i == j]
                    cov_weight[i, j] += (
                        tube_count
                        * N_tels
                        / (T_corr * (self.Tatmos_C[i] * self.Tatmos_C[j]) ** 0.5)
                    )
                    P_corr = {True: 1.0, False: self.Patmos_band_corr}[i == j]
                    pcov_weight[i, j] = P_corr

        # Reciprocate non-zero elements.
        s = cov_weight != 0
        self.Tatmos_cov = np.diag([1e9] * self.n_bands)
        self.Tatmos_cov[s] = 1.0 / cov_weight[s]

        # Polarization is simpler...
        self.Patmos_cov = pcov_weight

    def get_survey_time(self):
        """Returns the effective survey time (survey_years * efficiency), in
        seconds.

        """
        t = self.survey_years * 365.25 * 86400.0  ## convert years to seconds
        return t * self.survey_efficiency

    def get_survey_spread(self, f_sky, units="arcmin2"):
        """Returns the dilution factor that converts array instrument
        sensitivity (in units uK^2 sec) to map white noise level
        (units uK^2 arcmin^2).  Units are arcmin^2 / second (unless
        units='sr' is set)

        """
        # Factor that converts uK^2 sec -> uK^2 arcmin^2.
        A = f_sky * 4 * np.pi
        if units == "arcmin2":
            A *= (60 * 180 / np.pi) ** 2
        elif units != "sr":
            raise ValueError(f"Unknown units '{units}'.")
        return A / self.get_survey_time()

    def get_white_noise(self, f_sky, units="arcmin2"):
        """Returns the survey white noise level, in temperature, for each
        band, in uK^2 arcmin2, for the specified f_sky (0 < f_sky <= 1).

        Pass units='sr' to get uK^2 steradian units.

        """
        return self.band_sens**2 * self.get_survey_spread(f_sky, units=units)

    def get_noise_curves(
        self, f_sky, ell_max, delta_ell, deconv_beam=True, full_covar=False, rolloff_ell=None
    ):
        """Get the noise curves N(ell) for all bands.

        The ell vector is determined by ell_max and delta_ell: ell =
        range(2, ell_max, delta_ell).

        The f_sky is the area of the survey in units of a full sky; (0
        < f_sky <= 1).

        Returns (ell, T_noise, P_noise).  If a model does not describe
        one of these spectra (has_T == False, or has_P == False), the
        corresponding spectrum will return as None.  Otherwise, the
        shape of T_noise and P_noise will be (n_bands, n_ell) if
        full_covar is False, and (n_bands, n_bands, n_ell) if
        full_covar is True.

        If deconv_beam is True, then the beam transfer functions are
        deconvolved, to give the effective noise level relative to a
        signal at each ell.

        If rolloff_ell is specified, a transfer function is applied to
        reduce red noise below this cutoff.  The transfer function at
        ell > rolloff_ell will be 1.  See code if you care about what
        happens below that.

        """
        ell = np.arange(2, ell_max, delta_ell)
        W = self.band_sens**2

        # Get full covariance; indices are [band,band,ell]
        ellf = (ell / self.Tatmos_ell[:, None]) ** (self.Tatmos_alpha[:, None])
        T_noise = self.Tatmos_cov[:, :, None] * (ellf[:, None, :] * ellf[None, :, :]) ** 0.5

        # P noise is tied directly to the white noise level.
        P_low_noise = (2 * W[:, None]) * (ell / self.Patmos_ell[:, None]) ** self.Patmos_alpha[
            :, None
        ]
        P_noise = (
            self.Patmos_cov[:, :, None] * (P_low_noise[:, None, :] * P_low_noise[None, :, :]) ** 0.5
        )

        # Add in white noise on the diagonal.
        Map_white_noise_levels = (
            np.sqrt(2) * self.band_sens * np.sqrt(self.get_survey_spread(f_sky))
        )
        for i in range(len(W)):
            T_noise[i, i] += W[i]
            P_noise[i, i] += W[i] * 2

        if rolloff_ell is not None:
            # Use the same simple rolloff for all bands, T & P.
            gain = rolloff(ell, rolloff_ell)
            T_noise *= gain
            P_noise *= gain

        # Deconvolve beams.
        if deconv_beam:
            beam_sig_rad = self.get_beams() * np.pi / 180 / 60 / (8.0 * np.log(2)) ** 0.5
            beams = np.exp(-0.5 * ell * (ell + 1) * beam_sig_rad[:, None] ** 2)
            T_noise /= beams[:, None, :] * beams[None, :, :]
            P_noise /= beams[:, None, :] * beams[None, :, :]

        # Diagonal only?
        if not full_covar:
            ii = range(self.n_bands)
            T_noise = T_noise[ii, ii]
            P_noise = P_noise[ii, ii]

        T_out, P_out = None, None
        if self.has_T:
            T_out = T_noise * self.get_survey_spread(f_sky, units="sr")
        if self.has_P:
            P_out = P_noise * self.get_survey_spread(f_sky, units="sr")

        return (ell, T_out, P_out, Map_white_noise_levels)

get_noise_curves(f_sky, ell_max, delta_ell, deconv_beam=True, full_covar=False, rolloff_ell=None)

Get the noise curves N(ell) for all bands.

The ell vector is determined by ell_max and delta_ell: ell = range(2, ell_max, delta_ell).

The f_sky is the area of the survey in units of a full sky; (0 < f_sky <= 1).

Returns (ell, T_noise, P_noise). If a model does not describe one of these spectra (has_T == False, or has_P == False), the corresponding spectrum will return as None. Otherwise, the shape of T_noise and P_noise will be (n_bands, n_ell) if full_covar is False, and (n_bands, n_bands, n_ell) if full_covar is True.

If deconv_beam is True, then the beam transfer functions are deconvolved, to give the effective noise level relative to a signal at each ell.

If rolloff_ell is specified, a transfer function is applied to reduce red noise below this cutoff. The transfer function at ell > rolloff_ell will be 1. See code if you care about what happens below that.

Source code in src/megatop/utils/V3p1calc.py
def get_noise_curves(
    self, f_sky, ell_max, delta_ell, deconv_beam=True, full_covar=False, rolloff_ell=None
):
    """Get the noise curves N(ell) for all bands.

    The ell vector is determined by ell_max and delta_ell: ell =
    range(2, ell_max, delta_ell).

    The f_sky is the area of the survey in units of a full sky; (0
    < f_sky <= 1).

    Returns (ell, T_noise, P_noise).  If a model does not describe
    one of these spectra (has_T == False, or has_P == False), the
    corresponding spectrum will return as None.  Otherwise, the
    shape of T_noise and P_noise will be (n_bands, n_ell) if
    full_covar is False, and (n_bands, n_bands, n_ell) if
    full_covar is True.

    If deconv_beam is True, then the beam transfer functions are
    deconvolved, to give the effective noise level relative to a
    signal at each ell.

    If rolloff_ell is specified, a transfer function is applied to
    reduce red noise below this cutoff.  The transfer function at
    ell > rolloff_ell will be 1.  See code if you care about what
    happens below that.

    """
    ell = np.arange(2, ell_max, delta_ell)
    W = self.band_sens**2

    # Get full covariance; indices are [band,band,ell]
    ellf = (ell / self.Tatmos_ell[:, None]) ** (self.Tatmos_alpha[:, None])
    T_noise = self.Tatmos_cov[:, :, None] * (ellf[:, None, :] * ellf[None, :, :]) ** 0.5

    # P noise is tied directly to the white noise level.
    P_low_noise = (2 * W[:, None]) * (ell / self.Patmos_ell[:, None]) ** self.Patmos_alpha[
        :, None
    ]
    P_noise = (
        self.Patmos_cov[:, :, None] * (P_low_noise[:, None, :] * P_low_noise[None, :, :]) ** 0.5
    )

    # Add in white noise on the diagonal.
    Map_white_noise_levels = (
        np.sqrt(2) * self.band_sens * np.sqrt(self.get_survey_spread(f_sky))
    )
    for i in range(len(W)):
        T_noise[i, i] += W[i]
        P_noise[i, i] += W[i] * 2

    if rolloff_ell is not None:
        # Use the same simple rolloff for all bands, T & P.
        gain = rolloff(ell, rolloff_ell)
        T_noise *= gain
        P_noise *= gain

    # Deconvolve beams.
    if deconv_beam:
        beam_sig_rad = self.get_beams() * np.pi / 180 / 60 / (8.0 * np.log(2)) ** 0.5
        beams = np.exp(-0.5 * ell * (ell + 1) * beam_sig_rad[:, None] ** 2)
        T_noise /= beams[:, None, :] * beams[None, :, :]
        P_noise /= beams[:, None, :] * beams[None, :, :]

    # Diagonal only?
    if not full_covar:
        ii = range(self.n_bands)
        T_noise = T_noise[ii, ii]
        P_noise = P_noise[ii, ii]

    T_out, P_out = None, None
    if self.has_T:
        T_out = T_noise * self.get_survey_spread(f_sky, units="sr")
    if self.has_P:
        P_out = P_noise * self.get_survey_spread(f_sky, units="sr")

    return (ell, T_out, P_out, Map_white_noise_levels)

get_survey_spread(f_sky, units='arcmin2')

Returns the dilution factor that converts array instrument sensitivity (in units uK^2 sec) to map white noise level (units uK^2 arcmin^2). Units are arcmin^2 / second (unless units='sr' is set)

Source code in src/megatop/utils/V3p1calc.py
def get_survey_spread(self, f_sky, units="arcmin2"):
    """Returns the dilution factor that converts array instrument
    sensitivity (in units uK^2 sec) to map white noise level
    (units uK^2 arcmin^2).  Units are arcmin^2 / second (unless
    units='sr' is set)

    """
    # Factor that converts uK^2 sec -> uK^2 arcmin^2.
    A = f_sky * 4 * np.pi
    if units == "arcmin2":
        A *= (60 * 180 / np.pi) ** 2
    elif units != "sr":
        raise ValueError(f"Unknown units '{units}'.")
    return A / self.get_survey_time()

get_survey_time()

Returns the effective survey time (survey_years * efficiency), in seconds.

Source code in src/megatop/utils/V3p1calc.py
def get_survey_time(self):
    """Returns the effective survey time (survey_years * efficiency), in
    seconds.

    """
    t = self.survey_years * 365.25 * 86400.0  ## convert years to seconds
    return t * self.survey_efficiency

get_white_noise(f_sky, units='arcmin2')

Returns the survey white noise level, in temperature, for each band, in uK^2 arcmin2, for the specified f_sky (0 < f_sky <= 1).

Pass units='sr' to get uK^2 steradian units.

Source code in src/megatop/utils/V3p1calc.py
def get_white_noise(self, f_sky, units="arcmin2"):
    """Returns the survey white noise level, in temperature, for each
    band, in uK^2 arcmin2, for the specified f_sky (0 < f_sky <= 1).

    Pass units='sr' to get uK^2 steradian units.

    """
    return self.band_sens**2 * self.get_survey_spread(f_sky, units=units)

get_atmosphere_params(freqs, version=1, el=None)

Returns atmospheric noise power parameters, for an ACTPol optics tube.

Arguments:

freqs: array of frequencies (in GHz) to process. This function only handles the standard SO frequency values. version: version of the C factors to return. See below. el: elevation angle, in degrees. Default is 50.

Returns (C_array, alpha_array), where each array has the same shape as freqs. The red noise contribution to each band is then:

  N_red(ell) =   C * (ell/1000)^alpha

with units of [uK^2 sec].

The model is naturally calibrated for boresight elevation el = 50 degrees. A simple rescaling (proportional to csc(el)) is applied for other values of el.

In the present model, alpha=-3.5 always.

version=0: This atmospheric model was used in SO V3 forecasts but contains an error.

version=1: This atmospheric model is better than version=0, in that at least one error has been corrected. The relative atmospheric powers have been measured in AT model, and then calibrated to ACT. Low frequency results are inflated somewhat because ACT sees more power at 90 GHz than predicted by this modeling.

Source code in src/megatop/utils/V3p1calc.py
def get_atmosphere_params(freqs, version=1, el=None):
    """Returns atmospheric noise power parameters, for an ACTPol optics
    tube.

    Arguments:

      freqs: array of frequencies (in GHz) to process.  This function only
        handles the standard SO frequency values.
      version: version of the C factors to return.  See below.
      el: elevation angle, in degrees.  Default is 50.

    Returns (C_array, alpha_array), where each array has the same
    shape as freqs.  The red noise contribution to each band is then:

          N_red(ell) =   C * (ell/1000)^alpha

    with units of [uK^2 sec].

    The model is naturally calibrated for boresight elevation el = 50
    degrees.  A simple rescaling (proportional to csc(el)) is applied
    for other values of el.

    In the present model, alpha=-3.5 always.

    version=0: This atmospheric model was used in SO V3 forecasts but
    contains an error.

    version=1: This atmospheric model is better than version=0, in
    that at least one error has been corrected.  The relative
    atmospheric powers have been measured in AT model, and then
    calibrated to ACT.  Low frequency results are inflated somewhat
    because ACT sees more power at 90 GHz than predicted by this
    modeling.
    """
    # This el_correction is quite naive; atmosphere may be less
    # structured (i.e. smoothed out) at lower elevation because of the
    # increased thickness.
    if el is None:
        el = 50.0
    el_correction = np.sin(50.0 * np.pi / 180) / np.sin(el * np.pi / 180)
    if version == 0:
        data_bands = np.array([27.0, 39.0, 93.0, 145.0, 225.0, 280.0])
        data_C = np.array([200.0, 77.0, 1800.0, 12000.0, 68000.0, 124000.0])
        data = {}
        for b, C in zip(data_bands, data_C):
            data[b] = C
        # Jam in a fake value for 20 GHz.
        data[20.0] = 200.0
    elif version == 1:
        data_bands = np.array([20.0, 27.0, 39.0, 93.0, 145.0, 225.0, 280.0, 285, 350, 410])
        data_C = np.array(
            [
                3.45565594e02 * 1.4,
                6.00734484e01 * 1.4,
                1.45291936e01 * 1.4,
                6.81426710e02 * 1.4,
                1.20000000e04,
                2.66015359e05,
                1.56040514e06,
                1e9,
                1e9,
                1e9,
            ]
        )
        data = {}
        for b, C in zip(data_bands, data_C):
            data[b] = C

    return (np.array([data[f] * el_correction**2 for f in freqs]), np.array([-3.5 for f in freqs]))

rolloff(ell, ell_off=None, alpha=-4, patience=2.0)

Get a transfer function T(ell) to roll off red noise at ell < ell_off. ell should be an ndarray. Above the cut-off, T(ell>=ell_off) = 1. For T(ell 1) is the maximum allowed value of:

               T(ell) * ell**alpha
         -----------------------------
          T(ell_off) * ell_off**alpha

I.e, supposing you were fighting an ellalpha spectrum, the roll-off in T(ell) will be applied aggressively enough that T(ell)*ellalpha does not rise above "patience" times its value at ell_off.

Source code in src/megatop/utils/V3p1calc.py
def rolloff(ell, ell_off=None, alpha=-4, patience=2.0):
    """Get a transfer function T(ell) to roll off red noise at ell <
    ell_off.  ell should be an ndarray.  Above the cut-off,
    T(ell>=ell_off) = 1.  For T(ell<ell_off) will roll off smoothly,
    approaching T(ell) \propto ell^-alpha.  The speed at which the
    transition to the full power-law occurs is set by "patience";
    patience (> 1) is the maximum allowed value of:

                       T(ell) * ell**alpha
                 -----------------------------
                  T(ell_off) * ell_off**alpha

    I.e, supposing you were fighting an ell**alpha spectrum, the
    roll-off in T(ell) will be applied aggressively enough that
    T(ell)*ell**alpha does not rise above "patience" times its value
    at ell_off.

    """
    if ell_off is None or ell_off <= 0:
        return np.ones(ell.shape)
    L2 = ell_off
    L1 = L2 * patience ** (2.0 / alpha)
    x = -np.log(ell / L2) / np.log(L1 / L2)
    beta = alpha * np.log(L1 / L2)
    output = x * 0
    output[x < 0] = (-x * x)[x < 0]
    output[x < -1] = (1 + 2 * x)[x < -1]
    return np.exp(output * beta)

megatop.utils.binning

create_binning(lmax, delta_ell, uniform_start=None)

Create multipole bin edges for NaMaster.

All edges are inclusive: bin i covers ell in [bin_low[i], bin_high[i]]. The first bin always starts at ell=2 and the last bin always ends at lmax. When saving and reloading via NmtBin.from_edges, pass bin_high + 1 because namaster treats its upper edge argument as exclusive.

Parameters:

  • lmax (int) –

    Maximum multipole, inclusive. The last bin always ends at lmax.

  • delta_ell (int) –

    Width of the uniform bins.

  • uniform_start (int | None, default: None ) –

    If provided, the first bin spans [2, uniform_start - 1] and uniform bins of width delta_ell start at uniform_start. If None, all bins have width delta_ell aligned to multiples of delta_ell from 0, with the first bin forced to start at ell=2.

Returns:

  • bin_low ( ndarray ) –

    Lower edge of each bin (inclusive).

  • bin_high ( ndarray ) –

    Upper edge of each bin (inclusive).

  • bin_center ( ndarray ) –

    Arithmetic center of each bin.

Source code in src/megatop/utils/binning.py
def create_binning(
    lmax: int, delta_ell: int, uniform_start: int | None = None
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Create multipole bin edges for NaMaster.

    All edges are inclusive: bin i covers ell in [bin_low[i], bin_high[i]].
    The first bin always starts at ell=2 and the last bin always ends at lmax.
    When saving and reloading via NmtBin.from_edges, pass bin_high + 1
    because namaster treats its upper edge argument as exclusive.

    Args:
        lmax: Maximum multipole, inclusive. The last bin always ends at lmax.
        delta_ell: Width of the uniform bins.
        uniform_start: If provided, the first bin spans [2, uniform_start - 1]
            and uniform bins of width delta_ell start at uniform_start.
            If None, all bins have width delta_ell aligned to multiples of
            delta_ell from 0, with the first bin forced to start at ell=2.

    Returns:
        bin_low: Lower edge of each bin (inclusive).
        bin_high: Upper edge of each bin (inclusive).
        bin_center: Arithmetic center of each bin.
    """
    if uniform_start is not None:
        # Uniform bins of width delta_ell starting at uniform_start
        regular_low = np.arange(uniform_start, lmax, delta_ell)
        regular_high = regular_low + delta_ell - 1
        # Prepend the wide first bin [2, uniform_start - 1]
        bin_low = np.concatenate(([2], regular_low))
        bin_high = np.concatenate(([uniform_start - 1], regular_high))
    else:
        # Uniform grid aligned to multiples of delta_ell from 0
        bin_low = np.arange(0, lmax, delta_ell)
        bin_high = bin_low + delta_ell - 1
        # Override first bin to start at ell=2 without shifting the whole grid.
        # Must be done after computing bin_high to avoid overlap with bin 1.
        bin_low[0] = 2

    bin_high[-1] = lmax  # clamp last bin to lmax (inclusive)
    bin_center = (bin_low + bin_high) / 2

    return bin_low, bin_high, bin_center

load_nmt_binning(manager)

Load the binning from the file.

Source code in src/megatop/utils/binning.py
def load_nmt_binning(manager: DataManager):
    """Load the binning from the file."""
    binning_info = np.load(manager.path_to_binning, allow_pickle=True)
    # +1 because NaMaster expects upper bounds to be exclusive
    return nmt.NmtBin.from_edges(binning_info["bin_low"], binning_info["bin_high"] + 1)

megatop.utils.compsep

set_alm_tozero_above_lmax(alm, lmax)

Truncate the spherical harmonic coefficients (alm) to a new lmax. Parameters


alm : array_like Input spherical harmonic coefficients, shape (..., N), where N = (lmax_old + 1) * (lmax_old + 2) / 2. lmax_new : int New maximum multipole to truncate to. Returns


alm_new : array_like Truncated spherical harmonic coefficients, shape (..., M), where M = (lmax_new + 1) * (lmax_new + 2) / 2.

Source code in src/megatop/utils/compsep.py
def set_alm_tozero_above_lmax(alm, lmax):
    """
    Truncate the spherical harmonic coefficients (alm) to a new lmax.
    Parameters
    ----------
    alm : array_like
        Input spherical harmonic coefficients, shape (..., N), where N = (lmax_old + 1) * (lmax_old + 2) / 2.
    lmax_new : int
        New maximum multipole to truncate to.
    Returns
    -------
    alm_new : array_like
        Truncated spherical harmonic coefficients, shape (..., M), where M = (lmax_new + 1) * (lmax_new + 2) / 2.
    """
    lmax_input = hp.Alm.getlmax(alm.shape[-1])
    assert lmax_input >= lmax, "lmax must be <= lmax_input"

    for ell in range(lmax + 1, lmax_input + 1):
        for m in range(ell + 1):
            index = hp.Alm.getidx(lmax_input, ell, m)
            alm[..., index] = 0 + 0j

    return alm

set_alm_tozero_below_lmin(alm, lmin)

Truncate the spherical harmonic coefficients (alm) to a new lmax. Parameters


alm : array_like Input spherical harmonic coefficients, shape (..., N), where N = (lmax_old + 1) * (lmax_old + 2) / 2. lmax_new : int New maximum multipole to truncate to. Returns


alm_new : array_like Truncated spherical harmonic coefficients, shape (..., M), where M = (lmax_new + 1) * (lmax_new + 2) / 2.

Source code in src/megatop/utils/compsep.py
def set_alm_tozero_below_lmin(alm, lmin):
    """
    Truncate the spherical harmonic coefficients (alm) to a new lmax.
    Parameters
    ----------
    alm : array_like
        Input spherical harmonic coefficients, shape (..., N), where N = (lmax_old + 1) * (lmax_old + 2) / 2.
    lmax_new : int
        New maximum multipole to truncate to.
    Returns
    -------
    alm_new : array_like
        Truncated spherical harmonic coefficients, shape (..., M), where M = (lmax_new + 1) * (lmax_new + 2) / 2.
    """
    lmax = hp.Alm.getlmax(alm.shape[-1])
    assert lmin <= lmax, "lmin must be < lmax"

    for ell in range(lmin):
        for m in range(ell + 1):
            index = hp.Alm.getidx(lmax, ell, m)
            alm[..., index] = 0 + 0j

    return alm

megatop.utils.harmonic

Pixelization-agnostic spherical harmonic transforms.

Dispatches between HEALPix (via ducc0) and CAR (via pixell.curvedsky) based on the input map type. Shapes are (..., npix) for HEALPix and (..., ny, nx) for CAR. All public functions are keyword-only.

Environment variables

MEGATOP_SHT_NTHREADS: Default ducc0 thread count (HEALPix path). Read once at import; 0 (default) lets ducc0 choose. Overridden per-call via nthreads (0 or None falls back to this default).

alm2map(alms, *, spin=0, nside=None, shape=None, wcs=None, out=None, lmax=None, mmax=None, nthreads=None)

Inverse SHT. Target pixelization set by kwargs:

  • out (enmap) → CAR in-place.
  • shape + wcs → CAR, new enmap.
  • nside → HEALPix.

Parameters:

  • alms

    Spherical harmonic coefficients.

  • spin

    Spin weight: 0 (T), 2 (Q/U), or a list like [0, 2] for mixed-spin fields. HEALPix splits the alm axis by spin group; CAR passes to pixell.

    TEB → TQU requires spin=[0, 2]. With a (3, nalm) input the output is (3, npix) with rows [T, Q, U]. Batch dimensions are supported: (batch, 3, nalm)(batch, 3, npix).

  • nside

    HEALPix resolution. Mutually exclusive with CAR options.

  • shape

    CAR pixel shape. Used with wcs.

  • wcs

    CAR world coordinate system. Used with shape.

  • out

    Pre-allocated output written in-place and returned. enmap for CAR, ndarray for HEALPix. Mutually exclusive with shape/wcs.

  • lmax

    Bandlimit (HEALPix only — CAR infers it from alms).

  • mmax

    Azimuthal bandlimit (HEALPix only). Defaults to lmax.

  • nthreads

    ducc0 thread count (HEALPix). None uses MEGATOP_SHT_NTHREADS.

Returns:

  • np.ndarray (..., npix) for HEALPix or pixell.enmap.ndmap

  • (..., ny, nx) for CAR. Returns out when provided.

Raises:

  • ValueError

    Conflicting or missing pixelization targets.

Source code in src/megatop/utils/harmonic.py
def alm2map(
    alms,
    *,
    spin=0,
    nside=None,
    shape=None,
    wcs=None,
    out=None,
    lmax=None,
    mmax=None,
    nthreads=None,
):
    """Inverse SHT. Target pixelization set by kwargs:

    - ``out`` (enmap) → CAR in-place.
    - ``shape`` + ``wcs`` → CAR, new enmap.
    - ``nside`` → HEALPix.

    Args:
        alms: Spherical harmonic coefficients.
        spin: Spin weight: ``0`` (T), ``2`` (Q/U), or a list like ``[0, 2]``
            for mixed-spin fields. HEALPix splits the alm axis by spin group;
            CAR passes to pixell.

            TEB → TQU requires ``spin=[0, 2]``. With a ``(3, nalm)`` input the
            output is ``(3, npix)`` with rows ``[T, Q, U]``.
            Batch dimensions are supported: ``(batch, 3, nalm)`` → ``(batch, 3, npix)``.
        nside: HEALPix resolution. Mutually exclusive with CAR options.
        shape: CAR pixel shape. Used with ``wcs``.
        wcs: CAR world coordinate system. Used with ``shape``.
        out: Pre-allocated output written in-place and returned. enmap for CAR,
            ndarray for HEALPix. Mutually exclusive with ``shape``/``wcs``.
        lmax: Bandlimit (HEALPix only — CAR infers it from ``alms``).
        mmax: Azimuthal bandlimit (HEALPix only). Defaults to ``lmax``.
        nthreads: ducc0 thread count (HEALPix). ``None`` uses ``MEGATOP_SHT_NTHREADS``.

    Returns:
        ``np.ndarray`` ``(..., npix)`` for HEALPix or ``pixell.enmap.ndmap``
        ``(..., ny, nx)`` for CAR. Returns ``out`` when provided.

    Raises:
        ValueError: Conflicting or missing pixelization targets.
    """
    if out is not None and (shape is not None or wcs is not None):
        raise ValueError("Provide either out or shape+wcs, not both.")
    car_target = _is_car(out) or (shape is not None and wcs is not None)
    if car_target and nside is not None:
        raise ValueError("Specify either CAR (out / shape+wcs) or HEALPix (nside), not both.")
    if car_target:
        if out is None:
            full_shape = (*alms.shape[:-1], *shape[-2:])
            out = enmap.zeros(full_shape, wcs=wcs, dtype=alms.real.dtype)
        return curvedsky.alm2map(alms, map=out, spin=spin, copy=False)
    if nside is None:
        raise ValueError("Provide nside for HEALPix or out / shape+wcs for CAR.")
    kw = {"nside": nside, "lmax": lmax, "mmax": mmax, "nthreads": nthreads}
    if isinstance(spin, (list, tuple)):
        inplace = out is not None
        maps_out = [] if not inplace else None
        out_idx = idx = 0
        for s in spin:
            nmaps = 1 if s == 0 else 2
            # _ducc_synthesis uses ([ntrans,] nmaps, nalm) convention directly
            out_seg = out[..., out_idx : out_idx + nmaps, :] if inplace else None
            result = _ducc_synthesis(alms[..., idx : idx + nmaps, :], spin=s, out=out_seg, **kw)
            if not inplace:
                maps_out.append(result)
            idx += nmaps
            out_idx += nmaps
        return out if inplace else np.concatenate(maps_out, axis=-2)
    return _alm2map_healpix(alms, spin=spin, out=out, **kw)

almxfl(alms, fl, *, mmax=None, inplace=False)

Multiply each a_lm by f_l. Pixel-agnostic.

Parameters:

  • alms

    1-D alm array or 2-D (ncomp, nalm).

  • fl

    Filter of length lmax + 1.

  • mmax

    Azimuthal bandlimit. Defaults to lmax (full triangular layout).

  • inplace

    Modify alms in place.

Returns:

  • Filtered alms, same shape as input.

Source code in src/megatop/utils/harmonic.py
def almxfl(alms, fl, *, mmax=None, inplace=False):
    """Multiply each ``a_lm`` by ``f_l``. Pixel-agnostic.

    Args:
        alms: 1-D alm array or 2-D ``(ncomp, nalm)``.
        fl: Filter of length ``lmax + 1``.
        mmax: Azimuthal bandlimit. Defaults to ``lmax`` (full triangular layout).
        inplace: Modify ``alms`` in place.

    Returns:
        Filtered alms, same shape as input.
    """
    if alms.ndim == 1:
        return hp.almxfl(alms, fl, mmax=mmax, inplace=inplace)
    out = alms if inplace else alms.copy()
    for i in range(out.shape[0]):
        hp.almxfl(out[i], fl, mmax=mmax, inplace=True)
    return out

anafast(maps, maps2=None, *, lmax=None, mmax=None, niter=3, pol=True, nthreads=None)

Compute auto or cross power spectrum.

Routes through map2alm then hp.alm2cl. For TQU input (pol=True), spin-0/spin-2 decomposition yields TEB alms; output is six spectra TT EE BB TE EB TB in diagonal order, consistent with synfast(new=True).

Parameters:

  • maps

    Input map (HEALPix ndarray or CAR enmap).

  • maps2

    Second map for cross-spectrum.

  • lmax

    Bandlimit.

  • mmax

    Azimuthal bandlimit (HEALPix only — pixell does not expose it).

  • niter

    Jacobi iterations for the forward SHT.

  • pol

    If True and the map has a Stokes axis of length 3, decompose into TEB and return all six spectra. Raises ValueError if the Stokes axis exists but has length ≠ 3.

  • nthreads

    ducc0 thread count (HEALPix only).

Returns:

  • Power spectrum array. For TQU input: TT EE BB TE EB TB in diagonal

  • order, directly usable as synfast input.

Source code in src/megatop/utils/harmonic.py
def anafast(maps, maps2=None, *, lmax=None, mmax=None, niter=3, pol=True, nthreads=None):
    """Compute auto or cross power spectrum.

    Routes through ``map2alm`` then ``hp.alm2cl``. For TQU input (``pol=True``),
    spin-0/spin-2 decomposition yields TEB alms; output is six spectra
    TT EE BB TE EB TB in diagonal order, consistent with ``synfast(new=True)``.

    Args:
        maps: Input map (HEALPix ndarray or CAR enmap).
        maps2: Second map for cross-spectrum.
        lmax: Bandlimit.
        mmax: Azimuthal bandlimit (HEALPix only — pixell does not expose it).
        niter: Jacobi iterations for the forward SHT.
        pol: If ``True`` and the map has a Stokes axis of length 3, decompose
            into TEB and return all six spectra. Raises ``ValueError`` if the
            Stokes axis exists but has length ≠ 3.
        nthreads: ducc0 thread count (HEALPix only).

    Returns:
        Power spectrum array. For TQU input: TT EE BB TE EB TB in diagonal
        order, directly usable as ``synfast`` input.
    """

    def _alms(m):
        if m is None:
            return None
        stokes_axis = -3 if _is_car(m) else -2
        has_stokes = m.ndim >= abs(stokes_axis)
        if pol and has_stokes and m.shape[stokes_axis] != 3:
            raise ValueError(
                f"pol=True requires 3 Stokes components along axis {stokes_axis}, "
                f"got shape {m.shape}"
            )
        is_tqu = pol and has_stokes and m.shape[stokes_axis] == 3
        spin = [0, 2] if is_tqu else 0
        return map2alm(m, spin=spin, lmax=lmax, mmax=mmax, niter=niter, nthreads=nthreads)

    return hp.alm2cl(_alms(maps), _alms(maps2), lmax=lmax, mmax=mmax)

gauss_beam(fwhm_arcmin, lmax, *, pol=False)

Wrapper around healpy.gauss_beam with FWHM in arcminutes.

See healpy documentation for full details. The only difference is that fwhm_arcmin is in arcminutes whereas healpy's fwhm is in radians.

Source code in src/megatop/utils/harmonic.py
def gauss_beam(fwhm_arcmin, lmax, *, pol=False):
    """Wrapper around ``healpy.gauss_beam`` with FWHM in arcminutes.

    See healpy documentation for full details. The only difference is that
    ``fwhm_arcmin`` is in arcminutes whereas healpy's ``fwhm`` is in radians.
    """
    return hp.gauss_beam(np.radians(fwhm_arcmin / 60), lmax=lmax, pol=pol)

getlmax(alm, mmax=None)

Infer lmax from alm.shape[-1].

Parameters:

  • alm

    Alm array; last axis stores the (l, m) layout.

  • mmax

    Azimuthal bandlimit if the layout is truncated (mmax < lmax). Defaults to lmax (full triangular layout).

Returns:

  • int

    lmax consistent with alm.shape[-1] and mmax.

Source code in src/megatop/utils/harmonic.py
def getlmax(alm, mmax=None) -> int:
    """Infer ``lmax`` from ``alm.shape[-1]``.

    Args:
        alm: Alm array; last axis stores the ``(l, m)`` layout.
        mmax: Azimuthal bandlimit if the layout is truncated (``mmax < lmax``).
            Defaults to ``lmax`` (full triangular layout).

    Returns:
        ``lmax`` consistent with ``alm.shape[-1]`` and ``mmax``.
    """
    return hp.Alm.getlmax(alm.shape[-1], mmax=mmax)

map2alm(maps, *, spin=0, lmax=None, mmax=None, niter=3, nthreads=None)

Forward SHT, dispatching on pixelization.

Parameters:

  • maps

    HEALPix (..., npix) ndarray or CAR pixell.enmap.ndmap (..., ny, nx).

  • spin

    Spin weight: 0 (T), 2 (Q/U), or a list like [0, 2] for mixed-spin fields. HEALPix splits the map axis by spin group; CAR passes to pixell.

    Unlike healpy's pol=True, TQU → TEB requires spin=[0, 2] explicitly. With spin=[0, 2] and a (3, npix) input the output is (3, nalm) with rows [alm_T, alm_E, alm_B]. Batch dimensions are supported: (batch, 3, npix)(batch, 3, nalm).

  • lmax

    Bandlimit. HEALPix default: 3 * nside - 1; CAR: library default.

  • mmax

    Azimuthal bandlimit (HEALPix only). Defaults to lmax.

  • niter

    Refinement steps. HEALPix: Jacobi iterations; CAR: passed to pixell.

  • nthreads

    ducc0 thread count (HEALPix). None uses MEGATOP_SHT_NTHREADS.

Returns:

  • Alm array, last axis in triangular (l, m) layout.

Source code in src/megatop/utils/harmonic.py
def map2alm(maps, *, spin=0, lmax=None, mmax=None, niter=3, nthreads=None):
    """Forward SHT, dispatching on pixelization.

    Args:
        maps: HEALPix ``(..., npix)`` ndarray or CAR ``pixell.enmap.ndmap``
            ``(..., ny, nx)``.
        spin: Spin weight: ``0`` (T), ``2`` (Q/U), or a list like ``[0, 2]``
            for mixed-spin fields. HEALPix splits the map axis by spin group;
            CAR passes to pixell.

            Unlike healpy's ``pol=True``, TQU → TEB requires ``spin=[0, 2]``
            explicitly. With ``spin=[0, 2]`` and a ``(3, npix)`` input the
            output is ``(3, nalm)`` with rows ``[alm_T, alm_E, alm_B]``.
            Batch dimensions are supported: ``(batch, 3, npix)`` → ``(batch, 3, nalm)``.
        lmax: Bandlimit. HEALPix default: ``3 * nside - 1``; CAR: library default.
        mmax: Azimuthal bandlimit (HEALPix only). Defaults to ``lmax``.
        niter: Refinement steps. HEALPix: Jacobi iterations; CAR: passed to pixell.
        nthreads: ducc0 thread count (HEALPix). ``None`` uses ``MEGATOP_SHT_NTHREADS``.

    Returns:
        Alm array, last axis in triangular ``(l, m)`` layout.
    """
    if _is_car(maps):
        return curvedsky.map2alm(maps, spin=spin, lmax=lmax, niter=niter)
    kw = {"lmax": lmax, "mmax": mmax, "niter": niter, "nthreads": nthreads}
    if isinstance(spin, (list, tuple)):
        alms_out = []
        idx = 0
        for s in spin:
            nmaps = 1 if s == 0 else 2
            # _map2alm_healpix_iter uses ducc0 ([ntrans,] nmaps, npix) convention directly
            alms_out.append(_map2alm_healpix_iter(maps[..., idx : idx + nmaps, :], spin=s, **kw))
            idx += nmaps
        return np.concatenate(alms_out, axis=-2)
    return _map2alm_healpix(maps, spin=spin, **kw)

smooth(maps, fwhm_arcmin, *, pol=False, lmax=None, nside=None, shape=None, wcs=None, out=None, niter=3, nthreads=None)

Smooth a map with a Gaussian beam.

When no output geometry is given the input geometry is reused: HEALPix nside is inferred from the input pixel count; CAR shape and WCS are copied from the input enmap.

Parameters:

  • maps

    Input map — HEALPix (..., npix) ndarray or CAR pixell.enmap.ndmap (..., ny, nx).

  • fwhm_arcmin

    FWHM of the Gaussian beam in arcminutes.

  • pol

    If True, treat maps as TQU and apply separate T and P beams (spin [0, 2]). If False (default), apply a single spin-0 beam to all components.

  • lmax

    Bandlimit. Inferred from the alm output of map2alm if None.

  • nside

    HEALPix output resolution. Defaults to input nside.

  • shape

    CAR output pixel shape. Defaults to input shape.

  • wcs

    CAR world coordinate system. Defaults to input WCS.

  • out

    Pre-allocated output map written in-place and returned.

  • niter

    Jacobi iterations for the forward SHT (HEALPix).

  • nthreads

    ducc0 thread count (HEALPix).

Returns:

  • Smoothed map, same type and geometry as input unless overridden.

Source code in src/megatop/utils/harmonic.py
def smooth(
    maps,
    fwhm_arcmin,
    *,
    pol=False,
    lmax=None,
    nside=None,
    shape=None,
    wcs=None,
    out=None,
    niter=3,
    nthreads=None,
):
    """Smooth a map with a Gaussian beam.

    When no output geometry is given the input geometry is reused:
    HEALPix nside is inferred from the input pixel count; CAR shape and
    WCS are copied from the input enmap.

    Args:
        maps: Input map — HEALPix ``(..., npix)`` ndarray or CAR
            ``pixell.enmap.ndmap`` ``(..., ny, nx)``.
        fwhm_arcmin: FWHM of the Gaussian beam in arcminutes.
        pol: If ``True``, treat ``maps`` as TQU and apply separate T and P
            beams (spin ``[0, 2]``). If ``False`` (default), apply a single
            spin-0 beam to all components.
        lmax: Bandlimit. Inferred from the alm output of ``map2alm`` if
            ``None``.
        nside: HEALPix output resolution. Defaults to input nside.
        shape: CAR output pixel shape. Defaults to input shape.
        wcs: CAR world coordinate system. Defaults to input WCS.
        out: Pre-allocated output map written in-place and returned.
        niter: Jacobi iterations for the forward SHT (HEALPix).
        nthreads: ducc0 thread count (HEALPix).

    Returns:
        Smoothed map, same type and geometry as input unless overridden.
    """
    spin = [0, 2] if pol else 0
    alms = map2alm(maps, spin=spin, lmax=lmax, niter=niter, nthreads=nthreads)
    lmax_alm = getlmax(alms)
    if pol:
        bl = gauss_beam(fwhm_arcmin, lmax_alm, pol=True)  # (lmax+1, 4)
        almxfl(alms[0], bl[:, 0], inplace=True)
        almxfl(alms[1:], bl[:, 1], inplace=True)
    else:
        almxfl(alms, gauss_beam(fwhm_arcmin, lmax_alm), inplace=True)
    if _is_car(maps):
        if out is None and shape is None:
            shape = maps.shape
            wcs = maps.wcs
    elif out is None and nside is None and shape is None:
        nside = hp.npix2nside(np.asarray(maps).shape[-1])
    return alm2map(alms, spin=spin, nside=nside, shape=shape, wcs=wcs, out=out, nthreads=nthreads)

synfast(cl, *, nside=None, shape=None, wcs=None, lmax=None, seed=None, new=True, nthreads=None)

Generate a Gaussian random map from a power spectrum.

Parameters:

  • cl

    Power spectrum. Accepted shapes:

    • 1-D (lmax+1,) — single TT spectrum.
    • 2-D (nspec, lmax+1) — flat spectra; ordering set by new.
    • 2-D (4, lmax+1) — healpy shorthand TT EE BB TE (EB=TB=0).
    • 3-D (n, n, lmax+1) — covariance matrix.
  • nside

    HEALPix resolution. Mutually exclusive with shape/wcs.

  • shape

    CAR pixel shape. Used with wcs.

  • wcs

    CAR world coordinate system. Used with shape.

  • lmax

    Bandlimit. Defaults to library default.

  • seed

    PRNG seed.

  • nthreads

    ducc0 thread count (HEALPix). None uses MEGATOP_SHT_NTHREADS.

  • new

    Ordering for 2-D cl, passed to hp.synalm. Defaults to True (TT EE BB TE EB TB), unlike healpy's default False. No effect for 1-D or 3-D input.

Returns:

  • np.ndarray for HEALPix, pixell.enmap.ndmap for CAR.

Raises:

  • ValueError

    Missing or conflicting pixelization targets.

Source code in src/megatop/utils/harmonic.py
def synfast(cl, *, nside=None, shape=None, wcs=None, lmax=None, seed=None, new=True, nthreads=None):
    """Generate a Gaussian random map from a power spectrum.

    Args:
        cl: Power spectrum. Accepted shapes:

            * 1-D ``(lmax+1,)`` — single TT spectrum.
            * 2-D ``(nspec, lmax+1)`` — flat spectra; ordering set by ``new``.
            * 2-D ``(4, lmax+1)`` — healpy shorthand TT EE BB TE (EB=TB=0).
            * 3-D ``(n, n, lmax+1)`` — covariance matrix.

        nside: HEALPix resolution. Mutually exclusive with ``shape``/``wcs``.
        shape: CAR pixel shape. Used with ``wcs``.
        wcs: CAR world coordinate system. Used with ``shape``.
        lmax: Bandlimit. Defaults to library default.
        seed: PRNG seed.
        nthreads: ducc0 thread count (HEALPix). ``None`` uses ``MEGATOP_SHT_NTHREADS``.
        new: Ordering for 2-D ``cl``, passed to ``hp.synalm``. Defaults to
            ``True`` (TT EE BB TE EB TB), unlike healpy's default ``False``.
            No effect for 1-D or 3-D input.

    Returns:
        ``np.ndarray`` for HEALPix, ``pixell.enmap.ndmap`` for CAR.

    Raises:
        ValueError: Missing or conflicting pixelization targets.
    """
    car_target = shape is not None or wcs is not None
    healpix = nside is not None
    if healpix and car_target:
        raise ValueError("Specify either nside (HEALPix) or shape+wcs (CAR).")
    if not healpix and (shape is None or wcs is None):
        raise ValueError("Provide nside, or both shape and wcs.")

    if seed is not None:
        np.random.seed(seed)  # noqa: NPY002

    cl = np.asarray(cl)
    if cl.ndim == 3:
        new = True
    cl_norm = _normalise_cl(cl)
    scalar = isinstance(cl_norm, np.ndarray) and cl_norm.ndim == 1
    kw = (
        {"nside": nside, "lmax": lmax, "nthreads": nthreads}
        if healpix
        else {"shape": shape[-2:], "wcs": wcs, "lmax": lmax}
    )

    if scalar:
        alm = hp.synalm(cl_norm, lmax=lmax, new=new)
        return alm2map(alm, spin=0, **kw)

    # Multi-component (T, E, B) → synthesise (T, Q, U)
    alm_T, alm_E, alm_B = hp.synalm(cl_norm, lmax=lmax, new=new)
    alms_teb = np.stack([alm_T, alm_E, alm_B])
    return alm2map(alms_teb, spin=[0, 2], **kw)

megatop.utils.mask

get_analysis_mask(common_hitmap, binary_mask, apod_radius_deg, apod_type)

Create analysis mask and apodize it.

Source code in src/megatop/utils/mask.py
def get_analysis_mask(common_hitmap, binary_mask, apod_radius_deg, apod_type):
    """
    Create analysis mask and apodize it.
    """
    return nmt.mask_apodization((binary_mask * common_hitmap), apod_radius_deg, apotype=apod_type)

get_apodized_mask_from_nhits(nhits_map, nside, galactic_mask=None, point_source_mask=None, zero_threshold=0.001, apod_radius=10.0, apod_radius_point_source=4.0, apod_type='C1', no_nhits_rescaling=False)

Produce an appropriately apodized mask from an nhits map as used in
the BB pipeline paper (https://arxiv.org/abs/2302.04276).

Procedure:
* Make binary mask by smoothing, normalizing and thresholding nhits map
* (optional) multiply binary mask by galactic mask
* Apodize (binary * galactic)
* (optional) multiply (binary * galactic) with point source mask
* (optional) apodize (binary * galactic * point source)
* Multiply everything by (smoothed) nhits map (if no_nhits_rescaling is False)

Parameters
----------
nhits_map : array
    maps of the nhits
nside : int
    nside of the output mask.
galactic_mask : array, optional
    galactic mask to apply.
point_source_mask : array, optional
    point source mask to apply.
zero_threshold : float, optional
    Threshold below which nhits values are set to zero.
apod_radius : float, optional
    Apodization radius for the galactic mask (in degrees).
apod_radius_point_source : float, optional
    Apodization radius for the point source mask (in degrees).
apod_type : str, optional
    Type of apodization, default is "C1".
no_nhits_rescaling : bool, optional
    If True, the apodized binary mask outputed is not resacled by nhits . Default is False.

Returns

-------

array

The apodized mask.

Source code in src/megatop/utils/mask.py
def get_apodized_mask_from_nhits(
    nhits_map,
    nside,
    galactic_mask=None,
    point_source_mask=None,
    zero_threshold=1e-3,
    apod_radius=10.0,
    apod_radius_point_source=4.0,
    apod_type="C1",
    no_nhits_rescaling=False,
):
    """
        Produce an appropriately apodized mask from an nhits map as used in
        the BB pipeline paper (https://arxiv.org/abs/2302.04276).

        Procedure:
        * Make binary mask by smoothing, normalizing and thresholding nhits map
        * (optional) multiply binary mask by galactic mask
        * Apodize (binary * galactic)
        * (optional) multiply (binary * galactic) with point source mask
        * (optional) apodize (binary * galactic * point source)
        * Multiply everything by (smoothed) nhits map (if no_nhits_rescaling is False)

        Parameters
        ----------
        nhits_map : array
            maps of the nhits
        nside : int
            nside of the output mask.
        galactic_mask : array, optional
            galactic mask to apply.
        point_source_mask : array, optional
            point source mask to apply.
        zero_threshold : float, optional
            Threshold below which nhits values are set to zero.
        apod_radius : float, optional
            Apodization radius for the galactic mask (in degrees).
        apod_radius_point_source : float, optional
            Apodization radius for the point source mask (in degrees).
        apod_type : str, optional
            Type of apodization, default is "C1".
        no_nhits_rescaling : bool, optional
            If True, the apodized binary mask outputed is not resacled by nhits . Default is False.

    #     Returns
    #     -------
    #     array
    #         The apodized mask.
    #"""
    # Get binary mask
    binary_mask = get_binary_mask_from_nhits(nhits_map, nside, zero_threshold)

    # Multiply by Galactic mask
    if galactic_mask is not None:
        binary_mask *= hp.ud_grade(galactic_mask, nside)

    # Apodize the binary mask
    binary_mask = nmt.mask_apodization(binary_mask, apod_radius, apotype=apod_type)

    # Multiply with point source mask
    if point_source_mask is not None:
        binary_mask *= hp.ud_grade(point_source_mask, nside)
        binary_mask = nmt.mask_apodization(binary_mask, apod_radius_point_source, apotype=apod_type)
    if no_nhits_rescaling:
        return binary_mask
    return nhits_map * binary_mask

get_binary_mask(common_hitmap, gal_mask, zero_threshold)

Compute a binary mask by thresholding the common_hitmap and galactic mask.

Source code in src/megatop/utils/mask.py
def get_binary_mask(common_hitmap, gal_mask, zero_threshold):
    """
    Compute a binary mask by thresholding the common_hitmap and galactic mask.
    """
    # TODO: Include PS mask
    binary_mask = np.zeros_like(common_hitmap)
    binary_mask[common_hitmap > zero_threshold] = 1
    return binary_mask * gal_mask

get_binary_mask_from_nhits(nhits_map, nside, zero_threshold=0.001)

Generate a binary mask from a nhits by setting to zero pixels below a certain threshold and to 1 others.

Parameters

nhits_map : array maps of the nhits nside : int nside of the output mask zero_threshold: float threshold for setting pixels to 0 and 1.

Returns

binary_mask: array

Source code in src/megatop/utils/mask.py
def get_binary_mask_from_nhits(nhits_map, nside, zero_threshold=1e-3):
    """
    Generate a binary mask from a nhits by setting to zero pixels below a certain threshold and to 1 others.

    Parameters
    ----------
    nhits_map : array
        maps of the nhits
    nside : int
        nside of the output mask
    zero_threshold: float
        threshold for setting pixels to 0 and 1.

    Returns
    -------
    binary_mask: array
    """
    nhits_smoothed = hu.smooth(hp.ud_grade(nhits_map, nside, power=-2, dtype=np.float64), 60.0)
    nhits_smoothed[nhits_smoothed < 0] = 0
    nhits_smoothed /= np.amax(nhits_smoothed)
    binary_mask = np.zeros_like(nhits_smoothed)
    binary_mask[nhits_smoothed > zero_threshold] = 1

    return binary_mask

get_common_nhits_map(hit_maps, fwhm_arcmin_nhits)

Get the 'common' nhits map by mutpliplying the (normalised) nhits maps for all frequencies together.

Source code in src/megatop/utils/mask.py
def get_common_nhits_map(hit_maps, fwhm_arcmin_nhits):
    """
    Get the 'common' nhits map by mutpliplying the (normalised) nhits maps for all frequencies together.
    """
    common_nhits_map = (
        np.prod(hit_maps, axis=0) ** (1.0 / hit_maps.shape[0]) if hit_maps.ndim > 1 else hit_maps
    )
    return smooth_mask(common_nhits_map / np.max(common_nhits_map), fwhm_arcmin=fwhm_arcmin_nhits)

get_norm_smooth_nhits_from_depth(depth_maps, fwhm_arcmin_nhits)

Compute normalised and smoothed hitmap(s) from a depth map(s). Parameters


depth_maps : array input depth map (in muk_arcmin), shape is (..., npix)

Source code in src/megatop/utils/mask.py
def get_norm_smooth_nhits_from_depth(depth_maps, fwhm_arcmin_nhits):
    """
    Compute normalised and smoothed hitmap(s) from a depth map(s).
    Parameters
    ----------
    depth_maps : array
        input depth map (in muk_arcmin), shape is (..., npix)
    """
    nhits_map = np.zeros_like(depth_maps)
    valid = depth_maps > 0
    nhits_map[valid] = 1.0 / (depth_maps[valid] ** 2)
    return norm_smooth_nhits_maps(nhits_maps=nhits_map, fwhm_arcmin_nhits=fwhm_arcmin_nhits)

get_spin_derivatives(map)

First and second spin derivatives of a given spin-0 map. Parameters


map : array Input spin-0 map in HEALPix format.

Returns

tuple of arrays First and second spin derivatives of the input map.

Source code in src/megatop/utils/mask.py
def get_spin_derivatives(map):
    """
    First and second spin derivatives of a given spin-0 map.
    Parameters
    ----------
    map : array
        Input spin-0 map in HEALPix format.

    Returns
    -------
    tuple of arrays
        First and second spin derivatives of the input map.
    """
    nside = hp.npix2nside(np.shape(map)[-1])
    ell = np.arange(3 * nside)
    alpha1i = np.sqrt(ell * (ell + 1.0))
    alpha2i = np.sqrt((ell - 1.0) * ell * (ell + 1.0) * (ell + 2.0))
    alm = hu.map2alm(map, spin=0)
    first = hu.alm2map(hu.almxfl(alm, alpha1i), spin=0, nside=nside)
    second = hu.alm2map(hu.almxfl(alm, alpha2i), spin=0, nside=nside)

    return first, second

norm_smooth_nhits_maps(nhits_maps, fwhm_arcmin_nhits)

Normalize nhits_map(s)

Source code in src/megatop/utils/mask.py
def norm_smooth_nhits_maps(nhits_maps, fwhm_arcmin_nhits):
    """
    Normalize nhits_map(s)
    """
    if nhits_maps.ndim > 1:
        norm_nhits = nhits_maps / np.max(nhits_maps, axis=1)[..., np.newaxis]
    else:
        norm_nhits = nhits_maps / np.max(nhits_maps)
    return smooth_mask(mask=norm_nhits, fwhm_arcmin=fwhm_arcmin_nhits)

random_src_mask(mask, nsrcs, mask_radius_arcmin)

Generate a modified version of a mask by randomly masking circular regions around selected points.

Parameters

mask : array input mask on which random point sources will be substracted nrscs : int number of random sources to be masked mask_radius_arcmin: float radius (in arcmin) of the mask around each source

Returns

ps_mask: array

Source code in src/megatop/utils/mask.py
def random_src_mask(mask, nsrcs, mask_radius_arcmin):
    """
    Generate a modified version of a mask by randomly masking circular regions around selected points.

    Parameters
    ----------
    mask : array
        input mask on which random point sources will be substracted
    nrscs : int
        number of random sources to be masked
    mask_radius_arcmin: float
        radius (in arcmin) of the mask around each source

    Returns
    -------
    ps_mask: array
    """
    ps_mask = mask.copy()
    rng = np.random.default_rng()
    src_ids = rng.choice(np.where(mask == 1)[0], nsrcs)
    for src_id in src_ids:
        vec = hp.pix2vec(hp.get_nside(mask), src_id)
        disc = hp.query_disc(hp.get_nside(mask), vec, np.deg2rad(mask_radius_arcmin / 60))
        ps_mask[disc] = 0
    return ps_mask

read_depth_maps(list_depthmapname, nside)

Read depth maps and ud_grade. Maps are assumed equatorial (celestial).

Source code in src/megatop/utils/mask.py
def read_depth_maps(list_depthmapname: list[Path], nside: int):
    """
    Read depth maps and ud_grade. Maps are assumed equatorial (celestial).
    """
    depth_maps = []
    for depthmapname in list_depthmapname:
        depth_maps.append(hp.ud_grade(hp.read_map(depthmapname, field=0), nside_out=nside))
    return np.array(depth_maps, dtype=np.float64)

read_nhits_maps(list_hitmapname, nside)

Read hit maps and ud_grade. Maps are assumed equatorial (celestial).

Source code in src/megatop/utils/mask.py
def read_nhits_maps(list_hitmapname: list[Path], nside: int):
    """
    Read hit maps and ud_grade. Maps are assumed equatorial (celestial).
    """
    nhits_maps = []
    SO_NOMINAL_NHITS = None
    for hitmapname in list_hitmapname:
        if hitmapname == "SO_nominal":
            if SO_NOMINAL_NHITS is None:
                try:
                    logger.info(f"Downloading nominal hit map from {SO_NOMINAL_HITMAP_URL}")
                    with urlopen(SO_NOMINAL_HITMAP_URL, timeout=10) as _:
                        # healpy can read directly from the URL
                        SO_NOMINAL_NHITS = hp.read_map(SO_NOMINAL_HITMAP_URL)
                        SO_NOMINAL_NHITS = hp.ud_grade(SO_NOMINAL_NHITS, nside, power=-2)
                except URLError:
                    logger.error("Nominal hitmap download failed")
                    logger.error("Exiting mask_handler without creating a mask")
                    sys.exit()
            nhits_maps.append(SO_NOMINAL_NHITS)
        else:
            nhits_maps.append(
                hp.ud_grade(hp.read_map(hitmapname, field=0), nside_out=nside, power=-2)
            )
    return np.array(nhits_maps, dtype=np.float64)

smooth_mask(mask, fwhm_arcmin)

Smooth mask(s) with a (gaussian) beam.

Source code in src/megatop/utils/mask.py
def smooth_mask(mask, fwhm_arcmin):
    """
    Smooth mask(s) with a (gaussian) beam.
    """
    mask_smoothed = hu.smooth(mask, fwhm_arcmin)
    mask_smoothed[mask_smoothed < 0] = 0
    return mask_smoothed

megatop.utils.mpi

MPIGATHER(array, comm, rank, size, root)

Gathers an array to the root process using MPI.

Parameters

array : np.ndarray The array to gather. comm : MPI.COMM_WORLD The MPI communicator. rank : int The rank of the process. size : int The size of the communicator. root : int The root process.

Returns

array_recvbuf : np.ndarray The gathered array.

Source code in src/megatop/utils/mpi.py
@requires_mpi4py
def MPIGATHER(array, comm, rank, size, root):
    """
    Gathers an array to the root process using MPI.

    Parameters
    ----------
    array : np.ndarray
        The array to gather.
    comm : MPI.COMM_WORLD
        The MPI communicator.
    rank : int
        The rank of the process.
    size : int
        The size of the communicator.
    root : int
        The root process.

    Returns
    -------
    array_recvbuf : np.ndarray
        The gathered array.

    """

    array = np.ascontiguousarray(array)

    array_recvbuf = None
    if rank == 0:
        shape_recvbuf_array = (size, *array.shape)
        array_recvbuf = np.empty(shape_recvbuf_array)

        # Ensure recvbuf is contiguous
        # to make sure the comm.Gather() works correctly
        array_recvbuf = np.ascontiguousarray(array_recvbuf)

    comm.Gather(array, array_recvbuf, root=root)

    return array_recvbuf

MPISUM(array, comm, rank, root)

Reduces an array using the SUM operator to the root process using MPI.

Parameters

array : np.ndarray The array to reduce. comm : MPI.COMM_WORLD The MPI communicator. rank : int The rank of the process. root : int The root process.

Returns

array_recvbuf : np.ndarray The reduced array.

Source code in src/megatop/utils/mpi.py
@requires_mpi4py
def MPISUM(array, comm, rank, root):
    """
    Reduces an array using the SUM operator to the root process using MPI.

    Parameters
    ----------
    array : np.ndarray
        The array to reduce.
    comm : MPI.COMM_WORLD
        The MPI communicator.
    rank : int
        The rank of the process.
    root : int
        The root process.

    Returns
    -------
    array_recvbuf : np.ndarray
        The reduced array.

    """

    array_recvbuf = np.zeros_like(array) if rank == root else None

    array = np.ascontiguousarray(array)

    comm.Reduce(array, array_recvbuf, op=MPI.SUM, root=root)

    return array_recvbuf

megatop.utils.passband

fgbuster_passband(map_sets)

Source code in src/megatop/utils/passband.py
def fgbuster_passband(map_sets: list) -> list:
    """"""
    passbands_norm = []
    for map_set in map_sets:
        weight = map_set.weight / _jysr2rj(map_set.frequency)
        weight /= _rj2cmb(
            map_set.frequency
        )  # so now on top of the weight we have the conversion factor from K_CMB to Jy/sr
        weight /= trapezoid(np.nan_to_num(weight, nan=0), map_set.frequency * 1e9)
        passbands_norm.append([map_set.frequency, np.nan_to_num(weight, nan=0)])
    return passbands_norm

passband_constructor(config, manager, passband_int)

Read passbands from config file and outputs

Source code in src/megatop/utils/passband.py
def passband_constructor(config: Config, manager: DataManager, passband_int: bool) -> list:
    """Read passbands from config file and outputs"""  # TODO doc and shape/type of output
    for map_set in config.map_sets:
        if passband_int:
            tab = QTable.read(
                manager.path_to_passbands / map_set.passband_filename, format="ascii.ipac"
            )
            map_set.frequency = np.array(tab["bandpass_frequency"])
            map_set.weight = np.array(tab["bandpass_weight"])
        else:
            map_set.frequency = map_set.freq_tag
            map_set.weight = 1.0
    return config.map_sets

megatop.utils.plot

freq_maps_plotter(config, map_set, plot_dir, plot_name, vmin=None, vmax=None, cmap_set_under='w', component='CMB')

This function plots the frequency maps. It directly saves the plot directly.

Parameters:

  • config

    The configuration object.

  • map_set (ndarray) –

    The frequency maps, with shape (num_freq, num_stokes, num_pixels) or (1, num_stokes, num_pixels) if there is only one set of (TQU) map to display (e.g. CMB maps).

  • plot_dir (str) –

    The path to the directory where the plot will be saved.

  • plot_name (str) –

    The name of the file to save the plot.

  • vmin (dict, default: None ) –

    The minimum values for the colorbar of the plots.

  • vmax (dict, default: None ) –

    The maximum values for the colorbar of the plots.

  • cmap (colormap) –

    The colormap to use for the plots.

  • cmap_set_under (str, default: 'w' ) –

    The color to set for the values under vmin.

  • component (str, default: 'CMB' ) –

    If map_set of the shape (1, num_stokes, num_pixels) this parameter gives the name of the component to plot, e.g. CMB, fg, etc.

Returns:

  • None

Source code in src/megatop/utils/plot.py
def freq_maps_plotter(
    config,
    map_set,
    plot_dir,
    plot_name,
    vmin=None,
    vmax=None,
    cmap_set_under="w",
    component="CMB",
):
    """
    This function plots the frequency maps. It directly saves the plot directly.

    Args:
        config: The configuration object.
        map_set (ndarray): The frequency maps, with shape (num_freq, num_stokes, num_pixels) or (1, num_stokes, num_pixels) if
                            there is only one set of (TQU) map to display (e.g. CMB maps).
        plot_dir (str): The path to the directory where the plot will be saved.
        plot_name (str): The name of the file to save the plot.
        vmin (dict): The minimum values for the colorbar of the plots.
        vmax (dict): The maximum values for the colorbar of the plots.
        cmap (colormap): The colormap to use for the plots.
        cmap_set_under (str): The color to set for the values under vmin.
        component (str): If map_set of the shape (1, num_stokes, num_pixels) this parameter gives the
                        name of the component to plot, e.g. CMB, fg, etc.

    Returns:
        None
    """

    if vmin is None:
        vmin = {"I": None, "Q": None, "U": None}
    if vmax is None:
        vmax = {"I": None, "Q": None, "U": None}

    plt.figure(figsize=(20, 7))
    k = 0

    if map_set.shape == (1, 3, map_set.shape[-1]) or map_set.shape == (1, 2, map_set.shape[-1]):
        enum_freq = [0]
        row = 1
        column = map_set.shape[1]
    elif map_set.shape == (len(config.frequencies), 3, map_set.shape[-1]) or map_set.shape == (
        len(config.frequencies),
        2,
        map_set.shape[-1],
    ):
        enum_freq = config.frequencies
        row = map_set.shape[1]
        column = len(config.frequencies)
    else:
        logger.error(
            f"In freq_maps_plotter() map_set doesn't have the right shape, must be (nfreq, nstokes, npix) OR (1, nstokes, npix), here: {map_set.shape}"
        )
        msg = "Bad map set shape"
        raise TypeError(msg)

    stokes_list = ["I", "Q", "U"] if map_set.shape[1] == 3 else ["Q", "U"]

    for j_stokes, stokes in enumerate(stokes_list):
        for i_f, fr in enumerate(enum_freq):
            title_map = f"{component} {stokes}" if enum_freq == [0] else f"{fr} GHz {stokes}"
            if stokes == "I":
                cmap = cm.RdBu
                cmap.set_under(cmap_set_under)
                hp.mollview(
                    map_set[i_f, j_stokes],
                    cmap=cmap,
                    title=title_map,
                    min=vmin[stokes],
                    max=vmax[stokes],
                    sub=(row, column, k + 1),
                )
            else:
                cmap = cm.Greys
                cmap.set_under(cmap_set_under)
                hp.mollview(
                    map_set[i_f, j_stokes],
                    cmap=cmap,
                    title=title_map,
                    min=vmin[stokes],
                    max=vmax[stokes],
                    sub=(row, column, k + 1),
                )
            k += 1

    plt.savefig(plot_dir / plot_name, bbox_inches="tight")
    plt.clf()

freq_maps_plotter_one_stoke(config, map_set, plot_dir, plot_name, vmin=None, vmax=None, cmap=cm.RdBu, cmap_set_under='w', title_prefix='')

This function plots the frequency maps. It directly saves the plot directly.

Parameters:

  • config

    The configuration object.

  • map_set (ndarray) –

    The frequency maps, with shape (num_freq, 1, num_pixels).

  • plot_dir (str) –

    The path to the directory where the plot will be saved.

  • plot_name (str) –

    The name of the file to save the plot.

  • vmin (dict, default: None ) –

    The minimum values for the colorbar of the plots.

  • vmax (dict, default: None ) –

    The maximum values for the colorbar of the plots.

  • cmap (colormap, default: RdBu ) –

    The colormap to use for the plots.

  • cmap_set_under (str, default: 'w' ) –

    The color to set for the values under vmin.

Returns:

  • None

Source code in src/megatop/utils/plot.py
def freq_maps_plotter_one_stoke(
    config,
    map_set,
    plot_dir,
    plot_name,
    vmin=None,
    vmax=None,
    cmap=cm.RdBu,
    cmap_set_under="w",
    title_prefix="",
):
    """
    This function plots the frequency maps. It directly saves the plot directly.

    Args:
        config: The configuration object.
        map_set (ndarray): The frequency maps, with shape (num_freq, 1, num_pixels).
        plot_dir (str): The path to the directory where the plot will be saved.
        plot_name (str): The name of the file to save the plot.
        vmin (dict): The minimum values for the colorbar of the plots.
        vmax (dict): The maximum values for the colorbar of the plots.
        cmap (colormap): The colormap to use for the plots.
        cmap_set_under (str): The color to set for the values under vmin.


    Returns:
        None
    """

    cmap.set_under(cmap_set_under)

    plt.figure(figsize=(20, 7))

    # TODO: it nfreq>6 this might get a bit busy...
    row = 1
    column = len(config.frequencies)

    for i_f, fr in enumerate(config.frequencies):
        title_map = f"{title_prefix} {fr} GHz"

        hp.mollview(
            map_set[i_f],
            cmap=cmap,
            title=title_map,
            min=vmin,
            max=vmax,
            sub=(row, column, i_f + 1),
        )

    plt.savefig(plot_dir / plot_name, bbox_inches="tight")
    plt.clf()

plotTTEEBB(plot_dir, freqs, Cl, save_name, legend_labels=('fg $C_\\ell$ $\\nu=$',), y_axis_label='y_axis', use_D_ell=True, lims_x=(2, 2000), lims_y=(0.01, 10000000.0), ell=None)

This function plots the Cls. It directly saves the plot directly.

Parameters:

  • plot_dir (str) –

    The path to the directory where the plot will be saved.

  • freqs (list) –

    The frequencies of the maps.

  • Cl (ndarray) –

    The Cls, with shape (num_freq, num_spectra [TT,EE,BB], num_ell).

  • save_name (str) –

    The name of the file to save the plot.

  • legend_labels (list, default: ('fg $C_\\ell$ $\\nu=$',) ) –

    The labels for the legend of the plot.

  • y_axis_label (str, default: 'y_axis' ) –

    The label for the y axis of the plot.

  • use_D_ell (bool, default: True ) –

    If True, the Cls are multiplied by ell*(ell+1)/2/pi.

  • lims_x (tuple, default: (2, 2000) ) –

    The limits for the x axis of the plot.

  • lims_y (tuple, default: (0.01, 10000000.0) ) –

    The limits for the y axis of the plot.

  • ell (ndarray, default: None ) –

    The ell values to use for the x axis. If None, it will be set to np.arange(0, Cl.shape[-1]).

Source code in src/megatop/utils/plot.py
def plotTTEEBB(
    plot_dir,
    freqs,
    Cl,
    save_name,
    legend_labels=(r"fg $C_\ell$ $\nu=$",),
    y_axis_label="y_axis",
    use_D_ell=True,
    lims_x=(2, 2000),
    lims_y=(1e-2, 1e7),
    ell=None,
):
    """
    This function plots the Cls. It directly saves the plot directly.

    Args:
        plot_dir (str): The path to the directory where the plot will be saved.
        freqs (list): The frequencies of the maps.
        Cl (ndarray): The Cls, with shape (num_freq, num_spectra [TT,EE,BB], num_ell).
        save_name (str): The name of the file to save the plot.
        legend_labels (list): The labels for the legend of the plot.
        y_axis_label (str): The label for the y axis of the plot.
        use_D_ell (bool): If True, the Cls are multiplied by ell*(ell+1)/2/pi.
        lims_x (tuple): The limits for the x axis of the plot.
        lims_y (tuple): The limits for the y axis of the plot.
        ell (ndarray): The ell values to use for the x axis. If None, it will be set to np.arange(0, Cl.shape[-1]).
    """
    if ell is None:
        ell = np.arange(0, Cl.shape[-1])
    norm = ell * (ell + 1) / 2 / np.pi

    if not use_D_ell:
        norm = 1

    fig, ax = plt.subplots(1, 3, sharex=True, sharey="row", figsize=(16, 9))
    for f in range(Cl.shape[0]):
        ax[0].plot(ell, norm * Cl[f, 0], color="C" + str(f), ls="-", alpha=0.4)
        ax[1].plot(ell, norm * Cl[f, 1], color="C" + str(f), ls="-", alpha=0.4)
        ax[2].plot(
            ell,
            norm * Cl[f, 2],
            label=legend_labels[0] + str(freqs[f]) * (Cl.shape[0] != 1),
            color="C" + str(f),
            ls="-",
            alpha=0.4,
        )

    ax[0].set_title("TT")
    ax[1].set_title("EE")
    ax[2].set_title("BB")
    if lims_x is None:
        lims_x = (2, ell[-1])

    ax[0].set_xlabel(r"$\ell$")
    ax[1].set_xlabel(r"$\ell$")
    ax[2].set_xlabel(r"$\ell$")

    ax[0].set_ylabel(y_axis_label)

    ax[2].legend(bbox_to_anchor=(1.1, 1.05), fancybox=True, shadow=True)
    ax[0].loglog()
    ax[1].loglog()
    ax[2].loglog()

    ax[0].set_xlim(lims_x)
    ax[0].set_ylim(lims_y)
    plt.subplots_adjust(wspace=0, hspace=0)
    plt.savefig(plot_dir / save_name, bbox_inches="tight")
    plt.close()

plotTTEEBB_diff(plot_dir, freqs, Cl_data, Cl_model, save_name, legend_labels=('label data $C_\\ell$ $\\nu=$', 'label model $C_\\ell$ $\\nu=$'), axis_labels=('y_axis_row0', 'y_axis_row1'), use_D_ell=True, lims_x=(2, 2000), lims_y=(0.01, 10000000.0))

This function plots the difference between the data and the model Cls. It directly saves the plot directly.

Parameters:

  • plot_dir (str) –

    The path to the directory where the plot will be saved.

  • freqs (list) –

    The frequencies of the maps.

  • Cl_data (ndarray) –

    The Cls of the data, with shape (num_freq, num_spectra [TT,EE,BB], num_ell).

  • Cl_model (ndarray) –

    The Cls of the model that the data will be compared to, with shape (num_freq, num_spectra [TT,EE,BB], num_ell).

  • save_name (str) –

    The name of the file to save the plot.

  • legend_labels (list, default: ('label data $C_\\ell$ $\\nu=$', 'label model $C_\\ell$ $\\nu=$') ) –

    The labels for the legend of the plot (one for data, one for model)

  • axis_labels (list, default: ('y_axis_row0', 'y_axis_row1') ) –

    The labels for the y axis of the plot.

  • use_D_ell (bool, default: True ) –

    If True, the Cls are multiplied by ell*(ell+1)/2/pi.

  • lims_x (tuple, default: (2, 2000) ) –

    The limits for the x axis of the plot.

  • lims_y (tuple, default: (0.01, 10000000.0) ) –

    The limits for the y axis of the plot.

Returns:

  • None

Source code in src/megatop/utils/plot.py
def plotTTEEBB_diff(
    plot_dir,
    freqs,
    Cl_data,
    Cl_model,
    save_name,
    legend_labels=(r"label data $C_\ell$ $\nu=$", r"label model $C_\ell$ $\nu=$"),
    axis_labels=("y_axis_row0", "y_axis_row1"),
    use_D_ell=True,
    lims_x=(2, 2000),
    lims_y=(1e-2, 1e7),
):
    """
    This function plots the difference between the data and the model Cls. It directly saves the plot directly.

    Args:
        plot_dir (str): The path to the directory where the plot will be saved.
        freqs (list): The frequencies of the maps.
        Cl_data (ndarray): The Cls of the data, with shape (num_freq, num_spectra [TT,EE,BB], num_ell).
        Cl_model (ndarray): The Cls of the model that the data will be compared to,
                            with shape (num_freq, num_spectra [TT,EE,BB], num_ell).
        save_name (str): The name of the file to save the plot.
        legend_labels (list): The labels for the legend of the plot (one for data, one for model)
        axis_labels (list): The labels for the y axis of the plot.
        use_D_ell (bool): If True, the Cls are multiplied by ell*(ell+1)/2/pi.
        lims_x (tuple): The limits for the x axis of the plot.
        lims_y (tuple): The limits for the y axis of the plot.


    Returns:
        None
    """

    ell = np.arange(0, Cl_data.shape[-1])
    norm = ell * (ell + 1) / 2 / np.pi

    if Cl_data.ndim == 2:
        Cl_data = Cl_data[np.newaxis, ...]
    if Cl_model.ndim == 2:
        Cl_model = Cl_model[np.newaxis, ...]
    Cl_model = Cl_model[..., ell]

    if not use_D_ell:
        norm = 1

    fig, ax = plt.subplots(2, 3, sharex=True, sharey="row", figsize=(15, 15))
    for f in range(Cl_data.shape[0]):
        ax[0][0].plot(ell, norm * Cl_data[f, 0], color="C" + str(f), ls="-", alpha=0.4)
        ax[0][1].plot(ell, norm * Cl_data[f, 1], color="C" + str(f), ls="-", alpha=0.4)
        ax[0][2].plot(
            ell,
            norm * Cl_data[f, 2],
            label=legend_labels[0] + str(freqs[f]) * (Cl_data.shape[0] != 1),
            color="C" + str(f),
            ls="-",
            alpha=0.4,
        )
        ax[0][0].plot(ell, norm * Cl_model[f, 0], color="C" + str(f), ls=":")
        ax[0][1].plot(ell, norm * Cl_model[f, 1], color="C" + str(f), ls=":")
        ax[0][2].plot(
            ell,
            norm * Cl_model[f, 2],
            label=legend_labels[1] + str(freqs[f]) * (Cl_data.shape[0] != 1),
            color="C" + str(f),
            ls=":",
        )

        zero_index_model0 = np.where(Cl_model[f, 0] != 0)[0]
        zero_index_model1 = np.where(Cl_model[f, 1] != 0)[0]
        zero_index_model2 = np.where(Cl_model[f, 2] != 0)[0]
        ax[1][0].plot(
            ell[zero_index_model0],
            (
                (Cl_data[f, 0] - Cl_model[f, 0])[zero_index_model0]
                / Cl_model[f, 0, zero_index_model0]
            ),
            color="C" + str(f),
            ls="-",
            alpha=0.4,
        )
        ax[1][1].plot(
            ell[zero_index_model1],
            (
                (Cl_data[f, 1] - Cl_model[f, 1])[zero_index_model1]
                / Cl_model[f, 1, zero_index_model1]
            ),
            color="C" + str(f),
            ls="-",
            alpha=0.4,
        )
        ax[1][2].plot(
            ell[zero_index_model2],
            (
                (Cl_data[f, 2] - Cl_model[f, 2])[zero_index_model2]
                / Cl_model[f, 2, zero_index_model2]
            ),
            color="C" + str(f),
            ls="-",
            alpha=0.4,
        )

    ax[0][0].set_title("TT")
    ax[0][1].set_title("EE")
    ax[0][2].set_title("BB")
    ax[1][0].set_xlabel(r"$\ell$")
    ax[1][1].set_xlabel(r"$\ell$")
    ax[1][2].set_xlabel(r"$\ell$")
    ax[0][0].set_ylabel(axis_labels[0])
    ax[1][0].set_ylabel(axis_labels[1])
    ax[0][2].legend(bbox_to_anchor=(1.1, 1.05), fancybox=True, shadow=True)
    ax[0][0].loglog()
    ax[0][1].loglog()
    ax[0][2].loglog()
    ax[1][0].set_xscale("log")
    ax[1][1].set_xscale("log")
    ax[1][2].set_xscale("log")
    ax[1][0].grid(axis="y", c="k", alpha=0.5, ls="dashed")
    ax[1][1].grid(axis="y", c="k", alpha=0.5, ls="dashed")
    ax[1][2].grid(axis="y", c="k", alpha=0.5, ls="dashed")
    if lims_x is None:
        lims_x = (2, ell[-1])
        # ell[-1] allows to avoid empty space on the right,
        # 2 is to avoid the first 2 ell that are ill-defined
    ax[0][0].set_xlim(lims_x)
    ax[0][0].set_ylim(lims_y)

    plt.subplots_adjust(wspace=0, hspace=0)
    plt.savefig(plot_dir / save_name, bbox_inches="tight")
    plt.close()

plot_all_Cls(all_Cls, bin_centre, plot_dir, plot_name, use_D_ell=True, y_axis_label='y_axis')

This function plots the Cls outputed from map_to_cl or noise_spectra_estimator. It directly saves the plot directly.

Parameters:

  • all_Cls (dict) –

    Each entry correspond to auto or cross correlation between component maps (e.g "CMBxCMB", "CMBxDust" etc) in the shape of (num_spectra, num_ell) where num_spectra=4 : [EE,EB,BE,BB]

  • bin_centre (ndarray) –

    The bin centres.

  • plot_dir (str) –

    The path to the directory where the plot will be saved.

  • plot_name (str) –

    The name of the file to save the plot.

  • use_D_ell (bool, default: True ) –

    If True, the Cls are multiplied by ell*(ell+1)/2/pi.

  • y_axis_label (str, default: 'y_axis' ) –

    The label for the y axis of the plot.

Returns:

  • None

Source code in src/megatop/utils/plot.py
def plot_all_Cls(all_Cls, bin_centre, plot_dir, plot_name, use_D_ell=True, y_axis_label="y_axis"):
    """
    This function plots the Cls outputed from map_to_cl or noise_spectra_estimator.
    It directly saves the plot directly.

    Args:
        all_Cls (dict): Each entry correspond to auto or cross correlation between
                        component maps (e.g "CMBxCMB", "CMBxDust" etc)
                        in the shape of (num_spectra, num_ell) where num_spectra=4 : [EE,EB,BE,BB]
        bin_centre (ndarray): The bin centres.
        plot_dir (str): The path to the directory where the plot will be saved.
        plot_name (str): The name of the file to save the plot.
        use_D_ell (bool): If True, the Cls are multiplied by ell*(ell+1)/2/pi.
        y_axis_label (str): The label for the y axis of the plot.

    Returns:
        None
    """
    norm = bin_centre * (bin_centre + 1) / 2 / np.pi
    if not use_D_ell:
        norm = 1
    # Define the labels
    labels = ["EE", "EB", "BE", "BB"]

    # Create the figure
    fig, ax = plt.subplots(1, 4, sharex=True, sharey="row", figsize=(16, 9))

    ax = ax.flatten()

    # Loop over the different power spectra
    for i, label in enumerate(labels):
        # Loop over the different components
        for j, key in enumerate(all_Cls.keys()):
            alpha = 0.4 if key not in ("CMBxCMB", "Noise_CMBxNoise_CMB") else 1
            color = "C" + str(j) if key not in ("CMBxCMB", "Noise_CMBxNoise_CMB") else "black"
            ax[i].scatter(
                bin_centre, norm * all_Cls[key][i], marker=".", label=key, color=color, alpha=alpha
            )
            ax[i].scatter(
                bin_centre,
                -norm * all_Cls[key][i],
                marker="x",
                label="-" + key,
                color=color,
                alpha=alpha,
            )

        ax[i].set_title(label)

        ax[i].set_yscale("log")
        ax[i].set_xscale("log")
        ax[i].set_xlabel(r"$\ell$")
    ax[0].set_ylabel(y_axis_label)
    ax[3].legend(bbox_to_anchor=(1.1, 1.05), fancybox=True, shadow=True)

    plt.subplots_adjust(wspace=0, hspace=0)

    plt.savefig(plot_dir / plot_name, bbox_inches="tight")
    plt.close()

megatop.utils.preproc

read_input_maps(list_mapnames)

This function reads the frequency maps from the files and returns them as an array.

Parameters

list_mapnames: list list of paths to maps

Returns
freq_maps_input : (list)
    The list of frequency maps, with len (num_freq), each of them having shape (num_stokes, num_pixels).s
Notes
  • The input maps are allowed to have different `num_pixels' to allow for different resolutions.
Source code in src/megatop/utils/preproc.py
def read_input_maps(list_mapnames: list[Path]) -> list[npt.ArrayLike]:
    """
    This function reads the frequency maps from the files and returns them as an array.

    Parameters
    ----------
    list_mapnames: list
        list of paths to maps

    Returns
    -------
        freq_maps_input : (list)
            The list of frequency maps, with len (num_freq), each of them having shape (num_stokes, num_pixels).s
    Notes
    -----
    * The input maps are allowed to have different `num_pixels' to allow for different resolutions.
    """
    freq_maps_input = []
    for mapname in list_mapnames:
        logger.debug(f"Reading map from {mapname}")
        freq_maps_input.append(hp.read_map(mapname, field=None, dtype=np.float64))
    return freq_maps_input

megatop.utils.spectra

compute_spectra_from_camb(r, cosmo_pars, which='total')

Compute CMB TT, EE, BB, TE spectra from CAMB. Parameters


r : float Tensor-to-scalar ratio. cosmo_pars : CAMBCosmoPars Cosmological parameters passed to camb.set_params(). which : str One of the spectra keys returned by results.get_cmb_power_spectra(). Returns


TT, EE, BB, TE : array-like Each with shape (LMAX+1,)

Source code in src/megatop/utils/spectra.py
def compute_spectra_from_camb(
    r: float,
    cosmo_pars: CAMBCosmoPars,
    which: CAMBSpectraKey = "total",
):
    """
    Compute CMB TT, EE, BB, TE spectra from CAMB.
    Parameters
    ----------
    r : float
        Tensor-to-scalar ratio.
    cosmo_pars : CAMBCosmoPars
        Cosmological parameters passed to camb.set_params().
    which : str
        One of the spectra keys returned by results.get_cmb_power_spectra().
    Returns
    -------
    TT, EE, BB, TE : array-like
        Each with shape (LMAX+1,)
    """

    cosmo_params = camb.set_params(**cosmo_pars.as_camb_kwargs())
    cosmo_params.set_for_lmax(2000, lens_potential_accuracy=1)
    cosmo_params.WantTensors = True

    infl_params = camb.initialpower.InitialPowerLaw()
    infl_params.set_params(
        ns=cosmo_pars.ns,
        r=r,
        parameterization="tensor_param_indeptilt",
        nt=0,
        ntrun=0,
    )
    cosmo_params.InitPower = infl_params

    results = camb.get_results(cosmo_params)
    full_spectra = results.get_cmb_power_spectra(cosmo_params, CMB_unit="muK", raw_cl=True)

    cmb = full_spectra[which]

    TT = cmb[:, 0]
    EE = cmb[:, 1]
    BB = cmb[:, 2]
    TE = cmb[:, 3]

    return TT, EE, BB, TE

get_effective_transfer_function(transfer_freq, W_maxL, binary_mask=None)

Computes the effective transfer function from the transfer functions at each frequency and the maximum likelihood results for the component separation operator Parameters


transfer_freq : np.ndarray Frequency transfer functions, shape (n_freq, 9 , 9, n_bins). Where 9 refers to the T, E, B, and their cross-spectra (xy->wz). W_maxL : np.ndarray Component separation operator, shape (ncomp, nfreq, n_stokes, n_pix). binary_mask : np.ndarray, optional Binary mask, shape (n_pix,). If provided, the effective transfer function is computed only over the observed pixels. Default is None, which means the effective transfer function is computed over all pixels. Returns


effective_transfer_function : np.ndarray Effective transfer function, shape (ncomp, ncomp, pol_spectra, pol_spectra ,nbin). pol_spectra refers EE, EB, BE, BB

Source code in src/megatop/utils/spectra.py
def get_effective_transfer_function(
    transfer_freq: NDArray, W_maxL: NDArray, binary_mask: NDArray | None = None
) -> NDArray:
    """
    Computes the effective transfer function from the transfer functions at each frequency and
    the maximum likelihood results for the component separation operator
    Parameters
    ----------
    transfer_freq : np.ndarray
        Frequency transfer functions, shape (n_freq, 9 , 9, n_bins). Where 9 refers to the T, E, B, and their cross-spectra (xy->wz).
    W_maxL : np.ndarray
        Component separation operator, shape (ncomp, nfreq, n_stokes, n_pix).
    binary_mask : np.ndarray, optional
        Binary mask, shape (n_pix,). If provided, the effective transfer function is computed
        only over the observed pixels. Default is None, which means the effective transfer function is computed
        over all pixels.
    Returns
    -------
    effective_transfer_function : np.ndarray
        Effective transfer function, shape (ncomp, ncomp, pol_spectra, pol_spectra ,nbin). pol_spectra refers EE, EB, BE, BB
    """

    #  Averaging W over observed pixels and stokes parameters:
    if binary_mask is None:
        pix_stokes_mean_W = np.mean(W_maxL, axis=(-2, -1))  # shape (ncomp, nfreq)
    else:
        pix_stokes_mean_W = np.mean(W_maxL[..., binary_mask], axis=(-2, -1))  # shape (ncomp, nfreq)

    pol_transfer = transfer_freq[:, -4:, -4:]  # Keeping only polarised components (EE, EB, BE, BB)
    # applying comp-sep operator on both sides of the transfer function
    normalisation_factor = np.einsum(
        "cf, fk -> ck", pix_stokes_mean_W, pix_stokes_mean_W.T
    )  # shape (ncomp, pol_spectra, n_bins)

    effective_transfer_function = np.einsum(
        "cf,fsdl,fk->cksdl", pix_stokes_mean_W, pol_transfer, pix_stokes_mean_W.T
    )  # shape (ncomp, pol_spectra, n_bins)

    # applying normalisation_factor to each comp x comp in effective_transfer_function
    # to get the effective transfer function
    normalized_effective_transfer_function = (
        effective_transfer_function
        / normalisation_factor[(...,) + (np.newaxis,) * (effective_transfer_function.ndim - 2)]
    )

    # inverting over polarised spectra space:
    inverse_effective_transfer_function = np.zeros_like(normalized_effective_transfer_function)
    for i in range(normalized_effective_transfer_function.shape[0]):
        for j in range(normalized_effective_transfer_function.shape[1]):
            for ell in range(normalized_effective_transfer_function.shape[-1]):
                # Inverting the transfer function for each ell over the spectra dimension
                inverse_effective_transfer_function[i, j, :, :, ell] = np.linalg.inv(
                    normalized_effective_transfer_function[i, j, :, :, ell]
                )

    return normalized_effective_transfer_function, inverse_effective_transfer_function

get_effective_transfer_function_WCl(transfer_freq, W_maxL, binary_mask=None)

Computes the effective transfer function from the transfer functions at each frequency and the maximum likelihood results for the component separation operator Parameters


transfer_freq : np.ndarray Frequency transfer functions, shape (n_freq, 9 , 9, n_bins). Where 9 refers to the T, E, B, and their cross-spectra (xy->wz). W_maxL : np.ndarray Component separation operator, shape (ncomp, nfreq, n_stokes, n_pix). binary_mask : np.ndarray, optional Binary mask, shape (n_pix,). If provided, the effective transfer function is computed only over the observed pixels. Default is None, which means the effective transfer function is computed over all pixels. Returns


effective_transfer_function : np.ndarray Effective transfer function, shape (ncomp, ncomp, pol_spectra, pol_spectra ,nbin). pol_spectra refers EE, EB, BE, BB

Source code in src/megatop/utils/spectra.py
def get_effective_transfer_function_WCl(
    transfer_freq: NDArray, W_maxL: NDArray, binary_mask: NDArray | None = None
) -> NDArray:
    """
    Computes the effective transfer function from the transfer functions at each frequency and
    the maximum likelihood results for the component separation operator
    Parameters
    ----------
    transfer_freq : np.ndarray
        Frequency transfer functions, shape (n_freq, 9 , 9, n_bins). Where 9 refers to the T, E, B, and their cross-spectra (xy->wz).
    W_maxL : np.ndarray
        Component separation operator, shape (ncomp, nfreq, n_stokes, n_pix).
    binary_mask : np.ndarray, optional
        Binary mask, shape (n_pix,). If provided, the effective transfer function is computed
        only over the observed pixels. Default is None, which means the effective transfer function is computed
        over all pixels.
    Returns
    -------
    effective_transfer_function : np.ndarray
        Effective transfer function, shape (ncomp, ncomp, pol_spectra, pol_spectra ,nbin). pol_spectra refers EE, EB, BE, BB
    """

    #  Averaging W over observed pixels and stokes parameters:
    if binary_mask is None:
        pix_stokes_mean_W = np.mean(W_maxL, axis=(-2, -1))  # shape (ncomp, nfreq)
    else:
        pix_stokes_mean_W = np.mean(W_maxL[..., binary_mask], axis=(-2, -1))  # shape (ncomp, nfreq)

    pol_transfer = transfer_freq[:, -4:, -4:]  # Keeping only polarised components (EE, EB, BE, BB)
    # applying comp-sep operator on both sides of the transfer function
    normalisation_factor = np.einsum(
        "cf, fk -> ck", pix_stokes_mean_W, pix_stokes_mean_W.T
    )  # shape (ncomp, pol_spectra, n_bins)

    effective_transfer_function = np.einsum(
        "cf,fsdl,fk->cksdl", pix_stokes_mean_W, pol_transfer, pix_stokes_mean_W.T
    )  # shape (ncomp, pol_spectra, n_bins)

    # applying normalisation_factor to each comp x comp in effective_transfer_function
    # to get the effective transfer function
    normalized_effective_transfer_function = (
        effective_transfer_function
        / normalisation_factor[(...,) + (np.newaxis,) * (effective_transfer_function.ndim - 2)]
    )

    # inverting over polarised spectra space:
    inverse_effective_transfer_function = np.zeros_like(normalized_effective_transfer_function)
    for i in range(normalized_effective_transfer_function.shape[0]):
        for j in range(normalized_effective_transfer_function.shape[1]):
            for ell in range(normalized_effective_transfer_function.shape[-1]):
                # Inverting the transfer function for each ell over the spectra dimension
                inverse_effective_transfer_function[i, j, :, :, ell] = np.linalg.inv(
                    normalized_effective_transfer_function[i, j, :, :, ell]
                )

    return normalized_effective_transfer_function, inverse_effective_transfer_function

limit_namaster_output(all_Cls, bin_index_lminlmax)

This function limits the output of namaster to the desired l range.

Parameters:

  • all_Cls (dict) –

    The dictionary containing the Cls computed by namaster.

  • bin_index_lminlmax (ndarray) –

    The indices of the bins corresponding to the desired l range.

Returns:

  • dict

    The dictionary containing the Cls computed by namaster, limited to the desired l range.

Source code in src/megatop/utils/spectra.py
def limit_namaster_output(all_Cls, bin_index_lminlmax):
    """
    This function limits the output of namaster to the desired l range.

    Args:
        all_Cls (dict): The dictionary containing the Cls computed by namaster.
        bin_index_lminlmax (ndarray): The indices of the bins corresponding to the desired l range.

    Returns:
        dict: The dictionary containing the Cls computed by namaster, limited to the desired l range.
    """
    all_Cls_limited = {}
    for key, value in all_Cls.items():
        all_Cls_limited[key] = value[..., bin_index_lminlmax]
    return all_Cls_limited

spectra_from_namaster(freq_noise_maps, mask_analysis, workspaceff, nmt_bins, compute_cross_freq=False, purify_e=False, purify_b=False, beam=None, return_all_spectra=False, lmax=None)

Computes the auto and cross-spectra from the frequency noise maps using NaMaster. Parameters


freq_noise_maps : np.ndarray Frequency noise maps, shape (n_freq, 3, n_pix). where 3 refers to the T, Q, U components. mask_analysis : np.ndarray Analysis mask, shape (n_pix,). workspaceff : nmt.NmtWorkspace NaMaster workspace for decoupling the spectra. nmt_bins : nmt.NmtBin NaMaster binning object for the spectra. compute_cross_freq : bool, optional Whether to compute cross-frequency spectra. Default is False. purify_e : bool, optional Whether to purify E-mode polarization. Default is False. purify_b : bool, optional Whether to purify B-mode polarization. Default is False. beam : np.ndarray, optional Beam correction factors, shape (n_freq, n_bins). If None, no beam correction is applied. Default is None. Returns


cl_decoupled_freq : np.ndarray Decoupled power spectra for each frequency, shape (n_freq, n_bins, 3). Where 3 refers to the T, E, B components. And T is set to zero. unbin_cl_decoupled_freq : np.ndarray Unbinned decoupled power spectra for each frequency, shape (n_freq, n_bins, 3). Where 3 refers to the T, E, B components. And T is set to zero.

Notes
  • The function computes the auto-spectra for each frequency noise map.
  • The T component is set to zero in the output spectra.
  • The EB cross-spectra are ignored in the output.
Source code in src/megatop/utils/spectra.py
def spectra_from_namaster(
    freq_noise_maps,
    mask_analysis,
    workspaceff,
    nmt_bins,
    compute_cross_freq=False,
    purify_e=False,
    purify_b=False,
    beam=None,
    return_all_spectra=False,
    lmax=None,
):
    """
    Computes the auto and cross-spectra from the frequency noise maps using NaMaster.
    Parameters
    ----------
    freq_noise_maps : np.ndarray
        Frequency noise maps, shape (n_freq, 3, n_pix).
        where 3 refers to the T, Q, U components.
    mask_analysis : np.ndarray
        Analysis mask, shape (n_pix,).
    workspaceff : nmt.NmtWorkspace
        NaMaster workspace for decoupling the spectra.
    nmt_bins : nmt.NmtBin
        NaMaster binning object for the spectra.
    compute_cross_freq : bool, optional
        Whether to compute cross-frequency spectra. Default is False.
    purify_e : bool, optional
        Whether to purify E-mode polarization. Default is False.
    purify_b : bool, optional
        Whether to purify B-mode polarization. Default is False.
    beam : np.ndarray, optional
        Beam correction factors, shape (n_freq, n_bins). If None, no beam correction is applied.
        Default is None.
    Returns
    -------
    cl_decoupled_freq : np.ndarray
        Decoupled power spectra for each frequency, shape (n_freq, n_bins, 3).
        Where 3 refers to the T, E, B components. And T is set to zero.
    unbin_cl_decoupled_freq : np.ndarray
        Unbinned decoupled power spectra for each frequency, shape (n_freq, n_bins, 3).
        Where 3 refers to the T, E, B components. And T is set to zero.

    Notes
    -----
    - The function computes the auto-spectra for each frequency noise map.
    - The T component is set to zero in the output spectra.
    - The EB cross-spectra are ignored in the output.
    """

    if compute_cross_freq:
        msg = "Cross-frequency spectra computation is not implemented yet"
        raise NotImplementedError(msg)

    if beam is not None and beam.shape[0] != freq_noise_maps.shape[0]:
        msg = f"Beam shape {beam.shape} does not match frequency noise maps shape {freq_noise_maps.shape}"
        raise ValueError(msg)

    # reset_workspace = True if workspaceff is None else False
    reset_workspace = (
        workspaceff is None
    )  # returns bool depending on whether workspaceff is None or not

    cl_decoupled_freq = []
    unbin_cl_decoupled_freq = []
    for f in range(freq_noise_maps.shape[0]):
        beam_f = beam[f] if beam is not None else None

        fields = nmt.NmtField(
            mask_analysis,
            freq_noise_maps[f, 1:],
            beam=beam_f,
            purify_e=purify_e,
            purify_b=purify_b,
            n_iter=10,
            lmax=lmax,
            lmax_mask=lmax,
        )
        if reset_workspace:
            workspaceff = nmt.NmtWorkspace.from_fields(fields, fields, nmt_bins)

        cl_coupled = nmt.compute_coupled_cell(fields, fields)
        cl_decoupled = workspaceff.decouple_cell(cl_coupled)
        unbin_cl_decoupled = nmt_bins.unbin_cell(cl_decoupled)

        if return_all_spectra:
            # Append the full decoupled and unbinned spectra
            cl_decoupled_freq.append(cl_decoupled)
            unbin_cl_decoupled_freq.append(unbin_cl_decoupled)
        else:
            # Keeping only the TT, EE, BB components, setting T to zero
            # Warning: we are ignoring the EB cross-spectra here
            cl_decoupled_freq.append([cl_decoupled[0] * 0, cl_decoupled[0], cl_decoupled[3]])
            unbin_cl_decoupled_freq.append(
                [unbin_cl_decoupled[0] * 0, unbin_cl_decoupled[0], unbin_cl_decoupled[3]]
            )

    return np.array(cl_decoupled_freq), np.array(unbin_cl_decoupled_freq)

megatop.utils.timer

Timer

Timer class to measure elapsed time.

A thread-safe singleton class that provides functionality to measure and log elapsed time. Can be used either as a context manager or with explicit start/stop calls.

Examples:

Using as context manager:

>>> with Timer(thread="my_operation"):
>>>     # code to time
>>>     ...

Using with explicit start/stop:

>>> timer = Timer(thread="my_operation")
>>> timer.start()
>>> # code to time
>>> timer.stop()
Source code in src/megatop/utils/timer.py
class Timer(metaclass=_SingletonReInit):
    """Timer class to measure elapsed time.

    A thread-safe singleton class that provides functionality to measure and log elapsed time.
    Can be used either as a context manager or with explicit start/stop calls.

    Examples:
        Using as context manager:

        >>> with Timer(thread="my_operation"):
        >>>     # code to time
        >>>     ...

        Using with explicit start/stop:

        >>> timer = Timer(thread="my_operation")
        >>> timer.start()
        >>> # code to time
        >>> timer.stop()

    """

    __slots__ = [
        "__dict__",
        "_context_latest_thread",
        "_context_manager_threads",
        "_thread_starts",
    ]

    _lock_init: bool = False

    def __init__(self, thread: str = "default") -> None:
        """Initialize a new Timer instance.

        Args:
            thread: Name to identify this timer instance.
        """
        if not self._lock_init:
            # initialize dict and queue only once
            self._thread_starts: dict[str, int] = {}
            self._context_threads: LifoQueue[str] = LifoQueue()
            self._lock_init = True
        self._context_latest_thread = thread

    def __enter__(self) -> "Timer":
        """Enter the Timer context manager and start timing.

        This method is automatically called when entering a context manager block ('with' statement).
        It adds the current thread to the context threads queue and starts timing.

        Returns:
            The Timer instance for use in the context.
        """
        self._context_threads.put(self._context_latest_thread)
        self.start(self._context_latest_thread)
        return self

    def __exit__(self, *_exc_info) -> None:
        """Exit the Timer context manager and stop timing.

        This method is automatically called when exiting a context manager block ('with' statement).
        It stops the timer for the current thread and logs the elapsed time.
        """
        last_context_manager_thread = self._context_threads.get()
        self.stop(last_context_manager_thread)

    def start(self, thread: str = "default") -> None:
        """Start the Timer with given name.

        Should always be followed by a stop() call later in the code.

        Args:
            thread: Name of the timer to start.

        Raises:
            ValueError: when timer with given name already exists.
        """
        if thread in self._thread_starts:
            msg = f"timer {thread!r} already exists"
            raise ValueError(msg)

        self._thread_starts[thread] = time.perf_counter_ns()

    def stop(self, thread: str = "default") -> None:
        """Stop the Timer with given name and log the time elapsed since the start() call.

        Args:
            thread: Name of the timer to stop.

        Raises:
            ValueError: when timer with given name does not exist.
        """
        if thread not in self._thread_starts:
            msg = f"timer {thread!r} does not exist"
            raise ValueError(msg)

        start_time_ns = self._thread_starts.pop(thread)
        elapsed_ns = time.perf_counter_ns() - start_time_ns
        msg = f"Elapsed time [{thread}]: {self._format_nanoseconds(elapsed_ns)}"
        logger.info(msg)

    @classmethod
    def _format_nanoseconds(cls, ns: int) -> str:
        # when possible, bypass divmod for performance
        us, ns = divmod(ns, 1000) if ns > 1000 else (0, ns)
        ms, us = divmod(us, 1000) if us > 1000 else (0, us)
        ss, ms = divmod(ms, 1000) if ms > 1000 else (0, ms)
        mm, ss = divmod(ss, 60) if ss > 60 else (0, ss)
        hh, mm = divmod(mm, 60) if mm > 60 else (0, mm)
        if hh > 0:
            # 1 h 02 m 03 s
            return f"{hh} h {mm:02} m {ss:02} s"
        if mm > 0:
            # 2 m 03 s
            return f"{mm} m {ss:02} s"
        if ss > 0:
            # 3.045 s
            return f"{ss}.{ms:03} s"
        if ms > 0:
            # 45.006 ms
            return f"{ms}.{us:03} ms"
        if us > 0:
            # 6.078 us
            return f"{us}.{ns:03} us"
        # sub-microsecond, just print nanoseconds
        return f"{ns} ns"

__enter__()

Enter the Timer context manager and start timing.

This method is automatically called when entering a context manager block ('with' statement). It adds the current thread to the context threads queue and starts timing.

Returns:

  • Timer

    The Timer instance for use in the context.

Source code in src/megatop/utils/timer.py
def __enter__(self) -> "Timer":
    """Enter the Timer context manager and start timing.

    This method is automatically called when entering a context manager block ('with' statement).
    It adds the current thread to the context threads queue and starts timing.

    Returns:
        The Timer instance for use in the context.
    """
    self._context_threads.put(self._context_latest_thread)
    self.start(self._context_latest_thread)
    return self

__exit__(*_exc_info)

Exit the Timer context manager and stop timing.

This method is automatically called when exiting a context manager block ('with' statement). It stops the timer for the current thread and logs the elapsed time.

Source code in src/megatop/utils/timer.py
def __exit__(self, *_exc_info) -> None:
    """Exit the Timer context manager and stop timing.

    This method is automatically called when exiting a context manager block ('with' statement).
    It stops the timer for the current thread and logs the elapsed time.
    """
    last_context_manager_thread = self._context_threads.get()
    self.stop(last_context_manager_thread)

__init__(thread='default')

Initialize a new Timer instance.

Parameters:

  • thread (str, default: 'default' ) –

    Name to identify this timer instance.

Source code in src/megatop/utils/timer.py
def __init__(self, thread: str = "default") -> None:
    """Initialize a new Timer instance.

    Args:
        thread: Name to identify this timer instance.
    """
    if not self._lock_init:
        # initialize dict and queue only once
        self._thread_starts: dict[str, int] = {}
        self._context_threads: LifoQueue[str] = LifoQueue()
        self._lock_init = True
    self._context_latest_thread = thread

start(thread='default')

Start the Timer with given name.

Should always be followed by a stop() call later in the code.

Parameters:

  • thread (str, default: 'default' ) –

    Name of the timer to start.

Raises:

  • ValueError

    when timer with given name already exists.

Source code in src/megatop/utils/timer.py
def start(self, thread: str = "default") -> None:
    """Start the Timer with given name.

    Should always be followed by a stop() call later in the code.

    Args:
        thread: Name of the timer to start.

    Raises:
        ValueError: when timer with given name already exists.
    """
    if thread in self._thread_starts:
        msg = f"timer {thread!r} already exists"
        raise ValueError(msg)

    self._thread_starts[thread] = time.perf_counter_ns()

stop(thread='default')

Stop the Timer with given name and log the time elapsed since the start() call.

Parameters:

  • thread (str, default: 'default' ) –

    Name of the timer to stop.

Raises:

  • ValueError

    when timer with given name does not exist.

Source code in src/megatop/utils/timer.py
def stop(self, thread: str = "default") -> None:
    """Stop the Timer with given name and log the time elapsed since the start() call.

    Args:
        thread: Name of the timer to stop.

    Raises:
        ValueError: when timer with given name does not exist.
    """
    if thread not in self._thread_starts:
        msg = f"timer {thread!r} does not exist"
        raise ValueError(msg)

    start_time_ns = self._thread_starts.pop(thread)
    elapsed_ns = time.perf_counter_ns() - start_time_ns
    msg = f"Elapsed time [{thread}]: {self._format_nanoseconds(elapsed_ns)}"
    logger.info(msg)

function_timer(thread=None)

Function decorator to time a function.

Wraps a function with a Timer context manager to measure and log its execution time.

Parameters:

  • thread (str | None, default: None ) –

    Optional thread name to distinguish timer. If None, uses function name with arguments as thread name.

Returns:

  • Callable

    A wrapped function that logs timing information when called.

Source code in src/megatop/utils/timer.py
def function_timer(thread: str | None = None):
    """Function decorator to time a function.

    Wraps a function with a Timer context manager to measure and log its execution time.

    Args:
        thread (str | None): Optional thread name to distinguish timer.
            If None, uses function name with arguments as thread name.

    Returns:
        Callable: A wrapped function that logs timing information when called.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            thread_name = thread or _get_function_with_arguments_as_thread_name(func, args, kwargs)
            with Timer(thread=thread_name):
                return func(*args, **kwargs)

        return wrapper

    return decorator

megatop.utils.utils

MemoryUsage(message='')

' Prints the memory usage of the current process.

Parameters

args : argparse.Namespace The arguments from the command line. In particular looks for the verbose flag. If false, the function does nothing. message : str, optional The message to print. The default is ''.

Returns

None

Source code in src/megatop/utils/utils.py
def MemoryUsage(message: str = "") -> None:
    """'
    Prints the memory usage of the current process.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments from the command line.
        In particular looks for the verbose flag. If false, the function does nothing.
    message : str, optional
        The message to print. The default is ''.

    Returns
    -------
    None

    """
    current, peak = tracemalloc.get_traced_memory()
    message_all = (
        message + f"Current memory usage is {current / 10**6}MB; Peak was {peak / 10**6}MB"
    )
    logger.info(message_all)