
Clustering (fkmeans) with Mahout using Clojure

I am trying to write a short script to cluster my data via Clojure (calling Mahout classes). My input data is in this format (output from a PHP script):

format: (tag) (image) (frequency)

tag_sit   image_a 0
tag_sit   image_b 1
tag_lorem image_a 1
tag_lorem image_b 0
tag_dolor image_a 0
tag_dolor image_b 1
tag_ipsum image_a 1
tag_ipsum image_b 1
tag_amit  image_a 1
tag_amit  image_b 0
... (more)
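For reference, the per-tag grouping that the conversion script performs on these lines can be sketched in plain Clojure (no Hadoop involved; `group-frequencies` is an illustrative name, not part of the script below):

```clojure
(require 'clojure.string)

;; Sketch only: groups input lines into {tag [frequency ...]},
;; assuming whitespace-separated "tag image frequency" records.
(defn group-frequencies [lines]
  (reduce
    (fn [acc line]
      (let [[tag _photo frequency] (clojure.string/split line #"\s+")]
        (update-in acc [tag] (fnil conj []) (Integer/parseInt frequency))))
    {}
    lines))

(group-frequencies ["tag_sit image_a 0" "tag_sit image_b 1"])
;; => {"tag_sit" [0 1]}
```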

Then I write them into a SequenceFile using this Clojure script:

#!./bin/clj
(ns sensei.sequence.core)

(require 'clojure.string)
(require 'clojure.java.io)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.FileSystem)
(import org.apache.hadoop.fs.Path)
(import org.apache.hadoop.io.SequenceFile)
(import org.apache.hadoop.io.Text)

(import org.apache.mahout.math.VectorWritable)
(import org.apache.mahout.math.SequentialAccessSparseVector)

(with-open [reader (clojure.java.io/reader *in*)]
  (let [hadoop_configuration ((fn []
                                (let [conf (new Configuration)]
                                  (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                  conf)))
        hadoop_fs (FileSystem/get hadoop_configuration)]
    (reduce
      (fn [writer [index value]]
        (. writer append index value)
        writer)
      (SequenceFile/createWriter
        hadoop_fs
        hadoop_configuration
        (new Path "test/sensei")
        Text
        VectorWritable)
      (map
        (fn [[tag row_vector]]
          (let [input_index (new Text tag)
                input_vector (new VectorWritable)]
            (. input_vector set row_vector)
            [input_index input_vector]))
        (map
          (fn [[tag photo_list]]
            (let [photo_map (apply hash-map photo_list)
                  input_vector (new SequentialAccessSparseVector (count (vals photo_map)))]
              (loop [frequency_list (vals photo_map)]
                (if (zero? (count frequency_list))
                  [tag input_vector]
                  (when-not (zero? (count frequency_list))
                    (. input_vector set
                       (mod (count frequency_list) (count (vals photo_map)))
                       (Integer/parseInt (first frequency_list)))
                    (recur (rest frequency_list)))))))
          (reduce
            (fn [result next_line]
              (let [[tag photo frequency] (clojure.string/split next_line #" ")]
                (update-in result [tag]
                  #(if (nil? %)
                     [photo frequency]
                     (conj % photo frequency)))))
            {}
            (line-seq reader)))))))

Basically it turns the input into a sequence file, in this format:

key (Text): $tag_uri
value (VectorWritable): a vector (cardinality = number of documents) with a numeric index and the respective frequency, e.g. <0:1 1:0 2:0 3:1 4:0 ...>
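As an aside, the sparse `<index:value ...>` notation above can be reproduced with a few lines of plain Clojure (illustrative only, not Mahout's own formatting code):

```clojure
(require 'clojure.string)

;; Illustration only: renders a dense frequency list in the sparse
;; <index:value ...> notation shown above.
(defn sparse-str [frequencies]
  (str "<"
       (clojure.string/join " "
         (map-indexed (fn [i v] (str i ":" v)) frequencies))
       ">"))

(sparse-str [1 0 0 1 0])
;; => "<0:1 1:0 2:0 3:1 4:0>"
```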

Then I proceed to do the actual clustering with this script (following this blog post):

#!./bin/clj
(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/")
                                conf)))
      input_path (new Path "test/sensei")
      output_path (new Path "test/clusters")
      clusters_in_path (new Path "test/clusters/cluster-0")]
  (FuzzyKMeansDriver/run
    hadoop_configuration
    input_path
    (RandomSeedGenerator/buildRandom
      hadoop_configuration
      input_path
      clusters_in_path
      (int 2)
      (new EuclideanDistanceMeasure))
    output_path
    (new EuclideanDistanceMeasure)
    (double 0.5)
    (int 10)     ; maxIterations
    (float 5.0)  ; m (fuzziness)
    true
    false
    (double 0.0)
    false))      ; runSequential

However, I am getting output like this:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001
java.lang.IllegalStateException: No clusters found. Check your -c path.
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62)
        at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
        at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210)
11/08/25 15:20:18 INFO mapred.JobClient:  map 0% reduce 0%
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
        at clojure.lang.Util.runtimeException(Util.java:153)
        at clojure.lang.Compiler.eval(Compiler.java:6417)
        at clojure.lang.Compiler.load(Compiler.java:6843)
        at clojure.lang.Compiler.loadFile(Compiler.java:6804)
        at clojure.main$load_script.invoke(main.clj:282)
        at clojure.main$script_opt.invoke(main.clj:342)
        at clojure.main$main.doInvoke(main.clj:426)
        at clojure.lang.RestFn.invoke(RestFn.java:436)
        at clojure.lang.Var.invoke(Var.java:409)
        at clojure.lang.AFn.applyToHelper(AFn.java:167)
        at clojure.lang.Var.applyTo(Var.java:518)
        at clojure.main.main(main.java:37)
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
        at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
        at clojure.lang.Compiler.eval(Compiler.java:6406)
        ... 10 more

When runSequential is set to true, I get:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty!
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343)
        at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295)
        at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35)
        at clojure.lang.Compiler.eval(Compiler.java:6465)
        at clojure.lang.Compiler.load(Compiler.java:6902)
        at clojure.lang.Compiler.loadFile(Compiler.java:6863)
        at clojure.main$load_script.invoke(main.clj:282)
        at clojure.main$script_opt.invoke(main.clj:342)
        at clojure.main$main.doInvoke(main.clj:426)
        at clojure.lang.RestFn.invoke(RestFn.java:436)
        at clojure.lang.Var.invoke(Var.java:409)
        at clojure.lang.AFn.applyToHelper(AFn.java:167)
        at clojure.lang.Var.applyTo(Var.java:518)
        at clojure.main.main(main.java:37)

I have also rewritten the fkmeans script in this form:

#!./bin/clj
(ns sensei.clustering.fkmeans)

(import org.apache.hadoop.conf.Configuration)
(import org.apache.hadoop.fs.Path)

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver)
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure)
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator)

(let [hadoop_configuration ((fn []
                              (let [conf (new Configuration)]
                                (. conf set "fs.default.name" "hdfs://localhost:9000/")
                                conf)))
      driver (new FuzzyKMeansDriver)]
  (. driver setConf hadoop_configuration)
  (. driver
     run
     (into-array String ["--input" "test/sensei"
                         "--output" "test/clusters"
                         "--clusters" "test/clusters/clusters-0"
                         "--clustering"
                         "--overwrite"
                         "--emitMostLikely" "false"
                         "--numClusters" "3"
                         "--maxIter" "10"
                         "--m" "5"])))

but I am still getting the same error as the initial version :/

The command-line tool runs fine:

$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5 

However, it does not return the points when I run clusterdump, even though the --clustering option was passed in the previous command and --pointsDir is defined here:

$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt 

Mahout version used: 0.6-snapshot, Clojure 1.3.0-snapshot

Please let me know if I missed anything.

Jeffrey04 asked Aug 25 '11


1 Answer

My guess is that the Mahout implementation of fuzzy c-means needs initial clusters to start with, which you may not have supplied.

Also, it sounds as if you are running single-node. Note that for single-node systems you should avoid all the Mahout/Hadoop overhead and just use a regular clustering algorithm. Hadoop/Mahout comes at quite a cost that only pays off when you can no longer process the data on a single system. It is not "map reduce" unless you do it on a large number of systems.
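For example, a plain in-process k-means needs only a few lines of Clojure. The sketch below is illustrative (hand-picked initial centroids instead of Mahout's RandomSeedGenerator, hard cluster assignment rather than fuzzy membership) and assumes points are numeric vectors:

```clojure
;; Squared Euclidean distance between two equal-length vectors.
(defn distance2 [a b]
  (reduce + (map #(let [d (- %1 %2)] (* d d)) a b)))

;; The centroid whose squared distance to point is smallest.
(defn nearest [centroids point]
  (apply min-key #(distance2 % point) centroids))

;; Component-wise mean of a non-empty collection of points.
(defn centroid [points]
  (let [n (count points)]
    (mapv #(/ (reduce + %) (double n)) (apply map vector points))))

;; Iterate assignment + re-centering until the centroids stop moving;
;; returns the clusters as collections of points.
(defn k-means [points centroids]
  (let [clusters (vals (group-by #(nearest centroids %) points))
        new-centroids (map centroid clusters)]
    (if (= (set centroids) (set new-centroids))
      clusters
      (recur points new-centroids))))
```

Running `(k-means [[0 1] [0 0] [9 9] [8 9]] [[0 0] [9 9]])` splits the four points into the two expected groups.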

Has QUIT--Anony-Mousse answered Oct 15 '22