ovikassingho / gstreamer-java

Automatically exported from code.google.com/p/gstreamer-java

WriteableByteChannelSink Update #91

Closed GoogleCodeExporter closed 8 years ago

GoogleCodeExporter commented 8 years ago
Hi guys,

I have updated the WriteableByteChannelSink class, specifically the 
sinkRender function.

I use it via inheritance, through OutputStreamSink.

I have added an option so that once sinkRender has written the buffer out 
to the NIO channel, buffer.dispose() is called automatically. This should 
mean we aren't waiting for the GC to dispose of the buffer, since the 
object holds a native object pointer.

I looked through the code and couldn't find anywhere the buffer was 
explicitly disposed, hence these updates.

/* 
 * Copyright (c) 2007 Wayne Meissner
 * 
 * This file is part of gstreamer-java.
 *
 * This code is free software: you can redistribute it and/or modify it under
 * the terms of the GNU Lesser General Public License version 3 only, as
 * published by the Free Software Foundation.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License
 * version 3 for more details.
 *
 * You should have received a copy of the GNU Lesser General Public License
 * version 3 along with this work.  If not, see <http://www.gnu.org/licenses/>.
 */

package org.gstreamer.io;

import java.io.IOException;
import java.nio.channels.WritableByteChannel;

import org.gstreamer.Buffer;
import org.gstreamer.FlowReturn;
import org.gstreamer.elements.CustomSink;

/**
 *
 * @author wayne
 */
public class WriteableByteChannelSink extends CustomSink {

    private WritableByteChannel channel;
    private boolean autoFlushBuffer = false;

    public WriteableByteChannelSink(final WritableByteChannel channel, String name) {
        super(WriteableByteChannelSink.class, name);
        this.channel = channel;
    }

    public void setAutoFlushBuffer(boolean value){
        this.autoFlushBuffer = value;
    }

    @Override
    protected final FlowReturn sinkRender(Buffer buffer) throws IOException {
        channel.write(buffer.getByteBuffer());

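        // Dispose immediately instead of waiting for the GC to release the
        // native buffer. The buffer was already dereferenced by the write
        // above, so the null check here is purely defensive.
        if(autoFlushBuffer){
            if((null != buffer) && (null != buffer.getAddress())){
                buffer.dispose();
            }
        }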

        return FlowReturn.OK;
    }   
}
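
The dispose-after-write idea can be tried without GStreamer. The following is only a minimal sketch under stated assumptions: FakeBuffer and DisposingWriter are hypothetical stand-ins invented for illustration (they are not part of gstreamer-java), and the flag mirrors the autoFlushBuffer option above. It also loops on write(), since a WritableByteChannel is allowed to write fewer bytes than requested, and it disposes in a finally block so the buffer is released even if the write fails.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for org.gstreamer.Buffer: owns data until disposed.
class FakeBuffer {
    private ByteBuffer data;

    FakeBuffer(byte[] bytes) { this.data = ByteBuffer.wrap(bytes); }

    ByteBuffer getByteBuffer() { return data; }

    boolean isDisposed() { return data == null; }

    // Simulates releasing the native memory behind the buffer.
    void dispose() { data = null; }
}

// Hypothetical sink: writes a buffer to a channel, then optionally disposes it.
class DisposingWriter {
    private final WritableByteChannel channel;
    private final boolean autoFlushBuffer;

    DisposingWriter(WritableByteChannel channel, boolean autoFlushBuffer) {
        this.channel = channel;
        this.autoFlushBuffer = autoFlushBuffer;
    }

    // Returns true if the whole buffer was written, false on null input or I/O error.
    boolean render(FakeBuffer buffer) {
        if (buffer == null) {
            return false; // check before the first dereference
        }
        try {
            ByteBuffer bytes = buffer.getByteBuffer();
            while (bytes.hasRemaining()) {
                channel.write(bytes); // may write only part of the buffer per call
            }
            return true;
        } catch (IOException ex) {
            return false;
        } finally {
            if (autoFlushBuffer) {
                buffer.dispose(); // release right away instead of waiting for the GC
            }
        }
    }
}

class DisposeDemo {
    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DisposingWriter writer = new DisposingWriter(Channels.newChannel(out), true);
        FakeBuffer buffer = new FakeBuffer("hello".getBytes(StandardCharsets.US_ASCII));
        boolean ok = writer.render(buffer);
        System.out.println("ok = " + ok + ", bytes = " + out.size()
                + ", disposed = " + buffer.isDisposed());
    }
}
```

With the flag set to false, the sketch leaves the buffer untouched after the write, which matches the default behaviour of the class above.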

If these changes are not needed, could someone let me know? Otherwise, can 
we get them added to trunk?

Thanks
Chris

Original issue reported on code.google.com by ch...@crazyfool.org on 7 Dec 2011 at 5:12

GoogleCodeExporter commented 8 years ago
Could you send a patch?

Original comment by lfar...@lfarkas.org on 8 Dec 2011 at 11:48

GoogleCodeExporter commented 8 years ago
Hi, here is an svn diff of my locally modified source against the current SVN repo.

Index: WriteableByteChannelSink.java
===================================================================
--- WriteableByteChannelSink.java       (revision 536)
+++ WriteableByteChannelSink.java       (working copy)
@@ -32,15 +32,28 @@
 public class WriteableByteChannelSink extends CustomSink {

     private WritableByteChannel channel;
+    private boolean autoFlushBuffer = false;

     public WriteableByteChannelSink(final WritableByteChannel channel, String name) {
         super(WriteableByteChannelSink.class, name);
         this.channel = channel;
     }

+    public void setAutoFlushBuffer(boolean value){
+       this.autoFlushBuffer = value;
+    }
+    
     @Override
     protected final FlowReturn sinkRender(Buffer buffer) throws IOException {
         channel.write(buffer.getByteBuffer());
+        
+        //Dispose immediate to avoid GC Delay
+        if(autoFlushBuffer){
+               if((null != buffer) && (null != buffer.getAddress())){
+                       buffer.dispose();
+               }
+        }
+        
         return FlowReturn.OK;
     }   
 }

Original comment by ch...@crazyfool.org on 8 Dec 2011 at 3:12

GoogleCodeExporter commented 8 years ago
Slightly modified: r545

Original comment by lfar...@lfarkas.org on 8 Dec 2011 at 3:28

GoogleCodeExporter commented 8 years ago
Hi Farkas,

I have made some more changes. I have created a new lock class and updated the 
ReadableByteChannelSrc and WriteableByteChannelSink classes to use it. 

With these changes, a 'main' thread can create the above input/output 
classes and then wait on the new lock object until someone signals 
completion, e.g. the bus callbacks, or an error from somewhere. If any NIO 
error occurs within the input/output classes, they will signal the lock.

Here is some sample code from our main thread:

lock = new StreamLock();
outputStreamSink = new OutputStreamSink(outputstream, "outputStream");
outputStreamSink.setAutoFlushBuffer(true);
outputStreamSink.set("sync", false);
outputStreamSink.setNotifyOnError(lock);
inputStreamSrc.setNotifyOnError(lock);

pipeline.setState(State.PLAYING);

synchronized (lock) {
    while (!lock.isDone()) {
        try {
            lock.wait();
        } catch (InterruptedException e) {
            // interrupted: loop round and re-check isDone()
        }
    }
}

I have left some things out; the above is just a pointer. To activate this 
code, you have to call setNotifyOnError with the lock object. Without this 
call, everything works as before.

Here is a diff of the new changes:

Index: src/org/gstreamer/io/WriteableByteChannelSink.java
===================================================================
--- src/org/gstreamer/io/WriteableByteChannelSink.java  (revision 536)
+++ src/org/gstreamer/io/WriteableByteChannelSink.java  (working copy)
@@ -24,23 +24,55 @@
 import org.gstreamer.Buffer;
 import org.gstreamer.FlowReturn;
 import org.gstreamer.elements.CustomSink;
-
+import org.gstreamer.io.StreamLock;
 /**
  *
  * @author wayne
  */
-public class WriteableByteChannelSink extends CustomSink {
+public class WriteableByteChannelSink extends CustomSink{

     private WritableByteChannel channel;
+    private boolean autoFlushBuffer = false;
+    private StreamLock lock = null;

     public WriteableByteChannelSink(final WritableByteChannel channel, String name) {
         super(WriteableByteChannelSink.class, name);
         this.channel = channel;
     }

-    @Override
+    public void setAutoFlushBuffer(boolean value){
+       this.autoFlushBuffer = value;
+    }
+    
+    public void setNotifyOnError(StreamLock lock){
+       this.lock = lock;
+    }
+    
+    private void signalError(){
+       if(null != lock){
+           lock.setDone();
+       }
+    }
+    
+    
+   @Override
     protected final FlowReturn sinkRender(Buffer buffer) throws IOException {
-        channel.write(buffer.getByteBuffer());
-        return FlowReturn.OK;
-    }   
+       
+       try{
+           channel.write(buffer.getByteBuffer());
+           return FlowReturn.OK;
+       }catch(IOException ex){
+           signalError();
+           return FlowReturn.ERROR;
+       }finally{
+            //Dispose immediate to avoid GC Delay
+            if(autoFlushBuffer){
+               if((null != buffer) && (null != buffer.getAddress())){
+                   buffer.dispose();
+               }
+            }
+       }
+    }
+    
+    
 }
Index: src/org/gstreamer/io/ReadableByteChannelSrc.java
===================================================================
--- src/org/gstreamer/io/ReadableByteChannelSrc.java    (revision 536)
+++ src/org/gstreamer/io/ReadableByteChannelSrc.java    (working copy)
@@ -38,6 +38,8 @@
     private final ReadableByteChannel channel;
     private FileChannel fileChannel;
     private long channelPosition = 0;
+    private StreamLock lock = null;
+    
     public ReadableByteChannelSrc(ReadableByteChannel src, String name) {
         super(ReadableByteChannelSrc.class, name);  
         this.channel = src;
@@ -83,6 +85,7 @@
             readFully(offset, size, buffer);
             return FlowReturn.OK;
         } catch (IOException ex) {
+           signalError();
 //            System.out.println(ex);
             return FlowReturn.UNEXPECTED;
         }        
@@ -104,6 +107,7 @@
                 segment.write();
                 return true;
             } catch (IOException ex) {
+               signalError();
                 Logger.getLogger(getClass().getName()).log(Level.SEVERE, null, ex);
                 return false;
             }
@@ -119,6 +123,7 @@
             try {
                 return fileChannel.size();
             } catch (IOException ex) {
+               signalError();
                 Logger.getLogger(ReadableByteChannelSrc.class.getName()).log(Level.SEVERE, null, ex);
                 return -1;
             }
@@ -126,4 +131,14 @@
         // We can't figure out the size of non-filechannel files
         return -1;
     }
+    
+    private void signalError(){
+       if(null != lock){
+           lock.setDone();
+       }
+    }
+    
+    public void setNotifyOnError(StreamLock lock){
+       this.lock = lock;
+    }
 }

And here is the new StreamLock class, located at 
gstreamer-java-orig/gstreamer-java/src/org/gstreamer/io/StreamLock.java:

package org.gstreamer.io;

public class StreamLock {
        protected boolean done;

        public StreamLock(){
                done = false;
        }

        public boolean isDone() {
                return done;
        }

        public void setDone(){
                synchronized(this){
                        done = true;
                        this.notifyAll();
                }
        }

}
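
The wait/notify handshake described above can be exercised on its own, without a pipeline. This is only a sketch under assumptions: the StreamLock here is a simplified copy with a synchronized getter added (so isDone() is also safe to call outside a synchronized block), and StreamLockDemo, awaitDone and the sleeping worker thread are hypothetical names standing in for the main thread and for a sink or bus callback that signals completion.

```java
// Simplified copy of the StreamLock idea; the getter is synchronized here
// so that reads of 'done' are always visible across threads.
class StreamLock {
    private boolean done = false;

    public synchronized boolean isDone() {
        return done;
    }

    public synchronized void setDone() {
        done = true;
        notifyAll(); // wake every thread waiting on this lock
    }
}

class StreamLockDemo {
    // Blocks until the lock is marked done. Returns true once done, or
    // false if the waiting thread was interrupted first (the interrupt
    // flag is restored so callers can still see it).
    static boolean awaitDone(StreamLock lock) {
        synchronized (lock) {
            while (!lock.isDone()) { // loop guards against spurious wakeups
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
            return true;
        }
    }

    public static void main(String[] args) {
        StreamLock lock = new StreamLock();

        // Hypothetical worker standing in for a sink that hits an I/O
        // error, or a bus callback that reports end of stream.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            lock.setDone();
        });
        worker.start();

        boolean finished = awaitDone(lock);
        System.out.println("finished = " + finished);
    }
}
```

Unlike the sample loop in the comment above, this sketch stops waiting when the thread is interrupted rather than swallowing the interrupt; which behaviour is preferable depends on how the application shuts down.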

Original comment by ch...@crazyfool.org on 14 Dec 2011 at 4:27