Skip to content
This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Feature/shard fm #416

Merged
merged 4 commits into from
Jan 19, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -291,14 +291,8 @@ object Scalding {
val srcPf = if (shards <= 1)
src
else
src.map { flowP =>
flowP.map { pipe =>
pipe.groupBy { event => new java.util.Random().nextInt(shards) }
.mapValues(identity(_)) // hack to get scalding to actually do the groupBy
.withReducers(shards)
.values
}
}
src.map(_.map(_.shard(shards)))

(srcPf, built)
}
case IdentityKeyedProducer(producer) => recurse(producer)
Expand Down Expand Up @@ -374,8 +368,14 @@ object Scalding {
}, m)
case FlatMappedProducer(producer, op) =>
// Map in two monads here, first state then reader
val shards = getOrElse(options, names, producer, FlatMapShards.default).count
val (fmp, m) = recurse(producer)
(fmp.map { flowP =>
val fmpSharded = if (shards < 1)
fmp
else
fmp.map(_.map(_.shard(shards)))

(fmpSharded.map { flowP =>
flowP.map { typedPipe =>
typedPipe.flatMap { case (time, item) =>
op(item).map((time, _))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@ package com.twitter.summingbird.scalding

import com.twitter.scalding.{TimePathedSource => STPS, _}
import org.apache.hadoop.fs.Path
import org.slf4j.LoggerFactory

// TODO move this to scalding:
// https://github.com/twitter/scalding/issues/529
object TimePathedSource extends java.io.Serializable {

@transient private val logger = LoggerFactory.getLogger(TimePathedSource.getClass)

// Assumes linear expanders
private def unexpander(init: DateRange, expander: DateRange => DateRange): (DateRange) => Option[DateRange] = {
val expanded = expander(init)
Expand Down Expand Up @@ -82,9 +86,11 @@ object TimePathedSource extends java.io.Serializable {

def pathIsGood(p : String): Boolean = {
val path = new Path(p)
Option(path.getFileSystem(mode.conf).globStatus(path))
val valid = Option(path.getFileSystem(mode.conf).globStatus(path))
.map(_.length > 0)
.getOrElse(false)
logger.debug("Tested input %s, Valid: %s. Conditions: Any files present, DateRange: %s".format(p, valid, desired))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice.

valid
}

val vertractor = { (dr: DateRange) =>
Expand Down