Open xinali opened 4 years ago
在遇到大量crash结果时,分析crash结果,归类crash结果,有时会耗费大量的时间,对于我来说,每天上班前,运行一下该脚本,有新类型的crash就分析一下,提交report,节省了我大量的精力和时间
#encoding:utf-8 # ==================================================== # python: 3.5+ # 处理fuzz crash脚本 # 执行: python3 handle-crash.py -h 查看帮助选项 # example: # python3.exe handle-crash.py -f D:\FuzzProgram.exe --post_fix dwrite -i D:\fuzz_output # version: 0.5.20211228 # ==================================================== import os import glob import lief import json import time import signal import shutil import logging import datetime import argparse import subprocess from functools import partial from multiprocessing import Pool import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart options = {} debug_mode = False # 调试器位置 debugger_x64_origin = "C:\\Program Files (x86)\\Windows Kits\\10\Debuggers\\x64\\cdb.exe" debugger_x64_move = "D:\\Windows Kits\\10\\Debuggers\\x64\\cdb.exe" debugger_x86_origin= "C:\\Program Files (x86)\\Windows Kits\\10\\Debuggers\\x86\\cdb.exe" debugger_x86_move = "D:\\Windows Kits\\10\\Debuggers\\x86\\cdb.exe" debugger_x64 = '' debugger_x86 = '' result_dir = 'result_jsons' def prepare_debugger(): global debugger_x86, debugger_x64 if os.path.exists(debugger_x64_origin): debugger_x64 = debugger_x64_origin else: debugger_x64 = debugger_x64_move if os.path.exists(debugger_x86_origin): debugger_x86 = debugger_x86_origin else: debugger_x86 = debugger_x86_move def sendmail(content=''): sender = 'xx' passwd = 'xx' mail_host = 'xx' receiver = 'xx' message = MIMEMultipart() message['Subject'] = "[Crash Handle Finish]" message['From'] = sender message.attach(MIMEText(content)) try: s = smtplib.SMTP_SSL(mail_host, 465) # s.set_debuglevel(2) s.login(sender, passwd) s.sendmail(sender, receiver, message.as_string()) s.quit() except smtplib.SMTPException as e: pass pass class Logger(object): def __init__(self): """ initial """ # log_path = logPath logging.addLevelName(20, "INFO:") logging.addLevelName(30, "WARNING:") logging.addLevelName(40, "FATAL:") logging.addLevelName(50, "FATAL:") logging.basicConfig(level=logging.DEBUG, # format="%(levelname)s %(asctime)s %(filename)s %(message)s", format="%(levelname)s %(asctime)s %(message)s", datefmt="%m-%d %H:%M:%S", filename='HandleCrash.log', filemode="a") console = logging.StreamHandler() console.setLevel(logging.DEBUG) # formatter = logging.Formatter("%(levelname)s %(asctime)s %(filename)s %(message)s") formatter = logging.Formatter("%(levelname)s %(asctime)s %(message)s") console.setFormatter(formatter) logging.getLogger("").addHandler(console) def debug(self, msg=""): """ output DEBUG level LOG """ logging.debug(str(msg)) def info(self, msg=""): """ output INFO level LOG """ logging.info(str(msg)) def warning(self, msg=""): """ output WARN level LOG """ logging.warning(str(msg)) def exception(self, msg=""): """ output Exception stack LOG """ logging.exception(str(msg)) def error(self, msg=""): """ output ERROR level LOG """ logging.error(str(msg)) def critical(self, msg=""): """ output FATAL level LOG """ logging.critical(str(msg)) logger = Logger() def init_fuzzer(): """ Pool worker initializer for keyboard interrupt on Windows """ signal.signal(signal.SIGINT, signal.SIG_IGN) # 根据调用栈最后两个函数判定类型 (Windows) def get_crash_type_win32(stdout): # 获取到crash type decode_error = False stdout_lines = [] # 是否需要解码 if type(stdout) == str: stdout_lines = stdout.split('\n') else: # 对应数据为bytes # 第一次尝试使用utf-8解码 try: stdout_lines = stdout.decode().split('\n') except UnicodeDecodeError: decode_error = True except Exception as ex: logger.error(ex) return None # 第二次尝试使用ISO-8859-1解码 if decode_error: try: stdout_lines = stdout.decode(encoding='ISO-8859-1').split('\n') except Exception as ex: logger.error(ex) return None i = -1 get_RetAddr = False for crash_data in stdout_lines: i += 1 if "RetAddr" in crash_data: get_RetAddr = True continue # 处理第一个调用栈就出错的特殊情况 # 0:000> k # ChildEBP RetAddr # WARNING: Frame IP not in any known module. Following frames may be wrong. # 00 0019d3f8 65245480 0x8f3600c # 01 0019d964 65238ad8 ECompositeViewer!DllCanUnloadNow+0x66890 if "WARNING" in crash_data and get_RetAddr: i += 1 break if "WARNING" not in crash_data and get_RetAddr: break if i == len(stdout_lines): return None crash_type = None # 得出最后两个调用栈作为划分类型 if len(stdout_lines) > i and len(stdout_lines[i].split()) >= 3: crash_type = stdout_lines[i].split()[2] if len(stdout_lines) > i+1 and len(stdout_lines[i+1].split()) >= 3: crash_type += ' -> ' + stdout_lines[i+1].split()[2] return crash_type # 比较新产生的漏洞类型,将老的删除 def compare_crash_types(crash_types, output_root_dir): global options # 将以前存储的崩溃类型json文件打开 result_json_file = os.path.join(result_dir, options.post_fix + '.json') result_json = None total_crash_types = {} repeat_files = [] old_crash_types = [] # 判断路径是否存在 if not os.path.exists(result_dir): os.mkdir(result_dir) if os.path.exists(result_json_file): result_json = json.load(open(result_json_file, )) old_crash_types = result_json['crash_types'] # 删除已经存在的crash_type for crash_type in old_crash_types: if crash_type in crash_types: # 将重复的数据统一处理 repeat_files.extend(crash_types[crash_type]) # crash_types.remove(crash_type) del crash_types[crash_type] # 求两次的并集 total_crash_types['crash_types'] = list(set(old_crash_types).union(set(crash_types))) # 将重复的文件统一移动到repeated目录 repeat_dir = os.path.join(output_root_dir, "repeated") if not os.path.exists(repeat_dir) and len(repeat_files) > 0: os.mkdir(repeat_dir) for repeat_file in repeat_files: if not os.path.exists(repeat_file): continue if os.path.isdir(repeat_file): continue base_name = os.path.basename(repeat_file) output_file = os.path.join(repeat_dir, base_name) shutil.move(repeat_file, output_file) # 把新旧数据重新写入json with open(result_json_file, 'w') as fp: json.dump(total_crash_types, fp) # 将crash_types写入文件 def handle_crash_types(crash_types): global options output_root_dir = options.input_directory uniq_crash_type_file = 'uniq_crash_{}_{}.log'.format( (datetime.datetime.now()).strftime("%Y-%m-%d"), options.post_fix) compare_crash_types(crash_types, output_root_dir) fp = open(uniq_crash_type_file, 'w') for crash_type in crash_types: if not crash_type: continue fp.write(crash_type + ':\n') # 创建各种漏洞类型目录 output_dir = crash_type.replace(" -> ", "__") output_dir = output_dir.replace("<", "(") output_dir = output_dir.replace(">", ")") output_dir = output_dir.replace("::", "#") output_dir = os.path.join(output_root_dir, output_dir) # 判断目标目录是否存在 if not os.path.exists(output_dir): os.mkdir(output_dir) for crash_file in crash_types[crash_type]: if not os.path.exists(crash_file): continue if os.path.isdir(crash_file): continue base_name = os.path.basename(crash_file) output_file = os.path.join(output_dir, base_name) shutil.move(crash_file, output_file) fp.write(" [+] {}\n".format(output_file)) fp.close() # windows多进程处理 def crash_on_windows(input_file, options, debugger): input_file = input_file.strip() # 不处理目录 if os.path.isdir(input_file): return (None, None) try: # 将crash文件数据写入特定文件 specific_file = None if options.specific_file: file_name, file_ext = os.path.splitext(options.specific_file) specific_file = '{}_{}{}'.format(file_name, str(os.getpid()), file_ext) with open(input_file, 'rb') as f: fp_specific = open(specific_file, 'wb') crash_file_data = f.read() fp_specific.write(crash_file_data) fp_specific.close() else: specific_file = input_file except Exception as ex: logger.error("bypassing file " + input_file) return (None, None, None) if debugger == None: logger.error("No debugger found!") exit(1) process_out = None if options.fuzz_args: command = [debugger, # "-y", # "srv*c:\symbols*https://msdl.microsoft.com/download/symbols", # "srv*c:\symbols", "-c", "g;g;g;kp 10;q", options.fuzz_program, options.fuzz_args, specific_file] else: command = [debugger, "-y", "srv*c:\symbols*https://msdl.microsoft.com/download/symbols", # "srv*c:\symbols", "-c", "g;kp 10;q", options.fuzz_program, specific_file] if debug_mode: logger.info(' '.join(command)) try: wait_time = int(options.wait_time) process_out = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=wait_time) except subprocess.TimeoutExpired: return ('timeout', input_file) except Exception as ex: logger.exception("subprocess run with file: " + input_file) crash_type = None if process_out != None: crash_type = get_crash_type_win32(process_out.stdout) return (crash_type, input_file) def main(): prepare_debugger() if options.fuzz_args == None: # input "<space> -r" if dash in arguments options.fuzz_args = '' # 获取所有输入文件 files = [] files.extend(glob.glob(os.path.join(options.input_directory, '*'), recursive=True)) if len(files) == 0: logger.error("No input files") exit(1) # 确定测试程序位数 debugger = None pe = lief.parse(options.fuzz_program) pe_bits = pe.header.machine.name if pe_bits == "I386": debugger = debugger_x86 else: debugger = debugger_x64 # 开始测试 index = 1 crash_types = {} thread_nums = int(options.threads) pool = Pool(thread_nums, init_fuzzer) logger.info('start review crash...') for result in pool.imap_unordered(partial(crash_on_windows, options=options, debugger=debugger), files): crash_type, input_file = result if crash_type == None or \ input_file == None: continue file_basename = os.path.basename(input_file) dst_file = os.path.join(options.input_directory, file_basename) result_info = '{} ({}) ### {}'.format(index, crash_type, dst_file) logger.info(result_info) if crash_type not in crash_types.keys(): crash_types[crash_type] = [input_file] else: crash_types[crash_type].append(input_file) index += 1 time.sleep(2) # 写结果 logger.info('write data to file...') handle_crash_types(crash_types) uniq_types_count = len(crash_types.keys()) sendmail("type1: {}: {}".format(options.post_fix, uniq_types_count)) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-f", "--fuzz_program", help="fuzz program", required=True) parser.add_argument("-a", "--fuzz_args", help="fuzz program arguments") parser.add_argument("-i", "--input_directory", help="input files directory", required=True) parser.add_argument("-t", "--threads", help="threads num", required=True) parser.add_argument("-w", "--wait_time", help="windbg debug wait time", required=True) parser.add_argument("--post_fix", help="output file post fix", required=True) parser.add_argument("-s", "--specific_file", help="write crash file to a specific file") options = parser.parse_args() main()
result文件就是常规的debugger输出数据,可以作为对uniq结果的参考,文件特别长,uniq文件是最需要关注的,可以及时发现新出现的crash,其中uniq文件结果大概是这样的
result
debugger
uniq
crash
windows输出结果
整个处理流程还在优化中,遇到特殊的crash结果,会及时更新
20200528 添加BugId批量处理
20200528
20200623 添加libfuzzer crash处理
20200623
20200710 添加选项将crash_file写入指定文件进行debug
20200710
20211228 删除linux相关支持,目前仅支持windows,并优化多个崩溃处理逻辑
20211228
linux
windows
在遇到大量crash结果时,分析crash结果,归类crash结果,有时会耗费大量的时间,对于我来说,每天上班前,运行一下该脚本,有新类型的crash就分析一下,提交report,节省了我大量的精力和时间
result
文件就是常规的debugger
输出数据,可以作为对uniq
结果的参考,文件特别长,uniq
文件是最需要关注的,可以及时发现新出现的crash
,其中uniq
文件结果大概是这样的windows输出结果
整个处理流程还在优化中,遇到特殊的crash结果,会及时更新