package edu.calstatela.hadoop.example.associations; import edu.calstatela.utils.MatrixCalculator; import java.io.IOException; import java.util.Collections; import java.util.Iterator; import java.util.StringTokenizer; import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.LinkedList; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; //import org.apache.hadoop.mapred.jobcontrol.Job; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; import org.apache.log4j.Logger; /** * Market Basket Analysis Algorithm: find the association rule for the list of items * in a basket; That is, there are transaction data in a store *
* The code reads the data as
*   key:   first item
*   value: the rest of the items
*
* It then counts the possible associations of the size requested by the user:
* pairs, triples, ...
* (see Jongwook's Map/Reduce blog)
*
* @date: 03/28/2011
* @author Jongwook Woo
* @email: jwoo5@calstatela.edu
* @version: 1.0
*
*/
public class ItemCount extends Configured implements Tool {
/** Log4j logger for this class; {@code run} uses it to report the parsed job parameters. */
public static final Logger LOG = Logger.getLogger(ItemCount.class);
/**
 * Configures the item-count MapReduce job, submits it and blocks until it finishes.
 *
 * <p>Expected arguments, in order:
 * <ol>
 *   <li>input path</li>
 *   <li>output path (deleted recursively before the job is submitted)</li>
 *   <li>number of reduce tasks</li>
 *   <li>the "number of pairs" setting, passed to the tasks through the
 *       {@code "numPairs"} configuration key</li>
 * </ol>
 *
 * @param args command-line arguments as described above
 * @return 0 if the job completed successfully, 1 if the job failed,
 *         -1 if the arguments are missing or not numeric
 * @throws Exception if the job cannot be configured, submitted or monitored
 */
public int run(String args[]) throws Exception {
    // Fail with a usage message instead of an ArrayIndexOutOfBoundsException.
    if (args == null || args.length < 4) {
        System.err.println(
                "Usage: ItemCount <input path> <output path> <# of reducers> <# of pairs>");
        return -1;
    }

    String inputPath = args[0];
    String outputPath = args[1];
    int noReducers;
    int noPairs;
    try {
        noReducers = Integer.parseInt(args[2]);
        noPairs = Integer.parseInt(args[3]);
    } catch (NumberFormatException e) {
        LOG.error("# of reducers and # of pairs must be integers, got: '"
                + args[2] + "' and '" + args[3] + "'", e);
        return -1;
    }

    LOG.info("Input Path: " + inputPath);
    LOG.info("Output Path: " + outputPath);
    LOG.info("# of Reducers: " + noReducers);
    LOG.info("# of Pairs: " + noPairs);

    // job configuration
    // reference:
    //http://www.hongliangjie.com/2011/01/16/passing-parameters-and-arguments-to-mapper-and-reducer-in-hadoop/
    // The value is published under "numPairs" so the tasks can read it from
    // their own copy of the configuration.
    JobConf jobConf = new JobConf(getConf(), ItemCount.class);
    jobConf.set("numPairs", args[3]);
    jobConf.setJobName("ItemCountJobConf");

    Job job = new Job(jobConf);
    job.setJarByClass(ItemCount.class);
    job.setNumReduceTasks(noReducers);

    //input/output path
    Path outputDir = new Path(outputPath);
    FileInputFormat.setInputPaths(job, new Path(inputPath));
    FileOutputFormat.setOutputPath(job, outputDir);

    //Mapper K, V output
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    //output format
    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    //Reducer K, V output
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // set mapper/reducer
    job.setMapperClass(MyMapper.class);
    // TODO: don't need combiner?
    // NOTE(review): reusing the reducer as combiner is only safe if its
    // reduce function can be applied to partial results - confirm against MyReducer.
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);

    // Delete the output path if it exists to avoid the "existing dir/file" error.
    // The file system is resolved from the path itself, so an output URI that
    // names a non-default file system is cleaned on the right one.
    outputDir.getFileSystem(job.getConfiguration()).delete(outputDir, true);

    long sTime = System.currentTimeMillis();
    boolean succeeded = job.waitForCompletion(true);
    System.out.println("It takes: " + (System.currentTimeMillis() - sTime) + " msec");

    // Propagate job failure to the caller (ToolRunner / process exit code).
    return succeeded ? 0 : 1;
}
/**
* MyMapper
* input: (key1, value1)
* output: <(key2, value2)>
* @author jongwook
*
*/
protected static class MyMapper extends Mapper> outItems = null;
String itemPair = null;
for(int i=0;i
> outItems = null;
String itemPair = null;
for(int i=0;i
> outItems = null;
String itemPair = null;
for(int i=0;i