hipscatalog_gen.pipeline package

Submodules

hipscatalog_gen.pipeline.common module

Shared pipeline steps for input handling, densmaps, and outputs.

build_and_prepare_input(cfg, diag_ctx, log_fn, persist_ddfs)[source]

Load inputs, validate RA/DEC, repartition, and persist when needed.

Parameters:
  • cfg (Any) – Parsed configuration object.

  • diag_ctx – Diagnostics context factory (label -> context manager).

  • log_fn – Logging callback.

  • persist_ddfs (bool) – Whether to persist the input collection in memory.

Returns:

  • ddf: Dask-like collection ready for downstream stages.

  • RA_NAME / DEC_NAME: Resolved column names for coordinates.

  • keep_cols: Ordered list of columns to keep.

  • is_hats: True when the input is an LSDB/HATS catalog.

  • paths: List of resolved input paths.

Return type:

Tuple containing (ddf, RA_NAME, DEC_NAME, keep_cols, is_hats, paths).

compute_and_write_densmaps(ddf_sel, ra_col, dec_col, level_limit, out_dir, diag_ctx, log_fn=None)[source]

Compute density maps for all depths and write them to disk.

Parameters:
  • ddf_sel (Any) – Dask-like collection with RA/DEC columns.

  • ra_col (str) – Name of the RA column (degrees).

  • dec_col (str) – Name of the DEC column (degrees).

  • level_limit (int) – Maximum HiPS order to compute.

  • out_dir (Path) – Output directory where FITS files are written.

  • diag_ctx – Diagnostics context factory (label -> context manager).

  • log_fn – Optional logging callback for progress updates.

Returns:

Mapping of depth -> numpy array with counts per HEALPix pixel.

Return type:

Dict[int, ndarray]
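
Conceptually, each densmap is a per-pixel counts histogram at a given HEALPix order, and the returned mapping has one array per depth. The sketch below illustrates only that depth -> counts shape; it substitutes a toy equal-width RA binning for a real HEALPix `ang2pix` and plain lists for numpy arrays, so it is an assumption-laden illustration, not the pipeline's implementation.

```python
from typing import Dict, Iterable, List, Tuple

def toy_densmaps(radec: Iterable[Tuple[float, float]],
                 level_limit: int) -> Dict[int, List[int]]:
    """Toy stand-in for compute_and_write_densmaps: one counts array per depth.

    A real implementation would use a HEALPix ang2pix; here we bin RA into
    12 * 4**depth equal slices purely to show the depth -> counts mapping.
    """
    points = list(radec)
    out: Dict[int, List[int]] = {}
    for depth in range(level_limit + 1):
        npix = 12 * 4 ** depth  # HEALPix pixel count at this order
        counts = [0] * npix
        for ra, _dec in points:
            counts[int(ra % 360.0 / 360.0 * npix)] += 1
        out[depth] = counts
    return out
```

Note that the total count is conserved at every depth, which is the invariant the counts summaries rely on.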

compute_input_total(ddf, diag_ctx, log_fn, avoid_computes)[source]

Compute total number of input rows (post RA/DEC validation).

Parameters:
  • ddf (Any) – Dask-like collection with validated RA/DEC.

  • diag_ctx – Diagnostics context factory (label -> context manager).

  • log_fn – Logging callback.

  • avoid_computes (bool) – Whether to avoid explicit compute() when possible.

Returns:

Total number of rows as an integer.

Return type:

int

write_counts_summaries(out_dir, level_limit, input_total, log_fn, precomputed_depth_totals)[source]

Build output counts from selection-stage precomputed depth totals.

Parameters:
  • out_dir (Path)

  • level_limit (int)

  • input_total (int)

  • precomputed_depth_totals (Dict[str, int])

Return type:

tuple[int, dict]

write_common_static_products(out_dir, cfg, densmaps, keep_cols, ra_col, dec_col, paths, ddf)[source]

Write MOC, metadata.xml, and arguments echo.

Parameters:
  • out_dir (Path) – Destination HiPS root directory.

  • cfg (Any) – Parsed configuration object.

  • densmaps (Dict[int, ndarray]) – Mapping depth -> densmap counts.

  • keep_cols (List[str]) – Ordered list of columns retained in outputs.

  • ra_col (str) – Name of the RA column.

  • dec_col (str) – Name of the DEC column.

  • paths (List[str]) – Resolved input paths.

  • ddf (Any) – Dask-like collection used to infer column dtypes.

Return type:

None

log_epilogue(out_dir, log_lines, t0, log_fn, write_process_log=True)[source]

Emit closing log lines and optionally persist process.log.

Parameters:
  • out_dir (Path)

  • log_lines (List[str])

  • t0 (float)

  • write_process_log (bool)

Return type:

None

log_prologue(cfg, out_dir, log_fn)[source]

Emit the initial pipeline log lines.

Parameters:
  • cfg (Any)

  • out_dir (Path)

Return type:

None

write_tiles_with_allsky(out_dir, depth, header_line, ra_col, dec_col, counts, selected, order_desc, allsky_needed, log_fn)[source]

Finalize tiles and write optional Allsky.tsv.

Parameters:
  • out_dir (Path)

  • depth (int)

  • header_line (str)

  • ra_col (str)

  • dec_col (str)

  • counts (ndarray)

  • selected (DataFrame)

  • order_desc (bool)

  • allsky_needed (bool)

Return type:

tuple[dict[int, int] | None, DataFrame | None]

maybe_persist_ddf(ddf_like, should_persist, diag_ctx, log_fn, *, log_prefix, diag_label=None, reason=None)[source]

Persist a Dask collection when requested, logging and awaiting completion.

Parameters:
  • ddf_like (Any)

  • should_persist (bool)

  • log_prefix (str)

  • diag_label (str | None)

  • reason (str | None)

hipscatalog_gen.pipeline.logging_utils module

Structured logging utilities for the pipeline.

class LogContext(stage=None, depth=None)[source]

Bases: object

Mutable context for structured logging fields.

Parameters:
  • stage (str | None)

  • depth (int | None)

stage: str | None = None
depth: int | None = None
setup_structured_logger(out_dir, selection_mode, *, json_logs=False)[source]

Configure a structured logger that writes to stdout and process.log; optionally JSON lines.

Parameters:
  • out_dir (Path)

  • selection_mode (str)

  • json_logs (bool)

Return type:

tuple[LogContext, Callable[[str, bool], None]]

hipscatalog_gen.pipeline.main module

Central orchestration for the HiPS catalog pipeline.

This module wires configuration, cluster setup, input reading, densmap computation, and selection logic implemented in the submodules.

Typical usage (library):

from hipscatalog_gen import load_config, run_pipeline
cfg = load_config("config.yaml")
run_pipeline(cfg)

Command-line interface:

python -m hipscatalog_gen.cli --config config.yaml

run_pipeline(cfg, *, json_logs=False)[source]

Run the full HiPS catalog generation pipeline.

Parameters:
  • cfg (Config) – Parsed configuration object with input, algorithm, cluster, and output options.

  • json_logs (bool) – Whether to also emit structured JSON lines to process.jsonl.

Raises:
  • ValueError – If output.out_dir already exists without output.overwrite set.

  • ValueError – If level_limit is outside the supported range [4, 11].

  • ValueError – If the configured selection_mode is unsupported.

Return type:

None
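
The documented ``Raises`` conditions amount to early precondition checks. A minimal sketch, assuming a hypothetical helper name (the pipeline's actual validation lives in its config and validation modules):

```python
def check_run_preconditions(out_dir_exists: bool, overwrite: bool, level_limit: int) -> None:
    """Illustrative version of run_pipeline's documented precondition checks."""
    if out_dir_exists and not overwrite:
        raise ValueError("output.out_dir exists and output.overwrite is not set")
    if not 4 <= level_limit <= 11:
        raise ValueError(f"level_limit {level_limit} outside supported range [4, 11]")
```

Failing fast here avoids starting a cluster and reading inputs only to abort on a misconfigured output directory.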

hipscatalog_gen.pipeline.modes module

Registry of selection modes and helpers to resolve them by name.

class SelectionMode(name, validate_fn, normalize_fn, prepare_fn, run_fn, description)[source]

Bases: object

Registry entry for a selection mode.

Parameters:
  • name (str)

  • validate_fn (Callable[[Any], None])

  • normalize_fn (Callable[[...], Tuple[Any, Any]])

  • prepare_fn (Callable[[...], Any])

  • run_fn (Callable[[...], Any])

  • description (str)

name: str
validate_fn: Callable[[Any], None]
normalize_fn: Callable[[...], Tuple[Any, Any]]
prepare_fn: Callable[[...], Any]
run_fn: Callable[[...], Any]
description: str
get_selection_mode(name)[source]

Return a selection mode entry, raising on unsupported names.

Parameters:

name (str) – Selection mode identifier (e.g., mag_global).

Returns:

SelectionMode instance with validators and handlers.

Raises:

ValueError – If the name is not registered.

Return type:

SelectionMode

hipscatalog_gen.pipeline.params module

Normalized parameter containers used by selection modes.

class MagGlobalParams(mag_min, mag_max, sentinel=None)[source]

Bases: object

Normalized parameters for mag_global selection.

Parameters:
  • mag_min (float)

  • mag_max (float)

  • sentinel (float | None)

mag_min: float
mag_max: float
sentinel: float | None = None
class ScoreGlobalParams(score_min, score_max, sentinel=None)[source]

Bases: object

Normalized parameters for score_global selection.

Parameters:
  • score_min (float)

  • score_max (float)

  • sentinel (float | None)

score_min: float
score_max: float
sentinel: float | None = None
class ScoreDensityHybridParams(score_min, score_max, sentinel=None)[source]

Bases: object

Normalized parameters for score_density_hybrid selection.

Parameters:
  • score_min (float)

  • score_max (float)

  • sentinel (float | None)

score_min: float
score_max: float
sentinel: float | None = None
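
These containers are plain attribute records with an optional ``sentinel`` marking a missing-value code. The sketch below shows one plausible way such parameters could be consumed during selection; the ``is_valid`` method and its semantics are an assumption for illustration, not part of the package.

```python
import math
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True)
class MagGlobalParamsSketch:
    """Illustrative mirror of MagGlobalParams with one hypothetical helper."""
    mag_min: float
    mag_max: float
    sentinel: Optional[float] = None

    def is_valid(self, mag: float) -> bool:
        """Reject NaN, the sentinel code, and out-of-range magnitudes."""
        if math.isnan(mag):
            return False
        if self.sentinel is not None and mag == self.sentinel:
            return False
        return self.mag_min <= mag <= self.mag_max
```

The score-based containers follow the same shape with ``score_min``/``score_max`` in place of the magnitude bounds.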

hipscatalog_gen.pipeline.structure module

Pipeline data structures and execution harness.

class PipelineStage(name, fn, diag_label=None)[source]

Bases: object

Represents a single ordered pipeline step.

Parameters:
  • name (str)

  • fn (Callable[[PipelineContext], PipelineContext | None])

  • diag_label (str | None)

name: str
fn: Callable[[PipelineContext], PipelineContext | None]
diag_label: str | None
class PipelineContext(cfg, out_dir, report_dir, log_fn, diag_ctx, persist_ddfs, avoid_computes, selection_mode, log_ctx=None, ddf=None, RA_NAME=None, DEC_NAME=None, keep_cols=None, is_hats=False, paths=None, input_total=None, remainder_ddf=None, densmaps=<factory>, total_written=None, selection_params=None, telemetry=<factory>)[source]

Bases: object

Shared state passed between pipeline stages.

Parameters:
  • cfg (Config)

  • out_dir (Path)

  • report_dir (Path)

  • log_fn (Callable[[...], None])

  • diag_ctx (Callable[[str], ContextManager[Any]])

  • persist_ddfs (bool)

  • avoid_computes (bool)

  • selection_mode (str)

  • log_ctx (Any | None)

  • ddf (Any | None)

  • RA_NAME (str | None)

  • DEC_NAME (str | None)

  • keep_cols (List[str] | None)

  • is_hats (bool)

  • paths (List[str] | None)

  • input_total (int | None)

  • remainder_ddf (Any | None)

  • densmaps (Dict[int, ndarray])

  • total_written (int | None)

  • selection_params (Any | None)

  • telemetry (Dict[str, Any])

cfg: Config
out_dir: Path
report_dir: Path
log_fn: Callable[[...], None]
diag_ctx: Callable[[str], ContextManager[Any]]
persist_ddfs: bool
avoid_computes: bool
selection_mode: str
log_ctx: Any | None
ddf: Any | None
RA_NAME: str | None
DEC_NAME: str | None
keep_cols: List[str] | None
is_hats: bool
paths: List[str] | None
input_total: int | None
remainder_ddf: Any | None
densmaps: Dict[int, ndarray]
total_written: int | None
selection_params: Any | None
telemetry: Dict[str, Any]
with_updates(**kwargs)[source]

Return a new context with updated fields.

Parameters:

kwargs (Any)

Return type:

PipelineContext

run_stages(stages, ctx)[source]

Execute ordered pipeline stages, threading a context object.

Parameters:
  • stages – Ordered pipeline stages to execute.

  • ctx – Initial pipeline context.

Returns:

Final context returned after the last stage.

Return type:

PipelineContext
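
The harness described above can be sketched in a few lines: thread an immutable context through the stages in order, where each stage either returns a new context (which replaces the current one) or ``None`` (context unchanged). The tiny ``Ctx``/``Stage`` records below are simplified stand-ins for ``PipelineContext`` and ``PipelineStage``.

```python
from dataclasses import dataclass, field, replace
from typing import Any, Callable, Dict, List, Optional

@dataclass(frozen=True)
class Ctx:
    """Simplified stand-in for PipelineContext."""
    total: Optional[int] = None
    telemetry: Dict[str, Any] = field(default_factory=dict)

    def with_updates(self, **kwargs) -> "Ctx":
        """Return a new context with updated fields, like PipelineContext.with_updates."""
        return replace(self, **kwargs)

@dataclass(frozen=True)
class Stage:
    """Simplified stand-in for PipelineStage."""
    name: str
    fn: Callable[[Ctx], Optional[Ctx]]

def run_stages(stages: List[Stage], ctx: Ctx) -> Ctx:
    """Execute stages in order, threading the context through them."""
    for stage in stages:
        result = stage.fn(ctx)
        if result is not None:
            ctx = result
    return ctx
```

Usage: a counting stage returns an updated context while a pure side-effect stage returns ``None``:

```python
stages = [Stage("count", lambda c: c.with_updates(total=42)),
          Stage("noop", lambda c: None)]
final = run_stages(stages, Ctx())
```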

hipscatalog_gen.pipeline.validation module

Validation helpers for configuration blocks across selection modes.

validate_mag_global_cfg(cfg)[source]

Surface mag_global config issues early.

Parameters:

cfg (Any)

Return type:

None

validate_score_global_cfg(cfg)[source]

Surface score_global config issues early.

Parameters:

cfg (Any)

Return type:

None

validate_score_density_hybrid_cfg(cfg)[source]

Surface score_density_hybrid config issues early.

Parameters:

cfg (Any)

Return type:

None

validate_common_cfg(cfg)[source]

Cross-field validation for cluster/output and shared algorithm settings.

Parameters:

cfg (Any)

Return type:

None

Module contents

Pipeline coordination, validation, and logging utilities.

Imports are kept lazy here to avoid circular imports during documentation builds.