Skip to content

Reference

This section contains the automatically generated API documentation for ectop.

Core

Main application class for ectop.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Ectop

Bases: App

A Textual-based TUI for monitoring and controlling ecFlow.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/app.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
class Ectop(App):
    """
    A Textual-based TUI for monitoring and controlling ecFlow.

    The app shows a suite tree on the left and tabbed node content (output,
    script, job) on the right. All blocking ecFlow client calls run in
    thread workers; UI updates made from those workers are marshalled back
    to the event loop with ``call_from_thread``.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    CSS = f"""
    Screen {{
        background: {COLOR_BG};
    }}

    StatusBar {{
        dock: bottom;
        height: 1;
        background: {COLOR_STATUS_BAR_BG};
        color: {COLOR_TEXT};
    }}

    /* Left Sidebar (Tree) */
    #sidebar {{
        width: 30%;
        height: 100%;
        border-right: solid {COLOR_BORDER};
        background: {COLOR_SIDEBAR_BG};
    }}

    Tree {{
        background: {COLOR_SIDEBAR_BG};
        color: {COLOR_TEXT};
        padding: 1;
    }}

    /* Right Content (Tabs) */
    #main_content {{
        width: 70%;
        height: 100%;
    }}

    TabbedContent {{
        height: 100%;
    }}

    /* Content Areas */
    RichLog {{
        background: {COLOR_CONTENT_BG};
        color: {COLOR_TEXT_HIGHLIGHT};
        border: none;
    }}

    .code_view {{
        background: {COLOR_CONTENT_BG};
        padding: 1;
        width: 100%;
        height: auto;
    }}

    #search_box {{
        dock: top;
        display: none;
        background: {COLOR_CONTENT_BG};
        color: {COLOR_TEXT_HIGHLIGHT};
        border: tall {COLOR_BORDER};
    }}

    #search_box.visible {{
        display: block;
    }}

    #why_container {{
        padding: 1 2;
        background: {COLOR_BG};
        border: thick {COLOR_BORDER};
        width: 60%;
        height: 60%;
    }}

    #why_title {{
        text-align: center;
        background: {COLOR_HEADER_BG};
        color: white;
        margin-bottom: 1;
    }}

    #confirm_container {{
        padding: 1 2;
        background: {COLOR_BG};
        border: thick {COLOR_BORDER};
        width: 40%;
        height: 20%;
    }}

    #confirm_message {{
        text-align: center;
        margin-bottom: 1;
    }}

    #confirm_actions {{
        align: center middle;
    }}

    #confirm_actions Button {{
        margin: 0 1;
    }}

    #var_container {{
        padding: 1 2;
        background: {COLOR_BG};
        border: thick {COLOR_BORDER};
        width: 80%;
        height: 80%;
    }}

    #var_title {{
        text-align: center;
        background: {COLOR_HEADER_BG};
        color: white;
        margin-bottom: 1;
    }}

    #var_input.hidden {{
        display: none;
    }}
    """

    COMMANDS = App.COMMANDS | {EctopCommands}

    BINDINGS = [
        Binding("q", "quit", "Quit"),
        Binding("p", "command_palette", "Command Palette"),
        Binding("r", "refresh", "Refresh Tree"),
        Binding("l", "load_node", "Load Logs/Script"),
        Binding("s", "suspend", "Suspend"),
        Binding("u", "resume", "Resume"),
        Binding("k", "kill", "Kill"),
        Binding("f", "force", "Force Complete"),
        Binding("F", "cycle_filter", "Cycle Filter"),
        Binding("R", "requeue", "Requeue"),
        Binding("c", "copy_path", "Copy Path"),
        Binding("S", "restart_server", "Start Server"),
        Binding("H", "halt_server", "Halt Server"),
        Binding("/", "search", "Search"),
        Binding("w", "why", "Why?"),
        Binding("e", "edit_script", "Edit & Rerun"),
        Binding("t", "toggle_live", "Toggle Live Log"),
        Binding("v", "variables", "Variables"),
        Binding("ctrl+f", "search_content", "Search in Content"),
    ]

    def __init__(
        self,
        host: str = DEFAULT_HOST,
        port: int = DEFAULT_PORT,
        refresh_interval: float = DEFAULT_REFRESH_INTERVAL,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the application.

        Parameters
        ----------
        host : str, optional
            The ecFlow server hostname, by default DEFAULT_HOST.
        port : int, optional
            The ecFlow server port, by default DEFAULT_PORT.
        refresh_interval : float, optional
            The interval for live log updates, by default DEFAULT_REFRESH_INTERVAL.
        **kwargs : Any
            Additional keyword arguments for the Textual App.
        """
        super().__init__(**kwargs)
        self.host = host
        self.port = port
        self.refresh_interval = refresh_interval
        # Created by ``_initial_connect``; every action checks for None first.
        self.ecflow_client: EcflowClient | None = None

    def compose(self) -> ComposeResult:
        """
        Compose the UI layout.

        Returns
        -------
        ComposeResult
            The UI components.
        """
        yield Header(show_clock=True)
        yield SearchBox(placeholder="Search nodes...", id="search_box")
        yield Horizontal(
            Container(SuiteTree("ecFlow Server", id="suite_tree"), id="sidebar"),
            MainContent(id="main_content"),
        )
        yield StatusBar(id="status_bar")
        yield Footer()

    def on_mount(self) -> None:
        """
        Handle the mount event to start the application.

        Starts the initial connection worker and schedules the periodic
        live-log tick at ``refresh_interval``.

        Returns
        -------
        None
        """
        self._initial_connect()
        self.set_interval(self.refresh_interval, self._live_log_tick)

    def on_tree_node_selected(self, event: SuiteTree.NodeSelected[str]) -> None:
        """
        Handle node selection to automatically load content.

        Parameters
        ----------
        event : SuiteTree.NodeSelected[str]
            The node selection event.
        """
        if event.node.data:
            self.action_load_node()

    @work(thread=True)
    def _initial_connect(self) -> None:
        """
        Perform initial connection to the ecFlow server.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        try:
            self.ecflow_client = EcflowClient(self.host, self.port)
            self.ecflow_client.ping()
            # Initial refresh
            self.action_refresh()
        except RuntimeError as e:
            self.call_from_thread(self.notify, f"{ERROR_CONNECTION_FAILED}: {e}", severity="error", timeout=10)
            tree = self.query_one("#suite_tree", SuiteTree)
            self.call_from_thread(self._update_tree_error, tree)
        except Exception as e:
            self.call_from_thread(self.notify, f"Unexpected Error: {e}", severity="error")

    def _update_tree_error(self, tree: SuiteTree) -> None:
        """
        Update tree root to show error.

        Parameters
        ----------
        tree : SuiteTree
            The suite tree widget.

        Returns
        -------
        None
        """
        tree.root.label = f"[red]{ERROR_CONNECTION_FAILED} (Check Host/Port)[/]"

    @work(exclusive=True, thread=True)
    def action_refresh(self) -> None:
        """
        Fetch suites from server and rebuild the tree.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        if not self.ecflow_client:
            return

        self.call_from_thread(self.notify, "Refreshing tree...")

        tree = self.query_one("#suite_tree", SuiteTree)
        status_bar = self.query_one("#status_bar", StatusBar)
        try:
            self.ecflow_client.sync_local()
            defs = self.ecflow_client.get_defs()
            status = "Connected"
            version = "Unknown"
            if defs:
                status = str(defs.get_server_state())
            try:
                version = self.ecflow_client.server_version()
            except RuntimeError:
                # The version is informational only; keep "Unknown" on failure.
                pass

            self.call_from_thread(tree.update_tree, self.ecflow_client.host, self.ecflow_client.port, defs)
            self.call_from_thread(
                status_bar.update_status, self.ecflow_client.host, self.ecflow_client.port, status=status, version=version
            )
            self.call_from_thread(self.notify, "Tree Refreshed")
        except RuntimeError as e:
            self.call_from_thread(
                status_bar.update_status, self.ecflow_client.host, self.ecflow_client.port, status=STATUS_SYNC_ERROR
            )
            self.call_from_thread(self.notify, f"Refresh Error: {e}", severity="error")
        except Exception as e:
            self.call_from_thread(self.notify, f"Unexpected Error: {e}", severity="error")

    @work(thread=True)
    def action_restart_server(self) -> None:
        """
        Restart the ecFlow server (RUNNING).

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        if not self.ecflow_client:
            return
        try:
            self.ecflow_client.restart_server()
            self.call_from_thread(self.notify, "Server Started (RUNNING)")
            self.action_refresh()
        except Exception as e:
            self.call_from_thread(self.notify, f"Restart Error: {e}", severity="error")

    @work(thread=True)
    def action_halt_server(self) -> None:
        """
        Halt the ecFlow server (HALT).

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        if not self.ecflow_client:
            return
        try:
            self.ecflow_client.halt_server()
            self.call_from_thread(self.notify, "Server Halted (HALT)")
            self.action_refresh()
        except Exception as e:
            self.call_from_thread(self.notify, f"Halt Error: {e}", severity="error")

    def get_selected_path(self) -> str | None:
        """
        Helper to get the ecFlow path of the selected node.

        Returns
        -------
        str | None
            The absolute path of the selected node, or None if no node is selected.
        """
        try:
            node = self.query_one("#suite_tree", SuiteTree).cursor_node
            return node.data if node else None
        except Exception:
            return None

    def action_load_node(self) -> None:
        """
        Fetch Output, Script, and Job files for the selected node.

        Returns
        -------
        None
        """
        path = self.get_selected_path()
        if not path:
            self.notify("No node selected", severity="warning")
            return
        self._load_node_worker(path)

    @work(thread=True, exclusive=True)
    def _load_node_worker(self, path: str) -> None:
        """
        Worker to fetch files for a node.

        Parameters
        ----------
        path : str
            The ecFlow node path.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O and UI updates.
        """
        if not self.ecflow_client:
            return

        self.call_from_thread(self.notify, f"Loading files for {path}...")
        content_area = self.query_one("#main_content", MainContent)

        try:
            # Sync to get latest try numbers for filenames
            self.ecflow_client.sync_local()
        except RuntimeError:
            # Best effort: a failed sync still lets us try the file requests.
            pass

        # 1. Output Log
        try:
            content = self.ecflow_client.file(path, "jobout")
            self.call_from_thread(content_area.update_log, content)
        except RuntimeError:
            self.call_from_thread(content_area.show_error, "#log_output", "File type 'jobout' not found.")

        # 2. Script
        try:
            content = self.ecflow_client.file(path, "script")
            self.call_from_thread(content_area.update_script, content)
        except RuntimeError:
            self.call_from_thread(content_area.show_error, "#view_script", "File type 'script' not available.")

        # 3. Job
        try:
            content = self.ecflow_client.file(path, "job")
            self.call_from_thread(content_area.update_job, content)
        except RuntimeError:
            self.call_from_thread(content_area.show_error, "#view_job", "File type 'job' not available.")

    @work(thread=True)
    def _run_client_command(self, command_name: str, path: str | None) -> None:
        """
        Generic helper to run ecflow commands in a worker thread.

        Parameters
        ----------
        command_name : str
            The name of the command to run on the EcflowClient.
        path : str | None
            The absolute path to the node.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        if not path or not self.ecflow_client:
            return
        try:
            method = getattr(self.ecflow_client, command_name)
            method(path)
            self.call_from_thread(self.notify, f"{command_name.replace('_', ' ').capitalize()}: {path}")
            self.action_refresh()
        except RuntimeError as e:
            self.call_from_thread(self.notify, f"Command Error: {e}", severity="error")
        except Exception as e:
            self.call_from_thread(self.notify, f"Unexpected Error: {e}", severity="error")

    def action_suspend(self) -> None:
        """
        Suspend the selected node.

        Returns
        -------
        None
        """
        self._run_client_command("suspend", self.get_selected_path())

    def action_resume(self) -> None:
        """
        Resume the selected node.

        Returns
        -------
        None
        """
        self._run_client_command("resume", self.get_selected_path())

    def action_kill(self) -> None:
        """
        Kill the selected node.

        Returns
        -------
        None
        """
        self._run_client_command("kill", self.get_selected_path())

    def action_force(self) -> None:
        """
        Force complete the selected node.

        Returns
        -------
        None
        """
        self._run_client_command("force_complete", self.get_selected_path())

    def action_cycle_filter(self) -> None:
        """
        Cycle through tree filters.

        Returns
        -------
        None
        """
        self.query_one("#suite_tree", SuiteTree).action_cycle_filter()

    def action_requeue(self) -> None:
        """
        Requeue the selected node.

        Returns
        -------
        None
        """
        self._run_client_command("requeue", self.get_selected_path())

    def action_copy_path(self) -> None:
        """
        Copy the selected node path to the clipboard.

        Falls back to showing the path in a notification when the running
        Textual version has no ``copy_to_clipboard``.

        Returns
        -------
        None
        """
        path = self.get_selected_path()
        if path:
            if hasattr(self, "copy_to_clipboard"):
                self.copy_to_clipboard(path)
                self.notify(f"Copied to clipboard: {path}")
            else:
                self.notify(f"Node path: {path}")
        else:
            self.notify("No node selected", severity="warning")

    def action_toggle_live(self) -> None:
        """
        Toggle live log updates.

        Turning live mode on also switches to the output tab.

        Returns
        -------
        None
        """
        content_area = self.query_one("#main_content", MainContent)
        content_area.is_live = not content_area.is_live
        state = "ON" if content_area.is_live else "OFF"
        self.notify(f"Live Log: {state}")
        if content_area.is_live:
            content_area.active = "tab_output"

    def _live_log_tick(self) -> None:
        """
        Periodic tick to update the live log if enabled.

        Returns
        -------
        None

        Notes
        -----
        The tick is skipped when the main content widget cannot be found.
        """
        if not self.ecflow_client:
            return

        from textual.css.query import NoMatches

        try:
            content_area = self.query_one("#main_content", MainContent)
        except NoMatches:
            # App-level queries search the active screen, so while a modal
            # (Why / Variables / Confirm) is on top the widget is not
            # reachable. Skip this tick instead of raising from the timer.
            return
        if content_area.is_live and content_area.active == "tab_output":
            path = self.get_selected_path()
            if path:
                self._live_log_worker(path)

    @work(thread=True, exclusive=True)
    def _live_log_worker(self, path: str) -> None:
        """
        Worker to fetch the latest log content for live updates.

        Parameters
        ----------
        path : str
            The ecFlow node path.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O and UI updates.
        """
        if not self.ecflow_client:
            return
        try:
            content = self.ecflow_client.file(path, "jobout")
            content_area = self.query_one("#main_content", MainContent)
            self.call_from_thread(content_area.update_log, content, append=True)
        except RuntimeError:
            # Live updates are best effort; the next tick retries.
            pass

    def action_why(self) -> None:
        """
        Show the 'Why' inspector for the selected node.

        Returns
        -------
        None
        """
        path = self.get_selected_path()
        if not path or not self.ecflow_client:
            self.notify("No node selected", severity="warning")
            return
        self.push_screen(WhyInspector(path, self.ecflow_client))

    def action_variables(self) -> None:
        """
        Show the variable tweaker for the selected node.

        Returns
        -------
        None
        """
        path = self.get_selected_path()
        if not path or not self.ecflow_client:
            self.notify("No node selected", severity="warning")
            return
        self.push_screen(VariableTweaker(path, self.ecflow_client))

    def action_search_content(self) -> None:
        """
        Trigger content search in the main content area.

        Returns
        -------
        None
        """
        self.query_one("#main_content", MainContent).action_search()

    def action_edit_script(self) -> None:
        """
        Open the node script in an editor and update it on the server.

        Returns
        -------
        None
        """
        path = self.get_selected_path()
        if not path:
            self.notify("No node selected", severity="warning")
            return
        self._edit_script_worker(path)

    @work(thread=True, exclusive=True)
    def _edit_script_worker(self, path: str) -> None:
        """
        Worker to fetch script and prepare for editing.

        Parameters
        ----------
        path : str
            The ecFlow node path.

        Returns
        -------
        None

        Notes
        -----
        This is a background worker that performs blocking I/O and schedules an editor.
        """
        if not self.ecflow_client:
            return

        try:
            content = self.ecflow_client.file(path, "script")
            # delete=False: the file must outlive this block so the editor
            # can open it; it is removed once the edit has been processed.
            with tempfile.NamedTemporaryFile(suffix=".ecf", delete=False, mode="w") as f:
                f.write(content)
                temp_path = f.name

            self.call_from_thread(self._run_editor, temp_path, path, content)

        except RuntimeError as e:
            self.call_from_thread(self.notify, f"Edit Error: {e}", severity="error")
        except Exception as e:
            self.call_from_thread(self.notify, f"Unexpected Error: {e}", severity="error")

    @staticmethod
    def _remove_temp_file(temp_path: str) -> None:
        """
        Delete a temporary file, ignoring errors such as it already being gone.

        Parameters
        ----------
        temp_path : str
            Path to the temporary file.
        """
        try:
            os.unlink(temp_path)
        except OSError:
            # Cleanup is best effort; a leftover temp file is not fatal.
            pass

    def _run_editor(self, temp_path: str, path: str, old_content: str) -> None:
        """
        Run the editor in a suspended state.

        The editor command comes from ``$EDITOR`` (falling back to
        DEFAULT_EDITOR when unset or empty) and may carry arguments,
        e.g. ``code --wait``.

        Parameters
        ----------
        temp_path : str
            Path to the temporary file.
        path : str
            The ecFlow node path.
        old_content : str
            The original content of the script.
        """
        import shlex

        editor = os.environ.get("EDITOR") or DEFAULT_EDITOR
        try:
            command = shlex.split(editor)
        except ValueError:
            # Unbalanced quotes: treat the whole value as the program name.
            command = [editor]
        if not command:
            # Whitespace-only EDITOR; never run the temp file itself.
            command = [DEFAULT_EDITOR]

        try:
            with self.suspend():
                subprocess.run([*command, temp_path], check=False)
        except OSError as e:
            # Editor missing or not executable: nothing was edited.
            self._remove_temp_file(temp_path)
            self.notify(f"Edit Error: {e}", severity="error")
            return

        self._finish_edit(temp_path, path, old_content)

    @work(thread=True)
    def _finish_edit(self, temp_path: str, path: str, old_content: str) -> None:
        """
        Process the edited script and update the server.

        The temporary file is removed whether or not it could be read.

        Parameters
        ----------
        temp_path : str
            Path to the temporary file.
        path : str
            The ecFlow node path.
        old_content : str
            The original content of the script.
        """
        try:
            try:
                with open(temp_path) as f:
                    new_content = f.read()
            finally:
                self._remove_temp_file(temp_path)

            if new_content != old_content:
                if self.ecflow_client:
                    self.ecflow_client.alter(path, "change", "script", new_content)
                    self.call_from_thread(self.notify, "Script updated on server")
                    self.call_from_thread(self._prompt_requeue, path)
            else:
                self.call_from_thread(self.notify, "No changes detected")
        except RuntimeError as e:
            self.call_from_thread(self.notify, f"Update Error: {e}", severity="error")
        except Exception as e:
            self.call_from_thread(self.notify, f"Unexpected Error: {e}", severity="error")

    def _prompt_requeue(self, path: str) -> None:
        """
        Prompt the user to requeue the node after a script edit.

        Parameters
        ----------
        path : str
            The absolute path to the node.
        """
        from ectop.widgets.modals.confirm import ConfirmModal

        def do_requeue() -> None:
            if self.ecflow_client:
                # _run_client_command is worker-wrapped, so the blocking
                # requeue call does not run on the UI thread.
                self._run_client_command("requeue", path)

        self.push_screen(ConfirmModal(f"Re-queue {path} now?", do_requeue))

    def action_search(self) -> None:
        """
        Show the search box.

        Returns
        -------
        None
        """
        search_box = self.query_one("#search_box", SearchBox)
        search_box.add_class("visible")
        search_box.focus()

    def _select_search_match(self, query: str) -> None:
        """
        Select the tree node matching a search query, if the query is non-empty.

        Parameters
        ----------
        query : str
            The text typed into the search box.
        """
        if query:
            tree = self.query_one("#suite_tree", SuiteTree)
            tree.find_and_select(query)

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """
        Handle search submission.

        Parameters
        ----------
        event : Input.Submitted
            The input submission event.
        """
        if event.input.id == "search_box":
            self._select_search_match(event.value)

    def on_input_changed(self, event: Input.Changed) -> None:
        """
        Handle search input changes for live search.

        Parameters
        ----------
        event : Input.Changed
            The input changed event.
        """
        if event.input.id == "search_box":
            self._select_search_match(event.value)

__init__(host=DEFAULT_HOST, port=DEFAULT_PORT, refresh_interval=DEFAULT_REFRESH_INTERVAL, **kwargs)

Initialize the application.

Parameters

- host : str, optional. The ecFlow server hostname, by default DEFAULT_HOST.
- port : int, optional. The ecFlow server port, by default DEFAULT_PORT.
- refresh_interval : float, optional. The interval for live log updates, by default DEFAULT_REFRESH_INTERVAL.
- **kwargs : Any. Additional keyword arguments for the Textual App.

Source code in src/ectop/app.py
def __init__(
    self,
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
    refresh_interval: float = DEFAULT_REFRESH_INTERVAL,
    **kwargs: Any,
) -> None:
    """
    Set up the app with its ecFlow connection settings.

    Parameters
    ----------
    host : str, optional
        Hostname of the ecFlow server, by default DEFAULT_HOST.
    port : int, optional
        Port of the ecFlow server, by default DEFAULT_PORT.
    refresh_interval : float, optional
        Interval for live log updates, by default DEFAULT_REFRESH_INTERVAL.
    **kwargs : Any
        Extra keyword arguments forwarded to the Textual App.
    """
    super().__init__(**kwargs)
    # No client yet; it starts out as None.
    self.ecflow_client: EcflowClient | None = None
    self.refresh_interval = refresh_interval
    self.host, self.port = host, port

action_copy_path()

Copy the selected node path to the clipboard.

Returns

None

Source code in src/ectop/app.py
def action_copy_path(self) -> None:
    """
    Put the selected node's path on the clipboard.

    When the app object has no ``copy_to_clipboard`` attribute, the path is
    shown in a notification instead.

    Returns
    -------
    None
    """
    selected = self.get_selected_path()
    if not selected:
        self.notify("No node selected", severity="warning")
        return

    if not hasattr(self, "copy_to_clipboard"):
        self.notify(f"Node path: {selected}")
        return

    self.copy_to_clipboard(selected)
    self.notify(f"Copied to clipboard: {selected}")

action_cycle_filter()

Cycle through tree filters.

Returns

None

Source code in src/ectop/app.py
def action_cycle_filter(self) -> None:
    """
    Advance the suite tree to its next filter.

    Delegates to the ``#suite_tree`` widget's own ``action_cycle_filter``.

    Returns
    -------
    None
    """
    suite_tree = self.query_one("#suite_tree", SuiteTree)
    suite_tree.action_cycle_filter()

action_edit_script()

Open the node script in an editor and update it on the server.

Returns

None

Source code in src/ectop/app.py
def action_edit_script(self) -> None:
    """
    Start the edit flow for the selected node's script.

    Warns when nothing is selected; otherwise hands the node path to
    ``_edit_script_worker``.

    Returns
    -------
    None
    """
    target = self.get_selected_path()
    if target:
        self._edit_script_worker(target)
    else:
        self.notify("No node selected", severity="warning")

action_force()

Force complete the selected node.

Returns

None

Source code in src/ectop/app.py
def action_force(self) -> None:
    """
    Run the ``force_complete`` client command on the selected node.

    Returns
    -------
    None
    """
    node_path = self.get_selected_path()
    self._run_client_command("force_complete", node_path)

action_halt_server()

Halt the ecFlow server (HALT).

Source code in src/ectop/app.py
@work(thread=True)
def action_halt_server(self) -> None:
    """
    Ask the ecFlow server to halt (HALT), then refresh the tree.

    Does nothing when no client is set. Any failure is reported through an
    error notification.
    """
    client = self.ecflow_client
    if not client:
        return

    try:
        client.halt_server()
        self.call_from_thread(self.notify, "Server Halted (HALT)")
        self.action_refresh()
    except Exception as exc:
        message = f"Halt Error: {exc}"
        self.call_from_thread(self.notify, message, severity="error")

action_kill()

Kill the selected node.

Returns

None

Source code in src/ectop/app.py
def action_kill(self) -> None:
    """
    Run the ``kill`` client command on the selected node.

    Returns
    -------
    None
    """
    node_path = self.get_selected_path()
    self._run_client_command("kill", node_path)

action_load_node()

Fetch Output, Script, and Job files for the selected node.

Returns

None

Source code in src/ectop/app.py
def action_load_node(self) -> None:
    """
    Load the Output, Script, and Job files of the selected node.

    Warns when nothing is selected; otherwise hands the node path to
    ``_load_node_worker``.

    Returns
    -------
    None
    """
    target = self.get_selected_path()
    if target:
        self._load_node_worker(target)
    else:
        self.notify("No node selected", severity="warning")

action_refresh()

Fetch suites from server and rebuild the tree.

Returns

None

Notes

This is a background worker that performs blocking I/O.

Source code in src/ectop/app.py
@work(exclusive=True, thread=True)
def action_refresh(self) -> None:
    """
    Pull the current definitions from the server and redraw the tree.

    Returns
    -------
    None

    Notes
    -----
    Runs as a thread worker doing blocking I/O; widget updates and
    notifications are dispatched through ``call_from_thread``.
    """
    if not self.ecflow_client:
        return

    notify = self.notify
    self.call_from_thread(notify, "Refreshing tree...")

    suite_tree = self.query_one("#suite_tree", SuiteTree)
    bar = self.query_one("#status_bar", StatusBar)
    try:
        self.ecflow_client.sync_local()
        defs = self.ecflow_client.get_defs()
        # Without definitions, report a plain "Connected" status.
        status = str(defs.get_server_state()) if defs else "Connected"
        try:
            version = self.ecflow_client.server_version()
        except RuntimeError:
            # The version lookup is allowed to fail.
            version = "Unknown"

        self.call_from_thread(
            suite_tree.update_tree,
            self.ecflow_client.host,
            self.ecflow_client.port,
            defs,
        )
        self.call_from_thread(
            bar.update_status,
            self.ecflow_client.host,
            self.ecflow_client.port,
            status=status,
            version=version,
        )
        self.call_from_thread(notify, "Tree Refreshed")
    except RuntimeError as exc:
        self.call_from_thread(
            bar.update_status,
            self.ecflow_client.host,
            self.ecflow_client.port,
            status=STATUS_SYNC_ERROR,
        )
        self.call_from_thread(notify, f"Refresh Error: {exc}", severity="error")
    except Exception as exc:
        self.call_from_thread(notify, f"Unexpected Error: {exc}", severity="error")

action_requeue()

Requeue the selected node.

Returns

None

Source code in src/ectop/app.py
def action_requeue(self) -> None:
    """
    Run the ``requeue`` client command on the selected node.

    Returns
    -------
    None
    """
    node_path = self.get_selected_path()
    self._run_client_command("requeue", node_path)

action_restart_server()

Restart the ecFlow server (RUNNING).

Source code in src/ectop/app.py
@work(thread=True)
def action_restart_server(self) -> None:
    """
    Restart the ecFlow server (RUNNING) and refresh the tree afterwards.

    Returns
    -------
    None

    Notes
    -----
    Declared as a thread worker, so the notifications and the follow-up
    refresh are all dispatched to the app thread via ``call_from_thread``.
    Does nothing when no client is available.
    """
    if not self.ecflow_client:
        return
    try:
        self.ecflow_client.restart_server()
        self.call_from_thread(self.notify, "Server Started (RUNNING)")
        # ``action_refresh`` is itself a ``@work`` method: launch its worker
        # from the app thread instead of from inside this worker thread.
        self.call_from_thread(self.action_refresh)
    except Exception as e:
        self.call_from_thread(self.notify, f"Restart Error: {e}", severity="error")

action_resume()

Resume the selected node.

Returns

None

Source code in src/ectop/app.py
def action_resume(self) -> None:
    """
    Issue the ``resume`` client command for the currently selected node.

    Returns
    -------
    None
    """
    selected = self.get_selected_path()
    self._run_client_command("resume", selected)

action_search()

Show the search box.

Returns

None

Source code in src/ectop/app.py
def action_search(self) -> None:
    """
    Reveal the node search box and give it keyboard focus.

    Returns
    -------
    None
    """
    box = self.query_one("#search_box", SearchBox)
    # The "visible" class is what un-hides the box; focus follows.
    box.add_class("visible")
    box.focus()

action_search_content()

Trigger content search in the main content area.

Returns

None

Source code in src/ectop/app.py
def action_search_content(self) -> None:
    """
    Delegate to the main content area's own search action.

    Returns
    -------
    None
    """
    content = self.query_one("#main_content", MainContent)
    content.action_search()

action_suspend()

Suspend the selected node.

Returns

None

Source code in src/ectop/app.py
def action_suspend(self) -> None:
    """
    Issue the ``suspend`` client command for the currently selected node.

    Returns
    -------
    None
    """
    selected = self.get_selected_path()
    self._run_client_command("suspend", selected)

action_toggle_live()

Toggle live log updates.

Returns

None

Source code in src/ectop/app.py
def action_toggle_live(self) -> None:
    """
    Flip the live-log flag on the main content area and report the new state.

    When live mode ends up enabled, the output tab is made the active tab.

    Returns
    -------
    None
    """
    pane = self.query_one("#main_content", MainContent)
    pane.is_live = not pane.is_live
    if pane.is_live:
        label = "ON"
    else:
        label = "OFF"
    self.notify(f"Live Log: {label}")
    if pane.is_live:
        pane.active = "tab_output"

action_variables()

Show the variable tweaker for the selected node.

Returns

None

Source code in src/ectop/app.py
def action_variables(self) -> None:
    """
    Open the variable tweaker screen for the selected node.

    A warning notification is shown instead when there is no selected path
    or no ecFlow client.

    Returns
    -------
    None
    """
    path = self.get_selected_path()
    if path and self.ecflow_client:
        self.push_screen(VariableTweaker(path, self.ecflow_client))
        return
    self.notify("No node selected", severity="warning")

action_why()

Show the 'Why' inspector for the selected node.

Returns

None

Source code in src/ectop/app.py
def action_why(self) -> None:
    """
    Open the 'Why' inspector screen for the selected node.

    A warning notification is shown instead when there is no selected path
    or no ecFlow client.

    Returns
    -------
    None
    """
    path = self.get_selected_path()
    if path and self.ecflow_client:
        self.push_screen(WhyInspector(path, self.ecflow_client))
        return
    self.notify("No node selected", severity="warning")

compose()

Compose the UI layout.

Returns

ComposeResult The UI components.

Source code in src/ectop/app.py
def compose(self) -> ComposeResult:
    """
    Build the application layout.

    Yields, in order: the header (with clock), the node search box, a
    horizontal split holding the suite-tree sidebar and the main content
    area, the status bar, and the footer.

    Returns
    -------
    ComposeResult
        The UI components.
    """
    yield Header(show_clock=True)
    yield SearchBox(placeholder="Search nodes...", id="search_box")
    sidebar = Container(SuiteTree("ecFlow Server", id="suite_tree"), id="sidebar")
    yield Horizontal(sidebar, MainContent(id="main_content"))
    yield StatusBar(id="status_bar")
    yield Footer()

get_selected_path()

Helper to get the ecFlow path of the selected node.

Returns

str | None The absolute path of the selected node, or None if no node is selected.

Source code in src/ectop/app.py
def get_selected_path(self) -> str | None:
    """
    Helper to get the ecFlow path of the selected node.

    Returns
    -------
    str | None
        The absolute path of the selected node, or None if no node is selected.
    """
    try:
        node = self.query_one("#suite_tree", SuiteTree).cursor_node
        return node.data if node else None
    except Exception:
        return None

on_input_changed(event)

Handle search input changes for live search.

Parameters

event : Input.Changed The input changed event.

Source code in src/ectop/app.py
def on_input_changed(self, event: Input.Changed) -> None:
    """
    Live search: jump to the first matching tree node as the user types.

    Events from inputs other than the search box, and empty queries, are
    ignored.

    Parameters
    ----------
    event : Input.Changed
        The input changed event.
    """
    if event.input.id != "search_box":
        return
    text = event.value
    if not text:
        return
    self.query_one("#suite_tree", SuiteTree).find_and_select(text)

on_input_submitted(event)

Handle search submission.

Parameters

event : Input.Submitted The input submission event.

Source code in src/ectop/app.py
def on_input_submitted(self, event: Input.Submitted) -> None:
    """
    Run the node search when the search box is submitted.

    Events from inputs other than the search box, and empty queries, are
    ignored.

    Parameters
    ----------
    event : Input.Submitted
        The input submission event.
    """
    if event.input.id != "search_box":
        return
    text = event.value
    if not text:
        return
    self.query_one("#suite_tree", SuiteTree).find_and_select(text)

on_mount()

Handle the mount event to start the application.

Returns

None

Source code in src/ectop/app.py
def on_mount(self) -> None:
    """
    Start the application once it is mounted.

    Kicks off the initial server connection, then schedules the live-log
    tick at ``refresh_interval``.

    Returns
    -------
    None
    """
    self._initial_connect()
    period = self.refresh_interval
    self.set_interval(period, self._live_log_tick)

on_tree_node_selected(event)

Handle node selection to automatically load content.

Parameters

event : SuiteTree.NodeSelected[str] The node selection event.

Source code in src/ectop/app.py
def on_tree_node_selected(self, event: SuiteTree.NodeSelected[str]) -> None:
    """
    Load a node's content automatically when it is selected in the tree.

    Nodes without attached data are ignored.

    Parameters
    ----------
    event : SuiteTree.NodeSelected[str]
        The node selection event.
    """
    if not event.node.data:
        return
    self.action_load_node()

EctopCommands

Bases: Provider

Command provider for ectop.

Source code in src/ectop/app.py
class EctopCommands(Provider):
    """
    Command provider exposing ectop's actions as searchable commands.
    """

    async def search(self, query: str) -> Hits:
        """
        Match the query against the known command names.

        Parameters
        ----------
        query : str
            The search query.

        Yields
        ------
        Hit
            One hit per command whose name scores above zero.
        """
        owner = self.app
        assert isinstance(owner, Ectop)
        matcher = self.matcher(query)

        # (display name, bound action, help text) for every command offered.
        catalogue = (
            ("Refresh Tree", owner.action_refresh, "Refresh the ecFlow suite tree"),
            ("Search Nodes", owner.action_search, "Search for a node by name or path"),
            ("Suspend Node", owner.action_suspend, "Suspend the currently selected node"),
            ("Resume Node", owner.action_resume, "Resume the currently selected node"),
            ("Kill Node", owner.action_kill, "Kill the currently selected node"),
            ("Force Complete", owner.action_force, "Force complete the currently selected node"),
            ("Cycle Filter", owner.action_cycle_filter, "Cycle status filters (All, Aborted, Active...)"),
            ("Requeue", owner.action_requeue, "Requeue the currently selected node"),
            ("Copy Path", owner.action_copy_path, "Copy the selected node path"),
            ("Why?", owner.action_why, "Inspect why a node is not running"),
            ("Variables", owner.action_variables, "View/Edit node variables"),
            ("Edit Script", owner.action_edit_script, "Edit and rerun node script"),
            ("Restart Server", owner.action_restart_server, "Start server scheduling (RUNNING)"),
            ("Halt Server", owner.action_halt_server, "Stop server scheduling (HALT)"),
            ("Toggle Live Log", owner.action_toggle_live, "Toggle live log updates"),
            ("Quit", owner.action_quit, "Quit the application"),
        )

        for label, callback, description in catalogue:
            score = matcher.match(label)
            if score > 0:
                yield Hit(score, matcher.highlight(label), callback, help=description)

search(query) async

Search for commands.

Parameters

query : str The search query.

Yields

Hit A command hit.

Source code in src/ectop/app.py
async def search(self, query: str) -> Hits:
    """
    Match the query against the known command names.

    Parameters
    ----------
    query : str
        The search query.

    Yields
    ------
    Hit
        One hit per command whose name scores above zero.
    """
    owner = self.app
    assert isinstance(owner, Ectop)
    matcher = self.matcher(query)

    # (display name, bound action, help text) for every command offered.
    catalogue = (
        ("Refresh Tree", owner.action_refresh, "Refresh the ecFlow suite tree"),
        ("Search Nodes", owner.action_search, "Search for a node by name or path"),
        ("Suspend Node", owner.action_suspend, "Suspend the currently selected node"),
        ("Resume Node", owner.action_resume, "Resume the currently selected node"),
        ("Kill Node", owner.action_kill, "Kill the currently selected node"),
        ("Force Complete", owner.action_force, "Force complete the currently selected node"),
        ("Cycle Filter", owner.action_cycle_filter, "Cycle status filters (All, Aborted, Active...)"),
        ("Requeue", owner.action_requeue, "Requeue the currently selected node"),
        ("Copy Path", owner.action_copy_path, "Copy the selected node path"),
        ("Why?", owner.action_why, "Inspect why a node is not running"),
        ("Variables", owner.action_variables, "View/Edit node variables"),
        ("Edit Script", owner.action_edit_script, "Edit and rerun node script"),
        ("Restart Server", owner.action_restart_server, "Start server scheduling (RUNNING)"),
        ("Halt Server", owner.action_halt_server, "Stop server scheduling (HALT)"),
        ("Toggle Live Log", owner.action_toggle_live, "Toggle live log updates"),
        ("Quit", owner.action_quit, "Quit the application"),
    )

    for label, callback, description in catalogue:
        score = matcher.match(label)
        if score > 0:
            yield Hit(score, matcher.highlight(label), callback, help=description)

ecFlow Client Wrapper for ectop.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

EcflowClient

A wrapper around the ecflow.Client to provide a cleaner API and error handling.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Attributes

host : str The hostname of the ecFlow server. port : int The port number of the ecFlow server. client : ecflow.Client The underlying ecFlow client instance.

Source code in src/ectop/client.py
class EcflowClient:
    """
    Thin facade over ``ecflow.Client`` with uniform error reporting.

    Each method forwards to the wrapped client. A ``RuntimeError`` raised by
    the wrapped call is re-raised as a new ``RuntimeError`` whose message
    names the failed operation, chained to the original error.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.

    Attributes
    ----------
    host : str
        Hostname of the ecFlow server.
    port : int
        Port number of the ecFlow server.
    client : ecflow.Client
        The wrapped ecFlow client instance.
    """

    def __init__(self, host: str = "localhost", port: int = 3141) -> None:
        """
        Create the wrapper and its underlying ``ecflow.Client``.

        Parameters
        ----------
        host : str, optional
            Hostname of the ecFlow server, by default "localhost".
        port : int, optional
            Port number of the ecFlow server, by default 3141.

        Raises
        ------
        RuntimeError
            If the ecFlow client cannot be initialized.
        """
        self.host: str = host
        self.port: int = port
        try:
            connection = ecflow.Client(host, port)
        except RuntimeError as err:
            raise RuntimeError(f"Failed to initialize ecFlow client for {host}:{port}: {err}") from err
        self.client: ecflow.Client = connection

    def ping(self) -> None:
        """
        Check connectivity by pinging the ecFlow server.

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If the server is unreachable or the ping fails.

        Notes
        -----
        This is a blocking network call and should be run in a background worker.
        """
        try:
            self.client.ping()
        except RuntimeError as err:
            reason = f"Failed to ping ecFlow server at {self.host}:{self.port}: {err}"
            raise RuntimeError(reason) from err

    def sync_local(self) -> None:
        """
        Bring the local definition in sync with the server.

        Returns
        -------
        None

        Raises
        ------
        RuntimeError
            If synchronization fails.

        Notes
        -----
        This is a blocking network call and should be run in a background worker.
        """
        try:
            self.client.sync_local()
        except RuntimeError as err:
            reason = f"Failed to sync with ecFlow server: {err}"
            raise RuntimeError(reason) from err

    def get_defs(self) -> Defs | None:
        """
        Return the definitions currently held by the client.

        Returns
        -------
        ecflow.Defs | None
            The ecFlow definitions, or None if not available.

        Raises
        ------
        RuntimeError
            If the definitions cannot be retrieved.
        """
        try:
            definitions = self.client.get_defs()
        except RuntimeError as err:
            raise RuntimeError(f"Failed to get definitions from client: {err}") from err
        return definitions

    def file(self, path: str, file_type: str) -> str:
        """
        Fetch a file (log, script, job) belonging to a node.

        Parameters
        ----------
        path : str
            The absolute path to the node.
        file_type : str
            The type of file to retrieve ('jobout', 'script', 'job').

        Returns
        -------
        str
            The content of the requested file.

        Raises
        ------
        RuntimeError
            If the file cannot be retrieved.
        """
        try:
            content = self.client.get_file(path, file_type)
        except RuntimeError as err:
            raise RuntimeError(f"Failed to retrieve {file_type} for {path}: {err}") from err
        return content

    def suspend(self, path: str) -> None:
        """
        Suspend the node at ``path``.

        Parameters
        ----------
        path : str
            The absolute path to the node.

        Raises
        ------
        RuntimeError
            If the node cannot be suspended.
        """
        try:
            self.client.suspend(path)
        except RuntimeError as err:
            reason = f"Failed to suspend {path}: {err}"
            raise RuntimeError(reason) from err

    def resume(self, path: str) -> None:
        """
        Resume the suspended node at ``path``.

        Parameters
        ----------
        path : str
            The absolute path to the node.

        Raises
        ------
        RuntimeError
            If the node cannot be resumed.
        """
        try:
            self.client.resume(path)
        except RuntimeError as err:
            reason = f"Failed to resume {path}: {err}"
            raise RuntimeError(reason) from err

    def kill(self, path: str) -> None:
        """
        Kill the running task at ``path``.

        Parameters
        ----------
        path : str
            The absolute path to the node.

        Raises
        ------
        RuntimeError
            If the node cannot be killed.
        """
        try:
            self.client.kill(path)
        except RuntimeError as err:
            reason = f"Failed to kill {path}: {err}"
            raise RuntimeError(reason) from err

    def force_complete(self, path: str) -> None:
        """
        Force the node at ``path`` into the complete state.

        Parameters
        ----------
        path : str
            The absolute path to the node.

        Raises
        ------
        RuntimeError
            If the node state cannot be forced.
        """
        try:
            self.client.force_complete(path)
        except RuntimeError as err:
            reason = f"Failed to force complete {path}: {err}"
            raise RuntimeError(reason) from err

    def alter(self, path: str, alter_type: str, name: str, value: str = "") -> None:
        """
        Alter an attribute or variable on a node.

        Parameters
        ----------
        path : str
            The absolute path to the node.
        alter_type : str
            The type of alteration (e.g., 'change', 'add', 'delete').
        name : str
            The name of the attribute or variable.
        value : str, optional
            The new value, by default "".

        Raises
        ------
        RuntimeError
            If the alteration fails.
        """
        try:
            self.client.alter(path, alter_type, name, value)
        except RuntimeError as err:
            reason = f"Failed to alter {path} ({alter_type} {name}={value}): {err}"
            raise RuntimeError(reason) from err

    def requeue(self, path: str) -> None:
        """
        Requeue the node at ``path``.

        Parameters
        ----------
        path : str
            The absolute path to the node.

        Raises
        ------
        RuntimeError
            If the node cannot be requeued.
        """
        try:
            self.client.requeue(path)
        except RuntimeError as err:
            reason = f"Failed to requeue {path}: {err}"
            raise RuntimeError(reason) from err

    def restart_server(self) -> None:
        """
        Restart the ecFlow server (resume from HALTED state).

        Raises
        ------
        RuntimeError
            If the server cannot be restarted.
        """
        try:
            self.client.restart_server()
        except RuntimeError as err:
            reason = f"Failed to restart server: {err}"
            raise RuntimeError(reason) from err

    def halt_server(self) -> None:
        """
        Halt the ecFlow server (suspend scheduling).

        Raises
        ------
        RuntimeError
            If the server cannot be halted.
        """
        try:
            self.client.halt_server()
        except RuntimeError as err:
            reason = f"Failed to halt server: {err}"
            raise RuntimeError(reason) from err

    def version(self) -> str:
        """
        Return the ecFlow client version as a string.

        Returns
        -------
        str
            The client version string.

        Raises
        ------
        RuntimeError
            If the version cannot be retrieved.
        """
        try:
            raw = self.client.version()
            return str(raw)
        except RuntimeError as err:
            raise RuntimeError(f"Failed to get client version: {err}") from err

    def server_version(self) -> str:
        """
        Return the ecFlow server version as a string.

        Returns
        -------
        str
            The server version string.

        Raises
        ------
        RuntimeError
            If the server version cannot be retrieved.
        """
        try:
            raw = self.client.server_version()
            return str(raw)
        except RuntimeError as err:
            raise RuntimeError(f"Failed to get server version: {err}") from err

__init__(host='localhost', port=3141)

Initialize the EcflowClient.

Parameters

host : str, optional The hostname of the ecFlow server, by default "localhost". port : int, optional The port number of the ecFlow server, by default 3141.

Raises

RuntimeError If the ecFlow client cannot be initialized.

Source code in src/ectop/client.py
def __init__(self, host: str = "localhost", port: int = 3141) -> None:
    """
    Create the wrapper and its underlying ``ecflow.Client``.

    Parameters
    ----------
    host : str, optional
        Hostname of the ecFlow server, by default "localhost".
    port : int, optional
        Port number of the ecFlow server, by default 3141.

    Raises
    ------
    RuntimeError
        If the ecFlow client cannot be initialized.
    """
    self.host: str = host
    self.port: int = port
    try:
        connection = ecflow.Client(host, port)
    except RuntimeError as err:
        raise RuntimeError(f"Failed to initialize ecFlow client for {host}:{port}: {err}") from err
    self.client: ecflow.Client = connection

alter(path, alter_type, name, value='')

Alter a node attribute or variable.

Parameters

path : str The absolute path to the node. alter_type : str The type of alteration (e.g., 'change', 'add', 'delete'). name : str The name of the attribute or variable. value : str, optional The new value, by default "".

Raises

RuntimeError If the alteration fails.

Source code in src/ectop/client.py
def alter(self, path: str, alter_type: str, name: str, value: str = "") -> None:
    """
    Alter an attribute or variable on a node.

    Parameters
    ----------
    path : str
        The absolute path to the node.
    alter_type : str
        The type of alteration (e.g., 'change', 'add', 'delete').
    name : str
        The name of the attribute or variable.
    value : str, optional
        The new value, by default "".

    Raises
    ------
    RuntimeError
        If the alteration fails.
    """
    try:
        self.client.alter(path, alter_type, name, value)
    except RuntimeError as err:
        reason = f"Failed to alter {path} ({alter_type} {name}={value}): {err}"
        raise RuntimeError(reason) from err

file(path, file_type)

Retrieve a file (log, script, job) for a specific node.

Parameters

path : str The absolute path to the node. file_type : str The type of file to retrieve ('jobout', 'script', 'job').

Returns

str The content of the requested file.

Raises

RuntimeError If the file cannot be retrieved.

Source code in src/ectop/client.py
def file(self, path: str, file_type: str) -> str:
    """
    Fetch a file (log, script, job) belonging to a node.

    Parameters
    ----------
    path : str
        The absolute path to the node.
    file_type : str
        The type of file to retrieve ('jobout', 'script', 'job').

    Returns
    -------
    str
        The content of the requested file.

    Raises
    ------
    RuntimeError
        If the file cannot be retrieved.
    """
    try:
        content = self.client.get_file(path, file_type)
    except RuntimeError as err:
        raise RuntimeError(f"Failed to retrieve {file_type} for {path}: {err}") from err
    return content

force_complete(path)

Force a node to the complete state.

Parameters

path : str The absolute path to the node.

Raises

RuntimeError If the node state cannot be forced.

Source code in src/ectop/client.py
def force_complete(self, path: str) -> None:
    """
    Force the node at ``path`` into the complete state.

    Parameters
    ----------
    path : str
        The absolute path to the node.

    Raises
    ------
    RuntimeError
        If the node state cannot be forced.
    """
    try:
        self.client.force_complete(path)
    except RuntimeError as err:
        reason = f"Failed to force complete {path}: {err}"
        raise RuntimeError(reason) from err

get_defs()

Retrieve the current definitions from the client.

Returns

ecflow.Defs | None The ecFlow definitions, or None if not available.

Raises

RuntimeError If the definitions cannot be retrieved.

Source code in src/ectop/client.py
def get_defs(self) -> Defs | None:
    """
    Return the definitions currently held by the client.

    Returns
    -------
    ecflow.Defs | None
        The ecFlow definitions, or None if not available.

    Raises
    ------
    RuntimeError
        If the definitions cannot be retrieved.
    """
    try:
        definitions = self.client.get_defs()
    except RuntimeError as err:
        raise RuntimeError(f"Failed to get definitions from client: {err}") from err
    return definitions

halt_server()

Halt the ecFlow server (suspend scheduling).

Raises

RuntimeError If the server cannot be halted.

Source code in src/ectop/client.py
def halt_server(self) -> None:
    """
    Halt the ecFlow server (suspend scheduling).

    Raises
    ------
    RuntimeError
        If the server cannot be halted.
    """
    try:
        self.client.halt_server()
    except RuntimeError as err:
        reason = f"Failed to halt server: {err}"
        raise RuntimeError(reason) from err

kill(path)

Kill a running task.

Parameters

path : str The absolute path to the node.

Raises

RuntimeError If the node cannot be killed.

Source code in src/ectop/client.py
def kill(self, path: str) -> None:
    """
    Kill the running task at ``path``.

    Parameters
    ----------
    path : str
        The absolute path to the node.

    Raises
    ------
    RuntimeError
        If the node cannot be killed.
    """
    try:
        self.client.kill(path)
    except RuntimeError as err:
        reason = f"Failed to kill {path}: {err}"
        raise RuntimeError(reason) from err

ping()

Ping the ecFlow server to check connectivity.

Returns

None

Raises

RuntimeError If the server is unreachable or the ping fails.

Notes

This is a blocking network call and should be run in a background worker.

Source code in src/ectop/client.py
def ping(self) -> None:
    """
    Check connectivity by pinging the ecFlow server.

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If the server is unreachable or the ping fails.

    Notes
    -----
    This is a blocking network call and should be run in a background worker.
    """
    try:
        self.client.ping()
    except RuntimeError as err:
        reason = f"Failed to ping ecFlow server at {self.host}:{self.port}: {err}"
        raise RuntimeError(reason) from err

requeue(path)

Requeue a node.

Parameters

path : str The absolute path to the node.

Raises

RuntimeError If the node cannot be requeued.

Source code in src/ectop/client.py
def requeue(self, path: str) -> None:
    """
    Requeue the node at ``path``.

    Parameters
    ----------
    path : str
        The absolute path to the node.

    Raises
    ------
    RuntimeError
        If the node cannot be requeued.
    """
    try:
        self.client.requeue(path)
    except RuntimeError as err:
        reason = f"Failed to requeue {path}: {err}"
        raise RuntimeError(reason) from err

restart_server()

Restart the ecFlow server (resume from HALTED state).

Raises

RuntimeError If the server cannot be restarted.

Source code in src/ectop/client.py
def restart_server(self) -> None:
    """
    Restart the ecFlow server (resume from HALTED state).

    Raises
    ------
    RuntimeError
        If the server cannot be restarted.
    """
    try:
        self.client.restart_server()
    except RuntimeError as err:
        reason = f"Failed to restart server: {err}"
        raise RuntimeError(reason) from err

resume(path)

Resume a suspended node.

Parameters

path : str The absolute path to the node.

Raises

RuntimeError If the node cannot be resumed.

Source code in src/ectop/client.py
def resume(self, path: str) -> None:
    """
    Resume the suspended node at ``path``.

    Parameters
    ----------
    path : str
        The absolute path to the node.

    Raises
    ------
    RuntimeError
        If the node cannot be resumed.
    """
    try:
        self.client.resume(path)
    except RuntimeError as err:
        reason = f"Failed to resume {path}: {err}"
        raise RuntimeError(reason) from err

server_version()

Retrieve the ecFlow server version.

Returns

str The server version string.

Raises

RuntimeError If the server version cannot be retrieved.

Source code in src/ectop/client.py
def server_version(self) -> str:
    """
    Return the ecFlow server version as a string.

    Returns
    -------
    str
        The server version string.

    Raises
    ------
    RuntimeError
        If the server version cannot be retrieved.
    """
    try:
        raw = self.client.server_version()
        return str(raw)
    except RuntimeError as err:
        raise RuntimeError(f"Failed to get server version: {err}") from err

suspend(path)

Suspend a node.

Parameters

path : str The absolute path to the node.

Raises

RuntimeError If the node cannot be suspended.

Source code in src/ectop/client.py
def suspend(self, path: str) -> None:
    """
    Suspend the node at ``path``.

    Parameters
    ----------
    path : str
        The absolute path to the node.

    Raises
    ------
    RuntimeError
        If the node cannot be suspended.
    """
    try:
        self.client.suspend(path)
    except RuntimeError as err:
        reason = f"Failed to suspend {path}: {err}"
        raise RuntimeError(reason) from err

sync_local()

Synchronize the local definition with the server.

Returns

None

Raises

RuntimeError If synchronization fails.

Notes

This is a blocking network call and should be run in a background worker.

Source code in src/ectop/client.py
def sync_local(self) -> None:
    """
    Bring the local definition in sync with the server.

    Returns
    -------
    None

    Raises
    ------
    RuntimeError
        If synchronization fails.

    Notes
    -----
    This is a blocking network call and should be run in a background worker.
    """
    try:
        self.client.sync_local()
    except RuntimeError as err:
        reason = f"Failed to sync with ecFlow server: {err}"
        raise RuntimeError(reason) from err

version()

Retrieve the ecFlow client version.

Returns

str The client version string.

Raises

RuntimeError If the version cannot be retrieved.

Source code in src/ectop/client.py
def version(self) -> str:
    """
    Return the ecFlow client version as a string.

    Returns
    -------
    str
        The client version string.

    Raises
    ------
    RuntimeError
        If the version cannot be retrieved.
    """
    try:
        raw = self.client.version()
        return str(raw)
    except RuntimeError as err:
        raise RuntimeError(f"Failed to get client version: {err}") from err

CLI entry point for ectop.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

main()

Run the ectop application.

Parses command-line arguments and environment variables for server configuration.

Source code in src/ectop/cli.py
def main() -> None:
    """
    Run the ectop application.

    Server configuration is taken from command-line arguments, falling back
    to the ``ECF_HOST``, ``ECF_PORT`` and ``ECTOP_REFRESH`` environment
    variables and then to the built-in defaults.

    Notes
    -----
    Environment values are passed to argparse as raw strings. argparse
    applies the argument's ``type`` to string defaults at parse time, so a
    malformed ``ECF_PORT`` or ``ECTOP_REFRESH`` produces a normal usage
    error instead of a ``ValueError`` traceback, and is not parsed at all
    when the option is given explicitly on the command line.
    """
    parser = argparse.ArgumentParser(description="ectop — High-performance TUI for ECMWF ecFlow")
    parser.add_argument(
        "--host",
        type=str,
        default=os.environ.get("ECF_HOST", DEFAULT_HOST),
        help=f"ecFlow server hostname (default: {DEFAULT_HOST} or ECF_HOST)",
    )
    parser.add_argument(
        "--port",
        type=int,
        # Raw env string is converted by argparse; the fallback keeps its int type.
        default=os.environ.get("ECF_PORT", int(DEFAULT_PORT)),
        help=f"ecFlow server port (default: {DEFAULT_PORT} or ECF_PORT)",
    )
    parser.add_argument(
        "--refresh",
        type=float,
        # Raw env string is converted by argparse; the fallback keeps its float type.
        default=os.environ.get("ECTOP_REFRESH", float(DEFAULT_REFRESH_INTERVAL)),
        help=f"Automatic refresh interval in seconds (default: {DEFAULT_REFRESH_INTERVAL} or ECTOP_REFRESH)",
    )

    args = parser.parse_args()

    app = Ectop(host=args.host, port=args.port, refresh_interval=args.refresh)
    app.run()

Constants for the ectop application.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

DEFAULT_EDITOR = 'vi' module-attribute

Default editor for script editing.

DEFAULT_SHELL = 'bash' module-attribute

Default shell for script execution.

ERROR_CONNECTION_FAILED = 'Connection Failed' module-attribute

Standard error message for connection failures.

INHERITED_VAR_PREFIX = 'inh_' module-attribute

Prefix for inherited variable keys in the VariableTweaker.

LOADING_PLACEHOLDER = 'loading...' module-attribute

Placeholder text for lazy-loaded tree nodes.

STATUS_SYNC_ERROR = 'Sync Error' module-attribute

Standard status message for synchronization errors.

SYNTAX_THEME = 'monokai' module-attribute

Default theme for syntax highlighting.

TREE_FILTERS = [None, 'aborted', 'active', 'queued', 'submitted', 'suspended'] module-attribute

Default status filters for the SuiteTree.

Widgets

Main content area for displaying ecFlow node information.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

MainContent

Bases: Vertical

A container to display Output logs, Scripts, and Job files in tabs.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Attributes

is_live : bool Whether live log updates are enabled. last_log_size : int The size of the log content at the last update.

Source code in src/ectop/widgets/content.py
class MainContent(Vertical):
    """
    A container to display Output logs, Scripts, and Job files in tabs.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.

    Attributes
    ----------
    is_live : bool
        Whether live log updates are enabled.
    log_content : str
        The content of the output log; assigning it redraws the Output tab.
    script_content : str
        The content of the script; assigning it redraws the Script tab.
    job_content : str
        The content of the job file; assigning it redraws the Job tab.
    last_log_size : int
        The size of the log content at the last update.
    """

    is_live: reactive[bool] = reactive(False, init=False)
    """Whether live log updates are enabled."""

    log_content: reactive[str] = reactive("", init=False)
    """The content of the output log."""

    script_content: reactive[str] = reactive("", init=False)
    """The content of the script."""

    job_content: reactive[str] = reactive("", init=False)
    """The content of the job file."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialize the MainContent widget.

        Parameters
        ----------
        *args : Any
            Positional arguments for Vertical.
        **kwargs : Any
            Keyword arguments for Vertical.
        """
        super().__init__(*args, **kwargs)
        self.last_log_size: int = 0
        # Plain-text copy of what each tab shows, keyed "output" / "script" / "job";
        # the content search reads from here.
        self._content_cache: dict[str, str] = {}

    def compose(self) -> ComposeResult:
        """
        Compose the tabs for Output, Script, and Job.

        Returns
        -------
        ComposeResult
            The UI components for the tabs.
        """
        yield Input(placeholder="Search in content...", id="content_search", classes="hidden")
        with TabbedContent(id="content_tabs"):
            with TabPane("Output", id="tab_output"):
                yield RichLog(markup=True, highlight=True, id="log_output")
            with TabPane("Script (.ecf)", id="tab_script"):
                with VerticalScroll():
                    yield Static("", id="view_script", classes="code_view")
            with TabPane("Job (Processed)", id="tab_job"):
                with VerticalScroll():
                    yield Static("", id="view_job", classes="code_view")

    @property
    def active(self) -> str | None:
        """
        Get the active tab ID.

        Returns
        -------
        str | None
            The ID of the active tab.
        """
        return self.query_one("#content_tabs", TabbedContent).active

    @active.setter
    def active(self, value: str) -> None:
        """
        Set the active tab ID.

        Parameters
        ----------
        value : str
            The ID of the tab to activate.
        """
        self.query_one("#content_tabs", TabbedContent).active = value

    def watch_log_content(self, content: str) -> None:
        """
        Watch for changes in log content and update the widget.

        Parameters
        ----------
        content : str
            The new log content.
        """
        widget = self.query_one("#log_output", RichLog)
        self._content_cache["output"] = content
        # The reactive carries no "append" flag, so this path always performs a
        # full redraw; incremental writes go through update_log(append=True).
        widget.clear()
        self.last_log_size = len(content)
        widget.write(content)

    def watch_script_content(self, content: str) -> None:
        """
        Watch for changes in script content and update the widget.

        Parameters
        ----------
        content : str
            The new script content.
        """
        self._content_cache["script"] = content
        widget = self.query_one("#view_script", Static)
        syntax = Syntax(content, DEFAULT_SHELL, theme=SYNTAX_THEME, line_numbers=True)
        widget.update(syntax)

    def watch_job_content(self, content: str) -> None:
        """
        Watch for changes in job content and update the widget.

        Parameters
        ----------
        content : str
            The new job content.
        """
        self._content_cache["job"] = content
        widget = self.query_one("#view_job", Static)
        syntax = Syntax(content, DEFAULT_SHELL, theme=SYNTAX_THEME, line_numbers=True)
        widget.update(syntax)

    def update_log(self, content: str, append: bool = False) -> None:
        """
        Update the Output log tab.

        Parameters
        ----------
        content : str
            The complete log content. In append mode only the part beyond
            ``last_log_size`` is written to the widget.
        append : bool, optional
            Whether to append to existing content, by default False.

        Notes
        -----
        If ``content`` is shorter than ``last_log_size`` in append mode, the
        widget is cleared and redrawn from the start instead of slicing at a
        stale offset.
        """
        if not append:
            self.log_content = content
            return

        widget = self.query_one("#log_output", RichLog)
        self._content_cache["output"] = content
        if len(content) < self.last_log_size:
            # The stored offset points past the end of the new content; without a
            # reset nothing would be written until the log regrew past the old size.
            widget.clear()
            self.last_log_size = 0
        new_content = content[self.last_log_size :]
        if new_content:
            widget.write(new_content)
            self.last_log_size = len(content)

    def update_script(self, content: str) -> None:
        """
        Update the Script tab.

        Parameters
        ----------
        content : str
            The script content.
        """
        self.script_content = content

    def update_job(self, content: str) -> None:
        """
        Update the Job tab.

        Parameters
        ----------
        content : str
            The job content.
        """
        self.job_content = content

    def action_search(self) -> None:
        """
        Toggle the content search input.

        Returns
        -------
        None
        """
        search_input = self.query_one("#content_search", Input)
        if "hidden" in search_input.classes:
            search_input.remove_class("hidden")
            search_input.focus()
        else:
            search_input.add_class("hidden")
            # Focus is handed back only when the Output tab is active.
            active_tab = self.active
            if active_tab == "tab_output":
                self.query_one("#log_output").focus()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """
        Handle content search submission.

        Parameters
        ----------
        event : Input.Submitted
            The input submission event.

        Returns
        -------
        None
        """
        if event.input.id == "content_search":
            query = event.value
            if not query:
                return

            # Any tab other than Script or Job falls back to the Output cache.
            active_tab = self.active
            cache_key = "output"
            label = "Output"
            if active_tab == "tab_script":
                cache_key = "script"
                label = "Script"
            elif active_tab == "tab_job":
                cache_key = "job"
                label = "Job"

            content = self._content_cache.get(cache_key, "")
            self._run_search_worker(query, content, label)

    @work(thread=True)
    def _run_search_worker(self, query: str, content: str, label: str) -> None:
        """
        Run the search in a background worker.

        Counts case-insensitive occurrences of ``query`` in ``content`` and
        reports the result as a notification.

        Parameters
        ----------
        query : str
            The search query.
        content : str
            The content to search.
        label : str
            The label of the content being searched.
        """
        matches = content.lower().count(query.lower())
        if matches > 0:
            safe_call_app(self.app, self.app.notify, f"Found {matches} matches for '{query}' in {label}", severity="information")
        else:
            safe_call_app(self.app, self.app.notify, f"No matches found for '{query}' in {label}", severity="warning")

    def show_error(self, widget_id: str, message: str) -> None:
        """
        Display an error message in a specific widget and clear cache.

        Parameters
        ----------
        widget_id : str
            The ID of the widget where the error should be shown.
        message : str
            The error message to display.
        """
        cache_key = None
        if widget_id == "#log_output":
            cache_key = "output"
        elif widget_id == "#view_script":
            cache_key = "script"
        elif widget_id == "#view_job":
            cache_key = "job"

        if cache_key:
            self._content_cache[cache_key] = ""

        # A RichLog gets the message written as a new entry; a Static has its
        # content replaced by the message.
        widget = self.query_one(widget_id)
        if isinstance(widget, RichLog):
            widget.write(f"[italic red]{message}[/]")
        elif isinstance(widget, Static):
            widget.update(f"[italic red]{message}[/]")

active property writable

Get the active tab ID.

Returns

str | None The ID of the active tab.

is_live = reactive(False, init=False) class-attribute instance-attribute

Whether live log updates are enabled.

job_content = reactive('', init=False) class-attribute instance-attribute

The content of the job file.

log_content = reactive('', init=False) class-attribute instance-attribute

The content of the output log.

script_content = reactive('', init=False) class-attribute instance-attribute

The content of the script.

__init__(*args, **kwargs)

Initialize the MainContent widget.

Parameters

*args : Any
    Positional arguments for Vertical.
**kwargs : Any
    Keyword arguments for Vertical.

Source code in src/ectop/widgets/content.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Set up the widget and its log-tracking state.

    Parameters
    ----------
    *args : Any
        Positional arguments forwarded to Vertical.
    **kwargs : Any
        Keyword arguments forwarded to Vertical.
    """
    super().__init__(*args, **kwargs)
    # Text of each tab, filled in by the watchers and read by the content search.
    self._content_cache: dict[str, str] = {}
    # Length of the log text already written to the Output tab.
    self.last_log_size: int = 0

action_search()

Toggle the content search input.

Returns

None

Source code in src/ectop/widgets/content.py
def action_search(self) -> None:
    """
    Show the content search input if it is hidden, otherwise hide it.

    Showing the input also focuses it. Hiding it hands focus back to the
    log widget, but only while the Output tab is the active one.

    Returns
    -------
    None
    """
    box = self.query_one("#content_search", Input)
    if "hidden" not in box.classes:
        box.add_class("hidden")
        if self.active == "tab_output":
            self.query_one("#log_output").focus()
        return

    box.remove_class("hidden")
    box.focus()

compose()

Compose the tabs for Output, Script, and Job.

Returns

ComposeResult The UI components for the tabs.

Source code in src/ectop/widgets/content.py
def compose(self) -> ComposeResult:
    """
    Build the widget tree: a hidden search input above three tabs.

    The Output tab holds a RichLog; the Script and Job tabs each hold a
    Static inside a scrollable container.

    Returns
    -------
    ComposeResult
        The UI components for the tabs.
    """
    search_box = Input(placeholder="Search in content...", id="content_search", classes="hidden")
    yield search_box

    with TabbedContent(id="content_tabs"):
        with TabPane("Output", id="tab_output"):
            yield RichLog(markup=True, highlight=True, id="log_output")

        with TabPane("Script (.ecf)", id="tab_script"), VerticalScroll():
            yield Static("", id="view_script", classes="code_view")

        with TabPane("Job (Processed)", id="tab_job"), VerticalScroll():
            yield Static("", id="view_job", classes="code_view")

on_input_submitted(event)

Handle content search submission.

Parameters

event : Input.Submitted The input submission event.

Returns

None

Source code in src/ectop/widgets/content.py
def on_input_submitted(self, event: Input.Submitted) -> None:
    """
    Start a content search when the search input is submitted.

    Submissions from other inputs and empty queries are ignored. The text
    searched is the cached content of the active tab; any tab other than
    Script or Job is treated as Output.

    Parameters
    ----------
    event : Input.Submitted
        The input submission event.

    Returns
    -------
    None
    """
    if event.input.id != "content_search":
        return

    query = event.value
    if not query:
        return

    # tab id -> (cache key, label shown in the notification)
    targets = {
        "tab_script": ("script", "Script"),
        "tab_job": ("job", "Job"),
    }
    cache_key, label = targets.get(self.active, ("output", "Output"))

    content = self._content_cache.get(cache_key, "")
    self._run_search_worker(query, content, label)

show_error(widget_id, message)

Display an error message in a specific widget and clear cache.

Parameters

widget_id : str
    The ID of the widget where the error should be shown.
message : str
    The error message to display.

Source code in src/ectop/widgets/content.py
def show_error(self, widget_id: str, message: str) -> None:
    """
    Show an error in one of the content widgets and blank its cached text.

    Parameters
    ----------
    widget_id : str
        The ID of the widget where the error should be shown.
    message : str
        The error message to display.
    """
    # Selector of each content widget -> key of its entry in the search cache.
    cache_keys = {
        "#log_output": "output",
        "#view_script": "script",
        "#view_job": "job",
    }
    key = cache_keys.get(widget_id)
    if key is not None:
        self._content_cache[key] = ""

    target = self.query_one(widget_id)
    styled = f"[italic red]{message}[/]"
    if isinstance(target, RichLog):
        # A log widget receives the message as an additional entry.
        target.write(styled)
    elif isinstance(target, Static):
        # A static widget has its content replaced by the message.
        target.update(styled)

update_job(content)

Update the Job tab.

Parameters

content : str The job content.

Source code in src/ectop/widgets/content.py
def update_job(self, content: str) -> None:
    """
    Replace the text shown in the Job tab.

    Parameters
    ----------
    content : str
        The job content.
    """
    # Assignment goes through the reactive attribute.
    self.job_content = content

update_log(content, append=False)

Update the Output log tab.

Parameters

content : str
    The content to display or append.
append : bool, optional
    Whether to append to existing content, by default False.

Source code in src/ectop/widgets/content.py
def update_log(self, content: str, append: bool = False) -> None:
    """
    Update the Output log tab.

    Parameters
    ----------
    content : str
        The complete log content. In append mode only the part beyond
        ``last_log_size`` is written to the widget.
    append : bool, optional
        Whether to append to existing content, by default False.

    Notes
    -----
    If ``content`` is shorter than ``last_log_size`` in append mode, the
    widget is cleared and redrawn from the start instead of slicing at a
    stale offset.
    """
    if not append:
        self.log_content = content
        return

    widget = self.query_one("#log_output", RichLog)
    self._content_cache["output"] = content
    if len(content) < self.last_log_size:
        # The stored offset points past the end of the new content; without a
        # reset nothing would be written until the log regrew past the old size.
        widget.clear()
        self.last_log_size = 0
    new_content = content[self.last_log_size :]
    if new_content:
        widget.write(new_content)
        self.last_log_size = len(content)

update_script(content)

Update the Script tab.

Parameters

content : str The script content.

Source code in src/ectop/widgets/content.py
def update_script(self, content: str) -> None:
    """
    Replace the text shown in the Script tab.

    Parameters
    ----------
    content : str
        The script content.
    """
    # Assignment goes through the reactive attribute.
    self.script_content = content

watch_job_content(content)

Watch for changes in job content and update the widget.

Parameters

content : str The new job content.

Source code in src/ectop/widgets/content.py
def watch_job_content(self, content: str) -> None:
    """
    Re-render the Job tab for new job content.

    The text is cached for the content search and shown with shell syntax
    highlighting and line numbers.

    Parameters
    ----------
    content : str
        The new job content.
    """
    self._content_cache["job"] = content
    job_view = self.query_one("#view_job", Static)
    job_view.update(Syntax(content, DEFAULT_SHELL, theme=SYNTAX_THEME, line_numbers=True))

watch_log_content(content)

Watch for changes in log content and update the widget.

Parameters

content : str The new log content.

Source code in src/ectop/widgets/content.py
def watch_log_content(self, content: str) -> None:
    """
    Redraw the Output tab for new log content.

    Parameters
    ----------
    content : str
        The new log content.
    """
    log_view = self.query_one("#log_output", RichLog)
    self._content_cache["output"] = content
    # This path always does a full redraw: the reactive carries no "append"
    # flag, so incremental writes are left to update_log(append=True).
    log_view.clear()
    self.last_log_size = len(content)
    log_view.write(content)

watch_script_content(content)

Watch for changes in script content and update the widget.

Parameters

content : str The new script content.

Source code in src/ectop/widgets/content.py
def watch_script_content(self, content: str) -> None:
    """
    Re-render the Script tab for new script content.

    The text is cached for the content search and shown with shell syntax
    highlighting and line numbers.

    Parameters
    ----------
    content : str
        The new script content.
    """
    self._content_cache["script"] = content
    script_view = self.query_one("#view_script", Static)
    script_view.update(Syntax(content, DEFAULT_SHELL, theme=SYNTAX_THEME, line_numbers=True))

Search box widget for finding nodes in the suite tree.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

SearchBox

Bases: Input

An input widget for searching nodes in the tree.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/search.py
class SearchBox(Input):
    """
    Input used to search for nodes in the suite tree.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    # Escape cancels the search; Enter triggers the "submit" action.
    BINDINGS = [
        Binding("escape", "cancel", "Cancel Search"),
        Binding("enter", "submit", "Search Next"),
    ]

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Create the search box, forwarding every argument to Input.

        Parameters
        ----------
        *args : Any
            Positional arguments for the Input widget.
        **kwargs : Any
            Keyword arguments for the Input widget.
        """
        super().__init__(*args, **kwargs)

    def action_cancel(self) -> None:
        """
        Abort the search: empty the box, hide it, and focus the suite tree.
        """
        self.value = ""
        self.remove_class("visible")
        app = self.app
        tree = app.query_one("#suite_tree")
        app.set_focus(tree)

    def on_blur(self) -> None:
        """
        Hide the box as soon as focus moves elsewhere.
        """
        self.remove_class("visible")

__init__(*args, **kwargs)

Initialize the SearchBox.

Parameters

*args : Any
    Positional arguments for the Input widget.
**kwargs : Any
    Keyword arguments for the Input widget.

Source code in src/ectop/widgets/search.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Create the search box, forwarding every argument to Input.

    Parameters
    ----------
    *args : Any
        Positional arguments for the Input widget.
    **kwargs : Any
        Keyword arguments for the Input widget.
    """
    # No state of its own: construction is delegated entirely to the base class.
    super().__init__(*args, **kwargs)

action_cancel()

Clear search, hide box, and return focus to the tree.

Source code in src/ectop/widgets/search.py
def action_cancel(self) -> None:
    """
    Abort the search: empty the box, hide it, and focus the suite tree.
    """
    self.value = ""
    self.remove_class("visible")
    app = self.app
    tree = app.query_one("#suite_tree")
    app.set_focus(tree)

on_blur()

Hide the search box when it loses focus.

Source code in src/ectop/widgets/search.py
def on_blur(self) -> None:
    """
    Hide the box as soon as focus moves elsewhere.
    """
    # Dropping the "visible" class is what hides the box.
    self.remove_class("visible")

Sidebar widget for the ecFlow suite tree.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

SuiteTree

Bases: Tree[str]

A tree widget to display ecFlow suites and nodes.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/sidebar.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
class SuiteTree(Tree[str]):
    """
    A tree widget to display ecFlow suites and nodes.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    current_filter: reactive[str | None] = reactive(None, init=False)
    """The current status filter applied to the tree."""

    defs: reactive[Defs | None] = reactive(None, init=False)
    """The ecFlow definitions to display."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialize the SuiteTree.

        Parameters
        ----------
        *args : Any
            Positional arguments for the Tree widget.
        **kwargs : Any
            Keyword arguments for the Tree widget.
        """
        super().__init__(*args, **kwargs)
        # Copy the defaults so that changing one tree's filters cannot alter
        # the module-level TREE_FILTERS list.
        self.filters: list[str | None] = list(TREE_FILTERS)
        self.host: str = ""
        self.port: int = 0
        # Absolute paths of all nodes, used by search; None until it has been built.
        self._all_paths_cache: list[str] | None = None

    def update_tree(self, client_host: str, client_port: int, defs: Defs | None) -> None:
        """
        Store the server address and hand new definitions to the tree.

        Parameters
        ----------
        client_host : str
            The hostname of the ecFlow server.
        client_port : int
            The port of the ecFlow server.
        defs : ecflow.Defs | None
            The ecFlow definitions to display.

        Returns
        -------
        None

        Notes
        -----
        ``defs`` is assigned last, after host and port, because the rebuild
        triggered through the ``defs`` watcher reads both for the root label.
        """
        self.host, self.port = client_host, client_port
        self.defs = defs

    def watch_defs(self, new_defs: Defs | None) -> None:
        """
        Rebuild the tree after the definitions have changed.

        Parameters
        ----------
        new_defs : ecflow.Defs | None
            The new ecFlow definitions. Not used directly here; the rebuild
            reads ``self.defs``.
        """
        self._rebuild_tree()

    def watch_current_filter(self, new_filter: str | None) -> None:
        """
        Rebuild the tree after the status filter has changed.

        Parameters
        ----------
        new_filter : str | None
            The new filter value. Not used directly here; the rebuild reads
            ``self.current_filter``.
        """
        self._rebuild_tree()

    def _rebuild_tree(self) -> None:
        """
        Reset the tree and repopulate it from the current definitions.

        The search path cache is invalidated first. When there are no
        definitions the root is labelled "Server Empty"; otherwise the root
        shows the server address (plus the active filter, if any) and two
        background workers are started: one fills the tree, the other
        rebuilds the search path cache.
        """
        self._all_paths_cache = None
        self.clear()

        if not self.defs:
            self.root.label = "Server Empty"
            return

        active_filter = self.current_filter
        suffix = f" [Filter: {active_filter}]" if active_filter else ""
        self.root.label = f"{ICON_SERVER} {self.host}:{self.port}{suffix}"

        # Both steps run in background workers so the UI is not blocked.
        self._populate_tree_worker()
        self._build_all_paths_cache_worker()

    @work(thread=True)
    def _populate_tree_worker(self) -> None:
        """
        Add every suite that passes the current filter under the tree root.

        Returns
        -------
        None

        Notes
        -----
        Runs as a thread worker. Filtering recurses into each suite's
        descendants; the UI insertion itself goes through `_safe_call`.
        """
        if not self.defs:
            return

        suites = cast("list[ecflow.Suite]", self.defs.suites)
        # Lazy generator: each suite is checked just before it is added.
        visible_suites = (suite for suite in suites if self._should_show_node(suite))
        for suite in visible_suites:
            self._safe_call(self._add_node_to_ui, self.root, suite)

    def _should_show_node(self, node: Node) -> bool:
        """
        Decide whether a node passes the current status filter.

        Parameters
        ----------
        node : ecflow.Node
            The ecFlow node to check.

        Returns
        -------
        bool
            True when no filter is set, when the node's state equals the
            filter, or when any descendant matches it.
        """
        wanted = self.current_filter
        if not wanted:
            return True

        if str(node.get_state()) == wanted:
            return True

        # Nodes without a ``nodes`` attribute have no children to inspect.
        if not hasattr(node, "nodes"):
            return False

        for child in node.nodes:
            if self._should_show_node(child):
                return True
        return False

    @work(thread=True)
    def _build_all_paths_cache_worker(self) -> None:
        """
        Collect the absolute path of every node into the search cache.

        Returns
        -------
        None

        Notes
        -----
        Runs as a thread worker so that the first search does not have to
        build the list itself. The cache is consumed by the search logic
        behind `find_and_select`.
        """
        if not self.defs:
            return

        collected: list[str] = []
        for suite in self.defs.suites:
            collected.append(suite.get_abs_node_path())
            collected.extend(node.get_abs_node_path() for node in suite.get_all_nodes())

        self._all_paths_cache = collected

    def action_cycle_filter(self) -> None:
        """
        Cycle through available status filters and refresh the tree.

        The refresh happens through the ``current_filter`` watcher. If the
        current filter is not one of ``self.filters``, cycling restarts at
        the first entry. With an empty filter list nothing happens.

        Returns
        -------
        None
        """
        if not self.filters:
            return

        try:
            current_idx = self.filters.index(self.current_filter)
        except ValueError:
            # current_filter was set to a value outside the cycle; -1 makes
            # the next index 0.
            current_idx = -1

        next_idx = (current_idx + 1) % len(self.filters)
        self.current_filter = self.filters[next_idx]

        self.app.notify(f"Filter: {self.current_filter or 'All'}")

    def _add_node_to_ui(self, parent_ui_node: TreeNode[str], ecflow_node: ecflow.Node) -> TreeNode[str]:
        """
        Create the UI entry for one ecFlow node under the given parent.

        The label shows a state icon, a type icon, the node name and the
        state in brackets. The node's absolute path is stored as the UI
        node's data. Families and suites with at least one child get a
        placeholder child so they can be loaded lazily on expansion.

        Parameters
        ----------
        parent_ui_node : TreeNode[str]
            The parent node in the Textual tree.
        ecflow_node : ecflow.Node
            The ecFlow node to add.

        Returns
        -------
        TreeNode[str]
            The newly created UI node.
        """
        state = str(ecflow_node.get_state())
        state_icon = STATE_MAP.get(state, ICON_UNKNOWN_STATE)

        is_container = isinstance(ecflow_node, (ecflow.Family, ecflow.Suite))
        if is_container:
            kind_icon = ICON_FAMILY
        else:
            kind_icon = ICON_TASK

        label = Text(f"{state_icon} {kind_icon} {ecflow_node.name()} ")
        label.append(f"[{state}]", style="bold italic")

        path = ecflow_node.get_abs_node_path()
        new_ui_node = parent_ui_node.add(label, data=path, expand=False)

        if is_container and hasattr(ecflow_node, "nodes"):
            # Pulling a single item is enough to know whether children exist.
            try:
                next(iter(ecflow_node.nodes))
            except (StopIteration, RuntimeError):
                pass
            else:
                new_ui_node.add(LOADING_PLACEHOLDER, allow_expand=False)

        return new_ui_node

    def on_tree_node_expanded(self, event: Tree.NodeExpanded[str]) -> None:
        """
        Load the children of a node when it is expanded.

        Parameters
        ----------
        event : Tree.NodeExpanded[str]
            The expansion event.

        Returns
        -------
        None
        """
        self._load_children(event.node)

    def _load_children(self, ui_node: TreeNode[str], sync: bool = False) -> None:
        """
        Replace a node's loading placeholder with its real children.

        Nothing happens unless the node has data, definitions are present,
        and the node's only child is the loading placeholder.

        Parameters
        ----------
        ui_node : TreeNode[str]
            The UI node to load children for.
        sync : bool, optional
            Whether to load children synchronously, by default False.

        Returns
        -------
        None

        Notes
        -----
        The asynchronous path delegates to `_load_children_worker`. The
        synchronous path adds every child without consulting the status
        filter.
        """
        if not (ui_node.data and self.defs):
            return

        children = ui_node.children
        only_placeholder = len(children) == 1 and str(children[0].label) == LOADING_PLACEHOLDER
        if not only_placeholder:
            return

        # The removal is routed through _safe_call like every other UI change.
        self._safe_call(children[0].remove)

        if not sync:
            self._load_children_worker(ui_node, ui_node.data)
            return

        ecflow_node = self.defs.find_abs_node(ui_node.data)
        if ecflow_node and hasattr(ecflow_node, "nodes"):
            for child in ecflow_node.nodes:
                self._safe_call(self._add_node_to_ui, ui_node, child)

    @work(thread=True)
    def _load_children_worker(self, ui_node: TreeNode[str], node_path: str) -> None:
        """
        Worker to load children nodes in a background thread.

        Only children that pass the current status filter are added.

        Parameters
        ----------
        ui_node : TreeNode[str]
            The UI node to populate.
        node_path : str
            The absolute path of the ecFlow node.

        Returns
        -------
        None

        Notes
        -----
        UI updates go through `_safe_call`, the same helper that
        `_populate_tree_worker` uses for its insertions.
        """
        # Read the definitions once so that a concurrent reset of self.defs
        # cannot turn the lookup below into an attribute error on None.
        defs = self.defs
        if not defs:
            return

        ecflow_node = defs.find_abs_node(node_path)
        if ecflow_node and hasattr(ecflow_node, "nodes"):
            for child in cast("list[ecflow.Node]", ecflow_node.nodes):
                if self._should_show_node(child):
                    self._safe_call(self._add_node_to_ui, ui_node, child)

    @work(exclusive=True, thread=True)
    def find_and_select(self, query: str) -> None:
        """
        Search the ecFlow definitions for a matching node and select it.

        The search covers parts of the tree that have not been loaded into
        the UI yet, and runs in a background thread to keep the UI responsive.

        Parameters
        ----------
        query : str
            The search query.

        Returns
        -------
        None

        Notes
        -----
        Thin worker wrapper; the search itself lives in
        `_find_and_select_logic`.
        """
        self._find_and_select_logic(query)

    def _find_and_select_logic(self, query: str) -> None:
        """
        The actual search logic split out for testing.

        Performs a case-insensitive substring match of ``query`` against the
        absolute path of every node. The scan starts just after the node under
        the cursor and wraps around, so repeated searches step through the
        matches. A match is selected via `_select_by_path_logic`; otherwise a
        warning notification is shown.

        Parameters
        ----------
        query : str
            The search query.
        """
        defs = self.defs
        if not defs:
            return

        query = query.lower()

        # Work on a local reference: the attribute can be reset to None by a
        # tree rebuild while this search is running.
        all_paths = getattr(self, "_all_paths_cache", None)
        if all_paths is None:
            # Fallback if cache isn't ready yet (e.g. searching immediately after sync)
            all_paths = []
            for suite in defs.suites:
                all_paths.append(suite.get_abs_node_path())
                all_paths.extend(node.get_abs_node_path() for node in suite.get_all_nodes())
            self._all_paths_cache = all_paths

        # getattr tolerates objects that were not fully initialised as a Tree.
        cursor_node = getattr(self, "cursor_node", None)
        current_path = cursor_node.data if cursor_node else None

        start_index = 0
        if current_path:
            try:
                start_index = all_paths.index(current_path) + 1
            except ValueError:
                # The cursor is on a node that is not in the path list.
                start_index = 0

        # Search from start_index to end, then wrap around
        total = len(all_paths)
        found_path = None
        for offset in range(total):
            path = all_paths[(start_index + offset) % total]
            if query in path.lower():
                found_path = path
                break

        if found_path:
            self._select_by_path_logic(found_path)
        else:
            self._safe_call(self.app.notify, f"No match found for '{query}'", severity="warning")

    def _safe_call(self, callback: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        """
        Safely call a UI-related function from either the main thread or a worker.

        Parameters
        ----------
        callback : Callable[..., Any]
            The function to call.
        *args : Any
            Positional arguments.
        **kwargs : Any
            Keyword arguments.

        Returns
        -------
        Any
            The result of the call if synchronous, or None if scheduled.

        Notes
        -----
        Any exception from the dispatch attempt triggers a direct call of
        ``callback``. An exception raised by ``callback`` itself during
        dispatch therefore leads to a second invocation.
        """
        try:
            return safe_call_app(self.app, callback, *args, **kwargs)
        except Exception:
            # ``Exception`` already covers AttributeError and RuntimeError.
            # App might not be fully initialized in some tests, so fall back
            # to a direct call when dispatching through the app fails.
            return callback(*args, **kwargs)

    @work(thread=True)
    def select_by_path(self, path: str) -> None:
        """
        Select a node by its absolute ecFlow path, expanding parents as needed.

        The work is delegated to ``_select_by_path_logic``.

        Parameters
        ----------
        path : str
            The absolute path of the node to select.

        Returns
        -------
        None

        Notes
        -----
        This is a threaded background worker, which avoids blocking the UI
        thread while many nested nodes are loaded synchronously.
        """
        self._select_by_path_logic(path)

    def _select_by_path_logic(self, path: str) -> None:
        """
        Walk the tree along an absolute path and select the final node.

        Parameters
        ----------
        path : str
            The absolute path of the node to select. ``"/"`` selects the root.

        Notes
        -----
        Children of every node on the way are loaded synchronously, so this is
        intended to run in a background thread. All UI calls, including the
        root selection, go through ``_safe_call``. If a path component cannot
        be found among the loaded children, the method returns without
        changing the selection.
        """
        if path == "/":
            # Routed through _safe_call like every other UI call in this method.
            self._safe_call(self.select_node, self.root)
            return

        current_ui_node = self.root
        current_path = ""
        for part in path.strip("/").split("/"):
            current_path = f"{current_path}/{part}"
            # Load children synchronously so they can be inspected right away
            self._load_children(current_ui_node, sync=True)
            self._safe_call(current_ui_node.expand)

            match = next(
                (child for child in current_ui_node.children if child.data == current_path),
                None,
            )
            if match is None:
                return
            current_ui_node = match

        self._safe_call(self._select_and_reveal, current_ui_node)

    def _select_and_reveal(self, node: TreeNode[str]) -> None:
        """
        Make a node the selection and bring it into view.

        Parameters
        ----------
        node : TreeNode[str]
            The node to select and reveal.

        Notes
        -----
        After selecting, every ancestor is expanded from the direct parent
        upwards, and finally the view is scrolled to the node.
        """
        self.select_node(node)
        ancestor = node
        while ancestor := ancestor.parent:
            ancestor.expand()
        self.scroll_to_node(node)

current_filter = reactive(None, init=False) class-attribute instance-attribute

The current status filter applied to the tree.

defs = reactive(None, init=False) class-attribute instance-attribute

The ecFlow definitions to display.

__init__(*args, **kwargs)

Initialize the SuiteTree.

Parameters

*args : Any Positional arguments for the Tree widget. **kwargs : Any Keyword arguments for the Tree widget.

Returns

None

Source code in src/ectop/widgets/sidebar.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the SuiteTree.

    Parameters
    ----------
    *args : Any
        Positional arguments for the Tree widget.
    **kwargs : Any
        Keyword arguments for the Tree widget.

    Returns
    -------
    None
    """
    super().__init__(*args, **kwargs)
    # Status filters available for cycling; this references TREE_FILTERS
    # itself, not a copy.
    self.filters: list[str | None] = TREE_FILTERS
    # Server coordinates; placeholders until update_tree() assigns real values.
    self.host: str = ""
    self.port: int = 0
    # Flat list of absolute node paths used by the search; None until built.
    self._all_paths_cache: list[str] | None = None

action_cycle_filter()

Cycle through available status filters and refresh the tree.

Returns

None

Source code in src/ectop/widgets/sidebar.py
def action_cycle_filter(self) -> None:
    """
    Advance to the next status filter and announce it.

    The filters are cycled in list order, wrapping back to the first entry
    after the last. If the current filter is not one of the known filters,
    the cycle restarts at the first entry. With no filters configured the
    action does nothing.

    Returns
    -------
    None

    Notes
    -----
    Assigning ``current_filter`` is what triggers the tree refresh.
    """
    if not self.filters:
        # Nothing to cycle through; also avoids a modulo-by-zero below.
        return

    try:
        current_idx = self.filters.index(self.current_filter)
    except ValueError:
        # Unknown filter value: start over so the next index is 0.
        current_idx = -1

    self.current_filter = self.filters[(current_idx + 1) % len(self.filters)]

    self.app.notify(f"Filter: {self.current_filter or 'All'}")

find_and_select(query)

Find nodes matching query in the ecFlow definitions and select them.

This handles searching through unloaded parts of the tree in a background thread to keep the UI responsive.

Parameters

query : str The search query.

Returns

None

Notes

This is a background worker.

Source code in src/ectop/widgets/sidebar.py
@work(exclusive=True, thread=True)
def find_and_select(self, query: str) -> None:
    """
    Find the next node whose path matches the query and select it.

    The search is delegated to ``_find_and_select_logic`` and runs in a
    background thread to keep the UI responsive, including when the match
    lies in a part of the tree that has not been loaded yet.

    Parameters
    ----------
    query : str
        The search query.

    Returns
    -------
    None

    Notes
    -----
    This is an exclusive, threaded background worker.
    """
    self._find_and_select_logic(query)

on_tree_node_expanded(event)

Handle node expansion to load children on demand.

Parameters

event : Tree.NodeExpanded[str] The expansion event.

Returns

None

Source code in src/ectop/widgets/sidebar.py
def on_tree_node_expanded(self, event: Tree.NodeExpanded[str]) -> None:
    """
    Load the children of a node when it is expanded.

    Parameters
    ----------
    event : Tree.NodeExpanded[str]
        The expansion event; its ``node`` is the one being populated.

    Returns
    -------
    None
    """
    self._load_children(event.node)

select_by_path(path)

Select a node by its absolute ecFlow path, expanding parents as needed.

Parameters

path : str The absolute path of the node to select.

Returns

None

Notes

This is a background worker to avoid blocking the UI thread when loading many nested nodes synchronously.

Source code in src/ectop/widgets/sidebar.py
@work(thread=True)
def select_by_path(self, path: str) -> None:
    """
    Select a node by its absolute ecFlow path, expanding parents as needed.

    The work is delegated to ``_select_by_path_logic``.

    Parameters
    ----------
    path : str
        The absolute path of the node to select.

    Returns
    -------
    None

    Notes
    -----
    This is a threaded background worker, which avoids blocking the UI
    thread while many nested nodes are loaded synchronously.
    """
    self._select_by_path_logic(path)

update_tree(client_host, client_port, defs)

Update the tree data.

Parameters

client_host : str The hostname of the ecFlow server. client_port : int The port of the ecFlow server. defs : ecflow.Defs | None The ecFlow definitions to display.

Returns

None

Notes

This method triggers the reactive watchers.

Source code in src/ectop/widgets/sidebar.py
def update_tree(self, client_host: str, client_port: int, defs: Defs | None) -> None:
    """
    Store the server coordinates and hand new definitions to the tree.

    Parameters
    ----------
    client_host : str
        The hostname of the ecFlow server.
    client_port : int
        The port of the ecFlow server.
    defs : ecflow.Defs | None
        The ecFlow definitions to display.

    Returns
    -------
    None

    Notes
    -----
    Host and port are stored before ``defs`` is assigned; assigning
    ``defs`` is what triggers the reactive watchers.
    """
    self.host, self.port = client_host, client_port
    self.defs = defs

watch_current_filter(new_filter)

Watch for changes in the current filter and rebuild the tree.

Parameters

new_filter : str | None The new filter value.

Source code in src/ectop/widgets/sidebar.py
def watch_current_filter(self, new_filter: str | None) -> None:
    """
    Rebuild the tree after the current filter has changed.

    Parameters
    ----------
    new_filter : str | None
        The new filter value. It is not read here; the rebuild is
        triggered unconditionally.
    """
    self._rebuild_tree()

watch_defs(new_defs)

Watch for changes in definitions and rebuild the tree.

Parameters

new_defs : ecflow.Defs | None The new ecFlow definitions.

Source code in src/ectop/widgets/sidebar.py
def watch_defs(self, new_defs: Defs | None) -> None:
    """
    Rebuild the tree after the definitions have changed.

    Parameters
    ----------
    new_defs : ecflow.Defs | None
        The new ecFlow definitions. They are not read here; the rebuild
        is triggered unconditionally.
    """
    self._rebuild_tree()

Status bar widget for ectop.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

StatusBar

Bases: Static

A status bar widget to display server information and health.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/statusbar.py
class StatusBar(Static):
    """
    Single-line widget summarising the ecFlow server connection.

    It shows the server address and version, the server status and the time
    of the most recent status update.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    server_info: reactive[str] = reactive("Disconnected")
    """The host:port string of the ecFlow server."""

    last_sync: reactive[str] = reactive("Never")
    """The timestamp of the last successful synchronization."""

    status: reactive[str] = reactive("Unknown")
    """The current status of the ecFlow server."""

    server_version: reactive[str] = reactive("Unknown")
    """The version of the ecFlow server."""

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        Initialize the StatusBar.

        Parameters
        ----------
        *args : Any
            Positional arguments for the Static widget.
        **kwargs : Any
            Keyword arguments for the Static widget.
        """
        super().__init__(*args, **kwargs)

    def update_status(self, host: str, port: int, status: str = "Connected", version: str = "Unknown") -> None:
        """
        Record fresh server details and stamp the update time.

        Parameters
        ----------
        host : str
            The ecFlow server hostname.
        port : int
            The ecFlow server port.
        status : str, optional
            The server status message, by default "Connected".
        version : str, optional
            The ecFlow server version, by default "Unknown".
        """
        endpoint = f"{host}:{port}"
        self.server_info = endpoint
        # Status and version are coerced so non-string values are stored as text
        self.status = str(status)
        self.server_version = str(version)
        # Local wall-clock time of this update
        self.last_sync = datetime.now().strftime("%H:%M:%S")

    def render(self) -> Text:
        """
        Build the styled text shown in the status bar.

        Returns
        -------
        Text
            The rendered status bar content.
        """
        current = self.status
        # Anything that is not recognised as healthy or halted is shown in red
        if current == "HALTED":
            colour = COLOR_STATUS_HALTED
        elif current == "RUNNING" or "Connected" in current:
            colour = "green"
        else:
            colour = "red"

        segments = [
            (" Server: ", "bold"),
            (self.server_info, "cyan"),
            (" (v", "bold"),
            (self.server_version, "magenta"),
            (")", "bold"),
            (" | Status: ", "bold"),
            (current, colour),
            (" | Last Sync: ", "bold"),
            (self.last_sync, "yellow"),
        ]
        return Text.assemble(*segments)

last_sync = reactive('Never') class-attribute instance-attribute

The timestamp of the last successful synchronization.

server_info = reactive('Disconnected') class-attribute instance-attribute

The host:port string of the ecFlow server.

server_version = reactive('Unknown') class-attribute instance-attribute

The version of the ecFlow server.

status = reactive('Unknown') class-attribute instance-attribute

The current status of the ecFlow server.

__init__(*args, **kwargs)

Initialize the StatusBar.

Parameters

*args : Any Positional arguments for the Static widget. **kwargs : Any Keyword arguments for the Static widget.

Source code in src/ectop/widgets/statusbar.py
def __init__(self, *args: Any, **kwargs: Any) -> None:
    """
    Initialize the StatusBar.

    All arguments are forwarded unchanged to the parent initializer.

    Parameters
    ----------
    *args : Any
        Positional arguments for the Static widget.
    **kwargs : Any
        Keyword arguments for the Static widget.
    """
    super().__init__(*args, **kwargs)

render()

Render the status bar.

Returns

Text The rendered status bar content.

Source code in src/ectop/widgets/statusbar.py
def render(self) -> Text:
    """
    Build the styled text shown in the status bar.

    Returns
    -------
    Text
        The rendered status bar content.
    """
    current = self.status
    # Anything that is not recognised as healthy or halted is shown in red
    if current == "HALTED":
        colour = COLOR_STATUS_HALTED
    elif current == "RUNNING" or "Connected" in current:
        colour = "green"
    else:
        colour = "red"

    segments = [
        (" Server: ", "bold"),
        (self.server_info, "cyan"),
        (" (v", "bold"),
        (self.server_version, "magenta"),
        (")", "bold"),
        (" | Status: ", "bold"),
        (current, colour),
        (" | Last Sync: ", "bold"),
        (self.last_sync, "yellow"),
    ]
    return Text.assemble(*segments)

update_status(host, port, status='Connected', version='Unknown')

Update the status bar information.

Parameters

host : str The ecFlow server hostname. port : int The ecFlow server port. status : str, optional The server status message, by default "Connected". version : str, optional The ecFlow server version, by default "Unknown".

Source code in src/ectop/widgets/statusbar.py
def update_status(self, host: str, port: int, status: str = "Connected", version: str = "Unknown") -> None:
    """
    Update the status bar information.

    Parameters
    ----------
    host : str
        The ecFlow server hostname.
    port : int
        The ecFlow server port.
    status : str, optional
        The server status message, by default "Connected".
    version : str, optional
        The ecFlow server version, by default "Unknown".
    """
    self.server_info = f"{host}:{port}"
    # Coerced with str() so non-string values are stored as text
    self.status = str(status)
    self.server_version = str(version)
    # Local wall-clock time of this update, formatted as HH:MM:SS
    self.last_sync = datetime.now().strftime("%H:%M:%S")

Modals

Confirmation modal dialog.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

ConfirmModal

Bases: ModalScreen[None]

A modal screen for confirmation actions.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/modals/confirm.py
class ConfirmModal(ModalScreen[None]):
    """
    Yes/No dialog that runs a callback when the user confirms.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    BINDINGS = [
        Binding("escape", "close", "Cancel"),
        Binding("y", "confirm", "Yes"),
        Binding("n", "close", "No"),
    ]

    def __init__(self, message: str, callback: Callable[[], None]) -> None:
        """
        Initialize the ConfirmModal.

        Parameters
        ----------
        message : str
            The message to display in the modal.
        callback : Callable[[], None]
            The function to call if confirmed.
        """
        super().__init__()
        self.message: str = message
        self.callback: Callable[[], None] = callback

    def compose(self) -> ComposeResult:
        """
        Lay out the message above a row with the two answer buttons.

        Returns
        -------
        ComposeResult
            The UI components for the modal.
        """
        with Vertical(id="confirm_container"):
            yield Static(self.message, id="confirm_message")
            with Horizontal(id="confirm_actions"):
                yield Button("Yes (y)", variant="success", id="yes_btn")
                yield Button("No (n)", variant="error", id="no_btn")

    def action_close(self) -> None:
        """Dismiss the modal without running the callback."""
        self.app.pop_screen()

    def action_confirm(self) -> None:
        """Run the callback, then dismiss the modal."""
        self.callback()
        self.app.pop_screen()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """
        Route a button press to the matching action.

        Parameters
        ----------
        event : Button.Pressed
            The button press event.
        """
        # Only the "yes" button confirms; any other button closes the modal
        handler = self.action_confirm if event.button.id == "yes_btn" else self.action_close
        handler()

__init__(message, callback)

Initialize the ConfirmModal.

Parameters

message : str The message to display in the modal. callback : Callable[[], None] The function to call if confirmed.

Source code in src/ectop/widgets/modals/confirm.py
def __init__(self, message: str, callback: Callable[[], None]) -> None:
    """
    Initialize the ConfirmModal.

    Parameters
    ----------
    message : str
        The message to display in the modal.
    callback : Callable[[], None]
        The function to call if confirmed. It is stored here and invoked
        without arguments.
    """
    super().__init__()
    self.message: str = message
    self.callback: Callable[[], None] = callback

action_close()

Close the modal without confirming.

Source code in src/ectop/widgets/modals/confirm.py
def action_close(self) -> None:
    """Close the modal without confirming; the callback is not invoked."""
    self.app.pop_screen()

action_confirm()

Confirm the action and call the callback.

Source code in src/ectop/widgets/modals/confirm.py
def action_confirm(self) -> None:
    """Confirm the action: call the callback, then pop the screen."""
    # The callback runs first; if it raises, pop_screen() is not reached.
    self.callback()
    self.app.pop_screen()

compose()

Compose the modal UI.

Returns

ComposeResult The UI components for the modal.

Source code in src/ectop/widgets/modals/confirm.py
def compose(self) -> ComposeResult:
    """
    Compose the modal UI.

    The message is placed above a horizontal row holding the "Yes" and
    "No" buttons.

    Returns
    -------
    ComposeResult
        The UI components for the modal.
    """
    with Vertical(id="confirm_container"):
        yield Static(self.message, id="confirm_message")
        with Horizontal(id="confirm_actions"):
            # Button ids are matched in on_button_pressed
            yield Button("Yes (y)", variant="success", id="yes_btn")
            yield Button("No (n)", variant="error", id="no_btn")

on_button_pressed(event)

Handle button press events.

Parameters

event : Button.Pressed The button press event.

Source code in src/ectop/widgets/modals/confirm.py
def on_button_pressed(self, event: Button.Pressed) -> None:
    """
    Route a button press to the matching action.

    Parameters
    ----------
    event : Button.Pressed
        The button press event.
    """
    # Only the "yes" button confirms; any other button closes the modal
    handler = self.action_confirm if event.button.id == "yes_btn" else self.action_close
    handler()

Modal screen for viewing and editing ecFlow variables.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

VariableTweaker

Bases: ModalScreen[None]

A modal screen for managing ecFlow node variables.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/modals/variables.py
class VariableTweaker(ModalScreen[None]):
    """
    A modal screen for managing ecFlow node variables.

    The table lists the node's own variables, its generated variables and the
    variables inherited from its ancestors. Own variables can be edited,
    added and deleted; inherited ones are read-only.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("v", "close", "Close"),
        Binding("a", "add_variable", "Add Variable"),
        Binding("d", "delete_variable", "Delete Variable"),
    ]

    def __init__(self, node_path: str, client: EcflowClient) -> None:
        """
        Initialize the VariableTweaker.

        Parameters
        ----------
        node_path : str
            The absolute path to the ecFlow node.
        client : EcflowClient
            The ecFlow client instance.
        """
        super().__init__()
        self.node_path: str = node_path
        self.client: EcflowClient = client
        # Name of the variable being edited; None means "add a new variable"
        self.selected_var_name: str | None = None

    def compose(self) -> ComposeResult:
        """
        Compose the modal UI.

        Returns
        -------
        ComposeResult
            The UI components for the modal.
        """
        with Vertical(id="var_container"):
            yield Static(f"Variables for {self.node_path}", id="var_title")
            yield DataTable(id="var_table")
            yield Input(placeholder="Enter new value...", id="var_input")
            with Horizontal(id="var_actions"):
                yield Button("Close", variant="primary", id="close_btn")

    def on_mount(self) -> None:
        """Set up the table columns, start loading variables and hide the input."""
        table = self.query_one("#var_table", DataTable)
        table.add_columns("Name", "Value", "Type")
        table.cursor_type = "row"
        self.refresh_vars()
        self.query_one("#var_input").add_class("hidden")

    def action_close(self) -> None:
        """Close the modal."""
        self.app.pop_screen()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """
        Handle button press events.

        Parameters
        ----------
        event : Button.Pressed
            The button press event.
        """
        if event.button.id == "close_btn":
            self.app.pop_screen()

    @work(thread=True)
    def refresh_vars(self) -> None:
        """
        Fetch variables from the server and refresh the table in a background worker.

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        self._refresh_vars_logic()

    def _refresh_vars_logic(self) -> None:
        """
        Fetch the node's variables and schedule a table update.

        Notes
        -----
        Errors are not propagated: a ``RuntimeError`` and any other exception
        are reported to the user as error notifications. UI work is scheduled
        with ``call_from_thread``. This method can be called directly for
        testing.
        """
        try:
            self.client.sync_local()
            defs = self.client.get_defs()
            if not defs:
                return
            node = defs.find_abs_node(self.node_path)

            if not node:
                self.app.call_from_thread(self.app.notify, "Node not found", severity="error")
                return

            # Each row is (name, value, type label, row key)
            rows: list[tuple[str, str, str, str]] = []
            seen_vars: set[str] = set()

            # User variables
            for var in node.variables:
                rows.append((var.name(), var.value(), VAR_TYPE_USER, var.name()))
                seen_vars.add(var.name())

            # Generated variables
            for var in node.get_generated_variables():
                rows.append((var.name(), var.value(), VAR_TYPE_GENERATED, var.name()))
                seen_vars.add(var.name())

            # Inherited variables (climb up the tree)
            parent = node.get_parent()
            while parent:
                for var in parent.variables:
                    # Only add if not already present (overridden closer to the node)
                    if var.name() not in seen_vars:
                        rows.append(
                            (
                                var.name(),
                                var.value(),
                                f"{VAR_TYPE_INHERITED} ({parent.name()})",
                                # The prefixed key marks the row as read-only
                                f"{INHERITED_VAR_PREFIX}{var.name()}",
                            )
                        )
                        seen_vars.add(var.name())
                parent = parent.get_parent()

            self.app.call_from_thread(self._update_table, rows)

        except RuntimeError as e:
            self.app.call_from_thread(self.app.notify, f"Error fetching variables: {e}", severity="error")
        except Exception as e:
            self.app.call_from_thread(self.app.notify, f"Unexpected Error: {e}", severity="error")

    def _update_table(self, rows: list[tuple[str, str, str, str]]) -> None:
        """
        Replace the table contents with new rows.

        Parameters
        ----------
        rows : list[tuple[str, str, str, str]]
            The rows to add, each as (name, value, type, row key).
        """
        table = self.query_one("#var_table", DataTable)
        table.clear()
        for row in rows:
            table.add_row(row[0], row[1], row[2], key=row[3])

    def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
        """
        Handle row selection to start editing a variable.

        Parameters
        ----------
        event : DataTable.RowSelected
            The row selection event.
        """
        row_key = event.row_key.value
        if row_key and row_key.startswith(INHERITED_VAR_PREFIX):
            self.app.notify("Cannot edit inherited variables directly. Add it to this node to override.", severity="warning")
            return

        self.selected_var_name = row_key
        input_field = self.query_one("#var_input", Input)
        input_field.remove_class("hidden")
        input_field.focus()

    def on_input_submitted(self, event: Input.Submitted) -> None:
        """
        Handle variable submission (add or update).

        Parameters
        ----------
        event : Input.Submitted
            The input submission event.
        """
        if event.input.id == "var_input":
            self._submit_variable_worker(event.value)

    @work(thread=True)
    def _submit_variable_worker(self, value: str) -> None:
        """
        Worker to submit a new or updated variable in a background thread.

        Parameters
        ----------
        value : str
            The new value or 'name=value' string.

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        self._submit_variable_logic(value)

    def _submit_variable_logic(self, value: str) -> None:
        """
        Update the selected variable or add a new one on the server.

        Parameters
        ----------
        value : str
            The new value when a variable is selected, otherwise a
            'name=value' string describing the variable to add.

        Notes
        -----
        Input without ``=`` or with an empty name is rejected with a warning
        and nothing is sent to the server. Errors are not propagated: a
        ``RuntimeError`` and any other exception are reported as error
        notifications. This method can be called directly for testing.
        """
        try:
            if self.selected_var_name:
                # Editing existing
                self.client.alter(self.node_path, "add_variable", self.selected_var_name, value)
                self.app.call_from_thread(self.app.notify, f"Updated {self.selected_var_name}")
            else:
                # Adding new (expecting name=value)
                name, separator, val = value.partition("=")
                name = name.strip()
                if not separator or not name:
                    self.app.call_from_thread(self.app.notify, "Use name=value format to add", severity="warning")
                    return
                self.client.alter(self.node_path, "add_variable", name, val.strip())
                self.app.call_from_thread(self.app.notify, f"Added {name}")

            self.app.call_from_thread(self._reset_input)
            self.app.call_from_thread(self.refresh_vars)
        except RuntimeError as e:
            self.app.call_from_thread(self.app.notify, f"Error: {e}", severity="error")
        except Exception as e:
            self.app.call_from_thread(self.app.notify, f"Unexpected Error: {e}", severity="error")

    def _reset_input(self) -> None:
        """Hide and clear the input field, then return focus to the table."""
        input_field = self.query_one("#var_input", Input)
        input_field.add_class("hidden")
        input_field.value = ""
        input_field.placeholder = "Enter new value..."
        self.query_one("#var_table").focus()

    def action_add_variable(self) -> None:
        """Show the input field to add a new variable."""
        input_field = self.query_one("#var_input", Input)
        input_field.placeholder = "Enter name=value to add"
        input_field.remove_class("hidden")
        input_field.focus()
        # No selection means the next submission is treated as an addition
        self.selected_var_name = None

    def action_delete_variable(self) -> None:
        """
        Delete the variable under the table cursor from the server.

        Notes
        -----
        Nothing happens when the cursor does not point at an existing row,
        for example while the table is still empty.
        """
        table = self.query_one("#var_table", DataTable)
        row_keys = list(table.rows.keys())
        row_index = table.cursor_row
        # Guard against an empty table or a stale cursor position
        if row_index is None or not 0 <= row_index < len(row_keys):
            return

        row_key = row_keys[row_index].value
        if row_key:
            self._delete_variable_worker(row_key)

    @work(thread=True)
    def _delete_variable_worker(self, row_key: str) -> None:
        """
        Worker to delete a variable from the server in a background thread.

        Parameters
        ----------
        row_key : str
            The name (or key) of the variable to delete.

        Notes
        -----
        This is a background worker that performs blocking I/O.
        """
        self._delete_variable_logic(row_key)

    def _delete_variable_logic(self, row_key: str) -> None:
        """
        Delete a variable on the server and refresh the table.

        Parameters
        ----------
        row_key : str
            The name (or key) of the variable to delete.

        Notes
        -----
        Keys carrying the inherited prefix are refused with an error
        notification. Errors are not propagated: a ``RuntimeError`` and any
        other exception are reported as error notifications. This method can
        be called directly for testing.
        """
        if row_key.startswith(INHERITED_VAR_PREFIX):
            self.app.call_from_thread(self.app.notify, "Cannot delete inherited variables", severity="error")
            return

        try:
            self.client.alter(self.node_path, "delete_variable", row_key)
            self.app.call_from_thread(self.app.notify, f"Deleted {row_key}")
            self.app.call_from_thread(self.refresh_vars)
        except RuntimeError as e:
            self.app.call_from_thread(self.app.notify, f"Error: {e}", severity="error")
        except Exception as e:
            self.app.call_from_thread(self.app.notify, f"Unexpected Error: {e}", severity="error")

__init__(node_path, client)

Initialize the VariableTweaker.

Parameters

node_path : str The absolute path to the ecFlow node. client : EcflowClient The ecFlow client instance.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def __init__(self, node_path: str, client: EcflowClient) -> None:
    """
    Create the modal and remember which node and client it works with.

    Parameters
    ----------
    node_path : str
        The absolute path to the ecFlow node.
    client : EcflowClient
        The ecFlow client instance.

    Returns
    -------
    None
    """
    super().__init__()
    # No variable is selected until a table row is chosen.
    self.selected_var_name: str | None = None
    self.client: EcflowClient = client
    self.node_path: str = node_path

action_add_variable()

Show the input field to add a new variable.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def action_add_variable(self) -> None:
    """
    Reveal and focus the input field, prepared for adding a new variable.

    Returns
    -------
    None
    """
    entry = self.query_one("#var_input", Input)
    entry.placeholder = "Enter name=value to add"
    entry.remove_class("hidden")
    entry.focus()
    # Clearing the selection marks the next submission as an "add".
    self.selected_var_name = None

action_close()

Close the modal.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def action_close(self) -> None:
    """
    Dismiss this modal by popping it off the application's screen stack.

    Returns
    -------
    None
    """
    application = self.app
    application.pop_screen()

action_delete_variable()

Delete the selected variable from the server.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def action_delete_variable(self) -> None:
    """
    Delete the variable under the table cursor from the server.

    Does nothing when the cursor does not point at an existing row (for
    example when the table is empty) or when the row has no key value.

    Returns
    -------
    None
    """
    table = self.query_one("#var_table", DataTable)
    row_index = table.cursor_row
    row_keys = list(table.rows.keys())

    # The cursor is only an index; it is not guaranteed to address a row.
    # Range-check it so an empty table cannot raise IndexError and a
    # negative value cannot silently wrap around to the last row.
    if row_index is None or not 0 <= row_index < len(row_keys):
        return

    row_key = row_keys[row_index].value
    if row_key:
        self._delete_variable_worker(row_key)

compose()

Compose the modal UI.

Returns

ComposeResult: The UI components for the modal.

Source code in src/ectop/widgets/modals/variables.py
def compose(self) -> ComposeResult:
    """
    Yield the widgets that make up the variable modal.

    Returns
    -------
    ComposeResult
        Title, variable table and input field, plus a row holding the
        close button.
    """
    with Vertical(id="var_container"):
        heading = f"Variables for {self.node_path}"
        yield Static(heading, id="var_title")
        yield DataTable(id="var_table")
        yield Input(placeholder="Enter new value...", id="var_input")
        with Horizontal(id="var_actions"):
            yield Button("Close", variant="primary", id="close_btn")

on_button_pressed(event)

Handle button press events.

Parameters

- event (Button.Pressed): The button press event.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def on_button_pressed(self, event: Button.Pressed) -> None:
    """
    React to a button press; only the close button has an effect.

    Parameters
    ----------
    event : Button.Pressed
        The button press event.

    Returns
    -------
    None
    """
    pressed_id = event.button.id
    if pressed_id != "close_btn":
        return
    self.app.pop_screen()

on_data_table_row_selected(event)

Handle row selection to start editing a variable.

Parameters

- event (DataTable.RowSelected): The row selection event.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
    """
    Begin editing the variable whose row was selected.

    Rows whose key starts with ``INHERITED_VAR_PREFIX`` cannot be edited;
    a warning is shown instead and nothing else changes.

    Parameters
    ----------
    event : DataTable.RowSelected
        The row selection event.

    Returns
    -------
    None
    """
    selected_key = event.row_key.value
    is_inherited = bool(selected_key) and selected_key.startswith(INHERITED_VAR_PREFIX)
    if is_inherited:
        self.app.notify("Cannot edit inherited variables directly. Add it to this node to override.", severity="warning")
        return

    # Remember the target, then reveal and focus the input field.
    self.selected_var_name = selected_key
    entry = self.query_one("#var_input", Input)
    entry.remove_class("hidden")
    entry.focus()

on_input_submitted(event)

Handle variable submission (add or update).

Parameters

- event (Input.Submitted): The input submission event.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def on_input_submitted(self, event: Input.Submitted) -> None:
    """
    Forward a submitted value from the variable input to the submit worker.

    Submissions from any other input are ignored.

    Parameters
    ----------
    event : Input.Submitted
        The input submission event.

    Returns
    -------
    None
    """
    if event.input.id != "var_input":
        return
    submitted_text = event.value
    self._submit_variable_worker(submitted_text)

on_mount()

Handle the mount event to initialize the table.

Returns

None

Source code in src/ectop/widgets/modals/variables.py
def on_mount(self) -> None:
    """
    Set up the variable table, start loading its contents and hide the input.

    Returns
    -------
    None
    """
    var_table = self.query_one("#var_table", DataTable)
    var_table.add_columns("Name", "Value", "Type")
    var_table.cursor_type = "row"
    self.refresh_vars()
    # The input stays hidden until an add or edit is started.
    entry = self.query_one("#var_input")
    entry.add_class("hidden")

refresh_vars()

Fetch variables from the server and refresh the table in a background worker.

Returns

None

Notes

This is a background worker that performs blocking I/O.

Source code in src/ectop/widgets/modals/variables.py
@work(thread=True)
def refresh_vars(self) -> None:
    """
    Reload the variable table through the ``@work(thread=True)`` decorator.

    Returns
    -------
    None

    Notes
    -----
    The body only delegates to ``_refresh_vars_logic``.
    """
    reload_table = self._refresh_vars_logic
    reload_table()

Modal screen for inspecting why an ecFlow node is not running.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

DepData dataclass

Intermediate data structure for dependency information.

Attributes:

Name Type Description
label str

The text to display for this dependency.

path str | None

The ecFlow path if this represents a node, otherwise None.

is_met bool

Whether this dependency is currently satisfied.

children list[DepData]

Nested dependencies.

icon str | None

Optional icon override.

Source code in src/ectop/widgets/modals/why.py
@dataclass
class DepData:
    """
    One entry of the dependency tree, held as plain data.

    Attributes:
        label: The text to display for this dependency.
        path: The ecFlow path if this represents a node, otherwise None.
        is_met: Whether this dependency is currently satisfied.
        children: Nested dependencies.
        icon: Optional icon override.
    """

    # Only the label is required; every other field has a default.
    label: str
    path: str | None = field(default=None)
    is_met: bool = field(default=True)
    # A factory gives each instance its own list.
    children: list[DepData] = field(default_factory=list)
    icon: str | None = field(default=None)

WhyInspector

Bases: ModalScreen[None]

A modal screen to inspect dependencies and triggers of an ecFlow node.

.. note:: If you modify features, API, or usage, you MUST update the documentation immediately.

Source code in src/ectop/widgets/modals/why.py
class WhyInspector(ModalScreen[None]):
    """
    A modal screen to inspect dependencies and triggers of an ecFlow node.

    .. note::
        If you modify features, API, or usage, you MUST update the documentation immediately.
    """

    BINDINGS = [
        Binding("escape", "close", "Close"),
        Binding("w", "close", "Close"),
    ]

    def __init__(self, node_path: str, client: EcflowClient) -> None:
        """
        Initialize the WhyInspector.

        Parameters:
            node_path: The absolute path to the ecFlow node.
            client: The ecFlow client instance.
        """
        super().__init__()
        self.node_path: str = node_path
        self.client: EcflowClient = client

    def compose(self) -> ComposeResult:
        """
        Compose the modal UI.

        Returns:
            The UI components for the modal.
        """
        with Vertical(id="why_container"):
            yield Static(f"Why is {self.node_path} not running?", id="why_title")
            yield Tree("Dependencies", id="dep_tree")
            with Horizontal(id="why_actions"):
                yield Button("Close", variant="primary", id="close_btn")

    def on_mount(self) -> None:
        """
        Handle the mount event to initialize the dependency tree.
        """
        self.refresh_deps()

    def action_close(self) -> None:
        """
        Close the modal.
        """
        self.app.pop_screen()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """
        Handle button press events.

        Parameters:
            event: The button press event.
        """
        if event.button.id == "close_btn":
            self.app.pop_screen()

    def on_tree_node_selected(self, event: Tree.NodeSelected[str]) -> None:
        """
        Jump to the selected dependency node in the main tree.

        Tree entries without an attached path are ignored.

        Parameters:
            event: The tree node selection event.
        """
        node_path = event.node.data
        if not node_path:
            return

        # Imported at call time rather than at module top.
        from ectop.widgets.sidebar import SuiteTree

        try:
            tree = self.app.query_one("#suite_tree", SuiteTree)
            tree.select_by_path(node_path)
            self.app.notify(f"Jumped to {node_path}")
            self.app.pop_screen()
        except Exception as e:
            self.app.notify(f"Failed to jump: {e}", severity="error")

    def refresh_deps(self) -> None:
        """
        Fetch dependencies from the server and rebuild the tree.
        """
        tree = self.query_one("#dep_tree", Tree)
        self._refresh_deps_worker(tree)

    @work(thread=True)
    def _refresh_deps_worker(self, tree: Tree) -> None:
        """
        Worker to fetch dependencies from the server and rebuild the tree.

        Parameters:
            tree: The tree widget to refresh.

        Notes:
            This is a background worker that performs blocking I/O.
        """
        self._refresh_deps_logic(tree)

    def _refresh_deps_logic(self, tree: Tree) -> None:
        """
        Fetch the definitions, gather dependency data and update the tree.

        Exceptions are not propagated: any failure is shown as a single
        "Error: ..." or "Unexpected Error: ..." entry in the tree.

        Parameters:
            tree: The tree widget to refresh.
        """
        try:
            self.client.sync_local()
            defs = self.client.get_defs()
            if not defs:
                self.app.call_from_thread(self._update_tree_ui, tree, DepData("Server Empty"))
                return

            node = defs.find_abs_node(self.node_path)
            if not node:
                self.app.call_from_thread(self._update_tree_ui, tree, DepData("Node not found"))
                return

            # Gather the data first, then hand the finished structure to the
            # UI through call_from_thread.
            dep_data = self._gather_dependency_data(node, defs)
            self.app.call_from_thread(self._update_tree_ui, tree, dep_data)

        except RuntimeError as e:
            self.app.call_from_thread(self._update_tree_ui, tree, DepData(f"Error: {e}"))
        except Exception as e:
            self.app.call_from_thread(self._update_tree_ui, tree, DepData(f"Unexpected Error: {e}"))

    def _gather_dependency_data(self, node: Node, defs: Defs) -> DepData:
        """
        Gather dependency data from an ecFlow node.

        Each section (reason, trigger, complete expression, limits, time
        dependencies) is collected independently, so a failure in one does
        not prevent the others from being reported.

        Parameters:
            node: The ecFlow node to inspect.
            defs: The ecFlow definitions for node lookups.

        Returns:
            The root dependency data object.
        """
        root = DepData("Dependencies")

        # Reason: best effort, omitted when unavailable.
        try:
            why_str = node.get_why()
            if why_str:
                root.children.append(DepData(f"Reason: {why_str}", icon=ICON_REASON))
        except (AttributeError, RuntimeError):
            pass

        # Triggers
        try:
            trigger = node.get_trigger()
            if trigger:
                trigger_root = DepData("Triggers")
                self._parse_expression_data(trigger_root, trigger.get_expression(), defs)
                root.children.append(trigger_root)
        except (AttributeError, RuntimeError) as e:
            root.children.append(DepData(f"Trigger Error: {e}", icon=ICON_NOT_MET, is_met=False))

        # Complete
        try:
            complete = node.get_complete()
            if complete:
                complete_root = DepData("Complete Expression")
                self._parse_expression_data(complete_root, complete.get_expression(), defs)
                root.children.append(complete_root)
        except (AttributeError, RuntimeError) as e:
            root.children.append(DepData(f"Complete Expr Error: {e}", icon=ICON_NOT_MET, is_met=False))

        # Limits: best effort, omitted when unavailable.
        try:
            inlimits = list(node.inlimits)
            if inlimits:
                limit_root = DepData("Limits")
                limit_root.children.extend(
                    DepData(f"Limit: {il.name()} (Path: {il.value()})", icon="🔒") for il in inlimits
                )
                root.children.append(limit_root)
        except (AttributeError, RuntimeError):
            pass

        # Times, Dates, Crons: the group is attached only if every lookup
        # succeeded and at least one entry was found.
        try:
            time_root = DepData("Time Dependencies")
            for t in node.get_times():
                time_root.children.append(DepData(f"Time: {t}", icon=ICON_TIME))
            for d in node.get_dates():
                time_root.children.append(DepData(f"Date: {d}", icon=ICON_DATE))
            for c in node.get_crons():
                time_root.children.append(DepData(f"Cron: {c}", icon=ICON_CRON))
            if time_root.children:
                root.children.append(time_root)
        except (AttributeError, RuntimeError):
            pass

        return root

    @staticmethod
    def _strip_outer_parens(expr_str: str) -> str:
        """
        Remove parentheses that enclose the whole expression.

        ``((a and b))`` becomes ``a and b``, while ``(a) and (b)`` is left
        alone because its first ``(`` closes before the end of the string.

        Parameters:
            expr_str: The expression text.

        Returns:
            The expression without redundant enclosing parentheses.
        """
        expr_str = expr_str.strip()
        while expr_str.startswith("(") and expr_str.endswith(")"):
            depth = 0
            encloses_all = True
            for i, char in enumerate(expr_str):
                if char == "(":
                    depth += 1
                elif char == ")":
                    depth -= 1
                if depth == 0 and i < len(expr_str) - 1:
                    # The opening parenthesis closed early: not one pair.
                    encloses_all = False
                    break
            if not encloses_all:
                break
            expr_str = expr_str[1:-1].strip()
        return expr_str

    @staticmethod
    def _find_top_level(expr_str: str, op: str) -> int:
        """
        Find the first occurrence of ``op`` that is outside any parentheses.

        Parameters:
            expr_str: The expression text.
            op: The operator text to look for, including surrounding spaces.

        Returns:
            The index of the operator, or -1 if there is none at depth 0.
        """
        depth = 0
        for i, char in enumerate(expr_str):
            if char == "(":
                depth += 1
            elif char == ")":
                depth -= 1
            elif depth == 0 and expr_str.startswith(op, i):
                return i
        return -1

    def _parse_expression_data(self, parent: DepData, expr_str: str, defs: Defs) -> bool:
        """
        Parse an ecFlow expression and populate DepData objects.

        Binary operators are split first (``or`` before ``and``) and a
        leading ``!`` is handled afterwards, so ``!a and b`` is read as
        ``(!a) and b`` rather than ``!(a and b)``.

        Parameters:
            parent: The DepData object that receives the parsed entries.
            expr_str: The expression string to parse.
            defs: The ecFlow definitions for node lookups.

        Returns:
            True if the expression is currently met. Any exception raised
            while parsing is recorded as a "Parse Error" entry and reported
            as not met.
        """
        try:
            expr_str = expr_str.strip()
            if not expr_str:
                return True

            expr_str = self._strip_outer_parens(expr_str)
            if not expr_str:
                # Only empty parentheses were given: nothing to evaluate.
                return True

            # AND/OR operators. "or" is tried first so that it ends up
            # higher in the tree than "and".
            for op, label in ((" or ", EXPR_OR_LABEL), (" and ", EXPR_AND_LABEL)):
                index = self._find_top_level(expr_str, op)
                if index == -1:
                    continue
                op_node = DepData(label)
                left = expr_str[:index].strip()
                right = expr_str[index + len(op) :].strip()
                # Both sides are always parsed so the tree shows every
                # operand, even when the left side already decides it.
                is_met_left = self._parse_expression_data(op_node, left, defs)
                is_met_right = self._parse_expression_data(op_node, right, defs)
                is_met = (is_met_left or is_met_right) if op == " or " else (is_met_left and is_met_right)
                op_node.is_met = is_met
                parent.children.append(op_node)
                return is_met

            # NOT operator: reached only when no binary operator remains at
            # the top level, so it negates just its own operand.
            if expr_str.startswith("!"):
                not_node = DepData("NOT (Must be false)")
                inner_met = self._parse_expression_data(not_node, expr_str[1:].strip(), defs)
                is_met = not inner_met
                not_node.is_met = is_met
                parent.children.append(not_node)
                return is_met

            # Leaf node
            match = re.search(r"(!?\s*)(/[a-zA-Z0-9_\-\./]+)(\s*(==|!=|<=|>=|<|>)\s*(\w+))?", expr_str)
            if match is None:
                # Not a recognised node reference: shown as a note and
                # treated as met.
                parent.children.append(DepData(expr_str, icon=ICON_NOTE))
                return True

            negation = match.group(1).strip()
            path = match.group(2)
            # A bare path is read as "path == complete".
            op = match.group(4) or "=="
            expected_state = match.group(5) or "complete"
            target_node = defs.find_abs_node(path)

            if target_node is None:
                parent.children.append(DepData(f"{path} (Not found)", is_met=False, icon=ICON_UNKNOWN))
                return False

            actual_state = str(target_node.get_state())
            # Only == and != are evaluated; the ordering operators matched
            # by the pattern are reported as not met.
            is_met = False
            if op == "==":
                is_met = actual_state == expected_state
            elif op == "!=":
                is_met = actual_state != expected_state

            if negation == "!":
                is_met = not is_met

            neg_str = "! " if negation == "!" else ""
            label = f"{neg_str}{path} {op} {actual_state} (Expected: {expected_state})"
            if actual_state == "aborted":
                label = f"[b red]{label} (STOPPED HERE)[/]"

            parent.children.append(DepData(label, path=path, is_met=is_met))
            return is_met
        except Exception as e:
            parent.children.append(DepData(f"Parse Error: {expr_str} ({e})", is_met=False, icon=ICON_NOT_MET))
            return False

    def _update_tree_ui(self, tree: Tree, data: DepData) -> None:
        """
        Update the tree UI from DepData.

        Parameters:
            tree: The tree widget.
            data: The root dependency data.
        """
        tree.clear()
        tree.root.label = data.label
        for child in data.children:
            self._add_to_tree(tree.root, child)
        tree.root.expand_all()

    def _add_to_tree(self, parent_node: TreeNode[str], data: DepData) -> None:
        """
        Recursively add DepData to the Textual Tree.

        Parameters:
            parent_node: The parent TreeNode.
            data: The DepData to add.
        """
        # An explicit icon wins; otherwise the met / not-met icon is used.
        icon = data.icon or (ICON_MET if data.is_met else ICON_NOT_MET)
        label = f"{icon} {data.label}"
        new_node = parent_node.add(label, data=data.path, expand=True)
        for child in data.children:
            self._add_to_tree(new_node, child)

__init__(node_path, client)

Initialize the WhyInspector.

Parameters:

Name Type Description Default
node_path str

The absolute path to the ecFlow node.

required
client EcflowClient

The ecFlow client instance.

required
Source code in src/ectop/widgets/modals/why.py
def __init__(self, node_path: str, client: EcflowClient) -> None:
    """
    Create the inspector and remember which node and client it works with.

    Parameters:
        node_path: The absolute path to the ecFlow node.
        client: The ecFlow client instance.
    """
    super().__init__()
    self.client: EcflowClient = client
    self.node_path: str = node_path

action_close()

Close the modal.

Source code in src/ectop/widgets/modals/why.py
def action_close(self) -> None:
    """
    Dismiss this modal by popping it off the application's screen stack.
    """
    application = self.app
    application.pop_screen()

compose()

Compose the modal UI.

Returns:

Type Description
ComposeResult

The UI components for the modal.

Source code in src/ectop/widgets/modals/why.py
def compose(self) -> ComposeResult:
    """
    Yield the widgets that make up the inspector modal.

    Returns:
        Title and dependency tree, plus a row holding the close button.
    """
    with Vertical(id="why_container"):
        heading = f"Why is {self.node_path} not running?"
        yield Static(heading, id="why_title")
        yield Tree("Dependencies", id="dep_tree")
        with Horizontal(id="why_actions"):
            yield Button("Close", variant="primary", id="close_btn")

on_button_pressed(event)

Handle button press events.

Parameters:

Name Type Description Default
event Pressed

The button press event.

required
Source code in src/ectop/widgets/modals/why.py
def on_button_pressed(self, event: Button.Pressed) -> None:
    """
    React to a button press; only the close button has an effect.

    Parameters:
        event: The button press event.
    """
    pressed_id = event.button.id
    if pressed_id != "close_btn":
        return
    self.app.pop_screen()

on_mount()

Handle the mount event to initialize the dependency tree.

Source code in src/ectop/widgets/modals/why.py
def on_mount(self) -> None:
    """
    Populate the dependency tree as soon as the modal is mounted.
    """
    populate = self.refresh_deps
    populate()

on_tree_node_selected(event)

Jump to the selected dependency node in the main tree.

Parameters:

Name Type Description Default
event NodeSelected[str]

The tree node selection event.

required
Source code in src/ectop/widgets/modals/why.py
def on_tree_node_selected(self, event: Tree.NodeSelected[str]) -> None:
    """
    Jump to the selected dependency node in the main tree.

    Entries without an attached path are ignored. A failure while jumping
    is reported as an error notification.

    Parameters:
        event: The tree node selection event.
    """
    target_path = event.node.data
    if not target_path:
        return

    from ectop.widgets.sidebar import SuiteTree

    try:
        suite_tree = self.app.query_one("#suite_tree", SuiteTree)
        suite_tree.select_by_path(target_path)
        self.app.notify(f"Jumped to {target_path}")
        self.app.pop_screen()
    except Exception as exc:
        self.app.notify(f"Failed to jump: {exc}", severity="error")

refresh_deps()

Fetch dependencies from the server and rebuild the tree.

Source code in src/ectop/widgets/modals/why.py
def refresh_deps(self) -> None:
    """
    Look up the dependency tree widget and start the worker that refills it.
    """
    dep_tree = self.query_one("#dep_tree", Tree)
    self._refresh_deps_worker(dep_tree)