onekey-sec / unblob

Extract files from any kind of container formats
https://unblob.org
Other
2.21k stars 80 forks source link

API assistance? #878

Open AndrewFasano opened 5 months ago

AndrewFasano commented 5 months ago

I'm trying to use the unblob API for what I think should be a fairly straightforward task but I'm having some difficulties and hoping to get some help. I haven't found many examples of API usage so I'm hoping this issue might also help other users get started with the API from the code I have and learn from my mistakes.

My goal here is to use unblob to do a recursive extraction of a blob but to produce a clean copy of each extraction without any sub-extractions (i.e., have no _extract files within any of my output directories). Instead I want each of the extractions to be stored one directory deep within an output directory (e.g., output/extraction1, output/extraction2). I've previously implemented something like this by just running unblob then parsing the generated outputs looking for files named *_extract but I think it should be much cleaner to do this with the API.

I've written the code below which successfully logs a lot of information about the extraction process and almost gets me what I want, but I find that extraction files sometimes still end up in my output so I suspect I'm doing something wrong or missing something obvious here.

I have 3 specific questions, but any advice or guidance would be much appreciated! Thanks

  1. Am I missing a much easier way to do this with the API?
  2. Are blob_id values supposed to be unique per blob? It seems like the same blob_id will show up with distinct paths for example if a blob is carved into 2 files, both the base blob and the 2 generated files will have the same blob_id. Am I just misunderstanding this interface?
  3. Are there examples of API usage somewhere?

Example usage after installing unblob dependencies, unblob itself, and saving the below script as extracator.py

wget https://dlcdnets.asus.com/pub/ASUS/wireless/RT-N66U_C1/FW_RT_N66U_C1_300438510000.zip
python3 extractor.py FW_RT_N66U_C1_300438510000.zip output

In the generated output directory I see one of the extracted directories contains two _extract directories

$ ls output/extracted/*
output/extracted/00dd3c22e77b9382941bf19afb8370b1897535fb.unblob:
Firmware_Release

output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob:
part0  part0_extract  part1  part1_extract

output/extracted/a0f5794c735f16dd4a7b12042ef23e1d95dcb134.unblob:
bin  cifs1  cifs2  dev  etc  home  jffs  lib  media  mmc  mnt  opt  proc  rom  root  sbin  sys  sysroot  tmp  usr  var  www

output/extracted/e008abdad83a71b718083b949b91dafaf071b3c5.unblob:
lzma.uncompressed

It's almost right, but the part0_extract and part1_extract directories within output/extracted/31c56af333e9f4652626f6e0e10418e27dd1af33.unblob shouldn't be there!

import os
import sys
import multiprocessing
import shutil
import hashlib
import ipdb
import subprocess
from pathlib import Path
from typing import Dict, List
from unblob.processing import ExtractionConfig, process_file
from unblob.logging import configure_logger
from unblob import report

def sha1sum_file(filename):
    sha1  = hashlib.sha1() # XXX: if minimum python version >= 3.9, pass usedforsecurity=False
    buf  = bytearray(128*1024)
    bufview = memoryview(buf)
    with open(filename, 'rb', buffering=0) as f:
        for n in iter(lambda : f.readinto(bufview), 0):
            sha1.update(bufview[:n])
    return sha1.hexdigest()

def extract(filesystem: Path, outdir: Path):
    '''
    Given a filesystem blob, extract it into outdir. Return a dictionary
    mapping a unique ID for each extracted blob to (ID of source file, source file, extracted directory)
    Note that one ID may be an empty string (e.g., for the input file).
    '''
    configure_logger(0, outdir, Path('unblob.log'))
    unblob_results = process_file(
                                    ExtractionConfig(extract_root=outdir / 'unblob.root',
                                                    entropy_depth=0,
                                                    max_depth=3,
                                                    verbose=False,
                                                    keep_extracted_chunks=True),
                                    filesystem)

    known_tasks = {} # task_id -> [task_file, subtask.path, [files_in_subtask_path]]

    for task_result in unblob_results.results:
        task_file = task_result.task.path
        task_id = task_result.task.blob_id

        for subtask in task_result.subtasks:
            if subtask.blob_id not in known_tasks:
                # XXX: We'll see the same subtask.blob_id for each time we extract more data from a blob. E.g., we could have
                # blob_id=1 for foo.zip which has a subtask.path of foo.zip_extracted. Then we'll see blob_id=1 for foo.zip_extracted/nextfile
                # because the nextfile is derived from foo.zip.
                known_tasks[subtask.blob_id] = [task_file, subtask.path, [subtask.path]]
                #print(f"New blob_id in subtask with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")
            else:
                known_tasks[subtask.blob_id][2].append(subtask.path)
                #print(f"Duplicate blob_id with ID={subtask.blob_id}\n\tExtracted from {task_file} (ID={task_id})\n\tResults at {subtask.path}")

    return known_tasks

def package(known_tasks, output_dir):
    '''
    We want to create a clean set of directories in outdir.
    For each extraction we want to create
        outdir/[sha1sum_of_blob].[unblob]/
    '''
    # Create results directories. output/blobs/<sha1sum> and output/extracted/<sha1sum>.<extractor>
    blob_dir = output_dir / 'blobs'
    blob_dir.mkdir()

    extracted_dir = output_dir / 'extracted'
    extracted_dir.mkdir()

    # Copy each blob to the blob directory
    for task_id, info in known_tasks.items():
        input_hash = sha1sum_file(info[0])
        shutil.copy(info[0], blob_dir / input_hash)

    # Copy each extraction to the extracted directory
    # Identify all extraction directories that were created (so we don't copy later)
    extraction_dirs = set()
    for task_id, info in known_tasks.items():
        extraction_dirs.add(info[1])

    for task_id, info in known_tasks.items():
        input_hash = sha1sum_file(info[0])
        out_dir = extracted_dir / f"{input_hash}.unblob"
        # We want to copy everything from info[1] to out_dir but exclude any directories in extraction_dirs
        # First copy everything - we'll use CP over shutil to handle weird files
        subprocess.check_output(['cp', '-r', info[1], out_dir])

        # Now for each file in extraction_dirs (except info[1]), delete
        for extraction_dir in extraction_dirs:
            if extraction_dir == info[1]:
                continue
            # Rewrite path to be relative to out_dir
            try:
                relative_path = Path(extraction_dir).relative_to(info[1])
            except ValueError:
                # Not a subpath, skip
                continue
            # Ensure output is not-empty and within the output directory
            if not len(str(out_dir / relative_path)) or not str(out_dir / relative_path).startswith(str(out_dir)):
                print("XXX skipping rm of unsafe path", out_dir / relative_path)
                continue
            subprocess.check_output(['rm', '-rf', str(out_dir / relative_path)])

def extract_and_package(firmware, output_dir):

    # Initial extraction
    known_tasks = extract(firmware, output_dir)

    # Log extraction results
    for task_id, info in known_tasks.items():
        print(f"\nID {task_id}:")
        print(f"\tPath: {info[0]} {sha1sum_file(info[0])}")
        print(f"\tExtraction directory {info[1]}")
        print(f"\tExtracted children:")
        for subtask_path in info[2]:
            print(f"\t  - {subtask_path}")

    package(known_tasks, output_dir)

if __name__ == '__main__':
    firmware = Path(sys.argv[1])
    if not firmware.exists():
        print(f"File {firmware} does not exist")
        os.exit(1)

    output_dir = Path(sys.argv[2])
    if output_dir.exists():
        print(f"Output directory {output_dir} already exists. Removing it.")
        shutil.rmtree(output_dir)

    output_dir.mkdir(parents=True)

    extract_and_package(firmware, output_dir)
qkaiser commented 5 months ago

We'll have a look at it this week. Thanks for bringing this up, it's a good opportunity to improve our documentation.

qkaiser commented 5 months ago
  1. Am I missing a much easier way to do this with the API ?

Issue is on us, not you. part0 has chunk id 14692:1 but it does not show up in the JSON report structure because it's made of two different chunks with chunk ids 14693:1 and 14693:2. The report is therefore missing a TaskResult holding a task with path /tmp/out/FW_RT_N66U_C1_300438510000.zip_extract/Firmware_Release/RT-N66U_C1_3.0.0.4_385_10000-gd8ccd3c.trx_extract/part0_extract. Since it's not there, it's not in known_tasks and your package() function is blind.

This issue is documented at https://github.com/onekey-sec/unblob/issues/554, I'll see if we can prioritize it.

On another note, one improvement you can make is to use the HashReport returned by unblob to get the file SHA1 instead of recomputing it. unblob computes MD5, SHA1, and SHA256 of processed files.

Something along those lines. I leave the hash_report usage to you.

--- extractor.py    2024-06-17 09:10:51.035069581 +0200
+++ extractor.py    2024-06-17 08:57:03.218147812 +0200
@@ -8,6 +8,7 @@
 from pathlib import Path
 from typing import Dict, List
 from unblob.processing import ExtractionConfig, process_file
+from unblob.report import HashReport
 from unblob.logging import configure_logger
 from unblob import report

@@ -40,7 +41,7 @@
     for task_result in unblob_results.results:
         task_file = task_result.task.path
         task_id = task_result.task.blob_id
-
+       hash_report = [report for report in task_result.reports if isinstance(report, HashReport)]
         for subtask in task_result.subtasks:
             if subtask.blob_id not in known_tasks:
                 # XXX: We'll see the same subtask.blob_id for each time we extract more data from a blob. E.g., we could have
  1. Are blob_id values supposed to be unique per blob? It seems like the same blob_id will show up with distinct paths for example if a blob is carved into 2 files, both the base blob and the 2 generated files will have the same blob_id. Am I just misunderstanding this interface ?

You can think of blob_id as a kind of parent_id, indicating to which blob a chunk belongs to. You can use the id field from ChunkReport to have a unique identifier for files that are extracted. With the file you're extracting, you get a ZIP file with chunk id 14688:1 that contains a trx file with id 14691:1 and blob_id 14688:1. The trx file contains part0 holding two chunks with id 14693:1 and 14693:2.

  1. Are there examples of API usage somewhere ?

Not at the moment. We have some auto-generated documentation at https://unblob.org/api/ but it's clearly insufficient.

AndrewFasano commented 5 months ago

Thanks so much! Really appreciate the guidance.