Skip to content

Mocks

megatop.pipeline.mocker.func_TF_sims(id_sim, manager, config, binary_mask, *, obsmat_funcs=None)

Generate pure E and pure B map with power law spectra for Transfer Function Computation.

Source code in src/megatop/pipeline/mocker.py
def func_TF_sims(
    id_sim: int,
    manager: DataManager,
    config: Config,
    binary_mask: NDArray,
    *,
    obsmat_funcs: dict | None = None,
) -> int:
    """Generate pure E and pure B map with power law spectra for Transfer Function Computation."""

    # Getting power law spectra
    logger.debug("Generating power law spectra for TF simulations")
    ell = np.arange(3 * config.nside + 500)
    pl_spectra = power_law_cl(
        ell,
        config.map_sim_pars.TF_power_law_amp,
        config.map_sim_pars.TF_power_law_delta_ell,
        config.map_sim_pars.TF_power_law_index,
    )
    pl_spectra_new_orderhp = np.array(
        [
            pl_spectra["TT"],
            pl_spectra["EE"],
            pl_spectra["BB"],
            pl_spectra["TE"],
            pl_spectra["EB"],
            pl_spectra["TB"],
        ]
    )

    # Generating alms from power law spectra
    alms_TEB = get_alms_from_cls(
        pl_spectra_new_orderhp, 3 * config.nside, seed=config.map_sim_pars.cmb_seed
    )

    logger.info(
        f"Sum of difference between TT and EE alms / len(alms): {np.sum(alms_TEB[0] - alms_TEB[1]) / len(alms_TEB[0])}"
    )
    logger.info(f"alms T = {alms_TEB[0]}")
    logger.info(f"alms E = {alms_TEB[1]}")
    logger.info(f"alms B = {alms_TEB[2]}")

    # Generating pure T, E and B maps from alms:
    map_pure_T, map_pure_E, map_pure_B = hu.alm2map(
        np.eye(3)[:, :, None] * alms_TEB, spin=[0, 2], nside=config.nside
    )

    unfiltered_freq_map_pure_T = np.array([map_pure_T] * len(config.frequencies))
    unfiltered_freq_map_pure_E = np.array([map_pure_E] * len(config.frequencies))
    unfiltered_freq_map_pure_B = np.array([map_pure_B] * len(config.frequencies))

    # apply filtering
    if obsmat_funcs is not None:
        with Timer("filter-pure-T-E-B-maps"):
            filtered_freq_map_pure_T = unfiltered_freq_map_pure_T.copy()
            filtered_freq_map_pure_E = unfiltered_freq_map_pure_E.copy()
            filtered_freq_map_pure_B = unfiltered_freq_map_pure_B.copy()

            for i_f, (key, func) in enumerate(obsmat_funcs.items()):
                logger.debug(f"Filtering {key} channel")
                filtered_freq_map_pure_T[i_f] = mock.apply_observation_matrix(
                    func, filtered_freq_map_pure_T[i_f]
                )
                filtered_freq_map_pure_E[i_f] = mock.apply_observation_matrix(
                    func, filtered_freq_map_pure_E[i_f]
                )
                filtered_freq_map_pure_B[i_f] = mock.apply_observation_matrix(
                    func, filtered_freq_map_pure_B[i_f]
                )
    else:
        msg_no_obsmat = (
            "No observation matrices provided for filtering. Please provide obsmat_funcs."
        )
        raise ValueError(msg_no_obsmat)

    # mask unobserved pixels
    _ = mask.apply_binary_mask(unfiltered_freq_map_pure_T, binary_mask, unseen=False)
    _ = mask.apply_binary_mask(unfiltered_freq_map_pure_E, binary_mask, unseen=False)
    _ = mask.apply_binary_mask(unfiltered_freq_map_pure_B, binary_mask, unseen=False)
    _ = mask.apply_binary_mask(filtered_freq_map_pure_T, binary_mask, unseen=False)
    _ = mask.apply_binary_mask(filtered_freq_map_pure_E, binary_mask, unseen=False)
    _ = mask.apply_binary_mask(filtered_freq_map_pure_B, binary_mask, unseen=False)

    # save results
    save_TFsims(
        manager,
        np.array(
            [unfiltered_freq_map_pure_T, unfiltered_freq_map_pure_E, unfiltered_freq_map_pure_B]
        ),
        np.array([filtered_freq_map_pure_T, filtered_freq_map_pure_E, filtered_freq_map_pure_B]),
        id_sim=id_sim,
    )

    return id_sim

megatop.pipeline.mocker.func_noise(manager, config, binary_mask, common_nhits_map, id_sim)

Generate a noise realization.

Source code in src/megatop/pipeline/mocker.py
def func_noise(
    manager: DataManager,
    config: Config,
    binary_mask: NDArray,
    common_nhits_map: NDArray,
    id_sim: int,
) -> int:
    """Generate a noise realization."""
    noise = get_noise(config, binary_mask, common_nhits_map, id_sim=id_sim)
    _ = mask.apply_binary_mask(noise, binary_mask, unseen=False)
    save_simu(manager, noise, id_sim=id_sim, is_noise=True)
    return id_sim

megatop.pipeline.mocker.func_signal(id_sim, manager, config, binary_mask, common_nhits_map, *, obsmat_funcs=None)

Generate a sky realization.

Source code in src/megatop/pipeline/mocker.py
def func_signal(
    id_sim: int,
    manager: DataManager,
    config: Config,
    binary_mask: NDArray,
    common_nhits_map: NDArray,
    *,
    obsmat_funcs: dict | None = None,
) -> int:
    """Generate a sky realization."""
    # construct passbands if necessary
    config.map_sets = passband.passband_constructor(
        config, manager, passband_int=config.map_sim_pars.passband_int
    )
    if config.map_sim_pars.passband_int:
        logger.info("Using passband-integration for the mocker step.")

    # generate the components
    cmb = get_cmb(manager, config, id_sim=id_sim)
    fg = get_foregrounds(config)
    noise = get_noise(
        config,
        binary_mask,
        common_nhits_map,
        id_sim=id_sim,
        extra_seed=config.map_sim_pars.cmb_seed,
    )

    # broadcast CMB to all frequencies
    sky = cmb[None, ...] + fg

    # apply beam and pixel window function correction
    with Timer("beam-freq-maps"):
        for i_f, _f in enumerate(config.frequencies):
            sky[i_f] = mock.beam_winpix_correction(
                config.nside, sky[i_f], config.beams[i_f], config.lmax
            )

    # apply filtering
    if obsmat_funcs is not None:
        with Timer("filter-freq-maps"):
            for i_f, (key, func) in enumerate(obsmat_funcs.items()):
                logger.debug(f"Filtering {key} channel")
                sky[i_f] = mock.apply_observation_matrix(func, sky[i_f])

    # add noise
    sky += noise

    # mask unobserved pixels
    _ = mask.apply_binary_mask(sky, binary_mask, unseen=False)

    # save results
    save_simu(manager, sky, id_sim=id_sim, is_noise=False)

    return id_sim

megatop.pipeline.mocker.main_noise()

Entry point for generating a single noise realization.

Source code in src/megatop/pipeline/mocker.py
def main_noise():
    """Entry point for generating a single noise realization."""
    parser = argparse.ArgumentParser(description="Generate a single noise realization")
    parser.add_argument("--config", type=Path, required=True, help="config file")
    parser.add_argument("--sim", type=int, required=True, help="simulation index to generate")
    parser.add_argument("--map-set", type=str, default=None, help="map set name to generate")

    args = parser.parse_args()
    config = Config.load_yaml(args.config)
    if args.map_set is not None:
        config.map_sets = [ms for ms in config.map_sets if ms.name == args.map_set]
    manager = DataManager(config)
    manager.create_output_dirs(config.map_sim_pars.n_sim, config.noise_sim_pars.n_sim)

    binary_mask = hp.read_map(manager.path_to_binary_mask)
    common_nhits_map = hp.read_map(manager.path_to_common_nhits_map)
    func_noise(manager, config, binary_mask, common_nhits_map, args.sim)

megatop.pipeline.mocker.main_signal()

Entry point for generating a single sky realization.

Source code in src/megatop/pipeline/mocker.py
def main_signal():
    """Entry point for generating a single sky realization."""
    parser = argparse.ArgumentParser(description="Generate a single sky realization")
    parser.add_argument("--config", type=Path, required=True, help="config file")
    parser.add_argument("--sim", type=int, required=True, help="simulation index to generate")
    parser.add_argument("--map-set", type=str, default=None, help="map set name to generate")

    args = parser.parse_args()
    config = Config.load_yaml(args.config)
    if args.map_set is not None:
        config.map_sets = [ms for ms in config.map_sets if ms.name == args.map_set]
    manager = DataManager(config)
    manager.create_output_dirs(config.map_sim_pars.n_sim, config.noise_sim_pars.n_sim)

    binary_mask = hp.read_map(manager.path_to_binary_mask)
    common_nhits_map = hp.read_map(manager.path_to_common_nhits_map)
    func_signal(args.sim, manager, config, binary_mask, common_nhits_map)

megatop.pipeline.mocker.process_TF_sims(config, manager, comm)

Generate pure T, E and pure B map with power law spectra for Transfer Function Computation.

Source code in src/megatop/pipeline/mocker.py
def process_TF_sims(config: Config, manager: DataManager, comm: Comm):
    """Generate pure T, E and pure B map with power law spectra for Transfer Function Computation."""
    rank = 0 if comm is None else comm.Get_rank()
    n_sim = config.map_sim_pars.TF_n_sim

    if n_sim == 0:
        return

    if rank == 0:
        logger.info(f"Generating {n_sim} TF simulations")

    # Load necessary data
    binary_mask = hp.read_map(manager.path_to_binary_mask)

    func = partial(
        func_TF_sims,
        manager=manager,
        config=config,
        binary_mask=binary_mask,
    )

    if filtering := config.map_sim_pars.filter_sims:
        # Load the obsmat(s) for our map set(s)
        obsmat_funcs = load_obsmat(manager, config)
        func = partial(func, obsmat_funcs=obsmat_funcs)

    for result in _map(func, range(n_sim), comm, force_seq=filtering):
        logger.info(f"Finished TF simulation {result + 1} / {n_sim}")

megatop.pipeline.mocker.save_TFsims(manager, unfiltered_maps_TEB, filtered_maps_TEB, id_sim=None)

Save an unfiltered and filtered TF realization.

Source code in src/megatop/pipeline/mocker.py
@function_timer("save-TFsims")
def save_TFsims(
    manager: DataManager,
    unfiltered_maps_TEB: NDArray,
    filtered_maps_TEB: NDArray,
    id_sim: int | None = None,
) -> None:
    """Save an unfiltered and filtered TF realization."""
    # get appropriate filenames based on type
    filenames_unfiltered, filenames_filtered = manager.get_maps_sim_for_TF_filenames(id_sim)

    # save the maps
    for f in range(len(filenames_unfiltered)):  # loop over frequencies
        for j, s in enumerate(["T", "E", "B"]):  # loop over pure T, E, B
            msg = f"Saving unfiltered TF pure_{s} simulation"
            logger.info(f"{msg} to {filenames_unfiltered[f][j]}")
            hp.write_map(
                filenames_unfiltered[f][j],
                unfiltered_maps_TEB[j, f],
                dtype=["float64", "float64", "float64"],
                overwrite=True,
            )
            msg = f"Saving filtered TF pure_{s} simulation"
            logger.info(f"{msg} to {filenames_filtered[f][j]}")
            hp.write_map(
                filenames_filtered[f][j],
                filtered_maps_TEB[j, f],
                dtype=["float64", "float64", "float64"],
                overwrite=True,
            )

megatop.pipeline.mocker.save_simu(manager, simulated_maps, id_sim=None, is_noise=False)

Save a sky realization.

Source code in src/megatop/pipeline/mocker.py
@function_timer("save-simu")
def save_simu(
    manager: DataManager,
    simulated_maps: NDArray,
    id_sim: int | None = None,
    is_noise: bool = False,
) -> None:
    """Save a sky realization."""
    # get appropriate filenames based on type
    filenames = (
        manager.get_noise_maps_filenames(id_sim) if is_noise else manager.get_maps_filenames(id_sim)
    )

    # save the maps
    for i, fname in enumerate(filenames):
        msg = "Saving noise simulation" if is_noise else "Saving simulated sky"
        logger.debug(f"{msg} to {fname}")
        hp.write_map(
            fname,
            simulated_maps[i],
            dtype=["float64", "float64", "float64"],
            overwrite=True,
        )