Moving averages in Spark

To compute a moving average on Spark, this answer is a useful reference:

http://stackoverflow.com/quest…

You can use the sliding function from MLlib, which probably does the same thing as Daniel's answer. You will have to sort the data by time before using the sliding function.

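import org.apache.spark.mllib.rdd.RDDFunctions._

// Mean of every window of 3 consecutive elements
sc.parallelize(1 to 100, 10)
  .sliding(3)
  // Convert to Double so the division is not truncated to an integer
  .map(curSlice => curSlice.sum.toDouble / curSlice.size)
  .collect()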
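To make concrete what the snippet above computes, here is a minimal plain-Java sketch with no Spark involved. It assumes that a sliding window only emits full windows, so n elements with window size w give n - w + 1 averages. The class and method names (SlidingMeanSketch, slidingMean) are made up for illustration and are not part of any Spark API.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingMeanSketch {

    // Mean of every run of `window` consecutive values, in input order.
    // Only full windows are produced (an assumption of this sketch).
    static List<Double> slidingMean(List<Integer> values, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        List<Double> means = new ArrayList<>();
        for (int start = 0; start + window <= values.size(); start++) {
            double sum = 0.0; // accumulate as double to avoid integer division
            for (int i = start; i < start + window; i++) {
                sum += values.get(i);
            }
            means.add(sum / window);
        }
        return means;
    }

    public static void main(String[] args) {
        List<Integer> values = new ArrayList<>();
        for (int i = 1; i <= 100; i++) {
            values.add(i);
        }
        List<Double> means = slidingMean(values, 3);
        // prints "98 windows, first = 2.0, last = 99.0"
        System.out.println(means.size() + " windows, first = " + means.get(0)
                + ", last = " + means.get(means.size() - 1));
    }
}
```

For the input 1 to 100 every window sum happens to be divisible by 3, so integer division would give the same numbers here; with arbitrary data it would silently truncate, which is why the sum is kept as a double.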

http://stackoverflow.com/quest… Rewritten in Java, the same thing becomes far more verbose.

I took the question you were referring to and struggled for a couple of hours to translate the Scala code into Java:

// Read a file containing the stock quotations
// You can also parallelize a collection of objects to create an RDD
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");

// Convert the lines into our business objects
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());

// We need these two objects in order to use the MLlib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);

// Instantiate an RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);

// This applies the sliding function and returns the (DATE, SMA) tuples
// (Date as the key type is assumed from the comment above; the type parameters were lost in the original listing)
JavaPairRDD<Date, Double> smaPerDate = rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();

Then you need a separate function class to do the actual calculation for each data window:

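// Type parameters restored; Date is assumed as the timestamp type
public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {

    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {

        // Each element produced by sliding() is an array holding one window
        StockQuotation[] stocks = (StockQuotation[]) t;
        List<StockQuotation> stockList = Arrays.asList(stocks);

        // Sum the values in the window
        Double result = stockList.stream().collect(Collectors.summingDouble(new ToDoubleFunction<StockQuotation>() {

            @Override
            public double applyAsDouble(StockQuotation value) {
                return value.getValue();
            }
        }));

        // Divide by the window size to get the simple moving average
        result = result / stockList.size();

        // Key the average by the timestamp of the first element in the window
        return new Tuple2<Date, Double>(stockList.get(0).getTimestamp(), result);
    }
}

If you want more detail on this, I wrote about Simple Moving Averages here: https://t.co/gmWltdANd3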
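The logic of the answer (sort by time, slide a window, key each average by the first timestamp in the window) can be checked locally without Spark. The following is only a sketch under assumptions: Quote is a hypothetical stand-in for the StockQuotation class, which the answer does not show, and MovingAvgByDateSketch and movingAverage are names invented here.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Map;

public class MovingAvgByDateSketch {

    // Hypothetical stand-in for StockQuotation (not shown in the answer).
    static final class Quote {
        final Date timestamp;
        final double value;

        Quote(Date timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    // Returns (timestamp of first element in window, mean of window) pairs.
    static List<Map.Entry<Date, Double>> movingAverage(List<Quote> quotes, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        // Sort a copy by time first; a sliding window is only meaningful on ordered data.
        List<Quote> sorted = new ArrayList<>(quotes);
        sorted.sort(Comparator.comparing((Quote q) -> q.timestamp));

        List<Map.Entry<Date, Double>> result = new ArrayList<>();
        for (int start = 0; start + window <= sorted.size(); start++) {
            List<Quote> slice = sorted.subList(start, start + window);
            double mean = slice.stream().mapToDouble(q -> q.value).average().orElse(Double.NaN);
            result.add(new SimpleEntry<Date, Double>(slice.get(0).timestamp, mean));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Quote> quotes = new ArrayList<>();
        quotes.add(new Quote(new Date(3000L), 30.0)); // deliberately out of order
        quotes.add(new Quote(new Date(1000L), 10.0));
        quotes.add(new Quote(new Date(2000L), 20.0));
        quotes.add(new Quote(new Date(4000L), 40.0));

        for (Map.Entry<Date, Double> e : movingAverage(quotes, 2)) {
            System.out.println(e.getKey().getTime() + " -> " + e.getValue());
        }
    }
}
```

Keying by the first timestamp mirrors the stockList.get(0).getTimestamp() call in the answer; keying by the last timestamp in the window is an equally common convention for trailing averages.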

http://victorferrerjava.blogsp… This is the blog post mentioned above; it explains the approach with both figures and text.
