Quellcodebibliothek Statistik Leitseite products/Sources/formale Sprachen/Java/Openclaw/src/agents/sandbox/   (KI Agentensystem Version 22©)  Datei vom 26.3.2026 mit Größe 14 kB image not shown  

Quelle  fs-bridge-mutation-helper.ts

  Sprache: JAVA
 

Spracherkennung für: .ts vermutete Sprache: Unknown {[0] [0] [0]} [Methode: Schwerpunktbildung, einfache Gewichte, sechs Dimensionen]

import { PATH_ALIAS_POLICIES } from "../../infra/path-alias-guards.js";
import type {
  PathSafetyCheck,
  PinnedSandboxDirectoryEntry,
  PinnedSandboxEntry,
} from "./fs-bridge-path-safety.js";
import type { SandboxFsCommandPlan } from "./fs-bridge-shell-command-plans.js";

export const SANDBOX_PINNED_MUTATION_PYTHON_CANDIDATES = [
  "/usr/bin/python3",
  "/usr/local/bin/python3",
  "/opt/homebrew/bin/python3",
  "/bin/python3",
] as const;

export const SANDBOX_PINNED_MUTATION_PYTHON = [
  "import errno",
  "import os",
  "import secrets",
  "import stat",
  "import sys",
  "",
  "operation = sys.argv[1]",
  "",
  "DIR_FLAGS = os.O_RDONLY",
  "if hasattr(os, 'O_DIRECTORY'):",
  "    DIR_FLAGS |= os.O_DIRECTORY",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    DIR_FLAGS |= os.O_NOFOLLOW",
  "",
  "READ_FLAGS = os.O_RDONLY",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    READ_FLAGS |= os.O_NOFOLLOW",
  "",
  "WRITE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL",
  "if hasattr(os, 'O_NOFOLLOW'):",
  "    WRITE_FLAGS |= os.O_NOFOLLOW",
  "",
  "def split_relative(path_value):",
  "    segments = []",
  "    for segment in path_value.split('/'):",
  "        if not segment or segment == '.':",
  "            continue",
  "        if segment == '..':",
  "            raise OSError(errno.EPERM, 'path traversal is not allowed', segment)",
  "        segments.append(segment)",
  "    return segments",
  "",
  "def open_dir(path_value, dir_fd=None):",
  "    return os.open(path_value, DIR_FLAGS, dir_fd=dir_fd)",
  "",
  "def walk_dir(root_fd, rel_path, mkdir_enabled):",
  "    current_fd = os.dup(root_fd)",
  "    try:",
  "        for segment in split_relative(rel_path):",
  "            try:",
  "                next_fd = open_dir(segment, dir_fd=current_fd)",
  "            except FileNotFoundError:",
  "                if not mkdir_enabled:",
  "                    raise",
  "                os.mkdir(segment, 0o777, dir_fd=current_fd)",
  "                next_fd = open_dir(segment, dir_fd=current_fd)",
  "            os.close(current_fd)",
  "            current_fd = next_fd",
  "        return current_fd",
  "    except Exception:",
  "        os.close(current_fd)",
  "        raise",
  "",
  "def create_temp_file(parent_fd, basename):",
  "    prefix = '.openclaw-write-' + basename + '.'",
  "    for _ in range(128):",
  "        candidate = prefix + secrets.token_hex(6)",
  "        try:",
  "            fd = os.open(candidate, WRITE_FLAGS, 0o600, dir_fd=parent_fd)",
  "            return candidate, fd",
  "        except FileExistsError:",
  "            continue",
  "    raise RuntimeError('failed to allocate sandbox temp file')",
  "",
  "def create_temp_dir(parent_fd, basename, mode):",
  "    prefix = '.openclaw-move-' + basename + '.'",
  "    for _ in range(128):",
  "        candidate = prefix + secrets.token_hex(6)",
  "        try:",
  "            os.mkdir(candidate, mode, dir_fd=parent_fd)",
  "            return candidate",
  "        except FileExistsError:",
  "            continue",
  "    raise RuntimeError('failed to allocate sandbox temp directory')",
  "",
  "def write_atomic(parent_fd, basename, stdin_buffer):",
  "    temp_fd = None",
  "    temp_name = None",
  "    try:",
  "        temp_name, temp_fd = create_temp_file(parent_fd, basename)",
  "        while True:",
  "            chunk = stdin_buffer.read(65536)",
  "            if not chunk:",
  "                break",
  "            os.write(temp_fd, chunk)",
  "        os.fsync(temp_fd)",
  "        os.close(temp_fd)",
  "        temp_fd = None",
  "        os.replace(temp_name, basename, src_dir_fd=parent_fd, dst_dir_fd=parent_fd)",
  "        temp_name = None",
  "        os.fsync(parent_fd)",
  "    finally:",
  "        if temp_fd is not None:",
  "            os.close(temp_fd)",
  "        if temp_name is not None:",
  "            try:",
  "                os.unlink(temp_name, dir_fd=parent_fd)",
  "            except FileNotFoundError:",
  "                pass",
  "",
  "def read_file(parent_fd, basename):",
  "    file_fd = os.open(basename, READ_FLAGS, dir_fd=parent_fd)",
  "    try:",
  "        file_stat = os.fstat(file_fd)",
  "        if not stat.S_ISREG(file_stat.st_mode):",
  "            raise OSError(errno.EPERM, 'only regular files are allowed', basename)",
  "        if file_stat.st_nlink > 1:",
  "            raise OSError(errno.EPERM, 'hardlinked file is not allowed', basename)",
  "        while True:",
  "            chunk = os.read(file_fd, 65536)",
  "            if not chunk:",
  "                break",
  "            os.write(1, chunk)",
  "    finally:",
  "        os.close(file_fd)",
  "",
  "def remove_tree(parent_fd, basename):",
  "    entry_stat = os.lstat(basename, dir_fd=parent_fd)",
  "    if not stat.S_ISDIR(entry_stat.st_mode) or stat.S_ISLNK(entry_stat.st_mode):",
  "        os.unlink(basename, dir_fd=parent_fd)",
  "        return",
  "    dir_fd = open_dir(basename, dir_fd=parent_fd)",
  "    try:",
  "        for child in os.listdir(dir_fd):",
  "            remove_tree(dir_fd, child)",
  "    finally:",
  "        os.close(dir_fd)",
  "    os.rmdir(basename, dir_fd=parent_fd)",
  "",
  "def move_entry(src_parent_fd, src_basename, dst_parent_fd, dst_basename):",
  "    try:",
  "        os.rename(src_basename, dst_basename, src_dir_fd=src_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    except OSError as err:",
  "        if err.errno != errno.EXDEV:",
  "            raise",
  "    src_stat = os.lstat(src_basename, dir_fd=src_parent_fd)",
  "    if stat.S_ISDIR(src_stat.st_mode) and not stat.S_ISLNK(src_stat.st_mode):",
  "        temp_dir_name = create_temp_dir(dst_parent_fd, dst_basename, stat.S_IMODE(src_stat.st_mode) or 0o755)",
  "        temp_dir_fd = open_dir(temp_dir_name, dir_fd=dst_parent_fd)",
  "        src_dir_fd = open_dir(src_basename, dir_fd=src_parent_fd)",
  "        try:",
  "            for child in os.listdir(src_dir_fd):",
  "                move_entry(src_dir_fd, child, temp_dir_fd, child)",
  "        finally:",
  "            os.close(src_dir_fd)",
  "            os.close(temp_dir_fd)",
  "        os.rename(temp_dir_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        os.rmdir(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    if stat.S_ISLNK(src_stat.st_mode):",
  "        link_target = os.readlink(src_basename, dir_fd=src_parent_fd)",
  "        try:",
  "            os.unlink(dst_basename, dir_fd=dst_parent_fd)",
  "        except FileNotFoundError:",
  "            pass",
  "        os.symlink(link_target, dst_basename, dir_fd=dst_parent_fd)",
  "        os.unlink(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "        return",
  "    src_fd = os.open(src_basename, READ_FLAGS, dir_fd=src_parent_fd)",
  "    temp_fd = None",
  "    temp_name = None",
  "    try:",
  "        src_file_stat = os.fstat(src_fd)",
  "        if not stat.S_ISREG(src_file_stat.st_mode):",
  "            raise OSError(errno.EPERM, 'only regular files are allowed', src_basename)",
  "        if src_file_stat.st_nlink > 1:",
  "            raise OSError(errno.EPERM, 'hardlinked file is not allowed', src_basename)",
  "        temp_name, temp_fd = create_temp_file(dst_parent_fd, dst_basename)",
  "        while True:",
  "            chunk = os.read(src_fd, 65536)",
  "            if not chunk:",
  "                break",
  "            os.write(temp_fd, chunk)",
  "        try:",
  "            os.fchmod(temp_fd, stat.S_IMODE(src_stat.st_mode))",
  "        except AttributeError:",
  "            pass",
  "        os.fsync(temp_fd)",
  "        os.close(temp_fd)",
  "        temp_fd = None",
  "        os.replace(temp_name, dst_basename, src_dir_fd=dst_parent_fd, dst_dir_fd=dst_parent_fd)",
  "        temp_name = None",
  "        os.unlink(src_basename, dir_fd=src_parent_fd)",
  "        os.fsync(dst_parent_fd)",
  "        os.fsync(src_parent_fd)",
  "    finally:",
  "        if temp_fd is not None:",
  "            os.close(temp_fd)",
  "        if temp_name is not None:",
  "            try:",
  "                os.unlink(temp_name, dir_fd=dst_parent_fd)",
  "            except FileNotFoundError:",
  "                pass",
  "        os.close(src_fd)",
  "",
  "if operation == 'write':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], sys.argv[5] == '1')",
  "        write_atomic(parent_fd, sys.argv[4], sys.stdin.buffer)",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'read':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], False)",
  "        read_file(parent_fd, sys.argv[4])",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'mkdirp':",
  "    root_fd = open_dir(sys.argv[2])",
  "    target_fd = None",
  "    try:",
  "        target_fd = walk_dir(root_fd, sys.argv[3], True)",
  "        os.fsync(target_fd)",
  "    finally:",
  "        if target_fd is not None:",
  "            os.close(target_fd)",
  "        os.close(root_fd)",
  "elif operation == 'remove':",
  "    root_fd = open_dir(sys.argv[2])",
  "    parent_fd = None",
  "    try:",
  "        parent_fd = walk_dir(root_fd, sys.argv[3], False)",
  "        try:",
  "            if sys.argv[5] == '1':",
  "                remove_tree(parent_fd, sys.argv[4])",
  "            else:",
  "                entry_stat = os.lstat(sys.argv[4], dir_fd=parent_fd)",
  "                if stat.S_ISDIR(entry_stat.st_mode) and not stat.S_ISLNK(entry_stat.st_mode):",
  "                    os.rmdir(sys.argv[4], dir_fd=parent_fd)",
  "                else:",
  "                    os.unlink(sys.argv[4], dir_fd=parent_fd)",
  "            os.fsync(parent_fd)",
  "        except FileNotFoundError:",
  "            if sys.argv[6] != '1':",
  "                raise",
  "    finally:",
  "        if parent_fd is not None:",
  "            os.close(parent_fd)",
  "        os.close(root_fd)",
  "elif operation == 'rename':",
  "    src_root_fd = open_dir(sys.argv[2])",
  "    dst_root_fd = open_dir(sys.argv[5])",
  "    src_parent_fd = None",
  "    dst_parent_fd = None",
  "    try:",
  "        src_parent_fd = walk_dir(src_root_fd, sys.argv[3], False)",
  "        dst_parent_fd = walk_dir(dst_root_fd, sys.argv[6], sys.argv[8] == '1')",
  "        move_entry(src_parent_fd, sys.argv[4], dst_parent_fd, sys.argv[7])",
  "    finally:",
  "        if src_parent_fd is not None:",
  "            os.close(src_parent_fd)",
  "        if dst_parent_fd is not None:",
  "            os.close(dst_parent_fd)",
  "        os.close(src_root_fd)",
  "        os.close(dst_root_fd)",
  "else:",
  "    raise RuntimeError('unknown sandbox mutation operation: ' + operation)",
].join("\n");

const SANDBOX_PINNED_MUTATION_PYTHON_SHELL_LITERAL = `'${SANDBOX_PINNED_MUTATION_PYTHON.replaceAll("'", `'\\''`)}'`;

function buildPinnedMutationPlan(params: {
  args: string[];
  checks: PathSafetyCheck[];
}): SandboxFsCommandPlan {
  return {
    checks: params.checks,
    recheckBeforeCommand: true,
    // Feed the helper source over fd 3 so stdin stays available for write payload bytes.
    script: [
      "set -eu",
      "python_cmd=''",
      ...SANDBOX_PINNED_MUTATION_PYTHON_CANDIDATES.map(
        (candidate) =>
          `if [ -z "$python_cmd" ] && [ -x '${candidate}' ]; then python_cmd='${candidate}'; fi`,
      ),
      'if [ -z "$python_cmd" ]; then python_cmd=$(command -v python3 2>/dev/null || command -v python 2>/dev/null || true); fi',
      'if [ -z "$python_cmd" ]; then',
      "  echo >&2 'sandbox pinned mutation helper requires python3 or python'",
      "  exit 127",
      "fi",
      `python_script=${SANDBOX_PINNED_MUTATION_PYTHON_SHELL_LITERAL}`,
      'exec "$python_cmd" -c "$python_script" "$@"',
    ].join("\n"),
    args: params.args,
  };
}

export function buildPinnedWritePlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxEntry;
  mkdir: boolean;
}): SandboxFsCommandPlan {
  return buildPinnedMutationPlan({
    checks: [params.check],
    args: [
      "write",
      params.pinned.mountRootPath,
      params.pinned.relativeParentPath,
      params.pinned.basename,
      params.mkdir ? "1" : "0",
    ],
  });
}

export function buildPinnedMkdirpPlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxDirectoryEntry;
}): SandboxFsCommandPlan {
  return buildPinnedMutationPlan({
    checks: [params.check],
    args: ["mkdirp", params.pinned.mountRootPath, params.pinned.relativePath],
  });
}

export function buildPinnedRemovePlan(params: {
  check: PathSafetyCheck;
  pinned: PinnedSandboxEntry;
  recursive?: boolean;
  force?: boolean;
}): SandboxFsCommandPlan {
  return buildPinnedMutationPlan({
    checks: [
      {
        target: params.check.target,
        options: {
          ...params.check.options,
          aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
        },
      },
    ],
    args: [
      "remove",
      params.pinned.mountRootPath,
      params.pinned.relativeParentPath,
      params.pinned.basename,
      params.recursive ? "1" : "0",
      params.force === false ? "0" : "1",
    ],
  });
}

export function buildPinnedRenamePlan(params: {
  fromCheck: PathSafetyCheck;
  toCheck: PathSafetyCheck;
  from: PinnedSandboxEntry;
  to: PinnedSandboxEntry;
}): SandboxFsCommandPlan {
  return buildPinnedMutationPlan({
    checks: [
      {
        target: params.fromCheck.target,
        options: {
          ...params.fromCheck.options,
          aliasPolicy: PATH_ALIAS_POLICIES.unlinkTarget,
        },
      },
      params.toCheck,
    ],
    args: [
      "rename",
      params.from.mountRootPath,
      params.from.relativeParentPath,
      params.from.basename,
      params.to.mountRootPath,
      params.to.relativeParentPath,
      params.to.basename,
      "1",
    ],
  });
}

¤ Dauer der Verarbeitung: 0.18 Sekunden  (vorverarbeitet am  2026-04-27) ¤

*© Formatika GbR, Deutschland






Wurzel

Suchen

Beweissystem der NASA

Beweissystem Isabelle

NIST Cobol Testsuite

Cephes Mathematical Library

Wiener Entwicklungsmethode

Haftungshinweis

Die Informationen auf dieser Webseite wurden nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit, noch Richtigkeit, noch Qualität der bereit gestellten Informationen zugesichert.

Bemerkung:

Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.