/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.mahout.clustering.kmeans;

import org.apache.mahout.common.DevURandomSeedGenerator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.mahout.math.RandomAccessSparseVector;
import org.apache.mahout.math.Vector;
import org.apache.mahout.math.VectorWritable;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.uncommons.maths.random.GaussianGenerator;
import org.uncommons.maths.random.MersenneTwisterRNG;
import org.uncommons.maths.random.ContinuousUniformGenerator;
import org.uncommons.maths.random.DiscreteUniformGenerator;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.conf.Configuration;
import org.apache.mahout.common.AbstractJob;

/*
 * NOTE(review): THIS FILE IS CORRUPTED AND WILL NOT COMPILE AS-IS.
 * The text appears to have passed through a tag-stripper: every span between a
 * '<' and the following '>' was deleted. That removed all generic type
 * parameters (e.g. List<Vector>, Mapper<...>) and, wherever a loop condition
 * such as "count<numSamples" opened a span, it swallowed whole method bodies
 * and class boundaries. Each such gap is flagged below with a
 * "NOTE(review): CORRUPTED SPAN" comment. The surviving tokens are preserved
 * verbatim; recover the missing code from version control — do not attempt to
 * re-type it from this copy.
 */
//public class GenKMeansDataset extends Configured implements Tool {
/**
 * Command-line tool that generates input data for a K-Means clustering
 * benchmark: a set of sample vectors plus initial cluster centroids, written
 * as Hadoop SequenceFiles (see run()).
 */
public class GenKMeansDataset extends AbstractJob {

  private static final Logger LOG= LoggerFactory.getLogger(GenKMeansDataset.class);

  // Maximum samples written into one SequenceFile before rolling over to a
  // new "fileN" part (see the rollover logic in the sample-writing loop below).
  private static long SAMPLES_PER_FILE = 20000000;

  // Default HDFS locations; apparently unused in the code visible here —
  // TODO confirm before removing.
  private static final String TEST_BASE_PATH = "/KMeans";
  private static final String TEST_INPUT_PATH = TEST_BASE_PATH + "/input";
  private static final String TEST_INPUT_PATH_SAMPLES = TEST_INPUT_PATH + "/points";
  private static final String TEST_INPUT_PATH_INITCLUSTERS = TEST_INPUT_PATH + "/clusters";

  // Tiny hard-coded 2-D demo dataset (two visually separable clusters).
  public static final double[][] simple = { { 1, 1 }, { 2, 1 }, { 1, 2 },
      { 2, 2 }, { 3, 3 }, { 4, 4 }, { 5, 4 }, { 4, 5 }, { 5, 5 } };

  // NOTE(review): raw types — originally presumably List<Vector>; the generic
  // parameters were destroyed by the corruption described above.
  public static List samples;
  public static List initialCentroids;

  /**
   * Converts a raw 2-D double array into a list of Mahout vectors,
   * one RandomAccessSparseVector per row.
   *
   * @param raw rows of coordinates; each row becomes one vector
   * @return the resulting vectors, in row order
   */
  public static List getPoints(double[][] raw) {
    List points = new ArrayList();
    for (int i = 0; i < raw.length; i++) {
      double[] fr = raw[i];
      Vector vec = new RandomAccessSparseVector(fr.length);
      vec.assign(fr);
      points.add(vec);
    }
    return points;
  }

  /**
   * Sample Producer.
   * The samples may be generated randomly using certain distribution,
   * or obtained from public data set.
   */
  public abstract static class SampleProducer {
    protected FileSystem fs;
    protected JobConf conf;
    // Distance measure handed to each generated Kluster.
    protected DistanceMeasure dm = new EuclideanDistanceMeasure();

    /**
     * Produce vector samples and add them into the given list.
     *
     * @param target the target list the produced samples are added into
     * @return the number of samples produced
     */
    public long produceSamples(List target) throws Exception {return 0;}

    /**
     * Produce vector samples and write them directly to the file system.
     * This interface is called when the number of samples is too large to be
     * contained in a List, so they are written straight to files.
     *
     * @param samplePath the path on the file system the sample files are written to
     * @return the number of samples produced
     */
    public abstract long produceSamples(Path samplePath) throws Exception;

    /**
     * Produce initial cluster centroids and add them into the provided list.
     * This interface is called after produceSamples is called.
     *
     * @param numClusters the number of centroids to be generated
     * @param centoidList the centroids container
     * @return the actual number of centroids produced
     */
    public abstract int produceInitialCentroids(int numClusters, List centoidList) throws Exception;

    /**
     * Produce initial cluster centroids and write them directly to the file
     * system as a SequenceFile of (cluster id, Kluster) records.
     * This interface is called after produceSamples is called.
     *
     * @param numClusters the number of centroids to be generated
     * @param centroidsPath the path on the file system where centroids are written to
     * @return the actual number of centroids produced
     */
    public int produceInitialCentroids(int numClusters, Path centroidsPath) throws Exception {
      // NOTE(review): raw List — originally presumably List<Vector> (corruption).
      List iCentroids = new ArrayList(numClusters);
      produceInitialCentroids(numClusters, iCentroids);
      // Kluster is package-local (org.apache.mahout.clustering.kmeans), hence no import.
      SequenceFile.Writer writer = new SequenceFile.Writer(getFileSystem(), getJobConf(),
          centroidsPath, Text.class, Kluster.class);
      for (int i = 0; i < iCentroids.size(); i++) {
        Vector vec = iCentroids.get(i);
        Kluster cluster = new Kluster(vec,i,dm);
        // add the center so the centroid will be correct upon output
        cluster.observe(cluster.getCenter(),1);
        writer.append(new Text(cluster.getIdentifier()), cluster);
      }
      writer.close();
      return iCentroids.size();
    }

    public void setFileSystem(FileSystem fs, JobConf conf){
      this.fs = fs;
      this.conf = conf;
    }

    public FileSystem getFileSystem(){ return this.fs; }

    public JobConf getJobConf(){ return this.conf; }

    // Convenience overload: sample files are keyed by LongWritable with
    // VectorWritable values.
    protected SequenceFile.Writer createNewFile(Path filepath) throws IOException {
      return createNewFile(filepath, LongWritable.class,VectorWritable.class);
    }

    protected SequenceFile.Writer createNewFile(Path filepath, Class keyClass, Class valueClass) throws IOException {
      SequenceFile.Writer writer = new SequenceFile.Writer(getFileSystem(), getJobConf(),
          filepath, keyClass, valueClass);
      LOG.info("creating file "+filepath.toString());
      return writer;
    }
  }

  /**
   * Simple Demo Sample Producer for original samples.
   * The samples may be generated randomly using certain distribution,
   * or obtained from public data set.
   * NOTE(review): both overrides are no-op stubs that produce nothing.
   */
  public static class DemoSampleProducer extends SampleProducer {
    public DemoSampleProducer() {}
    public long produceSamples(Path samplePath) throws Exception { return 0;}
    public int produceInitialCentroids(int numClusters, List centroidList) throws Exception {return 0;}
  }

  /**
   * Generate random sample points using Normal (Gaussian) Distribution.
   */
  public static class GaussianSampleGenerator extends SampleProducer {
    private MersenneTwisterRNG rng;
    private long numSamples = 10;
    private int dimension = 2;
    // genParams[cluster][dimension] = {mean, std} for each Gaussian component.
    private double [][][] genParams = { { {0,1}, {0,1} } } ;
    // Range for the uniform draw of initial centroid coordinates.
    private double cMin = 0.0;
    private double cMax = 1.0;

    /**
     * Mapper that turns one tab-separated spec line —
     * numSamples, then (mean, std) per dimension — into Gaussian-distributed
     * sample vectors.
     * NOTE(review): raw Mapper type; generic parameters lost to corruption.
     */
    public static class MapClass extends MapReduceBase implements Mapper {
      private int dimension = 2;

      public void configure(JobConf jobConf){
        this.dimension = Integer.parseInt(jobConf.get("genkmeansdataset.dimensions"));
      }

      public void map( IntWritable key, Text value, OutputCollector output,
          Reporter reporter) throws IOException {
        try {
          // Per-task RNG seeded from /dev/urandom so tasks draw independent streams.
          MersenneTwisterRNG rng = new MersenneTwisterRNG(new DevURandomSeedGenerator());
          // create Gaussian generators based on seeds
          GaussianGenerator [] gg = new GaussianGenerator [dimension];
          String[] numbers = value.toString().split("\t");
          int i = 0;
          long numSamples = Long.parseLong(numbers[i++]);
          for (int d = 0; d< dimension; d++) {
            double mean = Double.parseDouble(numbers[i++]);
            double std = Double.parseDouble(numbers[i++]);
            LOG.info("dimension="+d+": mean="+mean+", std="+std);
            gg[d] = new GaussianGenerator(mean,std,rng);
          }
          //generate samples
          double [] vec = new double[dimension];
          for(long count = 0; count
          /* NOTE(review): CORRUPTED SPAN. Everything from this loop condition up
           * to the tail of a later method signature was stripped: the rest of
           * map() (presumably emitting one VectorWritable per sample), the
           * closing braces of MapClass, any GaussianSampleGenerator
           * constructors, and the head of produceSamples(List<Vector> ...).
           * The tokens below resume mid-signature of that produceSamples. */
          target) throws Exception {
      long numTotal = this.numSamples;
      int centriodNum = genParams.length;
      // Spread the requested total evenly over the configured clusters.
      int numPerCluster = (int)Math.ceil((double)numTotal/(double)centriodNum);
      LOG.info("Cluster number="+centriodNum+" numbers per cluster="+numPerCluster);
      GaussianGenerator [] gg = new GaussianGenerator [dimension];
      for (int k= 0; k
      /* NOTE(review): CORRUPTED SPAN. Lost here: the remainder of this
       * produceSamples(List) body, presumably the Path-based produceSamples
       * implementation, and the signature of produceInitialCentroids(int,
       * List<Vector>). The tokens below resume mid-signature. */
      iCentroids) throws Exception {
      // create initial centroids by drawing each coordinate uniformly from [cMin, cMax]
      ContinuousUniformGenerator ug = new ContinuousUniformGenerator(this.cMin,this.cMax,rng);
      double [] vec = new double [dimension];
      for (int k = 0; k
      /* NOTE(review): CORRUPTED SPAN. Lost here: the rest of this centroid
       * loop, the end of GaussianSampleGenerator, and the head of a
       * dataset-file reader class (its javadoc, fields such as datasetFile /
       * numSamples, constructor, and most of its produceSamples(Path), which
       * evidently reads samples from a local file via BufferedReader "in" and
       * writes them with SequenceFile.Writer "out"). The tokens below resume
       * inside that writing loop, at the per-file rollover check
       * (presumably "if (samplesInCurrFile >= SAMPLES_PER_FILE)"). */
      = SAMPLES_PER_FILE) {
        out.close();
        // Roll over to the next output part once the current file is full.
        out = createNewFile(new Path(samplePath,"file"+(fileNo++)));
        samplesInCurrFile = 0;
      }
      out.append(new LongWritable(samplesInCurrFile++), new VectorWritable(p));
      sampleNum ++;
      //LOG.info("writing sample "+samplesInCurrFile);
    }
    out.close();
    in.close();
    LOG.info("Parsed "+String.valueOf(sampleNum)+" samples totally.");
    this.numSamples = sampleNum;
    return this.numSamples;
    }

    /**
     * Picks initial centroids by drawing random sample indexes uniformly from
     * [1, numSamples] (clamped to Integer.MAX_VALUE).
     * NOTE(review): belongs to the dataset-reader class whose declaration was
     * lost in the corrupted span above.
     */
    public int produceInitialCentroids(int numClusters, List iCentroids) throws Exception{
      MersenneTwisterRNG rng = new MersenneTwisterRNG(new DevURandomSeedGenerator());
      // DiscreteUniformGenerator takes int bounds, so clamp the long sample count.
      int numMax = (numSamples >= Integer.MAX_VALUE)? Integer.MAX_VALUE:((int)numSamples);
      DiscreteUniformGenerator dug = new DiscreteUniformGenerator(1,numMax,rng);
      ArrayList indexes = new ArrayList(numClusters);
      for (int i=0; i
      /* NOTE(review): CORRUPTED SPAN. Lost here: the index-selection loop body,
       * the remainder of this reader class, and the beginning of run()
       * (argument parsing that defines numSamples / dimension / numClusters /
       * compress* / sampleDir / clusterDir / datasetFile / sp, and the branch
       * constructing a GaussianSampleGenerator). The tokens below resume
       * inside run(), in the branch that reads samples from a dataset file. */
      (numClusters);
      DatasetSampleReader dp = new DatasetSampleReader(datasetFile);
      sp = dp;
    }

    // Propagate the requested output compression settings to the job config.
    JobConf jobConf = new JobConf(getConf(),GenKMeansDataset.class);
    jobConf.set("mapred.output.compress",compress);
    jobConf.set("mapred.output.compression.type",compressType);
    jobConf.set("mapred.output.compression.codec",compressCodec);
    LOG.info("mapred.output.compression.codec=" + jobConf.get("mapred.output.compression.codec"));
    FileSystem fs = FileSystem.get(jobConf);
    sp.setFileSystem(fs,jobConf);
    LOG.info("Generate K-Means input Dataset : samples = " + numSamples
        + "sample dimension" + dimension +" cluster num =" + numClusters);

    //delete input and output directory
    // NOTE(review): single-argument FileSystem.delete is the deprecated,
    // implicitly-recursive form — confirm intent before upgrading Hadoop.
    fs.delete(new Path(sampleDir));
    LOG.info("Start producing samples...");
    long sampleNum = sp.produceSamples(new Path(sampleDir));
    LOG.info("Finished producing samples");

    // pick k initial cluster centers at random
    fs.delete(new Path(clusterDir));
    LOG.info("Start generating initial cluster centers ...");
    sp.produceInitialCentroids(numClusters, new Path(clusterDir,"part-00000"));
    LOG.info("Finished generating initial cluster centers");
    return 0;
  }

  /** Tool entry point: delegates to Hadoop's ToolRunner, exiting with its status. */
  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new GenKMeansDataset(), args);
    System.exit(res);
  }
}