import java.io.File
import java.nio.charset.Charset
import com.google.common.io.Files
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import java.util.Calendar
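// Note: the File, Charset and Guava Files imports are not used by the active code
// (they relate to the commented-out local-file output further down).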
//export PATH=$PATH:/media/alfard/Cblex/sbt/bin
//sudo ./spark-submit --class "Streamv3" --master local[2] /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar 48.903296 48.924444 2.331250 2.385399 15000
//sudo ./spark-submit --class "Streamv3" --master spark://localhost:7077 --deploy-mode cluster --supervise /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar
//sudo ./spark-submit --class "Streamv3" --master spark://localhost:7077 --supervise /media/alfard/Cblex/Sparkreboot/TestBuild/Streamv3/target/scala-2.11/streamv3_2.11-1.0.jar
// Note: with --deploy-mode cluster the application jar must be reachable from the worker
// that launches the driver, otherwise submission fails with
// java.io.FileNotFoundException: Path is not a file
//./bin/spark-submit \
// --class org.apache.spark.examples.SparkPi \
// --master spark://207.184.161.138:7077 \
// --deploy-mode cluster \
// --supervise \
// /path/to/examples.jar \
// 1000
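// Note: the socket receiver below occupies one core for its whole lifetime, so a local
// master needs at least two threads (hence local[2] in the first command above).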
//https://stackoverflow.com/questions/30181582/spark-use-the-global-config-variables-in-executors
//object Param {
// var long_min = 0.00
// var long_max = 0.00
// var lat_min = 0.00
// var lat_max = 0.00
// var alt = 0.00
//}
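// Commented out: mutable driver-side globals like these are not reliably visible inside
// executor closures (see the link above), which is presumably why the bounding-box values
// are hard-coded in the filter further down instead of being read from the args.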
object Streamv3 {
def functionToCreateContext(): StreamingContext = {
val conf = new SparkConf().setAppName("Streamv3").setMaster("spark://localhost:7077")
val ssc = new StreamingContext(conf, Minutes(5))
ssc.checkpoint("hdfs://localhost:54310/checkpoint")
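// Checkpointing the DStream metadata to this directory is what allows
// StreamingContext.getOrCreate in main() to rebuild the same pipeline after a driver restart.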
// RDD.checkpoint()
// val outputFile = new File("/media/alfard/Cblex/Sparkreboot/v10.txt")
// val lines = ssc.textFileStream("hdfs://localhost:54310/")
val lines = ssc.socketTextStream("localhost", 30003)
// lines.checkpoint(Milliseconds(600000))
// lines.checkpoint(30)
// long_min = 48.903296
// long_max = 48.924444
// lat_min = 2.331250
// lat_max = 2.385399
// alt = 15000
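// The feed on port 30003 is assumed to be SBS-1 / BaseStation CSV (e.g. dump1090 output),
// where field 4 is the ICAO hex ident, 10 the callsign, 11 the altitude,
// 14/15 the latitude/longitude and 6/7 the message generation date/time, e.g.:
// MSG,3,1,1,4CA2D6,1,2019/01/01,10:21:32.000,2019/01/01,10:21:32.000,,11000,,,48.9100,2.3600,,,,,,0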
// Split each message on commas, keeping empty fields, and trim whitespace around each field
val v1 = lines.map(x => x.split(",", -1).map(_.trim))
// Keep (hexIdent, callsign, altitude, latitude, longitude, dateGenerated, timeGenerated);
// empty numeric fields are mapped to "NaN" so toDouble yields Double.NaN and the record
// is dropped by the range filter below
val v2 = v1.map { x =>
  val altitude  = (x(11) match { case "" | null => "NaN" case s => s }).toDouble
  val latitude  = (x(14) match { case "" | null => "NaN" case s => s }).toDouble
  val longitude = (x(15) match { case "" | null => "NaN" case s => s }).toDouble
  (x(4), x(10), altitude, latitude, longitude, x(6), x(7))
}
// Keep only messages inside the hard-coded lat/lon bounding box and below 15000 ft;
// comparisons against NaN are false, so incomplete records are dropped here as well
val v3 = v2.filter { k => k._4 > 48.903296 && k._4 < 48.924444 && k._5 > 2.331250 && k._5 < 2.385399 && k._3 < 15000 }
// Minimum altitude observed per aircraft id
val v4 = v3.map { case (a, b, c, d, e, f, g) => (a, c) }.reduceByKey((k, v) => Math.min(k, v))
// Re-key the filtered messages by aircraft id so they can be joined with the per-aircraft minimum
val v3b = v3.map { case (a, b, c, d, e, f, g) => (a, (b, c, d, e, f, g)) }
val v5 = v4.leftOuterJoin(v3b)
// Keep the message(s) whose altitude equals the per-aircraft minimum;
// a missing right side is treated as non-matching instead of raising a MatchError
val v6 = v5.filter { case (k, (minAlt, opt)) => opt.exists { case (_, altitude, _, _, _, _) => altitude == minAlt } }
// Distinct (aircraft id, callsign) pairs, dropping empty callsigns
val v8 = v2.map { case (a, b, c, d, e, f, g) => (a, b) }.transform(k => k.distinct().filter { case (a, b) => b != "" })
val v9 = v6.leftOuterJoin(v8)
// Flatten to (aircraftId, callsign, altitude, latitude, longitude, date, time);
// records without a known callsign are dropped instead of failing with a MatchError
val v10 = v9.flatMap {
  case (k, ((_, Some((_, b, c, d, e, f))), Some(g))) => List((k, g, b, c, d, e, f))
  case _ => Nil
}
// Result: saveAsTextFiles writes one directory per batch, named <prefix>-<batchTimeMs>;
// `now` (fixed at context-creation time) only makes the prefix unique per run
val now = Calendar.getInstance().getTime.toInstant.toString
// v10.count
// v10.repartition(1).saveAsTextFiles(s"file:////media/alfard/Cblex/Sparkreboot/acar + $now")
// v10.repartition(1).saveAsTextFiles(s"hdfs://localhost:54310/acar + $now")
// Colons are replaced because ':' is not a valid character in HDFS path names
val nowc = now.replaceAll(":", "-")
val name = s"hdfs://localhost:54310/acar$nowc"
v10.saveAsTextFiles(name)
// v10.saveAsTextFiles(s"hdfs://localhost:54310/acar + $now")
ssc
}
def main(args: Array[String]) {
val context = StreamingContext.getOrCreate("hdfs://localhost:54310/checkpoint", functionToCreateContext _)
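// getOrCreate restores the full DStream graph from the checkpoint when one exists,
// so code changes (or new bounding-box arguments) only take effect after the
// checkpoint directory has been cleared.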
//Param.long_min = args(0).toFloat
//Param.long_max = args(1).toFloat
//Param.lat_min = args(2).toFloat
//Param.lat_max = args(3).toFloat
//Param.alt = args(4).toFloat
// Start the context
context.start()
context.awaitTermination()
}
}