import java.io.{FileNotFoundException, IOException}
import scala.language.implicitConversions

import org.apache.hadoop.fs.{
  FileSystem,
  FileStatus,
  FSDataInputStream,
  FSDataOutputStream,
  Path
}

/** IO monad for HDFS: describes file-system operations without running them; use HdfsIO.run to execute */
sealed trait HdfsIO[+T] {
  def map[U](fn: T => U): HdfsIO[U] =
    flatMap(fn.andThen(HdfsIO.pure(_)))
  def flatMap[U](fn: T => HdfsIO[U]): HdfsIO[U] =
    FlatMapped(this, fn)
  def recover[U >: T](rec: IOException => U): HdfsIO[U] =
    recoverWith(rec.andThen(HdfsIO.pure(_)))
  def recoverWith[U >: T](rec: IOException => HdfsIO[U]): HdfsIO[U] =
    Recover(this, rec)
}

/** A single effect run against the underlying FileSystem */
case class FSOp[+T](effect: FileSystem => T) extends HdfsIO[T]
/** Sequencing: run `prev`, then feed its result to `next` */
case class FlatMapped[U, T](prev: HdfsIO[U], next: U => HdfsIO[T]) extends HdfsIO[T]
/** Error handling: run `prev`, handling any IOException it raises with `rec` */
case class Recover[U, T](prev: HdfsIO[U], rec: IOException => HdfsIO[T]) extends HdfsIO[T]

object HdfsIO {
  def maybeError[T](t: => T): Either[IOException, T] =
    try { Right(t) }
    catch {
      case x: IOException => Left(x)
    }

  implicit def pathFromString(path: String): Path = new Path(path)

  /** returns false if the file already exists */
  def createNewEmptyFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.createNewFile(path))

  /** Creates a new file for writing, failing with an IOException
   * if the path already exists; handle it with recover/recoverWith */
  def createNewFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, false))
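
  /* Illustrative sketch (an addition, not part of the original API): since
   * create(path, false) fails when the path exists, a caller can fall back
   * to overwriteFile via recoverWith. The path literal here is arbitrary.
   *
   *   val out: HdfsIO[FSDataOutputStream] =
   *     createNewFile("/tmp/report.txt")
   *       .recoverWith(_ => overwriteFile("/tmp/report.txt"))
   */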

  /** Delete a path when the FileSystem is closed. Useful
   * for cleaning up temp files
   */
  def deleteOnExit(path: Path): HdfsIO[Boolean] =
    FSOp(_.deleteOnExit(path))

  /** same as delete(path, false), fails if path is a directory */
  def deleteFile(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, false))

  /** recursively delete a directory
   * same as delete(path, true) */
  def deleteDir(path: Path): HdfsIO[Boolean] =
    FSOp(_.delete(path, true))

  def exists(path: Path): HdfsIO[Boolean] =
    FSOp(_.exists(path))

  def getFileStatus(path: Path): HdfsIO[Either[FileNotFoundException, FileStatus]] =
    FSOp({fs =>
      try {
        Right(fs.getFileStatus(path))
      }
      catch {
        case fnf: FileNotFoundException => Left(fnf)
      }
    })

  def getHomeDir: HdfsIO[Path] =
    FSOp(_.getHomeDirectory)

  def isFile(p: Path): HdfsIO[Boolean] =
    FSOp(_.isFile(p))

  def mkdirs(path: Path): HdfsIO[Boolean] =
    FSOp(_.mkdirs(path))

  def open(path: Path): HdfsIO[FSDataInputStream] =
    FSOp(_.open(path))

  /** Overwrite OR create a new file */
  def overwriteFile(path: Path): HdfsIO[FSDataOutputStream] =
    FSOp(_.create(path, true))

  def rename(src: Path, dest: Path): HdfsIO[Boolean] =
    FSOp(_.rename(src, dest))

///////////////////////////////

  /** Lift a value into HdfsIO; the by-name argument is evaluated when the program is run */
  def pure[T](t: => T): HdfsIO[T] = FSOp(_ => t)

  /** Actually run an entire file system operation */
  def run[T](h: HdfsIO[T])(implicit fs: FileSystem): T = {
    // The pending continuations form an explicit stack: Right(_) entries are
    // flatMap continuations, Left(_) entries are recovery handlers. Their
    // type parameters are erased to Any on the stack and restored by the
    // final cast; each continuation is only ever applied to the value
    // produced by the step that registered it, so the casts are safe.
    type Stack = List[Either[IOException => HdfsIO[Any], Any => HdfsIO[Any]]]

    @annotation.tailrec
    def loop(cur: HdfsIO[Any], stack: Stack): Any =
      cur match {
        case FSOp(effect) =>
          maybeError(effect(fs)) match {
            case Right(r) =>
              // success: skip pending recovery handlers, resume at the next flatMap
              stack.dropWhile(_.isLeft) match {
                case Nil => r
                case Right(next) :: tail => loop(next(r), tail)
                case Left(_) :: _ => sys.error("unreachable after dropWhile(_.isLeft)")
              }
            case Left(x) =>
              // failure: skip pending flatMaps, resume at the next recovery handler
              stack.dropWhile(_.isRight) match {
                case Nil => throw x
                case Left(rec) :: tail => loop(rec(x), tail)
                case Right(_) :: _ => sys.error("unreachable after dropWhile(_.isRight)")
              }
          }
        case FlatMapped(p, n) =>
          loop(p, Right(n.asInstanceOf[Any => HdfsIO[Any]]) :: stack)
        case Recover(p, rec) =>
          loop(p, Left(rec) :: stack)
      }

    loop(h, Nil).asInstanceOf[T]
  }
}
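
/* Usage sketch (illustrative, not from the original file): builds a small
 * HdfsIO program and interprets it with HdfsIO.run. The Configuration-based
 * FileSystem setup and the /tmp paths are assumptions for this example only.
 */
object HdfsIOExample {
  import org.apache.hadoop.conf.Configuration
  import HdfsIO._

  def main(args: Array[String]): Unit = {
    // Assumes the default Hadoop configuration; swap in your cluster's config.
    implicit val fs: FileSystem = FileSystem.get(new Configuration())

    // Nothing touches HDFS yet: `program` only describes the work to be done.
    val program: HdfsIO[Boolean] =
      for {
        _       <- mkdirs("/tmp/hdfsio-demo")
        created <- createNewEmptyFile("/tmp/hdfsio-demo/data.txt")
        there   <- exists("/tmp/hdfsio-demo/data.txt")
      } yield created && there

    // recover turns any IOException raised during interpretation into a value.
    val safe: HdfsIO[Boolean] = program.recover(_ => false)

    println(HdfsIO.run(safe)) // the effects actually happen here
    fs.close()
  }
}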