cea-sec / miasm

Reverse engineering framework in Python
https://miasm.re/
GNU General Public License v2.0
3.5k stars 473 forks source link

A question on miasm instrumentation #1293

Open liuya0904 opened 4 years ago

liuya0904 commented 4 years ago

Hello,

I want to use miasm to symboliclly execute a single function in ELF, with the sub functions called inside skipped but with args logged. I modied code from dse_crackme.py in the release code, with detailed steps as follow:

1, add a general hook function, with the to be hooked functions globally hardcoded in a dictionary named hooks. This hook function does nothing but restore the stack, update regs.RIP, and symblize the return value of regs.RAX.

hooks={4223168L: 'sub_4070C0', 4221632L: 'strlen', 4214472L: 'time', 4213740L: '_exit', 4203744L: 'sub_4024E0', 4223600L: 'socket', 4223240L: 'connect', 4213784L: 'close', 4223484L: 'send'} def hook(dse): global hooks

# ['RDI', 'RSI', 'RDX', 'RCX', 'R8', 'R9']
# main, argc, argv, ...
pdb.set_trace()

#
#dse.update_state_from_concrete(mem=True)

pc=dse.jitter.cpu.EIP
print("pc: 0x%x"%pc)

regs = dse.ir_arch.arch.regs
top_stack = dse.eval_expr(regs.RSP)
'''main_addr = dse.eval_expr(regs.RDI)
argc = dse.eval_expr(regs.RSI)
argv = dse.eval_expr(regs.RDX)
'''

# skip current instruction
#dse.jitter.cpu.EIP=pc+5
#ret_addr = ExprInt(pc+5, regs.RIP.size)
ret_addr = ExprInt(dse.jitter.get_stack_arg(0), regs.RIP.size)
print ret_addr
print regs.RIP
ret_val= ExprId("SF_%x_%s" % (pc, hooks[pc]), regs.RAX.size)
dse.update_state({
    regs.RSP: dse.symb.eval_expr(regs.RSP + ExprInt(8, regs.RSP.size)),
    dse.ir_arch.IRDst: ret_addr,
    regs.RIP: ret_addr,
    regs.RAX: ret_val
})

2, hook the functions in the global of hooks with dse API of dse.add_instrumentation():

for ea in hooks: dse.add_instrumentation(ea, hook)

3, run it directly from the function start of 0x402150. entry_p=0x402150 sb.run(entry_p)

The function partially looks like: .text:0000000000402150 sub_402150 proc near ; CODE XREF: ProcessCmd+561↑p .text:0000000000402150 .text:0000000000402150 var_121 = byte ptr -121h .text:0000000000402150 var_50 = qword ptr -50h .text:0000000000402150 var_48 = qword ptr -48h .text:0000000000402150 var_40 = word ptr -40h .text:0000000000402150 var_3E = word ptr -3Eh .text:0000000000402150 var_3C = dword ptr -3Ch .text:0000000000402150 .text:0000000000402150 push rbp .text:0000000000402151 mov rbp, rsp .text:0000000000402154 push r15 .text:0000000000402156 mov r15d, edx .text:0000000000402159 push r14 .text:000000000040215B mov r14d, ecx .text:000000000040215E push r13 .text:0000000000402160 mov r13d, esi .text:0000000000402163 push r12 .text:0000000000402165 push rbx .text:0000000000402166 mov rbx, rdi .text:0000000000402169 xor edi, edi .text:000000000040216B sub rsp, 28h .text:000000000040216F call time .text:0000000000402174 sub rsp, 0E0h .text:000000000040217B mov r12, rax .text:000000000040217E mov rdi, rbx .text:0000000000402181 lea rax, [rsp+130h+var_121] .text:0000000000402186 mov [rbp+var_40], 2 .text:000000000040218C and rax, 0FFFFFFFFFFFFFFF0h .text:0000000000402190 mov [rbp+var_50], rax .text:0000000000402194 mov eax, r13d .text:0000000000402197 ror ax, 8 .text:000000000040219B mov [rbp+var_3E], ax .text:000000000040219F call sub_4070C0

When fun it, the first call to time() could be hookded successfully but with the following AssertError: -> pc=dse.jitter.cpu.EIP (Pdb) c pc: 0x404ec8 0x402174 RIP Traceback (most recent call last): File "symbol_vbot_atk.py", line 216, in sb.run(entry_p) File "/usr/local/lib/python2.7/dist-packages/miasm/analysis/sandbox.py", line 684, in run super(Sandbox_Linux_x86_64, self).run(addr) File "/usr/local/lib/python2.7/dist-packages/miasm/analysis/sandbox.py", line 136, in run self.jitter.continue_run() File "/usr/local/lib/python2.7/dist-packages/miasm/jitter/jitload.py", line 412, in continue_run return next(self.run_iterator) File "/usr/local/lib/python2.7/dist-packages/miasm/jitter/jitload.py", line 347, in runiter_once res = self.exec_cb(self) File "/usr/local/lib/python2.7/dist-packages/miasm/analysis/dse.py", line 338, in callback self.handle(ExprInt(cur_addr, self.ir_arch.IRDst.size)) File "/usr/local/lib/python2.7/dist-packages/miasm/analysis/dse.py", line 648, in handle assert dst == cur_addr AssertionError

Does anyone know what's wrong in my code? Thanks in advance.

liuya

serpilliere commented 4 years ago

Hi @liuya0904 ! Sorry for the delay. Maybe the IRDst is not set to the good destination ? It seems you paste your code without indentation (a bit hard to read) Can you use markdown formatting to repost your hook code ?

liuya0904 commented 4 years ago

I pasted all the code as follows:

from miasm.analysis.sandbox import Sandbox_Linux_x86_64
from miasm.analysis.dse import DSEEngine
from miasm.expression.expression import *
from miasm.core.utils import int_to_byte
from miasm.jitter.csts import PAGE_READ, PAGE_WRITE
from miasm.analysis.sandbox import Sandbox_Linux_x86_64
from miasm.expression.expression import *
from miasm.os_dep.win_api_x86_32 import get_win_str_a
from miasm.core.locationdb import LocationDB
from miasm.analysis.dse import DSEPathConstraint
from miasm.analysis.machine import Machine

def xxx___printf_chk(jitter):
    ret_ad, args = jitter.func_args_systemv(["flag", "format", "arg"])
    print(jitter.get_c_str(args.format))
    return jitter.func_ret_systemv(ret_ad, 1)

s1='aaaaaaa'
def xxx_fgets(jitter):
    global s1
    ret_ad, args = jitter.func_args_systemv(["dest", "size", "stream"])
    #s = input()
    s=s1
    s1="aaaaaaaaaaa"
    print s
    jitter.vm.set_mem(args.dest, s.encode())
    return jitter.func_ret_systemv(ret_ad, len(s))

def xxx_strcspn(jitter):
    ret_ad, args = jitter.func_args_systemv(["s", "rejected"])
    s = jitter.get_c_str(args.s)
    jitter.vm.set_mem(args.s, s.strip().encode())
    return jitter.func_ret_systemv(ret_ad, len(s))

def xxx___memcpy_chk(jitter):
    ret_ad, args = jitter.func_args_systemv(["dest", "src", "len", "destlen"])
    src = jitter.vm.get_mem(args.src, args.len)
    jitter.vm.set_mem(args.dest, src)

    global dse
    dse.attach(jitter)

    return jitter.func_ret_systemv(ret_ad, args.dest)

def xxx_puts_symb(dse):
    raise RuntimeError("Exit")

def symbolize_vm(dse):
    global vm_registers_symb, already_disass

    # update the DSE state (including the memory) from the concrete state
    dse.update_state_from_concrete(mem=True)
    print("symbolize_vm")

    # symbolize the memory corresponding to the VM registers (16 registers of 32 bits at 0x203040)
    for i in range(16):
        vm_registers_symb[ExprMem(ExprInt(0x203040 + i*4, 64), 32)] = ExprId("VM_R{}".format(i), 32)

    # symbolize the VM registers that correpond to real registers
    vm_registers_symb[dse.ir_arch.arch.regs.R9] = ExprId("VM_FLAGS", 64)
    vm_registers_symb[dse.ir_arch.arch.regs.RSI] = ExprId("VM_PC", 64)

    # update the DSE state with the VM registers symbols
    dse.update_state(vm_registers_symb)

    # get the VM state (PC, instruction bytes and opcode)
    vm_pc = int(dse.jitter.cpu.RSI)
    vm_instr = int(dse.jitter.cpu.RCX)
    vm_opcode = int(dse.jitter.cpu.RAX)

    # if the VM instruction was not already disassembled, we print the state and add a breakpoint at NEXT_ADDR
    if not vm_pc in already_disass or (vm_pc in already_disass and vm_instr != already_disass[vm_pc]):
        #print("\n{:x}:".format(vm_pc), end=" ")
        print("\nsymbolize_vm: 0x%x, end=" % vm_pc)

        already_disass[vm_pc] = vm_instr

        # VM opcode 0xFF exits the VM
        if vm_opcode == 0xFF:
            print("EXIT")

        # VM opcode 30 executes aesenc instruction but this instruction is not implemented in miasm
        if vm_opcode == 30:
            arg0 = vm_registers_symb[ExprMem(ExprInt(0x203040+(((vm_instr >> 16) & 0xF)*4), 64), 32)]
            arg1 = vm_registers_symb[ExprMem(ExprInt(0x203040+(((vm_instr >> 12) & 0xF)*4), 64), 32)]
            dest = vm_registers_symb[ExprMem(ExprInt(0x203040+(((vm_instr >> 20) & 0xF)*4), 64), 32)]
            print("@128[{} + 0x203080] = AESENC(@128[{} + 0x203080], @128[{} + 0x203080])".format(dest, arg0, arg1))

        dse.add_instrumentation(NEXT_ADDR, disass_vm_instruction)

    # as we do not want miasm to raise an exception when aesenc is jitted, we jump after the instruction and update the DSE state accordingly
    if vm_instr >> 24 == 30:
        dse.jitter.pc = 0x232d
        dse.jitter.cpu.RIP = 0x232d
        dse.update_state({
            dse.ir_arch.arch.regs.RIP: ExprInt(0x232d, 64),
            dse.ir_arch.arch.regs.RAX: ExprInt(vm_pc+4, 64) # update pc
        })

    return True

def disass_vm_instruction(dse):
    global vm_registers_symb

    vm_instr = ""

    # get memory modifications
    for dst, src in dse.symb.modified(ids=False):
        # do not print vm registers unchanged
        if dst in vm_registers_symb and src == vm_registers_symb[dst]:
            continue
        vm_instr += "{} = {}\n".format(dst.replace_expr(vm_registers_symb), dse.eval_expr(src))

    # get register modifications
    for dst, src in dse.symb.modified(mems=False):
        # dst = ExprMem(VM_REG)
        if src in vm_registers_symb:
            vm_instr += "{} = {}\n".format(dst, dse.eval_expr(src))
        # VM_REG != VM_REG_ID
        elif dst in vm_registers_symb and src != vm_registers_symb[dst] and vm_registers_symb[dst] != ExprId("VM_PC", 64):
            vm_instr += "{} = {}\n".format(vm_registers_symb[dst], dse.eval_expr(src))

    # if no modifications then print ZF and VM_PC changes
    if not vm_instr:
        for dst, src in dse.symb.modified(mems=False):
            if dst == dse.ir_arch.arch.regs.zf:
                vm_instr += "ZF = {}\n".format(dse.eval_expr(src))
            elif dst in vm_registers_symb and vm_registers_symb[dst] == ExprId("VM_PC", 64):
                vm_instr += "VM_PC = {}\n".format(dse.eval_expr(src))

    print(vm_instr.strip())

    # remove callback
    del dse.instrumentation[NEXT_ADDR]

    return True

import pdb
hooks={4223168L: 'sub_4070C0', 4221632L: 'strlen', 4214472L: 'time', 4213740L: '_exit', 4203744L: 'sub_4024E0', 4223600L: 'socket', 4223240L: 'connect', 4213784L: 'close', 4223484L: 'send'}
def hook(dse):
    global hooks

    # ['RDI', 'RSI', 'RDX', 'RCX', 'R8', 'R9']
    # main, argc, argv, ...
    pdb.set_trace()

    #
    #dse.update_state_from_concrete(mem=True)

    pc=dse.jitter.cpu.EIP
    print("pc: 0x%x"%pc)

    regs = dse.ir_arch.arch.regs
    top_stack = dse.eval_expr(regs.RSP)
    '''main_addr = dse.eval_expr(regs.RDI)
    argc = dse.eval_expr(regs.RSI)
    argv = dse.eval_expr(regs.RDX)
    '''

    # skip current instruction
    #dse.jitter.cpu.EIP=pc+5
    #ret_addr = ExprInt(pc+5, regs.RIP.size)
    ret_addr = ExprInt(dse.jitter.get_stack_arg(0), regs.RIP.size)
    print ret_addr
    print regs.RIP
    ret_val= ExprId("SF_%x_%s" % (pc, hooks[pc]), regs.RAX.size)
    dse.update_state({
        regs.RSP: dse.symb.eval_expr(regs.RSP + ExprInt(8, regs.RSP.size)),
        dse.ir_arch.IRDst: ret_addr,
        regs.RIP: ret_addr,
        regs.RAX: ret_val
    })

parser = Sandbox_Linux_x86_64.parser("Solver for vbot attack function")
parser.add_argument("filename", help="Challenge filename")
options = parser.parse_args()

from miasm.core.locationdb import LocationDB
from miasm.jitter.csts import PAGE_READ, PAGE_WRITE

machine = Machine("x86_64")
loc_db = LocationDB()
sb = Sandbox_Linux_x86_64(loc_db, options.filename, options, globals())

# Init segment
sb.jitter.ir_arch.do_stk_segm = True
sb.jitter.ir_arch.do_ds_segm = True
sb.jitter.ir_arch.do_str_segm = True
sb.jitter.ir_arch.do_all_segm = True
FS_0_ADDR = 0x7ff70000
sb.jitter.cpu.FS = 0x4
sb.jitter.cpu.set_segm_base(sb.jitter.cpu.FS, FS_0_ADDR)
sb.jitter.vm.add_memory_page(
    FS_0_ADDR + 0x28,
    PAGE_READ,
    b"\x42\x42\x42\x42\x42\x42\x42\x42",
    "Stack canary FS[0x28]"
)

# Prepare the execution
entry_p=0x402150
sb.jitter.init_run(entry_p)

strategy = DSEPathConstraint.PRODUCE_SOLUTION_CODE_COV
# Other possibilities:
# strategy = DSEPathConstraint.PRODUCE_SOLUTION_BRANCH_COV,
# strategy = DSEPathConstraint.PRODUCE_SOLUTION_PATH_COV
dse = DSEPathConstraint(sb.machine, loc_db, produce_solution=strategy)
dse.add_lib_handler(sb.libs, globals())
dse.attach(sb.jitter)

FN= 0x40216F
for ea in hooks:
    dse.add_instrumentation(ea, hook)
sb.run(entry_p)