Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import importlib.util
import os
import re
import subprocess
from types import ModuleType
from typing import Any, ClassVar, TypedDict
Expand Down Expand Up @@ -80,6 +81,23 @@ class SandboxPython:
"vars",
"help",
"dir",
"getattr",
"setattr",
"delattr",
"type",
"breakpoint",
}

BLOCKED_ATTRS: ClassVar[set[str]] = {
"__class__",
"__bases__",
"__subclasses__",
"__mro__",
"__globals__",
"__code__",
"__reduce__",
"__reduce_ex__",
"__builtins__",
}

@staticmethod
Expand Down Expand Up @@ -126,6 +144,22 @@ def safe_builtins() -> dict[str, Any]:
safe_builtins["__import__"] = SandboxPython.restricted_import
return safe_builtins

@staticmethod
def _check_for_blocked_attrs(code: str) -> None:
"""Checks if code contains any blocked attribute access patterns.

Args:
code: The Python code to check.

Raises:
SecurityError: If blocked attribute patterns are found.
"""
for attr in SandboxPython.BLOCKED_ATTRS:
if re.search(r'\b' + re.escape(attr) + r'\b', code):
raise RuntimeError(
f"Access to '{attr}' is not allowed in the sandbox."
)

@staticmethod
def exec(code: str, locals_: dict[str, Any]) -> None:
"""Executes Python code in a restricted environment.
Expand All @@ -134,6 +168,7 @@ def exec(code: str, locals_: dict[str, Any]) -> None:
code: The Python code to execute as a string.
locals_: A dictionary that will be used for local variable storage.
"""
SandboxPython._check_for_blocked_attrs(code)
exec(code, {"__builtins__": SandboxPython.safe_builtins()}, locals_) # noqa: S102


Expand Down Expand Up @@ -380,7 +415,11 @@ def run_code_unsafe(code: str, libraries_used: list[str]) -> str:
Printer.print("WARNING: Running code in unsafe mode", color="bold_magenta")
# Install libraries on the host machine
for library in libraries_used:
os.system(f"pip install {library}") # noqa: S605
subprocess.run( # noqa: S603
["pip", "install", library], # noqa: S607
check=True,
capture_output=True,
)

# Execute the code
try:
Expand Down
227 changes: 226 additions & 1 deletion lib/crewai-tools/tests/tools/test_code_interpreter_tool.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest.mock import patch
from unittest.mock import patch, MagicMock

from crewai_tools.tools.code_interpreter_tool.code_interpreter_tool import (
CodeInterpreterTool,
Expand Down Expand Up @@ -172,3 +172,228 @@ def test_unsafe_mode_running_unsafe_code(printer_mock, docker_unavailable_mock):
"WARNING: Running code in unsafe mode", color="bold_magenta"
)
assert 5.0 == result


class TestCommandInjectionPrevention:
"""Tests for command injection prevention in unsafe mode (CVE: CWE-78)."""

@patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
)
def test_unsafe_mode_uses_subprocess_run_with_list_args(
self, subprocess_mock, printer_mock
):
"""Verify libraries are installed via subprocess.run with list args, not os.system."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 'done'"
tool.run_code_unsafe(code, ["numpy"])
subprocess_mock.assert_called_once_with(
["pip", "install", "numpy"],
check=True,
capture_output=True,
)

@patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
)
def test_unsafe_mode_library_with_shell_metacharacters(
self, subprocess_mock, printer_mock
):
"""Ensure shell metacharacters in library names are not interpreted."""
tool = CodeInterpreterTool(unsafe_mode=True)
malicious_lib = "numpy; id #"
code = "result = 'done'"
tool.run_code_unsafe(code, [malicious_lib])
subprocess_mock.assert_called_once_with(
["pip", "install", "numpy; id #"],
check=True,
capture_output=True,
)

@patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
)
def test_unsafe_mode_library_with_command_substitution(
self, subprocess_mock, printer_mock
):
"""Ensure command substitution in library names is not executed."""
tool = CodeInterpreterTool(unsafe_mode=True)
malicious_lib = "numpy && rm -rf /"
code = "result = 'done'"
tool.run_code_unsafe(code, [malicious_lib])
subprocess_mock.assert_called_once_with(
["pip", "install", "numpy && rm -rf /"],
check=True,
capture_output=True,
)

@patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
)
def test_unsafe_mode_library_with_backtick_injection(
self, subprocess_mock, printer_mock
):
"""Ensure backtick command injection is not executed."""
tool = CodeInterpreterTool(unsafe_mode=True)
malicious_lib = "numpy`whoami`"
code = "result = 'done'"
tool.run_code_unsafe(code, [malicious_lib])
subprocess_mock.assert_called_once_with(
["pip", "install", "numpy`whoami`"],
check=True,
capture_output=True,
)

@patch(
"crewai_tools.tools.code_interpreter_tool.code_interpreter_tool.subprocess.run"
)
def test_unsafe_mode_multiple_libraries_installed_separately(
self, subprocess_mock, printer_mock
):
"""Each library is installed in a separate subprocess call."""
tool = CodeInterpreterTool(unsafe_mode=True)
code = "result = 'done'"
tool.run_code_unsafe(code, ["numpy", "pandas"])
assert subprocess_mock.call_count == 2


class TestSandboxEscapePrevention:
"""Tests for sandbox escape prevention via object introspection (CVE: CWE-94)."""

def test_sandbox_blocks_class_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __class__ introspection."""
tool = CodeInterpreterTool()
code = """
result = ().__class__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__class__' is not allowed" in result

def test_sandbox_blocks_bases_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __bases__ introspection."""
tool = CodeInterpreterTool()
code = """
result = object.__bases__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__bases__' is not allowed" in result

def test_sandbox_blocks_subclasses_method(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __subclasses__ introspection."""
tool = CodeInterpreterTool()
code = """
result = object.__subclasses__()
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__subclasses__' is not allowed" in result

def test_sandbox_blocks_mro_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __mro__ introspection."""
tool = CodeInterpreterTool()
code = """
result = int.__mro__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__mro__' is not allowed" in result

def test_sandbox_blocks_globals_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __globals__ introspection."""
tool = CodeInterpreterTool()
code = """
def f(): pass
result = f.__globals__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__globals__' is not allowed" in result

def test_sandbox_blocks_builtins_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __builtins__ access."""
tool = CodeInterpreterTool()
code = """
result = __builtins__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__builtins__' is not allowed" in result

def test_sandbox_blocks_full_introspection_chain(
self, printer_mock, docker_unavailable_mock
):
"""Prevent the full sandbox escape PoC from the issue."""
tool = CodeInterpreterTool()
code = """
for c in ().__class__.__bases__[0].__subclasses__():
if c.__name__ == 'BuiltinImporter':
result = c.load_module('os').system('id')
break
"""
result = tool.run(code=code, libraries_used=[])
assert "is not allowed" in result

def test_sandbox_blocks_getattr_builtin(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via getattr builtin."""
tool = CodeInterpreterTool()
code = """
result = getattr(object, '__subclasses__')()
"""
result = tool.run(code=code, libraries_used=[])
assert "An error occurred" in result

def test_sandbox_blocks_type_builtin(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via type builtin."""
tool = CodeInterpreterTool()
code = """
result = type('X', (object,), {})
"""
result = tool.run(code=code, libraries_used=[])
assert "An error occurred" in result

def test_sandbox_blocks_code_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __code__ attribute."""
tool = CodeInterpreterTool()
code = """
def f(): pass
result = f.__code__
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__code__' is not allowed" in result

def test_sandbox_allows_normal_code(
self, printer_mock, docker_unavailable_mock
):
"""Ensure normal code still works in sandbox."""
tool = CodeInterpreterTool()
code = """
data = [1, 2, 3, 4, 5]
result = sum(data) * 2
"""
result = tool.run(code=code, libraries_used=[])
assert result == 30

def test_sandbox_blocks_reduce_attribute(
self, printer_mock, docker_unavailable_mock
):
"""Prevent sandbox escape via __reduce__ for pickle attacks."""
tool = CodeInterpreterTool()
code = """
result = [].__reduce__()
"""
result = tool.run(code=code, libraries_used=[])
assert "Access to '__reduce__' is not allowed" in result
23 changes: 0 additions & 23 deletions lib/crewai-tools/tool.specs.json
Original file line number Diff line number Diff line change
Expand Up @@ -20117,18 +20117,6 @@
"humanized_name": "Web Automation Tool",
"init_params_schema": {
"$defs": {
"AvailableModel": {
"enum": [
"gpt-4o",
"gpt-4o-mini",
"claude-3-5-sonnet-latest",
"claude-3-7-sonnet-latest",
"computer-use-preview",
"gemini-2.0-flash"
],
"title": "AvailableModel",
"type": "string"
},
"EnvVar": {
"properties": {
"default": {
Expand Down Expand Up @@ -20206,17 +20194,6 @@
"default": null,
"title": "Model Api Key"
},
"model_name": {
"anyOf": [
{
"$ref": "#/$defs/AvailableModel"
},
{
"type": "null"
}
],
"default": "claude-3-7-sonnet-latest"
},
"project_id": {
"anyOf": [
{
Expand Down