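Deobfuscating Non-Standard OLLVM-fla with angr and IDAPython: Analysis and Recovery
This article combines the angr framework with IDAPython scripts to analyze and remove non-standard OLLVM-fla control-flow flattening, and explains the angr and IDAPython APIs used along the way.
Defeating fla obfuscation takes three steps: (1) find all real blocks of the function, (2) find how the real blocks connect to each other, and (3) rebuild the control flow between the real blocks.
Steps 1 and 3 are relatively simple, and the tooling is a matter of taste; angr would work for them too. I prefer to watch blocks change in real time while analyzing fla obfuscation, so I handle these two steps with IDAPython scripts.
The main topic here is step 2: once we have all the real blocks, how do we find the connections between them? When the obfuscation is light, patching by hand is feasible. But how would you handle obfuscation like the following?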

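This is where a powerful feature of angr comes in: symbolic execution (look up angr's underlying principles on your own for more detail).
That raises a new question: what is symbolic execution?
It runs much like unidbg, in that both emulate execution. The difference is that unidbg's emulation needs concrete input values, while angr's does not.
Take a simple addition function, add(a, b), which returns a + b. For unidbg to emulate it, concrete values for a and b must be passed in; with a=1 and b=2, unidbg's result is 3.
angr does not need them: it executes with the symbols a and b themselves (symbols, not concrete values), and the emulation yields the result a+b.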
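The contrast between concrete and symbolic inputs can be sketched in plain Python. This is a toy illustration only, not angr's API: the Sym class is a hypothetical helper that records the expression instead of computing a number.

```python
class Sym:
    """A toy symbolic value: it records the expression instead of evaluating it."""

    def __init__(self, expr):
        self.expr = expr

    def __add__(self, other):
        rhs = other.expr if isinstance(other, Sym) else str(other)
        return Sym(f"({self.expr} + {rhs})")

    def __repr__(self):
        return self.expr


def add(a, b):
    return a + b


# Concrete emulation (the unidbg style): real numbers go in, a number comes out.
concrete_result = add(1, 2)

# Symbolic emulation (the angr style): symbols go in, an expression comes out.
symbolic_result = add(Sym("a"), Sym("b"))

print(concrete_result)
print(symbolic_result)
```

The same add function runs in both cases; only the kind of input changes, which is the property the rest of the analysis relies on.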
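So we exploit angr's symbolic execution, mainly its path-exploration ability. When execution starts at the first real block A, we mark it as the main block and let execution continue; the first real block B it then reaches is A's successor, which gives us the connection between A and B. Be aware of angr's path-explosion problem; how to avoid it is covered later.
The function below is the analysis target for this article. It is a non-standard OLLVM sample with two loop heads.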

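Step 1 of deobfuscation: find all real blocks of the function, using an IDAPython script.
From the loop heads we can directly obtain all the corresponding real blocks.
The loop heads are found with a breadth-first search.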
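To see what the breadth-first search reports, here is a minimal sketch that runs the same idea on a plain dictionary instead of IDA basic blocks. The graph, the node names, and the dict-based signature are assumptions made for illustration; a node is flagged as a loop head when it is reached again on a path that already contains it.

```python
from collections import deque


def find_loop_heads(cfg, entry):
    """Return the sorted nodes that are revisited along some path from entry."""
    loop_heads = set()
    queue = deque([(entry, [])])
    while queue:
        node, path = queue.popleft()
        if node in path:
            # Reached a node already on this path: it closes a loop.
            loop_heads.add(node)
            continue
        path = path + [node]
        queue.extend((succ, path) for succ in cfg.get(node, []))
    return sorted(loop_heads)


# Hypothetical flattened function with two dispatch loops.
cfg = {
    "prologue": ["disp1"],
    "disp1": ["real_a", "sub_prologue", "ret"],
    "real_a": ["disp1"],
    "sub_prologue": ["disp2"],
    "disp2": ["real_c", "real_d"],
    "real_c": ["disp2"],
    "real_d": ["disp1"],
    "ret": [],
}

print(find_loop_heads(cfg, "prologue"))
```

Because every path is carried along in the queue, this approach is simple but can get expensive on large graphs; for a single flattened function it is usually acceptable.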
Both standard and non-standard fla must be handled here.
In non-standard fla, the loop head address and the converge block address are the same.

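In standard fla, the loop head address and the converge block address differ, and the loop head has exactly two predecessor blocks: the prologue block and the converge block.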

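Once we have the converge block, the block's preds() method returns all of its predecessors, which are the real blocks. For standard fla, also remember to keep the prologue block from among the loop head's predecessors.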
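The standard and non-standard cases can be compared on a small predecessor map. This is a sketch under assumptions: the maps and block names are invented, and the two helper functions are simplified stand-ins that work on dictionaries rather than IDA blocks.

```python
def find_converge(preds, loop_head):
    """Standard fla: the loop head has two preds and one of them has several preds.
    Otherwise (non-standard fla) the loop head doubles as the converge block."""
    head_preds = preds.get(loop_head, [])
    if len(head_preds) != 2:
        return loop_head
    for pred in head_preds:
        if len(preds.get(pred, [])) > 1:
            return pred
    return None


def collect_real_blocks(preds, loop_head):
    converge = find_converge(preds, loop_head)
    real = []
    if converge != loop_head:
        # Standard fla: keep the prologue, drop the converge block itself.
        real.extend(p for p in preds[loop_head] if p != converge)
    real.extend(preds.get(converge, []))
    return real


# Hypothetical predecessor maps: block -> list of predecessor blocks.
standard = {
    "loop_head": ["prologue", "converge"],
    "converge": ["real_a", "real_b", "real_c"],
}
non_standard = {
    "loop_head": ["prologue", "real_a", "real_b", "real_c"],
}

print(find_converge(standard, "loop_head"), collect_real_blocks(standard, "loop_head"))
print(find_converge(non_standard, "loop_head"), collect_real_blocks(non_standard, "loop_head"))
```

Both layouts end up with the same set of real blocks; only the place where the predecessors are read from differs.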
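Collecting the real blocks needs care as well. Sometimes several basic blocks end with the same instructions, and IDA splits that tail out into a separate shared block. If we take the basic block at 0x42288 as a real block directly, real blocks are missed and the deobfuscated code is incomplete. So all predecessors of 0x42288 must be taken as real blocks instead.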

Finding the ret block
For standard fla, we need 0xA66C as the ret block.

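For non-standard fla, we need 0x42AB0 as the ret block. Why not 0x42AC4? There are two reasons: 1. The 0x42AB0 block contains variable-initialization instructions, so choosing 0x42AC4 directly would drop real code from the recovered disassembly. 2. The 0x42AE4 branch has no successor either.
This also explains why standard fla uses 0xA66C as the ret block. If 0x96EC were chosen, the 0x9700 branch still has successors, so obfuscated code would remain and the deobfuscation would be ineffective: part of it is removed, but the code still cannot be read directly.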



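After the call, all real blocks are marked with a color.
Step 2 of deobfuscation: find the connections between all real blocks of the function, using angr.
This is where we deal with the path explosion mentioned earlier. It typically appears when a loop contains an if condition: angr splits one path into two, one where the condition is true and one where it is false. On the next iteration the 2 paths become 4, then 8, and so on. The number of paths grows exponentially until it is large enough to hang the program.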
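The growth rate is easy to check with a few lines of plain Python. This sketch does not use angr; the explore function is a hypothetical stand-in that forks every live path once per loop iteration, the way a symbolic branch inside a loop would.

```python
def explore(iterations):
    """Fork every path into a true-branch and a false-branch on each iteration."""
    paths = [[]]
    for _ in range(iterations):
        paths = [path + [taken] for path in paths for taken in (True, False)]
    return paths


for n in (1, 2, 3, 10):
    print(n, "iterations ->", len(explore(n)), "paths")
```

After n iterations there are 2**n live paths, so even a modest loop bound makes exhaustive exploration impractical.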

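real_blocks is the list of all real blocks obtained above.
So my approach is not to execute the whole function in one run. Each time, I take one real block address, starting with real_blocks[0], as main block A and let it run. As soon as execution reaches an address B that is in my saved list of real block addresses, I stop and record the connection A->B. Then I take real_blocks[1] as the main block, start over, and again stop at the first address found in the real block list. Repeating this avoids path explosion and still yields the connections between all real blocks.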
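The per-block strategy can be sketched on a toy model of a flattened function. Everything here is an assumption for illustration: the dispatcher is modeled as a chain of comparisons on a state variable, WRITES lists the state values each real block may store (two values model a csel), and no angr call is involved.

```python
REAL_BLOCKS = {"prologue", "real_a", "real_b", "real_c", "ret"}

# Dispatcher: comparison N sends the matching state value to its real block.
DISPATCHER = [(0x10, "real_a"), (0x20, "real_b"), (0x30, "real_c"), (0x40, "ret")]

# State values each real block may write before jumping back to the dispatcher.
WRITES = {
    "prologue": [0x10],
    "real_a": [0x20, 0x30],  # two candidates, as produced by a csel
    "real_b": [0x30],
    "real_c": [0x40],
    "ret": [],
}


def step(location, state_var):
    """Advance one toy basic block and return the successor (location, state) pairs."""
    if location in WRITES:
        return [("cmp0", value) for value in WRITES[location]]
    index = int(location[3:])  # dispatcher node named "cmpN"
    expected, target = DISPATCHER[index]
    if state_var == expected:
        return [(target, state_var)]
    return [(f"cmp{index + 1}", state_var)]


def find_successors(start):
    """Run from one real block and stop every path at the first real block it reaches."""
    active = step(start, None)
    found = []
    while active:
        location, state_var = active.pop()
        if location in REAL_BLOCKS:
            if location not in found:
                found.append(location)
            continue  # do not explore past the successor
        active.extend(step(location, state_var))
    return sorted(found)


flow = {block: find_successors(block) for block in sorted(REAL_BLOCKS)}
for block, successors in flow.items():
    print(block, "->", successors)
```

Each exploration is short and bounded because it ends at the first real block, so the number of live paths never exceeds the number of candidate state values of the starting block.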
In general, auto_load_libs is set to False when loading a program. If the external libraries were loaded too, angr would analyze them as well, which costs a lot of performance.
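The prologue block contains many register assignments. These feed the conditional checks in the basic blocks: the register values decide which path is taken.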

The typical form is a basic block whose last instruction is B.EQ, B.GT, and so on; these blocks are the sub-dispatchers.

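With a hook, when execution reaches the last instruction of the main prologue block, the pc register is set to the address of a real block. This skips a large number of useless instructions, reducing overhead and saving time.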
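The effect of such a hook can be shown with a toy interpreter in plain Python. This is not angr's API: the run function, the program table, and all addresses are hypothetical and only mimic the idea of replacing pc when a hooked address is reached.

```python
def run(program, entry, hooks=None, max_steps=50):
    """Walk a toy program; a hook at an address replaces pc instead of executing it."""
    hooks = hooks or {}
    pc, trace = entry, []
    for _ in range(max_steps):
        if pc in hooks:
            pc = hooks[pc]()  # the hook decides where execution continues
            continue
        if pc not in program:
            break
        name, next_pc = program[pc]
        trace.append(name)
        pc = next_pc
    return trace


# address -> (description, next address); all values are made up for illustration
program = {
    0x00: ("prologue: set up registers", 0x04),
    0x04: ("prologue: branch to dispatcher", 0x08),
    0x08: ("dispatcher: compare #1", 0x0C),
    0x0C: ("dispatcher: compare #2", 0x10),
    0x10: ("dispatcher: compare #3", 0x40),
    0x40: ("real block", None),
}

plain = run(program, 0x00)
hooked = run(program, 0x00, hooks={0x04: lambda: 0x40})
print(len(plain), plain)
print(len(hooked), hooked)
```

The register setup in the prologue still runs, but the dispatcher comparisons are skipped entirely, which is where the time saving comes from.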
This handles all real blocks that belong to the main prologue block. The flow is:

This is the prologue block of the second loop head, referred to below as the child prologue block.


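Here too a hook changes the value of the pc register, but with one extra hook: proj.hook(first_block_last_ins.address, jump_to_child_prologue_address, first_block_last_ins.size). This hook initializes the register values set in the child prologue block, because the child prologue block at 0x42258 also contains register assignments used by conditional checks.
So the flow here is: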

With the construction flow analyzed, the related script is given directly; its comments explain what each part of the code does.

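Step 3 of deobfuscation: rebuild the control flow between real blocks, using IDAPython.
Rebuilding the control flow mainly handles two cases:
1. Branching jumps that use a csel instruction

2. Jumps without a branch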
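Both cases come down to writing AArch64 branch instructions into the real blocks. One common way to do this, sketched here under assumptions, is to encode an unconditional B for the no-branch case and a B.cond followed by a B for the csel case; whether the author's script patches in exactly this way is not shown in this section. The helper names and the addresses in the usage lines are hypothetical, while the encodings follow the A64 instruction format.

```python
import struct

# A64 condition codes as used in the low 4 bits of B.cond.
COND_CODES = {
    "EQ": 0x0, "NE": 0x1, "CS": 0x2, "HS": 0x2, "CC": 0x3, "LO": 0x3,
    "MI": 0x4, "PL": 0x5, "VS": 0x6, "VC": 0x7, "HI": 0x8, "LS": 0x9,
    "GE": 0xA, "LT": 0xB, "GT": 0xC, "LE": 0xD,
}


def encode_b(src, dst):
    """Encode 'B dst' placed at address src (26-bit word offset, +/-128 MiB)."""
    offset = dst - src
    if offset % 4 or not -(1 << 27) <= offset < (1 << 27):
        raise ValueError("target not reachable with B")
    word = 0x14000000 | ((offset >> 2) & 0x03FFFFFF)
    return struct.pack("<I", word)


def encode_b_cond(src, dst, cond):
    """Encode 'B.<cond> dst' placed at address src (19-bit word offset, +/-1 MiB)."""
    offset = dst - src
    if offset % 4 or not -(1 << 20) <= offset < (1 << 20):
        raise ValueError("target not reachable with B.cond")
    word = 0x54000000 | (((offset >> 2) & 0x7FFFF) << 5) | COND_CODES[cond.upper()]
    return struct.pack("<I", word)


# No-branch case: one unconditional jump to the single successor.
print(encode_b(0x1000, 0x1008).hex())

# csel case: conditional jump to one successor, then a fall-back jump to the other.
print(encode_b_cond(0x2000, 0x2100, "EQ").hex(), encode_b(0x2004, 0x2200).hex())
```

Inside IDA the resulting bytes could then be written at the patch address with a call such as ida_bytes.patch_bytes; the condition name would be taken from the csel instruction being replaced.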

The script is commented, so the code is pasted directly here.

Looking at the rebuilt result, the deobfuscation has succeeded.

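The scripts above have been merged into one. It is long, so the code is not pasted here; the script file will be available for download at the GitHub address.
To use it, you only need to provide the function address.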

1. CFG graph


2. CFG graph


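Deobfuscating ollvm-fla comes down to these three steps. The scripts are not fully general: for very heavily obfuscated samples, the relevant functionality still has to be extended case by case.
The sample from the previous article can be used for analysis.
Download address for the related files: 81aK9s2c8@1M7s2y4Q4x3@1q4Q4x3V1k6Q4x3V1k6Y4K9i4c8Z5N6h3u0Q4x3X3g2U0L8$3#2Q4x3V1k6B7K9i4g2@1K9h3q4F1y4U0j5$3i4K6u0r3P5r3c8W2k6X3I4S2i4K6u0W2k6$3W2@1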
int add(int a, int b) {
    return a + b;
}

# Build the basic-block flow chart of the target function.
blocks = idaapi.FlowChart(idaapi.get_func(func_ea))

# Breadth-first search for loop heads: a block reached again on its own path is a loop head.
def find_loop_heads(func):
    loop_heads = set()
    queue = deque()
    block = get_block_by_address(func)
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)
            continue
        path = path + [cur_block.start_ea]
        queue.extend((succ, path) for succ in cur_block.succs())
    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()
    return all_loop_heads

# Standard fla: the loop head has two predecessors, and the one with several
# predecessors is the converge block. Non-standard fla: the loop head itself.
def find_converge_addr(loop_head_addr):
    converge_addr = None
    block = get_block_by_address(loop_head_addr)
    preds = block.preds()
    pred_list = list(preds)
    if len(pred_list) == 2:
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:
                converge_addr = pred.start_ea
    else:
        converge_addr = loop_head_addr
    return converge_addr

# Excerpt from the per-loop-head body of find_all_real_block:
# collect the real blocks that belong to one loop head.
real_blocks = []
if loop_head_addr != converge_addr:
    # Standard fla: keep the prologue block, drop the converge block.
    loop_head_preds_addr.remove(converge_addr)
    real_blocks.extend(loop_head_preds_addr)
converge_block = get_block_by_address(converge_addr)
list_preds = list(converge_block.preds())
for pred_block in list_preds:
    if pred_block.start_ea == loop_head_addr:
        continue
    end_ea = pred_block.end_ea
    last_ins_ea = idc.prev_head(end_ea)
    mnem = idc.print_insn_mnem(last_ins_ea)
    size = get_basic_block_size(pred_block)
    # Skip single-instruction blocks and sub-dispatchers ending in B.<cond>.
    if size > 4 and "B." not in mnem:
        start_ea = pred_block.start_ea
        mnem = idc.print_insn_mnem(start_ea)
        if mnem == "CSEL":
            # Shared tail block: its predecessors are the real blocks.
            csel_preds = pred_block.preds()
            for csel_pred in csel_preds:
                real_blocks.append(csel_pred.start_ea)
        else:
            real_blocks.append(pred_block.start_ea)

# Excerpt: a block that starts with CSEL is a shared tail, so take its predecessors.
start_ea = pred_block.start_ea
mnem = idc.print_insn_mnem(start_ea)
if mnem == "CSEL":
    csel_preds = pred_block.preds()
    for csel_pred in csel_preds:
        real_blocks.append(csel_pred.start_ea)

# Full IDAPython script for step 1: find and color all real blocks.
from collections import deque

import idaapi
import idc


def get_block_by_address(ea):
    """Return the basic block that contains ea, or None."""
    func = idaapi.get_func(ea)
    blocks = idaapi.FlowChart(func)
    for block in blocks:
        if block.start_ea <= ea < block.end_ea:
            return block
    return None


def find_loop_heads(func):
    """Breadth-first search; a block reached again on its own path is a loop head."""
    loop_heads = set()
    queue = deque()
    block = get_block_by_address(func)
    queue.append((block, []))
    while len(queue) > 0:
        cur_block, path = queue.popleft()
        if cur_block.start_ea in path:
            loop_heads.add(cur_block.start_ea)
            continue
        path = path + [cur_block.start_ea]
        queue.extend((succ, path) for succ in cur_block.succs())
    all_loop_heads = list(loop_heads)
    all_loop_heads.sort()
    return all_loop_heads


def find_converge_addr(loop_head_addr):
    """Standard fla: the predecessor with several preds; non-standard: the loop head."""
    converge_addr = None
    block = get_block_by_address(loop_head_addr)
    preds = block.preds()
    pred_list = list(preds)
    if len(pred_list) == 2:
        for pred in pred_list:
            tmp_list = list(pred.preds())
            if len(tmp_list) > 1:
                converge_addr = pred.start_ea
    else:
        converge_addr = loop_head_addr
    return converge_addr


def get_basic_block_size(bb):
    return bb.end_ea - bb.start_ea


def add_block_color(ea):
    """Color every instruction of the block that contains ea."""
    block = get_block_by_address(ea)
    curr_addr = block.start_ea
    while curr_addr < block.end_ea:
        idc.set_color(curr_addr, idc.CIC_ITEM, 0xffcc33)
        curr_addr = idc.next_head(curr_addr)


def del_func_color(curr_addr):
    """Reset the color of every instruction from curr_addr to the function end."""
    end_ea = idc.find_func_end(curr_addr)
    while curr_addr < end_ea:
        idc.set_color(curr_addr, idc.CIC_ITEM, 0xffffffff)
        curr_addr = idc.next_head(curr_addr)


def find_ret_block_addr(blocks):
    """Locate the block to treat as the ret block, starting from the RET instruction."""
    for block in blocks:
        succs = block.succs()
        succs_list = list(succs)
        end_ea = block.end_ea
        last_ins_ea = idc.prev_head(end_ea)
        mnem = idc.print_insn_mnem(last_ins_ea)
        if len(succs_list) == 0:
            if mnem == "RET":
                ori_ret_block = block
                # Walk up through single predecessors, skipping one-instruction blocks.
                while True:
                    tmp_block = block.preds()
                    pred_list = list(tmp_block)
                    if len(pred_list) == 1:
                        block = pred_list[0]
                        if get_basic_block_size(block) == 4:
                            continue
                        else:
                            break
                    else:
                        break
                # If the candidate still leads into branches with successors,
                # fall back to the original RET block.
                block2 = block
                num = 0
                i = 0
                while True:
                    i += 1
                    succs_block = block2.succs()
                    for succ in succs_block:
                        child_succs = succ.succs()
                        succ_list = list(child_succs)
                        if len(succ_list) != 0:
                            block2 = succ
                            num += 1
                    if num > 2:
                        block = ori_ret_block
                        break
                    if i > 2:
                        break
                return block.start_ea


def find_all_real_block(func_ea):
    blocks = idaapi.FlowChart(idaapi.get_func(func_ea))
    loop_heads = find_loop_heads(func_ea)
    print(f"Loop head count: {len(loop_heads)}----{[hex(loop_head) for loop_head in loop_heads]}")
    all_real_block = []
    for loop_head_addr in loop_heads:
        loop_head_block = get_block_by_address(loop_head_addr)
        loop_head_preds = list(loop_head_block.preds())
        loop_head_preds_addr = [loop_head_pred.start_ea for loop_head_pred in loop_head_preds]
        converge_addr = find_converge_addr(loop_head_addr)
        real_blocks = []
        if loop_head_addr != converge_addr:
            # Standard fla: keep the prologue block, drop the converge block.
            loop_head_preds_addr.remove(converge_addr)
            real_blocks.extend(loop_head_preds_addr)
        converge_block = get_block_by_address(converge_addr)
        list_preds = list(converge_block.preds())
        for pred_block in list_preds:
            end_ea = pred_block.end_ea
            last_ins_ea = idc.prev_head(end_ea)
            mnem = idc.print_insn_mnem(last_ins_ea)
            size = get_basic_block_size(pred_block)
            # Skip single-instruction blocks and sub-dispatchers ending in B.<cond>.
            if size > 4 and "B." not in mnem:
                start_ea = pred_block.start_ea
                mnem = idc.print_insn_mnem(start_ea)
                if mnem == "CSEL":
                    # Shared tail block: its predecessors are the real blocks.
                    csel_preds = pred_block.preds()
                    for csel_pred in csel_preds:
                        real_blocks.append(csel_pred.start_ea)
                else:
                    real_blocks.append(pred_block.start_ea)
        real_blocks.sort()
        all_real_block.append(real_blocks)
        print("Child loop head:", [hex(child_block_ea) for child_block_ea in real_blocks])
    ret_addr = find_ret_block_addr(blocks)
    all_real_block.append(ret_addr)
    print("all_real_block:", all_real_block)

    # Flatten the per-loop-head lists plus the ret block into one list.
    all_real_block_list = []
    for real_blocks in all_real_block:
        if isinstance(real_blocks, list):
            all_real_block_list.extend(real_blocks)
        else:
            all_real_block_list.append(real_blocks)
    for real_block_ea in all_real_block_list:
        add_block_color(real_block_ea)
    print("\nAll real blocks collected")
    print("===========INT===============")
    print(all_real_block_list)
    print("===========HEX===============")
    print(f"Count: {len(all_real_block_list)}")
    print([hex(real_block_ea) for real_block_ea in all_real_block_list], "\n")

    # Everything except the ret block and the first (main) group belongs to child prologues.
    all_child_prologue_addr = all_real_block.copy()
    all_child_prologue_addr.remove(ret_addr)
    all_child_prologue_addr.remove(all_child_prologue_addr[0])
    print("Real block addresses for all child prologue blocks:", all_child_prologue_addr)
    all_child_prologue_last_ins_ea = []
    for child_prologue_array in all_child_prologue_addr:
        child_prologue_addr = child_prologue_array[0]
        child_prologue_block = get_block_by_address(child_prologue_addr)
        child_prologue_end_ea = child_prologue_block.end_ea
        child_prologue_last_ins_ea = idc.prev_head(child_prologue_end_ea)
        all_child_prologue_last_ins_ea.append(child_prologue_last_ins_ea)
    print("Address of the last instruction of each child prologue block:", all_child_prologue_last_ins_ea)
    return all_real_block, all_child_prologue_addr, all_child_prologue_last_ins_ea


func_ea = 0x41D08
reals = find_all_real_block(func_ea)

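# angr setup for step 2; file_path and func_offset are supplied by the surrounding script.
import angr

proj = angr.Project(file_path, auto_load_libs=False)  # do not load external libraries
base = proj.loader.min_addr
func_addr = base + func_offset
init_state = proj.factory.blank_state(addr=func_addr)
init_state.options.add(angr.options.CALLLESS)  # do not follow calls during emulation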
Last edited by 九天666 on 2025-4-21 01:24.