hipscatalog_gen.pipeline package
Submodules
hipscatalog_gen.pipeline.common module
Shared pipeline steps for input handling, densmaps, and outputs.
- build_and_prepare_input(cfg, diag_ctx, log_fn, persist_ddfs)[source]
Load inputs, validate RA/DEC, repartition, and persist when needed.
- Parameters:
cfg (Any) – Parsed configuration object.
diag_ctx – Diagnostics context factory (label -> context manager).
log_fn – Logging callback.
persist_ddfs (bool) – Whether to persist the input collection in memory.
- Returns:
Tuple containing (ddf, RA_NAME, DEC_NAME, keep_cols, is_hats, paths), where:
ddf: Dask-like collection ready for downstream stages.
RA_NAME / DEC_NAME: Resolved column names for coordinates.
keep_cols: Ordered list of columns to keep.
is_hats: True when the input is an LSDB/HATS catalog.
paths: List of resolved input paths.
- Return type:
Tuple
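Several functions in this module take a diag_ctx factory (label -> context manager) and a log_fn callback. The source does not show how these are built, so the following is only a minimal sketch of objects with those shapes; the timing behaviour and the "[diag]" message format are assumptions made for illustration.

```python
import time
from contextlib import contextmanager
from typing import Iterator, List

log_lines: List[str] = []


def log_fn(message: str) -> None:
    """Hypothetical logging callback: record the line and echo it."""
    log_lines.append(message)
    print(message)


@contextmanager
def diag_ctx(label: str) -> Iterator[None]:
    """Hypothetical diagnostics factory: time the wrapped block."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        log_fn(f"[diag] {label}: {elapsed:.3f}s")


# Any pipeline step can then be wrapped under a label.
with diag_ctx("load_input"):
    total = sum(range(1000))  # stand-in for real work
```

Passing the factory instead of a concrete profiler lets each step pick its own label while the caller decides what is actually measured.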
- compute_and_write_densmaps(ddf_sel, ra_col, dec_col, level_limit, out_dir, diag_ctx, log_fn=None)[source]
Compute density maps for all depths and write them to disk.
- Parameters:
ddf_sel (Any) – Dask-like collection with RA/DEC columns.
ra_col (str) – Name of the RA column (degrees).
dec_col (str) – Name of the DEC column (degrees).
level_limit (int) – Maximum HiPS order to compute.
out_dir (Path) – Output directory where FITS files are written.
diag_ctx – Diagnostics context factory (label -> context manager).
log_fn – Optional logging callback for progress updates.
- Returns:
Mapping of depth -> numpy array with counts per HEALPix pixel.
- Return type:
Dict[int, ndarray]
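To make the shape of the returned mapping concrete, the sketch below builds a depth -> counts mapping from a single deepest map. It does not reproduce the documented function, which works from RA/DEC columns in a Dask-like collection. It assumes NESTED HEALPix ordering (where the four children of pixel p are 4p..4p+3), assumes depths run from 0 to level_limit, and uses plain lists where the real function returns numpy arrays; all helper names are hypothetical.

```python
from typing import Dict, List


def npix(depth: int) -> int:
    """Number of HEALPix pixels at a given order: 12 * 4**depth."""
    return 12 * 4 ** depth


def coarsen(counts: List[int]) -> List[int]:
    """Sum each group of four NESTED children into their parent pixel."""
    return [sum(counts[i:i + 4]) for i in range(0, len(counts), 4)]


def densmaps_from_deepest(deepest: List[int],
                          level_limit: int) -> Dict[int, List[int]]:
    """Build depth -> counts for depths 0..level_limit from the deepest map."""
    if len(deepest) != npix(level_limit):
        raise ValueError("deepest map has the wrong number of pixels")
    maps = {level_limit: list(deepest)}
    for depth in range(level_limit - 1, -1, -1):
        maps[depth] = coarsen(maps[depth + 1])
    return maps


# Toy input: one object in every order-2 pixel (192 pixels).
densmaps = densmaps_from_deepest([1] * npix(2), level_limit=2)
```

Every depth holds the same total number of objects; only the pixel resolution changes.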
- compute_input_total(ddf, diag_ctx, log_fn, avoid_computes)[source]
Compute total number of input rows (post RA/DEC validation).
- Parameters:
ddf (Any) – Dask-like collection with validated RA/DEC.
diag_ctx – Diagnostics context factory (label -> context manager).
log_fn – Logging callback.
avoid_computes (bool) – Whether to avoid explicit compute() when possible.
- Returns:
Total number of rows as an integer.
- Return type:
int
- write_counts_summaries(out_dir, level_limit, input_total, log_fn, precomputed_depth_totals)[source]
Build output counts from selection-stage precomputed depth totals.
- Parameters:
out_dir (Path)
level_limit (int)
input_total (int)
precomputed_depth_totals (Dict[str, int])
- Return type:
tuple[int, dict]
- write_common_static_products(out_dir, cfg, densmaps, keep_cols, ra_col, dec_col, paths, ddf)[source]
Write MOC, metadata.xml, and arguments echo.
- Parameters:
out_dir (Path) – Destination HiPS root directory.
cfg (Any) – Parsed configuration object.
densmaps (Dict[int, ndarray]) – Mapping depth -> densmap counts.
keep_cols (List[str]) – Ordered list of columns retained in outputs.
ra_col (str) – Name of the RA column.
dec_col (str) – Name of the DEC column.
paths (List[str]) – Resolved input paths.
ddf (Any) – Dask-like collection used to infer column dtypes.
- Return type:
None
- log_epilogue(out_dir, log_lines, t0, log_fn, write_process_log=True)[source]
Emit closing log lines and optionally persist process.log.
- Parameters:
out_dir (Path)
log_lines (List[str])
t0 (float)
write_process_log (bool)
- Return type:
None
- log_prologue(cfg, out_dir, log_fn)[source]
Emit the initial pipeline log lines.
- Parameters:
cfg (Any)
out_dir (Path)
- Return type:
None
- write_tiles_with_allsky(out_dir, depth, header_line, ra_col, dec_col, counts, selected, order_desc, allsky_needed, log_fn)[source]
Finalize tiles and write optional Allsky.tsv.
- Parameters:
out_dir (Path)
depth (int)
header_line (str)
ra_col (str)
dec_col (str)
counts (ndarray)
selected (DataFrame)
order_desc (bool)
allsky_needed (bool)
- Return type:
tuple[dict[int, int] | None, DataFrame | None]
- maybe_persist_ddf(ddf_like, should_persist, diag_ctx, log_fn, *, log_prefix, diag_label=None, reason=None)[source]
Persist a Dask collection when requested, logging and awaiting completion.
- Parameters:
ddf_like (Any)
should_persist (bool)
log_prefix (str)
diag_label (str | None)
reason (str | None)
hipscatalog_gen.pipeline.logging_utils module
Structured logging utilities for the pipeline.
- class LogContext(stage=None, depth=None)[source]
Bases: object
Mutable context for structured logging fields.
- Parameters:
stage (str | None)
depth (int | None)
- stage: str | None = None
- depth: int | None = None
- setup_structured_logger(out_dir, selection_mode, *, json_logs=False)[source]
Configure a structured logger that writes to stdout and process.log, and optionally emits JSON lines.
- Parameters:
out_dir (Path)
selection_mode (str)
json_logs (bool)
- Return type:
tuple[LogContext, Callable[[str, bool], None]]
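The return type pairs a mutable LogContext with a logging callback. One way such a pair can work is sketched below. This is not the library's implementation: make_logger, the emit parameter, the output format, and the meaning given to the boolean argument (called important here) are all assumptions, since the source documents only the Callable[[str, bool], None] shape.

```python
import json
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


@dataclass
class LogContext:
    """Mutable fields attached to every log line (mirrors the documented class)."""
    stage: Optional[str] = None
    depth: Optional[int] = None


def make_logger(selection_mode: str, *, json_logs: bool = False,
                emit: Callable[[str], None] = print
                ) -> Tuple[LogContext, Callable[[str, bool], None]]:
    """Hypothetical stand-in: return a context and a logging callback."""
    ctx = LogContext()

    def log_fn(message: str, important: bool = False) -> None:
        # `important` is a hypothetical meaning for the boolean argument.
        if json_logs:
            emit(json.dumps({"mode": selection_mode, "stage": ctx.stage,
                             "depth": ctx.depth, "important": important,
                             "msg": message}))
        else:
            emit(f"[{selection_mode}] [{ctx.stage or '-'}] {message}")

    return ctx, log_fn


records: List[str] = []
ctx, log_fn = make_logger("mag_global", json_logs=True, emit=records.append)
ctx.stage, ctx.depth = "densmaps", 3  # later lines pick up the new fields
log_fn("depth finished", False)
```

Because the callback closes over the context object, updating ctx.stage or ctx.depth changes the fields on all subsequent lines without passing them on every call.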
hipscatalog_gen.pipeline.main module
Central orchestration for the HiPS catalog pipeline.
This module wires configuration, cluster setup, input reading, densmap computation, and selection logic implemented in the submodules.
- Typical usage (library):
from hipscatalog_gen import load_config, run_pipeline
cfg = load_config("config.yaml")
run_pipeline(cfg)
- Command-line interface:
python -m hipscatalog_gen.cli --config config.yaml
- run_pipeline(cfg, *, json_logs=False)[source]
Run the full HiPS catalog generation pipeline.
- Parameters:
cfg (Config) – Parsed configuration object with input, algorithm, cluster, and output options.
json_logs (bool) – Whether to also emit structured JSON lines to process.jsonl.
- Raises:
ValueError – If output.out_dir already exists without output.overwrite set.
ValueError – If level_limit is outside the supported range [4, 11].
ValueError – If the configured selection_mode is unsupported.
- Return type:
None
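The three documented ValueError conditions can be checked before starting a long run. The sketch below is a hypothetical pre-flight helper, not part of the package: preflight, its flat argument list, and the error messages are assumptions. The mode names are taken from the validation helpers in this package, but the full set of supported modes is not stated in the source.

```python
import tempfile
from pathlib import Path
from typing import Iterable

# Mode names taken from the validation helpers documented in this package;
# the complete list of supported modes is an assumption.
KNOWN_MODES = ("mag_global", "score_global")


def preflight(out_dir: Path, overwrite: bool, level_limit: int,
              selection_mode: str,
              known_modes: Iterable[str] = KNOWN_MODES) -> None:
    """Hypothetical pre-flight check raising the documented ValueErrors."""
    if out_dir.exists() and not overwrite:
        raise ValueError(f"{out_dir} already exists and overwrite is not set")
    if not 4 <= level_limit <= 11:
        raise ValueError(f"level_limit={level_limit} is outside [4, 11]")
    if selection_mode not in set(known_modes):
        raise ValueError(f"unsupported selection_mode: {selection_mode!r}")


errors = []
with tempfile.TemporaryDirectory() as tmp:
    existing = Path(tmp)
    preflight(existing / "hips_out", False, 10, "mag_global")  # passes
    for args in [(existing, False, 10, "mag_global"),
                 (existing, True, 3, "mag_global"),
                 (existing, True, 10, "unknown_mode")]:
        try:
            preflight(*args)
        except ValueError as exc:
            errors.append(str(exc))
```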
hipscatalog_gen.pipeline.modes module
Registry of selection modes and helpers to resolve them by name.
- class SelectionMode(name, validate_fn, normalize_fn, prepare_fn, run_fn, description)[source]
Bases: object
Registry entry for a selection mode.
- Parameters:
name (str)
validate_fn (Callable[[Any], None])
normalize_fn (Callable[[...], Tuple[Any, Any]])
prepare_fn (Callable[[...], Any])
run_fn (Callable[[...], Any])
description (str)
- name: str
- validate_fn: Callable[[Any], None]
- normalize_fn: Callable[[...], Tuple[Any, Any]]
- prepare_fn: Callable[[...], Any]
- run_fn: Callable[[...], Any]
- description: str
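A registry keyed by mode name can be sketched as follows. The dataclass fields match the documented SelectionMode attributes, but the register and resolve helpers, the frozen dataclass choice, and the toy "demo" mode are hypothetical; the source does not show how the package's registry is populated or queried.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass(frozen=True)
class SelectionMode:
    """Registry entry with the documented fields."""
    name: str
    validate_fn: Callable[[Any], None]
    normalize_fn: Callable[..., Tuple[Any, Any]]
    prepare_fn: Callable[..., Any]
    run_fn: Callable[..., Any]
    description: str


_REGISTRY: Dict[str, SelectionMode] = {}


def register(mode: SelectionMode) -> None:
    """Hypothetical helper: add a mode, refusing duplicate names."""
    if mode.name in _REGISTRY:
        raise ValueError(f"mode already registered: {mode.name}")
    _REGISTRY[mode.name] = mode


def resolve(name: str) -> SelectionMode:
    """Hypothetical helper: look up a mode by name."""
    try:
        return _REGISTRY[name]
    except KeyError:
        known = ", ".join(sorted(_REGISTRY)) or "<none>"
        raise ValueError(f"unknown mode {name!r}; known: {known}") from None


# Toy mode whose callbacks do almost nothing.
register(SelectionMode(
    name="demo",
    validate_fn=lambda cfg: None,
    normalize_fn=lambda cfg: (cfg, {"threshold": 1}),
    prepare_fn=lambda ctx: ctx,
    run_fn=lambda ctx: "ran demo",
    description="Toy mode used only in this sketch.",
))

mode = resolve("demo")
result = mode.run_fn(None)
```

Keeping the four callbacks on one entry lets the orchestrator run validate, normalize, prepare, and run in a fixed order without branching on the mode name.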
hipscatalog_gen.pipeline.params module
Normalized parameter containers used by selection modes.
- class MagGlobalParams(mag_min, mag_max, sentinel=None)[source]
Bases: object
Normalized parameters for mag_global selection.
- Parameters:
mag_min (float)
mag_max (float)
sentinel (float | None)
- mag_min: float
- mag_max: float
- sentinel: float | None = None
hipscatalog_gen.pipeline.structure module
Pipeline data structures and execution harness.
- class PipelineStage(name, fn, diag_label=None)[source]
Bases: object
Represents a single ordered pipeline step.
- Parameters:
name (str)
fn (Callable[[PipelineContext], PipelineContext | None])
diag_label (str | None)
- name: str
- fn: Callable[[PipelineContext], PipelineContext | None]
- diag_label: str | None
- class PipelineContext(cfg, out_dir, report_dir, log_fn, diag_ctx, persist_ddfs, avoid_computes, selection_mode, log_ctx=None, ddf=None, RA_NAME=None, DEC_NAME=None, keep_cols=None, is_hats=False, paths=None, input_total=None, remainder_ddf=None, densmaps=<factory>, total_written=None, selection_params=None, telemetry=<factory>)[source]
Bases: object
Shared state passed between pipeline stages.
- Parameters:
cfg (Config)
out_dir (Path)
report_dir (Path)
log_fn (Callable[[...], None])
diag_ctx (Callable[[str], ContextManager[Any]])
persist_ddfs (bool)
avoid_computes (bool)
selection_mode (str)
log_ctx (Any | None)
ddf (Any | None)
RA_NAME (str | None)
DEC_NAME (str | None)
keep_cols (List[str] | None)
is_hats (bool)
paths (List[str] | None)
input_total (int | None)
remainder_ddf (Any | None)
densmaps (Dict[int, ndarray])
total_written (int | None)
selection_params (Any | None)
telemetry (Dict[str, Any])
- out_dir: Path
- report_dir: Path
- log_fn: Callable[[...], None]
- diag_ctx: Callable[[str], ContextManager[Any]]
- persist_ddfs: bool
- avoid_computes: bool
- selection_mode: str
- log_ctx: Any | None
- ddf: Any | None
- RA_NAME: str | None
- DEC_NAME: str | None
- keep_cols: List[str] | None
- is_hats: bool
- paths: List[str] | None
- input_total: int | None
- remainder_ddf: Any | None
- densmaps: Dict[int, ndarray]
- total_written: int | None
- selection_params: Any | None
- telemetry: Dict[str, Any]
- run_stages(stages, ctx)[source]
Execute ordered pipeline stages, threading a context object.
- Parameters:
stages (Sequence[PipelineStage]) – Ordered sequence of PipelineStage objects.
ctx (PipelineContext) – Initial pipeline context.
- Returns:
Final context returned after the last stage.
- Return type:
PipelineContext
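The stage signature Callable[[PipelineContext], PipelineContext | None] suggests that a stage may either return a new context or mutate the current one and return None. The sketch below illustrates that threading pattern with small stand-in classes (Ctx, Stage, run_stages_sketch, all hypothetical). Treating None as "keep the current context" is an assumption; the actual harness may also wrap each stage in diagnostics or logging.

```python
from dataclasses import dataclass, field, replace
from typing import Callable, List, Optional, Sequence


@dataclass
class Ctx:
    """Stand-in for PipelineContext with only two fields."""
    input_total: Optional[int] = None
    trace: List[str] = field(default_factory=list)


@dataclass
class Stage:
    """Stand-in for PipelineStage."""
    name: str
    fn: Callable[[Ctx], Optional[Ctx]]
    diag_label: Optional[str] = None


def run_stages_sketch(stages: Sequence[Stage], ctx: Ctx) -> Ctx:
    """Run stages in order; a None result keeps the current context."""
    for stage in stages:
        ctx.trace.append(stage.name)
        result = stage.fn(ctx)
        if result is not None:
            ctx = result
    return ctx


def count_rows(ctx: Ctx) -> None:
    ctx.input_total = 42  # mutate in place, return nothing


def double_total(ctx: Ctx) -> Ctx:
    # Return a modified copy instead of mutating.
    return replace(ctx, input_total=ctx.input_total * 2)


final = run_stages_sketch(
    [Stage("count", count_rows), Stage("double", double_total)], Ctx()
)
```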
hipscatalog_gen.pipeline.validation module
Validation helpers for configuration blocks across selection modes.
- validate_mag_global_cfg(cfg)[source]
Surface mag_global config issues early.
- Parameters:
cfg (Any)
- Return type:
None
- validate_score_global_cfg(cfg)[source]
Surface score_global config issues early.
- Parameters:
cfg (Any)
- Return type:
None
Module contents
Pipeline coordination, validation, and logging utilities.
Imports are kept lazy here to avoid circular imports during documentation builds.