Tencent / mars

Mars is a cross-platform network component developed by WeChat.
Other
17.31k stars 3.63k forks source link

什么时候可以提供一个python3 实现的解密脚本, 现在解文件的时候一直报错!!! #1315

Open Miles612 opened 1 month ago

Miles612 commented 1 month ago

什么时候可以提供一个python3 实现的解密脚本, 现在解文件的时候一直报错!!!

NaibManco commented 1 month ago

@Miles612 直接用大模型把现有的改造一下就行了,我已经试过了

czeras commented 3 weeks ago

@Miles612 直接用大模型把现有的改造一下就行了,我已经试过了

大佬怎么改造的?自从换了mac,装了最新的系统,解密文件从来没有成功过 .....

NaibManco commented 3 weeks ago

@czeras 你就直接把脚本丢给大模型,让它改造成支持python3的就好了呀,我用chatgpt4.0改的

Miles612 commented 3 weeks ago

@czeras

import sys
import os
import glob
import zlib
import struct
import binascii
import traceback

MAGIC_NO_COMPRESS_START = 0x03
MAGIC_NO_COMPRESS_START1 = 0x06
MAGIC_NO_COMPRESS_NO_CRYPT_START = 0x08
MAGIC_COMPRESS_START = 0x04
MAGIC_COMPRESS_START1 = 0x05
MAGIC_COMPRESS_START2 = 0x07
MAGIC_COMPRESS_NO_CRYPT_START = 0x09

MAGIC_END = 0x00

lastseq = 0

PRIV_KEY = ""
PUB_KEY = ""

def tea_decipher(v, k):
    op = 0xffffffff
    v0, v1 = struct.unpack('=LL', v[0:8])
    k1, k2, k3, k4 = struct.unpack('=LLLL', k[0:16])
    delta = 0x9E3779B9
    s = (delta << 4) & op
    for i in range(16):
        v1 = (v1 - (((v0 << 4) + k3) ^ (v0 + s) ^ ((v0 >> 5) + k4))) & op
        v0 = (v0 - (((v1 << 4) + k1) ^ (v1 + s) ^ ((v1 >> 5) + k2))) & op
        s = (s - delta) & op
    return struct.pack('=LL', v0, v1)

def tea_decrypt(v, k):
    num = len(v) // 8 * 8
    ret = bytearray()
    for i in range(0, num, 8):
        x = tea_decipher(v[i:i+8], k)
        ret += x

    ret += v[num:]
    return ret

def IsGoodLogBuffer(_buffer, _offset, count):
    if _offset == len(_buffer):
        return (True, '')

    magic_start = _buffer[_offset]
    if magic_start in {MAGIC_NO_COMPRESS_START, MAGIC_COMPRESS_START, MAGIC_COMPRESS_START1}:
        crypt_key_len = 4
    elif magic_start in {MAGIC_COMPRESS_START2, MAGIC_NO_COMPRESS_START1, MAGIC_NO_COMPRESS_NO_CRYPT_START, MAGIC_COMPRESS_NO_CRYPT_START}:
        crypt_key_len = 64
    else:
        return (False, f'_buffer[{_offset}]:{_buffer[_offset]} != MAGIC_NUM_START')

    header_len = 1 + 2 + 1 + 1 + 4 + crypt_key_len

    if _offset + header_len + 1 + 1 > len(_buffer):
        return (False, f'offset:{_offset} > len(buffer):{len(_buffer)}')

    length = struct.unpack_from("I", memoryview(_buffer)[_offset+header_len-4-crypt_key_len:])[0]

    if _offset + header_len + length + 1 > len(_buffer):
        return (False, f'log length:{length}, end pos {_offset + header_len + length + 1} > len(buffer):{len(_buffer)}')

    if MAGIC_END != _buffer[_offset + header_len + length]:
        return (False, f'log length:{length}, buffer[{_offset + header_len + length}]:{_buffer[_offset + header_len + length]} != MAGIC_END')

    if count <= 1:
        return (True, '')
    else:
        return IsGoodLogBuffer(_buffer, _offset + header_len + length + 1, count - 1)

def GetLogStartPos(_buffer, _count):
    offset = 0
    while True:
        if offset >= len(_buffer):
            break
        if _buffer[offset] in {MAGIC_NO_COMPRESS_START, MAGIC_NO_COMPRESS_START1, MAGIC_COMPRESS_START, MAGIC_COMPRESS_START1, MAGIC_COMPRESS_START2, MAGIC_COMPRESS_NO_CRYPT_START, MAGIC_NO_COMPRESS_NO_CRYPT_START}:
            if IsGoodLogBuffer(_buffer, offset, _count)[0]:
                return offset
        offset += 1
    return -1

def DecodeBuffer(_buffer, _offset, _outbuffer):
    if _offset >= len(_buffer):
        return -1

    ret = IsGoodLogBuffer(_buffer, _offset, 1)
    if not ret[0]:
        fixpos = GetLogStartPos(_buffer[_offset:], 1)
        if fixpos == -1:
            return -1
        else:
            _outbuffer.extend(f"[F]decode_log_file.py decode error len={fixpos}, result:{ret[1]} \n")
            _offset += fixpos

    magic_start = _buffer[_offset]
    if magic_start in {MAGIC_NO_COMPRESS_START, MAGIC_COMPRESS_START, MAGIC_COMPRESS_START1}:
        crypt_key_len = 4
    elif magic_start in {MAGIC_COMPRESS_START2, MAGIC_NO_COMPRESS_START1, MAGIC_NO_COMPRESS_NO_CRYPT_START, MAGIC_COMPRESS_NO_CRYPT_START}:
        crypt_key_len = 64
    else:
        _outbuffer.extend(f'in DecodeBuffer _buffer[{_offset}]:{magic_start} != MAGIC_NUM_START')
        return -1

    header_len = 1 + 2 + 1 + 1 + 4 + crypt_key_len
    length = struct.unpack_from("I", memoryview(_buffer)[_offset+header_len-4-crypt_key_len:])[0]
    tmpbuffer = bytearray(length)

    seq = struct.unpack_from("H", memoryview(_buffer)[_offset+header_len-4-crypt_key_len-2-2:])[0]
    begin_hour = struct.unpack_from("c", memoryview(_buffer)[_offset+header_len-4-crypt_key_len-1-1:])[0]
    end_hour = struct.unpack_from("c", memoryview(_buffer)[_offset+header_len-4-crypt_key_len-1:])[0]

    global lastseq
    if seq != 0 and seq != 1 and lastseq != 0 and seq != (lastseq + 1):
        _outbuffer.extend(f"[F]decode_log_file.py log seq:{lastseq+1}-{seq-1} is missing\n")

    if seq != 0:
        lastseq = seq

    tmpbuffer[:] = _buffer[_offset+header_len:_offset+header_len+length]

    try:
        decompressor = zlib.decompressobj(-zlib.MAX_WBITS)

        if magic_start == MAGIC_NO_COMPRESS_START1:
            pass
        elif magic_start == MAGIC_COMPRESS_START2:
            from pyelliptic import ECC
            svr = ECC(curve='secp256k1')
            client = ECC(curve='secp256k1')
            client.pubkey_x = memoryview(_buffer)[_offset+header_len-crypt_key_len:_offset+header_len-crypt_key_len//2].tobytes()
            client.pubkey_y = memoryview(_buffer)[_offset+header_len-crypt_key_len//2:_offset+header_len].tobytes()

            svr.privkey = binascii.unhexlify(PRIV_KEY)
            tea_key = svr.get_ecdh_key(client.get_pubkey())

            tmpbuffer = tea_decrypt(tmpbuffer, tea_key)
            tmpbuffer = decompressor.decompress(tmpbuffer)
        elif magic_start in {MAGIC_COMPRESS_START, MAGIC_COMPRESS_NO_CRYPT_START}:
            tmpbuffer = decompressor.decompress(tmpbuffer)
        elif magic_start == MAGIC_COMPRESS_START1:
            decompress_data = bytearray()
            while len(tmpbuffer) > 0:
                single_log_len = struct.unpack_from("H", memoryview(tmpbuffer)[0:2])[0]
                decompress_data.extend(tmpbuffer[2:single_log_len+2])
                tmpbuffer[:] = tmpbuffer[single_log_len+2:]

            tmpbuffer = decompressor.decompress(decompress_data)

    except Exception as e:
        traceback.print_exc()
        _outbuffer.extend(f"[F]decode_log_file.py decompress err, {str(e)}\n")
        return _offset + header_len + length + 1

    _outbuffer.extend(tmpbuffer)

    return _offset + header_len + length + 1

def ParseFile(_file, _outfile):
    with open(_file, "rb") as fp:
        _buffer = bytearray(os.path.getsize(_file))
        fp.readinto(_buffer)

    startpos = GetLogStartPos(_buffer, 2)
    if startpos == -1:
        return

    outbuffer = bytearray()

    while True:
        startpos = DecodeBuffer(_buffer, startpos, outbuffer)
        if startpos == -1:
            break

    if len(outbuffer) == 0:
        return

    with open(_outfile, "wb") as fpout:
        fpout.write(outbuffer)

def main(args):
    global lastseq

    if len(args) == 1:
        if os.path.isdir(args[0]):
            filelist = glob.glob(args[0] + "/*.xlog")
            for filepath in filelist:
                lastseq = 0
                ParseFile(filepath, filepath + ".log")
        else:
            ParseFile(args[0], args[0] + ".log")
    elif len(args) == 2:
        ParseFile(args[0], args[1])
    else:
        filelist = glob.glob("*.xlog")
        for filepath in filelist:
            lastseq = 0
            ParseFile(filepath, filepath + ".log")

if __name__ == "__main__":
    main(sys.argv[1:])