workflow.scripts.plan_workflow
Create a Cylc workflow plan from a list of goals and stages to exclude.
This is the starting point for most workflow usages, and can be used to generate a base Cylc workflow to modify and extend.
1"""Create a Cylc workflow plan from a list of goals and stages to exclude. 2 3This is the starting point for most workflow usages, and can be used 4to generate a base Cylc workflow to modify and extend. 5""" 6 7import tempfile 8from collections.abc import Iterable 9from enum import StrEnum 10from pathlib import Path, PurePath 11from typing import Annotated, Any, NamedTuple, Optional 12 13import jinja2 14import networkx as nx 15import printree 16import tqdm 17import typer 18from pyvis.network import Network 19 20from workflow import realisations 21from workflow.defaults import DefaultsVersion 22 23app = typer.Typer() 24 25 26class WorkflowTarget(StrEnum): 27 """Enumeration of possible workflow targets.""" 28 29 NeSI = "nesi" 30 Hypocentre = "hypocentre" 31 32 33class StageIdentifier(StrEnum): 34 """Valid stage identifier in the workflow plan.""" 35 36 CopyInput = "copy_input" 37 GCMTToRealisation = "gcmt_to_realisation" 38 DomainGeneration = "generate_velocity_model_parameters" 39 VelocityModelGeneration = "generate_velocity_model" 40 StationSelection = "generate_station_coordinates" 41 ModelCoordinates = "write_model_coordinates" 42 SRFGeneration = "realisation_to_srf" 43 CheckSRF = "check_srf" 44 CopyDomainParameters = "copy_domain_parameters" 45 EMOD3DParameters = "create_e3d_par" 46 CheckDomain = "check_domain" 47 StochGeneration = "generate_stoch" 48 HighFrequency = "hf_sim" 49 LowFrequency = "emod3d" 50 Broadband = "bb_sim" 51 IntensityMeasureCalculation = "im_calc" 52 PlotTimeslices = "plot_ts" 53 MergeTimeslices = "merge_ts" 54 NSHMToRealisation = "nshm_to_realisation" 55 56 57class Source(StrEnum): 58 """Realisation source options.""" 59 60 GCMT = "gcmt" 61 NSHM = "nshm" 62 63 64class GroupIdentifier(StrEnum): 65 """Group identifiers to use to bulk target or exclude in workflow planning.""" 66 67 Preprocessing = "preprocessing" 68 """Alias for all preprocessing stages.""" 69 HighFrequency = "high_frequency" 70 """Alias for the high frequency workflow.""" 
71 LowFrequency = "low_frequency" 72 """Alias for the low frequency workflow.""" 73 Domain = "domain" 74 75 76GROUP_STAGES = { 77 GroupIdentifier.Preprocessing: { 78 StageIdentifier.DomainGeneration, 79 StageIdentifier.VelocityModelGeneration, 80 StageIdentifier.StationSelection, 81 StageIdentifier.ModelCoordinates, 82 StageIdentifier.SRFGeneration, 83 StageIdentifier.EMOD3DParameters, 84 StageIdentifier.NSHMToRealisation, 85 StageIdentifier.GCMTToRealisation, 86 StageIdentifier.StochGeneration, 87 StageIdentifier.CopyDomainParameters, 88 }, 89 GroupIdentifier.HighFrequency: { 90 StageIdentifier.HighFrequency, 91 }, 92 GroupIdentifier.LowFrequency: {StageIdentifier.LowFrequency}, 93 GroupIdentifier.Domain: { 94 StageIdentifier.VelocityModelGeneration, 95 StageIdentifier.StationSelection, 96 StageIdentifier.DomainGeneration, 97 StageIdentifier.CopyDomainParameters, 98 StageIdentifier.ModelCoordinates, 99 }, 100} 101 102GROUP_GOALS = { 103 GroupIdentifier.Preprocessing: { 104 StageIdentifier.EMOD3DParameters, 105 StageIdentifier.StochGeneration, 106 }, 107 GroupIdentifier.LowFrequency: {StageIdentifier.LowFrequency}, 108 GroupIdentifier.HighFrequency: {StageIdentifier.HighFrequency}, 109 GroupIdentifier.Domain: { 110 StageIdentifier.VelocityModelGeneration, 111 StageIdentifier.StationSelection, 112 StageIdentifier.ModelCoordinates, 113 StageIdentifier.CopyDomainParameters, 114 }, 115} 116 117 118class Stage(NamedTuple): 119 """Representation of a workflow stage in the output Cylc file.""" 120 121 identifier: StageIdentifier 122 """The stage identifier.""" 123 event: str 124 """The event the stage is running for.""" 125 sample: Optional[int] 126 """The sample number of the realisation.""" 127 128 129class AnnotatedPath(PurePath): 130 """Pure path annotated with description of the file.""" 131 132 def __init__(self, path: str | Path, description: Optional[str] = None): 133 """Create an annotated path. 
134 135 Parameters 136 ---------- 137 path : str | Path 138 The path to annotate. 139 description : Optional[str] 140 The description of the path. 141 """ 142 super().__init__(path) 143 self.path = path 144 self.description = description 145 146 def __hash__(self): 147 """Construct a unique hash for Annotated Paths.""" 148 if super().name == "realisation.json": 149 # Realisations should also have their annotations hashed. 150 return hash((self.path, self.description)) 151 else: 152 return super().__hash__() 153 154 155def stage_inputs( 156 stage: Stage, 157 root: Path, 158 required_realisation_sections: dict[tuple[str, Optional[int]], set[str]], 159) -> set[AnnotatedPath]: 160 """Return a list of stage inputs for the stage. 161 162 Parameters 163 ---------- 164 stage : Stage 165 The stage to get inputs for. 166 root : Path 167 The root directory of the simulation. 168 169 170 Returns 171 ------- 172 set[Path] 173 A set of input paths required by the stage. 174 """ 175 realisation_identifier = stage.event 176 if stage.sample: 177 realisation_identifier += f"_{stage.sample}" 178 parent_directory = root / stage.event 179 event_directory = root / realisation_identifier 180 realisation_requirements = sorted( 181 required_realisation_sections.get((stage.event, stage.sample), set()) 182 ) 183 realisation = { 184 AnnotatedPath( 185 event_directory / "realisation.json", 186 f'Realisation file for event containing: {', '.join(realisation_requirements)}.', 187 ) 188 } 189 station_ll = AnnotatedPath( 190 parent_directory / "stations" / "stations.ll", 191 "lat,lon coordinates corresponding to x,y coordinates of stations in domain.", 192 ) 193 s_wave = AnnotatedPath( 194 parent_directory / "Velocity_Model" / "vs3dfile.s", 195 "s-wave velocity file for velocity model (Section: Velocity Model Files).", 196 ) 197 match stage: 198 case Stage( 199 identifier=StageIdentifier.SRFGeneration 200 | StageIdentifier.VelocityModelGeneration 201 | StageIdentifier.StationSelection 202 | 
StageIdentifier.ModelCoordinates 203 ): 204 return realisation 205 case Stage(identifier=StageIdentifier.CopyDomainParameters): 206 return { 207 AnnotatedPath( 208 parent_directory / "realisation.json", 209 f"Realisation file for event containing: {', '.join(required_realisation_sections.get((stage.event, None), set()))}.", 210 ) 211 } 212 case Stage( 213 identifier=StageIdentifier.EMOD3DParameters 214 | StageIdentifier.LowFrequency, 215 ): 216 return ( 217 realisation 218 | { 219 station_ll, 220 AnnotatedPath( 221 parent_directory / "stations" / "stations.statcords", 222 "x,y coordinates of stations in domain.", 223 ), 224 AnnotatedPath( 225 parent_directory / "model" / "model_params", 226 "Model centre and corners for EMOD3D.", 227 ), 228 AnnotatedPath( 229 parent_directory / "model" / "grid_file", 230 "Domain extents for EMOD3D.", 231 ), 232 AnnotatedPath( 233 parent_directory / "Velocity_Model" / "rho3dfile.d", 234 "Density component file for velocity model. (Section: Velocity Model Files)", 235 ), 236 AnnotatedPath( 237 parent_directory / "Velocity_Model" / "vp3dfile.p", 238 "p-wave velocity file for velocity model. 
(Section: Velocity Model Files)", 239 ), 240 s_wave, 241 AnnotatedPath( 242 parent_directory / "Velocity_Model" / "in_basin_mask.b", 243 "Boolean basin mask (Section: Velocity Model Files).", 244 ), 245 AnnotatedPath( 246 event_directory / "realisation.srf", 247 "Slip model of source (Section: SRF Format).", 248 ), 249 } 250 | ( 251 set() 252 if stage.identifier == StageIdentifier.EMOD3DParameters 253 else { 254 AnnotatedPath( 255 event_directory / "LF" / "e3d.par", 256 "EMOD3D Parameters (Section: https://wiki.canterbury.ac.nz/pages/viewpage.action?pageId=100794983)", 257 ) 258 } 259 ) 260 ) 261 case Stage(identifier=StageIdentifier.StochGeneration): 262 return realisation | { 263 AnnotatedPath( 264 event_directory / "realisation.srf", 265 "Slip model of source (Section: SRF Format).", 266 ) 267 } 268 case Stage(identifier=StageIdentifier.HighFrequency): 269 return realisation | { 270 AnnotatedPath( 271 event_directory / "realisation.stoch", 272 "Downsampled SRF for stochastic source model input (Section: Stoch format).", 273 ), 274 station_ll, 275 s_wave, 276 } 277 case Stage(identifier=StageIdentifier.Broadband): 278 return realisation | { 279 station_ll, 280 AnnotatedPath( 281 event_directory / "LF", "Low-frequency simulation directory." 
282 ), 283 AnnotatedPath( 284 event_directory / "realisation.hf", 285 "High-frequency waveform file (Section: LF/HF/BB binary format).", 286 ), 287 s_wave, 288 } 289 case Stage(identifier=StageIdentifier.IntensityMeasureCalculation): 290 return realisation | { 291 AnnotatedPath( 292 event_directory / "realisation.bb", 293 "Broadband waveform file (Section: LF/HF/BB binary format).", 294 ) 295 } 296 case Stage(identifier=StageIdentifier.PlotTimeslices): 297 return { 298 AnnotatedPath( 299 event_directory / "LF" / "OutBin" / "output.e3d", 300 "Merged xyts-slices of low-frequency simulation (Section: XYTS.e3d binary format).", 301 ) 302 } 303 case Stage(identifier=StageIdentifier.MergeTimeslices): 304 return { 305 AnnotatedPath( 306 event_directory / "LF" / "OutBin", 307 "Component xyts-slices of low-frequency simulation, one per compute node (Section: XYTS.e3d binary format).", 308 ) 309 } 310 return set() 311 312 313def stage_config_inputs(stage: Stage) -> set[str]: 314 """Get the realisation configuration inputs for a given stage. 315 316 Parameters 317 ---------- 318 stage : Stage 319 The stage to get inputs for. 320 321 322 Returns 323 ------- 324 set[str] 325 The input config sections for this stage. 
326 """ 327 input_dictionary = { 328 StageIdentifier.EMOD3DParameters: { 329 realisations.DomainParameters._config_key, 330 realisations.VelocityModelParameters._config_key, 331 realisations.RealisationMetadata._config_key, 332 }, 333 StageIdentifier.Broadband: { 334 realisations.RealisationMetadata._config_key, 335 realisations.DomainParameters._config_key, 336 }, 337 StageIdentifier.StationSelection: {realisations.DomainParameters._config_key}, 338 StageIdentifier.VelocityModelGeneration: { 339 realisations.DomainParameters._config_key, 340 realisations.RealisationMetadata._config_key, 341 }, 342 StageIdentifier.HighFrequency: { 343 realisations.DomainParameters._config_key, 344 realisations.RealisationMetadata._config_key, 345 }, 346 StageIdentifier.IntensityMeasureCalculation: { 347 realisations.RealisationMetadata._config_key, 348 realisations.BroadbandParameters._config_key, 349 }, 350 StageIdentifier.DomainGeneration: { 351 realisations.RealisationMetadata._config_key, 352 realisations.RupturePropagationConfig._config_key, 353 realisations.SourceConfig._config_key, 354 }, 355 StageIdentifier.SRFGeneration: { 356 realisations.RealisationMetadata._config_key, 357 realisations.SourceConfig._config_key, 358 realisations.RupturePropagationConfig._config_key, 359 }, 360 StageIdentifier.ModelCoordinates: {realisations.DomainParameters._config_key}, 361 StageIdentifier.StochGeneration: {realisations.RealisationMetadata._config_key}, 362 StageIdentifier.StationSelection: {realisations.DomainParameters._config_key}, 363 } 364 return input_dictionary.get(stage.identifier, set()) 365 366 367def stage_config_outputs(stage: Stage) -> set[str]: 368 """Get the realisation configuration outputs for a given stage. 369 370 Parameters 371 ---------- 372 stage : Stage 373 The stage to get outputs for. 374 375 376 Returns 377 ------- 378 set[str] 379 The output config sections for this stage. 
380 """ 381 output_dictionary = { 382 StageIdentifier.NSHMToRealisation: { 383 realisations.SourceConfig._config_key, 384 realisations.RupturePropagationConfig._config_key, 385 realisations.RealisationMetadata._config_key, 386 }, 387 StageIdentifier.GCMTToRealisation: { 388 realisations.SourceConfig._config_key, 389 realisations.RupturePropagationConfig._config_key, 390 realisations.RealisationMetadata._config_key, 391 }, 392 StageIdentifier.EMOD3DParameters: {realisations.EMOD3DParameters._config_key}, 393 StageIdentifier.Broadband: {realisations.BroadbandParameters._config_key}, 394 StageIdentifier.VelocityModelGeneration: { 395 realisations.VelocityModelParameters._config_key 396 }, 397 StageIdentifier.HighFrequency: {realisations.HFConfig._config_key}, 398 StageIdentifier.IntensityMeasureCalculation: { 399 realisations.IntensityMeasureCalcuationParameters._config_key 400 }, 401 StageIdentifier.CopyDomainParameters: { 402 realisations.VelocityModelParameters._config_key, 403 realisations.DomainParameters._config_key, 404 }, 405 StageIdentifier.DomainGeneration: { 406 realisations.VelocityModelParameters._config_key, 407 realisations.DomainParameters._config_key, 408 }, 409 StageIdentifier.SRFGeneration: {realisations.SRFConfig._config_key}, 410 StageIdentifier.StochGeneration: {realisations.HFConfig._config_key}, 411 } 412 return output_dictionary.get(stage.identifier, set()) 413 414 415def realisation_configuration_requirements( 416 workflow_plan: nx.DiGraph, realisation: tuple[str, Optional[int]] 417) -> set[str]: 418 """Get the requirements for the realisation.json in this stage. 419 420 Parameters 421 ---------- 422 workflow_plan : nx.DiGraph 423 The overall workflow plane. 424 realisation : tuple[str, Optional[int]] 425 The realisation to retrieve requirements for. 426 427 Returns 428 ------- 429 set[str] 430 The realisation configuration sections that must be present 431 for the workflow to run correctly. 
432 """ 433 stages = [ 434 stage 435 for stage in workflow_plan.nodes 436 if (stage.event, stage.sample) == realisation 437 ] 438 if stages: 439 return set.union(*[stage_config_inputs(stage) for stage in stages]) - set.union( 440 *[stage_config_outputs(stage) for stage in stages] 441 ) 442 else: 443 return set() 444 445 446def stage_outputs(stage: Stage, root: Path) -> set[AnnotatedPath]: 447 """Return a list of stage inputs for the stage. 448 449 Parameters 450 ---------- 451 stage : Stage 452 The stage to get inputs for. 453 root : Path 454 The root directory of the simulation. 455 456 457 Returns 458 ------- 459 set[Path] 460 A set of input paths required by the stage. 461 """ 462 realisation_identifier = stage.event 463 if stage.sample: 464 realisation_identifier += f"_{stage.sample}" 465 event_directory = root / realisation_identifier 466 realisation = { 467 AnnotatedPath( 468 event_directory / "realisation.json", "Realisation file for event." 469 ) 470 } 471 match stage: 472 case Stage( 473 identifier=StageIdentifier.NSHMToRealisation 474 | StageIdentifier.GCMTToRealisation 475 ): 476 return realisation 477 case Stage(identifier=StageIdentifier.SRFGeneration): 478 return { 479 AnnotatedPath( 480 event_directory / "realisation.srf", 481 "Slip model of source.", 482 ) 483 } 484 case Stage(identifier=StageIdentifier.ModelCoordinates): 485 return { 486 AnnotatedPath( 487 event_directory / "model" / "model_params", 488 "Model centre and corners for EMOD3D.", 489 ), 490 AnnotatedPath( 491 event_directory / "model" / "grid_file", 492 "Domain extents for EMOD3D.", 493 ), 494 } 495 case Stage(identifier=StageIdentifier.StationSelection): 496 return { 497 AnnotatedPath( 498 event_directory / "stations" / "stations.ll", 499 "lat,lon coordinates corresponding to x,y coordinates of stations in domain.", 500 ), 501 AnnotatedPath( 502 event_directory / "stations" / "stations.statcords", 503 "x,y coordinates of stations in domain.", 504 ), 505 } 506 case 
Stage(identifier=StageIdentifier.VelocityModelGeneration): 507 return { 508 AnnotatedPath( 509 event_directory / "Velocity_Model" / "rho3dfile.d", 510 "Density component file for velocity model.", 511 ), 512 AnnotatedPath( 513 event_directory / "Velocity_Model" / "vp3dfile.p", 514 "p-wave velocity file for velocity model.", 515 ), 516 AnnotatedPath( 517 event_directory / "Velocity_Model" / "vs3dfile.s", 518 "s-wave velocity file for velocity model.", 519 ), 520 AnnotatedPath( 521 event_directory / "Velocity_Model" / "in_basin_mask.b", 522 "Boolean basin mask.", 523 ), 524 } 525 case Stage(identifier=StageIdentifier.EMOD3DParameters): 526 return { 527 AnnotatedPath( 528 event_directory / "LF", "Low-frequency simulation directory." 529 ), 530 AnnotatedPath(event_directory / "LF" / "e3d.par", "EMOD3D Parameters"), 531 } 532 case Stage(identifier=StageIdentifier.LowFrequency): 533 return { 534 AnnotatedPath( 535 event_directory / "LF", "Low-frequency simulation directory." 536 ) 537 } 538 case Stage(identifier=StageIdentifier.StochGeneration): 539 return { 540 AnnotatedPath( 541 event_directory / "realisation.stoch", 542 "Downsampled SRF for stochastic source model input.", 543 ) 544 } 545 case Stage(identifier=StageIdentifier.HighFrequency): 546 return { 547 AnnotatedPath( 548 event_directory / "realisation.hf", "High-frequency waveform file." 549 ) 550 } 551 case Stage(identifier=StageIdentifier.Broadband): 552 return { 553 AnnotatedPath( 554 event_directory / "realisation.bb", "Broadband waveform file." 555 ) 556 } 557 case Stage(identifier=StageIdentifier.IntensityMeasureCalculation): 558 return { 559 AnnotatedPath( 560 event_directory / "ims.parquet", "Intensity measure statistics." 
561 ) 562 } 563 case Stage(identifier=StageIdentifier.PlotTimeslices): 564 return { 565 AnnotatedPath( 566 event_directory / "animation.mp4", 567 "Animation of low-frequency waveform.", 568 ) 569 } 570 case Stage(identifier=StageIdentifier.MergeTimeslices): 571 return { 572 AnnotatedPath( 573 event_directory / "LF" / "OutBin" / "output.e3d", 574 "Merged xyts-slices of low-frequency simulation.", 575 ) 576 } 577 case _: 578 return set() 579 580 581def realisation_workflow(event: str, sample: Optional[int]) -> nx.DiGraph: 582 """Add a realisation to a workflow plan. 583 584 Adds all stages for the realisation to run, and links to event 585 stages for shared resources (i.e. the velocity model). 586 587 Parameters 588 ---------- 589 workflow_plan : nx.DiGraph 590 The current workflow paln. 591 event : str 592 The event to add. 593 sample : Optional[int] 594 The sample number (or None, if the original event). 595 """ 596 workflow_plan = nx.from_dict_of_lists( 597 { 598 Stage(StageIdentifier.NSHMToRealisation, event, sample): [ 599 Stage(StageIdentifier.SRFGeneration, event, sample), 600 ], 601 Stage(StageIdentifier.GCMTToRealisation, event, sample): [ 602 Stage(StageIdentifier.SRFGeneration, event, sample) 603 ], 604 Stage(StageIdentifier.SRFGeneration, event, sample): [ 605 Stage(StageIdentifier.CheckSRF, event, sample) 606 ], 607 Stage(StageIdentifier.CheckSRF, event, sample): [ 608 Stage(StageIdentifier.StochGeneration, event, sample), 609 Stage(StageIdentifier.EMOD3DParameters, event, sample), 610 ], 611 Stage(StageIdentifier.VelocityModelGeneration, event, None): [ 612 Stage(StageIdentifier.EMOD3DParameters, event, sample), 613 Stage(StageIdentifier.HighFrequency, event, sample), 614 ], 615 Stage(StageIdentifier.StationSelection, event, None): [ 616 Stage(StageIdentifier.EMOD3DParameters, event, sample), 617 Stage(StageIdentifier.HighFrequency, event, sample), 618 ], 619 Stage(StageIdentifier.ModelCoordinates, event, None): [ 620 
Stage(StageIdentifier.EMOD3DParameters, event, sample) 621 ], 622 Stage(StageIdentifier.EMOD3DParameters, event, sample): [ 623 Stage(StageIdentifier.CheckDomain, event, sample) 624 ], 625 Stage(StageIdentifier.CheckDomain, event, sample): [ 626 Stage(StageIdentifier.LowFrequency, event, sample) 627 ], 628 Stage(StageIdentifier.LowFrequency, event, sample): [ 629 Stage(StageIdentifier.Broadband, event, sample), 630 Stage(StageIdentifier.MergeTimeslices, event, sample), 631 ], 632 Stage(StageIdentifier.StochGeneration, event, sample): [ 633 Stage(StageIdentifier.HighFrequency, event, sample) 634 ], 635 Stage(StageIdentifier.HighFrequency, event, sample): [ 636 Stage(StageIdentifier.Broadband, event, sample) 637 ], 638 Stage(StageIdentifier.Broadband, event, sample): [ 639 Stage(StageIdentifier.IntensityMeasureCalculation, event, sample) 640 ], 641 Stage(StageIdentifier.MergeTimeslices, event, sample): [ 642 Stage(StageIdentifier.PlotTimeslices, event, sample) 643 ], 644 }, 645 create_using=nx.DiGraph, 646 ) 647 if not sample: 648 workflow_plan.add_edges_from( 649 [ 650 ( 651 Stage(StageIdentifier.NSHMToRealisation, event, sample), 652 Stage(StageIdentifier.DomainGeneration, event, sample), 653 ), 654 ( 655 Stage(StageIdentifier.GCMTToRealisation, event, sample), 656 Stage(StageIdentifier.DomainGeneration, event, sample), 657 ), 658 ( 659 Stage(StageIdentifier.DomainGeneration, event, sample), 660 Stage(StageIdentifier.EMOD3DParameters, event, sample), 661 ), 662 ( 663 Stage(StageIdentifier.DomainGeneration, event, sample), 664 Stage(StageIdentifier.VelocityModelGeneration, event, sample), 665 ), 666 ( 667 Stage(StageIdentifier.DomainGeneration, event, sample), 668 Stage(StageIdentifier.StationSelection, event, sample), 669 ), 670 ( 671 Stage(StageIdentifier.DomainGeneration, event, sample), 672 Stage(StageIdentifier.ModelCoordinates, event, sample), 673 ), 674 ] 675 ) 676 else: 677 workflow_plan.add_edges_from( 678 [ 679 ( 680 Stage(StageIdentifier.DomainGeneration, 
event, None), 681 Stage(StageIdentifier.CopyDomainParameters, event, sample), 682 ), 683 ( 684 Stage(StageIdentifier.CopyDomainParameters, event, sample), 685 Stage(StageIdentifier.EMOD3DParameters, event, sample), 686 ), 687 ( 688 Stage(StageIdentifier.CopyDomainParameters, event, sample), 689 Stage(StageIdentifier.HighFrequency, event, sample), 690 ), 691 ] 692 ) 693 694 return workflow_plan 695 696 697def create_abstract_workflow_plan( 698 realisations: set[tuple[str, Optional[int]]], 699 goals: Iterable[StageIdentifier], 700 excluding: Iterable[StageIdentifier], 701) -> nx.DiGraph: 702 """Create an abstract workflow graph from a list of goals and excluded stages. 703 704 Parameters 705 ---------- 706 goals : Iterable[StageIdentifier] 707 The goal stages for the workflow. 708 excluding : Iterable[StageIdentifier] 709 The excluded stages for the workflow. 710 711 Returns 712 ------- 713 nx.DiGraph 714 A abstract workflow plan. This workflow plan contains only 715 included stages that are required to reach the goals. If two 716 workflow stages depend on each other only through paths 717 consisting entirely of excluded nodes, then they are adjacent 718 directly in the abstract plan by edges. 
719 """ 720 721 excluding_stages = { 722 Stage(excluded, *realisation) 723 for excluded in excluding 724 for realisation in realisations 725 } 726 727 output_graph = nx.DiGraph() 728 realisation_iteration = ( 729 realisations if len(realisations) < 100 else tqdm.tqdm(realisations) 730 ) 731 732 for realisation in realisation_iteration: 733 workflow_plan = realisation_workflow(*realisation) 734 workflow_plan = nx.transitive_closure_dag(workflow_plan) 735 736 for goal in goals: 737 reduced_graph = nx.transitive_reduction( 738 workflow_plan.subgraph( 739 ( 740 set(workflow_plan.predecessors(Stage(goal, *realisation))) 741 | {Stage(goal, *realisation)} 742 ) 743 - excluding_stages 744 ) 745 ) 746 output_graph.update( 747 edges=reduced_graph.edges(), nodes=reduced_graph.nodes() 748 ) 749 750 roots = [node for node, degree in output_graph.in_degree() if degree == 0] 751 copy_input_stage = Stage(StageIdentifier.CopyInput, "", None) 752 output_graph.add_node(copy_input_stage) 753 for root in roots: 754 output_graph.add_edge(copy_input_stage, root) 755 return output_graph 756 757 758def stage_to_node_string(stage: Stage) -> str: 759 r"""Convert a `Stage` into a human readable node identifier string. 760 761 Parameters 762 ---------- 763 stage : Stage 764 The stage to render. 765 766 Returns 767 ------- 768 str 769 A string of the format 770 {stage.identifier}\n{stage.event}_{stage.sample}, if event and 771 sample are non-trivial. 772 """ 773 node_string = str(stage.identifier) 774 if stage.event: 775 node_string += f"\n{stage.event}" 776 if stage.sample: 777 node_string += f"_{stage.sample}" 778 return node_string 779 780 781def pyvis_graph(workflow_plan: nx.DiGraph) -> Network: 782 """Convert a workflow plan into a pyvis diagram for visualisation. 783 784 Parameters 785 ---------- 786 workflow_plan : nx.DiGraph 787 The workflow plan to visualise. 788 789 790 Returns 791 ------- 792 Network 793 A pyvis rendering for this workflow plan. 
794 """ 795 network = Network( 796 width="100%", height="1500px", directed=True, layout="hierarchical" 797 ) 798 network.show_buttons(filter_=["physics"]) 799 roots = [node for node, degree in workflow_plan.in_degree() if degree == 0] 800 reversed_workflow = workflow_plan.reverse() 801 stage: Stage 802 for stage in workflow_plan.nodes(): 803 network.add_node( 804 stage_to_node_string(stage), 805 group=f"{stage.event}_{stage.sample or ''}", 806 size=20, 807 level=max( 808 ( 809 len(path) - 1 810 for root in roots 811 for path in nx.all_simple_paths(reversed_workflow, stage, root) 812 ), 813 default=0, 814 ), 815 ) 816 for stage, next_stage in workflow_plan.edges(): 817 network.add_edge(stage_to_node_string(stage), stage_to_node_string(next_stage)) 818 return network 819 820 821REALISATION_ITERATION_RE = r"_rel\d+$" 822 823 824def parse_realisation(realisation_id: str) -> set[tuple[str, Optional[int]]]: 825 """Parse a realisation identifier string from the command line into a realisation identifier. 826 827 Parameters 828 ---------- 829 realisation_id : str 830 The realisation identifier string to parse. 831 832 Returns 833 ------- 834 tuple[str, Optional[int]] 835 The parsed realisation event and sample number. 836 """ 837 try: 838 index = realisation_id.rindex(":") 839 event, num_samples = realisation_id[:index], realisation_id[index + 1 :] 840 841 return {(event, sample or None) for sample in range(int(num_samples))} 842 except ValueError: 843 return {(realisation_id, None)} 844 845 846def build_filetree(files: set[AnnotatedPath]) -> dict[str, Any]: 847 """Build a file tree from a set of annotated file paths. 848 849 Parameters 850 ---------- 851 files : set[AnnotatedPath] 852 The set of files to construct a tree for. 853 854 855 Returns 856 ------- 857 dict[str, Any] 858 A file tree. 
859 """ 860 filetree: dict[str, Any] = {} 861 for file in sorted(files): 862 cur = filetree 863 for part in file.parts[:-1]: 864 if part not in cur: 865 cur[part] = {} 866 cur = cur[part] 867 if not cur.get(file.parts[-1]): 868 cur[file.parts[-1]] = file.description 869 return filetree 870 871 872@app.command( 873 help="Plan and generate a Cylc workflow file for a number of realisations." 874) 875def plan_workflow( 876 realisation_ids: Annotated[ 877 list[str], 878 typer.Argument( 879 help="List of realisations to generate workflows for. Realisations have the format event:realisation_count, such as Darfield:4." 880 ), 881 ], 882 flow_file: Annotated[ 883 Path, 884 typer.Argument( 885 help="Path to output flow file (e.g. ~/cylc-src/my-workflow/flow.cylc)", 886 writable=True, 887 dir_okay=False, 888 ), 889 ], 890 goal: Annotated[ 891 list[StageIdentifier], 892 typer.Option( 893 help="List of workflow outputs to generate", 894 default_factory=lambda: [], 895 rich_help_panel='Planning Workflows' 896 ), 897 ], 898 group_goal: Annotated[ 899 list[GroupIdentifier], 900 typer.Option( 901 help="List of group goals to generate", default_factory=lambda: [], rich_help_panel='Planning Workflows' 902 ), 903 ], 904 excluding: Annotated[ 905 list[StageIdentifier], 906 typer.Option(help="List of stages to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'), 907 ], 908 excluding_group: Annotated[ 909 list[GroupIdentifier], 910 typer.Option( 911 help="List of stage groups to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows' 912 ), 913 ], 914 visualise: Annotated[ 915 bool, typer.Option(help="Visualise the planned workflow as a graph", rich_help_panel='Visualising Workflows') 916 ] = False, 917 show_required_files: Annotated[ 918 bool, 919 typer.Option( 920 help="Print the expected directory tree at the start of the simulation.", rich_help_panel='Visualising Workflows' 921 ), 922 ] = True, 923 target_host: Annotated[ 924 WorkflowTarget, 
925 typer.Option(help="Select the target host where the workflow will be run", rich_help_panel='Planning Workflows'), 926 ] = WorkflowTarget.NeSI, 927 source: Annotated[ 928 Optional[Source], 929 typer.Option( 930 help="If given, set the source of the realisation. For NSHM and GCMT, the realisation id corresponds to the rupture id and GCMT PublicID respectively.", rich_help_panel='Sources' 931 ), 932 ] = None, 933 defaults_version: Annotated[ 934 Optional[DefaultsVersion], 935 typer.Option( 936 help="The simulation defaults to apply for all realisations. Required if source is specified.", rich_help_panel='Sources' 937 ), 938 ] = None, 939): 940 """Plan and generate a Cylc workflow file for a number of realisations. 941 942 Parameters 943 ---------- 944 realisations : list[str] 945 The list of realisations to generate the workflow for. 946 flow_file : Path 947 The output flow file path to write the Cylc workflow to. 948 goal : list[StageIdentifier] 949 A list of workflow stages to mark as goals. These stages are 950 define the endpoints for the workflow. 951 group_goal : list[GroupIdentifier] 952 A list of workflow groups to target. A workflow group is just 953 an alias for a set of workflow stages. Equivalent to adding 954 each group member to `goal`. 955 excluding : list[StageIdentifier] 956 A list of workflow stages to exclude from the flows. 957 group_goal : list[GroupIdentifier] 958 A list of workflow groups to exclude. A workflow group is just 959 an alias for a set of workflow stages. Equivalent to adding 960 each group member to `excluding`. 961 """ 962 realisations = set.union( 963 *[parse_realisation(realisation_id) for realisation_id in realisation_ids] 964 ) 965 if source and not defaults_version: 966 print( 967 "You must specify a defaults version if you specify a source. See the help text for options." 
968 ) 969 raise typer.Exit(code=1) 970 excluding_set = set(excluding) 971 goal_set = set(goal) 972 if group_goal: 973 goal_set |= set.union(*[GROUP_GOALS[group] for group in group_goal]) 974 if excluding_group: 975 excluding_set |= set.union(*[GROUP_STAGES[group] for group in excluding_group]) 976 977 excluding_source_map: dict[Optional[Source], set[StageIdentifier]] = { 978 Source.GCMT: {StageIdentifier.GCMTToRealisation}, 979 Source.NSHM: {StageIdentifier.NSHMToRealisation}, 980 } 981 excluding_set |= set.union( 982 *excluding_source_map.values() 983 ) - excluding_source_map.get(source, set()) 984 workflow_plan = create_abstract_workflow_plan(realisations, goal_set, excluding_set) 985 env = jinja2.Environment( 986 loader=jinja2.PackageLoader("workflow"), 987 ) 988 template = env.get_template("flow.cylc") 989 flow_template = template.render( 990 defaults_version=defaults_version, 991 realisations=realisations, 992 target_host=target_host, 993 workflow_plan={ 994 node: sorted(dependents, key=lambda stage: stage_to_node_string(stage)) 995 for node, dependents in sorted( 996 nx.to_dict_of_lists(workflow_plan).items(), 997 key=lambda kv: stage_to_node_string(kv[0]), 998 ) 999 }, 1000 ) 1001 flow_file.write_text( 1002 # strip empty lines from the output flow template 1003 "\n".join(line for line in flow_template.split("\n") if line.strip()) 1004 ) 1005 if show_required_files: 1006 root_path = Path("cylc-src") / "WORKFLOW_NAME" / "input" 1007 inputs = { 1008 AnnotatedPath( 1009 Path("cylc-src") / "WORKFLOW_NAME" / "flow.cylc", 1010 f"Your workflow file (the file {flow_file}).", 1011 ) 1012 } 1013 outputs = set() 1014 required_realisation_sections = { 1015 realisation: realisation_configuration_requirements( 1016 workflow_plan, realisation 1017 ) 1018 for realisation in realisations 1019 } 1020 1021 for stage in workflow_plan.nodes: 1022 inputs |= stage_inputs(stage, root_path, required_realisation_sections) 1023 outputs |= stage_outputs(stage, root_path) 1024 1025 
missing_file_tree = build_filetree(inputs - outputs) 1026 1027 if missing_file_tree: 1028 print("You require the following files for your simulation:") 1029 print() 1030 printree.ptree(missing_file_tree) 1031 print() 1032 print( 1033 "Refer to the indicated sections in https://wiki.canterbury.ac.nz/display/QuakeCore/File+Formats+Used+In+Ground+Motion+Simulation" 1034 ) 1035 if any(required_realisation_sections.values()): 1036 print( 1037 "Refer to the realisation glossary at URL HERE for details on filling in the realisation files." 1038 ) 1039 if visualise: 1040 network = pyvis_graph(workflow_plan) 1041 with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as graph_render: 1042 network.show(graph_render.name, notebook=False)
class WorkflowTarget(StrEnum):
    """Enumeration of possible workflow targets.

    A member of this enumeration is accepted by ``plan_workflow`` as its
    ``target_host`` option and forwarded unchanged to the ``flow.cylc``
    template when the workflow file is rendered.
    """

    # The string values are the lower-case spellings of the member names.
    NeSI = "nesi"
    Hypocentre = "hypocentre"
Enumeration of possible workflow targets.
class StageIdentifier(StrEnum):
    """Valid stage identifier in the workflow plan.

    Each member names one kind of workflow stage. A stage identifier is
    combined with an event and an optional sample number to form a
    ``Stage``, which is the node type used in the workflow graph.
    """

    # Root stage: added as the predecessor of every otherwise root-less
    # stage when the abstract workflow plan is created.
    CopyInput = "copy_input"
    # Realisation sources (see also NSHMToRealisation at the end).
    GCMTToRealisation = "gcmt_to_realisation"
    # Domain, velocity model and station preparation.
    DomainGeneration = "generate_velocity_model_parameters"
    VelocityModelGeneration = "generate_velocity_model"
    StationSelection = "generate_station_coordinates"
    ModelCoordinates = "write_model_coordinates"
    # Source model generation and checking.
    SRFGeneration = "realisation_to_srf"
    CheckSRF = "check_srf"
    CopyDomainParameters = "copy_domain_parameters"
    EMOD3DParameters = "create_e3d_par"
    CheckDomain = "check_domain"
    StochGeneration = "generate_stoch"
    # Simulation and post-processing.
    HighFrequency = "hf_sim"
    LowFrequency = "emod3d"
    Broadband = "bb_sim"
    IntensityMeasureCalculation = "im_calc"
    PlotTimeslices = "plot_ts"
    MergeTimeslices = "merge_ts"
    NSHMToRealisation = "nshm_to_realisation"
Valid stage identifier in the workflow plan.
Realisation source options.
65class GroupIdentifier(StrEnum): 66 """Group identifiers to use to bulk target or exclude in workflow planning.""" 67 68 Preprocessing = "preprocessing" 69 """Alias for all preprocessing stages.""" 70 HighFrequency = "high_frequency" 71 """Alias for the high frequency workflow.""" 72 LowFrequency = "low_frequency" 73 """Alias for the low frequency workflow.""" 74 Domain = "domain"
Group identifiers to use to bulk target or exclude in workflow planning.
Alias for all preprocessing stages.
Alias for the high frequency workflow.
Alias for the low frequency workflow.
class Stage(NamedTuple):
    """Representation of a workflow stage in the output Cylc file.

    A stage pairs a stage identifier with the realisation it runs for
    (an event name and an optional sample number). Instances are used
    directly as the nodes of the workflow plan graph.
    """

    identifier: StageIdentifier
    """The stage identifier."""
    event: str
    """The event the stage is running for."""
    # None is used for the original event, i.e. when there is no sample number.
    sample: Optional[int]
    """The sample number of the realisation."""
Representation of a workflow stage in the output Cylc file.
Create new instance of Stage(identifier, event, sample)
class AnnotatedPath(PurePath):
    """Pure path annotated with description of the file.

    The description is free text that is shown next to the path when the
    tree of required files is printed.
    """

    def __init__(self, path: str | Path, description: Optional[str] = None):
        """Create an annotated path.

        Parameters
        ----------
        path : str | Path
            The path to annotate.
        description : Optional[str]
            The description of the path.
        """
        super().__init__(path)
        # Keep the original path object; it is part of the hash for
        # realisation files (see __hash__).
        self.path = path
        self.description = description

    def __hash__(self) -> int:
        """Construct a unique hash for Annotated Paths.

        Paths whose final component is ``realisation.json`` hash on the
        pair (path, description); every other path uses the hash
        inherited from ``PurePath``.
        """
        # NOTE(review): __eq__ is not overridden here, so equality is
        # presumably still path-only while the hash also depends on the
        # description for realisation files -- confirm this is intended.
        if super().name == "realisation.json":
            # Realisations should also have their annotations hashed.
            return hash((self.path, self.description))
        else:
            return super().__hash__()
Pure path annotated with description of the file.
def __init__(self, path: str | Path, description: Optional[str] = None):
    """Create an annotated path.

    Parameters
    ----------
    path : str | Path
        The path to annotate. It is passed to the parent initialiser and
        also stored unchanged on the instance as ``path``.
    description : Optional[str]
        The description of the path, stored on the instance as
        ``description``.
    """
    super().__init__(path)
    self.path = path
    self.description = description
Create an annotated path.
Parameters
- path (str | Path): The path to annotate.
- description (Optional[str]): The description of the path.
156def stage_inputs( 157 stage: Stage, 158 root: Path, 159 required_realisation_sections: dict[tuple[str, Optional[int]], set[str]], 160) -> set[AnnotatedPath]: 161 """Return a list of stage inputs for the stage. 162 163 Parameters 164 ---------- 165 stage : Stage 166 The stage to get inputs for. 167 root : Path 168 The root directory of the simulation. 169 170 171 Returns 172 ------- 173 set[Path] 174 A set of input paths required by the stage. 175 """ 176 realisation_identifier = stage.event 177 if stage.sample: 178 realisation_identifier += f"_{stage.sample}" 179 parent_directory = root / stage.event 180 event_directory = root / realisation_identifier 181 realisation_requirements = sorted( 182 required_realisation_sections.get((stage.event, stage.sample), set()) 183 ) 184 realisation = { 185 AnnotatedPath( 186 event_directory / "realisation.json", 187 f'Realisation file for event containing: {', '.join(realisation_requirements)}.', 188 ) 189 } 190 station_ll = AnnotatedPath( 191 parent_directory / "stations" / "stations.ll", 192 "lat,lon coordinates corresponding to x,y coordinates of stations in domain.", 193 ) 194 s_wave = AnnotatedPath( 195 parent_directory / "Velocity_Model" / "vs3dfile.s", 196 "s-wave velocity file for velocity model (Section: Velocity Model Files).", 197 ) 198 match stage: 199 case Stage( 200 identifier=StageIdentifier.SRFGeneration 201 | StageIdentifier.VelocityModelGeneration 202 | StageIdentifier.StationSelection 203 | StageIdentifier.ModelCoordinates 204 ): 205 return realisation 206 case Stage(identifier=StageIdentifier.CopyDomainParameters): 207 return { 208 AnnotatedPath( 209 parent_directory / "realisation.json", 210 f"Realisation file for event containing: {', '.join(required_realisation_sections.get((stage.event, None), set()))}.", 211 ) 212 } 213 case Stage( 214 identifier=StageIdentifier.EMOD3DParameters 215 | StageIdentifier.LowFrequency, 216 ): 217 return ( 218 realisation 219 | { 220 station_ll, 221 AnnotatedPath( 222 
parent_directory / "stations" / "stations.statcords", 223 "x,y coordinates of stations in domain.", 224 ), 225 AnnotatedPath( 226 parent_directory / "model" / "model_params", 227 "Model centre and corners for EMOD3D.", 228 ), 229 AnnotatedPath( 230 parent_directory / "model" / "grid_file", 231 "Domain extents for EMOD3D.", 232 ), 233 AnnotatedPath( 234 parent_directory / "Velocity_Model" / "rho3dfile.d", 235 "Density component file for velocity model. (Section: Velocity Model Files)", 236 ), 237 AnnotatedPath( 238 parent_directory / "Velocity_Model" / "vp3dfile.p", 239 "p-wave velocity file for velocity model. (Section: Velocity Model Files)", 240 ), 241 s_wave, 242 AnnotatedPath( 243 parent_directory / "Velocity_Model" / "in_basin_mask.b", 244 "Boolean basin mask (Section: Velocity Model Files).", 245 ), 246 AnnotatedPath( 247 event_directory / "realisation.srf", 248 "Slip model of source (Section: SRF Format).", 249 ), 250 } 251 | ( 252 set() 253 if stage.identifier == StageIdentifier.EMOD3DParameters 254 else { 255 AnnotatedPath( 256 event_directory / "LF" / "e3d.par", 257 "EMOD3D Parameters (Section: https://wiki.canterbury.ac.nz/pages/viewpage.action?pageId=100794983)", 258 ) 259 } 260 ) 261 ) 262 case Stage(identifier=StageIdentifier.StochGeneration): 263 return realisation | { 264 AnnotatedPath( 265 event_directory / "realisation.srf", 266 "Slip model of source (Section: SRF Format).", 267 ) 268 } 269 case Stage(identifier=StageIdentifier.HighFrequency): 270 return realisation | { 271 AnnotatedPath( 272 event_directory / "realisation.stoch", 273 "Downsampled SRF for stochastic source model input (Section: Stoch format).", 274 ), 275 station_ll, 276 s_wave, 277 } 278 case Stage(identifier=StageIdentifier.Broadband): 279 return realisation | { 280 station_ll, 281 AnnotatedPath( 282 event_directory / "LF", "Low-frequency simulation directory." 
283 ), 284 AnnotatedPath( 285 event_directory / "realisation.hf", 286 "High-frequency waveform file (Section: LF/HF/BB binary format).", 287 ), 288 s_wave, 289 } 290 case Stage(identifier=StageIdentifier.IntensityMeasureCalculation): 291 return realisation | { 292 AnnotatedPath( 293 event_directory / "realisation.bb", 294 "Broadband waveform file (Section: LF/HF/BB binary format).", 295 ) 296 } 297 case Stage(identifier=StageIdentifier.PlotTimeslices): 298 return { 299 AnnotatedPath( 300 event_directory / "LF" / "OutBin" / "output.e3d", 301 "Merged xyts-slices of low-frequency simulation (Section: XYTS.e3d binary format).", 302 ) 303 } 304 case Stage(identifier=StageIdentifier.MergeTimeslices): 305 return { 306 AnnotatedPath( 307 event_directory / "LF" / "OutBin", 308 "Component xyts-slices of low-frequency simulation, one per compute node (Section: XYTS.e3d binary format).", 309 ) 310 } 311 return set()
Return a list of stage inputs for the stage.
Parameters
- stage (Stage): The stage to get inputs for.
- root (Path): The root directory of the simulation.
Returns
- set[Path]: A set of input paths required by the stage.
def stage_config_inputs(stage: Stage) -> set[str]:
    """Get the realisation configuration inputs for a given stage.

    Parameters
    ----------
    stage : Stage
        The stage to get inputs for.


    Returns
    -------
    set[str]
        The input config sections for this stage. Stages that read no
        realisation configuration return an empty set.
    """
    # Each stage identifier must appear at most once: a repeated key in a
    # dict literal silently replaces the earlier entry.
    input_dictionary = {
        StageIdentifier.EMOD3DParameters: {
            realisations.DomainParameters._config_key,
            realisations.VelocityModelParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.Broadband: {
            realisations.RealisationMetadata._config_key,
            realisations.DomainParameters._config_key,
        },
        StageIdentifier.StationSelection: {realisations.DomainParameters._config_key},
        StageIdentifier.VelocityModelGeneration: {
            realisations.DomainParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.HighFrequency: {
            realisations.DomainParameters._config_key,
            realisations.RealisationMetadata._config_key,
        },
        StageIdentifier.IntensityMeasureCalculation: {
            realisations.RealisationMetadata._config_key,
            realisations.BroadbandParameters._config_key,
        },
        StageIdentifier.DomainGeneration: {
            realisations.RealisationMetadata._config_key,
            realisations.RupturePropagationConfig._config_key,
            realisations.SourceConfig._config_key,
        },
        StageIdentifier.SRFGeneration: {
            realisations.RealisationMetadata._config_key,
            realisations.SourceConfig._config_key,
            realisations.RupturePropagationConfig._config_key,
        },
        StageIdentifier.ModelCoordinates: {realisations.DomainParameters._config_key},
        StageIdentifier.StochGeneration: {realisations.RealisationMetadata._config_key},
    }
    return input_dictionary.get(stage.identifier, set())
Get the realisation configuration inputs for a given stage.
Parameters
- stage (Stage): The stage to get inputs for.
Returns
- set[str]: The input config sections for this stage.
def stage_config_outputs(stage: Stage) -> set[str]:
    """Look up the realisation configuration sections written by a stage.

    Parameters
    ----------
    stage : Stage
        The stage whose configuration outputs are wanted.


    Returns
    -------
    set[str]
        The config section keys the stage writes to the realisation, or
        an empty set when the stage writes none.
    """
    # Resolve every section key once, then describe each stage in terms
    # of these local names.
    source_key = realisations.SourceConfig._config_key
    rupture_propagation_key = realisations.RupturePropagationConfig._config_key
    metadata_key = realisations.RealisationMetadata._config_key
    emod3d_key = realisations.EMOD3DParameters._config_key
    broadband_key = realisations.BroadbandParameters._config_key
    velocity_model_key = realisations.VelocityModelParameters._config_key
    hf_key = realisations.HFConfig._config_key
    im_calc_key = realisations.IntensityMeasureCalcuationParameters._config_key
    domain_key = realisations.DomainParameters._config_key
    srf_key = realisations.SRFConfig._config_key

    sections_written_by: dict[StageIdentifier, set[str]] = {}
    # Both realisation sources write the same three sections.
    for source_stage in (
        StageIdentifier.NSHMToRealisation,
        StageIdentifier.GCMTToRealisation,
    ):
        sections_written_by[source_stage] = {
            source_key,
            rupture_propagation_key,
            metadata_key,
        }
    sections_written_by[StageIdentifier.EMOD3DParameters] = {emod3d_key}
    sections_written_by[StageIdentifier.Broadband] = {broadband_key}
    sections_written_by[StageIdentifier.VelocityModelGeneration] = {
        velocity_model_key
    }
    sections_written_by[StageIdentifier.HighFrequency] = {hf_key}
    sections_written_by[StageIdentifier.IntensityMeasureCalculation] = {im_calc_key}
    # Copying and generating the domain write the same two sections.
    for domain_stage in (
        StageIdentifier.CopyDomainParameters,
        StageIdentifier.DomainGeneration,
    ):
        sections_written_by[domain_stage] = {velocity_model_key, domain_key}
    sections_written_by[StageIdentifier.SRFGeneration] = {srf_key}
    sections_written_by[StageIdentifier.StochGeneration] = {hf_key}

    if stage.identifier in sections_written_by:
        return sections_written_by[stage.identifier]
    return set()
Get the realisation configuration outputs for a given stage.
Parameters
- stage (Stage): The stage to get outputs for.
Returns
- set[str]: The output config sections for this stage.
def realisation_configuration_requirements(
    workflow_plan: nx.DiGraph, realisation: tuple[str, Optional[int]]
) -> set[str]:
    """Work out which config sections a realisation.json must already contain.

    Parameters
    ----------
    workflow_plan : nx.DiGraph
        The overall workflow plan.
    realisation : tuple[str, Optional[int]]
        The realisation (event, sample) to retrieve requirements for.

    Returns
    -------
    set[str]
        The realisation configuration sections that are read by some
        stage of this realisation but written by none of them, and so
        must be present for the workflow to run correctly.
    """
    consumed: set[str] = set()
    produced: set[str] = set()
    for candidate in workflow_plan.nodes:
        # Only stages belonging to the requested realisation count.
        if (candidate.event, candidate.sample) != realisation:
            continue
        consumed |= stage_config_inputs(candidate)
        produced |= stage_config_outputs(candidate)
    return consumed - produced
Get the requirements for the realisation.json in this stage.
Parameters
- workflow_plan (nx.DiGraph): The overall workflow plane.
- realisation (tuple[str, Optional[int]]): The realisation to retrieve requirements for.
Returns
- set[str]: The realisation configuration sections that must be present for the workflow to run correctly.
def stage_outputs(stage: Stage, root: Path) -> set[AnnotatedPath]:
    """Return the set of files produced by a stage.

    Parameters
    ----------
    stage : Stage
        The stage to get outputs for.
    root : Path
        The root directory of the simulation.


    Returns
    -------
    set[AnnotatedPath]
        A set of annotated paths written by the stage. Stages without
        known outputs return an empty set.
    """
    directory_name = stage.event
    if stage.sample:
        directory_name = f"{directory_name}_{stage.sample}"
    event_directory = root / directory_name
    model_directory = event_directory / "model"
    stations_directory = event_directory / "stations"
    velocity_model_directory = event_directory / "Velocity_Model"
    lf_directory = event_directory / "LF"

    realisation_file = {
        AnnotatedPath(
            event_directory / "realisation.json", "Realisation file for event."
        )
    }

    # One entry per stage identifier that produces files.
    produced_by: dict[StageIdentifier, set[AnnotatedPath]] = {
        StageIdentifier.NSHMToRealisation: realisation_file,
        StageIdentifier.GCMTToRealisation: realisation_file,
        StageIdentifier.SRFGeneration: {
            AnnotatedPath(
                event_directory / "realisation.srf",
                "Slip model of source.",
            )
        },
        StageIdentifier.ModelCoordinates: {
            AnnotatedPath(
                model_directory / "model_params",
                "Model centre and corners for EMOD3D.",
            ),
            AnnotatedPath(
                model_directory / "grid_file",
                "Domain extents for EMOD3D.",
            ),
        },
        StageIdentifier.StationSelection: {
            AnnotatedPath(
                stations_directory / "stations.ll",
                "lat,lon coordinates corresponding to x,y coordinates of stations in domain.",
            ),
            AnnotatedPath(
                stations_directory / "stations.statcords",
                "x,y coordinates of stations in domain.",
            ),
        },
        StageIdentifier.VelocityModelGeneration: {
            AnnotatedPath(
                velocity_model_directory / "rho3dfile.d",
                "Density component file for velocity model.",
            ),
            AnnotatedPath(
                velocity_model_directory / "vp3dfile.p",
                "p-wave velocity file for velocity model.",
            ),
            AnnotatedPath(
                velocity_model_directory / "vs3dfile.s",
                "s-wave velocity file for velocity model.",
            ),
            AnnotatedPath(
                velocity_model_directory / "in_basin_mask.b",
                "Boolean basin mask.",
            ),
        },
        StageIdentifier.EMOD3DParameters: {
            AnnotatedPath(lf_directory, "Low-frequency simulation directory."),
            AnnotatedPath(lf_directory / "e3d.par", "EMOD3D Parameters"),
        },
        StageIdentifier.LowFrequency: {
            AnnotatedPath(lf_directory, "Low-frequency simulation directory.")
        },
        StageIdentifier.StochGeneration: {
            AnnotatedPath(
                event_directory / "realisation.stoch",
                "Downsampled SRF for stochastic source model input.",
            )
        },
        StageIdentifier.HighFrequency: {
            AnnotatedPath(
                event_directory / "realisation.hf", "High-frequency waveform file."
            )
        },
        StageIdentifier.Broadband: {
            AnnotatedPath(
                event_directory / "realisation.bb", "Broadband waveform file."
            )
        },
        StageIdentifier.IntensityMeasureCalculation: {
            AnnotatedPath(
                event_directory / "ims.parquet", "Intensity measure statistics."
            )
        },
        StageIdentifier.PlotTimeslices: {
            AnnotatedPath(
                event_directory / "animation.mp4",
                "Animation of low-frequency waveform.",
            )
        },
        StageIdentifier.MergeTimeslices: {
            AnnotatedPath(
                lf_directory / "OutBin" / "output.e3d",
                "Merged xyts-slices of low-frequency simulation.",
            )
        },
    }
    return produced_by.get(stage.identifier, set())
Return the set of stage outputs for the stage.
Parameters
- stage (Stage): The stage to get outputs for.
- root (Path): The root directory of the simulation.
Returns
- set[AnnotatedPath]: A set of output paths produced by the stage.
def realisation_workflow(event: str, sample: Optional[int]) -> nx.DiGraph:
    """Build the full stage dependency graph for a single realisation.

    The graph contains every stage the realisation can run, plus the
    event-level stages (sample ``None``) that provide shared resources
    such as the velocity model.

    Parameters
    ----------
    event : str
        The event to build the workflow for.
    sample : Optional[int]
        The sample number (or None, if the original event).

    Returns
    -------
    nx.DiGraph
        A directed graph of ``Stage`` nodes where an edge points from a
        stage to a stage that depends on it.
    """
    ident = StageIdentifier

    def own(identifier: StageIdentifier) -> Stage:
        # Stage belonging to this realisation.
        return Stage(identifier, event, sample)

    def shared(identifier: StageIdentifier) -> Stage:
        # Stage belonging to the original event, shared by all samples.
        return Stage(identifier, event, None)

    dependents = {
        own(ident.NSHMToRealisation): [own(ident.SRFGeneration)],
        own(ident.GCMTToRealisation): [own(ident.SRFGeneration)],
        own(ident.SRFGeneration): [own(ident.CheckSRF)],
        own(ident.CheckSRF): [
            own(ident.StochGeneration),
            own(ident.EMOD3DParameters),
        ],
        shared(ident.VelocityModelGeneration): [
            own(ident.EMOD3DParameters),
            own(ident.HighFrequency),
        ],
        shared(ident.StationSelection): [
            own(ident.EMOD3DParameters),
            own(ident.HighFrequency),
        ],
        shared(ident.ModelCoordinates): [own(ident.EMOD3DParameters)],
        own(ident.EMOD3DParameters): [own(ident.CheckDomain)],
        own(ident.CheckDomain): [own(ident.LowFrequency)],
        own(ident.LowFrequency): [
            own(ident.Broadband),
            own(ident.MergeTimeslices),
        ],
        own(ident.StochGeneration): [own(ident.HighFrequency)],
        own(ident.HighFrequency): [own(ident.Broadband)],
        own(ident.Broadband): [own(ident.IntensityMeasureCalculation)],
        own(ident.MergeTimeslices): [own(ident.PlotTimeslices)],
    }
    workflow_plan = nx.from_dict_of_lists(dependents, create_using=nx.DiGraph)

    if sample:
        # Samples reuse the domain of the original event by copying its
        # parameters instead of generating their own.
        domain_edges = [
            (shared(ident.DomainGeneration), own(ident.CopyDomainParameters)),
            (own(ident.CopyDomainParameters), own(ident.EMOD3DParameters)),
            (own(ident.CopyDomainParameters), own(ident.HighFrequency)),
        ]
    else:
        # The original event generates the domain that everything else
        # depends on.
        domain_edges = [
            (own(ident.NSHMToRealisation), own(ident.DomainGeneration)),
            (own(ident.GCMTToRealisation), own(ident.DomainGeneration)),
            (own(ident.DomainGeneration), own(ident.EMOD3DParameters)),
            (own(ident.DomainGeneration), own(ident.VelocityModelGeneration)),
            (own(ident.DomainGeneration), own(ident.StationSelection)),
            (own(ident.DomainGeneration), own(ident.ModelCoordinates)),
        ]
    workflow_plan.add_edges_from(domain_edges)

    return workflow_plan
Add a realisation to a workflow plan.
Adds all stages for the realisation to run, and links to event stages for shared resources (i.e. the velocity model).
Parameters
- workflow_plan (nx.DiGraph): The current workflow paln.
- event (str): The event to add.
- sample (Optional[int]): The sample number (or None, if the original event).
def create_abstract_workflow_plan(
    realisations: set[tuple[str, Optional[int]]],
    goals: Iterable[StageIdentifier],
    excluding: Iterable[StageIdentifier],
) -> nx.DiGraph:
    """Create an abstract workflow graph from a list of goals and excluded stages.

    Parameters
    ----------
    realisations : set[tuple[str, Optional[int]]]
        The realisations (event, sample) to plan for.
    goals : Iterable[StageIdentifier]
        The goal stages for the workflow. A goal that does not exist for
        a particular realisation (for example an event-level stage when
        planning a sample) is skipped for that realisation.
    excluding : Iterable[StageIdentifier]
        The excluded stages for the workflow.

    Returns
    -------
    nx.DiGraph
        An abstract workflow plan. This workflow plan contains only
        included stages that are required to reach the goals. If two
        workflow stages depend on each other only through paths
        consisting entirely of excluded nodes, then they are adjacent
        directly in the abstract plan by edges. A single copy-input
        stage is added as the predecessor of every root of the plan.
    """
    # Goals are walked once per realisation, so a one-shot iterable must
    # be materialised first.
    goals = tuple(goals)

    excluding_stages = {
        Stage(excluded, *realisation)
        for excluded in excluding
        for realisation in realisations
    }

    output_graph = nx.DiGraph()
    # Only show a progress bar when there is enough work to warrant one.
    realisation_iteration = (
        realisations if len(realisations) < 100 else tqdm.tqdm(realisations)
    )

    for realisation in realisation_iteration:
        # In the transitive closure the predecessors of a node are all of
        # its ancestors, so dropping excluded stages keeps the remaining
        # stages connected.
        workflow_plan = nx.transitive_closure_dag(realisation_workflow(*realisation))

        for goal in goals:
            goal_stage = Stage(goal, *realisation)
            if goal_stage not in workflow_plan:
                # Asking for the predecessors of a missing node raises;
                # this goal simply does not apply to this realisation.
                continue
            required_stages = (
                set(workflow_plan.predecessors(goal_stage)) | {goal_stage}
            ) - excluding_stages
            reduced_graph = nx.transitive_reduction(
                workflow_plan.subgraph(required_stages)
            )
            output_graph.update(
                edges=reduced_graph.edges(), nodes=reduced_graph.nodes()
            )

    # Roots are computed before the copy-input stage is added so that it
    # does not end up depending on itself.
    roots = [node for node, degree in output_graph.in_degree() if degree == 0]
    copy_input_stage = Stage(StageIdentifier.CopyInput, "", None)
    output_graph.add_node(copy_input_stage)
    for root in roots:
        output_graph.add_edge(copy_input_stage, root)
    return output_graph
Create an abstract workflow graph from a list of goals and excluded stages.
Parameters
- goals (Iterable[StageIdentifier]): The goal stages for the workflow.
- excluding (Iterable[StageIdentifier]): The excluded stages for the workflow.
Returns
- nx.DiGraph: An abstract workflow plan. This workflow plan contains only included stages that are required to reach the goals. If two workflow stages depend on each other only through paths consisting entirely of excluded nodes, then they are adjacent directly in the abstract plan by edges.
def stage_to_node_string(stage: Stage) -> str:
    r"""Render a `Stage` as a human readable node identifier string.

    Parameters
    ----------
    stage : Stage
        The stage to render.

    Returns
    -------
    str
        The stage identifier, followed by ``\n{stage.event}`` when the
        event is non-empty and by ``_{stage.sample}`` when the sample is
        set and non-zero.
    """
    pieces = [str(stage.identifier)]
    if stage.event:
        pieces.append(f"\n{stage.event}")
    if stage.sample:
        pieces.append(f"_{stage.sample}")
    return "".join(pieces)
Convert a Stage into a human readable node identifier string.
Parameters
- stage (Stage): The stage to render.
Returns
- str: A string of the format {stage.identifier}\n{stage.event}_{stage.sample}, if event and sample are non-trivial.
def pyvis_graph(workflow_plan: nx.DiGraph) -> Network:
    """Build a pyvis network that draws a workflow plan.

    Parameters
    ----------
    workflow_plan : nx.DiGraph
        The workflow plan to visualise.


    Returns
    -------
    Network
        A pyvis rendering for this workflow plan, with one node per
        stage and one edge per dependency.
    """
    network = Network(
        width="100%", height="1500px", directed=True, layout="hierarchical"
    )
    network.show_buttons(filter_=["physics"])

    root_stages = [
        node for node, in_degree in workflow_plan.in_degree() if in_degree == 0
    ]
    upstream = workflow_plan.reverse()

    def hierarchy_level(node: Stage) -> int:
        # Number of edges on the longest simple path from the node back
        # to any root; nodes with no such path sit on level zero.
        edge_counts = (
            len(path) - 1
            for root in root_stages
            for path in nx.all_simple_paths(upstream, node, root)
        )
        return max(edge_counts, default=0)

    for node in workflow_plan.nodes():
        network.add_node(
            stage_to_node_string(node),
            group=f"{node.event}_{node.sample or ''}",
            size=20,
            level=hierarchy_level(node),
        )

    for dependency, dependent in workflow_plan.edges():
        network.add_edge(
            stage_to_node_string(dependency), stage_to_node_string(dependent)
        )
    return network
Convert a workflow plan into a pyvis diagram for visualisation.
Parameters
- workflow_plan (nx.DiGraph): The workflow plan to visualise.
Returns
- Network: A pyvis rendering for this workflow plan.
def parse_realisation(realisation_id: str) -> set[tuple[str, Optional[int]]]:
    """Expand a command line realisation identifier into realisation identifiers.

    Parameters
    ----------
    realisation_id : str
        The realisation identifier string to parse, either ``event`` or
        ``event:count``.

    Returns
    -------
    set[tuple[str, Optional[int]]]
        One ``(event, sample)`` pair per sample in ``range(count)``,
        where sample zero is represented as ``None``. If there is no
        ``:`` or the text after the last ``:`` is not an integer, the
        whole string is taken as the event name with sample ``None``.
    """
    event, separator, count_text = realisation_id.rpartition(":")
    if separator:
        try:
            count = int(count_text)
        except ValueError:
            pass
        else:
            return {(event, sample or None) for sample in range(count)}
    return {(realisation_id, None)}
Parse a realisation identifier string from the command line into a realisation identifier.
Parameters
- realisation_id (str): The realisation identifier string to parse.
Returns
- set[tuple[str, Optional[int]]]: The parsed realisation identifiers, one (event, sample number) pair per sample.
def build_filetree(files: set[AnnotatedPath]) -> dict[str, Any]:
    """Build a file tree from a set of annotated file paths.

    Parameters
    ----------
    files : set[AnnotatedPath]
        The set of files to construct a tree for.


    Returns
    -------
    dict[str, Any]
        A file tree. Directories are nested dictionaries keyed by path
        component and files map to their description. When a path is
        both annotated itself and the parent of other paths, it is kept
        as a directory and its own description is dropped.
    """
    filetree: dict[str, Any] = {}
    for file in sorted(files):
        if not file.parts:
            # An empty path has no component to place in the tree.
            continue
        *directories, leaf = file.parts
        cur = filetree
        for part in directories:
            # The component may be missing, or may already hold the
            # description of an annotated directory (which sorts before
            # its children); either way it must become a directory
            # before descending into it.
            if not isinstance(cur.get(part), dict):
                cur[part] = {}
            cur = cur[part]
        # Never replace an existing directory or description.
        if not cur.get(leaf):
            cur[leaf] = file.description
    return filetree
Build a file tree from a set of annotated file paths.
Parameters
- files (set[AnnotatedPath]): The set of files to construct a tree for.
Returns
- dict[str, Any]: A file tree.
@app.command(
    help="Plan and generate a Cylc workflow file for a number of realisations."
)
def plan_workflow(
    realisation_ids: Annotated[
        list[str],
        typer.Argument(
            help="List of realisations to generate workflows for. Realisations have the format event:realisation_count, such as Darfield:4."
        ),
    ],
    flow_file: Annotated[
        Path,
        typer.Argument(
            help="Path to output flow file (e.g. ~/cylc-src/my-workflow/flow.cylc)",
            writable=True,
            dir_okay=False,
        ),
    ],
    goal: Annotated[
        list[StageIdentifier],
        typer.Option(
            help="List of workflow outputs to generate",
            default_factory=lambda: [],
            rich_help_panel='Planning Workflows'
        ),
    ],
    group_goal: Annotated[
        list[GroupIdentifier],
        typer.Option(
            help="List of group goals to generate", default_factory=lambda: [], rich_help_panel='Planning Workflows'
        ),
    ],
    excluding: Annotated[
        list[StageIdentifier],
        typer.Option(help="List of stages to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'),
    ],
    excluding_group: Annotated[
        list[GroupIdentifier],
        typer.Option(
            help="List of stage groups to exclude", default_factory=lambda: [], rich_help_panel='Planning Workflows'
        ),
    ],
    visualise: Annotated[
        bool, typer.Option(help="Visualise the planned workflow as a graph", rich_help_panel='Visualising Workflows')
    ] = False,
    show_required_files: Annotated[
        bool,
        typer.Option(
            help="Print the expected directory tree at the start of the simulation.", rich_help_panel='Visualising Workflows'
        ),
    ] = True,
    target_host: Annotated[
        WorkflowTarget,
        typer.Option(help="Select the target host where the workflow will be run", rich_help_panel='Planning Workflows'),
    ] = WorkflowTarget.NeSI,
    source: Annotated[
        Optional[Source],
        typer.Option(
            help="If given, set the source of the realisation. For NSHM and GCMT, the realisation id corresponds to the rupture id and GCMT PublicID respectively.", rich_help_panel='Sources'
        ),
    ] = None,
    defaults_version: Annotated[
        Optional[DefaultsVersion],
        typer.Option(
            help="The simulation defaults to apply for all realisations. Required if source is specified.", rich_help_panel='Sources'
        ),
    ] = None,
):
    """Plan and generate a Cylc workflow file for a number of realisations.

    The plan is rendered through the ``flow.cylc`` template and written
    to `flow_file`. Optionally the files that must exist before the
    workflow starts are printed, and the plan is rendered as an
    interactive graph.

    Parameters
    ----------
    realisation_ids : list[str]
        The list of realisations to generate the workflow for, each in
        the form ``event`` or ``event:realisation_count``.
    flow_file : Path
        The output flow file path to write the Cylc workflow to.
    goal : list[StageIdentifier]
        A list of workflow stages to mark as goals. These stages
        define the endpoints for the workflow.
    group_goal : list[GroupIdentifier]
        A list of workflow groups to target. A workflow group is just
        an alias for a set of workflow stages. Equivalent to adding
        each group member to `goal`.
    excluding : list[StageIdentifier]
        A list of workflow stages to exclude from the flows.
    excluding_group : list[GroupIdentifier]
        A list of workflow groups to exclude. A workflow group is just
        an alias for a set of workflow stages. Equivalent to adding
        each group member to `excluding`.
    visualise : bool
        If True, render the planned workflow as an HTML graph in a
        temporary file.
    show_required_files : bool
        If True, print the tree of input files that no planned stage
        produces.
    target_host : WorkflowTarget
        The target host, passed through to the flow template.
    source : Optional[Source]
        The source of the realisations. The conversion stage of every
        source other than this one is excluded; with no source, all of
        them are excluded.
    defaults_version : Optional[DefaultsVersion]
        The simulation defaults, passed through to the flow template.
        Must be given when `source` is given.

    Raises
    ------
    typer.Exit
        With exit code 1 if `source` is given without `defaults_version`.
    """
    # NOTE(review): from here on the local name `realisations` hides the
    # `workflow.realisations` module imported at the top of the file.
    realisations = set.union(
        *[parse_realisation(realisation_id) for realisation_id in realisation_ids]
    )
    if source and not defaults_version:
        print(
            "You must specify a defaults version if you specify a source. See the help text for options."
        )
        raise typer.Exit(code=1)
    excluding_set = set(excluding)
    goal_set = set(goal)
    # set.union needs at least one argument, hence the emptiness guards.
    if group_goal:
        goal_set |= set.union(*[GROUP_GOALS[group] for group in group_goal])
    if excluding_group:
        excluding_set |= set.union(*[GROUP_STAGES[group] for group in excluding_group])

    excluding_source_map: dict[Optional[Source], set[StageIdentifier]] = {
        Source.GCMT: {StageIdentifier.GCMTToRealisation},
        Source.NSHM: {StageIdentifier.NSHMToRealisation},
    }
    # Exclude every source conversion stage except the one belonging to
    # the selected source (if any).
    excluding_set |= set.union(
        *excluding_source_map.values()
    ) - excluding_source_map.get(source, set())
    workflow_plan = create_abstract_workflow_plan(realisations, goal_set, excluding_set)
    env = jinja2.Environment(
        loader=jinja2.PackageLoader("workflow"),
    )
    template = env.get_template("flow.cylc")
    # Nodes and their dependents are sorted by their rendered names so
    # that the template receives them in a stable order.
    flow_template = template.render(
        defaults_version=defaults_version,
        realisations=realisations,
        target_host=target_host,
        workflow_plan={
            node: sorted(dependents, key=lambda stage: stage_to_node_string(stage))
            for node, dependents in sorted(
                nx.to_dict_of_lists(workflow_plan).items(),
                key=lambda kv: stage_to_node_string(kv[0]),
            )
        },
    )
    flow_file.write_text(
        # strip empty lines from the output flow template
        "\n".join(line for line in flow_template.split("\n") if line.strip())
    )
    if show_required_files:
        root_path = Path("cylc-src") / "WORKFLOW_NAME" / "input"
        inputs = {
            AnnotatedPath(
                Path("cylc-src") / "WORKFLOW_NAME" / "flow.cylc",
                f"Your workflow file (the file {flow_file}).",
            )
        }
        outputs = set()
        required_realisation_sections = {
            realisation: realisation_configuration_requirements(
                workflow_plan, realisation
            )
            for realisation in realisations
        }

        for stage in workflow_plan.nodes:
            inputs |= stage_inputs(stage, root_path, required_realisation_sections)
            outputs |= stage_outputs(stage, root_path)

        # Files that are needed by some stage but produced by none must
        # be supplied by the user.
        missing_file_tree = build_filetree(inputs - outputs)

        if missing_file_tree:
            print("You require the following files for your simulation:")
            print()
            printree.ptree(missing_file_tree)
            print()
            print(
                "Refer to the indicated sections in https://wiki.canterbury.ac.nz/display/QuakeCore/File+Formats+Used+In+Ground+Motion+Simulation"
            )
            if any(required_realisation_sections.values()):
                # NOTE(review): "URL HERE" looks like an unfilled
                # placeholder in the message below -- confirm.
                print(
                    "Refer to the realisation glossary at URL HERE for details on filling in the realisation files."
                )
    if visualise:
        network = pyvis_graph(workflow_plan)
        # delete=False: the temporary HTML file is not removed when the
        # with-block closes it.
        with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as graph_render:
            network.show(graph_render.name, notebook=False)
Plan and generate a Cylc workflow file for a number of realisations.
Parameters
- realisation_ids (list[str]): The list of realisations to generate the workflow for.
- flow_file (Path): The output flow file path to write the Cylc workflow to.
- goal (list[StageIdentifier]): A list of workflow stages to mark as goals. These stages define the endpoints for the workflow.
- group_goal (list[GroupIdentifier]):
A list of workflow groups to target. A workflow group is just
an alias for a set of workflow stages. Equivalent to adding
each group member to `goal`.
- excluding (list[StageIdentifier]): A list of workflow stages to exclude from the flows.
- excluding_group (list[GroupIdentifier]):
A list of workflow groups to exclude. A workflow group is just
an alias for a set of workflow stages. Equivalent to adding
each group member to `excluding`.