.. _error_handling: Error Handling and Resilience (Admin Guide) ============================================ This page documents how Pulsar reacts to operational failures — RabbitMQ/relay outages, Pulsar restarts, Galaxy unreachability, transient HTTP errors during staging — and what an administrator needs to configure or monitor to make those guarantees real. The single guarantee Pulsar provides is: **for every job submitted by Galaxy, exactly one terminal status update (** ``complete`` **,** ``failed`` **,** ``cancelled`` **, or** ``lost`` **) is delivered, in non-decreasing lifecycle order, within bounded time after the underlying fault clears.** The mechanisms below combine to deliver that guarantee. Each section names the configuration knobs, defaults, on-disk artifacts, and operational signals so you can verify the system is doing what it claims. .. contents:: :local: :depth: 2 ------------------------------------------------------------------------ 1. The job lifecycle and where status updates come from ------------------------------------------------------------------------ A job moves through these states (see ``pulsar/managers/status.py``): :: preprocessing -> queued -> running -> postprocessing -> complete | failed | cancelled | lost Status transitions are emitted by the *stateful* manager wrapper (``pulsar/managers/stateful.py``): * ``preprocessing`` — the moment the setup message is accepted and the staging thread begins; the job directory contains ``launch_config`` but not yet ``preprocessed``. * ``running`` — first call to ``get_status()`` that detects the job has begun executing on the underlying runner. * terminal (``complete``/``failed``/``cancelled``/``lost``) — written as the on-disk metadata file ``final_status`` after postprocessing. All transitions go through a single callback registered by the messaging binding (``pulsar/messaging/bind_amqp.py`` and ``pulsar/messaging/bind_relay.py``), which writes the payload to the **status-update outbox** and lets a background drain thread retry the publish. The synchronous publish path is gone. ------------------------------------------------------------------------ 2. The persistent status-update outbox ------------------------------------------------------------------------ Why ~~~ Before this layer existed, a Pulsar that hit a broker hiccup *just* as a job finished would log the publish exception, kill the postprocess thread, and leave Galaxy permanently unaware that the job was done. This was the single largest data-loss path in the lifecycle. How ~~~ Every status update is written to a per-manager directory before any attempt to publish: :: /-status-outbox/ # AMQP modes /-relay-status-outbox/ # relay mode Each entry is a single JSON file named ``-.json``. ``seq`` is a monotonic per-outbox counter primed from the maximum seq found on disk at startup, so a lexically sorted directory listing yields strict FIFO drain order even after a restart. A daemon thread (named ``status-outbox-``) drains pending entries every few seconds: * on enqueue success the file is removed only **after** ``publish_fn`` returns; * on publish failure the file is left on disk, the drain thread retries on the next pass, and ``enqueue`` itself never raises; * on Pulsar startup the drain thread runs once before consumer threads start, so any backlog left by the previous process is delivered first. What this gives you ~~~~~~~~~~~~~~~~~~~ * A broker outage — even one that lasts for minutes or spans a Pulsar restart — does not lose a status update. The terminal state sits on disk until the broker is reachable again. * The postprocess thread cannot be terminated by a publish failure. * If the broker eventually has to be replaced or wiped, you can simply delete the affected outbox directories *after* you accept that those updates are lost — no other Pulsar state needs touching. Configuration ~~~~~~~~~~~~~ * ``persistence_directory`` (yaml, default ``files/persisted_data``) — where the outbox lives. **Required** for at-least-once semantics. If set to ``__none__`` the outbox is disabled and the legacy synchronous publish path is used; do not run production this way. * ``status_outbox_drain_interval`` (yaml, default ``5.0`` seconds) — how often the daemon thread retries pending entries when no immediate wakeup has fired. Monitoring ~~~~~~~~~~ * The directory size of ``/*-status-outbox*/`` is the natural backpressure signal. Steady state should be 0 entries. Sustained growth means the broker (or Galaxy's consumer) is unreachable, not that Pulsar is broken. * The log line ``Outbox has N pending messages to retry`` fires on startup whenever the previous run left a backlog — alert on it if N is non-zero for more than a few drain intervals. ------------------------------------------------------------------------ 3. AMQP durability defenses ------------------------------------------------------------------------ When Pulsar talks to RabbitMQ: * ``amqp_durable: true`` (off by default) declares the ``pulsar`` exchange and every per-name queue with ``durable=true``, and stamps publishes with ``delivery_mode=2`` (persistent). With this enabled, setup, kill, and status-update messages survive a RabbitMQ restart. The default is ``false`` so existing deployments with non-durable queues keep working — RabbitMQ refuses to redeclare an existing queue with mismatched durability, so flipping this on a live deployment requires deleting the affected queues first or migrating to a fresh broker. The outbox already covers the publisher-side leak (LP1) regardless of this setting; durable queues are an extra defense for the *broker-restart* case specifically. * ``amqp_publish_retry: true`` enables kombu's connection-retry policy. When set, pulsar fills in bounded defaults (``max_retries: 5``, ``interval_start: 1``, ``interval_step: 2``, ``interval_max: 30``) so a single transient hiccup is absorbed in-band without round-tripping through the outbox drain loop. * ``amqp_acknowledge: true`` (off by default) layers an additional publisher-confirms protocol on top, with its own UUID store under ``/amqp_ack-/``. This is independent of the outbox; both can be enabled together for defense-in-depth. .. note:: To enable durable queues on an existing deployment, drain or delete the existing ``pulsar__*`` queues (e.g. ``rabbitmqctl delete_queue pulsar__setup`` for each one) before flipping ``amqp_durable: true``. On a fresh install you can simply set the flag from the start. ------------------------------------------------------------------------ 4. pulsar-relay durability defenses ------------------------------------------------------------------------ Relay uses HTTP long-polling with a per-topic *cursor* (``last_message_id``) that the consumer advances as it reads messages. Without persistence the cursor would reset on every restart, silently skipping any setup or status_update messages published while the consumer was down. Pulsar persists the cursor at: :: /-relay-cursor.json written atomically (rename-temp) on every advance, loaded on startup. The Galaxy-side ``RelayClientManager`` accepts a ``relay_cursor_path`` keyword to do the same on the receiver side. Galaxy job handlers run as separate processes — each one polls the relay independently and tracks its own cursor — so ``RelayClientManager`` expands the operator-supplied path with a **stable, per-handler identifier** before passing it to the transport. The suffix is resolved in this order: 1. ``relay_handler_id`` constructor kwarg (typically ``app.config.server_name`` from the Galaxy runner — stable across restarts). 2. ``GALAXY_SERVER_NAME`` environment variable (Galaxy's standard handler tag, set by the launcher). 3. ``pidNNN`` as a last resort, with a startup ``WARNING`` log: PID is unique per process but changes on every restart, so the persisted cursor would not be picked up by the next run and the F4 guarantee is degraded to "no Galaxy-side cursor persistence." A shared file would suffer last-writer-wins corruption when two handlers persist concurrently and could silently rewind another handler's progress, which is why the suffix is mandatory whenever ``relay_cursor_path`` is set. ------------------------------------------------------------------------ 5. Setup-message idempotency ------------------------------------------------------------------------ Setup messages can be redelivered: if Pulsar SIGKILLs after consuming a setup but before AMQP records the ack, the broker will redeliver to the next consumer. ``submit_job`` short-circuits the redelivery to a no-op **only when the prior run can finish on its own**: * the job has reached a terminal status (``final_status`` metadata exists), or * the job is still tracked by ``active_jobs`` and ``recover_active_jobs`` will resume it on next get_status(). If the prior run crashed *between* persisting ``launch_config`` and activating the job — a narrow window — there is no recovery hook, so the redelivered setup drives a fresh ``preprocess_and_launch`` instead of silently dropping the work. This is invisible to admins under normal operation; it surfaces in logs as ``Ignoring duplicate setup message for job_id ``. ------------------------------------------------------------------------ 6. Active-job recovery on restart ------------------------------------------------------------------------ Pulsar's stateful manager keeps two persistent indices under ``persistence_directory``: :: /-active-jobs/ /-preprocessing-jobs/ Each file is a job_id; its existence means the job was active when the process last ran. On startup ``recover_active_jobs`` walks both directories and: 1. for jobs in ``-preprocessing-jobs/``, re-reads ``launch_config`` and re-launches the preprocessing thread; 2. for jobs in ``-active-jobs/``, calls the manager's ``_recover_active_job`` method (e.g. requeue from the persisted command line, or re-attach to the DRMAA external id). Two outcomes you should be aware of: * If recovery cannot reattach to the job (e.g. the DRMAA external id is no longer known) Pulsar publishes a single ``lost`` status update and removes the active-jobs entry. From Galaxy's perspective the job ended with ``lost``; nothing is silently abandoned. * The ``queued_python`` runner — the one used in the resilience test framework — runs jobs as direct Pulsar subprocesses and cannot survive a Pulsar SIGKILL. Such a restart yields ``lost``, not ``complete``, for the in-flight job. **Use a real DRM (DRMAA/Slurm/Kubernetes) for workloads that need to survive Pulsar restarts.** ------------------------------------------------------------------------ 7. Staging error handling (file transfer to/from Galaxy) ------------------------------------------------------------------------ Pulsar pulls inputs from Galaxy and pushes outputs back over HTTP. Both sides go through the ``RetryActionExecutor`` with a ``should_retry=is_transient_http_error`` predicate (``pulsar/client/transport/transient.py``): * **Transient errors are retried** — 408, 425, 429, 500, 502, 503, 504, connection errors, and timeouts. Default policy retries indefinitely with bounded backoff (start 2 s, step 2 s, max 30 s). * **Permanent errors fail the job** — any 4xx other than the rate-limit set above (404, 403, 400, etc.) immediately marks the job ``failed`` with the upstream HTTP body in the captured stderr. This is why a misconfigured Galaxy URL or a deleted dataset surfaces as a single fast ``failed``, while a Galaxy restart or a load-balancer hiccup recovers transparently after retries. Configuration knobs (per-manager YAML): * ``preprocess_action_max_retries`` / ``postprocess_action_max_retries`` * ``preprocess_action_interval_start`` / ``_step`` / ``_max`` * ``preprocess_action_should_retry`` (advanced — replaces the default predicate; only do this if you understand which errors are safe to retry in your environment) ------------------------------------------------------------------------ 8. Status-update ordering guarantees ------------------------------------------------------------------------ Pulsar (sender side) ~~~~~~~~~~~~~~~~~~~~ Within one Pulsar instance, status updates for a given job are emitted in strict lifecycle order, and the outbox drains them in strict FIFO via the monotonic seq prefix described above. That ordering is preserved across process restarts: pending entries from the previous run drain before any new entries from the new run, because their seqs are smaller. Galaxy (receiver side) ~~~~~~~~~~~~~~~~~~~~~~ The broker can re-deliver a message whose ack was lost, and the relay backend can replay messages a client claims it has not yet seen. Galaxy must therefore enforce two invariants: 1. **Dedupe by** ``(job_id, status)``. The pulsar status-update payload for a given (job_id, status) is idempotent in content, so a duplicate is safe to drop. 2. **A terminal status is a sink.** Once Galaxy has recorded a terminal status for a job, drop any further updates for that job — both non-terminal regressions (a stale ``running`` arriving late) and conflicting later terminals (a delayed ``failed`` after ``complete``). The reference implementation of these rules is the ``StatusRecorder`` class in ``test/resilience/mock_galaxy/recorder.py``; treat it as the canonical behavior to mirror in production Galaxy. Explicit Galaxy-side resets ~~~~~~~~~~~~~~~~~~~~~~~~~~~ Operators (or Galaxy itself) sometimes need to reset a job's state — for example resubmitting after a ``cancel`` or restarting a ``failed`` job. Such a transition deliberately re-enters a non-terminal state from a terminal one and would be rejected by the regression guard above. The agreed escape hatch is a ``_galaxy_reset_token`` field in the status payload (any truthy value, intended to be unique per reset event): * the receiver treats the update as the start of a fresh epoch for that ``job_id``; * prior transitions are forgotten for the *current-state* view of the job; * the audit log retains the full observed history. Pulsar itself does not emit reset tokens. Galaxy is the authority on what constitutes a reset. ------------------------------------------------------------------------ 9. Failure-mode → outcome cheat sheet ------------------------------------------------------------------------ ================================================= ==================================== =========================================================================== Failure Recovery mechanism What you see in logs / on disk ================================================= ==================================== =========================================================================== Broker brief disconnect kombu/relay reconnect loop ``recoverable_exceptions`` log; queue drains on reconnect Broker down for minutes outbox holds terminal status outbox dir size > 0 until reconnect; warns on next drain RabbitMQ crash + restart (``amqp_durable: true``) durable queues + delivery_mode=2 no Pulsar-side log; messages restored from broker disk RabbitMQ crash + restart (default, non-durable) outbox replays publisher side in-flight inbound msgs are lost broker-side; outbound resume from outbox Pulsar SIGKILL during preprocessing ``-preprocessing-jobs/`` recovery ``Failed to find launch parameters`` warning if missing Pulsar SIGKILL during running (DRM) ``-active-jobs/`` recovery re-attach via runner; no status change visible to Galaxy Pulsar SIGKILL during running (queued_python) job dies, ``lost`` reported one ``lost`` status update; admin should know this runner Pulsar SIGKILL after final_status, before publish outbox replay on restart ``Outbox ... has N pending messages to retry`` warning Galaxy 5xx during input download transient retry ``Failed to execute action[...], retrying`` info logs Galaxy 4xx during input download fail-fast, ``failed`` reported single ``failed`` status; HTTP body in job stderr Galaxy unreachable during status_update outbox holds, broker buffers same as broker outage from Pulsar's perspective Both Pulsar and broker restart together outbox (+ durable queues if enabled) everything resumes once both are up; ordering preserved ================================================= ==================================== =========================================================================== ------------------------------------------------------------------------ 10. Verifying resilience in your environment ------------------------------------------------------------------------ The repo includes a docker-compose-based resilience suite under ``test/resilience/`` that exercises every failure mode above end-to-end against a real RabbitMQ, ``ghcr.io/mvdbeek/pulsar-relay``, and toxiproxy fault injection. Before relying on this guide in production, run:: docker compose -f test/resilience/docker-compose.yml up -d --build pytest test/resilience -v 48 scenarios pass across the ``amqp``, ``amqp_ack``, and ``relay`` modes. See ``test/resilience/README.md`` for layout, what each scenario asserts, and how to add new ones (e.g. for a custom runner). ------------------------------------------------------------------------ 11. Tunables, defaults, and where they live ------------------------------------------------------------------------ ================================ ======================================= ==================================================================== Setting Default Effect ================================ ======================================= ==================================================================== ``persistence_directory`` ``files/persisted_data`` outbox + active-jobs + relay cursor live here ``status_outbox_drain_interval`` ``5.0`` (seconds) background outbox retry cadence ``amqp_durable`` ``false`` opt-in: durable queues + delivery_mode=2 (broker-restart resilience) ``amqp_publish_retry`` unset (off) kombu publish retry; defaults bounded when on ``amqp_acknowledge`` ``false`` additional publisher-confirms layer ``amqp_consumer_timeout`` ``0.2`` consumer drain_events timeout (responsiveness) ``message_queue_publish`` ``true`` disable to make Pulsar receive-only ``message_queue_consume`` ``true`` disable to make Pulsar send-only ``ensure_cleanup`` ``false`` join consumer threads on shutdown ================================ ======================================= ==================================================================== For deep debugging set ``logging.loggers.pulsar.level: DEBUG`` in ``server.ini`` (the resilience tests run with INFO and grep for the bind and outbox log lines as readiness signals — the same pattern works for production health checks).