K-nearest neighbour (KNN) classification is a supervised technique that looks at the nearest neighbours, in a training set of classified instances, of an unclassified instance in order to identify the class to which it belongs. For example, it might be used to estimate the probable date and origin of a shard of pottery. There are several variations and refinements of KNN. Some, perhaps most, of these are not needed in a parallel batch processing environment such as Hadoop or Spark, but they may become relevant for the rapid processing portion of a lambda architecture.
The simplest KNN algorithm was implemented in Pyspark, the Python API to Spark, a map-reduce framework reputed to be some ten times faster than Hadoop when run from disk and a hundred times faster when run in memory. The well known Iris dataset was used to verify the implementation. No effort was made to allow for the effects of skew in the training set. Before the Pyspark coding began, the algorithm was implemented in a conventional serial way, based on an existing example, with some slight improvements to the code, for example using list comprehensions rather than loops. Doing this gave a deeper understanding of the algorithm and a reference point for assessing the Pyspark implementation.
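For reference, a minimal serial sketch along those lines (the record layout and helper names are illustrative, not the original code; each record is assumed to be a list of numeric features followed by a class name):
from collections import Counter

def distance_abs(a, b, numfields):
    # Sum of absolute differences over the first numfields components
    return sum(abs(a[i] - b[i]) for i in range(numfields))

def classify(test_instance, training_set, k, numfields):
    # Distance from the test instance to every training instance
    distances = [(train, distance_abs(train, test_instance, numfields)) for train in training_set]
    # K nearest neighbours, sorted by ascending distance
    neighbours = sorted(distances, key=lambda pair: pair[1])[:k]
    # Class most represented among those neighbours (the class name is the last field)
    votes = Counter(train[-1] for train, _ in neighbours)
    return votes.most_common(1)[0][0]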
Both implementations were developed in Eclipse, with a view to eventual command line use of the Pyspark implementation. Eclipse/Pydev was configured as described in the notes linked at the end of this post, with the log output sent to a file. Eclipse being Eclipse, the log output occasionally failed to reach the file, but did so once the programme was rerun.
The Pyspark code is shorter and, once the basics of Pyspark are mastered, more easily understood than the serial version. Please note that any code presented here is proof of concept code and is to be used at your own risk.
A set of known instances, in this case irises, is used to train the algorithm. Since not many people go round measuring iris details for fun, the data set was split randomly into a training set and a test set, with roughly two thirds of the data used for training.
The algorithm classifies an item in the test set by computing the distance from the test item to each member of the training set, taking the K nearest neighbours and assigning the test item to the class most represented among them.
The steps in the algorithm are:
- Load data (here data is loaded from a file)
- Split the data into test and training sets
- Compute the distance from a test instance (a member of the test set) to all members of the training set
- Select the K nearest neighbours of the test instance
- Assign the test instance to the class most represented in these nearest neighbours
Running the algorithm against every member of the test set and determining the percentage of correct classifications gives an estimate of the accuracy. Since the algorithm randomly splits the data into test and training sets, the result varies every time the test is run, but the accuracy was around 90%, with occasional extremes of 85% and 97%.
Loading the data
Loading data from a file involves creating a spark context and using that to load the data. You can only have one spark context running at a time.
from pyspark import SparkContext
import sys

# Create spark context
sc = SparkContext(appName="PysparkKnearestNeigbours")
# Read in lines from the data file (assumed here to be the first command line argument)
records = sc.textFile(sys.argv[1])
This creates an RDD (Resilient Distributed Dataset) holding all the data records. For this exploratory exercise the first (header) row was removed. Since RDDs are not iterable, they must be transformed into lists using collect(), for example
recordlist = records.collect()
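The header row mentioned above can be dropped with something like this (a sketch of one common approach, not necessarily what the original code did):
# Remove the first (header) line from the RDD
header = records.first()
records = records.filter(lambda line: line != header)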
The number of nearest neighbours and the number of fields used in the distance calculation are similarly read from the command line:
# K and the number of fields, assumed here to be the second and third arguments
numNearestNeigbours = int(sys.argv[2])
numfieldsInDistance = int(sys.argv[3])
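With that argument order the programme might, for example, be launched as
spark-submit knnIris.py iris.csv 5 4
where the script and file names are purely illustrative; when running from Eclipse the same values go into the run configuration's programme arguments.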
Splitting data into test and training sets
# Split data into test and training sets in ratio 1:2
testset,trainingset = records.randomSplit([1,2])
This does what it says: it splits the data randomly into test and training sets, with about one third of the records in the test set.
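Because the split is random the result changes from run to run; randomSplit() also accepts an optional seed, which can be handy when a repeatable split is wanted:
# Same split as above, but with a fixed (arbitrary) seed for reproducibility
testset,trainingset = records.randomSplit([1,2], seed=17)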
The distance function
The distance function is an integral part of the algorithm. Various distance functions can be used, but here, instead of the standard Euclidean distance, the sum of the absolute values of the differences in each component (the Manhattan distance) was used, in order to minimise numerical problems:
d(x, y) = sum over i of |x_i - y_i|
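The distanceAbs() helper used in the snippets further down is not a Pyspark built-in; a possible implementation, assuming records are kept as semicolon separated strings with the numeric fields first and the class name last (as in the examples below), is:
def distanceAbs(first, second, numfields):
    # Records are semicolon separated strings, e.g. u'5;3.4;1.6;0.4;Iris-setosa'
    a = first.split(";")
    b = second.split(";")
    # Sum of absolute differences over the first numfields components
    return sum(abs(float(a[i]) - float(b[i])) for i in range(numfields))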
Finding the nearest neighbours
Finding the nearest neighbours ran into the restriction that RDDs cannot be nested: one RDD cannot be used inside a transformation of another. Overcoming this involved creating an RDD comprising all pairs (training instance, test instance) using cartesian().
nearestNeigbours = trainingset.cartesian(testinstance) \
    .map(lambda (training, test): (training, distanceAbs(training, test, numfieldsInDistance))) \
    .sortBy(lambda (trainingInstance, distance): distance)
cartesian() creates an RDD with the required key-value pairs. The map statement turns each pair into (training instance, distance to test instance). The sortBy() method then sorts these pairs by the distance, in ascending order; take(..) is used further down to crop the result to the desired number of nearest neighbours.
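Note that testinstance must itself be an RDD for cartesian() to accept it. One way to arrange that (testRecord is a name introduced here only for illustration, standing for a single line from the test set) is:
testinstance = sc.parallelize([testRecord])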
Assigning the test instance to a class
The code below should be self explanatory. It transforms the neighbour pairs into an RDD of class names. To get the assigned class, crop neighbourNames to the first numNearestNeigbours entries with take() and pick the class most represented among them, for example as sketched after the snippet.
# (kv) pair is typically (u'5;3.4;1.6;0.4;Iris-setosa', 0.08000000000000014)
# training = u'5;3.4;1.6;0.4;Iris-setosa'
# training.split(";")[-1] = Iris-setosa
neighbourNames = nearestNeigbours.map(lambda (trainingInstance, distance): trainingInstance.split(";")[-1])
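One way to pick the most represented class, reusing the variables from the snippets above (a sketch rather than the original code):
from collections import Counter
# Crop to the K nearest class names and take the most common one
kNearestNames = neighbourNames.take(numNearestNeigbours)
assignedClass = Counter(kNearestNames).most_common(1)[0][0]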
The accuracy can be checked by looping over the test set and comparing the class assigned by the algorithm with the actual class recorded in each test instance.
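A sketch of that loop, assuming a classifyInstance() helper (a hypothetical wrapper, not shown here) that runs the Pyspark steps above for one test record and returns the assigned class name:
testRecords = testset.collect()
correct = 0
for testRecord in testRecords:
    actualClass = testRecord.split(";")[-1]
    # classifyInstance() is a hypothetical wrapper around the steps described above
    if classifyInstance(testRecord) == actualClass:
        correct += 1
accuracy = 100.0 * correct / len(testRecords)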
The steps needed to implement the KNN classification algorithm have been outlined. The two major gotchas encountered were being able to run only one spark context at a time and being unable to use nested RDDs. The first problem was not a major one, as the Python methods have access to the global scope. The second was overcome using the cartesian() method, which may prove expensive for very large training sets.
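As an aside on the first gotcha: when a fresh context is needed, for example when rerunning inside Eclipse, the existing one can be stopped with:
sc.stop()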
The algorithm worked well on the Iris dataset, but might not work so well on less well ordered sets. One improvement would be to deal with a skewed dataset (where one class dominates) by weighting the data accordingly, for example as sketched below.
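One possible scheme (a sketch only, not something implemented here; it reuses kNearestNames from the voting sketch above) is to scale each neighbour's vote by the inverse frequency of its class in the training set:
# Class counts in the training set, e.g. {u'Iris-setosa': 35, ...}
classCounts = trainingset.map(lambda record: record.split(";")[-1]).countByValue()
# Weighted vote: neighbours from rarer classes count for more
votes = {}
for name in kNearestNames:
    votes[name] = votes.get(name, 0) + 1.0 / classCounts[name]
assignedClass = max(votes, key=votes.get)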
- http://digitalsteampunk.blogspot.com/2015/10/running-spark-with-python-under-os-x.html Some notes on configuring Pyspark to run code from Eclipse or from the command line