husky-team / husky

A more expressive and most importantly, more efficient system for distributed data analytics.
http://www.husky-project.com/

Problem with reading a file with HDFS #313

Closed eneskaya closed 4 years ago

eneskaya commented 4 years ago

I tried running the PageRank example on a very simple input file stored in HDFS (which was difficult to begin with, because getting the libhdfs3 library to work was a nightmare). But now, when I run ./PageRank -C configfile --input hdfs:///graphs/simple_graph.txt, the PageRank job does nothing during the husky::load call; it just waits.

This is the code:

// Copyright 2016 Husky Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>
#include <vector>

#include "boost/tokenizer.hpp"

#include "core/engine.hpp"
#include "io/input/inputformat_store.hpp"
#include "io/input/nfs_binary_inputformat.hpp"
#include "io/input/hdfs_binary_inputformat.hpp"

class Vertex {
   public:
    using KeyT = int;

    Vertex() : pr(0.15) {}
    explicit Vertex(const KeyT& id) : vertexId(id), pr(0.15) {}
    const KeyT& id() const { return vertexId; }

    // Serialization and deserialization
    friend husky::BinStream& operator<<(husky::BinStream& stream, const Vertex& u) {
        stream << u.vertexId << u.adj << u.pr;
        return stream;
    }
    friend husky::BinStream& operator>>(husky::BinStream& stream, Vertex& u) {
        stream >> u.vertexId >> u.adj >> u.pr;
        return stream;
    }

    int vertexId;
    std::vector<int> adj;
    float pr;
};

void pagerank() {
    auto& infmt = husky::io::InputFormatStore::create_line_inputformat();
    infmt.set_input(husky::Context::get_param("input"));

    // Create and globalize vertex objects
    auto& vertex_list = husky::ObjListStore::create_objlist<Vertex>();
    husky::LOG_I << "Created vertexlist";

    husky::LOG_I << "loading";
    husky::load(infmt, [&vertex_list](auto& chunk) {
        if (chunk.size() == 0)
            return;
        boost::char_separator<char> sep(" \t");
        boost::tokenizer<boost::char_separator<char>> tok(chunk, sep);
        boost::tokenizer<boost::char_separator<char>>::iterator it = tok.begin();
        if (it == tok.end())  // whitespace-only line: nothing to parse
            return;

        int id = std::stoi(*it++);

        // it++; // Skip num_neighbors, otherwise comment out
        Vertex v(id);

        while (it != tok.end()) {
            v.adj.push_back(stoi(*it++));
        }
        vertex_list.add_object(std::move(v));
    });

    husky::LOG_I << "globalizing";
    husky::globalize(vertex_list);

    // Iterative PageRank computation
    auto& prch =
        husky::ChannelStore::create_push_combined_channel<float, husky::SumCombiner<float>>(vertex_list, vertex_list);
    int numIters = stoi(husky::Context::get_param("iters"));
    for (int iter = 0; iter < numIters; ++iter) {
        husky::list_execute(vertex_list, [&prch, iter](Vertex& u) {
            if (iter > 0)
                u.pr = 0.85 * prch.get(u) + 0.15;

            if (u.adj.size() == 0)
                return;
            float sendPR = u.pr / u.adj.size();
            for (auto& nb : u.adj) {
                prch.push(sendPR, nb);
            }
        });
    }
}

int main(int argc, char** argv) {
    husky::LOG_I << "STARTING";
    std::vector<std::string> args;
    args.push_back("hdfs_namenode");
    args.push_back("hdfs_namenode_port");
    args.push_back("input");
    args.push_back("iters");
    if (husky::init_with_args(argc, argv, args)) {
        husky::run_job(pagerank);
        return 0;
    }

    return 1;
}

I run it as follows:

  1. In the release folder, I run make PageRank, which builds the PageRank executable.
  2. I enter: ./PageRank -C ../examples/pr.cfg --hdfs_namenode localhost --hdfs_namenode_port 9000 --input hdfs:///graphs/simple_graph.txt --iters 20
  3. Output:
WARNING: Logging before InitGoogleLogging() is written to STDERR
[INFO 2019-08-07 15:52:05.129667 28843 pagerank.cpp:98] STARTING
[INFO 2019-08-07 15:52:05.139071 28849 pagerank.cpp:54] Created vertexlist
[INFO 2019-08-07 15:52:05.139737 28849 pagerank.cpp:56] loading

The simple_graph.txt looks like this:

1        1
2        1
2        3
...

...and so on, each line representing an edge.

What am I missing here? Any help is appreciated! @kygx-legend @ddmbr

kygx-legend commented 4 years ago

PageRank is only the application. You need to start the Master first and keep it running alongside it.

eneskaya commented 4 years ago

Oh boy, alright! Misunderstood everything then 🙈