19c19 < package org.apache.hadoop.fs.dfsioe; --- > package org.apache.hadoop.fs; 22a23 > import junit.framework.TestCase; 33,34d33 < import org.apache.hadoop.util.Tool; < import org.apache.hadoop.util.ToolRunner; 83,104c82 < public class TestDFSIOEnh extends Configured implements Tool { < < private static final Log LOG = LogFactory.getLog(TestDFSIOEnh.class); < private static final int TEST_TYPE_READ = 0; < private static final int TEST_TYPE_WRITE = 1; < private static final int TEST_TYPE_CLEANUP = 2; < private static final int DEFAULT_BUFFER_SIZE = 1000000; < private static final String BASE_FILE_NAME = "test_io_"; < private static final String DEFAULT_RES_FILE_NAME = "TestDFSIOEnh_results.log"; < < private static String TEST_ROOT_DIR = System.getProperty("test.build.data","/benchmarks/TestDFSIO-Enh"); < private static Configuration fsConfig = new Configuration(); < private static Path CONTROL_DIR = new Path(TEST_ROOT_DIR, "io_control"); < private static Path WRITE_DIR = new Path(TEST_ROOT_DIR, "io_write"); < private static Path READ_DIR = new Path(TEST_ROOT_DIR, "io_read"); < private static Path DATA_DIR = new Path(TEST_ROOT_DIR, "io_data"); < < < static{ < Configuration.addDefaultResource("hdfs-default.xml"); < Configuration.addDefaultResource("hdfs-site.xml"); < } --- > public class TestDFSIOEnh extends TestDFSIO { 108d85 < private static final long MEGA = 1024*1024; 112c89 < private static final String DEFAULT_TPUT_RESFILE_NAME = "TestDFSIOEnh_Tput.csv"; //result file name --- > private static final String DEFAULT_TPUT_RESFILE_NAME = "TestDFSIO_Tput.csv"; //result file name 117d93 < 126,129c102 < protected abstract static class IOStatMapperEnh extends IOMapperBase { < IOStatMapperEnh() { < super(fsConfig); < } --- > protected abstract static class IOStatMapperEnh extends IOStatMapper { 147,159c120,121 < //super.collectStats(output, name, execTime,stats.objSize); < long totalSize = ((Long)stats.objSize).longValue(); < float ioRateMbSec = (float)totalSize * 1000 / (execTime * MEGA); < LOG.info("Number of bytes processed = " + totalSize); < LOG.info("Exec time = " + execTime); < LOG.info("IO rate = " + ioRateMbSec); < < output.collect(new UTF8("l:tasks"), new UTF8(String.valueOf(1))); < output.collect(new UTF8("l:size"), new UTF8(String.valueOf(totalSize))); < output.collect(new UTF8("l:time"), new UTF8(String.valueOf(execTime))); < output.collect(new UTF8("f:rate"), new UTF8(String.valueOf(ioRateMbSec*1000))); < output.collect(new UTF8("f:sqrate"), new UTF8(String.valueOf(ioRateMbSec*ioRateMbSec*1000))); < --- > super.collectStats(output, name, execTime,stats.objSize); > 247,250c209 < < private static String getFileName(int fIdx) { < return BASE_FILE_NAME + Integer.toString(fIdx); < } --- > 252c211 < protected static void writeTest(FileSystem fs, Configuration fsConfig) --- > protected static void writeTest(FileSystem fs) 295,296d253 < < long actualSize = 0; 298,300c255,257 < while (actualSize < totalSize) { < int curSize = in.read(buffer, 0, bufferSize); < if (curSize < 0) break; --- > long actualSize = 0; > for(int curSize = bufferSize; curSize == bufferSize;) { > curSize = in.read(buffer, 0, bufferSize); 321c278 < stats.statHdfs.add(new Long(actualSize)); --- > stats.statHdfs.add(new Long(totalSize)); 328c285 < protected static void readTest(FileSystem fs,Configuration fsConfig) throws IOException { --- > protected static void readTest(FileSystem fs) throws IOException { 356,372d312 < protected static void runIOTest( Class mapperClass, < Path outputDir, < JobConf job < ) throws IOException { < FileInputFormat.setInputPaths(job, CONTROL_DIR); < job.setInputFormat(SequenceFileInputFormat.class); < < job.setMapperClass(mapperClass); < job.setReducerClass(AccumulatingReducer.class); < < FileOutputFormat.setOutputPath(job, outputDir); < job.setOutputKeyClass(UTF8.class); < job.setOutputValueClass(UTF8.class); < job.setNumReduceTasks(1); < JobClient.runJob(job); < } < 476c416 < public int run(String[] args) throws Exception { --- > public static void main(String[] args) { 498c438 < return -1; --- > System.exit(-1); 561,563d500 < < Configuration fsConfig = new Configuration(getConf()); < 585c522 < return 0; --- > return; 589c526 < return 0; --- > return; 591c528 < createControlFile(fs, fileSize, nrFiles,fsConfig); --- > createControlFile(fs, fileSize, nrFiles); 594c531 < writeTest(fs,fsConfig); --- > writeTest(fs); 596c533 < readTest(fs,fsConfig); --- > readTest(fs); 605c542 < return -1; --- > System.exit(-1); 608,609d544 < return 0; < 612,645d546 < public static void main(String[] args) throws Exception{ < int res = ToolRunner.run(new TestDFSIOEnh(), args); < System.exit(res); < } < < private static void createControlFile( < FileSystem fs, < int fileSize, // in MB < int nrFiles, < Configuration fsConfig < ) throws IOException { < LOG.info("creating control file: "+fileSize+" mega bytes, "+nrFiles+" files"); < < fs.delete(CONTROL_DIR, true); < < for(int i=0; i < nrFiles; i++) { < String name = getFileName(i); < Path controlFile = new Path(CONTROL_DIR, "in_file_" + name); < SequenceFile.Writer writer = null; < try { < writer = SequenceFile.createWriter(fs, fsConfig, controlFile, < UTF8.class, LongWritable.class, < CompressionType.NONE); < writer.append(new UTF8(name), new LongWritable(fileSize)); < } catch(Exception e) { < throw new IOException(e.getLocalizedMessage()); < } finally { < if (writer != null) < writer.close(); < writer = null; < } < } < LOG.info("created control files for: "+nrFiles+" files"); < } 693c594 < if (!wrFileName.equals(getFileName(f))) continue; --- > if (!wrFileName.equals(TestDFSIO.getFileName(f))) continue; 834,835c735 < //TestDFSIO.analyzeResult(fs,testType,execTime,resFileName); < --- > TestDFSIO.analyzeResult(fs,testType,execTime,resFileName); 837,841c737,738 < long tasks = 0; < long size = 0; < long time = 0; < float rate = 0; < float sqrate = 0; --- > // the extended report > BufferedReader lines; 848a746,748 > DataInputStream in; > in = new DataInputStream(fs.open(reduceFile)); > lines = new BufferedReader(new InputStreamReader(in)); 850c750,751 < //long time = 0; --- > > long time = 0; 860,866c761 < < DataInputStream in = null; < BufferedReader lines = null; < try { < in = new DataInputStream(fs.open(reduceFile)); < lines = new BufferedReader(new InputStreamReader(in)); < while((line = lines.readLine()) != null) { --- > while((line = lines.readLine()) != null) { 883,897c778,779 < } else if (attr.endsWith(":tasks")){ < tasks = Long.parseLong(tokens.nextToken()); < } else if (attr.endsWith(":size")){ < size = Long.parseLong(tokens.nextToken()); < } else if (attr.endsWith(":rate")) { < rate = Float.parseFloat(tokens.nextToken()); < } else if (attr.endsWith(":sqrate")) { < sqrate = Float.parseFloat(tokens.nextToken()); < } < } < } finally { < if(in != null) in.close(); < if(lines != null) lines.close(); < } < --- > } > } 899,913c781,782 < double med = rate / 1000 / tasks; < double stdDev = Math.sqrt(Math.abs(sqrate / 1000 / tasks - med*med)); < String resultLines[] = { < "----- TestDFSIO ----- : " + ((testType == TEST_TYPE_WRITE) ? "write" : < (testType == TEST_TYPE_READ) ? "read" : < "unknown"), < " Date & time: " + new Date(System.currentTimeMillis()), < " Number of files: " + tasks, < "Total MBytes processed: " + size/MEGA, < " Throughput mb/sec: " + size * 1000.0 / (time * MEGA), < "Average IO rate mb/sec: " + med, < " IO rate std deviation: " + stdDev, < " Test exec time sec: " + (float)execTime / 1000, < "" }; < --- > LOG.info("Total number of Throughput Samples = "+(wrSamples.size()/2)); > 920c789 < String enhResultLines[] = { --- > String resultLines[] = { 932,933d800 < < 937,941d803 < } < < for(int i = 0; i < enhResultLines.length; i++) { < LOG.info(enhResultLines[i]); < res.println(enhResultLines[i]); 951,956c813 < private static void cleanup(FileSystem fs) throws IOException { < LOG.info("Cleaning up test files"); < fs.delete(new Path(TEST_ROOT_DIR), true); < } < < }//end --- > }//end of test case