Closed glenjamin closed 9 years ago
I had the same question. It looks like there's a publisher.rb, which is a light wrapper around Bunny. I think it'd be good to highlight this in the examples, if it's considered ready for use.
Hi guys, it's true: the role of publishing is somewhat intentionally diminished. This is to avoid locking developers into one mindset à la Sidekiq/Resque/etc. When you are in charge of serializing objects as messages, you have more power.
Since I'm working in a polyglot environment, and Sneakers was born out of the need for more performance, it wouldn't have been wise to dictate how to push messages to RabbitMQ.
For example, in my production environment, I use Go and Node.js (separate services) to publish new messages to RabbitMQ, and Sneakers to process them. I also use a lightweight but flexible message format - just serialized JSON that can contain anything. The workers are responsible for honoring that contract.
In this approach you can also use binary/msgpack/protocolbuffers or anything else really to formulate your messages - which to use is best dictated by your performance and development requirements.
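To make the "contract" idea concrete, here is a minimal sketch of a JSON message format shared by a publisher and a worker. The envelope fields (`type`, `data`) and the helper names `encode_message` / `decode_message` are made up for illustration; they are not part of Sneakers.

```ruby
require 'json'

# Publisher side: serialize a (hypothetical) envelope with a job type and
# a free-form data hash.
def encode_message(type, data)
  JSON.generate({ type: type, data: data })
end

# Worker side: parse the body and check the fields the contract promises.
def decode_message(body)
  message = JSON.parse(body, symbolize_names: true)
  missing = %i[type data].reject { |key| message.key?(key) }
  raise ArgumentError, "message is missing #{missing.join(', ')}" unless missing.empty?

  message
end

body = encode_message('resize_image', { id: 42, width: 200 })
message = decode_message(body)
puts message[:type]
puts message[:data][:width]
```

Any publisher, in any language, only has to produce a body that passes the same check on the worker side.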
That being said, @LukeWinikates is right - Publisher is a small shim that will allow you to do ad-hoc publishing. It is the same shim that lets a Sneakers worker "reply" to messages, by publishing from your actual job-handling code.
Thanks guys - this discussion ignited this Wiki page :) https://github.com/jondot/sneakers/wiki/Publishing-messages
Seems sensible, it may be worth including an example of the RabbitMQ publish code via bunny, as the rest of the lib doesn't really require understanding AMQP - even if you leave serialisation out of the example.
@glenjamin that's a great idea, will do
publisher.rb doesn't (currently) honor any of the Sneakers.config except the :heartbeat option. It's easy enough to fix (copying the way it's done in queue.rb worked for me), but just thought I'd save people some time if you're looking to use it.
If I have time today, I can put together a pull request to fix that, if you're interested @jondot.
@ged of course - that would be awesome of you! :)
I just saw that Worker#publish() merges the environment name into the routing_key. Is this by design?
Shouldn't the environment rather be defined by the vhost in RabbitMQ (or other connection settings) than to mix it all up and do magic on the names?
Unintended data pollution can occur when the test/development broker and the production broker are the same. Different routing keys can help keep the test pollution out of the production data. I like to make sure that my tests are completely isolated with a test vhost, test queues, test routing keys and a test broker.
Dewayne o-*
On Jan 13, 2014, at 9:35 AM, Benjamin Hüttinger notifications@github.com wrote:
I just saw that Worker#publish() merges the environment name into the routing_key. Is this by design?
Shouldn't the environment rather be defined by the vhost in RabbitMQ (or other connection settings) than to mix it all up and do magic on the names?
I completely agree about separating different environments, but this separation should happen early and only once. With different vhosts (connection settings) you already have separated environments. The rest just adds complicating logic without benefit (and might actually hurt you if you miss it in one case).
Might it be a better idea to opt-in to the environment magic? and by default not include it?
I still don't see the point of having the magic for this at all. It's like naming tables in a database users_test and users_production, which I hope no one is doing.
But if it's possible to turn it off I'm OK with it ;)
I'll shed some light on the reasons for the magic.
I too disliked the magic, but figured this is how all Ruby devs will expect it to work :)
Make sense?
I'd stick to the pattern that for example ActiveRecord (or all the other DB-Mappers) use:
when in env "development" connect to the development db (localhost etc, db "project_development")
when in env "test" connect to the test db (e.g. localhost etc, db "project_test")
After this there is no difference anymore in any naming, the env is not used in table names or anything else.
Same pattern for sneakers/RabbitMQ:
when in env "development" connect to the development rabbitmq (localhost etc, vhost "/project_development")
when in env "test" connect to the test rabbitmq (e.g. localhost etc, vhost "/project_test")
This is how I have been using it for dozens of projects so far (mostly with, but not only, Rails) and it works like a charm. I usually load the config from an amqp.yml file (the same pattern as ActiveRecord with database.yml). Ideally there should also be a way to load it from ENV (the Heroku way), but I don't need it that way:
AMQP_CONFIG ||= begin
  (YAML.load_file(::File.expand_path('../../config/amqp.yml', __FILE__))[ENV['RAILS_ENV'] || 'development'] || {}).
    inject({}) { |memo,(k,v)| memo[k.to_sym] = v; memo }
end
Sneakers.configure({
  amqp: "amqp://#{AMQP_CONFIG[:user]}:#{AMQP_CONFIG[:pass]}@#{AMQP_CONFIG[:host]}:#{AMQP_CONFIG[:port]}",
  vhost: AMQP_CONFIG[:vhost],
  ....
Maybe I'm missing something here, but the different vhosts should give the different environments all the separation they need to ensure nothing ever gets mixed up. Plus, each environment works identically to all the others of the same project, with no magic or additional logic needed.
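For the "load it from ENV" variant mentioned above, a rough sketch could look like this. The variable names (AMQP_USER, AMQP_HOST, ...) and the helper `amqp_settings` are assumptions for illustration; only the `amqp:` and `vhost:` keys come from the snippet above. The default vhost follows the "/project_<env>" convention described in this comment.

```ruby
require 'erb'

# Build connection settings from environment variables, falling back to
# local defaults. The variable names are hypothetical.
def amqp_settings(env = ENV, app_env = 'development')
  user  = env.fetch('AMQP_USER', 'guest')
  pass  = env.fetch('AMQP_PASS', 'guest')
  host  = env.fetch('AMQP_HOST', 'localhost')
  port  = env.fetch('AMQP_PORT', '5672')
  vhost = env.fetch('AMQP_VHOST', "/project_#{app_env}")

  # Percent-encode credentials so characters such as "@" or "/" cannot
  # break the URL.
  credentials = [user, pass].map { |part| ERB::Util.url_encode(part) }.join(':')
  { amqp: "amqp://#{credentials}@#{host}:#{port}", vhost: vhost }
end

# A plain Hash stands in for ENV here so the example runs anywhere.
settings = amqp_settings({ 'AMQP_PASS' => 'p@ss' }, 'test')
puts settings[:amqp]
puts settings[:vhost]
```

The resulting hash has the same two keys that are passed to Sneakers.configure in the YAML-based version, so the environment is still decided once, at connection time.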
I see. it sounds like a better idea indeed!
I will look into it and see how to do this transition in a non-breaking way. If it turns out to be a breaking change, we'll use a major version bump in accordance with semver.
Thanks!
Cool :-)
Currently there is another issue with publishing. The publish method uses one shared (across all threads of a worker) channel.
This works fine with only a few publishes, but at high frequency a lot of them get lost (in my test about 90%) in the cross-thread noise on the channel. Here is some info from bunny on that: http://rubybunny.info/articles/exchanges.html#publishing_in_multithreaded_environments
Yep. I think that's an issue. I was worried about ack/nack/etc from the worker threads. In theory this is also data that gets put on the channel by many threads - in this case I was assured by one of the AMQP devs that because confirmations always fall on a single frame it should work.
The disadvantage of having a single channel per thread - is when you want to have a lot of threads (e.g. 200) per process.
In the case of publishing - perhaps we should lock on the channel and "pay" that price only in this use case.
Channels do not have too much overhead on the RabbitMQ side (at least that's the claim) and I did not have any problem with a few hundred myself, but on the Ruby side it might add up to quite a lot of unnecessary memory if someone really does not need it. The ideal solution is probably a "channel pool" with configurable size.
Right now i'm trying one channel per thread to make sure this is the actual source of my messages getting lost.
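The "channel pool with configurable size" idea could be sketched roughly as follows. This is not something Sneakers provides; `ChannelPool` is a hypothetical helper built on Ruby's thread-safe Queue, and the block passed to it is expected to create one channel (for example `connection.create_channel` with bunny).

```ruby
# A tiny fixed-size pool. The block given to .new builds one pooled object;
# it is called `size` times up front.
class ChannelPool
  def initialize(size)
    @channels = Queue.new
    size.times { @channels << yield }
  end

  # Check a channel out, yield it, and always put it back.
  # Blocks while every channel is in use.
  def with
    channel = @channels.pop
    yield channel
  ensure
    @channels << channel if channel
  end
end

# Plain objects stand in for channels so the example runs without a broker.
# With bunny it might look like:
#   pool = ChannelPool.new(5) { connection.create_channel }
#   pool.with { |ch| ch.default_exchange.publish(body, routing_key: 'some_queue') }
pool = ChannelPool.new(2) { Object.new }
pool.with { |channel| puts "got #{channel.class}" }
```

Each thread gets exclusive use of a channel for the duration of the block, and the pool size caps how many channels a process holds open.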
Correction: I had another bug that was probably to blame for all the lost publishes. I still can't confirm 100% that it works with the shared channel, but it might not even be the issue I thought it was at first.
Hey Benjamin, very curious to know how it went?
Meanwhile, I've come up with a nonbreaking solution for the environmental magic here:
https://github.com/jondot/sneakers/commit/8bf710215f76b0b58ed188ec4ee74d37be43756b
You're welcome to give that branch a try before it is merged.
Basically, calling Sneakers.not_environmental! should do it. It sounds a bit bad for the environment, but I love small jokes in my code :).
So far I have not had any more issues with publishing; I've probably had a couple million items bouncing around tasks without a problem. So I guess the channel issue is a non-issue.
So, @ged, what about your intention to fix the issue in Publisher https://github.com/jondot/sneakers/blob/master/lib/sneakers/publisher.rb#L24? Recently I've faced the same bug and I need to know: are you going to fix it, or should I do it myself?
@graf-abolmasov I think it's already fixed in the current version :), try gem install sneakers --pre and see if it works for you.
Hi All,
I have noticed when consuming a ton of items from a queue and having them publish to another queue that I see quite a few bunny level exceptions
Unexpected error Connection-level error: UNEXPECTED_FRAME - expected content header for class 60, got non content header frame instead
From my testing it does not appear to happen when the publish is removed or commented out. I have also seen that the best practice is to have one channel for consumption and one for publishing. Would it be worthwhile to have two channels in the worker and have publish use the second?
Publishing is not thread safe, and since Sneakers allows a boatload of threads, I suppose a dedicated channel wouldn't suffice. I think as a first test, you can wrap the "reply" publish logic within the worker with a mutex (you can take a look at the publisher code), and see if this keeps happening.
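As a rough idea of what that first test could look like, here is a small wrapper that serializes calls to any object responding to #publish. `SynchronizedPublisher` is a made-up name, not part of Sneakers, and the wrapper only helps if all threads share the same wrapper instance (and therefore the same lock).

```ruby
# Wraps any object that responds to #publish and lets only one thread at a
# time call through to it.
class SynchronizedPublisher
  def initialize(publisher)
    @publisher = publisher
    @lock = Mutex.new
  end

  def publish(*args, **opts)
    @lock.synchronize { @publisher.publish(*args, **opts) }
  end
end

# Stand-in publisher so the example runs without a broker.
class PrintingPublisher
  def publish(msg, opts = {})
    puts "publishing #{msg} to #{opts[:to_queue]}"
  end
end

safe = SynchronizedPublisher.new(PrintingPublisher.new)
threads = 3.times.map { |i| Thread.new { safe.publish("message #{i}", to_queue: :test) } }
threads.each(&:join)
```

Note that this only serializes the publishes themselves; it does not stop other threads from using the same channel for acks at the same time.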
Wrapping the publish code in the worker in a mutex still caused the frame issues. It causes problems even under low publish load if the consumers are running quickly (as noted in the max_retry branch with the publish to error queue).
I'm not sure there is a simple solution - is there something that could be done to fix this or would the underlying format of the framework limit the use to only processing things that did work/http/some threadsafe outcome?
I am also getting UNEXPECTED_FRAME errors. Can you share your solutions?
I'm just opening up a new channel in each task that I process (lazily, only when it's required to publish); since those are usually bigger tasks it works OK. All on the same global process connection.
If you have a higher load you might want to think about moving the publishing into the manager to have one permanent channel, but that's a lot more complex.
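The "new channel per task, opened lazily" approach could be sketched like this. `TaskChannel` and `with_task_channel` are hypothetical names; `connection` is assumed to behave like a started Bunny::Session shared by the whole process, and the message is sent through the default exchange with the queue name as routing key.

```ruby
# Opens a channel on the shared connection only if the task actually
# publishes, and closes it when the task is done.
class TaskChannel
  def initialize(connection)
    @connection = connection
    @channel = nil
  end

  def publish(body, queue_name)
    @channel ||= @connection.create_channel
    @channel.default_exchange.publish(body, routing_key: queue_name)
  end

  def close
    @channel&.close
    @channel = nil
  end
end

# Runs one task with its own lazily created channel.
def with_task_channel(connection)
  task_channel = TaskChannel.new(connection)
  yield task_channel
ensure
  task_channel&.close
end

# Inside a worker this might be used as:
#   with_task_channel(shared_connection) do |out|
#     out.publish(result_json, 'test_callback')
#   end
```

Tasks that never publish never pay for a channel, and no channel is ever shared between two tasks.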
I got rid of the UNEXPECTED_FRAME errors by modifying my worker this way:
class MyWorker
  include Sneakers::Worker
  from_queue :test

  def work msg
    params = JSON.parse msg, symbolize_names: true
    result = MyJobProcessor.new(params).result
    publisher.publish result, to_queue: :test_callback
    ack!
  end

  def publisher
    @publisher ||= Sneakers::Publisher.new
  end
end
@jondot what do you think? Should I create a PR replacing the Sneakers::Worker#publish method with these two methods:
def publish *attrs
  publisher.publish *attrs
end

def publisher
  @publisher ||= Sneakers::Publisher.new
end
@sharshenov I see what you did there, I think it's smart. However, please notice that at first look you're localizing a publisher instance per process, and on second look - since a worker is instantiated per message - you're also instantiating a new Publisher instance per message. It would be the same as doing Sneakers::Publisher.new.publish(...) in the #work body.
Is that what you need?
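The point about memoization scope can be shown without Sneakers at all. In this sketch `CountingPublisher` is a stand-in for Sneakers::Publisher that counts how often it is built, and the two worker classes are hypothetical; it assumes, as described above, that a fresh worker object is created for every message.

```ruby
# Stand-in for Sneakers::Publisher that counts how often it is instantiated.
class CountingPublisher
  class << self
    attr_accessor :instances
  end
  self.instances = 0

  def initialize
    self.class.instances += 1
  end
end

class PerInstanceWorker
  def publisher
    # Cached on this worker object only, so a new worker means a new publisher.
    @publisher ||= CountingPublisher.new
  end
end

class PerClassWorker
  def self.publisher
    # Cached on the class, so every worker object shares one publisher.
    @publisher ||= CountingPublisher.new
  end

  def publisher
    self.class.publisher
  end
end

3.times { PerInstanceWorker.new.publisher }
puts CountingPublisher.instances # → 3

CountingPublisher.instances = 0
3.times { PerClassWorker.new.publisher }
puts CountingPublisher.instances # → 1
```

Sharing one publisher at class level avoids the per-message cost, but it brings back the question of thread safety, and `||=` itself is not atomic, so a shared instance is better created once at startup.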
Oh. Now I see, that my workaround is an overhead. I'll implement manager as @maxigs suggested. Thank you!
Thanks guys, closing for cleanups :)
As an aside, we use connection_pool to manage our bunny connections in la_gear, but we actually do it at the publisher level (see here):
def init_pool(size = ::Sidekiq.options[:concurrency],
              timeout = 3)
  $publisher = ConnectionPool.new(
    size: size,
    timeout: timeout
  ) { ::LaGear::Publisher.new }
  $publisher.with do |bus|
    fail 'Bus is lost!' unless bus.is_a?(LaGear::Publisher)
  end
end
where LaGear::Publisher is a subtype of Sneakers::Publisher. We push messages through our abstraction into a Sidekiq/Redis "outbox" which then actually publishes the message:
$publisher.with do |publisher|
  publisher.publish(msg, opts)
end
We're seeing a similar issue with sneakers 2.2.1. However, I'm not clear on why threading would be an issue in our scenario. We pass :threads => 1 and :workers => 10 to Sneakers.configure. And we're using our own publisher implementation, which is instantiated at startup time and stored as an instance variable in a worker class implementation (ActivityWorker). See also worker module (Worker::@handler_params).
Any ideas?
So there is a ton of conversation here, but still no examples of how to actually publish a message, especially when doing so outside of a worker. I plan on using Sneakers to invoke background jobs, as AMQP makes it easy for systems outside of Ruby to publish messages. But in many cases I need to publish them myself.
I agree with the points about the flexible formats, but it would be nice to at least have an example of how to use the publisher to actually send a message from outside a worker. That's not documented anywhere, and reading the code hasn't helped me figure it out. I am struggling with how to stay with Sneakers for this as opposed to switching to a different gem.
Can we just get a simple example of sending a message to Rabbit that a different worker can pick up and work on? You can omit the worker part, but something showing how to define the exchange that you are publishing to would be helpful.
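For what it's worth, a minimal sketch of publishing from outside a worker with plain bunny might look like the following. `publish_message` is a hypothetical helper, not a Sneakers API. The exchange settings in the usage comment (a durable direct exchange named "sneakers", with the queue name as routing key) are what I believe Sneakers uses by default, but treat them as assumptions and match whatever your own Sneakers.configure declares; declaring an existing exchange with different attributes makes RabbitMQ reject the declaration.

```ruby
require 'json'

# Publish one JSON message to a durable direct exchange and close the
# channel afterwards. `connection` is expected to behave like a started
# Bunny::Session.
def publish_message(connection, exchange_name, routing_key, payload)
  channel = connection.create_channel
  exchange = channel.direct(exchange_name, durable: true)
  exchange.publish(JSON.generate(payload),
                   routing_key: routing_key,
                   content_type: 'application/json',
                   persistent: true)
ensure
  channel&.close
end

# With the bunny gem installed and a broker running, usage might look like:
#
#   require 'bunny'
#   connection = Bunny.new('amqp://guest:guest@localhost:5672')
#   connection.start
#   publish_message(connection, 'sneakers', 'downloads', { url: 'http://example.com' })
#   connection.close
```

A worker declared with `from_queue 'downloads'` would then receive the JSON string as its message. From inside a Ruby process that already loads Sneakers, the `Sneakers::Publisher.new.publish(msg, to_queue: ...)` call shown earlier in this thread is the shorter route.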
You could also just use a different library to send messages into RabbitMQ, for queues to which sneakers is subscribed.
I used the bunny or amqp gem for that, depending on the type of process. This triggered my initial job, and from there sneakers takes over with processing everything.
Publishing from within sneakers should already be mentioned above. I currently don't have any active project using it anymore, so i don't have a direct example to share at hand at the moment :(
The only examples of publishing within sneakers were related to publishing from within a worker. But it's not clear how to actually invoke any of that from outside the worker. Especially when the worker config only uses the queue, and doesn't define the exchange (which is critical for publishing).
We ended up following the ConnectionPool pattern described here, where the pool contains instances of our custom publisher implementation. The connection pool and publisher instances are initialized in our daemon startup script, with one publisher instance per worker thread. We also have the name of our exchange available at that time as we're also using it for worker configs.
# configure the publisher pool
# publishing is NOT threadsafe so we need to allocate
# a publisher instance for each worker
# see: https://github.com/jondot/sneakers/issues/6#issuecomment-146034580
publisher_pool = ConnectionPool.new(size: worker_count, timeout: 5) do
  RealSelf::Stream::Publisher.new(
    {
      :heartbeat => 60,
      :host      => rmq_url.host,
      :password  => rmq_url.password,
      :port      => rmq_url.port,
      :user      => rmq_url.user,
      :vhost     => '/'
    },
    stream_activity_exchange)
end
@jesmith17 Do you really have a case where you need to publish from outside a worker but still within sneakers processes?
If yes, you could still follow the approach I described and use AMQP directly to publish a message into a sneakers queue. Not really the best way, but straightforward to use.
I'm not sure if I'm just being dense here, but I can't seem to see any docs or examples of publishing messages into Rabbit, only worker docs.
Are publishers outside the scope of sneakers?