twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

NullSink can't be used more than once in a flow #1715

Open johnynek opened 7 years ago

johnynek commented 7 years ago

We should 1) not have an object NullSink, since each sink needs to be distinct, and 2) give each NullTap a different identifier: https://github.com/twitter/scalding/blob/develop/scalding-core/src/main/scala/com/twitter/scalding/Source.scala#L285
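To illustrate why a fixed identifier is a problem, here is a toy model in plain Scala. It does not use Cascading or Scalding. FakeTap, SinkRegistry and IdentifierDemo are hypothetical names, and the registry is only a loose stand-in for a planner that tells sinks apart by their tap identifier. With a shared identifier (like the single "nullTap" string), the second sink cannot be registered. With a fresh random identifier per tap, both can.

```scala
import scala.collection.mutable
import scala.util.Random

// Hypothetical stand-in for a tap: only its identifier matters in this sketch.
final case class FakeTap(identifier: String)

// Hypothetical registry that indexes sinks by tap identifier. It loosely models
// how equal identifiers make two distinct sinks indistinguishable.
final class SinkRegistry {
  private val sinks = mutable.Map.empty[String, String]

  // Registers pipeName under the tap's identifier.
  // Returns false if another pipe already claimed that identifier.
  def register(pipeName: String, tap: FakeTap): Boolean =
    if (sinks.contains(tap.identifier)) false
    else { sinks.update(tap.identifier, pipeName); true }

  def size: Int = sinks.size
}

object IdentifierDemo {
  // Every call yields a tap with the same fixed identifier, like a shared NullTap.
  def sharedNullTap(): FakeTap = FakeTap("nullTap")

  // Every call yields a tap with a fresh random 32-character identifier.
  def freshNullTap(): FakeTap = FakeTap(Random.alphanumeric.take(32).mkString)

  // Writes two pipes ("odds" and "evens") to taps produced by makeTap.
  // Returns (oddsRegistered, evensRegistered, number of distinct sinks).
  def run(makeTap: () => FakeTap): (Boolean, Boolean, Int) = {
    val registry = new SinkRegistry
    val oddsOk = registry.register("odds", makeTap())
    val evensOk = registry.register("evens", makeTap())
    (oddsOk, evensOk, registry.size)
  }

  def main(args: Array[String]): Unit = {
    println(run(() => sharedNullTap())) // (true,false,1)
    println(run(() => freshNullTap()))  // (true,true,2)
  }
}
```

Random identifiers could in principle collide, but with 62^32 possible strings the risk is negligible. A counter or java.util.UUID would work just as well.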

jeffreyolchovy commented 7 years ago

We've been using this for quite a long time internally at Tapad:

```scala
package com.tapad.scalding

import java.io._
import java.util.Properties

import cascading.tap.Tap
import com.twitter.scalding._
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}

import scala.util.Random

/**
  * If you need to write multiple pipes to NullSource in a single job, use this instead!
  *
  * import com.twitter.scalding._
  *
  * class OddsAndEvens(args: Args) extends Job(args) {
  *
  *   val input = IterableSource(Seq(0, 1, 2, 3), 'num)
  *
  *   val odds = input.filter('num)((_ % 2 != 0): Int => Boolean)
  *
  *   val evens = input.filter('num)((_ % 2 == 0): Int => Boolean)
  *
  *   odds.debug.write(new ReusableNullSource)
  *
  *   evens.debug.write(new ReusableNullSource)
  * }
  *
  */
class ReusableNullSource extends Source {

  // Builds a NullTap with a random 32-character alphanumeric identifier.
  // As a `val`, the identifier is fixed for each tap instance but differs
  // between instances, so several null sinks in one flow do not collide.
  private def nullTap[A, B, C, D, E] = new NullTap[A, B, C, D, E] {
    override val getIdentifier = Random.alphanumeric.take(32).mkString
  }

  override def createTap(readOrWrite : AccessMode)(implicit mode : Mode) : Tap[_, _, _] = {
    readOrWrite match {
      // Write-only: everything written here is discarded.
      case Read => throw new Exception("not supported, reading from null")
      // Choose the tap's type parameters to match the execution platform.
      // Any other Mode is not matched here and would throw a MatchError.
      case Write => mode match {
        case Hdfs(_, _) => nullTap[JobConf, RecordReader[_, _], OutputCollector[_, _], Any, Any]
        case Local(_) => nullTap[Properties, InputStream, OutputStream, Any, Any]
        case Test(_) => nullTap[Properties, InputStream, OutputStream, Any, Any]
      }
    }
  }
}
```
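The snippet above overrides getIdentifier with a `val` rather than a `def`. That matters because a tap's identifier may be queried more than once. A `def` returning a random string would give a different answer on each call, while a `val` is computed once per tap instance. The following is a minimal plain-Scala sketch of that difference. IdentifiedTap and StableIdDemo are hypothetical names that only mirror the shape of a class exposing getIdentifier. They are not Cascading types.

```scala
import scala.util.Random

// Hypothetical base type that exposes getIdentifier as an abstract method,
// mirroring the shape of a tap; this is not the real Cascading class.
abstract class IdentifiedTap {
  def getIdentifier: String
}

object StableIdDemo {
  private def randomId(): String = Random.alphanumeric.take(32).mkString

  // Overriding with a val: the random string is computed once, at construction.
  def withVal(): IdentifiedTap = new IdentifiedTap {
    override val getIdentifier: String = randomId()
  }

  // Overriding with a def: a new random string on every call.
  def withDef(): IdentifiedTap = new IdentifiedTap {
    override def getIdentifier: String = randomId()
  }

  def main(args: Array[String]): Unit = {
    val stable = withVal()
    val unstable = withDef()
    // Stable within one instance.
    println(stable.getIdentifier == stable.getIdentifier)
    // Changes on every call (barring a vanishingly unlikely collision).
    println(unstable.getIdentifier == unstable.getIdentifier)
    // Distinct across instances, which is what lets several null sinks coexist.
    println(withVal().getIdentifier == withVal().getIdentifier)
  }
}
```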

Actually just came to check if this was finally resolved upstream and saw this issue was opened just a few days ago :)

johnynek commented 7 years ago

yeah, I had to make something similar. I didn't know about the issue before since I had not noticed this debug pattern.