import java.io.{FileNotFoundException, IOException}
import org.apache.hadoop.fs.{
  FileSystem,
  FileStatus,
  FSDataInputStream,
  FSDataOutputStream,
  Path
}

/**
 * IO Monad for HDFS.
 *
 * A value of type `HdfsIO[T]` is a description of file system work that
 * yields a `T`. Nothing touches the `FileSystem` until the description is
 * handed to [[HdfsIO.run]].
 */
sealed trait HdfsIO[+T] {

  /**
   * Transform the result of this operation.
   *
   * `fn` is evaluated inside the resulting operation (via the by-name
   * argument of [[HdfsIO.pure]]), so an `IOException` thrown by `fn` is
   * caught by [[HdfsIO.run]] and can be handled by a later `recover`.
   */
  def map[U](fn: T => U): HdfsIO[U] =
    flatMap(t => HdfsIO.pure(fn(t)))

  /** Sequence another operation that depends on the result of this one. */
  def flatMap[U](fn: T => HdfsIO[U]): HdfsIO[U] =
    FlatMapped(this, fn)

  /**
   * Replace an `IOException` raised by this operation with a plain value.
   *
   * Like `map`, `rec` is evaluated lazily inside the resulting operation.
   */
  def recover[U >: T](rec: IOException => U): HdfsIO[U] =
    recoverWith(e => HdfsIO.pure(rec(e)))

  /** Replace an `IOException` raised by this operation with another operation. */
  def recoverWith[U >: T](rec: IOException => HdfsIO[U]): HdfsIO[U] =
    Recover(this, rec)
}

/** A primitive step: a function applied to the `FileSystem` when run. */
case class FSOp[+T](effect: (FileSystem) => T) extends HdfsIO[T]

/** Run `prev`, then feed its result to `next`. */
case class FlatMapped[U, T](prev: HdfsIO[U], next: U => HdfsIO[T]) extends HdfsIO[T]

/** Run `prev`; if it raises an `IOException`, continue with `rec` instead. */
case class Recover[U, T](prev: HdfsIO[U], rec: IOException => HdfsIO[T]) extends HdfsIO[T]

object HdfsIO {

  /** Evaluate `t`, capturing an `IOException` (and only that) as a `Left`. */
  def maybeError[T](t: => T): Either[IOException, T] =
    try {
      Right(t)
    } catch {
      case x: IOException => Left(x)
    }

  /** Lets a `String` be used wherever a `Path` is expected. */
  implicit def pathFromString(path: String): Path = new Path(path)

  /** returns false if the file already exists */
  def createNewEmptyFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.createNewFile(path))

  /**
   * Creates a new file or gives an exception,
   * which you should recover from
   */
  def createNewFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, false))

  /**
   * Delete a path when the FileSystem is closed. Useful
   * for cleaning up temp files
   */
  def deleteOnExit(path: Path): HdfsIO[Boolean] =
    FSOp(_.deleteOnExit(path))

  /** same as delete(path, false), fails if path is a directory */
  def deleteFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, false))

  /**
   * recursively delete a directory
   * same as delete(path, true)
   */
  def deleteDir(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, true))

  /** Wraps `FileSystem.exists`. */
  def exists(path: Path): HdfsIO[Boolean] =
    FSOp(_.exists(path))

  /**
   * Wraps `FileSystem.getFileStatus`, returning a `FileNotFoundException`
   * as a `Left` instead of raising it. Any other exception still propagates.
   */
  def getFileStatus(path: Path): HdfsIO[Either[FileNotFoundException, FileStatus]] =
    FSOp { fs =>
      try {
        Right(fs.getFileStatus(path))
      } catch {
        case fnf: FileNotFoundException => Left(fnf)
      }
    }

  /** Wraps `FileSystem.getHomeDirectory`. */
  def getHomeDir: HdfsIO[Path] =
    FSOp(_.getHomeDirectory)

  /** Wraps `FileSystem.isFile`. */
  def isFile(p: Path): HdfsIO[Boolean] =
    FSOp(_.isFile(p))

  /** Wraps `FileSystem.mkdirs`. */
  def mkdirs(path: Path): HdfsIO[Boolean] =
    FSOp(_.mkdirs(path))

  /** Wraps `FileSystem.open`. */
  def open(path: Path): HdfsIO[FSDataInputStream] =
    FSOp(_.open(path))

  /** Overwrite OR create a new file */
  def overwriteFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, true))

  /** Wraps `FileSystem.rename`. */
  def rename(src: Path, dest: Path): HdfsIO[Boolean] =
    FSOp(_.rename(src, dest))

  ///////////////////////////////

  /**
   * Lift a value into `HdfsIO`. The argument is by-name, so it is evaluated
   * each time the resulting operation is run, not when `pure` is called.
   */
  def pure[T](t: => T): FSOp[T] = FSOp(_ => t)

  /**
   * Actually run an entire file system operation.
   *
   * The description is interpreted with an explicit stack instead of nested
   * calls, so the interpreter itself is tail-recursive. Each stack entry is
   * either a recovery handler (`Left`) or a continuation (`Right`).
   *
   * An `IOException` raised by a primitive step is routed to the nearest
   * enclosing recovery handler; if there is none it is rethrown to the
   * caller. Exceptions of any other type are never caught here.
   */
  def run[T](h: HdfsIO[T])(implicit fs: FileSystem): T = {
    type Handler = Either[IOException => HdfsIO[Any], Any => HdfsIO[Any]]

    @annotation.tailrec
    def loop(cur: HdfsIO[Any], stack: List[Handler]): Any =
      cur match {
        case FSOp(effect) =>
          maybeError(effect(fs)) match {
            case Right(value) =>
              // Success: recovery handlers on top of the stack guarded the
              // step that just succeeded, so they are no longer relevant.
              stack.dropWhile(_.isLeft) match {
                case Right(next) :: rest => loop(next(value), rest)
                case _                   => value
              }
            case Left(err) =>
              // Failure: skip the continuations that needed the missing
              // result and resume at the nearest recovery handler.
              stack.dropWhile(_.isRight) match {
                case Left(handler) :: rest => loop(handler(err), rest)
                case _                     => throw err
              }
          }
        case FlatMapped(prev, next) =>
          // The input type of `next` is hidden behind FlatMapped's type
          // parameter; it is always fed the result of `prev`, so widening
          // its argument to Any is safe.
          loop(prev, Right(next.asInstanceOf[Any => HdfsIO[Any]]) :: stack)
        case Recover(prev, rec) =>
          loop(prev, Left(rec) :: stack)
      }

    loop(h, Nil).asInstanceOf[T]
  }
}