RCTF2025

爆零了,赛后看其他战队师傅的WP,发现有很多题还是在能解决的范围内的,关于调教AI的手法还有些许不足,也有些知识点不太全面,来进行一个复现

SuanP01y

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from sage.all import GF, ZZ, sample, gcd, PolynomialRing
from Crypto.Cipher import AES
from hashlib import md5
import os

r, d = 16381, 41
R.<x> = PolynomialRing(GF(2))
S.<X> = R.quo(x^r - 1)

def suan_p01y(nt, db):
return sum(x^i for i in set(sample(range(db+1), nt)))

while True:
t = [suan_p01y(d, r//3) for _ in range(2)]
if gcd(t[0], t[1]) != 1:
continue
h = [(ti * X^ZZ.random_element(r)) for ti in t]
if h[0].is_unit():
break

with open("output.txt", "w") as f:
f.write(f"hint = {h[1] / h[0]}\n")
f.write(
AES.new(
key=md5(str(h[0]).encode()).digest(),
nonce=b"suanp01y",
mode=AES.MODE_CTR
).encrypt(os.environ.get("FLAG", "RCTF{fake_flag}").encode()).hex()
)

代数结构基于多项式环上的商环 给出两个稀疏随机多项式 满足

  • 指数采样空间为
  • 多项式不同指数数固定为
  • 多项式系数只为0或者1 即为二元稀疏多项式

随后进行随机的平移得到我们的 告诉你 需要从中恢复

给出的形如一系列包含商环元的代数式 我们可以把它会恢复到我们商环上对应到多项式,也就是进行一个Lifting 此时我们的问题就转化为 要你去恢复我们的

参考星盟师傅的思路 对于1式 我们可以进行如下的一个调整 我们可以得到 完全可以爆破 那我们转化为 因为的限制很足 可以放心进行多项式重构 我们补充点知识点

已知模多项式 多项式,当次数上界给定下,对于问题

如果 那么我们的多项式重构比例是唯一的 这里我们的的阶为 恰好加起来小于

那么关于有理重构的手法 山海关的师傅们给出了一个很优雅的EEA重构

用GPT-5.1-Codex试了下 能够直接看出我们的背景 BIKE/QC‑MDPC 下的私钥恢复

exp by Codex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from sageall import *
from Crypto.Cipher import AES
from hashlib import md5

r = 16381; n = r//3
R.<x> = PolynomialRing(GF(2))
S.<X> = R.quo(x^r - 1)
hint_poly = S("....")
ct = bytes.fromhex(open("output.txt").read().splitlines()[1].strip())

def rat_rec(hs):
m = x^r - 1
r0, r1 = m, hs.lift()
a0, a1 = R(1), R(0)
b0, b1 = R(0), R(1)
while True:
if r1 == 0: return None
if r1.degree() <= n and b1.degree() <= n:
return r1, b1 # hs*b1 ≡ r1 (mod m)
q = r0 // r1
r0, r1 = r1, r0 - q*r1
a0, a1 = a1, a0 - q*a1
b0, b1 = b1, b0 - q*b1

t0 = t1 = None
for s in range(r):
hs = X^s * hint_poly
cand = rat_rec(hs)
if not cand: continue
num, den = cand
if len(num.exponents()) == 41 and len(den.exponents()) == 41:
t1, t0 = num, den # shift d = -s mod r
break

for k in range(r):
h0 = X^k * t0
key = md5(str(h0).encode()).digest()
pt = AES.new(key=key, mode=AES.MODE_CTR, nonce=b"suanp01y").decrypt(ct)
if b"RCTF{" in pt:
print("found shift", k)
print(pt.decode(errors="ignore"))
break

'''
found shift 12246
RCTF{i_just_h0pe_ChatGPT_doesnt_inst@ntly_so1ve_thi5_one.}
'''

Repair

main.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use std::env;
use std::io::{self, Write};

use ark_bls12_381::{Fr, G1Projective};
use ark_ec::{AffineRepr, CurveGroup, PrimeGroup};
use ark_ec::pairing::Pairing;
use ark_std::UniformRand;
use rand::rngs::OsRng;

use common::*;

fn read_line_trimmed() -> String {
let mut s = String::new();
let _ = io::stdin().read_line(&mut s);
s.trim().to_string()
}

fn main() {
let flag = env::var("FLAG")
.ok()
.filter(|s| !s.is_empty())
.unwrap_or_else(|| "RCTF{test_flag}".to_string());
let flag_len = flag.as_bytes().len();

let (msk, mpk) = setup();
let id = "suansuan@rois.team";
let partial_sk = psk(id, &msk);
let (sk, pk) = sk(id, &partial_sk, &mpk);

let mut rng = OsRng;
let pairing_key = ark_bls12_381::Bls12_381::pairing(
(G1Projective::generator() * Fr::rand(&mut rng)).into_affine(),
(h1("xorkey") * Fr::rand(&mut rng)).into_affine(),
)
.0;

let ct = enc(&pk, id, pairing_key);
let key = kdf(&pairing_key, flag_len);

let mut encrypted_flag = flag.as_bytes().to_vec();
xor(&mut encrypted_flag, &key);

let (c1_hex, c2_hex, c3_hex) = hex_ct(&ct);
let banner = format!(
"{}|{}|{}|{}|{}|{}|{}|{}\n",
hex::encode(id.as_bytes()),
hex::encode(DST),
hex_gt(&pk.pk),
hex_g2(&h1(id)),
c1_hex,
c2_hex,
c3_hex,
hex::encode(&encrypted_flag),
);
print!("{banner}");
let _ = io::stdout().flush();

// read <C1'>|<C2'>|<C3'>
let line = read_line_trimmed();
let mut parts = line.split('|');
let (Some(x1), Some(x2), Some(x3)) = (parts.next(), parts.next(), parts.next()) else {
println!("bad");
return;
};

if parts.next().is_some() {
println!("bad");
return;
}

let c1_q = match parse_gt(x1) {
Ok(v) => v,
Err(_) => {
println!("bad");
return;
}
};
let c2_q = match parse_g1(x2) {
Ok(v) => v,
Err(_) => {
println!("bad");
return;
}
};
let c3_q = match parse_g2(x3) {
Ok(v) => v,
Err(_) => {
println!("bad");
return;
}
};

let a2 = c2_q.into_affine();
let a3 = c3_q.into_affine();

if a2.is_zero() || a3.is_zero() {
println!("bad");
return;
}

if !a2.is_in_correct_subgroup_assuming_on_curve()
|| !a3.is_in_correct_subgroup_assuming_on_curve()
{
println!("bad");
return;
}

if c1_q == ct.c1 && c2_q == ct.c2 && c3_q == ct.c3 {
println!("no");
return;
}

let shared = dec(
&sk,
&CT {
c1: c1_q,
c2: c2_q,
c3: c3_q,
},
);
let key = kdf(&shared, flag_len);
println!("{}", hex::encode(&key));
}

lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
use ark_bls12_381::{Bls12_381, Fr, G1Affine, G1Projective, G2Affine, G2Projective};
use ark_ec::{hashing::{
curve_maps::wb::WBMap,
map_to_curve_hasher::MapToCurveBasedHasher,
HashToCurve,
}, pairing::Pairing, CurveGroup, PrimeGroup};
use ark_ff::{field_hashers::DefaultFieldHasher, Field, PrimeField, UniformRand, Zero};
use ark_serialize::{CanonicalDeserialize, CanonicalSerialize};
use rand::rngs::OsRng;
use sha2::{Digest, Sha256};
use std::io;

pub const DST: &[u8] = b"RCTF-H1:BLS12381G2_XMD:SHA-256_CTF_RO_v1";

pub type H2G2 = MapToCurveBasedHasher<
G2Projective,
DefaultFieldHasher<Sha256, 128>,
WBMap<ark_bls12_381::g2::Config>,
>;
pub type GT = <Bls12_381 as Pairing>::TargetField;

#[derive(Clone)]
pub struct PSK {
pub p1: G2Projective,
pub p2: G1Projective,
}

#[derive(Clone)]
pub struct SK {
pub s1: G2Projective,
pub s2: G1Projective,
}

#[derive(Clone)]
pub struct PK {
pub pk: GT,
}

#[derive(Clone)]
pub struct CT {
pub c1: GT,
pub c2: G1Projective,
pub c3: G2Projective,
}

pub fn h1(id: &str) -> G2Projective {
let h = H2G2::new(DST).expect("init");
h.hash(id.as_bytes()).expect("h2c").into()
}

// ---- hex helpers ----

pub fn hex_g1(p: &G1Projective) -> String {
let a: G1Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}

pub fn hex_g2(p: &G2Projective) -> String {
let a: G2Affine = (*p).into_affine();
let mut v = Vec::new();
a.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}

pub fn hex_gt(x: &GT) -> String {
let mut v = Vec::new();
x.serialize_compressed(&mut v).unwrap();
hex::encode(v)
}

pub fn parse_g1(s: &str) -> io::Result<G1Projective> {
let b = hex::decode(s)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "hex"))?;
let a = G1Affine::deserialize_compressed(&*b)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "g1"))?;
Ok(G1Projective::from(a))
}

pub fn parse_g2(s: &str) -> io::Result<G2Projective> {
let b = hex::decode(s)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "hex"))?;
let a = G2Affine::deserialize_compressed(&*b)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "g2"))?;
Ok(G2Projective::from(a))
}

pub fn parse_gt(s: &str) -> io::Result<GT> {
let b = hex::decode(s)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "hex"))?;
GT::deserialize_compressed(&*b)
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "gt"))
}

// ---- KDF & XOR ----

pub fn kdf(k: &GT, n: usize) -> Vec<u8> {
let mut ser = Vec::new();
k.serialize_compressed(&mut ser).unwrap();

let mut out = Vec::with_capacity(n);
let mut ctr: u32 = 0;

while out.len() < n {
let mut h = Sha256::new();
h.update(b"RCTF-KDF-v1");
h.update(&ser);
h.update(ctr.to_be_bytes());

let digest = h.finalize();
let take = std::cmp::min(n - out.len(), digest.len());
out.extend_from_slice(&digest[..take]);
ctr = ctr.wrapping_add(1);
}

out
}

pub fn xor(buf: &mut [u8], mask: &[u8]) {
for (a, b) in buf.iter_mut().zip(mask.iter()) {
*a ^= *b;
}
}

// ---- random non-zero scalar ----

fn random_non_zero_scalar() -> Fr {
let mut rng = OsRng;
loop {
let s = Fr::rand(&mut rng);
if !s.is_zero() {
return s;
}
}
}

// ---- scheme ----

pub fn setup() -> (G2Projective, GT) {
let mut rng = OsRng;
let h = G2Projective::rand(&mut rng);
let a = random_non_zero_scalar();

let msk = h * a;
let mpk = Bls12_381::pairing(
G1Projective::generator().into_affine(),
msk.into_affine(),
)
.0;

(msk, mpk)
}

pub fn psk(id: &str, msk: &G2Projective) -> PSK {
let r1 = random_non_zero_scalar();
let q = h1(id);

PSK {
p1: *msk + q * r1,
p2: G1Projective::generator() * r1,
}
}

pub fn sk(id: &str, ps: &PSK, mpk: &GT) -> (SK, PK) {
let xi = random_non_zero_scalar();
let r2 = random_non_zero_scalar();
let q = h1(id);

let s1 = ps.p1 * xi + q * r2;
let s2 = ps.p2 * xi + G1Projective::generator() * r2;
let pk = mpk.pow(xi.into_bigint());

(SK { s1, s2 }, PK { pk })
}

pub fn enc(pk: &PK, id: &str, shared_key: GT) -> CT {
let t = random_non_zero_scalar();
let q = h1(id);

CT {
c1: shared_key * pk.pk.pow(t.into_bigint()),
c2: G1Projective::generator() * t,
c3: q * t,
}
}

pub fn dec(sk: &SK, ct: &CT) -> GT {
let numerator = Bls12_381::pairing(sk.s2.into_affine(), ct.c3.into_affine()).0;
let denominator = Bls12_381::pairing(ct.c2.into_affine(), sk.s1.into_affine()).0;
ct.c1 * numerator * denominator.inverse().unwrap()
}

// ---- io-friendly ----

pub fn hex_ct(ct: &CT) -> (String, String, String) {
(hex_gt(&ct.c1), hex_g1(&ct.c2), hex_g2(&ct.c3))
}

题目构造了一个基于Rust的配对加密系统 逻辑如下

  • 启动时:用 setup() 生成 (msk, mpk),固定 id,派生 partial_sksk/`pk。随机取一对 GT 密钥 pairing_key,用 enc(&pk, id, pairing_key) 生成挑战密文 ct。对 FLAG 做 kdf(pairing_key) 后异或得到 encrypted_flag
  • 输出一行 banner,8 段 | 分隔:id_hex | DST_hex | pk_gt | h1(id)_g2 | c1 | c2 | c3 | encrypted_flag_hex
  • 交互:读取用户回传的 c1'|c2'|c3';校验格式、非零、子群;仅拒绝与原始 (c1,c2,c3) 完全相同的输入。其它都解密:shared = dec(sk, CT{c1',c2',c3'}),再打印 hex(kdf(shared))。

因为这里的检测逻辑是

  • 格式检查:读取一行,按 | 切成三段,逐段用 parse_gt/parse_g1/parse_g2 解码,失败即 “bad”。
  • 零元检查:a2.is_zero() || a3.is_zero() 拒绝零点。
  • 子群检查:is_in_correct_subgroup_assuming_on_curve() 确保在正确的子群。
  • 原始密文检查:如果 c1'==c1 && c2'==c2 && c3'==c3 则 “no”,拒绝解密原始挑战。

所以说我们可以对我们的c1 c2 c3进行一个合法的重随机处理 让我们的靶机无意间对自己加密的flag进行解密 因为我们的计算过程是在EG上

我们进行一个新的随机化 即

那么我们传回我们的(c1', c2', c3')就相当于用新的随机数对相同的明文进行加密 那么解密得到的kdf(M')就是一致的 直接XOR即可还原flag

exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
use std::io::{BufRead, BufReader, Write};
use std::net::TcpStream;

use ark_bls12_381::{Fr, G1Projective};
use ark_ec::PrimeGroup;
use ark_ff::{Field, PrimeField};
use ark_std::UniformRand;

use common::*;

fn main() {
// 连接 socat 暴露的本地服务
let mut stream = TcpStream::connect("1.14.196.78:42601").expect("connect server");
let mut out = BufReader::new(stream.try_clone().expect("clone stream"));

// 读取服务端 banner
let mut banner = String::new();
out.read_line(&mut banner).expect("read banner");
let parts: Vec<_> = banner.trim().split('|').collect();
let pk = parse_gt(parts[2]).expect("pk");
let q = parse_g2(parts[3]).expect("q");
let c1 = parse_gt(parts[4]).expect("c1");
let c2 = parse_g1(parts[5]).expect("c2");
let c3 = parse_g2(parts[6]).expect("c3");
let mut enc_flag = hex::decode(parts[7]).expect("enc flag");

// 重随机化 ciphertext: t -> t + r
let r = Fr::rand(&mut rand::thread_rng());
let c1p = c1 * pk.pow(r.into_bigint());
let c2p = c2 + G1Projective::generator() * r;
let c3p = c3 + q * r;

// 发送新密文
let payload = format!("{}|{}|{}\n", hex_gt(&c1p), hex_g1(&c2p), hex_g2(&c3p));
stream
.write_all(payload.as_bytes())
.expect("write payload");

// 读取 oracle 返回的 kdf(M),解出 flag
// pty 可能会回显我们发的 payload,过滤到真正的 hex 行
let key = loop {
let mut line = String::new();
let n = out.read_line(&mut line).expect("read kdf/echo");
if n == 0 {
panic!("server closed");
}
let t = line.trim();
if t.contains('|') || t.len() % 2 != 0 {
continue;
}
if let Ok(buf) = hex::decode(t) {
break buf;
}
};

xor(&mut enc_flag, &key);

println!("flag = {}", String::from_utf8_lossy(&enc_flag));
}

SuanHash

Task.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from __future__ import annotations
from suanhash import SuanHash
import os

ROUNDS = 25 * 20
QUERIES_PER_ROUND = 4

def suanhash_fn():
base = SuanHash()
return lambda msg: (lambda h=base.copy(): (h.update(msg), h.hexdigest())[1])()

print(
"""
███████╗██╗ ██╗ █████╗ ███╗ ██╗██╗ ██╗ █████╗ ███████╗██╗ ██╗
██╔════╝██║ ██║██╔══██╗████╗ ██║██║ ██║██╔══██╗██╔════╝██║ ██║
███████╗██║ ██║███████║██╔██╗ ██║███████║███████║███████╗███████║
╚════██║██║ ██║██╔══██║██║╚██╗██║██╔══██║██╔══██║╚════██║██╔══██║
███████║╚██████╔╝██║ ██║██║ ╚████║██║ ██║██║ ██║███████║██║ ██║
╚══════╝ ╚═════╝ ╚═╝ ╚═╝╚═╝ ╚═══╝╚═╝ ╚═╝╚═╝ ╚═╝╚══════╝╚═╝ ╚═╝
"""
)
print(f"💪 Survive {ROUNDS} rounds to get the flag!\n", flush=True)

for rnd in range(1, ROUNDS + 1):
suan_fn = suanhash_fn()

print(f"[Round {rnd}]", flush=True)

seen = {} # hash -> msg
collided = False

for q in range(1, QUERIES_PER_ROUND + 1):
msg = bytes.fromhex(input(f"💬 MSG {q} (hex): ").strip())
h = suan_fn(msg)
print(f"H = {h}", flush=True)
if h in seen and seen[h] != msg:
collided = True
else:
seen[h] = msg

if not collided:
print("❌ No hash collision in this round. Game over.")
exit(-1)
else:
print("✅ Hash collision found! Next round.\n", flush=True)

print(f"🎉 Nice! Here is your flag: {os.environ.get('FLAG', 'RCTF{fake_flag}')}")

suanHash.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import os
import secrets
from Crypto.Cipher import AES


class SuanHash:
state_bits = 128
rate_bits = 64
cap_bits = state_bits - rate_bits
digest_size = state_bits // 8
block_size = state_bits // 8
_rate_mask = (1 << rate_bits) - 1
_cap_mask = (1 << cap_bits) - 1

def __init__(self, data: bytes = b""):
self._perm_seed = os.urandom(16)
self._cipher = AES.new(self._perm_seed, AES.MODE_ECB)
self._cfg_hi = secrets.randbits(self.rate_bits)
self._cfg_lo = secrets.randbits(self.cap_bits)
self._buf = bytearray()
self._done = False
self._value = None
if data:
self.update(data)

def _permute(self, x: int) -> int:
return int.from_bytes(self._cipher.encrypt(x.to_bytes(16, "big")), "big")

def _pad_to_blocks(self, msg: bytes) -> list[int]:
n = self.block_size
if not msg:
data = b"\x80" + b"\x00" * (n - 1)
else:
data = msg + b"\x80"
data += b"\x00" * (-len(data) % n)
return [int.from_bytes(data[i : i + n], "big") for i in range(0, len(data), n)]

def _core(self, blocks: list[int], out_bits: int | None = None) -> int:
if out_bits is None:
out_bits = self.state_bits
b, r, c = self.state_bits, self.rate_bits, self.cap_bits
rm, cm = self._rate_mask, self._cap_mask
s0 = self._permute(((self._cfg_hi & rm) << c) | (self._cfg_lo & cm))
upper, lower, last_low = s0 >> c, s0 & cm, 0
for blk in blocks:
mixed = ((upper << c) | lower) ^ blk
s = self._permute(mixed)
upper, lower = s >> c, s & cm
last_low = mixed & cm
need = (out_bits + b - 1) // b
acc_hi = acc_lo = used_hi = used_lo = 0
cur_hi, cur_lo, cur_w = upper, lower, last_low
for i in range(need):
acc_hi = (acc_hi << r) | (cur_hi & rm)
used_hi += r
acc_lo = (acc_lo << c) | ((cur_w ^ cur_lo) & cm)
used_lo += c
if i < need - 1:
nxt_in = ((cur_hi & rm) << c) | (cur_lo & cm)
nxt = self._permute(nxt_in)
cur_hi, cur_lo, cur_w = nxt >> c, nxt & cm, nxt_in & cm
full = (acc_hi << used_lo) | acc_lo
return (full >> (used_hi + used_lo - out_bits)) & ((1 << out_bits) - 1)

def update(self, data: bytes):
if self._done:
raise ValueError("hash object already finalized")
self._buf.extend(data)
return self

def _compute(self) -> int:
return self._core(self._pad_to_blocks(bytes(self._buf)), self.state_bits)

def digest(self) -> bytes:
if not self._done:
self._value = self._compute()
self._done = True
return self._value.to_bytes(self.digest_size, "big")

def hexdigest(self) -> str:
return self.digest().hex()

def copy(self):
other = self.__class__.__new__(self.__class__)
other._perm_seed = self._perm_seed
other._cipher = AES.new(self._perm_seed, AES.MODE_ECB)
other._cfg_hi = self._cfg_hi
other._cfg_lo = self._cfg_lo
other._buf = bytearray(self._buf)
other._done = False # reset finalized flag
other._value = None # clear cached digest
return other

@classmethod
def new(cls, data: bytes = b""):
h = cls()
if data:
h.update(data)
return h

Task要求在4轮内找到一组碰撞的明文msg,500轮后即可获得flag

先把SuanHash喂给LLM 审计后得到以下特点

SuanHash 结构

  • 海绵式:状态 128 位,rate 64、capacity 64(在实现中拆成 hi 64 / lo 64)。
  • 初始化:随机 perm_seed 生成 AES-ECB 置换 _permute;随机 cfg_hi/cfg_lo 作为 IV。
  • _pad_to_blocks:标准 0x80 + 0 填充到 16 字节倍数。
  • _core
    1. 初态 s0 = perm((cfg_hi<<64) | cfg_lo);拆成 upper, lower。
    2. 吸收:对每块 blk 做 mixed = ((upper<<64)|lower) ^ blk;s = perm(mixed);更新 upper = s>>64, lower = s&((1<<64)-1);同时记录 last_low = mixed & cap_mask。
    3. 榨出:输出比特来自迭代 perm,每轮取 cur_hi & rate_mask 进入高部,取 (cur_w ^ cur_lo) & cap_mask 进入低部,cur_w 初始为 last_low,下一轮设为输入的低 64 位。
    4. digest/hexdigest 缓存,copy 复制 cfg 和 perm seed。

关键点:输出的低 64 位是 cur_w ^ cur_lo,其中 cur_w 是上一轮输入低 64 位;在单块消息时,cur_w 就是第一块的低 64 位(被 XOR 进输出)。

Task 逻辑

  • 每轮重新构造 SuanHash(随机 perm 和 cfg),允许 4 次查询;如无碰撞则直接失败;共 500 轮,全部通过给 flag。
  • suanhash_fn 固定了 base 实例,但在每轮对 base.copy(),保证同一轮内配置一致,不同轮重新随机。

碰撞原因与构造 对单块消息 B(含填充):

  • 吸收后 t = ((upper0<<64) | lower0) ^ B,s = perm(t)。
  • 输出块时,低部为 low_out = t_low ^ s_low(因为 cur_w = t_low,cur_low = s_low);高部为 s_high。
  • 将该哈希 H(B) 作为第二块再喂入:
    • 新输入的低 64 位等于 low_out,而这一轮的 cur_w(上一输入低 64)正好也等于 low_out,因此榨出时低部变成 low_out ^ new_s_low;高部来自 new_s_high。
    • 关键:第二块吸收前的输入为 mix2 = ((s_high<<64)|s_low) ^ H(B)。如果两条消息的第一块低 64 位相同,则它们的 t_low 相同,low_out 相同。由于第二块是各自的 H(B),mix2 的低 64 位一致,高 64 位也一致(因为 s_high 仅由同样的低输入决定 perm 输出)。于是两条路径在进入第二块时状态完全一致,后续 padding 块一致,最终输出一致。

LLM没怎么说人话()

核心思路就是输入给AES置换的mixed的差分等价关系人工可控 我们假设喂进去的块是B 我们的输出是

  • high = U1 也就是吸收B后的高64位
  • low = (L0 xor B_low) xor L1 L0/1是吸收前/吸收后的State低64位, B_low是喂进去Pad后的低64位

首先high是没有任何干扰的 low的话 我们很明显知道B_low 而L0 也就是吸收前的state 在我们4轮轮询中是不变的

因为已知填充规则 我们有四次 先喂数据X Y

那么我们能得到 (high1,low1) (high2,low2)

吸收后State高位的差分就直接是 high_delta = high1 xor high2

那低位呢? 我们注意到X_low Y_low已知 那么直接去xor把我们喂的数据块给剥离 也就是

(low1 xor X_low) xor (low2 xor Y_low) 可以得到 L0 xor L0 xor Lx1 xor Ly1 = Lx1 xor Ly1

转换回我们想看到的 就是 Low1 xor Low2 = Lx1 xor Ly1 = (low1 xor X_low) xor (low2 xor Y_low) = low_delta

所以我们能构造出任意两条数据喂进去吸收之后的状态差分Delta

接下来我们利用这个Delta去构造对齐完成碰撞

我们先输入一个块M 它被吸完后对应状态S1 我们知道AES吃的是 mixed1 = M xor S1

我们构造N = M xor Delta 那么吸收完后对应的状态S2 想要AES吃的 mixed2 = N xor S2 = mixed1

此时因为N xor S2 = M xor S2 xor Delta

而我们知道所有的状态前后差分都是Delta 那么对应M N S1 S2 我们进行XOR 能将S2的状态纠正回S1

就得到了mixed1 = mixed2 此时AES置换输出相同 后续混淆也不用管了

思路理清楚了 让LLM给出了一个构造 第一个块选取低64位一致的X Y 第二块选取对应的哈希H(X) H(Y)

exp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import os
import sys
from subprocess import STDOUT
from typing import Tuple

from pwn import process, context

context.log_level = "info"
ROUNDS = 20 * 25


def craft_payloads(h1: str, h2: str) -> Tuple[str, str]:
"""Return hex-encoded payloads for queries 3 and 4."""
b1 = b"\x80" + b"\x00" * 15 # padding block for empty message
b2 = b"\x01\x80" + b"\x00" * 14 # padding block for b"\x01"
m3 = b1 + bytes.fromhex(h1)
m4 = b2 + bytes.fromhex(h2)
return m3.hex(), m4.hex()


def recv_hash(p) -> str:
"""Read a line like 'H = <hex>' and return the hex digest."""
while True:
line = p.recvline().decode(errors="ignore").strip()
if line.startswith("H = "):
return line.split(" = ", 1)[1]


def main():
# Use the current interpreter to avoid path issues on Windows/WSL.
p = process(
[sys.executable, "task.py"],
cwd=".",
env=dict(os.environ, PWNLIB_NOTERM="1"),
stderr=STDOUT, # surface import/runtime errors
)

# Skip banner until the first prompt of round 1.
if not p.can_recv(timeout=5):
buf = p.recvall(timeout=1)
print(f"[!] No banner/prompt, got: {buf!r}")
return
try:
p.recvuntil(b"MSG 1", timeout=5)
except Exception:
buf = p.recv(timeout=1)
print(f"[!] Did not find 'MSG 1' prompt, got: {buf!r}")
return

for rnd in range(1, ROUNDS + 1):
# Q1: empty message
p.recvuntil(b"MSG 1", timeout=5)
p.recvuntil(b": ", timeout=5)
p.sendline(b"")
h1 = recv_hash(p)

# Q2: single byte 0x01
p.recvuntil(b"MSG 2", timeout=5)
p.recvuntil(b": ", timeout=5)
p.sendline(b"01")
h2 = recv_hash(p)

# Q3/Q4: feed back hashes with crafted first blocks
m3_hex, m4_hex = craft_payloads(h1, h2)

p.recvuntil(b"MSG 3", timeout=5)
p.recvuntil(b": ", timeout=5)
p.sendline(m3_hex.encode())
_ = recv_hash(p)

p.recvuntil(b"MSG 4", timeout=5)
p.recvuntil(b": ", timeout=5)
p.sendline(m4_hex.encode())
_ = recv_hash(p)

# Read collision status
status = p.recvline().decode(errors="ignore")
if "Hash collision found" not in status:
print(f"[!] Unexpected response at round {rnd}: {status.strip()}")
return

if rnd < ROUNDS:
# Skip to the next round's first prompt
p.recvuntil(b"MSG 1")
else:
# Final round: expect the flag line next
flag_line = p.recvline().decode(errors="ignore").strip()
print(flag_line)
rest = p.recvall(timeout=1).decode(errors="ignore").strip()
if rest:
print(rest)
return


if __name__ == "__main__":
main()