orchestracities / anubis

Anubis: a flexible policy enforcement solution for NGSI APIs (and beyond!)
https://anubis-pep.readthedocs.org
Apache License 2.0
7 stars 5 forks source link

check potential concurrency issues when retrieving policies on subscription #153

Open github-actions[bot] opened 1 year ago

github-actions[bot] commented 1 year ago

https://github.com/orchestracities/anubis/blob/a4127fe14f5fe9ee9a508d58142c3b35bc47e23e/policy-governance-middleware/src/main.js#L344


import express from "express"
import bp from "body-parser"
import { createLibp2p } from 'libp2p'
import { TCP } from '@libp2p/tcp'
import { WebSockets } from '@libp2p/websockets'
import { Mplex } from '@libp2p/mplex'
import { Noise } from '@chainsafe/libp2p-noise'
import { CID } from 'multiformats/cid'
import { KadDHT } from '@libp2p/kad-dht'
import all from 'it-all'
import delay from 'delay'
import { Bootstrap } from '@libp2p/bootstrap'
import * as json from 'multiformats/codecs/json'
import { sha256 } from 'multiformats/hashes/sha2'
import { FloodSub } from '@libp2p/floodsub'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import { MulticastDNS } from '@libp2p/mdns'
import axios from 'axios'
import fs from 'fs'
import { Multiaddr } from "@multiformats/multiaddr";
import dns from "dns/promises";
import cors from 'cors';

// Configuration for the port used by this node
const server_port = process.env.SERVER_PORT || 8099
// Uri of the Anubis API connected to this middleware
const anubis_api_uri = process.env.ANUBIS_API_URI || "127.0.0.1:8085"
// The multiaddress format address this middleware listens on
const listen_address = process.env.LISTEN_ADDRESS || '/dnsaddr/localhost/tcp/49662'
// Is this a private organisation? (Private org won't share policies that aren't of a specific user only)
const is_private_org = process.env.IS_PRIVATE_ORG || "true"

// Convert DNS address to IP address (solves Docker issues)
var listen_ma = new Multiaddr(listen_address)
var options = listen_ma.toOptions()
if(listen_address.includes("dnsaddr") && options.host != 'localhost') {
  const lookup = await dns.lookup(options.host)
  listen_ma = new Multiaddr(listen_ma.toString().replace(options.host, lookup.address).replace("dnsaddr", "ip4"))
}

// Setting up Node app
var app = express()
app.use(cors())
app.use(bp.json())
app.use(bp.urlencoded({ extended: true }))

// Keeping track of resources being provided
var providedResources = []

// Setting up the Libp2p node
const node = await createLibp2p({
  addresses: {
    listen: [listen_ma]
  },
  transports: [new TCP(), new WebSockets()],
  streamMuxers: [new Mplex()],
  connectionEncryption: [new Noise()],
  dht: new KadDHT(),
  peerDiscovery: [
    new MulticastDNS({
      interval: 20e3
    })
  ],
  connectionManager: {
    autoDial: true
  },
  pubsub: new FloodSub(),
  relay: {
    enabled: true,
    hop: {
      enabled: true
    },
    advertise: {
      enabled: true,
    }
  }
})

// Endpoint to retrieve node metadata
app.get('/metadata', async(req, res) => {
  res.json({"policy_api_uri": anubis_api_uri})
})

// Endpoint for receiving resources from the mobile app
app.post('/resource/mobile/retrieve', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  // TODO: Email of the user
  var responseData = [
      {
          "usrMail": "test@gmail.com",
          "usrData": [
              {
                  "id": resource,
                  "description": "test",
                  "children": []
              }
          ]
      }
  ]

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)

  var providers = []
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }))
  }
  catch(error) {
    res.end(`No providers for ${resource}`)
    return
  }
  for (const provider of providers) {
    var providerPolicyApi = null
    await axios({
      method: 'get',
      url: `http://${provider.multiaddrs[0].nodeAddress().address}:8098/metadata`
    })
    .then(async function (response) {
      providerPolicyApi = response.data["policy_api_uri"]
    })
    .catch(function (error) {
      console.log(`Can't retrieve policy API URL for provider ${provider.multiaddrs[0].nodeAddress().address}`)
    })
    if(!providerPolicyApi) {
      continue
    }
    await axios({
      method: 'get',
      url: `http://${providerPolicyApi}/v1/policies`,
      headers: {
        'fiware-Service': service,
        'fiware-Servicepath': servicepath
      },
      params: {
        'resource': resource
      }
    })
    .then(async function (response) {
      for (const policy_entry of response.data) {
        if(is_private_org != "true") {
          var filtered_agents = policy_entry.agent.filter(a => !a.includes("acl:agent:"))
          if(filtered_agents.length > 0) {
            continue
          }
        }
        responseData[0].usrData[0].children.push({"id": policy_entry["id"], "actorType": policy_entry["agent"], "mode": policy_entry["mode"]})
      }
    })
    .catch(function (error) {
      console.log(error.response.data)
    })
  }
  res.json({responseData})
})

// Endpoint for providing a resource from the mobile app
app.post('/resource/mobile/send', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { policies, service, servicepath } = req.body
  if (!policies) {
   res.status(400).json({
     message: "Ensure you sent a policies field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }
  for(const entry of policies) {
    const usrMail = entry.usrMail
    for(const resource of entry.usrData) {
      const resId = resource.id
      for(const policy of resource.children) {
        var modes = []
        for(const m of policy.mode.split(",")) {
          modes.push(m)
        }
        modes = modes.filter(e => e != '')
        var new_policy = {
            "id": policy.id,
            "access_to": resId,
            "resource_type": "mobile",
            "mode": modes,
            "agent": [
                policy.actorType
            ]
        }
        var message = {
          "action": "send_mobile",
          "policy": new_policy,
          "service": service,
          "servicepath": servicepath,
        }
        message = JSON.stringify(message)
        await node.pubsub.publish(resId, uint8ArrayFromString(message)).catch(err => {
          console.error(err)
          res.end(`Error: ${err}`)
        })
      }
    }
  }
  res.json({})
})

// Endpoint for providing a resource
app.post('/resource/provide', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)
  await node.contentRouting.provide(cid)
  providedResources.push(resource)

  console.log(`Provided policy for resource ${resource}`)
  res.end(`Provided policy for resource ${resource}`)
})

// Endpoint for subscribing to a resource topic
app.post('/resource/subscribe', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  const topics = await node.pubsub.getTopics()
  if (!topics.includes(resource)) {
    await node.pubsub.subscribe(resource)
    console.log(`Subscribed to ${resource}`)
  }

  const bytes = json.encode({ resource: resource })
  const hash = await sha256.digest(bytes)
  const cid = CID.create(1, json.code, hash)

  var providers = []
  try {
    providers = await all(node.contentRouting.findProviders(cid, { timeout: 3000 }))
  }
  catch(error) {
    res.end(`Subscribed to ${resource}, no other providers found`)
    return
  }
  console.log(`Syncing with other providers for ${resource}...`)
  for (const provider of providers) {
    var providerPolicyApi = null
    await axios({
      method: 'get',
      url: `http://${provider.multiaddrs[0].nodeAddress().address}:8098/metadata`
    })
    .then(async function (response) {
      providerPolicyApi = response.data["policy_api_uri"]
    })
    .catch(function (error) {
      console.log(`Can't retrieve policy API URL for provider ${provider.multiaddrs[0].nodeAddress().address}`)
    })
    if(!providerPolicyApi) {
      continue
    }
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/tenants/`,
      data: {"name": service}
    })
    .then(async function (response) {
      console.log(`Created Tenant ${service}`)
    })
    .catch(function (error) {
      console.log(`No new Tenant created`)
    })
    await axios({
      method: 'get',
      url: `http://${providerPolicyApi}/v1/policies`,
      headers: {
        'fiware-Service': service,
        'fiware-Servicepath': servicepath
      },
      params: {
        'resource': resource
      }
    })
    .then(async function (response) {
      for (const policy_entry of response.data) {
        // TODO: Concurrency
        if(is_private_org != "true") {
          var filtered_agents = policy_entry.agent.filter(a => !a.includes("acl:agent:"))
          if(filtered_agents.length > 0) {
            continue
          }
        }
        await axios({
          method: 'post',
          url: `http://${anubis_api_uri}/v1/policies`,
          headers: {
            'fiware-Service': service,
            'fiware-Servicepath': servicepath
          },
          data: policy_entry
        })
        .then(function (r) {
          console.log(r.data)
        })
        .catch(function (err) {
          console.log(err.response.data)
        })
      }
    })
    .catch(function (error) {
      console.log(error.response.data)
    })
  }
  res.end(`Subscribed to ${resource}`)
})

// Endpoint when a new policy is created
app.post('/resource/policy/new', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "post",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Endpoint when a policy is updated
app.post('/resource/policy/update', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "put",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Endpoint when a policy is deleted
app.post('/resource/policy/delete', async(req, res) => {
  if (!Object.keys(req.body).length) {
   return res.status(400).json({
     message: "Request body cannot be empty",
   })
  }
  var { resource, policy, service, servicepath } = req.body
  if (!resource) {
   res.status(400).json({
     message: "Ensure you sent a resource field",
   })
  }
  if (!policy) {
   res.status(400).json({
     message: "Ensure you sent a policy field",
   })
  }
  if (!service) {
   res.status(400).json({
     message: "Ensure you sent a service field",
   })
  }
  if (!servicepath) {
   res.status(400).json({
     message: "Ensure you sent a servicepath field",
   })
  }

  var message = {
    "action": "delete",
    "policy_id": policy,
    "service": service,
    "servicepath": servicepath,
  }
  message = JSON.stringify(message)
  await node.pubsub.publish(resource, uint8ArrayFromString(message)).catch(err => {
    console.error(err)
    res.end(`Error: ${err}`)
  })

  res.end("Policy message sent: " + message)
  console.log("Policy message sent: " + message)
})

// Function to process a message arriving on a topic (resource)
async function processTopicMessage(evt) {
  const sender = await node.peerStore.addressBook.get(evt.detail.from)
  const message = JSON.parse(uint8ArrayToString(evt.detail.data))
  console.log(`Node received: ${uint8ArrayToString(evt.detail.data)} on topic ${evt.detail.topic}`)
  if(message.action == "send_mobile") {
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/tenants/`,
      data: {"name": message.service}
    })
    .then(async function (response) {
      console.log(`Created Tenant ${message.service}`)
    })
    .catch(function (error) {
      console.log(error)
    })
    await axios({
      method: 'post',
      url: `http://${anubis_api_uri}/v1/policies`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      },
      data: message.policy
    })
    .then(function (r) {
      console.log(r)
    })
    .catch(function (err) {
      console.log(err.response.data)
    })
    return
  }
  var providerPolicyApi = null
  await axios({
    method: 'get',
    url: `http://${sender[0].multiaddr.nodeAddress().address}:8098/metadata`
  })
  .then(async function (response) {
    providerPolicyApi = response.data["policy_api_uri"]
  })
  .catch(function (error) {
    console.log(error)
  })
  if(message.action == "delete") {
    await axios({
      method: 'delete',
      url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
      headers: {
        'fiware-Service': message.service,
        'fiware-Servicepath': message.servicepath
      },
      data: response.data
    })
    .then(function (r) {
      console.log(r)
    })
    .catch(function (err) {
      console.log(err)
    })
    return
  }
  await axios({
    method: 'get',
    url: `http://${providerPolicyApi}/v1/policies/${message.policy_id}`,
    headers: {
      'fiware-Service': message.service,
      'fiware-Servicepath': message.servicepath
    }
  })
  .then(async function (response) {
    if(message.action == "post") {
      await axios({
        method: 'post',
        url: `http://${anubis_api_uri}/v1/tenants/`,
        data: {"name": message.service}
      })
      .then(async function (response) {
        console.log(`Created Tenant ${resource}`)
      })
      .catch(function (error) {
        console.log(error)
      })
      await axios({
        method: 'post',
        url: `http://${anubis_api_uri}/v1/policies`,
        headers: {
          'fiware-Service': message.service,
          'fiware-Servicepath': message.servicepath
        },
        data: response.data
      })
      .then(function (r) {
        console.log(r)
      })
      .catch(function (err) {
        console.log(err.response.data)
      })
    }
    else if(message.action == "put") {
      await axios({
        method: 'put',
        url: `http://${anubis_api_uri}/v1/policies/${message.policy_id}`,
        headers: {
          'fiware-Service': message.service,
          'fiware-Servicepath': message.servicepath
        },
        data: response.data
      })
      .then(function (r) {
        console.log(r)
      })
      .catch(function (err) {
        console.log(err)
      })
    }
  })
  .catch(function (error) {
    console.log(error)
  })
}

// Saving config to file
async function saveConfiguration() {
  var persistentDataFileStream = fs.createWriteStream('data.json')
  const topics = await node.pubsub.getTopics()
  let data2 = JSON.stringify({"topics": topics, "resources": providedResources})
  persistentDataFileStream.write(data2)
  persistentDataFileStream.close()
}

// Starting server
var server = app.listen(server_port, async() => {

  await node.start()

  try {
    let rawdata = fs.readFileSync('data.json')
    let data = JSON.parse(rawdata)

    for(const resource of data.resources) {
      providedResources.push(resource)
      const bytes = json.encode({ resource: resource })
      const hash = await sha256.digest(bytes)
      const cid = CID.create(1, json.code, hash)
      try {
        await node.contentRouting.provide(cid)
      }
      catch(err) {
        console.log(`Failed to initially provide ${resource}`)
      }
    }

    for(const topic of data.topics) {
      node.pubsub.subscribe(topic)
    }

    await saveConfiguration()
  }
  catch(err) {
    console.log("Couldn't read any initial config")
  }

  console.log("Node started with:")
  node.getMultiaddrs().forEach((ma) => console.log(`${ma.toString()}`))

  node.connectionManager.addEventListener('peer:connect', (evt) => {
    const connection = evt.detail
    console.log('Connection established to:', connection.remotePeer.toString())
  })

  node.addEventListener('peer:discovery', async(evt) => {
    const peer = evt.detail
    if (node.peerId.toString() == peer.id.toString()) {
      return
    }
    var peerId = node.peerStore.addressBook.get(peer.id)
    if (!peerId) {
      console.log('Discovered:', peer.id.toString())
      node.peerStore.addressBook.set(peer.id, peer.multiaddrs)
      node.dial(peer.id)
    }
  })

  await node.pubsub.addEventListener("message", (evt) => processTopicMessage(evt))

  await delay(1000)

  console.log(node.peerId.toString())

  var host = server.address().address
  var port = server.address().port
  console.log("App listening at http://%s:%s", host, port)
})