useRunProcess.ts 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. import { ref, toRef, toValue } from "vue";
  2. import { useVueFlow } from "@vue-flow/core";
  3. export function useRunProcess({ graph: dagreGraph, cancelOnError = true }) {
  4. const { updateNodeData, getConnectedEdges } = useVueFlow();
  5. const graph = toRef(() => toValue(dagreGraph));
  6. const isRunning = ref(false);
  7. const executedNodes = new Set();
  8. const runningTasks = new Map();
  9. const upcomingTasks = new Set();
  10. async function runNode(node, isStart = false) {
  11. if (executedNodes.has(node.id)) {
  12. return;
  13. }
  14. upcomingTasks.add(node.id);
  15. const incomers = getConnectedEdges(node.id).filter(
  16. connection => connection.target === node.id
  17. );
  18. await Promise.all(
  19. incomers.map(incomer => until(() => !incomer.data.isAnimating))
  20. );
  21. upcomingTasks.clear();
  22. if (!isRunning.value) {
  23. return;
  24. }
  25. executedNodes.add(node.id);
  26. updateNodeData(node.id, {
  27. isRunning: true,
  28. isFinished: false,
  29. hasError: false,
  30. isCancelled: false
  31. });
  32. const delay = Math.floor(Math.random() * 2000) + 1000;
  33. return new Promise(resolve => {
  34. const timeout = setTimeout(
  35. async () => {
  36. const children = graph.value.successors(node.id);
  37. const willThrowError = Math.random() < 0.15;
  38. if (!isStart && willThrowError) {
  39. updateNodeData(node.id, { isRunning: false, hasError: true });
  40. if (toValue(cancelOnError)) {
  41. await skipDescendants(node.id);
  42. runningTasks.delete(node.id);
  43. // @ts-expect-error
  44. resolve();
  45. return;
  46. }
  47. }
  48. updateNodeData(node.id, { isRunning: false, isFinished: true });
  49. runningTasks.delete(node.id);
  50. if (children.length > 0) {
  51. await Promise.all(children.map(id => runNode({ id })));
  52. }
  53. // @ts-expect-error
  54. resolve();
  55. },
  56. isStart ? 0 : delay
  57. );
  58. runningTasks.set(node.id, timeout);
  59. });
  60. }
  61. async function run(nodes) {
  62. if (isRunning.value) {
  63. return;
  64. }
  65. reset(nodes);
  66. isRunning.value = true;
  67. const startingNodes = nodes.filter(
  68. node => graph.value.predecessors(node.id)?.length === 0
  69. );
  70. await Promise.all(startingNodes.map(node => runNode(node, true)));
  71. clear();
  72. }
  73. function reset(nodes) {
  74. clear();
  75. for (const node of nodes) {
  76. updateNodeData(node.id, {
  77. isRunning: false,
  78. isFinished: false,
  79. hasError: false,
  80. isSkipped: false,
  81. isCancelled: false
  82. });
  83. }
  84. }
  85. async function skipDescendants(nodeId) {
  86. const children = graph.value.successors(nodeId);
  87. for (const child of children) {
  88. updateNodeData(child, { isRunning: false, isSkipped: true });
  89. await skipDescendants(child);
  90. }
  91. }
  92. async function stop() {
  93. isRunning.value = false;
  94. for (const nodeId of upcomingTasks) {
  95. clearTimeout(runningTasks.get(nodeId));
  96. runningTasks.delete(nodeId);
  97. // @ts-expect-error
  98. updateNodeData(nodeId, {
  99. isRunning: false,
  100. isFinished: false,
  101. hasError: false,
  102. isSkipped: false,
  103. isCancelled: true
  104. });
  105. await skipDescendants(nodeId);
  106. }
  107. for (const [nodeId, task] of runningTasks) {
  108. clearTimeout(task);
  109. runningTasks.delete(nodeId);
  110. updateNodeData(nodeId, {
  111. isRunning: false,
  112. isFinished: false,
  113. hasError: false,
  114. isSkipped: false,
  115. isCancelled: true
  116. });
  117. await skipDescendants(nodeId);
  118. }
  119. executedNodes.clear();
  120. upcomingTasks.clear();
  121. }
  122. function clear() {
  123. isRunning.value = false;
  124. executedNodes.clear();
  125. runningTasks.clear();
  126. }
  127. return { run, stop, reset, isRunning };
  128. }
  129. async function until(condition) {
  130. return new Promise(resolve => {
  131. const interval = setInterval(() => {
  132. if (condition()) {
  133. clearInterval(interval);
  134. // @ts-expect-error
  135. resolve();
  136. }
  137. }, 100);
  138. });
  139. }