siddhi-io / siddhi

Stream Processing and Complex Event Processing Engine
http://siddhi.io
Apache License 2.0

Read a CSV file automatically when it is overwritten #1767

Open gamorav opened 2 years ago

gamorav commented 2 years ago

Description: I am using WSO2 Streaming Integrator Tooling 4.0.0 to test this script:

@App:name('UntitledETLTaskFlow')
@App:description('Description of the plan')

@source(type='file',file.uri = "file:/C:/Users/admin/Documents/test/productions.csv",mode = "TEXT.FULL",tailing = "false",header.present = "false",
    @map(type='csv'))
define stream input_stream (name string,amount double);

@sink(type = 'log',
    @map(type = 'passThrough'))
define stream LogStream (name string,amount double);

@info(name='query1')
from input_stream
select name,amount 
insert into LogStream;

Log:

UntitledETLTaskFlow.siddhi -  Started Successfully!
[2022-02-12_02-27-02_029] INFO {io.siddhi.core.stream.output.sink.LogSink} - UntitledETLTaskFlow : LogStream : [Event{timestamp=1644643622029, data=[Almond cookie, 90.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 95.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 100.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 150.0], isExpired=false}, Event{timestamp=1644643622029, data=[Almond cookie, 155.0], isExpired=false}] 

My problem is that when I overwrite the CSV file with new data, the log doesn't show anything. The only way to make it read the file again is to save the Siddhi file.

How can I solve this issue? In other words, how can I automatically re-read the entire file (not line by line) every time it is modified or overwritten?

Thanks.

senthuran16 commented 2 years ago

Hi @gamorav,

An easier approach to solving this would be to listen to the directory instead of the file.

@source(type='file',dir.uri = "file:/C:/Users/admin/Documents/test",mode = "TEXT.FULL",tailing = "false",header.present = "false",
    @map(type='csv'))
define stream input_stream (name string,amount double);

Upon dropping a new CSV file (with a new name) into your file:/C:/Users/admin/Documents/test directory, the input_stream will read the lines from it. Will this work for you?
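If already-processed files accumulating (or being re-read) in the watched directory is a concern, the file source can also move each file away once it has been consumed. A hedged sketch, assuming the `action.after.process` and `move.after.process` parameters described in the siddhi-io-file source documentation, with a hypothetical `processed` directory:

```
@source(type='file', dir.uri = "file:/C:/Users/admin/Documents/test",
    mode = "TEXT.FULL", tailing = "false", header.present = "false",
    action.after.process = "MOVE",
    move.after.process = "file:/C:/Users/admin/Documents/processed",
    @map(type='csv'))
define stream input_stream (name string, amount double);
```

With this, each dropped CSV is read once and then moved out of the watched directory, so the same directory stays clean between runs.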

gamorav commented 2 years ago

Hi @senthuran16, thank you, but I need the CSV file to keep the same name. Is there any workaround for that condition?

senthuran16 commented 2 years ago

Hi @gamorav, we can use file.uri with mode = "LINE" and tailing = "true" [1] to achieve this.

@source(type='file', file.uri = "file:/C:/Users/admin/Documents/test/productions.csv", mode = "LINE", tailing = "true", header.present = "false",
    @map(type='csv'))
define stream input_stream (name string,amount double);

When we add a new line to the productions.csv file and save it, the input_stream will receive a new event.

[1] https://siddhi-io.github.io/siddhi-io-file/api/latest/#file-source
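For the same-name requirement above, one workaround outside Siddhi is a small helper that copies productions.csv into a separate watched directory under a fresh name whenever its modification time changes; the dir.uri source from the earlier reply can then listen to that directory. A minimal polling sketch in Python; the helper, its paths, and the one-second interval are illustrative assumptions, not part of siddhi-io-file.

```python
import os
import shutil
import time


def copy_if_changed(src, watch_dir, last_mtime):
    """Copy src into watch_dir under a unique, timestamped name when its
    modification time has advanced past last_mtime. Returns the new
    mtime, or None if the file is unchanged."""
    mtime = os.path.getmtime(src)
    if last_mtime is not None and mtime <= last_mtime:
        return None
    # A fresh name each time, so a Siddhi dir.uri source sees a new file.
    base, ext = os.path.splitext(os.path.basename(src))
    target = os.path.join(watch_dir, "%s-%d%s" % (base, int(mtime * 1000), ext))
    shutil.copy2(src, target)
    return mtime


def watch(src, watch_dir, interval=1.0):
    """Poll src forever, feeding every overwrite into watch_dir."""
    last = None
    while True:
        new_mtime = copy_if_changed(src, watch_dir, last)
        if new_mtime is not None:
            last = new_mtime
        time.sleep(interval)
```

Running something like `watch('C:/Users/admin/Documents/test/productions.csv', 'C:/Users/admin/Documents/watched')` alongside the Siddhi app would turn each overwrite of the same-named file into a brand-new file for the directory source to pick up.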