Closed jselamy closed 7 years ago
Hi,
Starting from v3, Easy Rules is thread safe, see FAQ N6.
Do you still have thread safety issues?
Kr Mahmoud
Hi,
Yes, I've read the FAQ, and it says that a RulesEngine can be shared. But when I use it in a multi-threaded setup, I get a concurrency error:
    java.util.ConcurrentModificationException: null
        at java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$KeyIterator.next(TreeMap.java:1265)
        at org.jeasy.rules.core.DefaultRulesEngine.apply(DefaultRulesEngine.java:136)
        at org.jeasy.rules.core.DefaultRulesEngine.fire(DefaultRulesEngine.java:109)
Interesting. Can you share your multi-threaded setup? Where does the concurrency occur?
A code example would be great to understand the issue.
Hey,
Thanks for that quick reply.
We use spring-amqp with a ThreadPoolTaskExecutor to consume messages from a queue. For each payload received, we call RulesEngine.fire:
    @EnableAsync
    @Configuration
    public class RabbitMQConfig {

        @Autowired
        private AppProperties appProperties;

        @Autowired
        private ConnectionFactory connectionFactory;

        @Bean
        public Queue queue() {
            return new Queue(appProperties.getQueue().getName());
        }

        @Bean
        public MessageConverter jsonMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }

        @Bean
        public Executor threadPoolExecutor() {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setThreadNamePrefix("async-consumer-");
            threadPoolTaskExecutor.setBeanName("taskExecutor");
            threadPoolTaskExecutor.setCorePoolSize(5);
            threadPoolTaskExecutor.setMaxPoolSize(10);
            threadPoolTaskExecutor.setQueueCapacity(100);
            return threadPoolTaskExecutor;
        }

        @Bean
        public MessageListener messageListener() {
            return new PushEventConsumer();
        }

        @Bean
        public SimpleMessageListenerContainer listenerContainer() {
            SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
            listenerContainer.setConnectionFactory(connectionFactory);
            listenerContainer.setQueues(queue());
            listenerContainer.setMessageConverter(jsonMessageConverter());
            listenerContainer.setMessageListener(messageListener());
            listenerContainer.setTaskExecutor(threadPoolExecutor());
            listenerContainer.setConcurrentConsumers(5);
            listenerContainer.setMaxConcurrentConsumers(10);
            listenerContainer.setPrefetchCount(10);
            listenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return listenerContainer;
        }
    }
    public class PayloadRulesEngine {

        @Autowired
        private RuleCacheService ruleService;

        @Autowired
        private PayloadRuleFactory payloadRuleFactory;

        private final Rules rules = new Rules();

        private org.jeasy.rules.api.RulesEngine rulesEngine;

        public PayloadRulesEngine(org.jeasy.rules.api.RulesEngine rulesEngine) {
            this.rulesEngine = rulesEngine;
        }

        public void fire(String payload) {
            Facts facts = new Facts();
            facts.put(AbstractPayloadRule.STATS_EVENT_PAYLOAD, payload);
            registerRules();
            rulesEngine.fire(rules, facts);
        }

        private void registerRules() {
            rules.clear();
            ruleService.findAll()
                    .stream()
                    .map(payloadRuleFactory::createRegexRule)
                    .forEach(rules::register);
        }
    }
    @Configuration
    public class RulesConfig {

        @Bean
        public PayloadRulesEngine payloadRulesEngine() {
            RulesEngine rulesEngine = RulesEngineBuilder
                    .aNewRulesEngine()
                    .withSilentMode(true)
                    .withSkipOnFirstAppliedRule(false)
                    .withSkipOnFirstFailedRule(false)
                    .withSkipOnFirstNonTriggeredRule(false)
                    .build();
            return new PayloadRulesEngine(rulesEngine);
        }
    }
    @Component
    public class PushEventConsumer implements MessageListener {

        private static final Logger LOG = LoggerFactory.getLogger(PushEventConsumer.class);

        @Autowired
        private PayloadRulesEngine rulesEngine;

        @Override
        public void onMessage(Message message) {
            try {
                String textMessage = new String(message.getBody());
                Validate.notEmpty(textMessage, "Incoming messaging in consumer is empty");
                LOG.debug("calling rule engine for payload with URI::{}", JsonPath.parse(textMessage).<String>read("$.endpointURI"));
                rulesEngine.fire(textMessage);
            } catch (Exception e) {
                LOG.error("Impossible to read the incoming message", e);
            }
        }
    }
Any news?
Didn't get time to work on that yet.
I'll get back to you asap.
Hi,
I wanted to follow up on this outstanding issue.
Regards.
___ Jimmy M. Sélamy
Consultant Informatique JMS Key Solutions Inc. 1-514-771-7436 jselamy@jmskeysolutions.ca
Hi,
I have some free time this week, and I plan to work on this issue as a priority.
Will keep you posted.
Kr Mahmoud
Hi,
Thank you for providing this code example!
What I see from your code is that the Facts object is shared between multiple consumers.
Facts (data) should be specific to each thread, while rules (algorithm) can be shared between them.
I tried to reproduce the issue with this test case:
    package org.jeasy.rules.core;

    import org.jeasy.rules.api.Facts;
    import org.jeasy.rules.api.Rules;
    import org.jeasy.rules.api.RulesEngine;
    import org.junit.Test;

    import static org.junit.Assert.fail;

    public class ThreadSafetyTest {

        @Test
        public void rulesEngineShouldBeThreadSafe() throws Exception {
            RulesEngine rulesEngine = new DefaultRulesEngine();
            EvenNumberRule rule1 = new EvenNumberRule();
            OddNumberRule rule2 = new OddNumberRule();
            Rules rules = new Rules();
            rules.register(rule1);
            rules.register(rule2);
            MyThread thread1 = new MyThread(rulesEngine, rules);
            MyThread thread2 = new MyThread(rulesEngine, rules);
            try {
                thread1.start();
                thread2.start();
            } catch (java.util.ConcurrentModificationException e) {
                fail("The rules engine should be thread safe");
            }
        }

        class EvenNumberRule extends BasicRule {
            @Override
            public boolean evaluate(Facts facts) {
                Integer number = (Integer) facts.get("number");
                return number % 2 == 0;
            }

            @Override
            public void execute(Facts facts) throws Exception {
                Integer number = (Integer) facts.get("number");
                System.out.println(Thread.currentThread().getName() + ": number " + number + " is even");
            }
        }

        class OddNumberRule extends BasicRule {
            @Override
            public boolean evaluate(Facts facts) {
                Integer number = (Integer) facts.get("number");
                return number % 2 != 0;
            }

            @Override
            public void execute(Facts facts) throws Exception {
                Integer number = (Integer) facts.get("number");
                System.out.println(Thread.currentThread().getName() + ": number " + number + " is odd");
            }
        }

        class MyThread extends Thread {
            private RulesEngine rulesEngine;
            private Rules rules;

            MyThread(RulesEngine rulesEngine, Rules rules) {
                this.rulesEngine = rulesEngine;
                this.rules = rules;
            }

            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    Facts facts = new Facts();
                    facts.put("number", i);
                    System.out.println(Thread.currentThread().getName() + ": Firing rules on facts: " + facts);
                    rulesEngine.fire(rules, facts);
                }
            }
        }
    }
But I have no exception after multiple executions. This test shows how the rules engine and the rules can be shared between threads, while each thread has its own facts.
In your example, I would do something like:
    @Component
    public class PushEventConsumer implements MessageListener {

        private static final Logger LOG = LoggerFactory.getLogger(PushEventConsumer.class);

        @Autowired
        private RulesEngine rulesEngine;

        @Autowired
        private Rules rules;

        @Override
        public void onMessage(Message message) {
            try {
                String textMessage = new String(message.getBody());
                Validate.notEmpty(textMessage, "Incoming messaging in consumer is empty");
                LOG.debug("calling rule engine for payload with URI::{}", JsonPath.parse(textMessage).<String>read("$.endpointURI"));
                Facts facts = new Facts();
                facts.put(AbstractPayloadRule.STATS_EVENT_PAYLOAD, textMessage);
                rulesEngine.fire(rules, facts);
            } catch (Exception e) {
                LOG.error("Impossible to read the incoming message", e);
            }
        }
    }
Do you see the difference?
Please let me know if this helps.
Kr Mahmoud
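A note on the test case in this comment, offered as a sketch rather than a correction of the maintainer's code: in Java, an exception thrown inside run() surfaces on the worker thread, so a try/catch around Thread.start() in the test method cannot observe it, and the test method may return before the workers finish. One JDK-only way to make such a test sensitive to worker failures is to record uncaught exceptions and join the threads before asserting. The names WorkerFailureCapture and runAndCollect are hypothetical and are not part of Easy Rules or JUnit.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical helper, not part of Easy Rules or JUnit.
class WorkerFailureCapture {

    // Runs every task on its own thread, waits for all of them to finish,
    // and returns the exceptions that escaped from the tasks.
    static List<Throwable> runAndCollect(Runnable... tasks) {
        List<Throwable> failures = new CopyOnWriteArrayList<>();
        Thread[] threads = new Thread[tasks.length];
        for (int i = 0; i < tasks.length; i++) {
            threads[i] = new Thread(tasks[i], "worker-" + i);
            // Invoked on the failing worker thread, before that thread terminates.
            threads[i].setUncaughtExceptionHandler((thread, error) -> failures.add(error));
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                // Without join(), the caller could return while workers are still running.
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for workers", e);
            }
        }
        return failures;
    }
}
```

In a test, the two MyThread bodies could be passed as tasks and the returned list asserted to be empty, so that a ConcurrentModificationException on either worker fails the test.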
Hi,
Sorry for the late reply. Yes, I see what you mean.
Could you explain why the facts cannot be shared between multiple consumers?
Hi,
Glad it helped.
> Could you explain why the facts cannot be shared between multiple consumers?
Because Facts is a mutable object (you may add, remove, or modify facts) and hence is not thread safe.
If it is shared between threads, you can expect strange behaviour like what you saw.
As said previously, Facts (data) should be specific to each thread while rules (algorithm) can be shared between them.
Kr Mahmoud
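The rule stated here (data confined to each thread, algorithm shared) can be sketched with the JDK alone. In the sketch below, a plain HashMap stands in for Facts and a list of Predicate objects stands in for the rules. Both substitutions are assumptions made so the example runs without Easy Rules, and PerThreadFactsSketch and fireAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;

// Hypothetical stand-in: Map plays the role of Facts, Predicate the role of a rule.
class PerThreadFactsSketch {

    // Shared "rules": built once, then only read by the worker threads.
    static final List<Predicate<Map<String, Object>>> RULES;
    static {
        List<Predicate<Map<String, Object>>> rules = new ArrayList<>();
        rules.add(facts -> ((Integer) facts.get("number")) % 2 == 0); // even
        rules.add(facts -> ((Integer) facts.get("number")) % 2 != 0); // odd
        RULES = Collections.unmodifiableList(rules);
    }

    // Evaluates the shared rules for the numbers 0..messages-1 on a thread pool
    // and returns how many times each rule matched.
    static int[] fireAll(int messages) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<boolean[]>> pending = new ArrayList<>();
            for (int i = 0; i < messages; i++) {
                final int number = i;
                pending.add(pool.submit(() -> {
                    // "Facts" are created inside the task, so no other thread sees them.
                    Map<String, Object> facts = new HashMap<>();
                    facts.put("number", number);
                    boolean[] matched = new boolean[RULES.size()];
                    for (int r = 0; r < RULES.size(); r++) {
                        matched[r] = RULES.get(r).test(facts);
                    }
                    return matched;
                }));
            }
            int[] counts = new int[RULES.size()];
            for (Future<boolean[]> result : pending) {
                boolean[] matched = result.get();
                for (int r = 0; r < matched.length; r++) {
                    if (matched[r]) {
                        counts[r]++;
                    }
                }
            }
            return counts;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

The design point is that the only mutable object, the map, never leaves the task that created it, while the shared list is never modified after construction.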
Hi, I'm facing the same problem, and I don't understand what you mean by 'What I see from your code is that the Facts object is shared between multiple consumers'. A Facts object is created every time the PayloadRulesEngine.fire(String) method is called, which to me means that each consumer gets a new Facts object. Could you please explain in more detail?
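One possible reading of the posted code, stated as an assumption and not as a diagnosis confirmed in this thread: the object shared between consumers in PayloadRulesEngine is the rules field, which registerRules() clears and refills on every fire call, while another consumer thread may be iterating it inside rulesEngine.fire. The reported stack trace fails in a TreeMap key iterator, which is the iterator a java.util.TreeSet uses. The JDK-only sketch below shows that fail-fast check on a TreeSet of strings standing in for the rules. It triggers the check from a single thread so the outcome is deterministic, whereas the cross-thread case depends on timing. SharedRuleSetSketch and its methods are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in: a TreeSet of names plays the role of the registered rules.
class SharedRuleSetSketch {

    // Returns true if changing the set while it is being iterated
    // makes the iterator throw ConcurrentModificationException.
    static boolean mutationDuringIterationFails() {
        TreeSet<String> sharedRules = new TreeSet<>();
        sharedRules.add("rule-a");
        sharedRules.add("rule-b");
        sharedRules.add("rule-c");
        try {
            for (String rule : sharedRules) {
                // Mimics a second caller clearing and re-registering the rules
                // while this loop is still walking the same set.
                sharedRules.clear();
                sharedRules.add(rule);
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Alternative: each call builds its own set, so nothing is shared or mutated mid-iteration.
    static int fireWithLocalRules(List<String> ruleNames) {
        TreeSet<String> localRules = new TreeSet<>(ruleNames);
        int fired = 0;
        for (String rule : localRules) {
            fired++;
        }
        return fired;
    }
}
```

Under that reading, two options would avoid the shared mutation: build a new Rules instance inside each fire call, or register the rules once at startup and only read them afterwards, as in the maintainer's PushEventConsumer suggestion.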
Hi, I wanted to know whether there is any plan to make this library fully thread safe.
Regards.