import java.util.Calendar

import org.apache.spark._
import org.apache.spark.streaming._


//export PATH=$PATH:/media/alfard/Cblex/sbt/bin
//sudo ./spark-submit   --class "Streamv3"   --master local[2] /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar 48.903296 48.924444 2.331250 2.385399 15000
//sudo ./spark-submit   --class "Streamv3"   --master spark://localhost:7077 --deploy-mode cluster --supervise /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar
//sudo ./spark-submit   --class "Streamv3"   --master spark://localhost:7077  --supervise /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar
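// Positional arguments on the first command: lat_min lat_max lon_min lon_max alt_max.
// They are not read by the job yet; the same values are hard-coded in the bounding-box filter below.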





// Passing the bounding-box parameters to the executors: a plain mutable object set from
// the driver is not visible inside the tasks, see
// https://stackoverflow.com/questions/30181582/spark-use-the-global-config-variables-in-executors
//object Param {
//    var lat_min = 0.00
//    var lat_max = 0.00
//    var lon_min = 0.00
//    var lon_max = 0.00
//    var alt = 0.00
//}


object Streamv3  {
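
// functionToCreateContext builds the whole streaming job graph. It is invoked through
// StreamingContext.getOrCreate in main, so a restarted driver can rebuild the same
// context from the checkpoint rather than constructing a fresh one.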

def functionToCreateContext(): StreamingContext = {

    // Spark configuration and a 5-minute batch interval
    val conf = new SparkConf().setAppName("Streamv3").setMaster("spark://localhost:7077")

    val ssc = new StreamingContext(conf, Minutes(5))

    ssc.checkpoint("hdfs://localhost:54310/checkpoint")
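    // (This directory is the one StreamingContext.getOrCreate reads in main to restore
    // the context after a driver restart.)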

//    Alternative input: read files dropped into HDFS instead of the live socket
//    val lines = ssc.textFileStream("hdfs://localhost:54310/")

    // SBS/BaseStation CSV feed (port 30003 is the usual dump1090 output port)
    val lines = ssc.socketTextStream("localhost", 30003)


    // Split each message into trimmed fields; the -1 limit keeps trailing empty fields
    val v1 = lines.map(x => x.split(",", -1).map(_.trim))

    // Fields follow the SBS-1/BaseStation layout: 4 = ICAO hex ident, 10 = callsign,
    // 11 = altitude, 14 = latitude, 15 = longitude, 6 = generated date, 7 = generated time
    val v2 = v1.map { x =>
      // empty numeric fields become NaN so toDouble does not throw
      def num(s: String): Double = if (s == null || s.isEmpty) Double.NaN else s.toDouble
      (x(4), x(10), num(x(11)), num(x(14)), num(x(15)), x(6), x(7))
    }

    // Keep only messages inside the bounding box (lat 48.903296..48.924444,
    // lon 2.331250..2.385399) and with altitude below 15000; NaN coordinates fail
    // every comparison and are dropped
    val v3 = v2.filter { k => k._4 > 48.903296 && k._4 < 48.924444 && k._5 > 2.331250 && k._5 < 2.385399 && k._3 < 15000 }

    // Minimum altitude observed per aircraft (keyed by ICAO hex ident)
    val v4 = v3.map { case (a, b, c, d, e, f, g) => (a, c) }.reduceByKey((a, b) => Math.min(a, b))
    // Re-key the full records by ICAO hex ident for the join below
    val v3b = v3.map { case (a, b, c, d, e, f, g) => (a, (b, c, d, e, f, g)) }

    // Join the per-aircraft minimum altitude back onto the full records
    val v5 = v4.leftOuterJoin(v3b)

    // Keep the record whose altitude equals the per-aircraft minimum; every key of v4
    // comes from v3, so the joined Option is always defined here
    val v6 = v5.filter { case (k, (z, Some((b, c, d, e, f, g)))) => z == c }

    // Distinct (ICAO hex ident, callsign) pairs, keeping only non-empty callsigns
    val v8 = v2.map { case (a, b, c, d, e, f, g) => (a, b) }.transform(rdd => rdd.distinct().filter { case (_, cs) => cs.nonEmpty })

    // Attach the distinct callsign (if one was seen) to each minimum-altitude record
    val v9 = v6.leftOuterJoin(v8)

    // Flatten to (icao, callsign, altitude, lat, lon, date, time); fall back to the
    // callsign carried by the record itself when the outer join found none, so the
    // match cannot fail on a None
    val v10 = v9.map { case (k, ((z, Some((cs, alt, lat, lon, date, time))), csOpt)) =>
      (k, csOpt.getOrElse(cs), alt, lat, lon, date, time)
    }

// Result

    // Job start timestamp, captured once when the streaming graph is built; it becomes
    // part of the output path prefix
    val now = Calendar.getInstance().getTime.toInstant.toString


    // ':' is not accepted in HDFS file names, so strip it from the ISO timestamp
    val nowc = now.replaceAll(":", "-")
    // saveAsTextFiles appends the batch time in ms to this prefix, one directory per batch
    val name = s"hdfs://localhost:54310/acar$nowc"

    v10.saveAsTextFiles(name)

    ssc
}
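
// Sketch only (not wired into main): an alternative to the commented-out Param object
// above for getting the bounding box to the executors. Plain parameters captured by the
// closures below are serialized with the tasks, so no global state is needed. The
// parameter names and the helper name functionToCreateContextWith are illustrative
// assumptions, not part of the original job. Note that getOrCreate only calls the
// creating function when no checkpoint exists, so changed parameters only take effect
// on a fresh start.
def functionToCreateContextWith(latMin: Double, latMax: Double,
                                lonMin: Double, lonMax: Double,
                                altMax: Double): StreamingContext = {
    val conf = new SparkConf().setAppName("Streamv3").setMaster("spark://localhost:7077")
    val ssc  = new StreamingContext(conf, Minutes(5))
    ssc.checkpoint("hdfs://localhost:54310/checkpoint")

    val lines  = ssc.socketTextStream("localhost", 30003)
    val fields = lines.map(_.split(",", -1).map(_.trim))

    // Same bounding-box filter as above, but driven by the parameters
    val inBox = fields.filter { x =>
      def num(s: String): Double = if (s == null || s.isEmpty) Double.NaN else s.toDouble
      val alt = num(x(11))
      val lat = num(x(14))
      val lon = num(x(15))
      lat > latMin && lat < latMax && lon > lonMin && lon < lonMax && alt < altMax
    }

    inBox.map(_.mkString(",")).saveAsTextFiles("hdfs://localhost:54310/acar-param")
    ssc
}
// main would then call, for example:
//   StreamingContext.getOrCreate("hdfs://localhost:54310/checkpoint",
//     () => functionToCreateContextWith(args(0).toDouble, args(1).toDouble,
//                                       args(2).toDouble, args(3).toDouble, args(4).toDouble))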



def main(args: Array[String]): Unit = {

    // Recover the context from the checkpoint if one exists, otherwise build it fresh
    val context = StreamingContext.getOrCreate("hdfs://localhost:54310/checkpoint", functionToCreateContext _)


//Param.lat_min = args(0).toDouble
//Param.lat_max = args(1).toDouble
//Param.lon_min = args(2).toDouble
//Param.lon_max = args(3).toDouble
//Param.alt = args(4).toDouble


// Start the context
context.start()
context.awaitTermination()

}
}