import { PATH_ALIAS_POLICIES } from "../../infra/path-alias-guards.js" ;
import type {
PathSafetyCheck,
PinnedSandboxDirectoryEntry,
PinnedSandboxEntry,
} from "./fs-bridge-path-safety.js" ;
import type { SandboxFsCommandPlan } from "./fs-bridge-shell-command-plans.js" ;
/**
 * Absolute interpreter paths probed, in this order, by the shell script that
 * `buildPinnedMutationPlan` generates. The first path that passes `[ -x … ]`
 * becomes `python_cmd`; only when none does is `command -v` consulted.
 */
export const SANDBOX_PINNED_MUTATION_PYTHON_CANDIDATES = [
  "/usr/bin/python3",
  "/usr/local/bin/python3",
  "/opt/homebrew/bin/python3",
  "/bin/python3",
] as const;
/**
 * Source of the Python helper that performs sandbox filesystem operations
 * relative to directory file descriptors instead of re-resolving full paths.
 *
 * The helper opens the mount root with `O_DIRECTORY`/`O_NOFOLLOW` (when the
 * platform defines them), walks the relative parent path one segment at a time
 * with `dir_fd=`, rejects `..` segments, and then operates on the final
 * basename relative to the resulting parent descriptor.
 *
 * `sys.argv[1]` selects the operation; the remaining positions are:
 * - `write`:  root, relative parent, basename, mkdir flag (`'1'` creates missing
 *   parents); payload bytes are read from stdin.
 * - `read`:   root, relative parent, basename; file bytes go to fd 1.
 * - `mkdirp`: root, relative directory path.
 * - `remove`: root, relative parent, basename, recursive flag, force flag
 *   (`'1'` ignores a missing entry).
 * - `rename`: source root, source parent, source basename, destination root,
 *   destination parent, destination basename, destination mkdir flag.
 *
 * Each array element is one line of Python, so the leading spaces inside the
 * strings are significant: they are the script's block structure (4 spaces per
 * nesting level).
 */
export const SANDBOX_PINNED_MUTATION_PYTHON = [
  "import errno",
  "import os",
  "import secrets",
  "import stat",
  "import sys",
  "",
  "operation = sys.argv[1]",
  "",
  // Open flags; O_DIRECTORY / O_NOFOLLOW are added only where the platform defines them.
  "DIR_FLAGS = os.O_RDONLY",
  "if hasattr(os, 'O_DIRECTORY'):",
  "    DIR_FLAGS |= os.O_DIRECTORY",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    DIR_FLAGS |= os.O_NOFOLLOW",
  "",
  "READ_FLAGS = os.O_RDONLY",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    READ_FLAGS |= os.O_NOFOLLOW",
  "",
  "WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    WRITE_FLAGS |= os.O_NOFOLLOW",
  "",
  // Splits a relative path into segments, dropping '' and '.', refusing '..'.
  "def split_relative(path_value):",
  "    segments = []",
  "    for segment in path_value.split('/'):",
  "        if not segment or segment == '.':",
  "            continue",
  "        if segment == '..':",
  "            raise OSError(errno.EPERM, 'path traversal is not allowed', segment)",
  "        segments.append(segment)",
  "    return segments",
  "",
  "def open_dir(path_value, dir_fd=None):",
  "    return os.open(path_value, DIR_FLAGS, dir_fd=dir_fd)",
  "",
  // Descends from root_fd one segment at a time; returns an fd the caller must close.
  "def walk_dir(root_fd, rel_path, mkdir_enabled):",
  "    current_fd = os.dup(root_fd)",
  "    try:",
  "        for segment in split_relative(rel_path):",
  "            try:",
  "                next_fd = open_dir(segment, dir_fd=current_fd)",
  "            except FileNotFoundError:",
  "                if not mkdir_enabled:",
  "                    raise",
  "                os.mkdir(segment, 0o777, dir_fd=current_fd)",
  "                next_fd = open_dir(segment, dir_fd=current_fd)",
  "            os.close(current_fd)",
  "            current_fd = next_fd",
  "        return current_fd",
  "    except Exception:",
  "        os.close(current_fd)",
  "        raise",
  "",
  "def create_temp_file(parent_fd, basename):",
  "    prefix = '.openclaw-write-' + basename + '.'",
  "    for _ in range(128):",
  "        candidate = prefix + secrets.token_hex(6)",
  "        try:",
  "            fd = os.open(candidate, WRITE_FLAGS, 0o600, dir_fd=parent_fd)",
  "            return candidate, fd",
  "        except FileExistsError:",
  "            continue",
  "    raise RuntimeError('failed to allocate sandbox temp file')",
  "",
  "def create_temp_dir(parent_fd, basename, mode):",
  "    prefix = '.openclaw-move-' + basename + '.'",
  "    for _ in range(128):",
  "        candidate = prefix + secrets.token_hex(6)",
  "        try:",
  "            os.mkdir(candidate, mode, dir_fd=parent_fd)",
  "            return candidate",
  "        except FileExistsError:",
  "            continue",
  "    raise RuntimeError('failed to allocate sandbox temp directory')",
  "",
  // Streams stdin into a temp file in the parent directory, then replaces basename with it.
  "def write_atomic(parent_fd, basename, stdin_buffer):",
  "    temp_fd = None",
  "    temp_name = None",
  "    try:",
  "        temp_name, temp_fd = create_temp_file(parent_fd, basename)",
  "        while True:",
  "            chunk = stdin_buffer.read(65536)",
  "            if not chunk:",
  "                break",
  "            os.write(temp_fd, chunk)",
  "        os.fsync(temp_fd)",
  "        os.close(temp_fd)",
  "        temp_fd = None",
  "        os.replace(temp_name, basename, src_dir_fd=parent_fd, dst_dir_fd=parent_fd)",
  "        temp_name = None",
  "        os.fsync(parent_fd)",
  "    finally:",
  "        if temp_fd is not None:",
  "            os.close(temp_fd)",
  "        if temp_name is not None:",
  "            try:",
  "                os.unlink(temp_name, dir_fd=parent_fd)",
  "            except FileNotFoundError:",
  "                pass",
  "",
  // Copies a regular, non-hardlinked file to fd 1.
  "def read_file(parent_fd, basename):",
  "    file_fd = os.open(basename, READ_FLAGS, dir_fd=parent_fd)",
  "    try:",
  "        file_stat = os.fstat(file_fd)",
  "        if not stat.S_ISREG(file_stat.st_mode):",
  "            raise OSError(errno.EPERM, 'only regular files are allowed', basename)",
  "        if file_stat.st_nlink > 1:",
  "            raise OSError(errno.EPERM, 'hardlinked file is not allowed', basename)",
  "        while True:",
  "            chunk = os.read(file_fd, 65536)",
  "            if not chunk:",
  "                break",
  "            os.write(1, chunk)",
  "    finally:",
  "        os.close(file_fd)",
  "",
  "def remove_tree(parent_fd, basename):",
  "    entry_stat = os.lstat(basename, dir_fd=parent_fd)",
  "    if not stat.S_ISDIR(entry_stat.st_mode) or stat.S_ISLNK(entry_stat.st_mode):",
  "        os.unlink(basename, dir_fd=parent_fd)",
  "        return",
  "    dir_fd = open_dir(basename, dir_fd=parent_fd)",
  "    try:",
  "        for child in os.listdir(dir_fd):",
  "            remove_tree(dir_fd, child)",
  "    finally:",
  "        os.close(dir_fd)",
  "    os.rmdir(basename, dir_fd=parent_fd)",
  "",
  // Tries a plain rename first; on EXDEV falls back to copy-then-delete per entry type.
  "def move_entry(src_parent_fd, src_basename, dst_parent_fd, dst_basename):",
  "    try:",
  "        os.rename(src_basename, dst_basename, src_dir_fd=src_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    except OSError as err:",
  "        if err.errno != errno.EXDEV:",
  "            raise",
  "    src_stat = os.lstat(src_basename, dir_fd=src_parent_fd)",
  "    if stat.S_ISDIR(src_stat.st_mode) and not stat.S_ISLNK(src_stat.st_mode):",
  "        temp_dir_name = create_temp_dir(dst_parent_fd, dst_basename, stat.S_IMODE(src_stat.st_mode) or 0o755)",
  "        temp_dir_fd = open_dir(temp_dir_name, dir_fd=dst_parent_fd)",
  "        src_dir_fd = open_dir(src_basename, dir_fd=src_parent_fd)",
  "        try:",
  "            for child in os.listdir(src_dir_fd):",
  "                move_entry(src_dir_fd, child, temp_dir_fd, child)",
  "        finally:",
  "            os.close(src_dir_fd)",
  "            os.close(temp_dir_fd)",
  "        os.rename(temp_dir_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        os.rmdir(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    if stat.S_ISLNK(src_stat.st_mode):",
  "        link_target = os.readlink(src_basename, dir_fd=src_parent_fd)",
  "        try:",
  "            os.unlink(dst_basename, dir_fd=dst_parent_fd)",
  "        except FileNotFoundError:",
  "            pass",
  "        os.symlink(link_target, dst_basename, dir_fd=dst_parent_fd)",
  "        os.unlink(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    src_fd = os.open(src_basename, READ_FLAGS, dir_fd=src_parent_fd)",
  "    temp_fd = None",
  "    temp_name = None",
  "    try:",
  "        src_file_stat = os.fstat(src_fd)",
  "        if not stat.S_ISREG(src_file_stat.st_mode):",
  "            raise OSError(errno.EPERM, 'only regular files are allowed', src_basename)",
  "        if src_file_stat.st_nlink > 1:",
  "            raise OSError(errno.EPERM, 'hardlinked file is not allowed', src_basename)",
  "        temp_name, temp_fd = create_temp_file(dst_parent_fd, dst_basename)",
  "        while True:",
  "            chunk = os.read(src_fd, 65536)",
  "            if not chunk:",
  "                break",
  "            os.write(temp_fd, chunk)",
  "        try:",
  "            os.fchmod(temp_fd, stat.S_IMODE(src_stat.st_mode))",
  "        except AttributeError:",
  "            pass",
  "        os.fsync(temp_fd)",
  "        os.close(temp_fd)",
  "        temp_fd = None",
  "        os.replace(temp_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        temp_name = None",
  "        os.unlink(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "    finally:",
  "        if temp_fd is not None:",
  "            os.close(temp_fd)",
  "        if temp_name is not None:",
  "            try:",
  "                os.unlink(temp_name, dir_fd=dst_parent_fd)",
  "            except FileNotFoundError:",
  "                pass",
  "        os.close(src_fd)",
  "",
  // Operation dispatch on sys.argv[1].
  "if operation == 'write':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], sys.argv[5] == '1')",
  "        write_atomic(parent_fd, sys.argv[4], sys.stdin.buffer)",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'read':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], False)",
  "        read_file(parent_fd, sys.argv[4])",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'mkdirp':",
  "    root_fd = open_dir(sys.argv[2])",
  "    target_fd = None",
  "    try:",
  "        target_fd = walk_dir(root_fd, sys.argv[3], True)",
  "        os.fsync(target_fd)",
  "    finally:",
  "        if target_fd is not None:",
  "            os.close(target_fd)",
  "        os.close(root_fd)",
  "elif operation == 'remove':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], False)",
  "        try:",
  "            if sys.argv[5] == '1':",
  "                remove_tree(parent_fd, sys.argv[4])",
  "            else:",
  "                entry_stat = os.lstat(sys.argv[4], dir_fd=parent_fd)",
  "                if stat.S_ISDIR(entry_stat.st_mode) and not stat.S_ISLNK(entry_stat.st_mode):",
  "                    os.rmdir(sys.argv[4], dir_fd=parent_fd)",
  "                else:",
  "                    os.unlink(sys.argv[4], dir_fd=parent_fd)",
  "            os.fsync(parent_fd)",
  "        except FileNotFoundError:",
  "            if sys.argv[6] != '1':",
  "                raise",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'rename':",
  "    src_root_fd = open_dir(sys.argv[2])",
  "    dst_root_fd = open_dir(sys.argv[5])",
  "    src_parent_fd = None",
  "    dst_parent_fd = None",
  "    try:",
  "        src_parent_fd = walk_dir(src_root_fd, sys.argv[3], False)",
  "        dst_parent_fd = walk_dir(dst_root_fd, sys.argv[6], sys.argv[8] == '1')",
  "        move_entry(src_parent_fd, sys.argv[4], dst_parent_fd, sys.argv[7])",
  "    finally:",
  "        if src_parent_fd is not None:",
  "            os.close(src_parent_fd)",
  "        if dst_parent_fd is not None:",
  "            os.close(dst_parent_fd)",
  "        os.close(src_root_fd)",
  "        os.close(dst_root_fd)",
  "else:",
  "    raise RuntimeError('unknown sandbox mutation operation: ' + operation)",
].join("\n");
/**
 * The helper source wrapped as a single POSIX-shell single-quoted word.
 *
 * A single quote cannot appear inside a single-quoted word, so each `'` in the
 * source is rewritten to `'\''` (close the quote, emit an escaped quote, reopen).
 * The pattern must be the bare quote character: matching `"' "` would skip every
 * quote that is not followed by a space and would swallow the space of those
 * that are, corrupting the embedded script.
 */
const SANDBOX_PINNED_MUTATION_PYTHON_SHELL_LITERAL = `'${SANDBOX_PINNED_MUTATION_PYTHON.replaceAll("'", `'\\''`)}'`;
/**
 * Wraps helper arguments and path-safety checks into a command plan whose shell
 * script locates a Python interpreter and runs the pinned mutation helper.
 *
 * The script probes each `SANDBOX_PINNED_MUTATION_PYTHON_CANDIDATES` path in
 * order, falls back to `command -v python3` / `command -v python`, and exits
 * with status 127 (after a message on stderr) when no interpreter is found.
 *
 * @param params.args - Positional arguments forwarded to the helper via `"$@"`.
 * @param params.checks - Path-safety checks attached to the plan.
 * @returns The plan, always with `recheckBeforeCommand` set to `true`.
 */
function buildPinnedMutationPlan(params: {
  args: string[];
  checks: PathSafetyCheck[];
}): SandboxFsCommandPlan {
  const { args, checks } = params;

  // One guarded probe per well-known interpreter path; the first executable one wins.
  const interpreterProbes = SANDBOX_PINNED_MUTATION_PYTHON_CANDIDATES.map(
    (interpreterPath) =>
      `if [ -z "$python_cmd" ] && [ -x '${interpreterPath}' ]; then python_cmd='${interpreterPath}' ; fi`,
  );

  const scriptLines = [
    "set -eu",
    "python_cmd=''",
    ...interpreterProbes,
    'if [ -z "$python_cmd" ]; then python_cmd=$(command -v python3 2>/dev/null || command -v python 2>/dev/null || true); fi',
    'if [ -z "$python_cmd" ]; then',
    " echo >&2 'sandbox pinned mutation helper requires python3 or python'",
    " exit 127",
    "fi",
    // The helper source travels in the `-c` argument, which leaves stdin free
    // for the payload bytes that the helper's write operation reads.
    `python_script=${SANDBOX_PINNED_MUTATION_PYTHON_SHELL_LITERAL}`,
    'exec "$python_cmd" -c "$python_script" "$@"',
  ];

  return {
    checks,
    recheckBeforeCommand: true,
    script: scriptLines.join("\n"),
    args,
  };
}
/**
 * Builds the command plan for the helper's `write` operation.
 *
 * Helper arguments, in order: `"write"`, mount root, relative parent path,
 * basename, and `"1"`/`"0"` for whether missing parent directories are created.
 *
 * @param params.check - Path-safety check attached to the plan as its only check.
 * @param params.pinned - Pinned location (mount root, relative parent, basename) to write.
 * @param params.mkdir - When true, the mkdir flag passed to the helper is `"1"`.
 * @returns The plan produced by `buildPinnedMutationPlan`.
 */
export function buildPinnedWritePlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxEntry;
  mkdir: boolean;
}): SandboxFsCommandPlan {
  const { check, pinned, mkdir } = params;
  const mkdirFlag = mkdir ? "1" : "0";
  const args = [
    "write",
    pinned.mountRootPath,
    pinned.relativeParentPath,
    pinned.basename,
    mkdirFlag,
  ];
  return buildPinnedMutationPlan({ checks: [check], args });
}
/**
 * Builds the command plan for the helper's `mkdirp` operation.
 *
 * Helper arguments, in order: `"mkdirp"`, mount root, relative directory path.
 *
 * @param params.check - Path-safety check attached to the plan as its only check.
 * @param params.pinned - Pinned directory (mount root and relative path) to create.
 * @returns The plan produced by `buildPinnedMutationPlan`.
 */
export function buildPinnedMkdirpPlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxDirectoryEntry;
}): SandboxFsCommandPlan {
  const { check, pinned } = params;
  const args = ["mkdirp", pinned.mountRootPath, pinned.relativePath];
  return buildPinnedMutationPlan({ checks: [check], args });
}
/**
 * Builds the command plan for the helper's `remove` operation.
 *
 * The supplied check is copied with its `aliasPolicy` option overridden to
 * `PATH_ALIAS_POLICIES.unlinkTarget`; its other options are kept.
 *
 * Helper arguments, in order: `"remove"`, mount root, relative parent path,
 * basename, recursive flag, force flag.
 *
 * @param params.check - Base path-safety check for the entry being removed.
 * @param params.pinned - Pinned location (mount root, relative parent, basename) to remove.
 * @param params.recursive - Truthy values pass `"1"` as the recursive flag; otherwise `"0"`.
 * @param params.force - Only an explicit `false` passes `"0"`; omitted or `true` passes `"1"`.
 * @returns The plan produced by `buildPinnedMutationPlan`.
 */
export function buildPinnedRemovePlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxEntry;
  recursive?: boolean;
  force?: boolean;
}): SandboxFsCommandPlan {
  const { check, pinned, recursive, force } = params;

  const unlinkCheck = {
    target: check.target,
    options: {
      ...check.options,
      aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
    },
  };

  const recursiveFlag = recursive ? "1" : "0";
  // Force is on unless the caller explicitly turned it off.
  const forceFlag = force === false ? "0" : "1";

  return buildPinnedMutationPlan({
    checks: [unlinkCheck],
    args: [
      "remove",
      pinned.mountRootPath,
      pinned.relativeParentPath,
      pinned.basename,
      recursiveFlag,
      forceFlag,
    ],
  });
}
/**
 * Builds the command plan for the helper's `rename` operation.
 *
 * The source check is copied with its `aliasPolicy` option overridden to
 * `PATH_ALIAS_POLICIES.unlinkTarget`; the destination check is passed through
 * unchanged. Checks are ordered source first, destination second.
 *
 * Helper arguments, in order: `"rename"`, then mount root / relative parent /
 * basename for the source, the same three for the destination, and a constant
 * `"1"` in the destination mkdir-flag position.
 *
 * @param params.fromCheck - Base path-safety check for the source entry.
 * @param params.toCheck - Path-safety check for the destination entry.
 * @param params.from - Pinned source location.
 * @param params.to - Pinned destination location.
 * @returns The plan produced by `buildPinnedMutationPlan`.
 */
export function buildPinnedRenamePlan(params: {
  fromCheck: PathSafetyCheck;
  toCheck: PathSafetyCheck;
  from: PinnedSandboxEntry;
  to: PinnedSandboxEntry;
}): SandboxFsCommandPlan {
  const { fromCheck, toCheck, from, to } = params;

  const sourceCheck = {
    target: fromCheck.target,
    options: {
      ...fromCheck.options,
      aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
    },
  };

  const sourceArgs = [from.mountRootPath, from.relativeParentPath, from.basename];
  const destinationArgs = [to.mountRootPath, to.relativeParentPath, to.basename];

  return buildPinnedMutationPlan({
    checks: [sourceCheck, toCheck],
    args: ["rename", ...sourceArgs, ...destinationArgs, "1"],
  });
}
Messung V0.5 in Prozent C=99 H=99 G=98
¤ Dauer der Verarbeitung: 0.4 Sekunden
¤
*© Formatika GbR, Deutschland