-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Expand file tree
/
Copy pathrunner.py
More file actions
346 lines (312 loc) · 12.1 KB
/
Copy pathrunner.py
File metadata and controls
346 lines (312 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
"""Top-level Strix scan runner."""
from __future__ import annotations
import contextlib
import json
import logging
import uuid
from collections.abc import Callable
from typing import TYPE_CHECKING, Any
from agents import RunConfig
from agents.sandbox import SandboxRunConfig
from openai import RateLimitError
from strix.agents.factory import build_strix_agent, make_child_factory
from strix.config import load_settings
from strix.config.models import (
StrixProvider,
configure_sdk_model_defaults,
uses_chat_completions_tool_schema,
)
from strix.core.agents import AgentCoordinator
from strix.core.execution import (
respawn_subagents,
run_agent_loop,
)
from strix.core.execution import (
spawn_child_agent as start_child_agent,
)
from strix.core.hooks import BudgetExceededError, ReportUsageHooks
from strix.core.inputs import (
DEFAULT_MAX_TURNS,
build_root_task,
build_scope_context,
make_model_settings,
)
from strix.core.paths import run_dir_for, runtime_state_dir
from strix.core.sessions import open_agent_session
from strix.runtime import session_manager
from strix.telemetry.logging import set_scan_id, setup_scan_logging
if TYPE_CHECKING:
from agents.memory import SQLiteSession
from agents.result import RunResultBase
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Callback type accepted by ``run_strix_scan`` as ``event_sink``: called with a
# string and an arbitrary payload, returns nothing.
# NOTE(review): the string is presumably an event name/kind -- confirm against
# the code that emits the events.
StreamEventSink = Callable[[str, Any], None]
# Callback type accepted by ``run_strix_scan`` as ``setup_script_event_sink``:
# called with a single string-keyed dict, returns nothing. It is forwarded
# unchanged to ``session_manager.create_or_reuse``.
SetupScriptEventSink = Callable[[dict[str, Any]], None]
async def run_strix_scan(
    *,
    scan_config: dict[str, Any],
    scan_id: str | None = None,
    image: str,
    local_sources: list[dict[str, Any]] | None = None,
    coordinator: AgentCoordinator | None = None,
    interactive: bool = False,
    max_turns: int = DEFAULT_MAX_TURNS,
    max_budget_usd: float | None = None,
    model: str | None = None,
    cleanup_on_exit: bool = True,
    event_sink: StreamEventSink | None = None,
    setup_script_event_sink: SetupScriptEventSink | None = None,
) -> RunResultBase | None:
    """Run or resume one Strix scan against a sandbox.

    A scan is treated as a resume when ``agents.json`` already exists in the
    run's runtime state directory; otherwise a fresh root agent is registered.

    Args:
        scan_config: Scan description. Keys read here are ``targets``,
            ``scan_mode`` (defaults to ``"deep"``), ``skills``,
            ``setup_script`` and ``resume_instruction``; the whole dict is
            also handed to ``build_root_task`` and ``build_scope_context``.
        scan_id: Identifier of the scan. When ``None`` a new
            ``scan-<8 hex chars>`` id is generated.
        image: Sandbox image, forwarded to ``session_manager.create_or_reuse``.
        local_sources: Forwarded to ``session_manager.create_or_reuse``;
            ``None`` is passed on as an empty list.
        coordinator: Coordinator to use; a new ``AgentCoordinator`` is created
            when ``None``.
        interactive: Forwarded to the agent builders and the run loop. In
            non-interactive mode an error is logged when the final output does
            not report ``scan_completed``.
        max_turns: Forwarded to the run loop and to child agents.
        max_budget_usd: Forwarded to ``ReportUsageHooks``.
        model: Model name; falls back to ``settings.llm.model`` when empty.
        cleanup_on_exit: When true, ``session_manager.cleanup`` is awaited for
            this scan before returning or re-raising.
        event_sink: Forwarded to the run loop and to child agents.
        setup_script_event_sink: Forwarded to
            ``session_manager.create_or_reuse``.

    Returns:
        The value returned by ``run_agent_loop``, or ``None`` when the scan
        was stopped by ``BudgetExceededError`` or ``RateLimitError``.

    Raises:
        RuntimeError: If no model is configured, or if a resume is requested
            but ``agents.json`` is unreadable, the session database is
            missing, or no agent without a parent is found after restore.
        BaseException: Anything else raised while the scan runs is logged,
            the root agent is marked ``"failed"``, and the exception is
            re-raised unchanged.
    """
    if scan_id is None:
        scan_id = f"scan-{uuid.uuid4().hex[:8]}"

    run_dir = run_dir_for(scan_id)
    run_dir.mkdir(parents=True, exist_ok=True)
    state_dir = runtime_state_dir(run_dir)
    state_dir.mkdir(parents=True, exist_ok=True)

    teardown_logging = setup_scan_logging(run_dir)
    # Everything after the logging setup lives under this ``try`` so that the
    # teardown callable runs on every exit path -- including configuration and
    # resume errors raised before the sandbox exists, and failures raised by
    # the sandbox cleanup itself.
    try:
        set_scan_id(scan_id)

        agents_path = state_dir / "agents.json"
        agents_db = state_dir / "agents.db"
        # An existing coordinator snapshot is what marks this run as a resume.
        is_resume = agents_path.exists()

        logger.info(
            "%s Strix scan %s (image=%s, max_turns=%d, interactive=%s, run_dir=%s)",
            "Resuming" if is_resume else "Starting",
            scan_id,
            image,
            max_turns,
            interactive,
            run_dir,
        )

        settings = load_settings()
        configure_sdk_model_defaults(settings)

        resolved_model = (model or settings.llm.model or "").strip()
        if not resolved_model:
            raise RuntimeError(
                "No LLM model configured. Set STRIX_LLM env or pass model= to run_strix_scan().",
            )
        logger.info("LLM model resolved: %s", resolved_model)
        chat_completions_tools = uses_chat_completions_tool_schema(resolved_model, settings)

        if coordinator is None:
            coordinator = AgentCoordinator()
        coordinator.set_snapshot_path(agents_path)

        # Kept function-local.
        # NOTE(review): presumably this avoids an import cycle -- confirm.
        from strix.tools.notes.tools import hydrate_notes_from_disk
        from strix.tools.todo.tools import hydrate_todos_from_disk

        hydrate_todos_from_disk(state_dir)
        hydrate_notes_from_disk(state_dir)

        root_id: str | None = None
        if is_resume:
            try:
                snap = json.loads(agents_path.read_text(encoding="utf-8"))
            except (OSError, json.JSONDecodeError) as exc:
                raise RuntimeError(
                    f"Cannot resume scan {scan_id}: agents.json is unreadable: {exc}",
                ) from exc
            if not agents_db.exists():
                raise RuntimeError(
                    f"Cannot resume scan {scan_id}: missing SDK session database at {agents_db}",
                )
            await coordinator.restore(snap)
            # The root is the first restored agent that has no parent.
            root_id = next(
                (aid for aid, parent in coordinator.parent_of.items() if parent is None),
                None,
            )
            if root_id is None:
                raise RuntimeError(
                    f"Cannot resume scan {scan_id}: agents.json has no root agent (parent=None)",
                )
            logger.info(
                "Resume: restored coordinator with %d agent(s); root=%s",
                len(coordinator.statuses),
                root_id,
            )
        else:
            root_id = uuid.uuid4().hex[:8]

        logger.info("Bringing up sandbox session for scan %s", scan_id)
        bundle = await session_manager.create_or_reuse(
            scan_id,
            image=image,
            local_sources=local_sources or [],
            setup_script=scan_config.get("setup_script"),
            setup_script_event_sink=setup_script_event_sink,
        )
        logger.info("Sandbox ready for scan %s", scan_id)

        # Every SDK session opened for this scan is collected here and closed
        # in the ``finally`` below.
        sessions_to_close: list[SQLiteSession] = []
        try:
            targets = scan_config.get("targets") or []
            scan_mode = str(scan_config.get("scan_mode") or "deep")
            is_whitebox = any(t.get("type") == "local_code" for t in targets)
            skills = list(scan_config.get("skills") or [])
            root_task = build_root_task(scan_config)

            model_settings = make_model_settings(
                settings.llm.reasoning_effort,
                model_name=resolved_model,
            )
            run_config = RunConfig(
                model=resolved_model,
                model_provider=StrixProvider(),
                model_settings=model_settings,
                sandbox=SandboxRunConfig(client=bundle["client"], session=bundle["session"]),
                trace_include_sensitive_data=False,
            )
            hooks = ReportUsageHooks(model=resolved_model, max_budget_usd=max_budget_usd)
            scope_context = build_scope_context(scan_config)

            root_agent = build_strix_agent(
                name="strix",
                skills=skills,
                is_root=True,
                scan_mode=scan_mode,
                is_whitebox=is_whitebox,
                interactive=interactive,
                chat_completions_tools=chat_completions_tools,
                system_prompt_context=scope_context,
            )

            # On resume the root is already part of the restored snapshot.
            if not is_resume:
                await coordinator.register(
                    root_id,
                    "strix",
                    parent_id=None,
                    task=root_task,
                    skills=skills,
                )

            child_agent_builder = make_child_factory(
                scan_mode=scan_mode,
                is_whitebox=is_whitebox,
                interactive=interactive,
                chat_completions_tools=chat_completions_tools,
                system_prompt_context=scope_context,
            )

            async def spawn_child_agent(**kwargs: Any) -> dict[str, Any]:
                """Start a child agent with this scan's shared runtime pre-bound."""
                return await start_child_agent(
                    coordinator=coordinator,
                    factory=child_agent_builder,
                    agents_db_path=agents_db,
                    sessions_to_close=sessions_to_close,
                    run_config=run_config,
                    max_turns=max_turns,
                    interactive=interactive,
                    event_sink=event_sink,
                    hooks=hooks,
                    **kwargs,
                )

            context: dict[str, Any] = {
                "coordinator": coordinator,
                "sandbox_session": bundle["session"],
                "caido_client": bundle["caido_client"],
                "agent_id": root_id,
                "parent_id": None,
                "interactive": interactive,
                "spawn_child_agent": spawn_child_agent,
            }

            root_session = open_agent_session(root_id, agents_db)
            sessions_to_close.append(root_session)
            await coordinator.attach_runtime(root_id, session=root_session)

            if is_resume:
                await respawn_subagents(
                    coordinator=coordinator,
                    factory=child_agent_builder,
                    agents_db_path=agents_db,
                    sessions_to_close=sessions_to_close,
                    run_config=run_config,
                    max_turns=max_turns,
                    interactive=interactive,
                    parent_ctx=context,
                    root_id=root_id,
                    event_sink=event_sink,
                    hooks=hooks,
                )

            initial_input: Any = [] if is_resume else root_task

            # Resume + new ``--instruction``: SDK replay drives root from
            # agents.db with ``initial_input=[]``, so a brand-new instruction
            # passed on the resume CLI would otherwise be silently ignored.
            # Inject it as a fresh user message in root's SDK session; the
            # next run cycle will replay it with the rest of the session.
            resume_instruction = str(scan_config.get("resume_instruction") or "").strip()
            if is_resume and resume_instruction:
                await coordinator.send(
                    root_id,
                    {
                        "from": "user",
                        "type": "instruction",
                        "priority": "high",
                        "content": resume_instruction,
                    },
                )
                logger.info(
                    "Resume: injected new instruction into root SDK session (len=%d)",
                    len(resume_instruction),
                )

            # Read the root's status under the coordinator lock; the lock is
            # released again before the (long-running) agent loop starts.
            async with coordinator._lock:
                root_status = coordinator.statuses.get(root_id)

            result = await run_agent_loop(
                agent=root_agent,
                initial_input=initial_input,
                run_config=run_config,
                context=context,
                max_turns=max_turns,
                coordinator=coordinator,
                agent_id=root_id,
                interactive=interactive,
                session=root_session,
                start_parked=bool(interactive and is_resume and root_status != "running"),
                event_sink=event_sink,
                hooks=hooks,
            )

            if not interactive and result is not None:
                final = getattr(result, "final_output", None)
                if not _scan_reported_completion(final):
                    logger.error(
                        "Scan %s ended without calling finish_scan. The agent "
                        "emitted a text-only turn instead of a lifecycle tool call, "
                        "so no executive report was written. Final output (first "
                        "300 chars): %r",
                        scan_id,
                        str(final)[:300],
                    )
            return result  # noqa: TRY300
        except BudgetExceededError as exc:
            logger.info("Scan %s stopped: %s", scan_id, exc)
            await _halt_agent_tree(coordinator, root_id, "stopped")
            return None
        except RateLimitError as exc:
            logger.warning(
                "Scan %s stopped: persistent rate limit from the LLM provider (%s). "
                "Resume with 'strix --resume %s' once the limit clears.",
                scan_id,
                exc,
                scan_id,
            )
            await _halt_agent_tree(coordinator, root_id, "stopped")
            return None
        except BaseException:
            # Deliberately broad: whatever ends the scan, the agent tree is
            # halted and the root marked failed before the exception continues.
            logger.exception("Strix scan %s failed", scan_id)
            await _halt_agent_tree(coordinator, root_id, "failed")
            raise
        finally:
            # Closing sessions and snapshotting are best effort; a failure
            # here must not replace the scan's own outcome.
            for open_session in sessions_to_close:
                with contextlib.suppress(Exception):
                    open_session.close()
            with contextlib.suppress(Exception):
                await coordinator._maybe_snapshot()
            if cleanup_on_exit:
                logger.info("Tearing down sandbox session for scan %s", scan_id)
                await session_manager.cleanup(scan_id)
            logger.info("Strix scan %s done", scan_id)
    finally:
        teardown_logging()


def _scan_reported_completion(final: Any) -> bool:
    """Return whether a final output carries a truthy ``scan_completed`` marker.

    ``final`` may be a dict or a JSON string that decodes to a dict; every
    other value, including a string that is not valid JSON, yields ``False``.
    """
    if isinstance(final, str):
        try:
            parsed = json.loads(final)
        except (ValueError, TypeError):
            return False
        return bool(isinstance(parsed, dict) and parsed.get("scan_completed"))
    if isinstance(final, dict):
        return bool(final.get("scan_completed"))
    return False


async def _halt_agent_tree(
    coordinator: AgentCoordinator,
    root_id: str | None,
    status: str,
) -> None:
    """Await ``cancel_descendants`` for the root, then record ``status`` on it.

    Does nothing when ``root_id`` is ``None``. Errors from
    ``cancel_descendants`` propagate to the caller.
    """
    if root_id is None:
        return
    await coordinator.cancel_descendants(root_id)
    # Best effort: failing to record the status must not mask the reason the
    # scan is being halted.
    with contextlib.suppress(Exception):
        await coordinator.set_status(root_id, status)