pwn_std
此模板后续还会继续完善,这是由贺师傅提出的一个板子,由我以及王师傅共同完善的一个板子
from pwn import *
from pwnlib.util.packing import u64
from pwnlib.util.packing import u32
from pwnlib.util.packing import u16
from pwnlib.util.packing import u8
from pwnlib.util.packing import p64
from pwnlib.util.packing import p32
from pwnlib.util.packing import p16
from pwnlib.util.packing import p8
import psutil, time
import json
from pathlib import Path
from urllib.parse import urlencode
import subprocess
import time
import os
DOCKER_RECIPE_DIR = Path.home() / ".cache" / "pwn-docker-dbg"
def getProcess(ip,port,name):
global p
if len(sys.argv) > 1 and sys.argv[1] == 'r':
# p = remote(ip, port,ssl=True)
p = remote(ip, port)
return p
else:
# p = process(name)
p = process(name,stdin=PTY)
return p
sl = lambda x: p.sendline(x)
sd = lambda x: p.send(x)
sa = lambda x, y: p.sendafter(x, y)
sla = lambda x, y: p.sendlineafter(x, y)
rc = lambda x: p.recv(x)
rl = lambda: p.recvline()
ru = lambda x: p.recvuntil(x)
ita = lambda: p.interactive()
slc = lambda: asm(shellcraft.sh())
uu64 = lambda x: u64(x.ljust(8, b'\0'))
uu32 = lambda x: u32(x.ljust(4, b'\0'))
def gdbbug(cmd=''):
gdb.attach(p,cmd)
pause()
def _pause_with_message(message=None):
if message:
log.info(message)
pause()
def forkbug(name,cmd=''):
command = ["ps", "-ax"]
grep_command = ["grep", name]
# 执行 ps 命令
ps_process = subprocess.Popen(command, stdout=subprocess.PIPE)
# 将 ps 命令的输出作为 grep 命令的输入
grep_process = subprocess.Popen(grep_command, stdin=ps_process.stdout, stdout=subprocess.PIPE)
# 允许 ps 命令的输出流直接传递到 grep 命令
ps_process.stdout.close()
# 获取最终输出
output = grep_process.communicate()[0]
# 输出结果
print(output.decode())
# pick the newest matching process and avoid the grep line
candidates = []
for proc in psutil.process_iter(['pid', 'name', 'cmdline', 'create_time']):
try:
cmdline = ' '.join(proc.info.get('cmdline') or [])
if name in cmdline and 'grep' not in cmdline:
candidates.append(proc.info)
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if not candidates:
raise ValueError('no matching process found for: {}'.format(name))
pid = max(candidates, key=lambda x: x['create_time'])['pid']
gdb.attach(pid, cmd)
pause()
def docker_restart(container_name, wait=1.0):
log.info(f"Restarting docker container '{container_name}'...")
result = subprocess.run(
["docker", "restart", container_name],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
if result.returncode != 0:
log.warning(f"Docker restart failed for '{container_name}', trying to recreate it...")
docker_recreate(container_name)
if wait:
time.sleep(wait)
log.success(f"Docker container '{container_name}' restarted.")
def docker_recreate(container_name):
recipe_path = DOCKER_RECIPE_DIR / f"{container_name}.json"
if not recipe_path.exists():
raise FileNotFoundError(
f"No docker recipe found for {container_name}. Expected {recipe_path}"
)
recipe = json.loads(recipe_path.read_text())
subprocess.run(["docker", "rm", "-f", container_name], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
cmd = ["docker", "run", "-d", "--rm", "--name", container_name]
for port in recipe["ports"]:
cmd.extend(["-p", port])
gdb_port = recipe["gdb_port"]
cmd.extend([
"-p", f"{gdb_port}:{gdb_port}",
"--cap-add=SYS_PTRACE",
"--security-opt", "seccomp=unconfined",
recipe["debug_image"],
])
subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
log.success(f"Docker container '{container_name}' recreated from saved recipe.")
def docker_remote(ip, port, container_name=None, wait=1.0, *args, **kwargs):
if container_name:
docker_restart(container_name, wait=wait)
return remote(ip, port, *args, **kwargs)
def _docker_find_newest_pid(container_name, binary_name):
finder = r'''pattern="$1"
self_pid="$$"
best_pid=""
for proc in /proc/[0-9]*; do
pid="${proc#/proc/}"
if [ "$pid" = "$self_pid" ]; then
continue
fi
if [ ! -r "$proc/cmdline" ]; then
continue
fi
cmdline=$(tr '\000' ' ' < "$proc/cmdline" 2>/dev/null)
if [ -z "$cmdline" ]; then
continue
fi
case "$cmdline" in
*"$pattern"*)
if [ -z "$best_pid" ] || [ "$pid" -gt "$best_pid" ]; then
best_pid="$pid"
fi
;;
esac
done
if [ -z "$best_pid" ]; then
exit 1
fi
printf '%s\n' "$best_pid"
'''
return subprocess.check_output([
"docker", "exec", container_name,
"sh", "-c", finder, "sh", binary_name
]).decode().strip()
def _docker_get_cmdline(container_name, pid):
return subprocess.check_output([
"docker", "exec", container_name,
"sh", "-c", f"tr '\\000' ' ' < /proc/{pid}/cmdline"
]).decode(errors="replace").strip()
def _docker_find_newest_process(container_name, binary_name):
pid = _docker_find_newest_pid(container_name, binary_name)
cmdline = _docker_get_cmdline(container_name, pid)
return pid, cmdline
def _docker_start_gdbserver(container_name, pid, debug_port):
log.info(f"Starting gdbserver inside container for PID {pid}...")
subprocess.Popen([
"docker", "exec", "-u", "root", container_name,
"gdbserver", f"0.0.0.0:{debug_port}", "--attach", pid
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def _docker_write_gdb_script(script_path, debug_port, gdbscript, connect_timeout, poll_interval, target_kind="remote", setup_script=""):
with open(script_path, "w") as f:
f.write("set sysroot target:/\n")
f.write("set breakpoint pending on\n")
f.write("python\n")
f.write("import gdb, time\n")
f.write(f"deadline = time.time() + {float(connect_timeout)}\n")
f.write(f"poll_interval = {float(poll_interval)}\n")
f.write(f"target_cmd = {json.dumps(f'target {target_kind} 127.0.0.1:{debug_port}')}\n")
f.write("last_error = None\n")
f.write("while True:\n")
f.write(" try:\n")
f.write(" gdb.execute(target_cmd)\n")
f.write(" break\n")
f.write(" except gdb.error as exc:\n")
f.write(" last_error = exc\n")
f.write(" if time.time() >= deadline:\n")
f.write(" raise last_error\n")
f.write(" time.sleep(poll_interval)\n")
f.write("end\n")
if setup_script:
f.write(setup_script)
if not setup_script.endswith("\n"):
f.write("\n")
if gdbscript:
f.write(gdbscript + "\n")
def docker_attach(container_name, binary_name, gdbscript="", debug_port=1234, pause_after=True, pause_msg=None, connect_timeout=3.0, connect_retry_interval=0.1):
"""
Attach GDB to an already running process inside a Docker container.
Typical usage:
1. Start or restart the target container.
2. Call docker_attach(...) for a long-lived process such as a daemon or server.
3. Let GDB connect and set breakpoints.
4. Return to the exploit terminal and press Enter to continue the script.
"""
script_path = f"/tmp/docker_gdb_{container_name}_{debug_port}.gdb".replace("/", "_")
_docker_write_gdb_script(script_path, debug_port, gdbscript, connect_timeout, connect_retry_interval)
log.info(f"Looking for newest process '{binary_name}' in container '{container_name}'...")
time.sleep(0.5)
try:
pid, cmdline = _docker_find_newest_process(container_name, binary_name)
except subprocess.CalledProcessError:
log.error(f"Could not find process '{binary_name}' in container '{container_name}'.")
return
log.success(f"Found target '{binary_name}' with PID: {pid}")
log.info(f"Matched cmdline: {cmdline}")
_docker_start_gdbserver(container_name, pid, debug_port)
log.info("🚀 Launching local GDB session...")
# Tell pwntools to spawn gdb natively
# Automatically works with context.terminal = ['tmux', 'splitw', '-h']
util.misc.run_in_new_terminal(["gdb", "-x", script_path])
if pause_after:
if pause_msg:
_pause_with_message(pause_msg)
else:
pause()
def docker_attach_cgi(container_name, server_name, gdbscript="", cgi_name=None, debug_port=1234, pause_after=True, pause_msg=None, connect_timeout=3.0, connect_retry_interval=0.1):
"""
Attach GDB to a long-lived CGI parent process and let GDB follow fork/exec.
Typical usage:
1. Attach to the HTTP daemon such as http/lighttpd/uhttpd.
2. Keep the exploit running with pause_after=False.
3. Trigger the CGI request.
4. GDB follows the child and stops on exec, where your breakpoint script runs.
"""
setup_lines = [
"set detach-on-fork on",
"set follow-fork-mode child",
"set follow-exec-mode new",
"catch exec",
"commands",
"silent",
"python",
"import gdb",
"filename = gdb.current_progspace().filename or '<unknown>'",
"gdb.write(f'Caught exec into {filename}\\n')",
"end",
]
if cgi_name:
setup_lines.extend([
"python",
f"target_name = {json.dumps(cgi_name)}",
"filename = gdb.current_progspace().filename or ''",
"if target_name not in filename:",
" gdb.write(f'Exec target does not match {target_name}: {filename or \"<unknown>\"}\\n')",
"end",
])
if gdbscript:
setup_lines.append(gdbscript.rstrip("\n"))
setup_lines.append("end")
setup_script = "\n".join(setup_lines) + "\n"
script_path = f"/tmp/docker_cgi_gdb_{container_name}_{debug_port}.gdb".replace("/", "_")
_docker_write_gdb_script(
script_path,
debug_port,
gdbscript="",
connect_timeout=connect_timeout,
poll_interval=connect_retry_interval,
target_kind="extended-remote",
setup_script=setup_script,
)
log.info(f"Looking for CGI parent process '{server_name}' in container '{container_name}'...")
time.sleep(0.5)
try:
pid, cmdline = _docker_find_newest_process(container_name, server_name)
except subprocess.CalledProcessError:
log.error(f"Could not find process '{server_name}' in container '{container_name}'.")
return
log.success(f"Found CGI parent '{server_name}' with PID: {pid}")
log.info(f"Matched parent cmdline: {cmdline}")
_docker_start_gdbserver(container_name, pid, debug_port)
log.info("🚀 Launching local GDB session for CGI follow mode...")
util.misc.run_in_new_terminal(["gdb", "-x", script_path])
if pause_after:
if pause_msg:
_pause_with_message(pause_msg)
else:
_pause_with_message("GDB is attached to the parent server. Trigger the CGI request, then press Enter to continue.")
class HttpResponse:
"""Parsed HTTP response wrapper used by HttpSession."""
def __init__(self, raw=b""):
self.raw = raw or b""
self.status_line = b""
self.http_version = ""
self.status_code = 0
self.reason = ""
self.headers = {}
self.body = b""
self._parse()
def _parse(self):
# Split once at the HTTP header/body boundary.
if not self.raw:
return
if b"\r\n\r\n" in self.raw:
header_block, self.body = self.raw.split(b"\r\n\r\n", 1)
else:
header_block = self.raw
self.body = b""
lines = header_block.split(b"\r\n")
if not lines:
return
self.status_line = lines[0]
parts = self.status_line.decode("latin-1", errors="replace").split(" ", 2)
if len(parts) >= 2:
self.http_version = parts[0]
try:
self.status_code = int(parts[1])
except ValueError:
self.status_code = 0
if len(parts) >= 3:
self.reason = parts[2]
# Store headers as lowercase -> list[str] so repeated headers like
# Set-Cookie are preserved instead of overwritten.
for line in lines[1:]:
if b":" not in line:
continue
key, value = line.split(b":", 1)
key = key.decode("latin-1", errors="replace").strip().lower()
value = value.decode("latin-1", errors="replace").strip()
self.headers.setdefault(key, []).append(value)
def get_header(self, name, default=None):
values = self.headers.get(name.lower())
if not values:
return default
return values[-1]
def get_headers(self, name):
return list(self.headers.get(name.lower(), []))
@property
def text(self):
return self.body.decode("utf-8", errors="replace")
class HttpSession:
"""
Minimal HTTP client built on pwntools tubes.
Use this when you still want pwntools-level control over raw traffic,
but do not want to hand-write Host, Content-Length, Cookie, and basic
response parsing for every exploit.
Common flow:
sess = HttpSession("127.0.0.1", 8888)
r1 = sess.post_form("/cgi-bin/login.cgi", {"username": "a", "password": "b"})
r2 = sess.follow_redirect(r1)
r3 = sess.get("/control.html")
Cookies from Set-Cookie are stored automatically in sess.cookies and will
be sent with later requests unless you override the Cookie header yourself.
"""
def __init__(self, host, port, use_ssl=False, timeout=2, default_headers=None, connector=None):
self.host = host
self.port = port
self.use_ssl = use_ssl
self.timeout = timeout
self.cookies = {}
self.default_headers = {
"Host": f"{host}:{port}",
"User-Agent": "pwntools-http/0.1",
"Connection": "close",
}
if default_headers:
self.default_headers.update(default_headers)
self.connector = connector or self._default_connector
def _default_connector(self):
return remote(self.host, self.port, ssl=self.use_ssl)
@staticmethod
def _ensure_bytes(value):
if value is None:
return b""
if isinstance(value, bytes):
return value
if isinstance(value, str):
return value.encode()
return str(value).encode()
def _build_cookie_header(self, extra_cookie=None):
# Merge the session cookie jar with any per-request Cookie override.
cookie_map = dict(self.cookies)
if extra_cookie:
if isinstance(extra_cookie, dict):
cookie_map.update(extra_cookie)
else:
return extra_cookie
if not cookie_map:
return None
return "; ".join(f"{key}={value}" for key, value in cookie_map.items())
def build_request(self, method, path, headers=None, body=b"", append_body_crlf=False, body_suffix=b""):
# Build a raw HTTP/1.1 request. This keeps the output easy to inspect
# in pwntools debug mode and avoids hidden behavior from higher-level clients.
body = self._ensure_bytes(body)
body_suffix = self._ensure_bytes(body_suffix)
if body_suffix:
body += body_suffix
elif append_body_crlf and body and not body.endswith(b"\r\n"):
body += b"\r\n"
request_headers = dict(self.default_headers)
extra_cookie = None
if headers:
headers = dict(headers)
extra_cookie = headers.pop("Cookie", None)
request_headers.update(headers)
cookie_header = self._build_cookie_header(extra_cookie)
if cookie_header:
request_headers["Cookie"] = cookie_header
if body and "Content-Length" not in request_headers:
request_headers["Content-Length"] = str(len(body))
req = [f"{method} {path} HTTP/1.1"]
for key, value in request_headers.items():
req.append(f"{key}: {value}")
req.append("")
req.append("")
return "\r\n".join(req).encode() + body
def update_cookies(self, response):
# Only keep the name=value part of each Set-Cookie line.
for cookie_header in response.get_headers("set-cookie"):
first = cookie_header.split(";", 1)[0].strip()
if "=" not in first:
continue
key, value = first.split("=", 1)
self.cookies[key] = value
return dict(self.cookies)
def request(self, method, path, headers=None, body=b"", update_cookies=True, io=None, append_body_crlf=False, body_suffix=b""):
# By default, one request uses one connection. This is simpler and more
# stable for CTF services than trying to maintain keep-alive state.
tube = io or self.connector()
request_data = self.build_request(
method,
path,
headers=headers,
body=body,
append_body_crlf=append_body_crlf,
body_suffix=body_suffix,
)
tube.send(request_data)
raw = tube.recvall(timeout=self.timeout)
if io is None:
tube.close()
response = HttpResponse(raw)
if update_cookies:
self.update_cookies(response)
return response
def get(self, path, headers=None, update_cookies=True, io=None, append_body_crlf=False, body_suffix=b""):
return self.request("GET", path, headers=headers, update_cookies=update_cookies, io=io, append_body_crlf=append_body_crlf, body_suffix=body_suffix)
def post(self, path, body=b"", headers=None, update_cookies=True, io=None, append_body_crlf=False, body_suffix=b""):
return self.request("POST", path, headers=headers, body=body, update_cookies=update_cookies, io=io, append_body_crlf=append_body_crlf, body_suffix=body_suffix)
def post_form(self, path, data, headers=None, update_cookies=True, io=None, append_body_crlf=False, body_suffix=b""):
# data can contain plain strings or numbers. For raw bytes payloads,
# prefer post(...) and build the body yourself.
# body_suffix lets you explicitly control what is appended to the end
# of the form body, for example b"", b"\r\n", or b"\r\n\r\n".
body = urlencode(data).encode()
req_headers = {"Content-Type": "application/x-www-form-urlencoded"}
if headers:
req_headers.update(headers)
return self.post(path, body=body, headers=req_headers, update_cookies=update_cookies, io=io, append_body_crlf=append_body_crlf, body_suffix=body_suffix)
def post_json(self, path, data, headers=None, update_cookies=True, io=None, append_body_crlf=False, body_suffix=b""):
body = json.dumps(data).encode()
req_headers = {"Content-Type": "application/json"}
if headers:
req_headers.update(headers)
return self.post(path, body=body, headers=req_headers, update_cookies=update_cookies, io=io, append_body_crlf=append_body_crlf, body_suffix=body_suffix)
def follow_redirect(self, response, headers=None, update_cookies=True, io=None):
# Deliberately follows exactly one Location hop so the exploit keeps
# control over redirect chains and intermediate cookies.
location = response.get_header("location")
if not location:
return response
return self.get(location, headers=headers, update_cookies=update_cookies, io=io)
pwn_std
https://a1b2rt.cn//archives/wo-de-pwn_std