Deobfuscation with Binary Ninja Workflows 0x1 - Two-Way Indirect Jumps

The sample used here is mtguard.so. It is a fairly troublesome sample and cannot be solved perfectly.

How Binary Ninja runs its analysis

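Binary Ninja ships with two workflows by default, and the one we normally use is the default metaAnalyze workflow. A workflow can be thought of as the pipeline through which the decompiler gradually lifts a binary from machine code to pseudocode. A Binary Ninja workflow is made up of multiple activities, each representing one analysis task. The activities form a CFG-like structure and run in order, lifting machine code step by step to assembly, LLIL, MLIL, and HLIL. This is really the same idea as LLVM passes.
Binary samples often carry all kinds of obfuscation, and on arm64 a common one is indirect-jump obfuscation. Because the default workflow cannot compute the jump target, the decompiled control flow breaks off at that point. There are many ways to repair this, such as patching the assembly to replace the original br instruction with a conditional branch, or modifying the IL to replace the jump with a branch statement such as if. Since we are using a workflow, we naturally go with modifying the IL, which has several advantages. First, the IL carries a lot of metadata, so it is easy to find predecessors and to analyze data flow. Second, the IL can be modified freely, including adding or removing statements, whereas assembly is embedded in the binary and running out of space there is hard to deal with. Finally, the IL handles conditions more concisely than assembly: csel and cset have too many condition codes, while the IL automatically turns the condition into a cond variable.
Here we choose to insert our activity between the MLIL and HLIL layers.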
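To make the idea of an activity pipeline concrete, here is a toy model in plain Python. It is not the Binary Ninja API: `ToyWorkflow`, `insert_before`, and the `toy.*` activity names are hypothetical, and the real workflow is a graph rather than a flat list. Only `core.function.generateHighLevelIL` is a name taken from the code later in this post.

```python
class ToyWorkflow:
    """A flat, ordered list of named activities (a simplification of the real graph)."""

    def __init__(self, activities):
        self.activities = list(activities)

    def insert_before(self, anchor, new_activity):
        # Place new_activity immediately before the anchor activity.
        idx = self.activities.index(anchor)
        self.activities.insert(idx, new_activity)

    def run(self, state):
        # Each activity is looked up by name and transforms the shared state in order.
        for name in self.activities:
            state = HANDLERS[name](state)
        return state


# Hypothetical handlers: each one just records which stage it produced.
HANDLERS = {
    "toy.lift_llil": lambda s: s + ["llil"],
    "toy.lift_mlil": lambda s: s + ["mlil"],
    "toy.deobf": lambda s: s + ["mlil (jumps rewritten to if)"],
    "core.function.generateHighLevelIL": lambda s: s + ["hlil"],
}

wf = ToyWorkflow(["toy.lift_llil", "toy.lift_mlil", "core.function.generateHighLevelIL"])
wf.insert_before("core.function.generateHighLevelIL", "toy.deobf")
stages = wf.run([])
print(stages)
```

The point of the sketch is only the ordering: the custom step sees finished MLIL and runs before HLIL is generated, so HLIL is built from the rewritten MLIL.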

Sample analysis

Let's look at this obfuscated snippet from mtguard.so.


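As you can see, this Meituan sample is quite hard to deal with: a single jump uses many conditional-select instructions, and their condition codes are not even the same.
We start from MLIL to look at the structure of the obfuscation, because before MLIL the variable definitions have not been generated yet, which makes data-flow analysis inconvenient. Another important MLIL feature is that a variable whose value could not be resolved can be assigned a constant, so if bn fails to work out the table address we can still specify it by hand.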

Our final goal is to solve for the value of the variable x9_1, which in theory has two values. Let's see how far bn has already analyzed it.



We can see that all the values we need are already there; bn just cannot pair them up with each other.

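One premise needs to be added here: we assume that even when there are multiple csel-like instructions, they all correspond to the same cmp, that is, all the conds are equivalent. This assumption comes from how this kind of obfuscation is generated. If there were n cmps, the number of possible states would be $2^n$, but jumps in normal code are mostly one of three structures: direct jumps, branches, and switches. Jumps with $2^n$ states hardly exist. Looking at this structure, it looks like a branch however you view it, which means it was really a b.ne or a similar instruction transformed by LLVM, so there is most likely only one cmp.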
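The premise can be illustrated with a small model of what the obfuscated branch computes. This is a sketch under assumptions: the table contents, the indices 1 and 3, and the constant `KEY` are made up for illustration and are not taken from mtguard.so.

```python
from itertools import product

TABLE = {0: 0x1000, 1: 0x2000, 2: 0x3000, 3: 0x4000}  # hypothetical jump table
KEY = 0x24                                            # hypothetical constant added after the load


def obfuscated_target(cond: bool) -> int:
    # csel idx, #1, #3, <cc>: one comparison selects one of two table indices
    idx = 1 if cond else 3
    # ldr x9, [table, idx, lsl #3] ; add x9, x9, KEY ; br x9
    return TABLE[idx] + KEY


# A single comparison can only ever produce two jump targets.
targets = {cond: obfuscated_target(cond) for cond in (True, False)}


def reachable_states(n: int) -> int:
    # With n independent comparisons every True/False combination is a distinct state.
    return len(list(product((True, False), repeat=n)))


print({k: hex(v) for k, v in targets.items()}, reachable_states(3))
```

With one comparison the jump has exactly two outcomes, which is the shape of an ordinary if. Only independent comparisons would blow this up to $2^n$ outcomes.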

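Our goal is to compute the values of this jump for the true and the false branch separately, that is, to match the variables that hold two values with TrueBranch and FalseBranch. Since we have already argued that the conds should all be the same, we can take the cond from either of the two ifs. (An obfuscator could make the two conds opposite, but that does not happen in this sample. If it did, we could simply collect every cond at the end and compare them, because from the source-code point of view this is most likely just a simple if statement.)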
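For the case mentioned in parentheses, where two selects use opposite conditions, one possible normalization is sketched below. It is not part of the workflow in this post: `normalize` and the `(cc, if_true, if_false)` tuple layout are hypothetical, and only the ARM64 fact that each condition code has an inverse (eq/ne, lt/ge, and so on) is relied upon.

```python
# ARM64 condition codes and their logical inverses.
INVERSE = {"eq": "ne", "ne": "eq", "lt": "ge", "ge": "lt", "gt": "le", "le": "gt",
           "hi": "ls", "ls": "hi", "cs": "cc", "cc": "cs", "mi": "pl", "pl": "mi",
           "vs": "vc", "vc": "vs"}


def normalize(select, canonical_cc):
    """Rewrite (cc, value_if_true, value_if_false) so that it is keyed on canonical_cc."""
    cc, if_true, if_false = select
    if cc == canonical_cc:
        return {"t": if_true, "f": if_false}
    if INVERSE[cc] == canonical_cc:
        # csel x, a, b, ne  is the same as  csel x, b, a, eq
        return {"t": if_false, "f": if_true}
    raise ValueError(f"{cc} is neither {canonical_cc} nor its inverse")


selects = [("eq", 0x10, 0x18), ("ne", 0x7, 0x3)]
normalized = [normalize(s, "eq") for s in selects]
print(normalized)
```

A select whose condition is neither the canonical one nor its inverse breaks the single-cmp premise, so the sketch rejects it instead of guessing.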

workflow

Here is the complete code first.

import json
from dataclasses import dataclass
from typing import Callable
from binaryninja import (
    Workflow,
    AnalysisContext,
    Activity,
    BinaryView,
    Function,
    MediumLevelILFunction,
    MediumLevelILConstPtr,
    MediumLevelILConstData,
    MediumLevelILConst,
    MediumLevelILAdd,
    MediumLevelILSub,
    MediumLevelILXor,
    MediumLevelILJump,
    MediumLevelILJumpTo,
    MediumLevelILVarSsa,
    MediumLevelILVarPhi,
    MediumLevelILVar,
    MediumLevelILSetVarSsa,
    MediumLevelILLoadSsa,
    MediumLevelILLabel,
    RegisterValueType,
    SSAVariable,
    ExpressionIndex,
)

# Expression types that are treated as constants
mlil_const = (MediumLevelILConstPtr, MediumLevelILConstData, MediumLevelILConst)


class two_direct_indirect_jump_handler:
    @dataclass
    class resolved_data:
        cond: ExpressionIndex
        trueAddr: int
        falseAddr: int

    def __init__(self, ctx: AnalysisContext):
        self.ctx = ctx
        # jump address -> condition and the two resolved targets
        self.resolved: dict[int, two_direct_indirect_jump_handler.resolved_data] = {}

    @staticmethod
    def search_var_value_ssa(var: SSAVariable, mlil_ssa: MediumLevelILFunction, cond):
        search = two_direct_indirect_jump_handler.search_var_value_ssa
        def_site = var.def_site
        # print(hex(def_site.address), var, def_site.get_possible_reg_values_after(var.var.storage))
        if isinstance(def_site, MediumLevelILVarPhi):
            # phi: merge the partial t/f results of the two sources
            lv = search(def_site.src[0], mlil_ssa, cond)
            rv = search(def_site.src[1], mlil_ssa, cond)
            if lv is None or rv is None:
                raise RuntimeError("phi calc fail\n {} {} \n {} {}".format(
                    def_site.src[0].def_site, def_site.src[1].def_site, hex(def_site.address), def_site))
            return lv | rv

        if isinstance(def_site, MediumLevelILSetVarSsa):
            if isinstance(def_site.src, MediumLevelILLoadSsa):
                # load: turn the index -> value lookup table into a t/f -> value dict
                tokens = list(def_site.src.traverse(lambda x: x))
                table = def_site.get_possible_reg_values_after(def_site.dest.var.storage)
                if table.type != RegisterValueType.LookupTableValue:
                    raise RuntimeError("not lookup table {} {}".format(hex(def_site.address), def_site))
                # print(table.mapping)
                res = {}
                for token in tokens:
                    if isinstance(token, MediumLevelILVarSsa):
                        mp = search(token.var, mlil_ssa, cond)
                        if mp is None:
                            continue
                        try:
                            res |= {"t": table.mapping[mp["t"]], "f": table.mapping[mp["f"]]}
                        except KeyError:
                            try:
                                # the selector may be a byte offset instead of an entry index
                                res |= {"t": table.mapping[mp["t"] // 8], "f": table.mapping[mp["f"] // 8]}
                            except KeyError:
                                print(mp, token.var, table.mapping)
                                print(hex(def_site.address), def_site)
                                print("key error", mp, table.mapping)
                                raise RuntimeError("maybe not two direct indirect jump {}".format(hex(def_site.address)))
                if len(res) != 2:
                    raise RuntimeError("calc load fail len!=2 {} {}".format(hex(def_site.address), def_site))
                return res

            if isinstance(def_site.src, mlil_const):
                # constant: tag it with the branch (True/False) it is assigned on
                dependence = def_site.branch_dependence
                if len(dependence) == 0:
                    print("no dependence", def_site.address, def_site)
                    return def_site.src.value.value
                key = max(dependence.keys())
                if cond["value"] is None:
                    cond["value"] = mlil_ssa[key].condition.non_ssa_form.expr_index
                if "True" in dependence[key].name:
                    return {"t": def_site.src.value.value}
                elif "False" in dependence[key].name:
                    return {"f": def_site.src.value.value}
                else:
                    print("const but unknown branch name", def_site.address, def_site, dependence[key].name)
                    return def_site.src.value.value

            def calc_cbk(lv, rv, op: Callable):
                # apply op to every int/dict combination of the two operands
                if isinstance(lv, int) and isinstance(rv, int):
                    return op(lv, rv)
                if isinstance(lv, dict) and isinstance(rv, dict):
                    return {"t": op(lv["t"], rv["t"]), "f": op(lv["f"], rv["f"])}
                if isinstance(lv, int) and isinstance(rv, dict):
                    return {"t": op(lv, rv["t"]), "f": op(lv, rv["f"])}
                if isinstance(lv, dict) and isinstance(rv, int):
                    return {"t": op(lv["t"], rv), "f": op(lv["f"], rv)}

            def operand_value(expr):
                # constants are used directly, anything else is searched recursively
                if isinstance(expr, mlil_const):
                    return expr.value.value
                return search(expr.var, mlil_ssa, cond)

            binary_ops = (
                (MediumLevelILXor, lambda x, y: x ^ y),
                (MediumLevelILAdd, lambda x, y: x + y),
                (MediumLevelILSub, lambda x, y: x - y),
            )
            for il_type, op in binary_ops:
                if isinstance(def_site.src, il_type):
                    lv = operand_value(def_site.src.left)
                    rv = operand_value(def_site.src.right)
                    return calc_cbk(lv, rv, op)

    def check_manual_update(self, func: Function, addr: int, res: dict):
        auto_branch = func.get_indirect_branches_at(addr)
        print(auto_branch, res)
        if len(auto_branch) != 2:
            return False
        if auto_branch[0].dest_addr != res["t"] and auto_branch[0].dest_addr != res["f"]:
            return True
        if auto_branch[1].dest_addr != res["t"] and auto_branch[1].dest_addr != res["f"]:
            return True
        return False

    def check_addr_vaild(self, addr: int, bv: BinaryView):
        for seg in bv.segments:
            # seg.end is exclusive
            if seg.executable and seg.start <= addr < seg.end and addr % 4 == 0:
                return True
        return False

    def connect_basic_block(self, func: Function):
        mlil = func.mlil

        for _arch, unsolved_addr in func.unresolved_indirect_branches:
            try:
                jump: MediumLevelILJump | MediumLevelILJumpTo = mlil[mlil.get_instruction_start(unsolved_addr)].ssa_form
                if not isinstance(jump, (MediumLevelILJump, MediumLevelILJumpTo)):
                    raise RuntimeError("not jump {}".format(hex(jump.address)))
                var = jump.dest.ssa_form.var
                cond = {"value": None}
                res = two_direct_indirect_jump_handler.search_var_value_ssa(var, mlil.ssa_form, cond)
                if res is None or cond["value"] is None:
                    raise RuntimeError("calc fail {}".format(hex(jump.address)))
                # only touch the branches when needed: setting them triggers reanalysis
                if isinstance(jump, MediumLevelILJump) or self.check_manual_update(func, unsolved_addr, res):
                    print(hex(unsolved_addr), jump, func.mlil.get_expr(cond["value"]), res)
                    func.set_user_indirect_branches(unsolved_addr, [(func.arch, res["t"]), (func.arch, res["f"])], func.arch)
                self.resolved[unsolved_addr] = two_direct_indirect_jump_handler.resolved_data(
                    cond=cond["value"], trueAddr=res["t"], falseAddr=res["f"])
                # print(hex(unsolved_addr), hex(res["t"]), hex(res["f"]))
            except Exception as e:
                print(hex(unsolved_addr), e)
                continue

    def convert_jump_to_if(self, ctx: AnalysisContext):
        new_func = MediumLevelILFunction(ctx.function.arch, low_level_il=ctx.llil)
        old_mlil = ctx.function.mlil
        new_func.prepare_to_copy_function(old_mlil)
        for old_block in old_mlil:
            new_func.prepare_to_copy_block(old_block)
            for instr_idx in range(old_block.start, old_block.end):
                instr = old_mlil[instr_idx]
                if (isinstance(instr, MediumLevelILJumpTo) and isinstance(instr.dest, MediumLevelILVar)
                        and not instr.get_possible_reg_values(instr.dest.var.storage).type == RegisterValueType.ConstantValue):
                    # re-run the calculation and tag the jump if the result is no longer valid
                    for tag in old_mlil.source_function.get_tags_at(instr.address, auto=True):
                        old_mlil.source_function.remove_auto_address_tags_of_type(instr.address, tag.type.name)
                    cond = {"value": None}
                    try:
                        res = two_direct_indirect_jump_handler.search_var_value_ssa(instr.dest.ssa_form.var, old_mlil.ssa_form, cond)
                        if (res is None or cond["value"] is None
                                or not self.check_addr_vaild(res["t"], ctx.function.view)
                                or not self.check_addr_vaild(res["f"], ctx.function.view)):
                            old_mlil.source_function.add_tag("Bugs", "need manual analyze jump", instr.address, auto=True)
                            print("add bug tag at {}".format(hex(instr.address)))
                    except Exception as e:
                        old_mlil.source_function.add_tag("Bugs", "need manual analyze jump", instr.address, auto=True)
                        print("add bug tag at {}".format(hex(instr.address)))
                        print(e)
                if isinstance(instr, MediumLevelILJumpTo) and instr.address in self.resolved:
                    try:
                        label_t = MediumLevelILLabel()
                        label_f = MediumLevelILLabel()
                        indirect_branches = ctx.function.get_indirect_branches_at(instr.address)
                        if len(indirect_branches) != 2:
                            raise RuntimeError("indirect branches len!=2 {} {}".format(hex(instr.address), indirect_branches))
                        for branch in indirect_branches:
                            # targets maps a destination address to an instruction index
                            if branch.dest_addr == self.resolved[instr.address].trueAddr:
                                label_t.operand = instr.targets[branch.dest_addr]
                            if branch.dest_addr == self.resolved[instr.address].falseAddr:
                                label_f.operand = instr.targets[branch.dest_addr]
                        if_expr = new_func.if_expr(old_mlil.get_expr(self.resolved[instr.address].cond).copy_to(new_func),
                                                   label_t, label_f, old_mlil[instr_idx].source_location)
                        new_func.append(if_expr, old_mlil[instr_idx].source_location)
                    except Exception as e:
                        print(e)
                        new_func.append(old_mlil[instr_idx].copy_to(new_func), old_mlil[instr_idx].source_location)
                else:
                    new_func.append(old_mlil[instr_idx].copy_to(new_func), old_mlil[instr_idx].source_location)
        new_func.finalize()
        new_func.generate_ssa_form()
        ctx.mlil = new_func

    def re_run_calc_check(self, var: SSAVariable):
        cond = {"value": None}
        try:
            res = self.search_var_value_ssa(var, self.ctx.function.mlil.ssa_form, cond)
        except Exception as e:
            print(e)
            return False
        if res is None or cond["value"] is None:
            return False
        return True

    def run(self):
        self.connect_basic_block(self.ctx.function)
        self.convert_jump_to_if(self.ctx)


def install_two_direct_indirect_jump_handler(ctx: AnalysisContext):
    handler = two_direct_indirect_jump_handler(ctx)
    handler.run()


wf = Workflow("").clone("satori.function.deobf")
wf.register_activity(Activity(configuration=json.dumps({
    "name": "satori.function.handle_two_direct_indirect_jump.activity",
    "title": "handle_two_direct_indirect_jump",
    "description": "handle_two_direct_indirect_jump",
    "eligibility": {
        "auto": {
            "default": True
        }
    }
}), action=lambda context: install_two_direct_indirect_jump_handler(context)))
# run our activity right before HLIL is generated
wf.insert("core.function.generateHighLevelIL", ["satori.function.handle_two_direct_indirect_jump.activity"])

wf.register()

search_var_value_ssa
def search_var_value_ssa(var: SSAVariable, mlil_ssa: MediumLevelILFunction, cond):
    search = two_direct_indirect_jump_handler.search_var_value_ssa
    def_site = var.def_site
    # print(hex(def_site.address), var, def_site.get_possible_reg_values_after(var.var.storage))
    if isinstance(def_site, MediumLevelILVarPhi):
        # phi: merge the partial t/f results of the two sources
        lv = search(def_site.src[0], mlil_ssa, cond)
        rv = search(def_site.src[1], mlil_ssa, cond)
        if lv is None or rv is None:
            raise RuntimeError("phi calc fail\n {} {} \n {} {}".format(
                def_site.src[0].def_site, def_site.src[1].def_site, hex(def_site.address), def_site))
        return lv | rv

    if isinstance(def_site, MediumLevelILSetVarSsa):
        if isinstance(def_site.src, MediumLevelILLoadSsa):
            # load: turn the index -> value lookup table into a t/f -> value dict
            tokens = list(def_site.src.traverse(lambda x: x))
            table = def_site.get_possible_reg_values_after(def_site.dest.var.storage)
            if table.type != RegisterValueType.LookupTableValue:
                raise RuntimeError("not lookup table {} {}".format(hex(def_site.address), def_site))
            # print(table.mapping)
            res = {}
            for token in tokens:
                if isinstance(token, MediumLevelILVarSsa):
                    mp = search(token.var, mlil_ssa, cond)
                    if mp is None:
                        continue
                    try:
                        res |= {"t": table.mapping[mp["t"]], "f": table.mapping[mp["f"]]}
                    except KeyError:
                        try:
                            # the selector may be a byte offset instead of an entry index
                            res |= {"t": table.mapping[mp["t"] // 8], "f": table.mapping[mp["f"] // 8]}
                        except KeyError:
                            print(mp, token.var, table.mapping)
                            print(hex(def_site.address), def_site)
                            print("key error", mp, table.mapping)
                            raise RuntimeError("maybe not two direct indirect jump {}".format(hex(def_site.address)))
            if len(res) != 2:
                raise RuntimeError("calc load fail len!=2 {} {}".format(hex(def_site.address), def_site))
            return res

        if isinstance(def_site.src, mlil_const):
            # constant: tag it with the branch (True/False) it is assigned on
            dependence = def_site.branch_dependence
            if len(dependence) == 0:
                print("no dependence", def_site.address, def_site)
                return def_site.src.value.value
            key = max(dependence.keys())
            if cond["value"] is None:
                cond["value"] = mlil_ssa[key].condition.non_ssa_form.expr_index
            if "True" in dependence[key].name:
                return {"t": def_site.src.value.value}
            elif "False" in dependence[key].name:
                return {"f": def_site.src.value.value}
            else:
                print("const but unknown branch name", def_site.address, def_site, dependence[key].name)
                return def_site.src.value.value

        def calc_cbk(lv, rv, op: Callable):
            # apply op to every int/dict combination of the two operands
            if isinstance(lv, int) and isinstance(rv, int):
                return op(lv, rv)
            if isinstance(lv, dict) and isinstance(rv, dict):
                return {"t": op(lv["t"], rv["t"]), "f": op(lv["f"], rv["f"])}
            if isinstance(lv, int) and isinstance(rv, dict):
                return {"t": op(lv, rv["t"]), "f": op(lv, rv["f"])}
            if isinstance(lv, dict) and isinstance(rv, int):
                return {"t": op(lv["t"], rv), "f": op(lv["f"], rv)}

        def operand_value(expr):
            # constants are used directly, anything else is searched recursively
            if isinstance(expr, mlil_const):
                return expr.value.value
            return search(expr.var, mlil_ssa, cond)

        binary_ops = (
            (MediumLevelILXor, lambda x, y: x ^ y),
            (MediumLevelILAdd, lambda x, y: x + y),
            (MediumLevelILSub, lambda x, y: x - y),
        )
        for il_type, op in binary_ops:
            if isinstance(def_site.src, il_type):
                lv = operand_value(def_site.src.left)
                rv = operand_value(def_site.src.right)
                return calc_cbk(lv, rv, op)

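This is the core function of the whole activity: it searches for the value of each variable and collects the cond along the way. It works on the SSA form of MLIL because in regular MLIL a variable may have many definition sites (def_site), since variables get reused.

SSA is a code form used in decompilation in which each variable is assigned exactly once, at its definition. Code in this form is very well suited to data-flow analysis.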
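As a minimal illustration of the single-assignment idea, the sketch below renames straight-line assignments so that each destination gets a fresh version. `to_ssa` is a hypothetical helper that ignores branches (and therefore phi nodes); the `name#version` spelling only imitates how SSA variables are usually displayed.

```python
def to_ssa(statements):
    """Rename straight-line assignments (dest, [operands]) so every dest is defined once."""
    version = {}  # current version number of each variable
    out = []
    for dest, operands in statements:
        # Uses refer to the latest version defined so far (version 0 = value on entry).
        uses = [f"{v}#{version.get(v, 0)}" for v in operands]
        version[dest] = version.get(dest, 0) + 1
        out.append((f"{dest}#{version[dest]}", uses))
    return out


# x9 is written twice in the original code, so it has two definition sites.
code = [("x9", ["x8"]), ("x9", ["x9", "x10"]), ("x8", ["x9"])]
ssa = to_ssa(code)
for dest, uses in ssa:
    print(dest, "<-", uses)
```

After renaming, every use names exactly one definition, which is why a search can follow `def_site` backwards without ambiguity.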

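Binary Ninja's variable types are somewhat bloated: there are *LevelILVar, *LevelILVarSsa, Variable, and SSAVariable. The first two are taken directly from an instruction's operands, and the last two can be obtained from the .var member of the first two. The first two can also be converted into each other directly through their ssa_form and non_ssa_form members (because they are in essence exprs).

In Binary Ninja IL, each line the user sees is called an instruction, while the operands and operators that make up an instruction are called expressions. Expressions are combined into instructions in an AST-like structure. Within each function, instructions and expressions each have their own independent index, and bn passes analysis results on to the next activity by maintaining these indices.

We search using the SSAVariable type because it is the only one that has def_site.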
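Let us consider a more general approach. A variable that we are currently searching falls into one of the following cases.

A phi instruction is another SSA concept: it merges the values of different versions of the same variable (there are many versions, because each one is assigned only at its definition) at the point where branches join.

  • phi instruction: the values from two logical branches are merged
  • set instruction: the variable is assigned from an expression, which in turn splits into three cases
    • MediumLevelILLoadSsa: a value is fetched from a lookup table
    • arithmetic, such as sub and add
    • constant assignment, which may appear for an ordinary constant or at the start of an if branch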

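For a phi instruction, the two branches of a csel are being merged. Results are stored in a dict with the keys t and f, so here we only need to merge the results found for the two source variables,
that is, take the union of the t and f cases. Barring bugs, there should be exactly one t and one f.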
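The merge can be sketched on its own, without any Binary Ninja types. `merge_phi` is a hypothetical helper, and it is stricter than the `lv | rv` in the listing above: it also checks that exactly one t and one f came back, which is the "no bug" condition just described.

```python
def merge_phi(left, right):
    """Merge the partial results of the two phi sources into one {"t", "f"} dict."""
    if not isinstance(left, dict) or not isinstance(right, dict):
        raise TypeError("both phi sources must be partial t/f dicts")
    merged = left | right  # dict union, Python 3.9+
    if set(merged) != {"t", "f"}:
        raise ValueError(f"expected one t and one f value, got {merged}")
    return merged


# One source was assigned on the True edge, the other on the False edge.
print(merge_phi({"t": 0x8}, {"f": 0x18}))
```

If both sources report the same key, the later one would silently overwrite the earlier one in a plain union, which is why the sketch checks the key set.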

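For a lookup-table load, we first use traverse to collect every expr in the load statement and then walk through them, ignoring constants. There is normally only one variable, because with two variables get_possible_reg_values_after cannot produce the mapping table. The aim is to convert the index -> value mapping (the LookupTable) into a t/f -> value mapping. When we meet a variable, we start a new search from it.
Arithmetic needs a little case analysis: depending on whether the left and right values are dicts or constants there are four cases, and we solve each one. In effect this merges the two search results.
Assignment simply returns the constant; there is nothing more to say.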
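The lookup-table conversion described above can be isolated as a plain function. This is a sketch with made-up table contents: `remap_table` is a hypothetical name, and the fallback that divides by 8 mirrors the listing's assumption that the selector may be a byte offset into a table of 8-byte entries.

```python
def remap_table(mapping, selector, scale=8):
    """Turn an index -> target table into a t/f -> target dict."""
    # First try the selector values as table keys, then as byte offsets (value // scale).
    for divisor in (1, scale):
        try:
            return {b: mapping[selector[b] // divisor] for b in ("t", "f")}
        except KeyError:
            continue
    raise KeyError(f"selector {selector} does not index the lookup table")


mapping = {1: 0x2000, 3: 0x4000}                     # hypothetical LookupTable contents
print(remap_table(mapping, {"t": 1, "f": 3}))        # selector already holds indices
print(remap_table(mapping, {"t": 8, "f": 24}))       # selector holds byte offsets
```

Both selectors resolve to the same pair of targets; a selector that fits neither interpretation raises, which corresponds to the "maybe not two direct indirect jump" error in the listing.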

In the end this function returns a constant, returns a t/f dict, or raises an error. Here we only want the case where it returns a dict.
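The int-or-dict return convention and the four arithmetic cases can be written compactly by broadcasting a plain constant to both branches. This is an alternative formulation for illustration, not the `calc_cbk` from the listing; `lift` and `is_two_way` are hypothetical names.

```python
import operator


def lift(op, lv, rv):
    """Apply op to operands that are either a plain int or a {"t", "f"} dict."""
    if isinstance(lv, int) and isinstance(rv, int):
        return op(lv, rv)

    def as_pair(v):
        # A constant has the same value on both branches.
        return v if isinstance(v, dict) else {"t": v, "f": v}

    left, right = as_pair(lv), as_pair(rv)
    return {b: op(left[b], right[b]) for b in ("t", "f")}


def is_two_way(result):
    # Only a complete t/f dict describes a two-way jump.
    return isinstance(result, dict) and set(result) == {"t", "f"}


print(lift(operator.xor, 0x10, {"t": 0x1, "f": 0x3}))
print(lift(operator.sub, {"t": 0x2000, "f": 0x4000}, 0x10))
```

A constant-only result such as `lift(operator.sub, 5, 3)` stays an int, so `is_two_way` rejects it, matching the rule that only the dict case is useful.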

connect_basic_block
def connect_basic_block(self, func: Function):
    mlil = func.mlil

    for _arch, unsolved_addr in func.unresolved_indirect_branches:
        try:
            jump: MediumLevelILJump | MediumLevelILJumpTo = mlil[mlil.get_instruction_start(unsolved_addr)].ssa_form
            if not isinstance(jump, (MediumLevelILJump, MediumLevelILJumpTo)):
                raise RuntimeError("not jump {}".format(hex(jump.address)))
            var = jump.dest.ssa_form.var
            cond = {"value": None}
            res = two_direct_indirect_jump_handler.search_var_value_ssa(var, mlil.ssa_form, cond)
            if res is None or cond["value"] is None:
                raise RuntimeError("calc fail {}".format(hex(jump.address)))
            # only touch the branches when needed: setting them triggers reanalysis
            if isinstance(jump, MediumLevelILJump) or self.check_manual_update(func, unsolved_addr, res):
                print(hex(unsolved_addr), jump, func.mlil.get_expr(cond["value"]), res)
                func.set_user_indirect_branches(unsolved_addr, [(func.arch, res["t"]), (func.arch, res["f"])], func.arch)
            self.resolved[unsolved_addr] = two_direct_indirect_jump_handler.resolved_data(
                cond=cond["value"], trueAddr=res["t"], falseAddr=res["f"])
            # print(hex(unsolved_addr), hex(res["t"]), hex(res["f"]))
        except Exception as e:
            print(hex(unsolved_addr), e)
            continue

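The main job of this function is to walk over every address marked as unresolved_indirect_branch, try to compute where the jump goes, and use set_user_indirect_branches to connect the current basic block to the basic blocks containing the computed addresses. Note that set_user_indirect_branches triggers a Binary Ninja reanalysis event, which means the workflow is interrupted immediately and restarted from the beginning, so this has to be handled carefully to avoid an infinite loop.
check_manual_update covers the case where an earlier result was wrong, for example because of a wrong table address, and has to be corrected by manually setting a user variable value. The newly computed result will then differ from what get_indirect_branches_at returns, so the indirect_branches at that address must be set again.
For jumps that are already connected, the result is added to resolved for later use.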
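Why the guard prevents an endless reanalysis loop can be seen in a small simulation. Everything here is a hypothetical model: `needs_update` is a slightly stricter variant of the check in the listing (it compares target sets), and `analyse_until_stable` stands in for Binary Ninja restarting the workflow after each write.

```python
def needs_update(is_jump_to, current_targets, res):
    """Decide whether the indirect branches have to be (re)written."""
    if not is_jump_to:
        return True    # plain jump: no branches have been set yet
    if len(current_targets) != 2:
        return False   # not the two-way shape this handler deals with
    return set(current_targets) != {res["t"], res["f"]}


def analyse_until_stable(res, max_passes=10):
    """Count the analysis passes needed until no more writes happen."""
    current, is_jump_to, passes = [], False, 0
    while passes < max_passes:
        passes += 1
        if not needs_update(is_jump_to, current, res):
            return passes
        # Writing the branches is what triggers the next analysis pass.
        current, is_jump_to = [res["t"], res["f"]], True
    raise RuntimeError("analysis did not converge")


print(analyse_until_stable({"t": 0x2024, "f": 0x4024}))
```

The first pass writes the branches and the second pass finds nothing to change, so analysis settles. Writing unconditionally on every pass would never reach that fixed point.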

convert_jump_to_if
def convert_jump_to_if(self, ctx: AnalysisContext):
    new_func = MediumLevelILFunction(ctx.function.arch, low_level_il=ctx.llil)
    old_mlil = ctx.function.mlil
    new_func.prepare_to_copy_function(old_mlil)
    for old_block in old_mlil:
        new_func.prepare_to_copy_block(old_block)
        for instr_idx in range(old_block.start, old_block.end):
            instr = old_mlil[instr_idx]
            if (isinstance(instr, MediumLevelILJumpTo) and isinstance(instr.dest, MediumLevelILVar)
                    and not instr.get_possible_reg_values(instr.dest.var.storage).type == RegisterValueType.ConstantValue):
                # re-run the calculation and tag the jump if the result is no longer valid
                for tag in old_mlil.source_function.get_tags_at(instr.address, auto=True):
                    old_mlil.source_function.remove_auto_address_tags_of_type(instr.address, tag.type.name)
                cond = {"value": None}
                try:
                    res = two_direct_indirect_jump_handler.search_var_value_ssa(instr.dest.ssa_form.var, old_mlil.ssa_form, cond)
                    if (res is None or cond["value"] is None
                            or not self.check_addr_vaild(res["t"], ctx.function.view)
                            or not self.check_addr_vaild(res["f"], ctx.function.view)):
                        old_mlil.source_function.add_tag("Bugs", "need manual analyze jump", instr.address, auto=True)
                        print("add bug tag at {}".format(hex(instr.address)))
                except Exception as e:
                    old_mlil.source_function.add_tag("Bugs", "need manual analyze jump", instr.address, auto=True)
                    print("add bug tag at {}".format(hex(instr.address)))
                    print(e)
            if isinstance(instr, MediumLevelILJumpTo) and instr.address in self.resolved:
                try:
                    label_t = MediumLevelILLabel()
                    label_f = MediumLevelILLabel()
                    indirect_branches = ctx.function.get_indirect_branches_at(instr.address)
                    if len(indirect_branches) != 2:
                        raise RuntimeError("indirect branches len!=2 {} {}".format(hex(instr.address), indirect_branches))
                    for branch in indirect_branches:
                        # targets maps a destination address to an instruction index
                        if branch.dest_addr == self.resolved[instr.address].trueAddr:
                            label_t.operand = instr.targets[branch.dest_addr]
                        if branch.dest_addr == self.resolved[instr.address].falseAddr:
                            label_f.operand = instr.targets[branch.dest_addr]
                    if_expr = new_func.if_expr(old_mlil.get_expr(self.resolved[instr.address].cond).copy_to(new_func),
                                               label_t, label_f, old_mlil[instr_idx].source_location)
                    new_func.append(if_expr, old_mlil[instr_idx].source_location)
                except Exception as e:
                    print(e)
                    new_func.append(old_mlil[instr_idx].copy_to(new_func), old_mlil[instr_idx].source_location)
            else:
                new_func.append(old_mlil[instr_idx].copy_to(new_func), old_mlil[instr_idx].source_location)
    new_func.finalize()
    new_func.generate_ssa_form()
    ctx.mlil = new_func

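This function does two things. First, it builds if instructions from the earlier results, so that the originally very ugly switch becomes an if. Second, it checks whether the computed result is valid, and if not, adds a tag so that it can be repaired later.
The troublesome part of building the if instruction is setting the labels. The original function has to be copied in full. According to the documentation, prepare_to_copy_function and prepare_to_copy_block must be called on the function and on each basic block in turn, and these two methods copy some metadata into the new function. We then walk the instructions, match MediumLevelILJumpTo (a jump with indirect branches set automatically becomes a jumpTo), and check whether the address is in resolved (which seems slightly redundant). The dest_addr values and the keys of targets are exactly the addresses we set earlier, whereas the address attribute of the corresponding mlil instruction is not necessarily the same as the address we set. This may be caused by the width of mlil instructions, and bn does not handle it very well. For now, going through indirect_branches and targets looks like the better way to turn an address into an instruction index.
At present the only way to set a label is to construct it directly and assign to its operand attribute; calling finalize then takes care of the labels automatically. I could not find a better way to set an mlil label, and get_label_for_source_instruction does not work well at all.
After that, the cond collected earlier is copied into the new mlil and the if_expr is built.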
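The address-to-index step can be shown with plain data. In this sketch `branch_addrs` stands in for the `dest_addr` values of the indirect branches and `targets` for the jumpTo's address -> instruction index mapping; `pick_labels` and the concrete numbers are hypothetical.

```python
def pick_labels(branch_addrs, targets, true_addr, false_addr):
    """Return (true_index, false_index) for the if, looked up by destination address."""
    if len(branch_addrs) != 2:
        raise ValueError("expected exactly two indirect branches")
    label_t = label_f = None
    for addr in branch_addrs:
        if addr == true_addr:
            label_t = targets[addr]
        if addr == false_addr:
            label_f = targets[addr]
    # Compare against None explicitly: instruction index 0 is a valid target.
    if label_t is None or label_f is None:
        raise ValueError("resolved addresses do not match the indirect branches")
    return label_t, label_f


targets = {0x2024: 17, 0x4024: 42}  # made-up instruction indices
print(pick_labels([0x4024, 0x2024], targets, true_addr=0x2024, false_addr=0x4024))
```

The lookup never touches the `address` attribute of the target instructions, so it is unaffected by the mismatch described above.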

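As for adding tags: the main issue is that in mtguard.so the jump-table address is not unique. For a given load, the table variable is not a constant and may come from two predecessors, preA and preB, so the table address clearly has to be specified by hand. During analysis, bn may analyze this spot before preB has been repaired, and is then likely to compute a wrong result. Once preB has also been repaired, the computation here correctly raises an error, but we cannot undo the jump that was already set: the control flow here may point backwards, and after undoing it and triggering reanalysis the computation might stop raising the error again, which would be an infinite loop.
So our approach is to run the computation once more at the end, and set a Bugs tag if the result differs or an error is raised.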
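The final validity check can be modelled without a BinaryView. In this sketch segments are plain `(start, end, executable)` tuples with an exclusive end, the 4-byte alignment reflects fixed-width arm64 instructions, and `is_valid_target` and `needs_bug_tag` are hypothetical names.

```python
def is_valid_target(addr, segments, insn_size=4):
    """A jump target must be instruction-aligned and lie in an executable segment."""
    return addr % insn_size == 0 and any(
        executable and start <= addr < end for start, end, executable in segments)


def needs_bug_tag(res, segments):
    """True when the re-run result is unusable and the jump needs manual analysis."""
    if not isinstance(res, dict) or set(res) != {"t", "f"}:
        return True
    return not all(is_valid_target(res[b], segments) for b in ("t", "f"))


segments = [(0x1000, 0x9000, True), (0x9000, 0xA000, False)]  # made-up layout
print(needs_bug_tag({"t": 0x2024, "f": 0x4024}, segments))
print(needs_bug_tag({"t": 0x2024, "f": 0x9004}, segments))
```

Tagging instead of undoing keeps the already-written branches in place, so the check itself never triggers another round of reanalysis.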

Finally, once every instruction has been copied, call finalize and generate_ssa_form, then assign the newly built mlil object over the old one to replace it.

Result


Here we set the function's analysis method so that it is analyzed with our own workflow, satori.function.deobf.

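After the fix it looks roughly like this, which is acceptable. The cond seems to be computed with MBA expressions, so there are a lot of messy computations. They were not removed, which is why the output looks long.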


https://sgsgsama.github.io/ctf/auto-re-dev/bnWorkflow0x1/

Author

SGSG

Posted on

2025-12-04

Updated on

2025-12-07

Licensed under