Skip to main content

TriggerDAGRunOperator

  • You can use wait_for_completion=True and it will detect whether the dag passed or failed.
  • If you have a cleanup function make sure that it marks the upstream task as failed as appropriate.
for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() not in [State.SUCCESS, State.SKIPPED] and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))