HelixNetwork / pendulum

Pendulum is a distributed messaging protocol that enables globally available tamper proof timestamps :hourglass_flowing_sand:
https://dev.hlx.ai
Other
10 stars 6 forks source link

ZMQ not publishing value transactions on expected topic #130

Closed stefan-precup closed 5 years ago

stefan-precup commented 5 years ago

Expected Behavior The ZMQ should publish the bundle on the topic on the address to which value is sent. Instead, it publishes it on another topic, another address that is part of the bundle.

Current Behavior When a transaction is sent, the bundle is sent to another topic than the one to which the transaction was sent to.

Steps to Reproduce Run this script

import {ETH_GAS_FEES, SERVICE_FEES, VAULT_ADDRESS} from "../constants";
// @ts-ignore
let composeAPI = require("@helixnetwork/core").composeAPI;

const helix = composeAPI({
    provider: "https://hlxtest.net:8087"
    // provider: "http://192.168.110.33:8085"
});

// helix.getNodeInfo()
//     .then(info => console.log(info))
//     .catch(err => {})

// must be truly random
const seed = "c65be465e437443584ace45b69500c96ae7f8fd684d58e27387a8f9c1873e889";

// Array of transfers which defines transfer recipients and value transferred in helixs.
const transfers = [{
    // address: "9370887f02b41fa9051531ffc8027126cbc68d3971952d0e865eb5b5fe6d2201",
    // address: VAULT_ADDRESS,
    address: '0000000000000000000000000000000000000000000000000000000000000000',
    message: asciiToTxHex(`{ "address": "0xc0ffee254729296a45a3885639AC7E10F9d54979",
                "amount": 2
            }`), // optional message in hexString
    // message: "",
    // "qHash":"000055bd0701f977c37de8de65aa6b79b297109f833853bbe320cc08100c4089"
    tag: "", // optional tag in hexString
    // value: SERVICE_FEES + ETH_GAS_FEES, // 1Kh
    value: 1
}];

// Depth or how far to go for tip selection entry point
const depth = 3;

// Difficulty of Proof-of-Work required to attach transaction to tangle.
// Minimum value on testnet is currently 2.
const minWeightMagnitude = 2;
    helix.prepareTransfers(seed, transfers, {security: 2})
        .then((txs: any) => helix.sendTransactionStrings(txs, depth, minWeightMagnitude))
        .then((bundle: any) => {
            console.log(`Published transaction; with tail hash: ${bundle[0].hash}`);
            console.log(`Bundle: ${JSON.stringify(bundle)}`);
        })
        .catch((err: any) => {
            // catch any errors
            console.log(err);
        });

@dnck has the script that listens for all subjects.

I listen only on the topic of ORACLE_VaultAddress. Here's my code snippet.

var zmq = require('zeromq')
    , sock = zmq.socket('sub');
var provider = 'tcp://zmq.hlxtest.net:5556';
sock.connect(provider);
sock.subscribe(`ORACLE_${'0000000000000000000000000000000000000000000000000000000000000000'}`);
console.log('Subscriber connected to: ' + provider);
sock.on('message', function(message) {
    console.log(message.toString());
});

Context I'm expecting the zmq bundle to be published on the ORACLE_VaultAddress topic .

Version: Operating System: Linux Failure Logs: none

dnck commented 5 years ago

Hi @stefan-precup

Subscription to only the "ORACLE_0000000000000000000000000000000000000000000000000000000000000000" topic will not result in capturing value transactions in the client.

This is because the hexidecimal string after ORACLE_ is simply a received transaction address, and these received transaction addresses may or may not correspond to value transactions.

Below, you can find my zmq subscription client that listens for all topics.

When it encounters a topic that starts with "ORACLE", it parses the received string and returns a json object that looks something like this,

{
    "0": {
        "bundle_hash": "0821b90e70d3c92e786a911b4d1b9a5f95d25a3530acdca55bb3798b1e183457",
        "bundle_index": 0,
        "signature": "...",
        "tx_hash": "008cae947aaf3fcb1a6a9f8641ab369feb871dc3cd5faf7d020b3b85fce6d240"
    },
    "1": {
        "bundle_hash": "0821b90e70d3c92e786a911b4d1b9a5f95d25a3530acdca55bb3798b1e183457",
        "bundle_index": 1,
        "signature": "...",
        "tx_hash": "000bd543405834588983cdf19681f5a3714fe69e650f021403706a52298f9e1a"
    },
    "2": {
        "bundle_hash": "0821b90e70d3c92e786a911b4d1b9a5f95d25a3530acdca55bb3798b1e183457",
        "bundle_index": 2,
        "signature": "...",
        "tx_hash": "00e4d2cf3b496b0d5444eae70ef5d71e2e0bd4d857a2c4644ba8deab94c237d5"
    },
    "address": "0000000000000000000000000000000000000000000000000000000000000000"
},

In the above, the keys, "1", "2", "3" correspond to the bundle_index of the item in the collection, while the "address" key is the receivedTransactionViewModel.getAddressHash().toString() in these lines of the Helix-1.0 project here. In other words, the value of the "address" key is a received transaction's address.

# -*- coding:utf-8 -*-
"""
"""
import argparse
import json
import os
import regex
import zmq

def subscribe_to_zmq_topics(host, port):
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://{}:{}".format(host, port))
    # subscribe to all topics with an empty string
    socket.setsockopt_string(zmq.SUBSCRIBE, '')
    value_tx = (None, None)
    while True:
        string = socket.recv_string()
        data = {string.split(' ')[0]: string.split(' ')[1:]}
        # tx_hash topic from Java MsgQPrvImpl.publishTx
        if 'tx_hash' in data.keys():
            data = convert_txhash_topic(txhash_template, data)
        # oracle topic from Java Node.processReceivedData
        if list(data.keys())[0].startswith('ORACLE'):
            data = convert_oracle_topic(data)
        # tx topic from where in Java idk??
        match = regex.match(txhash_pattern, list(data.keys())[0])
        if match:
            data = json.loads(data[match.string][0])
            data.update({'address': match.string})
        # all other topics are already formatted pretty
        write_dict_to_json('./results/test.json', data)
        # TODO: label the topics during input.

txhash_pattern = regex.compile(r"\b[a-f0-9]{64}")

txhash_template = {
    'hash': None,
    'address1': None,
    'msg': None,
    'address2': None,
    'value': None,
    'bundleNonceHash': None,
    'timestamp': None,
    'currentIndex': None,
    'lastIndex': None,
    'bundleHash': None,
    'trunk': None,
    'branch': None,
    'arrivalTime': None,
    'tagValue': None
}

def convert_txhash_topic(txhash_template, response):
    response = [i.split() for i in response['tx_hash']]
    txhash_template['hash'] = response[0][0]
    txhash_template['address1'] = response[1][0]
    txhash_template['msg'] = response[2]
    txhash_template['address2'] = response[3][0]
    txhash_template['value'] = response[4][0]
    txhash_template['timestamp'] = response[5][0]
    txhash_template['timestamp'] = response[6][0]
    txhash_template['currentIndex'] = response[7][0]
    txhash_template['lastIndex'] = response[8][0]
    txhash_template['bundleHash'] = response[9][0]
    txhash_template['trunk'] = response[10][0]
    txhash_template['branch'] = response[11][0]
    txhash_template['arrivalTime'] = response[12][0]
    txhash_template['tagValue'] = response[13][0]
    return txhash_template

def convert_oracle_topic(d):
    temp = {'address': None}
    for k,v in d.items():
        temp['address'] = k.split('ORACLE_')[1]
        for item in json.loads(v[0]):
            temp.update({str(item['bundle_index']): item})
    return temp

def write_dict_to_json(filename, data):
    """Write an in-memory Python dictionary to a formatted .json file."""
    filename = os.path.normpath(filename)
    with open(filename, "a") as file_obj:
        json.dump(data, file_obj, indent=4, sort_keys=True)
        file_obj.write(',\n')

def mkdir_if_not_exists(dirname):
    if not os.path.isdir(dirname):
        os.mkdir(dirname)

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Subscribe to a stream of data'
    )
    parser.add_argument(
        '--host', metavar='host',
        type=str,
        default='zmq.hlxtest.net',
        help='IP of host publisher'
    )
    parser.add_argument(
        '--port',
        metavar='port',
        type=str,
        default='5556',
        help='Port of the host publisher'
    )
    parser.add_argument(
        '--logs_dir',
        metavar='logs_dir',
        type=str,
        default='./results',
        help='Directory to write the json file to'
    )
    args = parser.parse_args()

    mkdir_if_not_exists(args.logs_dir)

    subscribe_to_zmq_topics(args.host, args.port)
stefan-precup commented 5 years ago

Hey @dnck , I got the chance to look over the issue with @cristina-vasiu .

We identified the issue to be here.

The reason why I was always receiving the 0 value transactions was that for a 0 value transaction, there was only 1 transaction made. For a value transaction, there will be several value transactions. They are not processed in a predefined order. This explains why sometimes I was receiving my transactions on ORACLE_0....00 while other times they were published on the topic ORACLE_TheHashOfWatheverTransactionIsBeingProcessed.

@cristina-vasiu thinks that it should either publish for each output address or just for the first transaction that contains the address for which the value is sent to.

image

Looking at the bundle types we can see that TX0 is the one that contains the address of the recipient. We can use that transaction to publish on the desired topic on zmq