asdine / storm

Simple and powerful toolkit for BoltDB
MIT License

KeyValueStore - how do I iterate through keys #209

Open tandr opened 6 years ago

tandr commented 6 years ago

I am probably missing something obvious, but how do I iterate over values in a bucket or get a list of keys from KeyValue storage?

func store(db *storm.DB) {
    for i := 0; i < rand.Intn(20); i++ {
        db.Put("r", rand.Intn(1000), rand.Intn(1000))
    }
}

func retrieve(db *storm.DB) {
    for k := range ???? {
        var value int
        db.Get("r", k, &value)
    }
}
tandr commented 6 years ago

I guess there is no easy way on the bolt side - it always retrieves keys together with values. Oh well.

asdine commented 6 years ago

Sorry for the delay, you can use Select with Each.

db.Select().Each(new(User), func(record interface{}) error {
  u := record.(*User)
  ...
  return nil
})
tandr commented 6 years ago

Thanks Asdine. What will the lock granularity be here - one per Select, one for the whole Each, or one per callback invocation?

asdine commented 6 years ago

One read lock for the entire Each scan. All the methods in the Query interface that don't return a Query hold a read lock until they're done: https://github.com/asdine/storm/blob/master/query.go#L33-L51

tandr commented 6 years ago

I have to iterate through a live and possibly quite big set of items (while they keep coming). Any suggestions on how to improve that?

I don't care much about missing items during iteration, but whole-Each locking will be problematic for adding items...

asdine commented 6 years ago

Something like a stream of data that continuously gets written to Storm?

If I understand well, you are trying to use Storm like a queue system and expect the Each method to call your func every time there is a new record?

tandr commented 6 years ago

It might come to that (streaming), but not yet.

There is a semi-constant stream of files coming in (from a 3rd party) that contain "Events" of some sort. We get them, parse them, and put them into permanent storage for tracking. Each Event has an EventID, and if the EventID is already known, the existing record for that Event gets updated. When there is a change to an Event Set, the Operator's Application gets notified, and it comes back and retrieves the events for the operator to see. (Right now all of them; later it will be just the changed ones.) If I lock the set to iterate through it for sending the events out, updates on that "table" will stall, and that is not the desired behaviour.

I hope it describes it well enough, sorry I cannot go into more details.

asdine commented 6 years ago

You won't be able to read and update concurrently; that's not how Bolt was designed. Also, you cannot scan a bucket consistently across several transactions, because you have no guarantee that new records were not added before your current cursor (which won't be valid once the transaction is closed anyway).

There is no obvious solution for your use case, but implementing some kind of CQRS might help a bit:

- all updates are written to one bucket (call it A), which readers never touch;
- a background goroutine reads the changes from A and applies them to a second bucket, B;
- the Operator's Application only ever scans B.

This solution allows you to write at high speed with no read lock, so all of your update requests can return rapidly. But it also introduces a delta (i.e. the time for your goroutine to apply one change from A to B) which must be taken into account for your business.

asdine commented 6 years ago

You could also have a pagination system:

var lastCreatedAt time.Time
for {
  var record Record
  err := db.Select(q.Gt("CreatedAt", lastCreatedAt)).OrderBy("CreatedAt").Limit(1).First(&record)
  if err == storm.ErrNotFound {
    // you reached the end of the bucket
    break
  }
  if err != nil {
    return err
  }

  // send the record somewhere

  // update the cursor
  lastCreatedAt = record.CreatedAt
}

EDIT: every Select with an OrderBy will trigger a full bucket scan, so it might not be worth it.