twitter / scalding

A Scala API for Cascading
http://twitter.com/scalding
Apache License 2.0

`TimeSeqPathedSource` internal inconsistency #1442

Open Gabriella439 opened 8 years ago

Gabriella439 commented 8 years ago

TimeSeqPathedSource provides the following method so that you can override the period of hourly directories (e.g. one directory every 6 hours instead of every hour, as the comment indicates):

  /**
   * Override this if you have for instance an hourly pattern but want to run every 6 hours.
   * By default, we call TimePathedSource.stepSize(pattern, tz)
   */
  protected def defaultDurationFor(pattern: String): Option[Duration] =
    TimePathedSource.stepSize(pattern, tz)
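To make the override concrete, here is a minimal, self-contained sketch of how a default step could be inferred from a `String.format`-style path pattern and then widened to 6 hours. It is a hypothetical model, not scalding's code: the object `StepSketch` and its methods are illustrative names, and it uses `java.time.Duration` rather than scalding's own `Duration` type.

```scala
import java.time.Duration

// Hypothetical model (not scalding's code): infer a default step from the
// finest time field a String.format-style pattern contains. Months and years
// are omitted because java.time.Duration has no fixed-length month or year.
object StepSketch {
  private val fieldSteps: List[(String, Duration)] = List(
    "%1$tH" -> Duration.ofHours(1), // hourly directories
    "%1$td" -> Duration.ofDays(1)   // daily directories
  )

  // Analogue of the default behaviour: first (finest) matching field wins.
  def defaultDuration(pattern: String): Option[Duration] =
    fieldSteps.collectFirst { case (field, step) if pattern.contains(field) => step }

  // Analogue of an override in the spirit of defaultDurationFor:
  // keep the hourly pattern, but step every 6 hours.
  def sixHourlyDuration(pattern: String): Option[Duration] =
    defaultDuration(pattern).map { step =>
      if (step == Duration.ofHours(1)) Duration.ofHours(6) else step
    }
}
```

With an hourly pattern such as `/logs/%1$tY/%1$tm/%1$td/%1$tH/*` (an illustrative path), the default step is one hour and the override turns it into six.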

However, allPaths (which generates the set of paths to validate) and hdfsWritePath (which generates the path to write to) behave inconsistently.

If you specify, say, a 6-hour window, allPaths will look for a directory at the start of each 6-hour window, because it uses the start of each DateRange to compute the path to validate:

  protected def allPathsFor(pattern: String): Iterable[String] =
    defaultDurationFor(pattern) match {
      case Some(duration) => TimePathedSource.allPathsWithDuration(pattern, duration, dateRange, tz)
      case None => sys.error(s"No suitable step size for pattern: $pattern")
    }

  def allPathsWithDuration(pattern: String, duration: Duration, dateRange: DateRange, tz: TimeZone): Iterable[String] =
    // This method is exhaustive, but too expensive for Cascading's JobConf writing.
    dateRange.each(duration)
      .map { dr: DateRange =>
        toPath(pattern, dr.start, tz)
      }
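The read-side behaviour above can be modelled outside scalding with `java.time`. This is a sketch under stated assumptions, not the library implementation: it assumes a DateRange-like `[start, end]` with an inclusive end, approximates `dateRange.each(duration)` with a hypothetical `windows` helper, and formats each path from the window's start.

```scala
import java.time.{Duration, ZoneOffset, ZonedDateTime}

// Hypothetical sketch of the read side (not scalding's code). Assumes a
// DateRange-like [start, end] with an INCLUSIVE end.
object ReadSide {
  def utc(y: Int, mo: Int, d: Int, h: Int): ZonedDateTime =
    ZonedDateTime.of(y, mo, d, h, 0, 0, 0, ZoneOffset.UTC)

  // Split [start, end] into consecutive windows of length `step`, truncating
  // the last window at `end` (roughly what dateRange.each(duration) yields).
  def windows(start: ZonedDateTime, end: ZonedDateTime, step: Duration): List[(ZonedDateTime, ZonedDateTime)] =
    Iterator
      .iterate(start)(_.plus(step))
      .takeWhile(s => !s.isAfter(end))
      .map { s =>
        val e = s.plus(step).minusNanos(1) // inclusive end of this window
        (s, if (e.isAfter(end)) end else e)
      }
      .toList

  // One path per window, formatted from the window's START.
  def allPathsWithDuration(pattern: String, step: Duration, start: ZonedDateTime, end: ZonedDateTime): List[String] =
    windows(start, end, step).map { case (s, _) => String.format(pattern, s) }
}
```

For a 12-hour range starting at midnight UTC with a 6-hour step, this yields paths for hours `00` and `06` only.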

By contrast, hdfsWritePath writes to a path at the end of the 6-hour window, because writePathFor uses the end of the DateRange to compute the output path:

  //Write to the path defined by the end time:
  override def hdfsWritePath = TimePathedSource.writePathFor(pattern, dateRange, tz)

  /**
   * Gives the write path based on daterange end.
   */
  def writePathFor(pattern: String, dateRange: DateRange, tz: TimeZone): String = {
    assert(pattern.takeRight(2) == "/*", "Pattern must end with /* " + pattern)
    val lastSlashPos = pattern.lastIndexOf('/')
    val stripped = pattern.slice(0, lastSlashPos)
    toPath(stripped, dateRange.end, tz)
  }
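The mismatch between the two sides can be checked directly. The sketch below is hypothetical (the `WriteVsRead` object and its methods are illustrative, not scalding API). It mirrors `writePathFor` by stripping the trailing `/*` and formatting with the range end, and compares the result with the directory that validation derives from the range start, assuming an inclusive-end range that covers exactly one window.

```scala
import java.time.{Duration, ZoneOffset, ZonedDateTime}

// Hypothetical sketch (not scalding's code) contrasting where a write lands
// with where validation looks, for a range covering exactly one window.
object WriteVsRead {
  def utc(y: Int, mo: Int, d: Int, h: Int): ZonedDateTime =
    ZonedDateTime.of(y, mo, d, h, 0, 0, 0, ZoneOffset.UTC)

  // Write side: drop the trailing "/*" and format with the range END.
  def writePath(pattern: String, end: ZonedDateTime): String = {
    require(pattern.endsWith("/*"), s"Pattern must end with /* $pattern")
    String.format(pattern.dropRight(2), end)
  }

  // Read side: directory formatted from the window START ("/*" removed so
  // the two sides are directly comparable).
  def validatedDir(pattern: String, start: ZonedDateTime): String =
    String.format(pattern.dropRight(2), start)

  // Does a write of the window [start, start + step) land where validation looks?
  def sameDirectory(pattern: String, start: ZonedDateTime, step: Duration): Boolean = {
    val end = start.plus(step).minusNanos(1) // inclusive end of the window
    writePath(pattern, end) == validatedDir(pattern, start)
  }
}
```

Under these assumptions, a one-hour window writes and validates the same directory, while a 6-hour window starting at midnight writes to the hour-`05` directory but validation looks for hour `00`.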

This means that if you write data using TimeSeqPathedSource, you must ensure that defaultDurationFor matches the DateRange supplied to the source; otherwise you will not be able to read the data back with the same source unless you disable validation.

johnynek commented 8 years ago

Yeah, this feature was added somewhat recently (and hastily). I don't think it makes a ton of sense to merge the read and write versions of this into one class that accepts a date range. Writing should not take a date range; it should take a specific date and a format string.
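One possible shape for that suggestion, sketched under assumptions: `TimeWritePath` is a hypothetical class, not part of scalding. The writer receives one explicit timestamp and a format string, so nothing has to choose between the start and the end of a range.

```scala
import java.time.{ZoneOffset, ZonedDateTime}

// Hypothetical API shape (not an actual scalding class): a write target is a
// format string plus ONE explicit timestamp, instead of a DateRange.
final case class TimeWritePath(pattern: String, at: ZonedDateTime) {
  require(!pattern.endsWith("/*"), "A write pattern names a directory, not a glob")
  def path: String = String.format(pattern, at)
}

object TimeWritePath {
  // Illustrative convenience constructor for a whole UTC hour.
  def utcHour(pattern: String, y: Int, m: Int, d: Int, h: Int): TimeWritePath =
    TimeWritePath(pattern, ZonedDateTime.of(y, m, d, h, 0, 0, 0, ZoneOffset.UTC))
}
```

The caller decides which instant names the output; passing the window start would match the directory a start-based reader validates.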