mitmproxy + python 实现游戏协议测试

random
random
订阅者
10532
文章
0
粉丝
测试交流92,004字数 2825阅读9分25秒阅读模式

mitmproxy + python 实现游戏协议测试

本文侧重介绍如何使用 python 和 mitmproxy 实现拦截数据包、重发数据包,以及解析 protobuf 数据内容,对于相关依赖的安装不做介绍。

一、游戏协议安全测试内容

参考https://testerhome.com/topics/29053,这篇文章讲的很清楚。文章源自玩技e族-https://www.playezu.com/246905.html


二、实现原理

想直接使用的同学可以跳到第三部分。文章源自玩技e族-https://www.playezu.com/246905.html

mitmproxy 作为代理,可以获取客户端与服务端通信的数据,并且可以拦截、修改和自主发送数据。当配合其证书使用时,还可以解密 wss 连接中的 websocket 数据。文章源自玩技e族-https://www.playezu.com/246905.html

  • Websotcket 数据处理源码分析

在 http 代理的过程中若发现 upgrade websocket 请求,则创建 WebSocketLayer 实例,并调用其_call_方法。文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/http.py
"""以下为Httplayer的_process_flow方法的部分代码"""
if f.response.status_code == 101:
# Handle a successful HTTP 101 Switching Protocols Response,
    # received after e.g. a WebSocket upgrade request.
    # Check for WebSocket handshake
    is_websocket = (
websockets.check_handshake(f.request.headers) and
websockets.check_handshake(f.response.headers)
)
if is_websocket and not self.config.options.websocket:
self.log(
"Client requested WebSocket connection, but the protocol is disabled.",
"info"
)
if is_websocket and self.config.options.websocket:
layer = WebSocketLayer(self, f)
else:
layer = self.ctx.next_layer(self)
layer()

WebSocketLayer 初始化时会创建用于此次 websocket 通信的编解码器。文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的init方法,省略部分代码"""
def __init__(self, ctx, handshake_flow):
super().__init__(ctx)
self.handshake_flow = handshake_flow
self.connections: dict[object, WSConnection] = {}
client_extensions = []
server_extensions = []
# 判断交互数据是否使用deflate压缩
    if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers:
if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']:
client_extensions = [PerMessageDeflate()]
server_extensions = [PerMessageDeflate()]
# self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)方法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协议编解码器,实际不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html
    # 负责和解码server收到信息和编码server发送的信息
    self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER)
# 负责和解码client收到信息和编码client发送的信息
    self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT)
# 构造发送给Server的websocket的握手请求
    request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path)
# send()方法只会构造一个适用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,需要通过next(conn.events())获取解码后数据
    # 按上所说,下面两行代码的操作是将握手请求按client编码后发送给server编码器,然后让server编码器解码
    data = self.connections[self.server_conn].send(request)
self.connections[self.client_conn].receive_data(data)
event = next(self.connections[self.client_conn].events())
assert isinstance(event, events.Request)
# 返回给客户端接受连接响应
    data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions))
self.connections[self.server_conn].receive_data(data)
assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)

WebSocketLayer 实例的_call_方法负责处理后续 websocket 通信文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的call方法,省略部分代码"""
def __call__(self):
self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow)
self.flow.metadata['websocket_handshake'] = self.handshake_flow.id
self.handshake_flow.metadata['websocket_flow'] = self.flow.id
# 调用addons中的websocket_start(self, flow)对flow进行处理
    self.channel.ask("websocket_start", self.flow)
conns = [c.connection for c in self.connections.keys()]
close_received = False
try:
while not self.channel.should_exit.is_set():
# 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连接中主动发消息就是通过往队列中插入数据实现
            self._inject_messages(self.client_conn, self.flow._inject_messages_client)
self._inject_messages(self.server_conn, self.flow._inject_messages_server)
# select监视原http的client和server连接的可读事件
            r = tcp.ssl_read_select(conns, 0.1)
for conn in r:
source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn
other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn
is_server = (source_conn == self.server_conn)
frame = websockets.Frame.from_file(source_conn.rfile)
# 将从conn中获取的数据放入编解码器,此方法并没有返回值,所以data是None
                data = self.connections[source_conn].receive_data(bytes(frame))
# data是None,不解此举有何意义
                source_conn.send(data)
if close_received:
return
# 处理编解码器中解码后的数据,event由pop取出,后续不会再用到。
                for event in self.connections[source_conn].events():
if not self._handle_event(event, source_conn, other_conn, is_server):
if not close_received:
close_received = True
except (socket.error, exceptions.TcpException, SSL.Error) as e:
s = 'server' if is_server else 'client'
self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e)))
# 调用addons中的websocket_start(self, flow)对flow进行处理
        self.channel.tell("websocket_start", self.flow)
finally:
self.flow.ended = True
# 调用addons中的websocket_end(self, flow)对flow进行处理
        self.channel.tell("websocket_end", self.flow)

WebSocketLayer 实例中处理 Message Event 的方法文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/protocol/websocket.py
"""WebSocketLayer类的_handle_message方法,_handle_event中,若isinstance(event, events.Message),则会调用此函数"""
def _handle_message(self, event, source_conn, other_conn, is_server):
fb = self.server_frame_buffer if is_server else self.client_frame_buffer
fb.append(event.data)
if event.message_finished:
original_chunk_sizes = [len(f) for f in fb]
if isinstance(event, events.TextMessage):
message_type = wsproto.frame_protocol.Opcode.TEXT
payload = ''.join(fb)
else:
message_type = wsproto.frame_protocol.Opcode.BINARY
payload = b''.join(fb)
fb.clear()
websocket_message = WebSocketMessage(message_type, not is_server, payload)
length = len(websocket_message.content)
self.flow.messages.append(websocket_message)
# 调用addons中的websocket_message(self, flow)对flow进行处理
        self.channel.ask("websocket_message", self.flow)
# WebsocketMessage的属性killed用于判断该信息是否需要被转发,可在websocket_message函数中调用message的kill()方法置为True
        if not self.flow.stream and not websocket_message.killed:
def get_chunk(payload):
if len(payload) == length:
# message has the same length, we can reuse the same sizes
                    pos = 0
for s in original_chunk_sizes:
yield (payload[pos:pos + s], True if pos + s == length else False)
pos += s
else:
# just re-chunk everything into 4kB frames
                    # header len = 4 bytes without masking key and 8 bytes with masking key
                    chunk_size = 4092 if is_server else 4088
chunks = range(0, len(payload), chunk_size)
for i in chunks:
yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False)
# 将收到的信息重新编码后向对端发送
            for chunk, final in get_chunk(websocket_message.content):
data = self.connections[other_conn].send(Message(data=chunk, message_finished=final))
other_conn.send(data)
if self.flow.stream:
data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished))
other_conn.send(data)
return True
  • Tcp 数据处理源码分析

TCP 数据处理触发条件文章源自玩技e族-https://www.playezu.com/246905.html

# mitmproxy/proxy/root_context.py
"""RootContext类_next_layer方法,省略部分代码"""
"""
4. Check for --tcp
判断Option中tcp_hosts, 类型是一个列表,包含需要转换成tcp流信息的server address正则表达式,例如['192.168.d+.d+']
"""
if self.config.check_tcp(top_layer.server_conn.address):
return protocol.RawTCPLayer(top_layer)
"""
6. Check for raw tcp mode
判断Option中rawtcp,类型是bool,若为true,则将不能处理的流转换成tcp流处理,建议开启,默认是false
"""
is_ascii = (
len(d) == 3 and
# expect A-Za-z
    all(65 <= x <= 90 or 97 <= x <= 122 for x in d)
)
if self.config.options.rawtcp and not is_ascii:
return protocol.RawTCPLayer(top_layer)

TCP 信息处理 RawTCPLayer 类源码文章源自玩技e族-https://www.playezu.com/246905.html

class RawTCPLayer(base.Layer):
chunk_size = 4096
def __init__(self, ctx, ignore=False):
self.ignore = ignore
super().__init__(ctx)
def __call__(self):
self.connect()
if not self.ignore:
f = tcp.TCPFlow(self.client_conn, self.server_conn, self)
# 调用addons中的tcp_start(self, flow)对flow进行处理
            self.channel.ask("tcp_start", f)
# 创建一个长度为4096的空bytearray
        buf = memoryview(bytearray(self.chunk_size))
client = self.client_conn.connection
server = self.server_conn.connection
conns = [client, server]
# https://github.com/openssl/openssl/issues/6234
        for conn in conns:
if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"):
SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY)
try:
while not self.channel.should_exit.is_set():
r = mitmproxy.net.tcp.ssl_read_select(conns, 10)
for conn in r:
dst = server if conn == client else client
try:
# 将从conn中recv的数据存入buf,返回size
                        size = conn.recv_into(buf, self.chunk_size)
except (SSL.WantReadError, SSL.WantWriteError):
continue
if not size:
conns.remove(conn)
# Shutdown connection to the other peer
                        if isinstance(conn, SSL.Connection):
# We can't half-close a connection, so we just close everything here.
                            # Sockets will be cleaned up on a higher level.
                            return
else:
dst.shutdown(socket.SHUT_WR)
if len(conns) == 0:
return
continue
# 将recv的数据转成TCPMessage
                    tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes())
if not self.ignore:
f.messages.append(tcp_message)
# 调用addons中的tcp_message(self, flow)对flow进行处理
                        self.channel.ask("tcp_message", f)
# 发送tcp_message中的content
                    dst.sendall(tcp_message.content)
except (socket.error, exceptions.TcpException, SSL.Error) as e:
if not self.ignore:
f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e)))
# 调用addons中的tcp_error(self, flow)对flow进行处理
                self.channel.tell("tcp_error", f)
finally:
if not self.ignore:
# 调用addons中的tcp_end(self, flow)对flow进行处理
                self.channel.tell("tcp_end", f)

三、开启 mitmproxy 并加载 addon

首先需要安装两个库:mitmproxy 和 mitmdump文章源自玩技e族-https://www.playezu.com/246905.html

1、编写 websocket 的 addon
"""
简略版用于websocket的Addon
后续改进可以增加判断host,避免拦截到不需要处理的连接,或者将Queue改成redis
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.websocket
class WebsocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while not flow.ended and not flow.error:
# 增加间隔,否则会阻塞event
            await asyncio.sleep(0.5)
while not self._input_q.empty():
# WebSocketFlow的内置方法,用于主动插入信息,这里我只主动插入client->server的信息
                flow.inject_message(flow.server_conn, self._input_q.get())
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
# 加入发送websocket消息的task,参考了官方的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message
        asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
# message.kill()可以让Layer不转发该条信息,我这里的目的是拦截掉所有客户端发送的数据,由自己编辑后再发送
        if message.from_client:
message.kill()
2、编写 socket 的 addon
"""
简略版用于socket的Addon
和websocket版差别不大,插入数据和拦截数据有区别
"""
import asyncio
from multiprocessing import Queue
import mitmproxy.tcp
class SocketAddon:
def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()):
self._input_q = input_q
self._output_q = output_q
async def inject(self, flow: mitmproxy.websocket.WebSocketFlow):
while flow.live and not flow.error:
await asyncio.sleep(0.5)
while not self._input_q.empty():
# 直接向对端发送socket信息完成插入
                    flow.server_conn.connection.sendall(payload)
def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow):
asyncio.get_event_loop().create_task(self.inject(flow))
def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow):
message = flow.messages[-1]
self._output_q.put({
'from_client': message.from_client,
'data': message.content
})
if message.from_client:
# socket发送0字节,conn.sendall(b'')将不会发送任何数据
           message.content = b''
3、开启 mitmproxy 并完成处理函数
import multiprocessing
from mitmdump import Options, DumpMaster
def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
addons = [
# 自主选择是使用Websocket还是Socket
            WebsocketAddon(input_q, output_q)
# SocketAddon(input_q, output_q)
        ]
opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5',
rawtcp=True,
# 需要转换tcp数据成的ip正则
                   tcp_hosts=['.*'],
flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, )
m = DumpMaster(opts)
m.addons.add(*addons)
m.run()
def deal_client_message_func(client_message: [bytes, str]):
if type(client_message) is bytes:
return client_message.decode('utf-8').encode('gbk')
elif type(client_message) is str:
return f"test {client_message}"
def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()):
while True:
if not output_q.empty()
message = output_q.get()
print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}")
if message['from_client']:
input_q.push(deal_client_message_func(message['data']))
def main():
input_queue = multiprocessing.Queue()
output_queue = multiprocessing.Queue()
# 使用子进程启动proxy
    multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start()
simple_handel_message_func(input_queue, output_queue)

四、总结

​ 对于想实现开头文中所提到的功能还需要实现客户端,以及对于 protobuf 协议的编解码,这里限于篇幅不再讨论,后续有机会再更新。

​ 另外,之所以 mitmproxy 选择 socks5 模式,是因为 socks 协议支持代理除了 http、https 以外更多种类的协议,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。

 最后更新:2024-8-21
评论  9  访客  9
    • RayfordChods
      RayfordChods 1

      又来啦

      • RayfordChods
        RayfordChods 1

        • RayfordChods
          RayfordChods 1

          • Jamesanymn
            Jamesanymn 0

            [b] 资料描述: [/b]
            [url=.webp[/img][/url]

            [url=https://www.ebay.com/itm/395306559416] 在eBay上订购 [/url]

            [b]Balanset-1a设备的描述[/b]

            Balanset-1a是一种紧凑的双通道解决方案,用于平衡和振动分析旋转机构,包括破碎机,风扇,复盖机,切碎机,轴,离心机,涡轮机和其他旋转设备。

            [b]基本特征和能力[/b]

            [b]振动测量模式[/b]

            转速计:精确测量转速(RPM)。
            相位:测量振动信号的相位角以进行详细分析。
            1x振动:主要频率分量的测量和分析。
            FFT频谱:深入分析振动频谱。
            整体振动:监测整体振动水平。
            测量日志:存储数据以供后续分析。

            [b]平衡功能[/b]

            单平面平衡:调整转子在一个平面,以减少振动.
            双平面平衡:转子在两个平面内的动态调整。
            极坐标图:在极坐标图上显示不平衡,以实现精确的重量定位。
            上次会话恢复:启用上一个平衡会话的延续。
            公差计算器(ISO1940):根据ISO1940标准计算允许的不平衡。
            砂轮平衡:使用三个配重平衡砂轮。

            [b]可视化和图表[/b]

            整体图表:显示整体振动水平。
            1x图形:显示主频率下的振动模式。
            谐波图:分析谐波频率的存在和影响。
            频谱图:显示频谱以供详细检查。

            [b]附加能力[/b]

            存档:存储和重新访问过去的平衡数据。
            报告:自动生成详细的平衡报告。
            重新平衡:使用存储的会话数据启用重新平衡。
            批量生产平衡:适用于批量生产环境中的转子平衡。

            [b]包装内容[/b]

            Balanset-1a套餐包括:

            接口连通性的测量单元。
            一对振动传感器。
            带磁性支架的光学传感器(激光转速表)。
            电子天平秤。
            软件(不包括计算机,可作为选项)。
            塑料运输箱。

            [b]优点[/b]

            Balanset-1a易于使用、经济且用途广泛,可为各种平衡和振动分析需求提供解决方案。

            该设备的效率和对特定任务的适应性使其适合制造和维护需求。

            定价:US1 1,999。

            这种多功能设备满足小型车间和大型企业的需求,提供灵活,通用的使用。

            [url=https://www.ebay.com/itm/395306559416] 在eBay上订购 [/url]
            Instagram: https://www.instagram.com/vibromera_ou/
            Youtube: https://www.youtube.com/@vibromera
            Website: https://vibromera.eu/

            • Raymondcex
              Raymondcex 0

              Get millions upon millions of instant leads for your organization to jumpstart your advertising campaign. Make use of the lists an infinite number of times. We have been delivering companies and market research firms with information since 2012. Direct Marketing

              • Cuinn
                Cuinn 9

                想了解一下是 websocket+ protobuf 这种私有协议怎么去进行抓包解析,望楼主回复或出帖子分享,万分感谢!

                • 小趴菜
                  小趴菜 9

                  仅楼主可见

                  • 自然生长
                    自然生长 9

                    应该是命令行工具没法修改重播吧,官方 addon example 里还有修改 websocket 的例子呢,但原始 tcp 官方没有例子,但通过看源代码也能实现。

                    • 陈随想
                      陈随想 9

                      https://docs.mitmproxy.org/stable/concepts-protocols/
                      看了下文档,发现 V7 文档写着还不能修改重播?有点奇怪。另外看 issue 问题似乎也挺多的。

                    匿名

                    发表评论

                    匿名网友
                    :?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen:
                    确定

                    拖动滑块以完成验证