artmoskvin / real-time-recommender

Real-time collaborative filtering recommender system

Same ItemID appears as different items #5

Open willli666 opened 5 years ago

willli666 commented 5 years ago

I'm seeing the same ItemID appear as different items, and I'm wondering what might lead to this error. For example, here is one response I got from the similar-items endpoint. As you can see, the itemId s113 appears multiple times, each time with a different score for the s029-s113 pair.

{
    "id": "e4d556d8-d2a3-4c67-9735-b69bd67985a9",
    "recommendation": [
        {
            "itemId": "s029",
            "anotherItemId": "s029",
            "similarity": 1
        },
        {
            "itemId": "s029",
            "anotherItemId": "s003",
            "similarity": 0.10238251947232521
        },
        {
            "itemId": "s029",
            "anotherItemId": "s113",
            "similarity": 0.09835019253826487
        },
        {
            "itemId": "s029",
            "anotherItemId": "s113",
            "similarity": 0.09800688512193077
        },
        {
            "itemId": "s029",
            "anotherItemId": "s113",
            "similarity": 0.09790371427275557
        },
artmoskvin commented 5 years ago

Hi @willli666. That's interesting. Did you try to reproduce it?

willli666 commented 5 years ago

@moscowart It's very reproducible for me (it happens on every call I've tried). Now even when I call /bestsellers, this is what I get:

{"id":"08de5b0b-dcf6-4933-b9e4-55e282e2562d","recommendation":[["sku1349",22453],["sku1349",22448],["sku1349",22248],["sku1349",22212],["sku1349",22163],["sku1349",22066],["sku1349",22034],["sku1349",21992],["sku1349",21983],["sku1349",21951],["sku1349",21855],["sku1349",21785],["sku1349",21768],["sku1349",21764],["sku1349",21732],["sku1349",21728],["sku1349",21726],["sku1349",21717],["sku1349",21656],["sku1349",21588],["sku1349",21548],["sku1349",21497],["sku1349",21376],["sku1349",21362],["sku1349",21304],["sku1349",21294],["sku1349",21279],["sku1349",21183],["sku1349",21166],["sku1349",21075],["sku1349",21035],["sku1349",20999],["sku1349",20980],["sku1349",20961],["sku1349",20902],["sku1349",20900],["sku1349",20897],["sku1349",20884],["sku1349",20872],["sku1349",20859],["sku1349",20801],["sku1349",20781],["sku1349",20771],["sku1349",20759],["sku1349",20735],["sku1349",20726],["sku1349",20657],["sku1349",20623],["sku1349",20581],["sku1349",20566],["sku1349",20521],["sku1349",20509],["sku1349",20493],["sku1349",20490],["sku1349",20476],["sku1349",20449],["sku1349",20440],["sku1349",20417],["sku1349",20332],["sku1349",20327],["sku1349",20324],["sku1349",20282],["sku1349",20271],["sku1349",20256],["sku1349",20239],["sku1349",20186],["sku1349",20158],["sku1349",20053],["sku1349",20048],["sku1349",20034],["sku1349",20030],["sku1349",20028],["sku1349",20007],["sku1349",20005],["sku1349",19997],["sku1349",19933],["sku1349",19928],["sku1349",19923],["sku1349",19908],["sku1349",19887],["sku1349",19859],["sku1349",19808],["sku1349",19742],["sku1349",19676],["sku1349",19670],["sku1349",19650],["sku1349",19636],["sku1349",19631],["sku1349",19603],["sku1349",19593],["sku1349",19579],["sku1349",19535],["sku1349",19522],["sku1349",19507],["sku1349",19486],["sku1349",19454],["sku1349",19452],["sku1349",19427],["sku1349",19400],["sku1349",19354]]}

It appears that the ItemID is not treated as a unique identifier. Could this have anything to do with my increasing the parallelism hint for the Storm bolts? (Your code defaults to 1 spout and 1 executor for each bolt.) I increased them to 2 to speed things up.

    val builder = new TopologyBuilder()
    builder.setSpout("kafka_spout", kafkaSpout, 2)
    builder.setBolt("item_item_bolt", new ItemItemBolt(), 2).shuffleGrouping("kafka_spout")
    builder.setBolt("trending_bolt", new TrendingBolt(), 2).shuffleGrouping("kafka_spout")
    builder.setBolt("counter_bolt", new CounterBolt(), 2).shuffleGrouping("kafka_spout")
    builder.setBolt("stats_bolt", new StatsBolt(), 2).shuffleGrouping("kafka_spout")

Can you point me to the part of the code that handles the uniqueness here? I will see if I can find out what went wrong.
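For what it's worth, with parallelism > 1 a `shuffleGrouping` can send two events for the same item to different executors, which then race on the same rows. Storm's standard tool for keeping all events for one key on one executor is `fieldsGrouping`. A sketch, assuming the spout declares an `"itemId"` output field (that field name is an assumption, not confirmed from the repo):

```scala
import org.apache.storm.topology.TopologyBuilder
import org.apache.storm.tuple.Fields

val builder = new TopologyBuilder()
builder.setSpout("kafka_spout", kafkaSpout, 2)
// fieldsGrouping routes every tuple with the same "itemId" value to the
// same executor, so two events for one item are never processed in parallel,
// while different items still spread across the 2 executors.
builder.setBolt("item_item_bolt", new ItemItemBolt(), 2)
  .fieldsGrouping("kafka_spout", new Fields("itemId"))
```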

willli666 commented 5 years ago

So I set all parallelism hints back to 1 but still see this issue, even with only a small set of data. I looked at your code and have a question about whether each bolt is blocking.

This is the part that handles trending items. I'm not familiar with Scala, but I suspect the onComplete callback is asynchronous? If so, that would explain my case.

Let's assume user1 and user2 both purchased itemA, and itemA can make it onto the trendingList. If user1's event processing is still in the middle of onComplete when user2's event starts, itemA could be inserted into the trendingList twice. Do you think this can happen?

  def trackEvent(event: Event): Unit = {

    val itemId = event.itemId
    val action = event.action
    val weight = Config.ACTION_WEIGHTS(action)

    // Asynchronous read of the current trending list; the callback below
    // runs whenever the future completes, possibly interleaved with the
    // callbacks of other events.
    val trendingItems = getRecommendations()
    trendingItems.onComplete {
      case Success(items) =>
        if (items.exists(_.itemId == itemId)) {
          // Read-modify-write: replace the item's old count row with a new one.
          val currentItemIndex = items.indexWhere(_.itemId == itemId)
          val currentScore = items(currentItemIndex).count
          storage.trendingItemCounts.deleteRow(TrendingItemCount("trending", itemId, currentScore))
          storage.trendingItemCounts.store(TrendingItemCount("trending", itemId, currentScore + weight))
        }
        else if (items.length < Config.TRENDING_ITEMS_LIST_SIZE)
          // Item not trending yet and the list still has room: insert it.
          storage.trendingItemCounts.store(TrendingItemCount("trending", itemId, weight))
        else updateCounts(items, itemId, weight)

      case Failure(msg) => println(msg)
    }

  }
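The interleaving described above is easy to reproduce outside Storm. Below is a minimal, self-contained Scala sketch (all names hypothetical; `TrendingCounts` stands in for `storage.trendingItemCounts`). A `CountDownLatch` forces two concurrent callers to both finish the read before either writes, which is exactly the schedule that inserts itemA twice:

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable.ListBuffer

// Hypothetical in-memory stand-in for storage.trendingItemCounts.
class TrendingCounts {
  val rows = ListBuffer.empty[(String, Int)] // (itemId, count)

  // Unsafe: the read and the write are separate critical sections, like the
  // getRecommendations() / onComplete pair above. The latch pins the
  // interleaving "both read, then both write".
  def trackUnsafe(itemId: String, weight: Int, bothHaveRead: CountDownLatch): Unit = {
    val existing = rows.synchronized(rows.find(_._1 == itemId)) // read
    bothHaveRead.countDown()
    bothHaveRead.await() // wait until the other caller has also read
    rows.synchronized {  // write, based on a now-stale read
      existing match {
        case Some(row @ (id, c)) => rows -= row; rows += ((id, c + weight))
        case None                => rows += ((itemId, weight)) // both callers land here
      }
    }
  }

  // Safe: the whole read-modify-write is one critical section.
  def trackSafe(itemId: String, weight: Int): Unit = rows.synchronized {
    rows.find(_._1 == itemId) match {
      case Some(row @ (id, c)) => rows -= row; rows += ((id, c + weight))
      case None                => rows += ((itemId, weight))
    }
  }
}
```

Running `trackUnsafe("itemA", 1, latch)` from two threads leaves two `("itemA", 1)` rows, while calling `trackSafe("itemA", 1)` twice leaves a single `("itemA", 2)` row.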
artmoskvin commented 5 years ago

Hi @willli666 . Yeah, that could be the case. You can try to make those calls blocking and see if it helps.

willli666 commented 5 years ago

Yes, after I changed all the calls to be blocking, the duplicate ids are gone. However, now I'm running into a performance (scalability) issue: processing one event in the item-item-similarity bolt typically takes 30 to 50 ms. As you can see in the screenshot, in extreme cases it can even time out at over 1 second.

(Screenshot: event processing latency, 2019-10-24)

Do you have any suggestions on how to address this in a production environment? Is there any part of the calculation that can be made parallel? It doesn't scale if we have to process events one by one in series.
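One standard way to get some parallelism back without reintroducing the race is to serialize events per item while still running different items in parallel, e.g. by hashing each itemId onto one of N single-threaded executors. A minimal sketch of the idea (hypothetical helper, not part of this repo):

```scala
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}

// Hypothetical helper: events for the same itemId always run on the same
// single-threaded lane, so a per-item read-modify-write can never interleave
// with itself, while events for different items run in parallel across lanes.
class KeyedSerialExecutor(parallelism: Int) {
  private val lanes: Array[ExecutorService] =
    Array.fill(parallelism)(Executors.newSingleThreadExecutor())

  def submit(itemId: String)(task: => Unit): Unit = {
    val lane = math.floorMod(itemId.hashCode, parallelism)
    lanes(lane).execute(() => task)
  }

  def shutdown(): Unit = {
    lanes.foreach(_.shutdown())
    lanes.foreach(_.awaitTermination(10, TimeUnit.SECONDS))
  }
}
```

In Storm terms this is roughly what a fieldsGrouping on itemId gives you across executors; the sketch shows the same idea inside a single process.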

Thanks!