mitmproxy + python 实现游戏协议测试
本文侧重介绍如何使用 python 和 mitmproxy 实现拦截数据包、重发数据包,以及解析 protobuf 数据内容,对于相关依赖的安装不做介绍。
一、游戏协议安全测试内容
参考https://testerhome.com/topics/29053,这篇文章讲的很清楚。文章源自玩技e族-https://www.playezu.com/246905.html
二、实现原理
想直接使用的同学可以跳到第三部分。文章源自玩技e族-https://www.playezu.com/246905.html
mitmproxy 作为代理,可以获取客户端与服务端通信的数据,并且可以拦截、修改和自主发送数据。当配合其证书使用时,还可以解密 wss 连接中的 websocket 数据。文章源自玩技e族-https://www.playezu.com/246905.html
- Websotcket 数据处理源码分析
在 http 代理的过程中若发现 upgrade websocket 请求,则创建 WebSocketLayer 实例,并调用其_call_方法。文章源自玩技e族-https://www.playezu.com/246905.html
# mitmproxy/proxy/protocol/http.py """以下为Httplayer的_process_flow方法的部分代码""" if f.response.status_code == 101: # Handle a successful HTTP 101 Switching Protocols Response, # received after e.g. a WebSocket upgrade request. # Check for WebSocket handshake is_websocket = ( websockets.check_handshake(f.request.headers) and websockets.check_handshake(f.response.headers) ) if is_websocket and not self.config.options.websocket: self.log( "Client requested WebSocket connection, but the protocol is disabled.", "info" ) if is_websocket and self.config.options.websocket: layer = WebSocketLayer(self, f) else: layer = self.ctx.next_layer(self) layer()
WebSocketLayer 初始化时会创建用于此次 websocket 通信的编解码器。文章源自玩技e族-https://www.playezu.com/246905.html
# mitmproxy/proxy/protocol/websocket.py """WebSocketLayer类的init方法,省略部分代码""" def __init__(self, ctx, handshake_flow): super().__init__(ctx) self.handshake_flow = handshake_flow self.connections: dict[object, WSConnection] = {} client_extensions = [] server_extensions = [] # 判断交互数据是否使用deflate压缩 if 'Sec-WebSocket-Extensions' in handshake_flow.response.headers: if PerMessageDeflate.name in handshake_flow.response.headers['Sec-WebSocket-Extensions']: client_extensions = [PerMessageDeflate()] server_extensions = [PerMessageDeflate()] # self.client_conn和self.server_conn继承自ctx,即原http的client和server,原理为父类的__getattr__(self, name)方法返回的是getattr(self.ctx, name)。WSConnection是一个websocket协议编解码器,实际不会发送任何网络IO,文档地址:https://python-hyper.org/projects/wsproto/en/latest/basic-usage.html # 负责和解码server收到信息和编码server发送的信息 self.connections[self.client_conn] = WSConnection(ConnectionType.SERVER) # 负责和解码client收到信息和编码client发送的信息 self.connections[self.server_conn] = WSConnection(ConnectionType.CLIENT) # 构造发送给Server的websocket的握手请求 request = Request(extensions=client_extensions,host=handshake_flow.request.host,target=handshake_flow.request.path) # send()方法只会构造一个适用于对应conn的数据,并不会真正发送数据,recv_data()会将信息解码,需要通过next(conn.events())获取解码后数据 # 按上所说,下面两行代码的操作是将握手请求按client编码后发送给server编码器,然后让server编码器解码 data = self.connections[self.server_conn].send(request) self.connections[self.client_conn].receive_data(data) event = next(self.connections[self.client_conn].events()) assert isinstance(event, events.Request) # 返回给客户端接受连接响应 data = self.connections[self.client_conn].send(AcceptConnection(extensions=server_extensions)) self.connections[self.server_conn].receive_data(data) assert isinstance(next(self.connections[self.server_conn].events()), events.AcceptConnection)
WebSocketLayer 实例的_call_方法负责处理后续 websocket 通信文章源自玩技e族-https://www.playezu.com/246905.html
# mitmproxy/proxy/protocol/websocket.py """WebSocketLayer类的call方法,省略部分代码""" def __call__(self): self.flow = WebSocketFlow(self.client_conn, self.server_conn, self.handshake_flow) self.flow.metadata['websocket_handshake'] = self.handshake_flow.id self.handshake_flow.metadata['websocket_flow'] = self.flow.id # 调用addons中的websocket_start(self, flow)对flow进行处理 self.channel.ask("websocket_start", self.flow) conns = [c.connection for c in self.connections.keys()] close_received = False try: while not self.channel.should_exit.is_set(): # 往client或server插入信息,self.flow._inject_messages_client/self.flow._inject_messages_server是队列,后续实现在连接中主动发消息就是通过往队列中插入数据实现 self._inject_messages(self.client_conn, self.flow._inject_messages_client) self._inject_messages(self.server_conn, self.flow._inject_messages_server) # select监视原http的client和server连接的可读事件 r = tcp.ssl_read_select(conns, 0.1) for conn in r: source_conn = self.client_conn if conn == self.client_conn.connection else self.server_conn other_conn = self.server_conn if conn == self.client_conn.connection else self.client_conn is_server = (source_conn == self.server_conn) frame = websockets.Frame.from_file(source_conn.rfile) # 将从conn中获取的数据放入编解码器,此方法并没有返回值,所以data是None data = self.connections[source_conn].receive_data(bytes(frame)) # data是None,不解此举有何意义 source_conn.send(data) if close_received: return # 处理编解码器中解码后的数据,event由pop取出,后续不会再用到。 for event in self.connections[source_conn].events(): if not self._handle_event(event, source_conn, other_conn, is_server): if not close_received: close_received = True except (socket.error, exceptions.TcpException, SSL.Error) as e: s = 'server' if is_server else 'client' self.flow.error = flow.Error("WebSocket connection closed unexpectedly by {}: {}".format(s, repr(e))) # 调用addons中的websocket_start(self, flow)对flow进行处理 self.channel.tell("websocket_start", self.flow) finally: self.flow.ended = True # 调用addons中的websocket_end(self, flow)对flow进行处理 self.channel.tell("websocket_end", self.flow)
WebSocketLayer 实例中处理 Message Event 的方法文章源自玩技e族-https://www.playezu.com/246905.html
# mitmproxy/proxy/protocol/websocket.py """WebSocketLayer类的_handle_message方法,_handle_event中,若isinstance(event, events.Message),则会调用此函数""" def _handle_message(self, event, source_conn, other_conn, is_server): fb = self.server_frame_buffer if is_server else self.client_frame_buffer fb.append(event.data) if event.message_finished: original_chunk_sizes = [len(f) for f in fb] if isinstance(event, events.TextMessage): message_type = wsproto.frame_protocol.Opcode.TEXT payload = ''.join(fb) else: message_type = wsproto.frame_protocol.Opcode.BINARY payload = b''.join(fb) fb.clear() websocket_message = WebSocketMessage(message_type, not is_server, payload) length = len(websocket_message.content) self.flow.messages.append(websocket_message) # 调用addons中的websocket_message(self, flow)对flow进行处理 self.channel.ask("websocket_message", self.flow) # WebsocketMessage的属性killed用于判断该信息是否需要被转发,可在websocket_message函数中调用message的kill()方法置为True if not self.flow.stream and not websocket_message.killed: def get_chunk(payload): if len(payload) == length: # message has the same length, we can reuse the same sizes pos = 0 for s in original_chunk_sizes: yield (payload[pos:pos + s], True if pos + s == length else False) pos += s else: # just re-chunk everything into 4kB frames # header len = 4 bytes without masking key and 8 bytes with masking key chunk_size = 4092 if is_server else 4088 chunks = range(0, len(payload), chunk_size) for i in chunks: yield (payload[i:i + chunk_size], True if i + chunk_size >= len(payload) else False) # 将收到的信息重新编码后向对端发送 for chunk, final in get_chunk(websocket_message.content): data = self.connections[other_conn].send(Message(data=chunk, message_finished=final)) other_conn.send(data) if self.flow.stream: data = self.connections[other_conn].send(Message(data=event.data, message_finished=event.message_finished)) other_conn.send(data) return True
- Tcp 数据处理源码分析
TCP 数据处理触发条件文章源自玩技e族-https://www.playezu.com/246905.html
# mitmproxy/proxy/root_context.py """RootContext类_next_layer方法,省略部分代码""" """ 4. Check for --tcp 判断Option中tcp_hosts, 类型是一个列表,包含需要转换成tcp流信息的server address正则表达式,例如['192.168.d+.d+'] """ if self.config.check_tcp(top_layer.server_conn.address): return protocol.RawTCPLayer(top_layer) """ 6. Check for raw tcp mode 判断Option中rawtcp,类型是bool,若为true,则将不能处理的流转换成tcp流处理,建议开启,默认是false """ is_ascii = ( len(d) == 3 and # expect A-Za-z all(65 <= x <= 90 or 97 <= x <= 122 for x in d) ) if self.config.options.rawtcp and not is_ascii: return protocol.RawTCPLayer(top_layer)
TCP 信息处理 RawTCPLayer 类源码文章源自玩技e族-https://www.playezu.com/246905.html
class RawTCPLayer(base.Layer): chunk_size = 4096 def __init__(self, ctx, ignore=False): self.ignore = ignore super().__init__(ctx) def __call__(self): self.connect() if not self.ignore: f = tcp.TCPFlow(self.client_conn, self.server_conn, self) # 调用addons中的tcp_start(self, flow)对flow进行处理 self.channel.ask("tcp_start", f) # 创建一个长度为4096的空bytearray buf = memoryview(bytearray(self.chunk_size)) client = self.client_conn.connection server = self.server_conn.connection conns = [client, server] # https://github.com/openssl/openssl/issues/6234 for conn in conns: if isinstance(conn, SSL.Connection) and hasattr(SSL._lib, "SSL_clear_mode"): SSL._lib.SSL_clear_mode(conn._ssl, SSL._lib.SSL_MODE_AUTO_RETRY) try: while not self.channel.should_exit.is_set(): r = mitmproxy.net.tcp.ssl_read_select(conns, 10) for conn in r: dst = server if conn == client else client try: # 将从conn中recv的数据存入buf,返回size size = conn.recv_into(buf, self.chunk_size) except (SSL.WantReadError, SSL.WantWriteError): continue if not size: conns.remove(conn) # Shutdown connection to the other peer if isinstance(conn, SSL.Connection): # We can't half-close a connection, so we just close everything here. # Sockets will be cleaned up on a higher level. return else: dst.shutdown(socket.SHUT_WR) if len(conns) == 0: return continue # 将recv的数据转成TCPMessage tcp_message = tcp.TCPMessage(dst == server, buf[:size].tobytes()) if not self.ignore: f.messages.append(tcp_message) # 调用addons中的tcp_message(self, flow)对flow进行处理 self.channel.ask("tcp_message", f) # 发送tcp_message中的content dst.sendall(tcp_message.content) except (socket.error, exceptions.TcpException, SSL.Error) as e: if not self.ignore: f.error = flow.Error("TCP connection closed unexpectedly: {}".format(repr(e))) # 调用addons中的tcp_error(self, flow)对flow进行处理 self.channel.tell("tcp_error", f) finally: if not self.ignore: # 调用addons中的tcp_end(self, flow)对flow进行处理 self.channel.tell("tcp_end", f)
三、开启 mitmproxy 并加载 addon
首先需要安装两个库:mitmproxy 和 mitmdump文章源自玩技e族-https://www.playezu.com/246905.html
1、编写 websocket 的 addon
""" 简略版用于websocket的Addon 后续改进可以增加判断host,避免拦截到不需要处理的连接,或者将Queue改成redis """ import asyncio from multiprocessing import Queue import mitmproxy.websocket class WebsocketAddon: def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()): self._input_q = input_q self._output_q = output_q async def inject(self, flow: mitmproxy.websocket.WebSocketFlow): while not flow.ended and not flow.error: # 增加间隔,否则会阻塞event await asyncio.sleep(0.5) while not self._input_q.empty(): # WebSocketFlow的内置方法,用于主动插入信息,这里我只主动插入client->server的信息 flow.inject_message(flow.server_conn, self._input_q.get()) def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow): # 加入发送websocket消息的task,参考了官方的示例脚本,地址:https://docs.mitmproxy.org/stable/addons-examples/#websocket-inject-message asyncio.get_event_loop().create_task(self.inject(flow)) def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow): message = flow.messages[-1] self._output_q.put({ 'from_client': message.from_client, 'data': message.content }) # message.kill()可以让Layer不转发该条信息,我这里的目的是拦截掉所有客户端发送的数据,由自己编辑后再发送 if message.from_client: message.kill()
2、编写 socket 的 addon
""" 简略版用于socket的Addon 和websocket版差别不大,插入数据和拦截数据有区别 """ import asyncio from multiprocessing import Queue import mitmproxy.tcp class SocketAddon: def __init__(self, input_q: Queue = Queue(), output_q: Queue = Queue()): self._input_q = input_q self._output_q = output_q async def inject(self, flow: mitmproxy.websocket.WebSocketFlow): while flow.live and not flow.error: await asyncio.sleep(0.5) while not self._input_q.empty(): # 直接向对端发送socket信息完成插入 flow.server_conn.connection.sendall(payload) def websocket_start(self, flow: mitmproxy.websocket.WebSocketFlow): asyncio.get_event_loop().create_task(self.inject(flow)) def websocket_message(self, flow: mitmproxy.websocket.WebSocketFlow): message = flow.messages[-1] self._output_q.put({ 'from_client': message.from_client, 'data': message.content }) if message.from_client: # socket发送0字节,conn.sendall(b'')将不会发送任何数据 message.content = b''
3、开启 mitmproxy 并完成处理函数
import multiprocessing from mitmdump import Options, DumpMaster def start_proxy(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()): addons = [ # 自主选择是使用Websocket还是Socket WebsocketAddon(input_q, output_q) # SocketAddon(input_q, output_q) ] opts = Options(listen_host='0.0.0.0', listen_port=1080, scripts=None, mode='socks5', rawtcp=True, # 需要转换tcp数据成的ip正则 tcp_hosts=['.*'], flow_detail=0, termlog_verbosity='error', show_clientconnect_log=True, ) m = DumpMaster(opts) m.addons.add(*addons) m.run() def deal_client_message_func(client_message: [bytes, str]): if type(client_message) is bytes: return client_message.decode('utf-8').encode('gbk') elif type(client_message) is str: return f"test {client_message}" def simple_handel_message_func(input_q: multiprocessing.Queue(), output_q: multiprocessing.Queue()): while True: if not output_q.empty() message = output_q.get() print(f"{'客户端' if message['from_client'] else '服务端'} 包内容:{message['data']}") if message['from_client']: input_q.push(deal_client_message_func(message['data'])) def main(): input_queue = multiprocessing.Queue() output_queue = multiprocessing.Queue() # 使用子进程启动proxy multiprocessing.Process(target=start_proxy, args=(input_queue, output_queue)).start() simple_handel_message_func(input_queue, output_queue)
四、总结
对于想实现开头文中所提到的功能还需要实现客户端,以及对于 protobuf 协议的编解码,这里限于篇幅不再讨论,后续有机会再更新。
另外,之所以 mitmproxy 选择 socks5 模式,是因为 socks 协议支持代理除了 http、https 以外更多种类的协议,windows 开启 socks5 代理的工具:proxifer,android 开启 socks5 代理工具:postern。
台湾省新北市 12F
第一
俄罗斯 11F
便携式平衡器及振动分析仪Balanset-1a