To compute a moving average on Spark, this answer is a good reference:
http://stackoverflow.com/quest…
You can use the sliding function from MLlib, which probably does the same thing as Daniel’s answer. You will have to sort the data by time before using the sliding function.
import org.apache.spark.mllib.rdd.RDDFunctions._

sc.parallelize(1 to 100, 10)
  .sliding(3)
  // Convert to Double first so the mean is not truncated by integer division
  .map(curSlice => curSlice.sum.toDouble / curSlice.size)
  .collect()
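To make the semantics of sliding(3) concrete, here is a minimal plain-Java sketch with no Spark dependency. It assumes that sliding(n) yields every run of n consecutive elements in order, which is how the snippet above uses it. The names SlidingMean and movingAverage are hypothetical and only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingMean {
    // Returns the mean of every run of `window` consecutive values.
    // The input must already be in time order, as the answer above notes.
    public static List<Double> movingAverage(List<Double> values, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        List<Double> result = new ArrayList<>();
        double sum = 0.0;
        for (int i = 0; i < values.size(); i++) {
            sum += values.get(i);
            // Drop the value that just fell out of the window
            if (i >= window) {
                sum -= values.get(i - window);
            }
            // Emit a mean once the first full window is available
            if (i >= window - 1) {
                result.add(sum / window);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Double> values = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            values.add((double) i);
        }
        // prints [2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        System.out.println(movingAverage(values, 3));
    }
}
```

An input shorter than the window produces an empty result, since no full window exists.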
http://stackoverflow.com/quest… shows that rewriting this in Java is much more verbose.
I took the question you were referring to and struggled for a couple of hours to translate the Scala code into Java:
// Read a file containing the Stock Quotations
// You can also parallelize a collection of objects to create a RDD
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");

// Convert the lines into our business objects
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());

// We need these two objects in order to use the MLlib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);

// Instantiate a RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);

// This applies the sliding function and returns the (DATE,SMA) tuple
JavaPairRDD<Date, Double> smaPerDate = rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();
Then you have to use a new Function class to do the actual calculation of each data window:

public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {

    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        // Each element produced by sliding() is an array holding one window
        StockQuotation[] stocks = (StockQuotation[]) t;
        List<StockQuotation> stockList = Arrays.asList(stocks);

        Double result = stockList.stream().collect(Collectors.summingDouble(new ToDoubleFunction<StockQuotation>() {
            @Override
            public double applyAsDouble(StockQuotation value) {
                return value.getValue();
            }
        }));
        result = result / stockList.size();

        // Key the average by the timestamp of the first element in the window
        return new Tuple2<Date, Double>(stockList.get(0).getTimestamp(), result);
    }
}
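The anonymous ToDoubleFunction above can probably be shortened with a method reference. The following is a sketch under assumptions, not the answer's code: StockQuotation here is a hypothetical stand-in with only a getValue() accessor, and windowAverage is an illustrative helper name. It uses only java.util.stream calls and leaves the Spark wiring out.

```java
import java.util.Arrays;
import java.util.List;

public class WindowAverage {
    // Hypothetical stand-in for the StockQuotation business object
    public static class StockQuotation {
        private final double value;

        public StockQuotation(double value) {
            this.value = value;
        }

        public double getValue() {
            return value;
        }
    }

    // Mean of the values in one window; NaN if the window is empty
    public static double windowAverage(List<StockQuotation> window) {
        return window.stream()
                .mapToDouble(StockQuotation::getValue)
                .average()
                .orElse(Double.NaN);
    }

    public static void main(String[] args) {
        List<StockQuotation> window = Arrays.asList(
                new StockQuotation(10.0),
                new StockQuotation(11.0),
                new StockQuotation(12.0));
        System.out.println(windowAverage(window));
    }
}
```

Inside call(), the same stream expression could replace the summingDouble collector and the manual division, assuming the window array is never empty.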
If you want more detail on this, I wrote about Simple Moving Averages here: https://t.co/gmWltdANd3
http://victorferrerjava.blogsp… is the blog post mentioned above, with plenty of diagrams alongside the text.