jruizgit / rules

Durable Rules Engine
MIT License

integration with spark #98

Open agonencl opened 7 years ago

agonencl commented 7 years ago

Do you have any plans to integrate with Spark? Or can you provide a concept of how it could be done?

jruizgit commented 7 years ago

Hi, thanks a lot for posting the question. It should be easy to integrate. Let me put together a couple of examples. Do you have any scenario in mind?

Thanks


agonencl commented 7 years ago

I'm looking into using a rule engine. I've read about Drools, but I prefer to work in Python since we are a Python shop. What I'm looking for is a way to correlate and aggregate data based on business rules (in my case, cyber-related rules). I'll try to give a simple example:

The use case is to detect a breach when a user has downloaded malware that steals their login password. The idea is that the malware will behave similarly across different attacked users. So I'm looking for one rule that will filter the data and one rule that will aggregate it.

Let's assume we have two types/streams of events:

So the filter rule fires if we see both a login event and a download event within a time period, and the aggregation rule should aggregate all results from step 1 based on "file_hash".

I hope the example is clear enough; I've tried to reduce it to the minimum.
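
The two steps described above can be sketched in plain Python, without any rule engine, to pin down the intended behavior. This is only an illustration: the event fields (`type`, `user`, `ts`, `file_hash`), the sample events, and the 300-second window are assumptions, since the thread does not list the actual event schema.

```python
from collections import defaultdict

WINDOW_SECONDS = 300  # assumed correlation window; not specified in the thread


def correlate(events, window=WINDOW_SECONDS):
    """Step 1 (filter): keep downloads that have a login by the same user
    within `window` seconds of the download."""
    logins = defaultdict(list)
    for e in events:
        if e['type'] == 'login':
            logins[e['user']].append(e['ts'])

    matches = []
    for e in events:
        if e['type'] != 'download':
            continue
        if any(abs(e['ts'] - t) <= window for t in logins[e['user']]):
            matches.append({'user': e['user'], 'file_hash': e['file_hash']})
    return matches


def aggregate_by_hash(matches):
    """Step 2 (aggregate): collect the distinct users seen per file_hash."""
    users_by_hash = defaultdict(set)
    for m in matches:
        users_by_hash[m['file_hash']].add(m['user'])
    return dict(users_by_hash)


# Hypothetical sample events
events = [
    {'type': 'login', 'user': 'alice', 'ts': 100},
    {'type': 'download', 'user': 'alice', 'ts': 160, 'file_hash': 'abc'},
    {'type': 'login', 'user': 'bob', 'ts': 200},
    {'type': 'download', 'user': 'bob', 'ts': 250, 'file_hash': 'abc'},
    {'type': 'download', 'user': 'carol', 'ts': 900, 'file_hash': 'abc'},  # no login
]

result = aggregate_by_hash(correlate(events))
print(result)
```

Here `carol` is dropped by step 1 because she has no login event, so step 2 groups only `alice` and `bob` under the shared hash.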

jruizgit commented 7 years ago

Hi, reporting back on this. I put together a simple example of how Spark and durable_rules can work together. In this example a rule looks for two words, one containing "Spark" and one containing "Python". The words resulting from the file aggregation are posted to the ruleset.

from __future__ import print_function
import sys
from operator import add
from pyspark.sql import SparkSession
from durable.lang import *

# ruleset definition
with ruleset('words'):

    @when_all(c.first << m.word.matches('.*Spark.*'),
              c.second << m.word.matches('.*Python.*'))
    def spark_and_python(c):
        print('Triggered rule')
        print('   -> {0}'.format(c.first.word))
        print('   -> {0}'.format(c.second.word))

# host creation requires a redis instance
host = create_host([{'host': 'test.8zj18v.0001.usw2.cache.amazonaws.com', 'port': 6379}])
host.run()

# spark session
spark = SparkSession.builder.appName("PythonWordCount").getOrCreate()

# counting words
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
output = counts.collect()

# looking at results
for (word, count) in output:
    host.post('words', {'word': word})

spark.stop()

When I run the example using: spark-submit --master local[4] ./examples/src/main/python/wordcount.py README.md

I see the output:

Triggered rule
   -> Spark\"](http://spark.apache.org/docs/latest/building-spark.html).
   -> Python,
Triggered rule
   -> SparkPi
   -> Python

agonencl commented 7 years ago

Thanks for the quick response, @jruizgit. In the example you've provided, the work is done on the driver (master) node, meaning on a single machine. I'm looking to do it from the worker (slave) machines. I'll try to play with it.

cgokulnath commented 3 years ago

Hello Ruiz, thanks for this post. When I try to call the post method inside UDFs in PySpark, I get the error below:

  File "/opt/conda/default/lib/python2.7/site-packages/durable/lang.py", line 670, in post
    return get_host().post(ruleset_name, message, complete)
  File "/opt/conda/default/lib/python2.7/site-packages/durable/engine.py", line 805, in post
    rules = self.get_ruleset(ruleset_name)
  File "/opt/conda/default/lib/python2.7/site-packages/durable/engine.py", line 773, in get_ruleset
    ruleset_definition = self.load_ruleset(ruleset_name)
  File "/opt/conda/default/lib/python2.7/site-packages/durable/engine.py", line 764, in load_ruleset
    raise Exception('Ruleset with name {0} not found'.format(ruleset_name))
Exception: Ruleset with name levelcheck1 not found

Any help is appreciated.