workflow.scripts.plan_workflow

Create a Cylc workflow plan from a list of goals and stages to exclude.

This is the starting point for most workflow usages, and can be used to generate a base Cylc workflow to modify and extend.

   1"""Create a Cylc workflow plan from a list of goals and stages to exclude.
   2
   3This is the starting point for most workflow usages, and can be used
   4to generate a base Cylc workflow to modify and extend.
   5"""
   6
   7import tempfile
   8from collections.abc import Iterable
   9from enum import StrEnum
  10from pathlib import Path, PurePath
  11from typing import Annotated, Any, NamedTuple, Optional
  12
  13import jinja2
  14import networkx as nx
  15import printree
  16import tqdm
  17import typer
  18from pyvis.network import Network
  19
  20from workflow import realisations
  21from workflow.defaults import DefaultsVersion
  22
# Typer application object for this module. Commands are presumably
# registered on it further down the file (not visible here).
app = typer.Typer()
  24
  25
class WorkflowTarget(StrEnum):
    """Enumeration of possible workflow targets.

    Members are strings (``StrEnum``), so each member compares equal
    to its lowercase value.
    """

    # NOTE(review): the consumer of these values is not visible in this
    # part of the file; presumably they select the system the generated
    # workflow is meant to run on -- confirm against the caller.
    NeSI = "nesi"
    Hypocentre = "hypocentre"
  31
  32
class StageIdentifier(StrEnum):
    """Valid stage identifier in the workflow plan.

    Each member's value is the string rendered for the stage when it
    is converted to a node label by `stage_to_node_string`.
    """

    # Synthetic root stage: `create_abstract_workflow_plan` adds a single
    # copy-input node and links it to every stage without a predecessor.
    CopyInput = "copy_input"
    GCMTToRealisation = "gcmt_to_realisation"
    DomainGeneration = "generate_velocity_model_parameters"
    VelocityModelGeneration = "generate_velocity_model"
    StationSelection = "generate_station_coordinates"
    ModelCoordinates = "write_model_coordinates"
    SRFGeneration = "realisation_to_srf"
    # Check stages: ordered in the plan by `realisation_workflow`, but they
    # have no file inputs or outputs recorded in `stage_inputs` /
    # `stage_outputs`.
    CheckSRF = "check_srf"
    # Only planned for sampled realisations (see `realisation_workflow`).
    CopyDomainParameters = "copy_domain_parameters"
    EMOD3DParameters = "create_e3d_par"
    CheckDomain = "check_domain"
    StochGeneration = "generate_stoch"
    HighFrequency = "hf_sim"
    LowFrequency = "emod3d"
    Broadband = "bb_sim"
    IntensityMeasureCalculation = "im_calc"
    PlotTimeslices = "plot_ts"
    MergeTimeslices = "merge_ts"
    NSHMToRealisation = "nshm_to_realisation"
  55
  56
class Source(StrEnum):
    """Realisation source options.

    The two members mirror the two realisation-producing stages of the
    plan (``gcmt_to_realisation`` and ``nshm_to_realisation``).
    """

    # NOTE(review): how a source choice is mapped onto those stages is not
    # visible in this part of the file -- confirm against the caller.
    GCMT = "gcmt"
    NSHM = "nshm"
  62
  63
class GroupIdentifier(StrEnum):
    """Group identifiers to use to bulk target or exclude in workflow planning.

    The stages each group stands for are listed in ``GROUP_STAGES`` and
    ``GROUP_GOALS``.
    """

    Preprocessing = "preprocessing"
    """Alias for all preprocessing stages."""
    HighFrequency = "high_frequency"
    """Alias for the high frequency workflow."""
    LowFrequency = "low_frequency"
    """Alias for the low frequency workflow."""
    # Alias for the domain stages: domain generation, velocity model,
    # station selection, model coordinates and copying domain parameters.
    Domain = "domain"
  74
  75
# Every stage that belongs to each group identifier.
# NOTE(review): the consumer is not visible in this part of the file;
# presumably this is the expansion applied when a group is excluded from
# the plan -- confirm against the caller.
GROUP_STAGES = {
    GroupIdentifier.Preprocessing: {
        StageIdentifier.DomainGeneration,
        StageIdentifier.VelocityModelGeneration,
        StageIdentifier.StationSelection,
        StageIdentifier.ModelCoordinates,
        StageIdentifier.SRFGeneration,
        StageIdentifier.EMOD3DParameters,
        StageIdentifier.NSHMToRealisation,
        StageIdentifier.GCMTToRealisation,
        StageIdentifier.StochGeneration,
        StageIdentifier.CopyDomainParameters,
    },
    GroupIdentifier.HighFrequency: {
        StageIdentifier.HighFrequency,
    },
    GroupIdentifier.LowFrequency: {StageIdentifier.LowFrequency},
    # The domain group is a strict subset of the preprocessing group.
    GroupIdentifier.Domain: {
        StageIdentifier.VelocityModelGeneration,
        StageIdentifier.StationSelection,
        StageIdentifier.DomainGeneration,
        StageIdentifier.CopyDomainParameters,
        StageIdentifier.ModelCoordinates,
    },
}
 101
# The goal stages each group identifier stands for.
# NOTE(review): the consumer is not visible in this part of the file;
# presumably this is the expansion applied when a group is given as a
# goal -- confirm against the caller.
GROUP_GOALS = {
    # Only two of the preprocessing stages are listed as goals here; the
    # remaining members of the group appear only in GROUP_STAGES.
    GroupIdentifier.Preprocessing: {
        StageIdentifier.EMOD3DParameters,
        StageIdentifier.StochGeneration,
    },
    GroupIdentifier.LowFrequency: {StageIdentifier.LowFrequency},
    GroupIdentifier.HighFrequency: {StageIdentifier.HighFrequency},
    # Unlike GROUP_STAGES, domain generation itself is not listed as a goal.
    GroupIdentifier.Domain: {
        StageIdentifier.VelocityModelGeneration,
        StageIdentifier.StationSelection,
        StageIdentifier.ModelCoordinates,
        StageIdentifier.CopyDomainParameters,
    },
}
 116
 117
class Stage(NamedTuple):
    """Representation of a workflow stage in the output Cylc file.

    Instances are plain tuples of ``(identifier, event, sample)`` and
    are used as the nodes of the networkx workflow graphs built in
    this module.
    """

    identifier: StageIdentifier
    """The stage identifier."""
    event: str
    """The event the stage is running for."""
    # None denotes the event's base realisation. The code in this file
    # tests the sample for truthiness, so 0 is handled like None when
    # directories and node labels are built.
    sample: Optional[int]
    """The sample number of the realisation."""
 127
 128
 129class AnnotatedPath(PurePath):
 130    """Pure path annotated with description of the file."""
 131
 132    def __init__(self, path: str | Path, description: Optional[str] = None):
 133        """Create an annotated path.
 134
 135        Parameters
 136        ----------
 137        path : str | Path
 138            The path to annotate.
 139        description : Optional[str]
 140            The description of the path.
 141        """
 142        super().__init__(path)
 143        self.path = path
 144        self.description = description
 145
 146    def __hash__(self):
 147        """Construct a unique hash for Annotated Paths."""
 148        if super().name == "realisation.json":
 149            # Realisations should also have their annotations hashed.
 150            return hash((self.path, self.description))
 151        else:
 152            return super().__hash__()
 153
 154
 155def stage_inputs(
 156    stage: Stage,
 157    root: Path,
 158    required_realisation_sections: dict[tuple[str, Optional[int]], set[str]],
 159) -> set[AnnotatedPath]:
 160    """Return a list of stage inputs for the stage.
 161
 162    Parameters
 163    ----------
 164    stage : Stage
 165        The stage to get inputs for.
 166    root : Path
 167        The root directory of the simulation.
 168
 169
 170    Returns
 171    -------
 172    set[Path]
 173        A set of input paths required by the stage.
 174    """
 175    realisation_identifier = stage.event
 176    if stage.sample:
 177        realisation_identifier += f"_{stage.sample}"
 178    parent_directory = root / stage.event
 179    event_directory = root / realisation_identifier
 180    realisation_requirements = sorted(
 181        required_realisation_sections.get((stage.event, stage.sample), set())
 182    )
 183    realisation = {
 184        AnnotatedPath(
 185            event_directory / "realisation.json",
 186            f'Realisation file for event containing: {', '.join(realisation_requirements)}.',
 187        )
 188    }
 189    station_ll = AnnotatedPath(
 190        parent_directory / "stations" / "stations.ll",
 191        "lat,lon coordinates corresponding to x,y coordinates of stations in domain.",
 192    )
 193    s_wave = AnnotatedPath(
 194        parent_directory / "Velocity_Model" / "vs3dfile.s",
 195        "s-wave velocity file for velocity model (Section: Velocity Model Files).",
 196    )
 197    match stage:
 198        case Stage(
 199            identifier=StageIdentifier.SRFGeneration
 200            | StageIdentifier.VelocityModelGeneration
 201            | StageIdentifier.StationSelection
 202            | StageIdentifier.ModelCoordinates
 203        ):
 204            return realisation
 205        case Stage(identifier=StageIdentifier.CopyDomainParameters):
 206            return {
 207                AnnotatedPath(
 208                    parent_directory / "realisation.json",
 209                    f"Realisation file for event containing: {', '.join(required_realisation_sections.get((stage.event, None), set()))}.",
 210                )
 211            }
 212        case Stage(
 213            identifier=StageIdentifier.EMOD3DParameters
 214            | StageIdentifier.LowFrequency,
 215        ):
 216            return (
 217                realisation
 218                | {
 219                    station_ll,
 220                    AnnotatedPath(
 221                        parent_directory / "stations" / "stations.statcords",
 222                        "x,y coordinates of stations in domain.",
 223                    ),
 224                    AnnotatedPath(
 225                        parent_directory / "model" / "model_params",
 226                        "Model centre and corners for EMOD3D.",
 227                    ),
 228                    AnnotatedPath(
 229                        parent_directory / "model" / "grid_file",
 230                        "Domain extents for EMOD3D.",
 231                    ),
 232                    AnnotatedPath(
 233                        parent_directory / "Velocity_Model" / "rho3dfile.d",
 234                        "Density component file for velocity model. (Section: Velocity Model Files)",
 235                    ),
 236                    AnnotatedPath(
 237                        parent_directory / "Velocity_Model" / "vp3dfile.p",
 238                        "p-wave velocity file for velocity model. (Section: Velocity Model Files)",
 239                    ),
 240                    s_wave,
 241                    AnnotatedPath(
 242                        parent_directory / "Velocity_Model" / "in_basin_mask.b",
 243                        "Boolean basin mask (Section: Velocity Model Files).",
 244                    ),
 245                    AnnotatedPath(
 246                        event_directory / "realisation.srf",
 247                        "Slip model of source (Section: SRF Format).",
 248                    ),
 249                }
 250                | (
 251                    set()
 252                    if stage.identifier == StageIdentifier.EMOD3DParameters
 253                    else {
 254                        AnnotatedPath(
 255                            event_directory / "LF" / "e3d.par",
 256                            "EMOD3D Parameters (Section: https://wiki.canterbury.ac.nz/pages/viewpage.action?pageId=100794983)",
 257                        )
 258                    }
 259                )
 260            )
 261        case Stage(identifier=StageIdentifier.StochGeneration):
 262            return realisation | {
 263                AnnotatedPath(
 264                    event_directory / "realisation.srf",
 265                    "Slip model of source (Section: SRF Format).",
 266                )
 267            }
 268        case Stage(identifier=StageIdentifier.HighFrequency):
 269            return realisation | {
 270                AnnotatedPath(
 271                    event_directory / "realisation.stoch",
 272                    "Downsampled SRF for stochastic source model input (Section: Stoch format).",
 273                ),
 274                station_ll,
 275                s_wave,
 276            }
 277        case Stage(identifier=StageIdentifier.Broadband):
 278            return realisation | {
 279                station_ll,
 280                AnnotatedPath(
 281                    event_directory / "LF", "Low-frequency simulation directory."
 282                ),
 283                AnnotatedPath(
 284                    event_directory / "realisation.hf",
 285                    "High-frequency waveform file (Section: LF/HF/BB binary format).",
 286                ),
 287                s_wave,
 288            }
 289        case Stage(identifier=StageIdentifier.IntensityMeasureCalculation):
 290            return realisation | {
 291                AnnotatedPath(
 292                    event_directory / "realisation.bb",
 293                    "Broadband waveform file (Section: LF/HF/BB binary format).",
 294                )
 295            }
 296        case Stage(identifier=StageIdentifier.PlotTimeslices):
 297            return {
 298                AnnotatedPath(
 299                    event_directory / "LF" / "OutBin" / "output.e3d",
 300                    "Merged xyts-slices of low-frequency simulation (Section: XYTS.e3d binary format).",
 301                )
 302            }
 303        case Stage(identifier=StageIdentifier.MergeTimeslices):
 304            return {
 305                AnnotatedPath(
 306                    event_directory / "LF" / "OutBin",
 307                    "Component xyts-slices of low-frequency simulation, one per compute node (Section: XYTS.e3d binary format).",
 308                )
 309            }
 310    return set()
 311
 312
 313def stage_config_inputs(stage: Stage) -> set[str]:
 314    """Get the realisation configuration inputs for a given stage.
 315
 316    Parameters
 317    ----------
 318    stage : Stage
 319        The stage to get inputs for.
 320
 321
 322    Returns
 323    -------
 324    set[str]
 325        The input config sections for this stage.
 326    """
 327    input_dictionary = {
 328        StageIdentifier.EMOD3DParameters: {
 329            realisations.DomainParameters._config_key,
 330            realisations.VelocityModelParameters._config_key,
 331            realisations.RealisationMetadata._config_key,
 332        },
 333        StageIdentifier.Broadband: {
 334            realisations.RealisationMetadata._config_key,
 335            realisations.DomainParameters._config_key,
 336        },
 337        StageIdentifier.StationSelection: {realisations.DomainParameters._config_key},
 338        StageIdentifier.VelocityModelGeneration: {
 339            realisations.DomainParameters._config_key,
 340            realisations.RealisationMetadata._config_key,
 341        },
 342        StageIdentifier.HighFrequency: {
 343            realisations.DomainParameters._config_key,
 344            realisations.RealisationMetadata._config_key,
 345        },
 346        StageIdentifier.IntensityMeasureCalculation: {
 347            realisations.RealisationMetadata._config_key,
 348            realisations.BroadbandParameters._config_key,
 349        },
 350        StageIdentifier.DomainGeneration: {
 351            realisations.RealisationMetadata._config_key,
 352            realisations.RupturePropagationConfig._config_key,
 353            realisations.SourceConfig._config_key,
 354        },
 355        StageIdentifier.SRFGeneration: {
 356            realisations.RealisationMetadata._config_key,
 357            realisations.SourceConfig._config_key,
 358            realisations.RupturePropagationConfig._config_key,
 359        },
 360        StageIdentifier.ModelCoordinates: {realisations.DomainParameters._config_key},
 361        StageIdentifier.StochGeneration: {realisations.RealisationMetadata._config_key},
 362        StageIdentifier.StationSelection: {realisations.DomainParameters._config_key},
 363    }
 364    return input_dictionary.get(stage.identifier, set())
 365
 366
 367def stage_config_outputs(stage: Stage) -> set[str]:
 368    """Get the realisation configuration outputs for a given stage.
 369
 370    Parameters
 371    ----------
 372    stage : Stage
 373        The stage to get outputs for.
 374
 375
 376    Returns
 377    -------
 378    set[str]
 379        The output config sections for this stage.
 380    """
 381    output_dictionary = {
 382        StageIdentifier.NSHMToRealisation: {
 383            realisations.SourceConfig._config_key,
 384            realisations.RupturePropagationConfig._config_key,
 385            realisations.RealisationMetadata._config_key,
 386        },
 387        StageIdentifier.GCMTToRealisation: {
 388            realisations.SourceConfig._config_key,
 389            realisations.RupturePropagationConfig._config_key,
 390            realisations.RealisationMetadata._config_key,
 391        },
 392        StageIdentifier.EMOD3DParameters: {realisations.EMOD3DParameters._config_key},
 393        StageIdentifier.Broadband: {realisations.BroadbandParameters._config_key},
 394        StageIdentifier.VelocityModelGeneration: {
 395            realisations.VelocityModelParameters._config_key
 396        },
 397        StageIdentifier.HighFrequency: {realisations.HFConfig._config_key},
 398        StageIdentifier.IntensityMeasureCalculation: {
 399            realisations.IntensityMeasureCalcuationParameters._config_key
 400        },
 401        StageIdentifier.CopyDomainParameters: {
 402            realisations.VelocityModelParameters._config_key,
 403            realisations.DomainParameters._config_key,
 404        },
 405        StageIdentifier.DomainGeneration: {
 406            realisations.VelocityModelParameters._config_key,
 407            realisations.DomainParameters._config_key,
 408        },
 409        StageIdentifier.SRFGeneration: {realisations.SRFConfig._config_key},
 410        StageIdentifier.StochGeneration: {realisations.HFConfig._config_key},
 411    }
 412    return output_dictionary.get(stage.identifier, set())
 413
 414
 415def realisation_configuration_requirements(
 416    workflow_plan: nx.DiGraph, realisation: tuple[str, Optional[int]]
 417) -> set[str]:
 418    """Get the requirements for the realisation.json in this stage.
 419
 420    Parameters
 421    ----------
 422    workflow_plan : nx.DiGraph
 423        The overall workflow plane.
 424    realisation : tuple[str, Optional[int]]
 425        The realisation to retrieve requirements for.
 426
 427    Returns
 428    -------
 429    set[str]
 430        The realisation configuration sections that must be present
 431        for the workflow to run correctly.
 432    """
 433    stages = [
 434        stage
 435        for stage in workflow_plan.nodes
 436        if (stage.event, stage.sample) == realisation
 437    ]
 438    if stages:
 439        return set.union(*[stage_config_inputs(stage) for stage in stages]) - set.union(
 440            *[stage_config_outputs(stage) for stage in stages]
 441        )
 442    else:
 443        return set()
 444
 445
 446def stage_outputs(stage: Stage, root: Path) -> set[AnnotatedPath]:
 447    """Return a list of stage inputs for the stage.
 448
 449    Parameters
 450    ----------
 451    stage : Stage
 452        The stage to get inputs for.
 453    root : Path
 454        The root directory of the simulation.
 455
 456
 457    Returns
 458    -------
 459    set[Path]
 460        A set of input paths required by the stage.
 461    """
 462    realisation_identifier = stage.event
 463    if stage.sample:
 464        realisation_identifier += f"_{stage.sample}"
 465    event_directory = root / realisation_identifier
 466    realisation = {
 467        AnnotatedPath(
 468            event_directory / "realisation.json", "Realisation file for event."
 469        )
 470    }
 471    match stage:
 472        case Stage(
 473            identifier=StageIdentifier.NSHMToRealisation
 474            | StageIdentifier.GCMTToRealisation
 475        ):
 476            return realisation
 477        case Stage(identifier=StageIdentifier.SRFGeneration):
 478            return {
 479                AnnotatedPath(
 480                    event_directory / "realisation.srf",
 481                    "Slip model of source.",
 482                )
 483            }
 484        case Stage(identifier=StageIdentifier.ModelCoordinates):
 485            return {
 486                AnnotatedPath(
 487                    event_directory / "model" / "model_params",
 488                    "Model centre and corners for EMOD3D.",
 489                ),
 490                AnnotatedPath(
 491                    event_directory / "model" / "grid_file",
 492                    "Domain extents for EMOD3D.",
 493                ),
 494            }
 495        case Stage(identifier=StageIdentifier.StationSelection):
 496            return {
 497                AnnotatedPath(
 498                    event_directory / "stations" / "stations.ll",
 499                    "lat,lon coordinates corresponding to x,y coordinates of stations in domain.",
 500                ),
 501                AnnotatedPath(
 502                    event_directory / "stations" / "stations.statcords",
 503                    "x,y coordinates of stations in domain.",
 504                ),
 505            }
 506        case Stage(identifier=StageIdentifier.VelocityModelGeneration):
 507            return {
 508                AnnotatedPath(
 509                    event_directory / "Velocity_Model" / "rho3dfile.d",
 510                    "Density component file for velocity model.",
 511                ),
 512                AnnotatedPath(
 513                    event_directory / "Velocity_Model" / "vp3dfile.p",
 514                    "p-wave velocity file for velocity model.",
 515                ),
 516                AnnotatedPath(
 517                    event_directory / "Velocity_Model" / "vs3dfile.s",
 518                    "s-wave velocity file for velocity model.",
 519                ),
 520                AnnotatedPath(
 521                    event_directory / "Velocity_Model" / "in_basin_mask.b",
 522                    "Boolean basin mask.",
 523                ),
 524            }
 525        case Stage(identifier=StageIdentifier.EMOD3DParameters):
 526            return {
 527                AnnotatedPath(
 528                    event_directory / "LF", "Low-frequency simulation directory."
 529                ),
 530                AnnotatedPath(event_directory / "LF" / "e3d.par", "EMOD3D Parameters"),
 531            }
 532        case Stage(identifier=StageIdentifier.LowFrequency):
 533            return {
 534                AnnotatedPath(
 535                    event_directory / "LF", "Low-frequency simulation directory."
 536                )
 537            }
 538        case Stage(identifier=StageIdentifier.StochGeneration):
 539            return {
 540                AnnotatedPath(
 541                    event_directory / "realisation.stoch",
 542                    "Downsampled SRF for stochastic source model input.",
 543                )
 544            }
 545        case Stage(identifier=StageIdentifier.HighFrequency):
 546            return {
 547                AnnotatedPath(
 548                    event_directory / "realisation.hf", "High-frequency waveform file."
 549                )
 550            }
 551        case Stage(identifier=StageIdentifier.Broadband):
 552            return {
 553                AnnotatedPath(
 554                    event_directory / "realisation.bb", "Broadband waveform file."
 555                )
 556            }
 557        case Stage(identifier=StageIdentifier.IntensityMeasureCalculation):
 558            return {
 559                AnnotatedPath(
 560                    event_directory / "ims.parquet", "Intensity measure statistics."
 561                )
 562            }
 563        case Stage(identifier=StageIdentifier.PlotTimeslices):
 564            return {
 565                AnnotatedPath(
 566                    event_directory / "animation.mp4",
 567                    "Animation of low-frequency waveform.",
 568                )
 569            }
 570        case Stage(identifier=StageIdentifier.MergeTimeslices):
 571            return {
 572                AnnotatedPath(
 573                    event_directory / "LF" / "OutBin" / "output.e3d",
 574                    "Merged xyts-slices of low-frequency simulation.",
 575                )
 576            }
 577        case _:
 578            return set()
 579
 580
 581def realisation_workflow(event: str, sample: Optional[int]) -> nx.DiGraph:
 582    """Add a realisation to a workflow plan.
 583
 584    Adds all stages for the realisation to run, and links to event
 585    stages for shared resources (i.e. the velocity model).
 586
 587    Parameters
 588    ----------
 589    workflow_plan : nx.DiGraph
 590        The current workflow paln.
 591    event : str
 592        The event to add.
 593    sample : Optional[int]
 594        The sample number (or None, if the original event).
 595    """
 596    workflow_plan = nx.from_dict_of_lists(
 597        {
 598            Stage(StageIdentifier.NSHMToRealisation, event, sample): [
 599                Stage(StageIdentifier.SRFGeneration, event, sample),
 600            ],
 601            Stage(StageIdentifier.GCMTToRealisation, event, sample): [
 602                Stage(StageIdentifier.SRFGeneration, event, sample)
 603            ],
 604            Stage(StageIdentifier.SRFGeneration, event, sample): [
 605                Stage(StageIdentifier.CheckSRF, event, sample)
 606            ],
 607            Stage(StageIdentifier.CheckSRF, event, sample): [
 608                Stage(StageIdentifier.StochGeneration, event, sample),
 609                Stage(StageIdentifier.EMOD3DParameters, event, sample),
 610            ],
 611            Stage(StageIdentifier.VelocityModelGeneration, event, None): [
 612                Stage(StageIdentifier.EMOD3DParameters, event, sample),
 613                Stage(StageIdentifier.HighFrequency, event, sample),
 614            ],
 615            Stage(StageIdentifier.StationSelection, event, None): [
 616                Stage(StageIdentifier.EMOD3DParameters, event, sample),
 617                Stage(StageIdentifier.HighFrequency, event, sample),
 618            ],
 619            Stage(StageIdentifier.ModelCoordinates, event, None): [
 620                Stage(StageIdentifier.EMOD3DParameters, event, sample)
 621            ],
 622            Stage(StageIdentifier.EMOD3DParameters, event, sample): [
 623                Stage(StageIdentifier.CheckDomain, event, sample)
 624            ],
 625            Stage(StageIdentifier.CheckDomain, event, sample): [
 626                Stage(StageIdentifier.LowFrequency, event, sample)
 627            ],
 628            Stage(StageIdentifier.LowFrequency, event, sample): [
 629                Stage(StageIdentifier.Broadband, event, sample),
 630                Stage(StageIdentifier.MergeTimeslices, event, sample),
 631            ],
 632            Stage(StageIdentifier.StochGeneration, event, sample): [
 633                Stage(StageIdentifier.HighFrequency, event, sample)
 634            ],
 635            Stage(StageIdentifier.HighFrequency, event, sample): [
 636                Stage(StageIdentifier.Broadband, event, sample)
 637            ],
 638            Stage(StageIdentifier.Broadband, event, sample): [
 639                Stage(StageIdentifier.IntensityMeasureCalculation, event, sample)
 640            ],
 641            Stage(StageIdentifier.MergeTimeslices, event, sample): [
 642                Stage(StageIdentifier.PlotTimeslices, event, sample)
 643            ],
 644        },
 645        create_using=nx.DiGraph,
 646    )
 647    if not sample:
 648        workflow_plan.add_edges_from(
 649            [
 650                (
 651                    Stage(StageIdentifier.NSHMToRealisation, event, sample),
 652                    Stage(StageIdentifier.DomainGeneration, event, sample),
 653                ),
 654                (
 655                    Stage(StageIdentifier.GCMTToRealisation, event, sample),
 656                    Stage(StageIdentifier.DomainGeneration, event, sample),
 657                ),
 658                (
 659                    Stage(StageIdentifier.DomainGeneration, event, sample),
 660                    Stage(StageIdentifier.EMOD3DParameters, event, sample),
 661                ),
 662                (
 663                    Stage(StageIdentifier.DomainGeneration, event, sample),
 664                    Stage(StageIdentifier.VelocityModelGeneration, event, sample),
 665                ),
 666                (
 667                    Stage(StageIdentifier.DomainGeneration, event, sample),
 668                    Stage(StageIdentifier.StationSelection, event, sample),
 669                ),
 670                (
 671                    Stage(StageIdentifier.DomainGeneration, event, sample),
 672                    Stage(StageIdentifier.ModelCoordinates, event, sample),
 673                ),
 674            ]
 675        )
 676    else:
 677        workflow_plan.add_edges_from(
 678            [
 679                (
 680                    Stage(StageIdentifier.DomainGeneration, event, None),
 681                    Stage(StageIdentifier.CopyDomainParameters, event, sample),
 682                ),
 683                (
 684                    Stage(StageIdentifier.CopyDomainParameters, event, sample),
 685                    Stage(StageIdentifier.EMOD3DParameters, event, sample),
 686                ),
 687                (
 688                    Stage(StageIdentifier.CopyDomainParameters, event, sample),
 689                    Stage(StageIdentifier.HighFrequency, event, sample),
 690                ),
 691            ]
 692        )
 693
 694    return workflow_plan
 695
 696
 697def create_abstract_workflow_plan(
 698    realisations: set[tuple[str, Optional[int]]],
 699    goals: Iterable[StageIdentifier],
 700    excluding: Iterable[StageIdentifier],
 701) -> nx.DiGraph:
 702    """Create an abstract workflow graph from a list of goals and excluded stages.
 703
 704    Parameters
 705    ----------
 706    goals : Iterable[StageIdentifier]
 707        The goal stages for the workflow.
 708    excluding : Iterable[StageIdentifier]
 709        The excluded stages for the workflow.
 710
 711    Returns
 712    -------
 713    nx.DiGraph
 714        A abstract workflow plan. This workflow plan contains only
 715        included stages that are required to reach the goals. If two
 716        workflow stages depend on each other only through paths
 717        consisting entirely of excluded nodes, then they are adjacent
 718        directly in the abstract plan by edges.
 719    """
 720
 721    excluding_stages = {
 722        Stage(excluded, *realisation)
 723        for excluded in excluding
 724        for realisation in realisations
 725    }
 726
 727    output_graph = nx.DiGraph()
 728    realisation_iteration = (
 729        realisations if len(realisations) < 100 else tqdm.tqdm(realisations)
 730    )
 731
 732    for realisation in realisation_iteration:
 733        workflow_plan = realisation_workflow(*realisation)
 734        workflow_plan = nx.transitive_closure_dag(workflow_plan)
 735
 736        for goal in goals:
 737            reduced_graph = nx.transitive_reduction(
 738                workflow_plan.subgraph(
 739                    (
 740                        set(workflow_plan.predecessors(Stage(goal, *realisation)))
 741                        | {Stage(goal, *realisation)}
 742                    )
 743                    - excluding_stages
 744                )
 745            )
 746            output_graph.update(
 747                edges=reduced_graph.edges(), nodes=reduced_graph.nodes()
 748            )
 749
 750    roots = [node for node, degree in output_graph.in_degree() if degree == 0]
 751    copy_input_stage = Stage(StageIdentifier.CopyInput, "", None)
 752    output_graph.add_node(copy_input_stage)
 753    for root in roots:
 754        output_graph.add_edge(copy_input_stage, root)
 755    return output_graph
 756
 757
 758def stage_to_node_string(stage: Stage) -> str:
 759    r"""Convert a `Stage` into a human readable node identifier string.
 760
 761    Parameters
 762    ----------
 763    stage : Stage
 764        The stage to render.
 765
 766    Returns
 767    -------
 768    str
 769        A string of the format
 770        {stage.identifier}\n{stage.event}_{stage.sample}, if event and
 771        sample are non-trivial.
 772    """
 773    node_string = str(stage.identifier)
 774    if stage.event:
 775        node_string += f"\n{stage.event}"
 776    if stage.sample:
 777        node_string += f"_{stage.sample}"
 778    return node_string
 779
 780
 781def pyvis_graph(workflow_plan: nx.DiGraph) -> Network:
 782    """Convert a workflow plan into a pyvis diagram for visualisation.
 783
 784    Parameters
 785    ----------
 786    workflow_plan : nx.DiGraph
 787        The workflow plan to visualise.
 788
 789
 790    Returns
 791    -------
 792    Network
 793        A pyvis rendering for this workflow plan.
 794    """
 795    network = Network(
 796        width="100%", height="1500px", directed=True, layout="hierarchical"
 797    )
 798    network.show_buttons(filter_=["physics"])
 799    roots = [node for node, degree in workflow_plan.in_degree() if degree == 0]
 800    reversed_workflow = workflow_plan.reverse()
 801    stage: Stage
 802    for stage in workflow_plan.nodes():
 803        network.add_node(
 804            stage_to_node_string(stage),
 805            group=f"{stage.event}_{stage.sample or ''}",
 806            size=20,
 807            level=max(
 808                (
 809                    len(path) - 1
 810                    for root in roots
 811                    for path in nx.all_simple_paths(reversed_workflow, stage, root)
 812                ),
 813                default=0,
 814            ),
 815        )
 816    for stage, next_stage in workflow_plan.edges():
 817        network.add_edge(stage_to_node_string(stage), stage_to_node_string(next_stage))
 818    return network
 819
 820
# Regular expression source matching a trailing "_rel<digits>" suffix.
# NOTE(review): not referenced by any code visible in this part of the file.
REALISATION_ITERATION_RE = r"_rel\d+$"
 822
 823
 824def parse_realisation(realisation_id: str) -> set[tuple[str, Optional[int]]]:
 825    """Parse a realisation identifier string from the command line into a realisation identifier.
 826
 827    Parameters
 828    ----------
 829    realisation_id : str
 830        The realisation identifier string to parse.
 831
 832    Returns
 833    -------
 834    tuple[str, Optional[int]]
 835        The parsed realisation event and sample number.
 836    """
 837    try:
 838        index = realisation_id.rindex(":")
 839        event, num_samples = realisation_id[:index], realisation_id[index + 1 :]
 840
 841        return {(event, sample or None) for sample in range(int(num_samples))}
 842    except ValueError:
 843        return {(realisation_id, None)}
 844
 845
 846def build_filetree(files: set[AnnotatedPath]) -> dict[str, Any]:
 847    """Build a file tree from a set of annotated file paths.
 848
 849    Parameters
 850    ----------
 851    files : set[AnnotatedPath]
 852        The set of files to construct a tree for.
 853
 854
 855    Returns
 856    -------
 857    dict[str, Any]
 858        A file tree.
 859    """
 860    filetree: dict[str, Any] = {}
 861    for file in sorted(files):
 862        cur = filetree
 863        for part in file.parts[:-1]:
 864            if part not in cur:
 865                cur[part] = {}
 866            cur = cur[part]
 867        if not cur.get(file.parts[-1]):
 868            cur[file.parts[-1]] = file.description
 869    return filetree
 870
 871
@app.command(
    help="Plan and generate a Cylc workflow file for a number of realisations."
)
def plan_workflow(
    realisation_ids: Annotated[
        list[str],
        typer.Argument(
            help="List of realisations to generate workflows for. Realisations have the format event:realisation_count, such as Darfield:4."
        ),
    ],
    flow_file: Annotated[
        Path,
        typer.Argument(
            help="Path to output flow file (e.g. ~/cylc-src/my-workflow/flow.cylc)",
            writable=True,
            dir_okay=False,
        ),
    ],
    goal: Annotated[
        list[StageIdentifier],
        typer.Option(
            help="List of workflow outputs to generate",
            default_factory=lambda: [],
            rich_help_panel='Planning Workflows'
        ),
    ],
    group_goal: Annotated[
        list[GroupIdentifier],
        typer.Option(
            help="List of group goals to generate", default_factory=lambda: [], rich_help_panel='Planning Workflows'
        ),
    ],
    excluding: Annotated[
        list[StageIdentifier],
        typer.Option(help="List of stages to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'),
    ],
    excluding_group: Annotated[
        list[GroupIdentifier],
        typer.Option(
            help="List of stage groups to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'
        ),
    ],
    visualise: Annotated[
        bool, typer.Option(help="Visualise the planned workflow as a graph", rich_help_panel='Visualising Workflows')
    ] = False,
    show_required_files: Annotated[
        bool,
        typer.Option(
            help="Print the expected directory tree at the start of the simulation.", rich_help_panel='Visualising Workflows'
        ),
    ] = True,
    target_host: Annotated[
        WorkflowTarget,
        typer.Option(help="Select the target host where the workflow will be run", rich_help_panel='Planning Workflows'),
    ] = WorkflowTarget.NeSI,
    source: Annotated[
        Optional[Source],
        typer.Option(
            help="If given, set the source of the realisation. For NSHM and GCMT, the realisation id corresponds to the rupture id and GCMT PublicID respectively.", rich_help_panel='Sources'
        ),
    ] = None,
    defaults_version: Annotated[
        Optional[DefaultsVersion],
        typer.Option(
            help="The simulation defaults to apply for all realisations. Required if source is specified.", rich_help_panel='Sources'
        ),
    ] = None,
):
    """Plan and generate a Cylc workflow file for a number of realisations.

    The realisation identifiers are expanded with `parse_realisation`,
    an abstract plan is built with `create_abstract_workflow_plan`, and
    the plan is rendered through the ``flow.cylc`` template of the
    ``workflow`` package into `flow_file`.

    Parameters
    ----------
    realisation_ids : list[str]
        The list of realisation identifiers to generate the workflow
        for, each either an event name or ``event:realisation_count``.
    flow_file : Path
        The output flow file path to write the Cylc workflow to.
    goal : list[StageIdentifier]
        A list of workflow stages to mark as goals. These stages
        define the endpoints for the workflow.
    group_goal : list[GroupIdentifier]
        A list of workflow groups to target. A workflow group is just
        an alias for a set of workflow stages. Equivalent to adding
        each member of the group's ``GROUP_GOALS`` entry to `goal`.
    excluding : list[StageIdentifier]
        A list of workflow stages to exclude from the flows.
    excluding_group : list[GroupIdentifier]
        A list of workflow groups to exclude. A workflow group is just
        an alias for a set of workflow stages. Equivalent to adding
        each member of the group's ``GROUP_STAGES`` entry to
        `excluding`.
    visualise : bool
        If true, render the planned workflow with pyvis into a
        temporary HTML file.
    show_required_files : bool
        If true, print a tree of the files that are inputs of some
        planned stage but are not produced by any planned stage.
    target_host : WorkflowTarget
        The target host, passed through to the flow template.
    source : Optional[Source]
        The source of the realisations. The realisation-creating stage
        of every source other than this one is excluded; with no
        source, all of them are excluded.
    defaults_version : Optional[DefaultsVersion]
        The simulation defaults version, passed through to the flow
        template. Must be given when `source` is given.

    Raises
    ------
    typer.Exit
        With exit code 1 if `source` is given without
        `defaults_version`.
    """
    # NOTE: inside this function the local name `realisations` shadows the
    # module-level `workflow.realisations` import.
    realisations = set.union(
        *[parse_realisation(realisation_id) for realisation_id in realisation_ids]
    )
    if source and not defaults_version:
        print(
            "You must specify a defaults version if you specify a source. See the help text for options."
        )
        raise typer.Exit(code=1)
    excluding_set = set(excluding)
    goal_set = set(goal)
    # Group options are expanded into their member stages.
    if group_goal:
        goal_set |= set.union(*[GROUP_GOALS[group] for group in group_goal])
    if excluding_group:
        excluding_set |= set.union(*[GROUP_STAGES[group] for group in excluding_group])

    excluding_source_map: dict[Optional[Source], set[StageIdentifier]] = {
        Source.GCMT: {StageIdentifier.GCMTToRealisation},
        Source.NSHM: {StageIdentifier.NSHMToRealisation},
    }
    # Exclude every source-specific stage except the one belonging to the
    # selected source (all of them when no source is selected).
    excluding_set |= set.union(
        *excluding_source_map.values()
    ) - excluding_source_map.get(source, set())
    workflow_plan = create_abstract_workflow_plan(realisations, goal_set, excluding_set)
    env = jinja2.Environment(
        loader=jinja2.PackageLoader("workflow"),
    )
    template = env.get_template("flow.cylc")
    # Nodes and their dependents are sorted by their node string before
    # being handed to the template.
    flow_template = template.render(
        defaults_version=defaults_version,
        realisations=realisations,
        target_host=target_host,
        workflow_plan={
            node: sorted(dependents, key=lambda stage: stage_to_node_string(stage))
            for node, dependents in sorted(
                nx.to_dict_of_lists(workflow_plan).items(),
                key=lambda kv: stage_to_node_string(kv[0]),
            )
        },
    )
    flow_file.write_text(
        # strip empty lines from the output flow template
        "\n".join(line for line in flow_template.split("\n") if line.strip())
    )
    if show_required_files:
        root_path = Path("cylc-src") / "WORKFLOW_NAME" / "input"
        # The flow file itself is always listed as a required input.
        inputs = {
            AnnotatedPath(
                Path("cylc-src") / "WORKFLOW_NAME" / "flow.cylc",
                f"Your workflow file (the file {flow_file}).",
            )
        }
        outputs = set()
        required_realisation_sections = {
            realisation: realisation_configuration_requirements(
                workflow_plan, realisation
            )
            for realisation in realisations
        }

        for stage in workflow_plan.nodes:
            inputs |= stage_inputs(stage, root_path, required_realisation_sections)
            outputs |= stage_outputs(stage, root_path)

        # Files consumed by some stage but produced by none must be supplied
        # by the user.
        missing_file_tree = build_filetree(inputs - outputs)

        if missing_file_tree:
            print("You require the following files for your simulation:")
            print()
            printree.ptree(missing_file_tree)
            print()
            print(
                "Refer to the indicated sections in https://wiki.canterbury.ac.nz/display/QuakeCore/File+Formats+Used+In+Ground+Motion+Simulation"
            )
        if any(required_realisation_sections.values()):
            print(
                "Refer to the realisation glossary at URL HERE for details on filling in the realisation files."
            )
    if visualise:
        network = pyvis_graph(workflow_plan)
        # delete=False keeps the rendered HTML file on disk after the
        # with-block closes the handle.
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as graph_render:
            network.show(graph_render.name, notebook=False)
app = <typer.main.Typer object>
class WorkflowTarget(enum.StrEnum):
class WorkflowTarget(StrEnum):
    """Enumeration of possible workflow targets.

    Used as the type of the ``target_host`` option of `plan_workflow`,
    which passes the selected member to the flow template.
    """

    NeSI = "nesi"
    Hypocentre = "hypocentre"

Enumeration of possible workflow targets.

NeSI = <WorkflowTarget.NeSI: 'nesi'>
Hypocentre = <WorkflowTarget.Hypocentre: 'hypocentre'>
class StageIdentifier(enum.StrEnum):
class StageIdentifier(StrEnum):
    """Valid stage identifier in the workflow plan.

    Members are used as the ``identifier`` field of `Stage`, as the
    members of the stage groups, and as the values accepted by the
    ``goal`` and ``excluding`` options of `plan_workflow`.
    """

    # Stage the planner adds ahead of every root of the planned graph.
    CopyInput = "copy_input"
    GCMTToRealisation = "gcmt_to_realisation"
    DomainGeneration = "generate_velocity_model_parameters"
    VelocityModelGeneration = "generate_velocity_model"
    StationSelection = "generate_station_coordinates"
    ModelCoordinates = "write_model_coordinates"
    SRFGeneration = "realisation_to_srf"
    CheckSRF = "check_srf"
    CopyDomainParameters = "copy_domain_parameters"
    EMOD3DParameters = "create_e3d_par"
    CheckDomain = "check_domain"
    StochGeneration = "generate_stoch"
    HighFrequency = "hf_sim"
    LowFrequency = "emod3d"
    Broadband = "bb_sim"
    IntensityMeasureCalculation = "im_calc"
    PlotTimeslices = "plot_ts"
    MergeTimeslices = "merge_ts"
    NSHMToRealisation = "nshm_to_realisation"

Valid stage identifier in the workflow plan.

CopyInput = <StageIdentifier.CopyInput: 'copy_input'>
GCMTToRealisation = <StageIdentifier.GCMTToRealisation: 'gcmt_to_realisation'>
DomainGeneration = <StageIdentifier.DomainGeneration: 'generate_velocity_model_parameters'>
VelocityModelGeneration = <StageIdentifier.VelocityModelGeneration: 'generate_velocity_model'>
StationSelection = <StageIdentifier.StationSelection: 'generate_station_coordinates'>
ModelCoordinates = <StageIdentifier.ModelCoordinates: 'write_model_coordinates'>
SRFGeneration = <StageIdentifier.SRFGeneration: 'realisation_to_srf'>
CheckSRF = <StageIdentifier.CheckSRF: 'check_srf'>
CopyDomainParameters = <StageIdentifier.CopyDomainParameters: 'copy_domain_parameters'>
EMOD3DParameters = <StageIdentifier.EMOD3DParameters: 'create_e3d_par'>
CheckDomain = <StageIdentifier.CheckDomain: 'check_domain'>
StochGeneration = <StageIdentifier.StochGeneration: 'generate_stoch'>
HighFrequency = <StageIdentifier.HighFrequency: 'hf_sim'>
LowFrequency = <StageIdentifier.LowFrequency: 'emod3d'>
Broadband = <StageIdentifier.Broadband: 'bb_sim'>
IntensityMeasureCalculation = <StageIdentifier.IntensityMeasureCalculation: 'im_calc'>
PlotTimeslices = <StageIdentifier.PlotTimeslices: 'plot_ts'>
MergeTimeslices = <StageIdentifier.MergeTimeslices: 'merge_ts'>
NSHMToRealisation = <StageIdentifier.NSHMToRealisation: 'nshm_to_realisation'>
class Source(enum.StrEnum):
class Source(StrEnum):
    """Realisation source options.

    `plan_workflow` keeps the ``*ToRealisation`` stage matching the
    selected source and excludes the stage of every other source.
    """

    GCMT = "gcmt"
    NSHM = "nshm"

Realisation source options.

GCMT = <Source.GCMT: 'gcmt'>
NSHM = <Source.NSHM: 'nshm'>
class GroupIdentifier(enum.StrEnum):
class GroupIdentifier(StrEnum):
    """Group identifiers to use to bulk target or exclude in workflow planning.

    The stages each group stands for are given by the ``GROUP_STAGES``
    (exclusion) and ``GROUP_GOALS`` (goal) mappings.
    """

    Preprocessing = "preprocessing"
    """Alias for all preprocessing stages."""
    HighFrequency = "high_frequency"
    """Alias for the high frequency workflow."""
    LowFrequency = "low_frequency"
    """Alias for the low frequency workflow."""
    # Alias for the domain stages (domain generation, velocity model,
    # station selection, model coordinates, copying domain parameters).
    Domain = "domain"

Group identifiers to use to bulk target or exclude in workflow planning.

Preprocessing = <GroupIdentifier.Preprocessing: 'preprocessing'>

Alias for all preprocessing stages.

HighFrequency = <GroupIdentifier.HighFrequency: 'high_frequency'>

Alias for the high frequency workflow.

LowFrequency = <GroupIdentifier.LowFrequency: 'low_frequency'>

Alias for the low frequency workflow.

Domain = <GroupIdentifier.Domain: 'domain'>
GROUP_STAGES = {<GroupIdentifier.Preprocessing: 'preprocessing'>: {<StageIdentifier.DomainGeneration: 'generate_velocity_model_parameters'>, <StageIdentifier.EMOD3DParameters: 'create_e3d_par'>, <StageIdentifier.CopyDomainParameters: 'copy_domain_parameters'>, <StageIdentifier.SRFGeneration: 'realisation_to_srf'>, <StageIdentifier.VelocityModelGeneration: 'generate_velocity_model'>, <StageIdentifier.ModelCoordinates: 'write_model_coordinates'>, <StageIdentifier.GCMTToRealisation: 'gcmt_to_realisation'>, <StageIdentifier.StationSelection: 'generate_station_coordinates'>, <StageIdentifier.NSHMToRealisation: 'nshm_to_realisation'>, <StageIdentifier.StochGeneration: 'generate_stoch'>}, <GroupIdentifier.HighFrequency: 'high_frequency'>: {<StageIdentifier.HighFrequency: 'hf_sim'>}, <GroupIdentifier.LowFrequency: 'low_frequency'>: {<StageIdentifier.LowFrequency: 'emod3d'>}, <GroupIdentifier.Domain: 'domain'>: {<StageIdentifier.DomainGeneration: 'generate_velocity_model_parameters'>, <StageIdentifier.CopyDomainParameters: 'copy_domain_parameters'>, <StageIdentifier.VelocityModelGeneration: 'generate_velocity_model'>, <StageIdentifier.ModelCoordinates: 'write_model_coordinates'>, <StageIdentifier.StationSelection: 'generate_station_coordinates'>}}
GROUP_GOALS = {<GroupIdentifier.Preprocessing: 'preprocessing'>: {<StageIdentifier.StochGeneration: 'generate_stoch'>, <StageIdentifier.EMOD3DParameters: 'create_e3d_par'>}, <GroupIdentifier.LowFrequency: 'low_frequency'>: {<StageIdentifier.LowFrequency: 'emod3d'>}, <GroupIdentifier.HighFrequency: 'high_frequency'>: {<StageIdentifier.HighFrequency: 'hf_sim'>}, <GroupIdentifier.Domain: 'domain'>: {<StageIdentifier.StationSelection: 'generate_station_coordinates'>, <StageIdentifier.CopyDomainParameters: 'copy_domain_parameters'>, <StageIdentifier.ModelCoordinates: 'write_model_coordinates'>, <StageIdentifier.VelocityModelGeneration: 'generate_velocity_model'>}}
class Stage(typing.NamedTuple):
class Stage(NamedTuple):
    """Representation of a workflow stage in the output Cylc file.

    A stage is a stage identifier paired with the realisation
    (``event``, ``sample``) it runs for. Being a named tuple it is
    hashable, and is used directly as a node of the workflow plan graph.
    """

    identifier: StageIdentifier
    """The stage identifier."""
    event: str
    """The event the stage is running for."""
    sample: Optional[int]
    """The sample number of the realisation."""

Representation of a workflow stage in the output Cylc file.

Stage( identifier: StageIdentifier, event: str, sample: Optional[int])

Create new instance of Stage(identifier, event, sample)

identifier: StageIdentifier

The stage identifier.

event: str

The event the stage is running for.

sample: Optional[int]

The sample number of the realisation.

class AnnotatedPath(pathlib.PurePath):
130class AnnotatedPath(PurePath):
131    """Pure path annotated with description of the file."""
132
133    def __init__(self, path: str | Path, description: Optional[str] = None):
134        """Create an annotated path.
135
136        Parameters
137        ----------
138        path : str | Path
139            The path to annotate.
140        description : Optional[str]
141            The description of the path.
142        """
143        super().__init__(path)
144        self.path = path
145        self.description = description
146
147    def __hash__(self):
148        """Construct a unique hash for Annotated Paths."""
149        if super().name == "realisation.json":
150            # Realisations should also have their annotations hashed.
151            return hash((self.path, self.description))
152        else:
153            return super().__hash__()

Pure path annotated with description of the file.

AnnotatedPath(path: str | pathlib.Path, description: Optional[str] = None)
    def __init__(self, path: str | Path, description: Optional[str] = None):
        """Create an annotated path.

        Parameters
        ----------
        path : str | Path
            The path to annotate. It initialises the underlying pure
            path and is also kept unchanged on ``self.path``.
        description : Optional[str]
            The description of the path, stored on ``self.description``.
        """
        super().__init__(path)
        self.path = path
        self.description = description

Create an annotated path.

Parameters
  • path (str | Path): The path to annotate.
  • description (Optional[str]): The description of the path.
path
description
def stage_inputs( stage: Stage, root: pathlib.Path, required_realisation_sections: dict[tuple[str, typing.Optional[int]], set[str]]) -> set[AnnotatedPath]:
156def stage_inputs(
157    stage: Stage,
158    root: Path,
159    required_realisation_sections: dict[tuple[str, Optional[int]], set[str]],
160) -> set[AnnotatedPath]:
161    """Return a list of stage inputs for the stage.
162
163    Parameters
164    ----------
165    stage : Stage
166        The stage to get inputs for.
167    root : Path
168        The root directory of the simulation.
169
170
171    Returns
172    -------
173    set[Path]
174        A set of input paths required by the stage.
175    """
176    realisation_identifier = stage.event
177    if stage.sample:
178        realisation_identifier += f"_{stage.sample}"
179    parent_directory = root / stage.event
180    event_directory = root / realisation_identifier
181    realisation_requirements = sorted(
182        required_realisation_sections.get((stage.event, stage.sample), set())
183    )
184    realisation = {
185        AnnotatedPath(
186            event_directory / "realisation.json",
187            f'Realisation file for event containing: {', '.join(realisation_requirements)}.',
188        )
189    }
190    station_ll = AnnotatedPath(
191        parent_directory / "stations" / "stations.ll",
192        "lat,lon coordinates corresponding to x,y coordinates of stations in domain.",
193    )
194    s_wave = AnnotatedPath(
195        parent_directory / "Velocity_Model" / "vs3dfile.s",
196        "s-wave velocity file for velocity model (Section: Velocity Model Files).",
197    )
198    match stage:
199        case Stage(
200            identifier=StageIdentifier.SRFGeneration
201            | StageIdentifier.VelocityModelGeneration
202            | StageIdentifier.StationSelection
203            | StageIdentifier.ModelCoordinates
204        ):
205            return realisation
206        case Stage(identifier=StageIdentifier.CopyDomainParameters):
207            return {
208                AnnotatedPath(
209                    parent_directory / "realisation.json",
210                    f"Realisation file for event containing: {', '.join(required_realisation_sections.get((stage.event, None), set()))}.",
211                )
212            }
213        case Stage(
214            identifier=StageIdentifier.EMOD3DParameters
215            | StageIdentifier.LowFrequency,
216        ):
217            return (
218                realisation
219                | {
220                    station_ll,
221                    AnnotatedPath(
222                        parent_directory / "stations" / "stations.statcords",
223                        "x,y coordinates of stations in domain.",
224                    ),
225                    AnnotatedPath(
226                        parent_directory / "model" / "model_params",
227                        "Model centre and corners for EMOD3D.",
228                    ),
229                    AnnotatedPath(
230                        parent_directory / "model" / "grid_file",
231                        "Domain extents for EMOD3D.",
232                    ),
233                    AnnotatedPath(
234                        parent_directory / "Velocity_Model" / "rho3dfile.d",
235                        "Density component file for velocity model. (Section: Velocity Model Files)",
236                    ),
237                    AnnotatedPath(
238                        parent_directory / "Velocity_Model" / "vp3dfile.p",
239                        "p-wave velocity file for velocity model. (Section: Velocity Model Files)",
240                    ),
241                    s_wave,
242                    AnnotatedPath(
243                        parent_directory / "Velocity_Model" / "in_basin_mask.b",
244                        "Boolean basin mask (Section: Velocity Model Files).",
245                    ),
246                    AnnotatedPath(
247                        event_directory / "realisation.srf",
248                        "Slip model of source (Section: SRF Format).",
249                    ),
250                }
251                | (
252                    set()
253                    if stage.identifier == StageIdentifier.EMOD3DParameters
254                    else {
255                        AnnotatedPath(
256                            event_directory / "LF" / "e3d.par",
257                            "EMOD3D Parameters (Section: https://wiki.canterbury.ac.nz/pages/viewpage.action?pageId=100794983)",
258                        )
259                    }
260                )
261            )
262        case Stage(identifier=StageIdentifier.StochGeneration):
263            return realisation | {
264                AnnotatedPath(
265                    event_directory / "realisation.srf",
266                    "Slip model of source (Section: SRF Format).",
267                )
268            }
269        case Stage(identifier=StageIdentifier.HighFrequency):
270            return realisation | {
271                AnnotatedPath(
272                    event_directory / "realisation.stoch",
273                    "Downsampled SRF for stochastic source model input (Section: Stoch format).",
274                ),
275                station_ll,
276                s_wave,
277            }
278        case Stage(identifier=StageIdentifier.Broadband):
279            return realisation | {
280                station_ll,
281                AnnotatedPath(
282                    event_directory / "LF", "Low-frequency simulation directory."
283                ),
284                AnnotatedPath(
285                    event_directory / "realisation.hf",
286                    "High-frequency waveform file (Section: LF/HF/BB binary format).",
287                ),
288                s_wave,
289            }
290        case Stage(identifier=StageIdentifier.IntensityMeasureCalculation):
291            return realisation | {
292                AnnotatedPath(
293                    event_directory / "realisation.bb",
294                    "Broadband waveform file (Section: LF/HF/BB binary format).",
295                )
296            }
297        case Stage(identifier=StageIdentifier.PlotTimeslices):
298            return {
299                AnnotatedPath(
300                    event_directory / "LF" / "OutBin" / "output.e3d",
301                    "Merged xyts-slices of low-frequency simulation (Section: XYTS.e3d binary format).",
302                )
303            }
304        case Stage(identifier=StageIdentifier.MergeTimeslices):
305            return {
306                AnnotatedPath(
307                    event_directory / "LF" / "OutBin",
308                    "Component xyts-slices of low-frequency simulation, one per compute node (Section: XYTS.e3d binary format).",
309                )
310            }
311    return set()

Return a list of stage inputs for the stage.

Parameters
  • stage (Stage): The stage to get inputs for.
  • root (Path): The root directory of the simulation.
Returns
  • set[Path]: A set of input paths required by the stage.
def stage_config_inputs(stage: Stage) -> set[str]:
def stage_config_inputs(stage: Stage) -> set[str]:
    """Get the realisation configuration inputs for a given stage.

    Parameters
    ----------
    stage : Stage
        The stage to get inputs for.

    Returns
    -------
    set[str]
        The input config sections for this stage, as the config keys of
        the corresponding `realisations` classes. Empty for stages with
        no listed inputs.
    """
    # Each stage identifier appears exactly once: a repeated key in a
    # dict literal silently overrides the earlier entry.
    input_dictionary = {
        StageIdentifier.EMOD3DParameters: {
            realisations.DomainParameters._config_key,
            realisations.VelocityModelParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.Broadband: {
            realisations.RealisationMetadata._config_key,
            realisations.DomainParameters._config_key,
        },
        StageIdentifier.StationSelection: {realisations.DomainParameters._config_key},
        StageIdentifier.VelocityModelGeneration: {
            realisations.DomainParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.HighFrequency: {
            realisations.DomainParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.IntensityMeasureCalculation: {
            realisations.RealisationMetadata._config_key,
            realisations.BroadbandParameters._config_key,
        },
        StageIdentifier.DomainGeneration: {
            realisations.RealisationMetadata._config_key,
            realisations.RupturePropagationConfig._config_key,
            realisations.SourceConfig._config_key,
        },
        StageIdentifier.SRFGeneration: {
            realisations.RealisationMetadata._config_key,
            realisations.SourceConfig._config_key,
            realisations.RupturePropagationConfig._config_key,
        },
        StageIdentifier.ModelCoordinates: {realisations.DomainParameters._config_key},
        StageIdentifier.StochGeneration: {realisations.RealisationMetadata._config_key},
    }
    return input_dictionary.get(stage.identifier, set())

Get the realisation configuration inputs for a given stage.

Parameters
  • stage (Stage): The stage to get inputs for.
Returns
  • set[str]: The input config sections for this stage.
def stage_config_outputs(stage: Stage) -> set[str]:
def stage_config_outputs(stage: Stage) -> set[str]:
    """Look up the realisation configuration sections a stage writes.

    Parameters
    ----------
    stage : Stage
        The stage to get outputs for.

    Returns
    -------
    set[str]
        The output config sections for this stage, or an empty set if
        the stage has no listed outputs.
    """
    # Sections written by either of the source-to-realisation stages.
    source_sections = {
        realisations.SourceConfig._config_key,
        realisations.RupturePropagationConfig._config_key,
        realisations.RealisationMetadata._config_key,
    }
    # Sections written by the stages that establish the domain.
    domain_sections = {
        realisations.VelocityModelParameters._config_key,
        realisations.DomainParameters._config_key,
    }
    hf_sections = {realisations.HFConfig._config_key}

    sections_by_stage = {
        StageIdentifier.NSHMToRealisation: source_sections,
        StageIdentifier.GCMTToRealisation: source_sections,
        StageIdentifier.EMOD3DParameters: {realisations.EMOD3DParameters._config_key},
        StageIdentifier.Broadband: {realisations.BroadbandParameters._config_key},
        StageIdentifier.VelocityModelGeneration: {
            realisations.VelocityModelParameters._config_key
        },
        StageIdentifier.HighFrequency: hf_sections,
        StageIdentifier.IntensityMeasureCalculation: {
            realisations.IntensityMeasureCalcuationParameters._config_key
        },
        StageIdentifier.CopyDomainParameters: domain_sections,
        StageIdentifier.DomainGeneration: domain_sections,
        StageIdentifier.SRFGeneration: {realisations.SRFConfig._config_key},
        StageIdentifier.StochGeneration: hf_sections,
    }
    if stage.identifier in sections_by_stage:
        return sections_by_stage[stage.identifier]
    return set()

Get the realisation configuration outputs for a given stage.

Parameters
  • stage (Stage): The stage to get outputs for.
Returns
  • set[str]: The output config sections for this stage.
def realisation_configuration_requirements( workflow_plan: networkx.classes.digraph.DiGraph, realisation: tuple[str, typing.Optional[int]]) -> set[str]:
def realisation_configuration_requirements(
    workflow_plan: nx.DiGraph, realisation: tuple[str, Optional[int]]
) -> set[str]:
    """Work out which realisation.json sections must be supplied up front.

    Parameters
    ----------
    workflow_plan : nx.DiGraph
        The overall workflow plan.
    realisation : tuple[str, Optional[int]]
        The ``(event, sample)`` realisation to retrieve requirements for.

    Returns
    -------
    set[str]
        The configuration sections read by the realisation's stages
        that none of those stages writes. Empty if the plan contains no
        stage for the realisation.
    """
    consumed: set[str] = set()
    produced: set[str] = set()
    for stage in workflow_plan.nodes:
        if (stage.event, stage.sample) != realisation:
            continue
        consumed |= stage_config_inputs(stage)
        produced |= stage_config_outputs(stage)
    return consumed - produced

Get the requirements for the realisation.json in this stage.

Parameters
  • workflow_plan (nx.DiGraph): The overall workflow plan.
  • realisation (tuple[str, Optional[int]]): The realisation to retrieve requirements for.
Returns
  • set[str]: The realisation configuration sections that must be present for the workflow to run correctly.
def stage_outputs( stage: Stage, root: pathlib.Path) -> set[AnnotatedPath]:
def stage_outputs(stage: Stage, root: Path) -> set[AnnotatedPath]:
    """Return the set of files produced by a stage.

    Parameters
    ----------
    stage : Stage
        The stage to get outputs for.
    root : Path
        The root directory of the simulation.

    Returns
    -------
    set[AnnotatedPath]
        The annotated output paths of the stage. All paths live inside
        the realisation directory ``root / <event>`` (or
        ``root / <event>_<sample>`` when the stage has a sample). Stages
        without an entry in the table below produce an empty set.
    """
    directory_name = stage.event
    if stage.sample:
        directory_name = f"{directory_name}_{stage.sample}"
    event_directory = root / directory_name

    realisation_file = (("realisation.json",), "Realisation file for event.")
    low_frequency_directory = (("LF",), "Low-frequency simulation directory.")

    # Each entry is (path parts relative to the realisation directory, description).
    relative_outputs: dict[
        StageIdentifier, tuple[tuple[tuple[str, ...], str], ...]
    ] = {
        StageIdentifier.NSHMToRealisation: (realisation_file,),
        StageIdentifier.GCMTToRealisation: (realisation_file,),
        StageIdentifier.SRFGeneration: (
            (("realisation.srf",), "Slip model of source."),
        ),
        StageIdentifier.ModelCoordinates: (
            (("model", "model_params"), "Model centre and corners for EMOD3D."),
            (("model", "grid_file"), "Domain extents for EMOD3D."),
        ),
        StageIdentifier.StationSelection: (
            (
                ("stations", "stations.ll"),
                "lat,lon coordinates corresponding to x,y coordinates of stations in domain.",
            ),
            (
                ("stations", "stations.statcords"),
                "x,y coordinates of stations in domain.",
            ),
        ),
        StageIdentifier.VelocityModelGeneration: (
            (
                ("Velocity_Model", "rho3dfile.d"),
                "Density component file for velocity model.",
            ),
            (
                ("Velocity_Model", "vp3dfile.p"),
                "p-wave velocity file for velocity model.",
            ),
            (
                ("Velocity_Model", "vs3dfile.s"),
                "s-wave velocity file for velocity model.",
            ),
            (("Velocity_Model", "in_basin_mask.b"), "Boolean basin mask."),
        ),
        StageIdentifier.EMOD3DParameters: (
            low_frequency_directory,
            (("LF", "e3d.par"), "EMOD3D Parameters"),
        ),
        StageIdentifier.LowFrequency: (low_frequency_directory,),
        StageIdentifier.StochGeneration: (
            (
                ("realisation.stoch",),
                "Downsampled SRF for stochastic source model input.",
            ),
        ),
        StageIdentifier.HighFrequency: (
            (("realisation.hf",), "High-frequency waveform file."),
        ),
        StageIdentifier.Broadband: (
            (("realisation.bb",), "Broadband waveform file."),
        ),
        StageIdentifier.IntensityMeasureCalculation: (
            (("ims.parquet",), "Intensity measure statistics."),
        ),
        StageIdentifier.PlotTimeslices: (
            (("animation.mp4",), "Animation of low-frequency waveform."),
        ),
        StageIdentifier.MergeTimeslices: (
            (
                ("LF", "OutBin", "output.e3d"),
                "Merged xyts-slices of low-frequency simulation.",
            ),
        ),
    }

    # Only genuine Stage values have outputs; anything else yields nothing.
    if not isinstance(stage, Stage):
        return set()
    return {
        AnnotatedPath(event_directory.joinpath(*parts), description)
        for parts, description in relative_outputs.get(stage.identifier, ())
    }

Return the set of output files produced by the stage.

Parameters
  • stage (Stage): The stage to get outputs for.
  • root (Path): The root directory of the simulation.
Returns
  • set[AnnotatedPath]: A set of output paths produced by the stage.
def realisation_workflow(event: str, sample: Optional[int]) -> networkx.classes.digraph.DiGraph:
def realisation_workflow(event: str, sample: Optional[int]) -> nx.DiGraph:
    """Build the stage dependency graph for a single realisation.

    The graph contains all stages for the realisation to run, and links
    to event stages for shared resources (i.e. the velocity model).
    Event stages are the stages whose sample is ``None``.

    Parameters
    ----------
    event : str
        The event to build the workflow for.
    sample : Optional[int]
        The sample number (or None, if the original event).

    Returns
    -------
    nx.DiGraph
        A directed graph whose nodes are `Stage` values and whose edges
        point from a stage to the stages that depend on it.
    """

    def own(identifier: StageIdentifier) -> Stage:
        # A stage that runs for this particular realisation.
        return Stage(identifier, event, sample)

    def shared(identifier: StageIdentifier) -> Stage:
        # A stage that runs once for the event (sample None).
        return Stage(identifier, event, None)

    dependents = {
        own(StageIdentifier.NSHMToRealisation): [
            own(StageIdentifier.SRFGeneration),
        ],
        own(StageIdentifier.GCMTToRealisation): [
            own(StageIdentifier.SRFGeneration),
        ],
        own(StageIdentifier.SRFGeneration): [
            own(StageIdentifier.CheckSRF),
        ],
        own(StageIdentifier.CheckSRF): [
            own(StageIdentifier.StochGeneration),
            own(StageIdentifier.EMOD3DParameters),
        ],
        shared(StageIdentifier.VelocityModelGeneration): [
            own(StageIdentifier.EMOD3DParameters),
            own(StageIdentifier.HighFrequency),
        ],
        shared(StageIdentifier.StationSelection): [
            own(StageIdentifier.EMOD3DParameters),
            own(StageIdentifier.HighFrequency),
        ],
        shared(StageIdentifier.ModelCoordinates): [
            own(StageIdentifier.EMOD3DParameters),
        ],
        own(StageIdentifier.EMOD3DParameters): [
            own(StageIdentifier.CheckDomain),
        ],
        own(StageIdentifier.CheckDomain): [
            own(StageIdentifier.LowFrequency),
        ],
        own(StageIdentifier.LowFrequency): [
            own(StageIdentifier.Broadband),
            own(StageIdentifier.MergeTimeslices),
        ],
        own(StageIdentifier.StochGeneration): [
            own(StageIdentifier.HighFrequency),
        ],
        own(StageIdentifier.HighFrequency): [
            own(StageIdentifier.Broadband),
        ],
        own(StageIdentifier.Broadband): [
            own(StageIdentifier.IntensityMeasureCalculation),
        ],
        own(StageIdentifier.MergeTimeslices): [
            own(StageIdentifier.PlotTimeslices),
        ],
    }
    graph = nx.from_dict_of_lists(dependents, create_using=nx.DiGraph)

    if sample:
        # A sample copies the domain parameters generated for its event.
        domain_edges = [
            (
                shared(StageIdentifier.DomainGeneration),
                own(StageIdentifier.CopyDomainParameters),
            ),
            (
                own(StageIdentifier.CopyDomainParameters),
                own(StageIdentifier.EMOD3DParameters),
            ),
            (
                own(StageIdentifier.CopyDomainParameters),
                own(StageIdentifier.HighFrequency),
            ),
        ]
    else:
        # Without a sample, the domain and its dependent resources are
        # generated from the realisation.
        domain_edges = [
            (
                own(StageIdentifier.NSHMToRealisation),
                own(StageIdentifier.DomainGeneration),
            ),
            (
                own(StageIdentifier.GCMTToRealisation),
                own(StageIdentifier.DomainGeneration),
            ),
            (
                own(StageIdentifier.DomainGeneration),
                own(StageIdentifier.EMOD3DParameters),
            ),
            (
                own(StageIdentifier.DomainGeneration),
                own(StageIdentifier.VelocityModelGeneration),
            ),
            (
                own(StageIdentifier.DomainGeneration),
                own(StageIdentifier.StationSelection),
            ),
            (
                own(StageIdentifier.DomainGeneration),
                own(StageIdentifier.ModelCoordinates),
            ),
        ]
    graph.add_edges_from(domain_edges)

    return graph

Add a realisation to a workflow plan.

Adds all stages for the realisation to run, and links to event stages for shared resources (i.e. the velocity model).

Parameters
  • workflow_plan (nx.DiGraph): The current workflow plan.
  • event (str): The event to add.
  • sample (Optional[int]): The sample number (or None, if the original event).
def create_abstract_workflow_plan( realisations: set[tuple[str, typing.Optional[int]]], goals: Iterable[StageIdentifier], excluding: Iterable[StageIdentifier]) -> networkx.classes.digraph.DiGraph:
def create_abstract_workflow_plan(
    realisations: set[tuple[str, Optional[int]]],
    goals: Iterable[StageIdentifier],
    excluding: Iterable[StageIdentifier],
) -> nx.DiGraph:
    """Create an abstract workflow graph from a list of goals and excluded stages.

    Parameters
    ----------
    realisations : set[tuple[str, Optional[int]]]
        The (event, sample) realisations to plan stages for.
    goals : Iterable[StageIdentifier]
        The goal stages for the workflow. A goal that does not exist in
        the workflow of a particular realisation is skipped for that
        realisation.
    excluding : Iterable[StageIdentifier]
        The excluded stages for the workflow.

    Returns
    -------
    nx.DiGraph
        An abstract workflow plan. This workflow plan contains only
        included stages that are required to reach the goals. If two
        workflow stages depend on each other only through paths
        consisting entirely of excluded nodes, then they are adjacent
        directly in the abstract plan by edges. A single copy-input
        stage is added as the predecessor of every stage that would
        otherwise have no predecessors.
    """
    # The goals are walked once per realisation, so a one-shot iterable
    # (e.g. a generator) must be materialised before the loop.
    goal_identifiers = list(goals)

    excluding_stages = {
        Stage(excluded, *realisation)
        for excluded in excluding
        for realisation in realisations
    }

    output_graph = nx.DiGraph()
    # Only show a progress bar when there are many realisations to plan.
    realisation_iteration = (
        realisations if len(realisations) < 100 else tqdm.tqdm(realisations)
    )

    for realisation in realisation_iteration:
        # In the transitive closure the predecessors of a stage are all
        # of its ancestors, so dependencies survive removing excluded
        # stages from the graph.
        workflow_plan = nx.transitive_closure_dag(realisation_workflow(*realisation))

        for goal in goal_identifiers:
            goal_stage = Stage(goal, *realisation)
            if goal_stage not in workflow_plan:
                # Not every stage exists for every realisation: stages
                # shared by an event only appear with sample None.
                # Asking for the predecessors of a missing node raises.
                continue
            required_stages = (
                set(workflow_plan.predecessors(goal_stage)) | {goal_stage}
            ) - excluding_stages
            reduced_graph = nx.transitive_reduction(
                workflow_plan.subgraph(required_stages)
            )
            output_graph.update(
                edges=reduced_graph.edges(), nodes=reduced_graph.nodes()
            )

    # Collect the roots before adding the copy-input stage so that it is
    # not linked to itself.
    roots = [node for node, degree in output_graph.in_degree() if degree == 0]
    copy_input_stage = Stage(StageIdentifier.CopyInput, "", None)
    output_graph.add_node(copy_input_stage)
    for root in roots:
        output_graph.add_edge(copy_input_stage, root)
    return output_graph

Create an abstract workflow graph from a list of goals and excluded stages.

Parameters
  • goals (Iterable[StageIdentifier]): The goal stages for the workflow.
  • excluding (Iterable[StageIdentifier]): The excluded stages for the workflow.
Returns
  • nx.DiGraph: An abstract workflow plan. This workflow plan contains only included stages that are required to reach the goals. If two workflow stages depend on each other only through paths consisting entirely of excluded nodes, then they are adjacent directly in the abstract plan by edges.
def stage_to_node_string(stage: Stage) -> str:
def stage_to_node_string(stage: Stage) -> str:
    r"""Render a `Stage` as a human readable node identifier string.

    Parameters
    ----------
    stage : Stage
        The stage to render.

    Returns
    -------
    str
        The stage identifier, followed by ``\n{stage.event}`` when the
        event is non-empty and by ``_{stage.sample}`` when the sample is
        truthy, i.e. {stage.identifier}\n{stage.event}_{stage.sample}
        when both are present.
    """
    event_line = f"\n{stage.event}" if stage.event else ""
    sample_suffix = f"_{stage.sample}" if stage.sample else ""
    return f"{stage.identifier!s}{event_line}{sample_suffix}"

Convert a Stage into a human readable node identifier string.

Parameters
  • stage (Stage): The stage to render.
Returns
  • str: A string of the format {stage.identifier}\n{stage.event}_{stage.sample}, if event and sample are non-trivial.
def pyvis_graph(workflow_plan: networkx.classes.digraph.DiGraph) -> pyvis.network.Network:
def pyvis_graph(workflow_plan: nx.DiGraph) -> Network:
    """Convert a workflow plan into a pyvis diagram for visualisation.

    Each stage becomes a node labelled with `stage_to_node_string`,
    grouped by realisation, and placed on a hierarchical level equal to
    the number of edges on the longest path from any root (a stage with
    no predecessors) to that stage.

    Parameters
    ----------
    workflow_plan : nx.DiGraph
        The workflow plan to visualise. Must be acyclic.

    Returns
    -------
    Network
        A pyvis rendering for this workflow plan.

    Raises
    ------
    nx.NetworkXUnfeasible
        If the workflow plan contains a cycle.
    """
    network = Network(
        width="100%", height="1500px", directed=True, layout="hierarchical"
    )
    network.show_buttons(filter_=["physics"])

    # Longest distance from a root, computed in one pass: in topological
    # order every predecessor has its level assigned before the stage
    # itself. This avoids enumerating every simple path to every root,
    # which grows exponentially with the number of alternative routes.
    levels: dict[Stage, int] = {}
    for stage in nx.topological_sort(workflow_plan):
        levels[stage] = max(
            (levels[parent] + 1 for parent in workflow_plan.predecessors(stage)),
            default=0,
        )

    for stage in workflow_plan.nodes():
        network.add_node(
            stage_to_node_string(stage),
            group=f"{stage.event}_{stage.sample or ''}",
            size=20,
            level=levels[stage],
        )
    for stage, next_stage in workflow_plan.edges():
        network.add_edge(stage_to_node_string(stage), stage_to_node_string(next_stage))
    return network

Convert a workflow plan into a pyvis diagram for visualisation.

Parameters
  • workflow_plan (nx.DiGraph): The workflow plan to visualise.
Returns
  • Network: A pyvis rendering for this workflow plan.
REALISATION_ITERATION_RE = '_rel\\d+$'
def parse_realisation(realisation_id: str) -> set[tuple[str, typing.Optional[int]]]:
def parse_realisation(realisation_id: str) -> set[tuple[str, Optional[int]]]:
    """Expand a command line realisation string into realisation identifiers.

    A string of the form ``event:count`` (split at the last colon)
    expands to ``count`` realisations of ``event``. The first of these
    has sample ``None`` and the rest are numbered from 1 to
    ``count - 1``. A string without a colon, or whose text after the
    last colon is not an integer, is treated as a single event name.

    Parameters
    ----------
    realisation_id : str
        The realisation identifier string to parse.

    Returns
    -------
    set[tuple[str, Optional[int]]]
        The parsed (event, sample) pairs.
    """
    event, separator, count_text = realisation_id.rpartition(":")
    if not separator:
        return {(realisation_id, None)}
    try:
        num_samples = int(count_text)
    except ValueError:
        # The text after the colon is not a count, so the whole string
        # is taken to be the event name.
        return {(realisation_id, None)}
    # Sample 0 stands for the event itself and is recorded as None.
    return {
        (event, sample if sample else None) for sample in range(num_samples)
    }

Parse a realisation identifier string from the command line into a realisation identifier.

Parameters
  • realisation_id (str): The realisation identifier string to parse.
Returns
  • set[tuple[str, Optional[int]]]: The set of parsed (event, sample number) realisation identifiers.
def build_filetree( files: set[AnnotatedPath]) -> dict[str, typing.Any]:
def build_filetree(files: set[AnnotatedPath]) -> dict[str, Any]:
    """Build a file tree from a set of annotated file paths.

    Parameters
    ----------
    files : set[AnnotatedPath]
        The set of files to construct a tree for.

    Returns
    -------
    dict[str, Any]
        A nested dictionary keyed by path component. A directory maps to
        the dictionary of its entries and any other path maps to its
        description. If a path is annotated itself and also contains
        other annotated paths, it is shown as a directory and its own
        description is omitted.
    """
    filetree: dict[str, Any] = {}
    for file in sorted(files):
        cur = filetree
        for part in file.parts[:-1]:
            child = cur.get(part)
            if not isinstance(child, dict):
                # Either the directory is new, or it was recorded earlier
                # as a described leaf (a parent sorts before its
                # contents). Descending into the description string
                # would fail, so it becomes a subtree.
                child = cur[part] = {}
            cur = child
        leaf = file.parts[-1]
        # Never overwrite an existing description or a populated subtree.
        if not cur.get(leaf):
            cur[leaf] = file.description
    return filetree

Build a file tree from a set of annotated file paths.

Parameters
  • files (set[AnnotatedPath]): The set of files to construct a tree for.
Returns
  • dict[str, Any]: A file tree.
@app.command(help='Plan and generate a Cylc workflow file for a number of realisations.')
def plan_workflow( realisation_ids: Annotated[list[str], <typer.models.ArgumentInfo object>], flow_file: Annotated[pathlib.Path, <typer.models.ArgumentInfo object>], goal: Annotated[list[StageIdentifier], <typer.models.OptionInfo object>], group_goal: Annotated[list[GroupIdentifier], <typer.models.OptionInfo object>], excluding: Annotated[list[StageIdentifier], <typer.models.OptionInfo object>], excluding_group: Annotated[list[GroupIdentifier], <typer.models.OptionInfo object>], visualise: Annotated[bool, <typer.models.OptionInfo object>] = False, show_required_files: Annotated[bool, <typer.models.OptionInfo object>] = True, target_host: Annotated[WorkflowTarget, <typer.models.OptionInfo object>] = <WorkflowTarget.NeSI: 'nesi'>, source: Annotated[Optional[Source], <typer.models.OptionInfo object>] = None, defaults_version: Annotated[Optional[workflow.defaults.DefaultsVersion], <typer.models.OptionInfo object>] = None):
 873@app.command(
 874    help="Plan and generate a Cylc workflow file for a number of realisations."
 875)
 876def plan_workflow(
 877    realisation_ids: Annotated[
 878        list[str],
 879        typer.Argument(
 880            help="List of realisations to generate workflows for. Realisations have the format event:realisation_count, such as Darfield:4."
 881        ),
 882    ],
 883    flow_file: Annotated[
 884        Path,
 885        typer.Argument(
 886            help="Path to output flow file (e.g. ~/cylc-src/my-workflow/flow.cylc)",
 887            writable=True,
 888            dir_okay=False,
 889        ),
 890    ],
 891    goal: Annotated[
 892        list[StageIdentifier],
 893        typer.Option(
 894            help="List of workflow outputs to generate",
 895            default_factory=lambda: [],
 896            rich_help_panel='Planning Workflows'
 897        ),
 898    ],
 899    group_goal: Annotated[
 900        list[GroupIdentifier],
 901        typer.Option(
 902            help="List of group goals to generate", default_factory=lambda: [], rich_help_panel='Planning Workflows'
 903        ),
 904    ],
 905    excluding: Annotated[
 906        list[StageIdentifier],
 907        typer.Option(help="List of stages to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'),
 908    ],
 909    excluding_group: Annotated[
 910        list[GroupIdentifier],
 911        typer.Option(
 912            help="List of stage groups to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'
 913        ),
 914    ],
 915    visualise: Annotated[
 916        bool, typer.Option(help="Visualise the planned workflow as a graph", rich_help_panel='Visualising Workflows')
 917    ] = False,
 918    show_required_files: Annotated[
 919        bool,
 920        typer.Option(
 921            help="Print the expected directory tree at the start of the simulation.", rich_help_panel='Visualising Workflows'
 922        ),
 923    ] = True,
 924    target_host: Annotated[
 925        WorkflowTarget,
 926        typer.Option(help="Select the target host where the workflow will be run", rich_help_panel='Planning Workflows'),
 927    ] = WorkflowTarget.NeSI,
 928    source: Annotated[
 929        Optional[Source],
 930        typer.Option(
 931            help="If given, set the source of the realisation. For NSHM and GCMT, the realisation id corresponds to the rupture id and GCMT PublicID respectively.", rich_help_panel='Sources'
 932        ),
 933    ] = None,
 934    defaults_version: Annotated[
 935        Optional[DefaultsVersion],
 936        typer.Option(
 937            help="The simulation defaults to apply for all realisations. Required if source is specified.", rich_help_panel='Sources'
 938        ),
 939    ] = None,
 940):
 941    """Plan and generate a Cylc workflow file for a number of realisations.
 942
 943    Parameters
 944    ----------
 945    realisations : list[str]
 946        The list of realisations to generate the workflow for.
 947    flow_file : Path
 948        The output flow file path to write the Cylc workflow to.
 949    goal : list[StageIdentifier]
 950        A list of workflow stages to mark as goals. These stages are
 951        define the endpoints for the workflow.
 952    group_goal : list[GroupIdentifier]
 953        A list of workflow groups to target. A workflow group is just
 954        an alias for a set of workflow stages. Equivalent to adding
 955        each group member to `goal`.
 956    excluding : list[StageIdentifier]
 957        A list of workflow stages to exclude from the flows.
 958    group_goal : list[GroupIdentifier]
 959        A list of workflow groups to exclude. A workflow group is just
 960        an alias for a set of workflow stages. Equivalent to adding
 961        each group member to `excluding`.
 962    """
 963    realisations = set.union(
 964        *[parse_realisation(realisation_id) for realisation_id in realisation_ids]
 965    )
 966    if source and not defaults_version:
 967        print(
 968            "You must specify a defaults version if you specify a source. See the help text for options."
 969        )
 970        raise typer.Exit(code=1)
 971    excluding_set = set(excluding)
 972    goal_set = set(goal)
 973    if group_goal:
 974        goal_set |= set.union(*[GROUP_GOALS[group] for group in group_goal])
 975    if excluding_group:
 976        excluding_set |= set.union(*[GROUP_STAGES[group] for group in excluding_group])
 977
 978    excluding_source_map: dict[Optional[Source], set[StageIdentifier]] = {
 979        Source.GCMT: {StageIdentifier.GCMTToRealisation},
 980        Source.NSHM: {StageIdentifier.NSHMToRealisation},
 981    }
 982    excluding_set |= set.union(
 983        *excluding_source_map.values()
 984    ) - excluding_source_map.get(source, set())
 985    workflow_plan = create_abstract_workflow_plan(realisations, goal_set, excluding_set)
 986    env = jinja2.Environment(
 987        loader=jinja2.PackageLoader("workflow"),
 988    )
 989    template = env.get_template("flow.cylc")
 990    flow_template = template.render(
 991        defaults_version=defaults_version,
 992        realisations=realisations,
 993        target_host=target_host,
 994        workflow_plan={
 995            node: sorted(dependents, key=lambda stage: stage_to_node_string(stage))
 996            for node, dependents in sorted(
 997                nx.to_dict_of_lists(workflow_plan).items(),
 998                key=lambda kv: stage_to_node_string(kv[0]),
 999            )
1000        },
1001    )
1002    flow_file.write_text(
1003        # strip empty lines from the output flow template
1004        "\n".join(line for line in flow_template.split("\n") if line.strip())
1005    )
1006    if show_required_files:
1007        root_path = Path("cylc-src") / "WORKFLOW_NAME" / "input"
1008        inputs = {
1009            AnnotatedPath(
1010                Path("cylc-src") / "WORKFLOW_NAME" / "flow.cylc",
1011                f"Your workflow file (the file {flow_file}).",
1012            )
1013        }
1014        outputs = set()
1015        required_realisation_sections = {
1016            realisation: realisation_configuration_requirements(
1017                workflow_plan, realisation
1018            )
1019            for realisation in realisations
1020        }
1021
1022        for stage in workflow_plan.nodes:
1023            inputs |= stage_inputs(stage, root_path, required_realisation_sections)
1024            outputs |= stage_outputs(stage, root_path)
1025
1026        missing_file_tree = build_filetree(inputs - outputs)
1027
1028        if missing_file_tree:
1029            print("You require the following files for your simulation:")
1030            print()
1031            printree.ptree(missing_file_tree)
1032            print()
1033            print(
1034                "Refer to the indicated sections in https://wiki.canterbury.ac.nz/display/QuakeCore/File+Formats+Used+In+Ground+Motion+Simulation"
1035            )
1036        if any(required_realisation_sections.values()):
1037            print(
1038                "Refer to the realisation glossary at URL HERE for details on filling in the realisation files."
1039            )
1040    if visualise:
1041        network = pyvis_graph(workflow_plan)
1042        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as graph_render:
1043            network.show(graph_render.name, notebook=False)

Plan and generate a Cylc workflow file for a number of realisations.

Parameters
  • realisation_ids (list[str]): The list of realisations to generate the workflow for.
  • flow_file (Path): The output flow file path to write the Cylc workflow to.
  • goal (list[StageIdentifier]): A list of workflow stages to mark as goals. These stages define the endpoints for the workflow.
  • group_goal (list[GroupIdentifier]): A list of workflow groups to target. A workflow group is just an alias for a set of workflow stages. Equivalent to adding each group member to goal.
  • excluding (list[StageIdentifier]): A list of workflow stages to exclude from the flows.
  • excluding_group (list[GroupIdentifier]): A list of workflow groups to exclude. A workflow group is just an alias for a set of workflow stages. Equivalent to adding each group member to excluding.