cclgroupltd / ccl_chromium_reader

(Sometimes partial) Python re-implementations of the technologies involved in reading various data sources in Chrome-esque applications.
MIT License

Retrieve the ldb file of a record #25

Closed lxndrblz closed 2 months ago

lxndrblz commented 2 months ago

Hi Alex,

Is there a clean and easy way to retrieve the .ldb/.log file of a certain record?

I currently have a code snippet that looks like this:

    extracted_values = []

    for db_info in wrapper.database_ids:
        # Skip databases without a valid dbid_no
        if db_info.dbid_no is None:
            continue

        db = wrapper[db_info.dbid_no]

        for obj_store_name in db.object_store_names:
            # Skip empty object stores
            if obj_store_name is None:
                continue
            if obj_store_name in TEAMS_DB_OBJECT_STORES or do_not_filter:
                obj_store = db[obj_store_name]
                records_per_object_store = 0
                for record in obj_store.iterate_records():
                    records_per_object_store += 1
                    extracted_values.append({
                        "key": record.key.raw_key,
                        "value": record.value,
                        "store": obj_store_name,
                    })
                print(
                    f"{obj_store_name} {db.name} (Records: {records_per_object_store})"
                )

In addition to the key, value and store, I'd also like to retrieve the name of the file where the record was found (e.g. 000114.ldb). I had tried record.database_origin, but that gives me something else entirely.

I had previously implemented a custom iterate_records function in my own fork, but I was wondering whether there is an easier way to do this?

    def iterate_records(self, do_not_filter=False):

        blink_deserializer = ccl_blink_value_deserializer.BlinkV8Deserializer()
        # Loop through the databases and object stores based on their ids
        for global_id in self.global_metadata.db_ids:
            # print(f"Processing database: {global_id.name}")
            if global_id.dbid_no is None:
                print(f"WARNING: Skipping database {global_id.name}")
                continue

            for object_store_id in range(1, self.database_metadata.get_meta(global_id.dbid_no,
                                                                            DatabaseMetadataType.MaximumObjectStoreId) + 1):

                datastore = self.object_store_meta.get_meta(global_id.dbid_no, object_store_id,
                                                            ObjectStoreMetadataType.StoreName)

                # print(f"\t Processing object store: {datastore}")
                records_per_object_store = 0
                if datastore in TEAMS_DB_OBJECT_STORES or do_not_filter:
                    prefix = bytes([0, global_id.dbid_no, object_store_id, 1])
                    for record in self._fetched_records:
                        if record.key.startswith(prefix):
                            records_per_object_store += 1
                            # Skip records with empty values as these can't be properly decoded
                            if record.value == b'':
                                continue
                            value_version, varint_raw = ccl_chromium_indexeddb.custom_le_varint_from_bytes(record.value)
                            val_idx = len(varint_raw)
                            # read the blink envelope
                            blink_type_tag = record.value[val_idx]
                            if blink_type_tag != 0xff:
                                print("Blink type tag not present")
                            val_idx += 1

                            blink_version, varint_raw = ccl_chromium_indexeddb.custom_le_varint_from_bytes(
                                record.value[val_idx:])

                            val_idx += len(varint_raw)

                            # read the raw value of the record.
                            obj_raw = io.BytesIO(record.value[val_idx:])
                            try:
                                # Initialize deserializer and try deserialization.
                                deserializer = ccl_v8_value_deserializer.Deserializer(
                                    obj_raw, host_object_delegate=blink_deserializer.read)
                                value = deserializer.read()
                                yield {'key': record.key, 'value': value, 'origin_file': record.origin_file,
                                       'store': datastore, 'state': record.state, 'seq': record.seq}
                            except Exception:
                                # TODO Some proper error handling wouldn't hurt
                                continue
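
For reference, the values parsed above open with little-endian base-128 varints (low seven bits per byte, high bit as continuation). A minimal standalone sketch of that decoding, written from the format description rather than taken from the library's actual `_le_varint_from_bytes` implementation:

```python
def le_varint_from_bytes(data: bytes) -> tuple[int, bytes]:
    """Decode a little-endian base-128 varint.

    Returns (value, raw bytes consumed), mirroring the (value, varint_raw)
    pair used in the snippet above so len(varint_raw) can advance an index.
    """
    value = 0
    for i, byte in enumerate(data):
        value |= (byte & 0x7F) << (7 * i)
        if not byte & 0x80:  # high bit clear: this is the last varint byte
            return value, data[: i + 1]
    raise ValueError("truncated varint")

# 0x96 0x01 decodes as 0x16 + (0x01 << 7) = 22 + 128 = 150
value, raw = le_varint_from_bytes(bytes([0x96, 0x01]) + b"trailing payload")
```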

Thanks for your support.

lxndrblz commented 2 months ago

Based on my initial research the main difference lies here: https://github.com/cclgroupltd/ccl_chromium_reader/blob/90007b29853bc7eab0f27615afd0047af2dc5108/ccl_chromium_reader/ccl_chromium_indexeddb.py#L623C17-L625C64

While I am looking for the record.origin_file (it looks something like forensicsim-data/jane_doe_v_1_4_00_11161/IndexedDB/https_teams.microsoft.com_0.indexeddb.leveldb/000118.ldb), the external_path currently pulls its data from the blink envelope, which at least in my case is most often None.

This workaround seems to work for me:

def custom_iterate_records(self, db_id: int, store_id: int, *,
            live_only=False, bad_deserializer_data_handler: typing.Callable[[ccl_chromium_indexeddb.IdbKey, bytes], typing.Any] = None):
    blink_deserializer = ccl_chromium_indexeddb.ccl_blink_value_deserializer.BlinkV8Deserializer()
    # goodness me this is a slow way of doing things
    prefix = ccl_chromium_indexeddb.IndexedDb.make_prefix(db_id, store_id, 1)

    for record in self._fetched_records:
        if record.key.startswith(prefix):
            key = ccl_chromium_indexeddb.IdbKey(record.key[len(prefix):])
            if not record.value:
                # empty values will obviously fail, returning None is probably better than dying.
                yield ccl_chromium_indexeddb.IndexedDbRecord(self, db_id, store_id, key, None,
                                        record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live, record.seq)
                continue
            value_version, varint_raw = ccl_chromium_indexeddb._le_varint_from_bytes(record.value)
            val_idx = len(varint_raw)
            # read the blink envelope
            precursor = self.read_record_precursor(
                key, db_id, store_id, record.value[val_idx:], bad_deserializer_data_handler)
            if precursor is None:
                continue  # only returns None on error, handled in the function if bad_deserializer_data_handler can

            blink_version, obj_raw, trailer, external_path = precursor

            try:
                deserializer = ccl_chromium_indexeddb.ccl_v8_value_deserializer.Deserializer(
                    obj_raw, host_object_delegate=blink_deserializer.read)
                value = deserializer.read()
            except Exception:
                if bad_deserializer_data_handler is not None:
                    bad_deserializer_data_handler(key, record.value)
                    continue
                raise

            # PATCH record.origin_file to external value path
            yield ccl_chromium_indexeddb.IndexedDbRecord(self, db_id, store_id, key, value,
                                    record.state == ccl_chromium_indexeddb.ccl_leveldb.KeyState.Live,
                                    record.seq, record.origin_file)

# Overwrite the iterate records method
ccl_chromium_indexeddb.IndexedDb.iterate_records = custom_iterate_records
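
The prefix matching that both snippets rely on assumes each record key begins with a database/object-store prefix. A simplified, self-contained illustration of that idea, assuming ids small enough to fit in one byte each (as in the bytes([0, db_id, store_id, 1]) construction earlier in the thread; the real key encoding handles larger ids):

```python
def make_prefix(db_id: int, store_id: int, index_id: int = 1) -> bytes:
    # Simplified key prefix: 0x00, then database id, object store id and
    # index id, each assumed to fit in a single byte.
    return bytes([0, db_id, store_id, index_id])

# Toy key space: two records in different object stores of database 2.
records = {
    make_prefix(2, 1) + b"\x01alpha": b"value-a",
    make_prefix(2, 7) + b"\x01beta": b"value-b",
}

# Selecting one object store is a startswith() scan over all keys.
prefix = make_prefix(2, 1)
hits = [key for key in records if key.startswith(prefix)]
```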

Nevertheless, I am curious whether there is an easier way to retrieve the origin_file of a record when the wrapped IndexedDB is used.

cclgroupltd commented 2 months ago

Hi Alexander,

There wasn't; there should have been; there is now. It was easy for me to add and it should have already been in there.

I've just pushed an update (bc6f1309ce21899bccda06ae6c0f8624ef4707d2) so now you can just do:

import sys
import pathlib

from ccl_chromium_reader import ChromiumProfileFolder

input_path = pathlib.Path(sys.argv[1])

with ChromiumProfileFolder(input_path) as profile:
    for rec in profile.iter_indexeddb_records("https_drive.google.com_0"):
        print(rec.origin_file)

(that's using the new profile folder interface, but the record objects are the same thing if you go from a WrappedIndexDB).

Does that do what you're after?

lxndrblz commented 2 months ago

@cclgroupltd Thanks for the quick addition!