Joystream Monorepo
http://www.joystream.org
GNU General Public License v3.0

Lost object investigation tool #5006

Open yasiryagi opened 10 months ago

yasiryagi commented 10 months ago

Lost Objects:

Description:

A tool to identify and locate lost objects.

Collect: find all objects and bags from a given source (local, remote, or QN).

Compare:

- compare local objects to QN objects
- compare remote objects to QN objects
- compare the objects of two buckets (QN)

Check: check for missing local objects by issuing an HTTP HEAD request (sketched below) to:

- the other buckets assigned to the object's bag
- all other buckets

Download: download a missing object if the HEAD check is positive.
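A minimal sketch of the HEAD probe, assuming a storage node's files endpoint and hypothetical nodeEndpoint/objectId values:

const res = await fetch(`${nodeEndpoint}/api/v1/files/${objectId}`, { method: 'HEAD' })
// a 200 response means this bucket can serve the object
const available = res.status === 200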

Input:

0- Function: collect (local, remote, QN), compare, checks, download
1- Bucket
2- Local directory

Output:

Type: JSON

collect: [{ object: { bag } }, ...]
compare: [{ object: { bag } }, ...]
checks: [{ object: { bag, bucket: { url } } }, ...]
download: [{ object: { bag, bucket: { url, status } } }, ...]
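For illustration, a single checks entry might look like this (values hypothetical):

{
  "object": {
    "bag": "dynamic:channel:123",
    "bucket": { "url": "https://node.example/api/v1/files" }
  }
}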

yasiryagi commented 10 months ago

Here is what @kdembler implemented so far https://github.com/joyutils/swg-compare-files

yasiryagi commented 10 months ago

Here is the file-checking code provided by @bwhm. It collects bucket and bag data from the query node, diffs it against local files, and HEAD-checks other operators for any missing objects:


const fs = require('node:fs/promises')

const LOCAL_FILES_PATH = './local.json'
const REMOTE_FILES_PATH = './remote.json'
const getRemoteBagPath = (bagId) => `./remote-${bagId}.json`
const DIFF_PATH = './diff.json'
const getDiffBagPath = (bagId) => `./diff-${bagId}.json`
const CHECK_PATH = `./checked.json`

function sortFiles(files) {
  return files.slice().sort((a, b) => a.localeCompare(b, 'en', { numeric: true }))
}

// GraphQL queries against the Joystream query node, paginated via limit/offset.
const STORAGE_BAGS_QUERY = `
query GetStorageBucketBags($storageBucket: ID!, $limit: Int!, $offset: Int!) {
  storageBags(
    where: { storageBuckets_some: { id_eq: $storageBucket } }
    orderBy: createdAt_ASC
    limit: $limit
    offset: $offset
  ) {
    id
  }
}
`

const STORAGE_BAGS_OBJECTS_QUERY = `
query GetStorageBagsObjects($storageBags: [ID!]!, $limit: Int!, $offset: Int!) {
  storageBags(
    where: { id_in: $storageBags }
    orderBy: createdAt_ASC
    limit: $limit
    offset: $offset
  ) {
    id
    objects {
      id
      isAccepted
    }
  }
}
`

const BUCKETS_ASSIGNED_STORAGE_BAGS = `
query GetStorageBags($storageBags: [ID!]!, $limit: Int!, $offset: Int!) {
  storageBags(
    where: { id_in: $storageBags }
    orderBy: createdAt_ASC
    limit: $limit
    offset: $offset
  ) {
    id
    storageBuckets {
      id
    }
  }
}
`

const ACTIVE_BUCKET_METADATA = `
query GetActiveStorageBucketEndpoints($storageBuckets: [ID!]!, $limit: Int!, $offset: Int!) {
  storageBuckets(
    where: { id_in: $storageBuckets }
    orderBy: createdAt_ASC
    limit: $limit
    offset: $offset
  ) {
    id
    operatorStatus {
      __typename
    }
    operatorMetadata {
      nodeEndpoint
    }
  }
}
`

// HEAD-request an object from a storage node; returns the HTTP status code,
// or undefined if the node could not be reached.
async function headRequestAsset(baseUrl, objectId) {
  const url = `${baseUrl}/${objectId}`

  let code = 404
  try {
    const response = await fetch(url, {
      method: 'HEAD',
      headers: { 'Content-Type': 'application/json' }
    })

    if (response.status) {
      code = response.status
    } else {
      return undefined
    }
    return code
  } catch (err) {
    console.warn(`Request for ${objectId} to ${baseUrl} failed with ${err}`)
    return undefined
  }
}

// Fetch all pages of a query node result. `dataKey` names the response field to
// collect; it defaults to the first variable name, which does not always match
// the response shape (e.g. GetStorageBucketBags returns data under `storageBags`).
async function fetchPaginatedData(query, variables, pageSize, dataKey) {
  let hasMoreData = true;
  let offset = 0;
  let data = []
  const key = dataKey ?? Object.keys(variables)[0]

  while (hasMoreData) {
    const response = await fetch('https://query.joystream.org/graphql', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        query: query,
        variables: { ...variables, limit: pageSize, offset: offset }
      }),
    })

    if (!response.ok) {
      const error = await response.text()
      console.log(error)
      throw new Error(`Error fetching data: ${response.statusText}`)
    }

    const jsonResponse = await response.json()

    data = data.concat(jsonResponse.data[key])
    hasMoreData = jsonResponse.data[key].length === pageSize;

    offset += pageSize;
  }

  return data
}

// Fetch all bags assigned to a bucket (optionally a single bag) and write the
// accepted object ids per bag to a JSON file.
async function getAllBucketObjects(bucketId, bagId) {
  console.log('Getting bags...')
  const allBucketBags = await fetchPaginatedData(STORAGE_BAGS_QUERY, { storageBucket: bucketId }, 3000, 'storageBags')
  const bucketBags = bagId != null ? allBucketBags.filter(bag => bag.id.includes(bagId)) : allBucketBags
  console.log(`Found ${bucketBags.length} bags`)
  const bucketBagsIds = bucketBags.map(bag => bag.id)

  console.log('Getting objects...')
  const bagObjectsMap = {}
  const BATCH_SIZE = 1000
  for (let i = 0; i < bucketBagsIds.length; i += BATCH_SIZE) {
    const bucketBagsWithObjects = await fetchPaginatedData(STORAGE_BAGS_OBJECTS_QUERY, { storageBags: bucketBagsIds.slice(i, i + BATCH_SIZE) }, BATCH_SIZE)
    bucketBagsWithObjects.forEach(bag => {
      const acceptedObjects = bag.objects.filter(object => object.isAccepted)
      if (acceptedObjects.length !== 0) {
        bagObjectsMap[bag.id] = sortFiles(acceptedObjects.map(object => object.id))
      }
    })
  }
  const totalObjectsCount = Object.values(bagObjectsMap).flat().length
  console.log(`Found ${totalObjectsCount} accepted objects`)
  await fs.writeFile(bagId != null ? getRemoteBagPath(bagId) : REMOTE_FILES_PATH, JSON.stringify(bagObjectsMap))
}

// List the numeric file names in the given directory and write them, sorted, to local.json.
async function getLocalFiles(path) {
  console.log('Getting files...')
  const allFiles = await fs.readdir(path)
  const files = allFiles.filter(file => !isNaN(parseInt(file)))
  console.log(`Found ${files.length} files`)
  const sortedFiles = sortFiles(files)
  await fs.writeFile(LOCAL_FILES_PATH, JSON.stringify(sortedFiles))
}

// Diff the local file list against the remote (QN) object list and write the
// unexpected local files and the missing objects per bag to a JSON file.
async function getDifferences(bagId) {
  const localFiles = JSON.parse(await fs.readFile(LOCAL_FILES_PATH))
  const remoteFiles = JSON.parse(await fs.readFile(bagId != null ? getRemoteBagPath(bagId) : REMOTE_FILES_PATH))

  const localFilesSet = new Set(localFiles)
  const allRemoteFilesSet = new Set(Object.values(remoteFiles).flat())

  const unexpectedLocal = new Set([...localFilesSet].filter(id => !allRemoteFilesSet.has(id)))
  console.log(`Unexpected local files: ${JSON.stringify([...unexpectedLocal])}`)

  const missingObjectsPerBag = {}
  Object.entries(remoteFiles).forEach(([bagId, objects]) => {
    const missingObjects = objects.filter(id => !localFilesSet.has(id))
    if (missingObjects.length !== 0) {
      console.log(`Bag ${bagId} missing ${missingObjects.length} objects: ${JSON.stringify(missingObjects)}`)
      missingObjectsPerBag[bagId] = missingObjects
    }
  })

  const missingObjects = new Set(Object.values(missingObjectsPerBag).flat())

  console.log(`Missing ${missingObjects.size} objects`)
  console.log(`Found ${unexpectedLocal.size} unexpected local objects`)

  await fs.writeFile(bagId != null ? getDiffBagPath(bagId) : DIFF_PATH, JSON.stringify({ unexpectedLocal: [...unexpectedLocal], missingObjectsPerBag: missingObjectsPerBag }))
}

// For each missing object, look up which buckets are assigned to its bag and
// HEAD-check the active operators to see whether the object is still retrievable.
async function getMissing(diffFilePath, ignoreProviders) {
  console.log(`Checking missing objects from ${diffFilePath}...`)
  const missingObjects = JSON.parse(await fs.readFile(diffFilePath)).missingObjectsPerBag

  const bagsWithMissingObjects = Object.keys(missingObjects)

  const BATCH_SIZE = 2
  const storageProvidersToCheck = []
  const bucketsForBags = {}
  for (let i = 0; i < bagsWithMissingObjects.length; i += BATCH_SIZE) {
    const storageProvidersAssigned = await fetchPaginatedData(BUCKETS_ASSIGNED_STORAGE_BAGS, { storageBags: bagsWithMissingObjects.slice(i, i + BATCH_SIZE) }, BATCH_SIZE, 'storageBags')

    for (let bag of storageProvidersAssigned) {
      const sps = []
      for (let sp of bag.storageBuckets) {

        // Bucket ids arrive as strings from the query node; parse before
        // comparing against the numeric ignore list.
        if (!ignoreProviders.includes(parseInt(sp.id))) {
          sps.push(sp.id)
          if (!storageProvidersToCheck.includes(sp.id)) {
            storageProvidersToCheck.push(sp.id)
          }
        }
      }
      bucketsForBags[bag.id] = sps
    }
  }

  storageProvidersToCheck.sort((a,b) => parseInt(a) - parseInt(b))
  const storageProvidersHoldingMissingBags = await fetchPaginatedData(ACTIVE_BUCKET_METADATA, { storageBuckets: storageProvidersToCheck }, 1000, 'storageBuckets')
  storageProvidersHoldingMissingBags.sort((a,b) => parseInt(a.id) - parseInt(b.id))
  const operatingProviders = {}
  for (let sp of storageProvidersHoldingMissingBags) {
    if (sp.operatorStatus.__typename === "StorageBucketOperatorStatusActive") {
      const endpoint = sp.operatorMetadata.nodeEndpoint
      if (endpoint && endpoint.length) {
        // Join the endpoint and the files API path without doubling the slash.
        if (endpoint.toString().endsWith('/')) {
          operatingProviders[sp.id] = `${endpoint}api/v1/files`
        } else {
          operatingProviders[sp.id] = `${endpoint}/api/v1/files`
        }
      }
    }
  }

  const results = []
  let foundCount = 0
  let triedCount = 0
  for (let bagId of bagsWithMissingObjects) {
    const assignedSps = bucketsForBags[bagId]
    const missingObjectsInBag = missingObjects[bagId]
    const bagResult = {
      storageBag: bagId,
      operatorsChecked: assignedSps,
      dataObjects: missingObjectsInBag,
      granularResults: [],
      foundObjects: 0
    }
    triedCount += missingObjectsInBag.length
    for (let dataObjectId of missingObjectsInBag) {
      const objectResult = []
      let found = false
      for (let sp of assignedSps) {
        const providerUrl = operatingProviders[sp]
        const res = await headRequestAsset(providerUrl,dataObjectId)
        if (res) {
          objectResult.push({
            bucketId: sp,
            result: res
          })
          if (res == 200) {
            found = true
            console.log(`Object ID ${dataObjectId} in bag ${bagId} is available in bucket ${sp} at ${providerUrl}/${dataObjectId}.`)
            break;
          }
        } else {
          objectResult.push({
            bucketId: sp,
            result: 'not reached'
          })
        }
      }
      bagResult.granularResults.push(objectResult)
      if (found) {
        bagResult.foundObjects++
        foundCount++
      }
    }
    results.push(bagResult)
  }
  console.log(`A total of ${foundCount} out of ${triedCount} objects presumed lost were found.`)
  await fs.writeFile(CHECK_PATH, JSON.stringify(results,null,2))
}

// Convenience wrapper to HEAD-check a single object against a single node.
async function manualHeadRequest(url, objectId) {
  try {
    const res = await headRequestAsset(url, objectId)
    console.log("Res", res)
  } catch (err) {
    console.log("err",err)
  }
}

const command = process.argv[2]
const arg = process.argv[3]
const arg2 = process.argv[4]

if (command === 'localFiles') {
  if (!arg) {
    console.log('Please provide a path')
    process.exit(1)
  }
  getLocalFiles(arg)
} else if (command === 'bucketObjects') {
  if (!arg || isNaN(parseInt(arg))) {
    console.log('Please provide a bucket id')
    process.exit(1)
  }
  if (arg2 && isNaN(parseInt(arg2))) {
    console.log('If you want to get only a single bag, provide only the number from the id, dynamic:channel:XXX')
    process.exit(1)
  }
  getAllBucketObjects(arg, arg2)
} else if (command === 'diff') {
  if (arg && isNaN(parseInt(arg))) {
    console.log('If you want to diff only a single bag, provide only the number from the id, dynamic:channel:XXX')
    process.exit(1)
  }
  getDifferences(arg)
} else if (command === 'checkMissing') {
  let path = DIFF_PATH
  let providerInputs = undefined
  const ignoreProviders = []
  if (arg) {
    if (arg.includes('.json')) {
      path = arg
    } else {
      providerInputs = arg
    }
  }
  if (arg2) {
    providerInputs = arg2
  }
  if (providerInputs) {
    for (const sp of providerInputs.split(',')) {
      const id = parseInt(sp)
      if (isNaN(id)) {
        console.log('Invalid input for providers, use format 1 or 0,1,4.')
        process.exit(1)
      }
      ignoreProviders.push(id)
    }
  }
  getMissing(path,ignoreProviders)
} else if (command === 'head') {
  if (arg && arg2) {
    manualHeadRequest(arg, arg2)
  } else if (arg) {
    // A single argument is treated as a full asset URL, <nodeEndpoint>/api/v1/files/<objectId>.
    const providerUrl = `${arg.split('files/')[0]}files`
    const objectId = arg.split('files/')[1]
    manualHeadRequest(providerUrl, objectId)
  } else {
    console.log('Input must be head <URL/api/v1/files> <objectId>')
    process.exit(1)
  }
} else {
  console.log('Unknown command')
  process.exit(1)
}
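For reference, assuming the script is saved as check-files.js (file name and argument values hypothetical), the commands above would be invoked like:

node check-files.js localFiles /path/to/uploads
node check-files.js bucketObjects 1
node check-files.js diff
node check-files.js checkMissing ./diff.json 0,1
node check-files.js head https://node.example/api/v1/files 123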