-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecute_code.py
More file actions
131 lines (112 loc) · 4.8 KB
/
Copy pathexecute_code.py
File metadata and controls
131 lines (112 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from __future__ import annotations
import os
import time
from uuid import uuid4
import requests
from agent.graph_state import State
from observability.langfuse import start_langfuse_observation
def _worker_url() -> str:
return os.getenv("MANIM_WORKER_URL", "http://localhost:8080").rstrip("/")
def _poll_interval_seconds() -> int:
return max(1, int(os.getenv("MANIM_WORKER_POLL_SECONDS", "5")))
def _max_wait_seconds() -> int:
return max(_poll_interval_seconds(), int(os.getenv("MANIM_WORKER_MAX_WAIT_SECONDS", "900")))
def execute_code(state: State) -> dict:
worker_url = _worker_url()
if not worker_url:
return {
"sandbox_error": "MANIM_WORKER_URL is not configured",
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}
request_id = str(uuid4())
with start_langfuse_observation(
name="execute-code",
as_type="tool",
input={"scene_name": state["scene_name"], "request_id": request_id},
metadata={"worker_url": worker_url},
) as observation:
trace_id = observation.trace_id if observation is not None else ""
parent_span_id = observation.id if observation is not None else ""
payload = {
"code": state["code"],
"scene_name": state["scene_name"],
"request_id": request_id,
"trace_id": trace_id,
"parent_span_id": parent_span_id,
}
try:
submit_response = requests.post(
f"{worker_url}/jobs",
json=payload,
timeout=30,
)
submit_response.raise_for_status()
submit_data = submit_response.json()
except requests.RequestException as exc:
if observation is not None:
observation.update(level="ERROR", status_message=str(exc))
return {
"sandbox_error": f"Failed to submit render job: {exc}",
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}
job_id = submit_data.get("job_id", "").strip()
if not job_id:
if observation is not None:
observation.update(level="ERROR", status_message="Render worker did not return a job_id")
return {
"sandbox_error": "Render worker did not return a job_id",
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}
poll_interval = _poll_interval_seconds()
deadline = time.monotonic() + _max_wait_seconds()
while time.monotonic() < deadline:
try:
status_response = requests.get(
f"{worker_url}/jobs/{job_id}",
timeout=30,
)
status_response.raise_for_status()
status_data = status_response.json()
except requests.RequestException as exc:
if observation is not None:
observation.update(level="ERROR", status_message=str(exc))
return {
"sandbox_error": f"Failed to poll render job {job_id}: {exc}",
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}
status = (status_data.get("status") or "").strip()
if status == "succeeded":
video_url = status_data.get("video_url", "")
if observation is not None:
observation.update(output={"job_id": job_id, "status": status, "video_url": video_url})
return {
"sandbox_error": "No error",
"video_url": video_url,
"render_failures": 0,
}
if status == "failed":
error_message = status_data.get("error", "Render job failed")
if observation is not None:
observation.update(
level="ERROR",
status_message=error_message,
output={"job_id": job_id, "status": status},
)
return {
"sandbox_error": error_message,
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}
time.sleep(poll_interval)
timeout_message = f"Render job {job_id} timed out after {_max_wait_seconds()} seconds"
if observation is not None:
observation.update(level="ERROR", status_message=timeout_message, output={"job_id": job_id})
return {
"sandbox_error": timeout_message,
"video_url": "",
"render_failures": state.get("render_failures", 0) + 1,
}