TriggerDagRunOperator
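- Set wait_for_completion=True on the operator. The triggering task then waits for the triggered DAG run to finish and succeeds or fails according to whether that run passed or failed.
- If the DAG has a cleanup function that runs even when earlier tasks fail, make sure it fails the DAG run when another task has failed. Otherwise the run can end as successful and the failure goes undetected. For example: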
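from airflow.utils.state import State

def cleanup(**kwargs):
    # Fail this task (and so the DAG run) if any other task neither succeeded nor was skipped.
    for task_instance in kwargs['dag_run'].get_task_instances():
        if task_instance.current_state() not in [State.SUCCESS, State.SKIPPED] and \
                task_instance.task_id != kwargs['task_instance'].task_id:
            raise Exception("Task {} failed. Failing this DAG run".format(task_instance.task_id))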
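
For the wait_for_completion option in the first bullet, here is a minimal sketch of how the operator might be configured. It assumes Airflow 2.x, where the operator is importable from airflow.operators.trigger_dagrun (the path may differ in other versions). The DAG id "child_dag", the task id "trigger_child", the 30-second poke_interval, and the helper name make_trigger are illustrative placeholders, not values from this page.

```python
# Keyword arguments for the trigger task. All ids and intervals are hypothetical.
TRIGGER_KWARGS = {
    "task_id": "trigger_child",       # hypothetical task id in the parent DAG
    "trigger_dag_id": "child_dag",    # hypothetical id of the DAG to trigger
    "wait_for_completion": True,      # block until the triggered run finishes
    "poke_interval": 30,              # seconds between checks of the run's state
}


def make_trigger(dag):
    """Attach the trigger task to `dag`. Requires Airflow 2.x to be installed."""
    # Assumed import path for Airflow 2.x.
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    return TriggerDagRunOperator(dag=dag, **TRIGGER_KWARGS)
```

With wait_for_completion=True the trigger task should fail when the triggered run ends in a failed state, which is why the cleanup check above matters: without it, a cleanup task that succeeds can hide the failure from the parent DAG.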