Apache Groovy

Entries tagged [kmeans]

Whiskey Clustering with Groovy and Apache Ignite

by paulk


Posted on Thursday October 27, 2022 at 11:13AM in Technology


In a previous blog post, we looked at using Apache Wayang (incubating) and Apache Spark to scale up the k-means clustering algorithm. Let's look at another useful technology for scaling up this problem, Apache Ignite. The project recently released a new version, but earlier versions also work for our example. Before we start, here's a quick reminder of the problem.

Whiskey Clustering

groovy.png

This problem looks at the quest of finding the perfect single-malt Scotch whiskey. The whiskies produced from 86 distilleries have been ranked by expert tasters according to 12 criteria (Body, Sweetness, Malty, Smoky, Fruity, etc.). We'll use a K-means algorithm to calculate the centroids.

whiskey_bottles.jpg

K-means is a standard data-science clustering technique. In our case, it groups whiskies with similar characteristics (according to the 12 criteria) into clusters. If we have a favourite whiskey, chances are we can find something similar by looking at other instances in the same cluster. If we are feeling like a change, we can look for a whiskey in some other cluster. The centroid is the notional "point" in the middle of the cluster. For us, it reflects the typical measure of each criterion for a whiskey in that cluster.

Apache Ignite

Apache Ignite is a distributed database for high-performance computing with in-memory speed. It makes a cluster (or grid) of nodes appear like an in-memory cache.

This explanation drastically simplifies Ignite's feature set. Ignite can be used as:

  • an in-memory cache with special features like SQL querying and transactional properties
  • an in-memory data-grid with advanced read-through & write-through capabilities on top of one or more distributed databases
  • an ultra-fast and horizontally scalable in-memory database
  • a high-performance computing engine for custom or built-in tasks including machine learning

It is mostly this last capability that we will use. Ignite's Machine Learning API has purpose-built, cluster-aware machine learning and deep learning algorithms for Classification, Regression, Clustering, and Recommendation, among others. We'll use the distributed K-means Clustering algorithm from their library.

2022-10-27 21_17_19-Machine Learning _ Ignite Documentation.png

Implementation Details

Apache Ignite has special capabilities for reading data into the cache. We could use IgniteDataStreamer or IgniteCache.loadCache() and load data from files, stream sources, various database sources and so forth. This is particularly relevant when using a cluster.
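For instance, on a real cluster we might stream entries into the cache rather than put them one at a time. Here is a minimal sketch (assuming the ignite instance and data array introduced below, and a hypothetical cache name):

ignite.dataStreamer('WHISKEY_CACHE').withCloseable { streamer ->
    data.indices.each { int i -> streamer.addData(i, data[i]) } // batches entries out to the grid nodes
}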

For our little example, our data is in a relatively small CSV file and we will be using a single node, so we'll just read our data using Apache Commons CSV:

import static org.apache.commons.csv.CSVFormat.RFC4180

var file = getClass().classLoader.getResource('whiskey.csv').file as File
var rows = file.withReader { r -> RFC4180.parse(r).records*.toList() }
var data = rows[1..-1].collect { it[2..-1]*.toDouble() } as double[][]

We'll configure our single node Ignite data cache using code (but we could place the details in a configuration file in more complex scenarios):

var cfg = new IgniteConfiguration(
    peerClassLoadingEnabled: true,
    discoverySpi: new TcpDiscoverySpi(
        ipFinder: new TcpDiscoveryMulticastIpFinder(
            addresses: ['127.0.0.1:47500..47509']
        )
    )
)

We'll create a few helper variables:

var features = ['Body', 'Sweetness', 'Smoky', 'Medicinal', 'Tobacco',
                'Honey', 'Spicy', 'Winey', 'Nutty', 'Malty', 'Fruity', 'Floral']
var pretty = this.&sprintf.curry('%.4f') // format values to 4 decimal places
var dist = new EuclideanDistance()
var vectorizer = new DoubleArrayVectorizer().labeled(FIRST) // treat the first coordinate as the label

Now we start the node, populate the cache, run our k-means algorithm, and print the result.

Ignition.start(cfg).withCloseable { ignite ->
    println ">>> Ignite grid started for data: ${data.size()} rows X ${data[0].size()} cols"
    var dataCache = ignite.createCache(new CacheConfiguration<Integer, double[]>(
        name: "TEST_${UUID.randomUUID()}",
        affinity: new RendezvousAffinityFunction(false, 10)))
    data.indices.each { int i -> dataCache.put(i, data[i]) }
    var trainer = new KMeansTrainer().withDistance(dist).withAmountOfClusters(5)
    var mdl = trainer.fit(ignite, dataCache, vectorizer)
    println ">>> KMeans centroids:\n${features.join(', ')}"
    var centroids = mdl.centers*.all()
    centroids.each { c -> println c*.get().collect(pretty).join(', ') }
    dataCache.destroy()
}
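Once trained, the model can also assign new points to clusters. Here is a minimal sketch of what we might add inside the block above (VectorUtils comes from ignite-ml; using row 0 is just for illustration):

import org.apache.ignite.ml.math.primitives.vector.VectorUtils

var firstWhiskey = VectorUtils.of(data[0][1..-1] as double[]) // features only; the first column is treated as the label
println ">>> Whiskey 0 is in cluster ${mdl.predict(firstWhiskey)}"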

Results

Here is the output:

[18:13:11]    __________  ________________
[18:13:11]   /  _/ ___/ |/ /  _/_  __/ __/
[18:13:11]  _/ // (7 7    // /  / / / _/
[18:13:11] /___/\___/_/|_/___/ /_/ /___/
[18:13:11]
[18:13:11] ver. 2.14.0#20220929-sha1:951e8deb
[18:13:11] 2022 Copyright(C) Apache Software Foundation
...
[18:13:11] Configured plugins:
[18:13:11]   ^-- ml-inference-plugin 1.0.0
[18:13:14] Ignite node started OK (id=f731e4ab)
...
>>> Ignite grid started for data: 86 rows X 13 cols
>>> KMeans centroids:
Body, Sweetness, Smoky, Medicinal, Tobacco, Honey, Spicy, Winey, Nutty, Malty, Fruity, Floral
2.7037, 2.4444, 1.4074, 0.0370, 0.0000, 1.8519, 1.6667, 1.8519, 1.8889, 2.0370, 2.1481, 1.6667
1.8500, 1.9000, 2.0000, 0.9500, 0.1500, 1.1000, 1.5000, 0.6000, 1.5500, 1.7000, 1.3000, 1.5000
1.2667, 2.1333, 0.9333, 0.1333, 0.0000, 1.0667, 0.8000, 0.5333, 1.8000, 1.7333, 2.2667, 2.2667
3.6667, 1.5000, 3.6667, 3.3333, 0.6667, 0.1667, 1.6667, 0.5000, 1.1667, 1.3333, 1.1667, 0.1667
1.5000, 2.8889, 1.0000, 0.2778, 0.1667, 1.0000, 1.2222, 0.6111, 0.5556, 1.7778, 1.6667, 2.0000
[18:13:15] Ignite node stopped OK [uptime=00:00:00.663]

We can plot the centroid characteristics in a spider plot.

2022-10-27 20_42_01-Whiskey clusters with Apache Ignite.png


More Information

  • Repo containing the source code: WhiskeyIgnite
  • Repo containing similar examples using a variety of libraries including Apache Commons CSV, Weka, Smile, Tribuo and others: Whiskey
  • A similar example using Apache Spark directly but with a built-in parallelized k-means from the spark-mllib library rather than a hand-crafted algorithm: WhiskeySpark


Fruity Eclipse Collections

by paulk


Posted on Thursday October 13, 2022 at 11:05AM in Technology


This blog post continues, to some degree, from the previous post, but instead of deep learning, we'll look at clustering using k-means, after first exploring some of the top methods of Eclipse Collections with fruit emoji examples.

Eclipse Collections Fruit Salad

First, we'll define a Fruit enum (it adds one additional fruit compared to the related Eclipse Collections kata):

code for fruit enum
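The enum appears as an image in the original post. A minimal sketch of what such an enum could look like (the fruit selection, emoji and colors here are assumptions for illustration; the emoji, color and ALL members are relied on by the later code):

import java.awt.Color

enum Fruit {
    APPLE('🍎', Color.RED), BANANA('🍌', Color.YELLOW), CHERRY('🍒', Color.RED),
    GRAPE('🍇', Color.MAGENTA), ORANGE('🍊', Color.ORANGE), PEACH('🍑', Color.ORANGE)

    public static final List<Fruit> ALL = values().toList()
    final String emoji
    final Color color

    Fruit(String emoji, Color color) {
        this.emoji = emoji
        this.color = color
    }
}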

We can use this enum in the following examples:

usage.png

The last example calculates red fruit in parallel threads. As coded, it uses virtual threads when run on JDK19 with preview features enabled. You can follow the suggestion in the comment to run on other JDK versions or with normal threads. In addition to Eclipse Collections, we have the GPars library on our classpath. Here we are only using one method, which manages the pool lifecycle for us.
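For reference, the shape of that last example might look something like this minimal sketch using GParsPool (the closure body is assumed; withPool is the lifecycle-managing method mentioned above, and the virtual-thread setup is omitted):

import groovyx.gpars.GParsPool
import static java.awt.Color.RED

GParsPool.withPool {
    // findAllParallel spreads the filtering across the pool's worker threads
    var redFruit = Fruit.ALL.findAllParallel { it.color == RED }
    println redFruit*.emoji.join(' ')
}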

Exploring emoji colors

For some fun, let's look at whether the nominated color of each fruit matches the color of the related emoji. As in the previous blog, we'll use the slightly nicer Noto Color Emoji fonts for our fruit as shown here:

2022-10-12 14_16_42-Noto Color Emoji - Google Fonts.png

We'll use an Eclipse Collections BiMap to switch back and forth between the color names and java.awt colors:

@Field public static COLOR_OF = BiMaps.immutable.ofAll([
    WHITE: WHITE, RED: RED, GREEN: GREEN, BLUE: BLUE,
    ORANGE: ORANGE, YELLOW: YELLOW, MAGENTA: MAGENTA
])
@Field public static NAME_OF = COLOR_OF.inverse()
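A quick check of the mapping in both directions (a sketch; RED here is the statically imported java.awt color):

assert COLOR_OF['RED'] == RED
assert NAME_OF[RED] == 'RED'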

We are also going to use some helper functions to switch between RGB and HSB color values:

static hsb(int r, int g, int b) {
    float[] hsb = new float[3]
    RGBtoHSB(r, g, b, hsb)
    hsb
}

static rgb(BufferedImage image, int x, int y) {
    int rgb = image.getRGB(x, y)
    int r = (rgb >> 16) & 0xFF
    int g = (rgb >> 8) & 0xFF
    int b = rgb & 0xFF
    [r, g, b]
}
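As a quick check of these helpers, pure red sits at 0 degrees on the hue circle and pure green at 120 (the degree conversion matches the one used in range below):

float[] red = hsb(255, 0, 0)
assert (red[0] * 360).round() == 0     // pure red → 0°
float[] green = hsb(0, 255, 0)
assert (green[0] * 360).round() == 120 // pure green → 120°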

The HSB color space represents the hue of a color as a position on a spectrum from 0 to 360 degrees:

Color Circle. Image credit: https://nycdoe-cs4all.github.io/units/1/lessons/lesson_3.2

We have two helper methods to assist with colors. The first picks out "mostly black" and "mostly white" colors while the second uses a switch expression to carve out some regions of the color space for our colors of interest:

static range(float[] hsb) {
    if (hsb[1] < 0.1 && hsb[2] > 0.9) return [0, WHITE]
    if (hsb[2] < 0.1) return [0, BLACK]
    int deg = (hsb[0] * 360).round()
    return [deg, range(deg)]
}

static range(int deg) {
    switch (deg) {
        case 0..<16 -> RED
        case 16..<35 -> ORANGE
        case 35..<75 -> YELLOW
        case 75..<160 -> GREEN
        case 160..<250 -> BLUE
        case 250..<330 -> MAGENTA
        default -> RED
    }
}

Note that the JDK doesn't have a standard color of PURPLE, so we combine purple with magenta by choosing an appropriate broad spectrum for MAGENTA.

We used a Plotly 3D interactive scatterplot (as supported by the Tablesaw Java dataframe and visualization library) to visualize our emoji colors (as degrees on the color spectrum) vs the XY coordinates:

2022-10-13 20_04_10-Color vs xy.png

We are going to try out 3 approaches for determining the predominant color of each emoji:

  1. Most common color: We find the color spectrum value for each point and count up the number of points of each color. The color with the most points will be selected. This is simple and works in many scenarios, but it can mislead: if an apple or cherry has 100 shades of red but only one shade of green for the stalk or a leaf, that single green shade may have more points than any individual red shade, so green may be selected.
  2. Most common range: We group each point into a color range. The range with the most points will be selected.
  3. Centroid of biggest cluster: We divide our emoji image into a grid of sub-images. We will perform k-means clustering of the RGB values for each point in the sub-image. This will cluster similarly colored points together. The cluster with the most points will be selected, and its centroid will be chosen as the predominant color. This approach has the effect of pixelating our sub-image by color. This approach is inspired by this python article.

Most Common Color

Ignoring the background white color, the most common color for our PEACH emoji is a shade of orange. The graph below shows the count of each color:

2022-10-17 15_57_40-Color histogram for PEACH.png

Most Common Range

If instead of counting each color, we group colors into their range and count the numbers in each range, we get the following graph for PEACH:

2022-10-17 15_56_58-Range histogram for PEACH.png


K-Means

K-Means is an algorithm for finding cluster centroids. For k=3, we would start by picking 3 random points as our starting centroids.

kmeans_step1.png

We allocate all points to their closest centroid:

kmeans_step2.png

Given this allocation, we re-calculate each centroid from all of its points:

kmeans_step3.png

We repeat this process until either a stable centroid selection is found, or we have reached a certain number of iterations.

We used the K-Means algorithm from Apache Commons Math.
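In code, the core of that looks something like this minimal sketch (the sample RGB values are invented; KMeansPlusPlusClusterer and DoublePoint come from Apache Commons Math):

import org.apache.commons.math3.ml.clustering.DoublePoint
import org.apache.commons.math3.ml.clustering.KMeansPlusPlusClusterer

var points = [[255, 0, 0], [250, 5, 5], [0, 200, 0]].collect { new DoublePoint(it as int[]) }
var clusterer = new KMeansPlusPlusClusterer(2, 100) // k = 2, at most 100 iterations
clusterer.cluster(points).each { cluster ->
    println "centroid ${cluster.center.point} has ${cluster.points.size()} points"
}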

Here is the kind of result we would expect if run on the complete set of points for the PEACH emoji. The black dots are the centroids. It has found one green, one orange and one red centroid. The centroid with the most points allocated to it should be the most predominant color. (This is another interactive 3D scatterplot.)


RGB_3D_PEACH.png


We can plot the number of points allocated to each cluster as a bar chart. (We used a Scala plotting library to show Groovy integration with Scala.)

2022-10-17 16_56_28-Centroid sizes for PEACH.png

The code for drawing the above chart looks like this:

var trace = new Bar(intSeq([1, 2, 3]), intSeq(sizes))
    .withMarker(new Marker().withColor(oneOrSeq(colors)))

var traces = asScala([trace]).toSeq()

var layout = new Layout()
    .withTitle("Centroid sizes for $fruit")
    .withShowlegend(false)
    .withHeight(600)
    .withWidth(800)

Plotly.plot(path, traces, layout, defaultConfig, false, false, true)

K-Means with subimages

The approach we will take for our third option enhances K-Means. Instead of finding centroids for the whole image, as the graphs just shown do, we divide the image into subimages and perform K-Means on each subimage. Our overall predominant color is the color that predominates across the most subimages.

Putting it all together

Here is the final code covering all three approaches (including printing some pretty images highlighting the third approach and the Plotly 3D scatter plots):

var results = Fruit.ALL.collect { fruit ->
    var file = getClass().classLoader.getResource("${fruit.name()}.png").file as File
    var image = ImageIO.read(file)

    var colors = [:].withDefault { 0 }
    var ranges = [:].withDefault { 0 }
    for (x in 0..<image.width) {
        for (y in 0..<image.height) {
            def (int r, int g, int b) = rgb(image, x, y)
            float[] hsb = hsb(r, g, b)
            def (deg, range) = range(hsb)
            if (range != WHITE) { // ignore white background
                ranges[range]++
                colors[deg]++
            }
        }
    }
    var maxRange = ranges.max { e -> e.value }.key
    var maxColor = range(colors.max { e -> e.value }.key)

    int cols = 8, rows = 8
    int grid = 5 // thickness of black "grid" between subimages
    int stepX = image.width / cols
    int stepY = image.height / rows
    var splitImage = new BufferedImage(image.width + (cols - 1) * grid,
        image.height + (rows - 1) * grid, image.type)
    var g2a = splitImage.createGraphics()
    var pixelated = new BufferedImage(image.width + (cols - 1) * grid,
        image.height + (rows - 1) * grid, image.type)
    var g2b = pixelated.createGraphics()

    ranges = [:].withDefault { 0 }
    for (i in 0..<rows) {
        for (j in 0..<cols) {
            def clusterer = new KMeansPlusPlusClusterer(5, 100)
            List<DoublePoint> data = []
            for (x in 0..<stepX) {
                for (y in 0..<stepY) {
                    def (int r, int g, int b) = rgb(image, stepX * j + x, stepY * i + y)
                    var dp = new DoublePoint([r, g, b] as int[])
                    var hsb = hsb(r, g, b)
                    def (deg, col) = range(hsb)
                    data << dp
                }
            }
            var centroids = clusterer.cluster(data)
            var biggestCluster = centroids.max { ctrd -> ctrd.points.size() }
            var ctr = biggestCluster.center.point*.intValue()
            var hsb = hsb(*ctr)
            def (_, range) = range(hsb)
            if (range != WHITE) ranges[range]++
            g2a.drawImage(image, (stepX + grid) * j, (stepY + grid) * i,
                stepX * (j + 1) + grid * j, stepY * (i + 1) + grid * i,
                stepX * j, stepY * i, stepX * (j + 1), stepY * (i + 1), null)
            g2b.color = new Color(*ctr)
            g2b.fillRect((stepX + grid) * j, (stepY + grid) * i, stepX, stepY)
        }
    }
    g2a.dispose()
    g2b.dispose()

    var swing = new SwingBuilder()
    var maxCentroid = ranges.max { e -> e.value }.key
    swing.edt {
        frame(title: 'Original vs Subimages vs K-Means',
                defaultCloseOperation: DISPOSE_ON_CLOSE, pack: true, show: true) {
            flowLayout()
            label(icon: imageIcon(image))
            label(icon: imageIcon(splitImage))
            label(icon: imageIcon(pixelated))
        }
    }

    [fruit, maxRange, maxColor, maxCentroid]
}

println "Fruit  Expected      By max color  By max range  By k-means"
results.each { fruit, maxRange, maxColor, maxCentroid ->
    def colors = [fruit.color, maxColor, maxRange, maxCentroid].collect {
        NAME_OF[it].padRight(14)
    }.join().trim()
    println "${fruit.emoji.padRight(6)} $colors"
}

Here are the resulting images:

2022-10-13 20_37_25-Original.png

2022-10-13 20_37_08-Original.png

2022-10-13 20_36_49-Original.png

2022-10-13 20_36_27-Original.png

2022-10-13 20_36_07-Original.png

2022-10-13 20_35_21-Original.png

And, here are the final results:

final results

In our case, all three approaches yielded the same results. Results for other emojis may vary.

Further information



Using Groovy with Apache Wayang and Apache Spark

by paulk


Posted on Sunday June 19, 2022 at 01:01PM in Technology


wayang.png

Apache Wayang (incubating) is an API for big data cross-platform processing. It provides an abstraction over other platforms like Apache Spark and Apache Flink as well as a default built-in stream-based "platform". The goal is to provide a consistent developer experience when writing code regardless of whether a light-weight or highly-scalable platform may eventually be required. Execution of the application is specified in a logical plan which is again platform agnostic. Wayang will transform the logical plan into a set of physical operators to be executed by specific underlying processing platforms.

Whiskey Clustering

groovy.png

We'll take a look at using Apache Wayang with Groovy to help us in the quest to find the perfect single-malt Scotch whiskey. The whiskies produced from 86 distilleries have been ranked by expert tasters according to 12 criteria (Body, Sweetness, Malty, Smoky, Fruity, etc.). We'll use a KMeans algorithm to calculate the centroids. This is similar to the KMeans example in the Wayang documentation but instead of 2 dimensions (x and y coordinates), we have 12 dimensions corresponding to our criteria. The main point is that it is illustrative of typical data science and machine learning algorithms involving iteration (the typical map, filter, reduce style of processing).

whiskey_bottles.jpg

KMeans is a standard data-science clustering technique. In our case, it groups whiskies with similar characteristics (according to the 12 criteria) into clusters. If we have a favourite whiskey, chances are we can find something similar by looking at other instances in the same cluster. If we are feeling like a change, we can look for a whiskey in some other cluster. The centroid is the notional "point" in the middle of the cluster. For us, it reflects the typical measure of each criterion for a whiskey in that cluster.

Implementation Details

We'll start with defining a Point record:

record Point(double[] pts) implements Serializable {
    static Point fromLine(String line) { new Point(line.split(',')[2..-1]*.toDouble() as double[]) }
}

We've made it Serializable (more on that later) and included a fromLine factory method to help us make points from a CSV file. We'll do that ourselves rather than rely on other libraries which could assist. It's not a 2D or 3D point for us but 12D corresponding to the 12 criteria. We just use a double array, so any dimension would be supported but the 12 comes from the number of columns in our data file.

We'll define a related TaggedPointCounter record. It's like a Point but additionally tracks a cluster ID and a count, used when clustering the "points":

record TaggedPointCounter(double[] pts, int cluster, long count) implements Serializable {
    TaggedPointCounter plus(TaggedPointCounter that) {
        new TaggedPointCounter((0..<pts.size()).collect{ pts[it] + that.pts[it] } as double[], cluster, count + that.count)
    }

    TaggedPointCounter average() {
        new TaggedPointCounter(pts.collect{ double d -> d/count } as double[], cluster, 0)
    }
}

We have plus and average methods which will be helpful in the map/reduce parts of the algorithm.
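As a quick sanity check of how these combine during the map/reduce steps (the values here are invented for illustration):

def a = new TaggedPointCounter([1d, 2d] as double[], 3, 1)
def b = new TaggedPointCounter([3d, 4d] as double[], 3, 1)
assert a.plus(b).average().pts().toList() == [2d, 3d] // sum is [4, 6] with count 2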

Another aspect of the KMeans algorithm is assigning points to the cluster associated with their nearest centroid. For 2 dimensions, recalling Pythagoras' theorem, this would be the square root of x squared plus y squared, where x and y are the distances of a point from the centroid in the x and y dimensions respectively. We'll do the same across all dimensions and define the following helper class to capture this part of the algorithm:

class SelectNearestCentroid implements ExtendedSerializableFunction<Point, TaggedPointCounter> {
    Iterable<TaggedPointCounter> centroids

    void open(ExecutionContext context) {
        centroids = context.getBroadcast("centroids")
    }

    TaggedPointCounter apply(Point p) {
        def minDistance = Double.POSITIVE_INFINITY
        def nearestCentroidId = -1
        for (c in centroids) {
            def distance = sqrt((0..<p.pts.size()).collect{ p.pts[it] - c.pts[it] }.sum{ it ** 2 } as double)
            if (distance < minDistance) {
                minDistance = distance
                nearestCentroidId = c.cluster
            }
        }
        new TaggedPointCounter(p.pts, nearestCentroidId, 1)
    }
}

In Wayang parlance, the SelectNearestCentroid class is a UDF, a User-Defined Function. It represents some chunk of functionality where an optimization decision can be made about where to run the operation.

Once we get to using Spark, the classes in the map/reduce part of our algorithm will need to be serializable. Method closures in dynamic Groovy aren't serializable. We have a few options to avoid using them. I'll show one approach here which is to use some helper classes in places where we might typically use method references. Here are the helper classes:

class Cluster implements SerializableFunction<TaggedPointCounter, Integer> {
    Integer apply(TaggedPointCounter tpc) { tpc.cluster() }
}

class Average implements SerializableFunction<TaggedPointCounter, TaggedPointCounter> {
    TaggedPointCounter apply(TaggedPointCounter tpc) { tpc.average() }
}

class Plus implements SerializableBinaryOperator<TaggedPointCounter> {
    TaggedPointCounter apply(TaggedPointCounter tpc1, TaggedPointCounter tpc2) { tpc1.plus(tpc2) }
}

Now we are ready for our KMeans script:

int k = 5
int iterations = 20

// read in data from our file
def url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
def pointsData = new File(url).readLines()[1..-1].collect{ Point.fromLine(it) }
def dims = pointsData[0].pts().size()

// create some random points as initial centroids
def r = new Random()
def initPts = (1..k).collect { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }

// create planbuilder with Java and Spark enabled
def configuration = new Configuration()
def context = new WayangContext(configuration)
    .withPlugin(Java.basicPlugin())
    .withPlugin(Spark.basicPlugin())
def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")

def points = planBuilder
    .loadCollection(pointsData).withName('Load points')

def initialCentroids = planBuilder
    .loadCollection((0..<k).collect{ idx -> new TaggedPointCounter(initPts[idx], idx, 0) })
    .withName("Load random centroids")

def finalCentroids = initialCentroids
    .repeat(iterations, currentCentroids ->
        points.map(new SelectNearestCentroid())
            .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
            .reduceByKey(new Cluster(), new Plus()).withName("Add up points")
            .map(new Average()).withName("Average points")
            .withOutputClass(TaggedPointCounter)).withName("Loop").collect()

println 'Centroids:'
finalCentroids.each { c ->
    println "Cluster$c.cluster: ${c.pts.collect{ sprintf('%.3f', it) }.join(', ')}"
}

Here, k is the desired number of clusters, and iterations is the number of times to iterate through the KMeans loop. The pointsData variable is a list of Point instances loaded from our data file. We'd use the readTextFile method instead of loadCollection if our data set was large. The initPts variable is some random starting positions for our initial centroids. Being random, and given the way the KMeans algorithm works, it is possible that some of our clusters may have no points assigned.
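For example, the large-data variant might look something like this sketch (the 'file:' prefix, the header handling and the serializable mapping helper that Spark would need are all glossed over here):

def points = planBuilder
    .readTextFile('file:' + url).withName('Load lines')
    .map(line -> Point.fromLine(line)).withName('Create points')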

Our algorithm works by assigning, at each iteration, all the points to their closest current centroid and then calculating the new centroids given those assignments. Finally, we output the results.

Running with the Java streams-backed platform

As we mentioned earlier, Wayang selects which platform(s) will run our application. It has numerous capabilities whereby cost functions and load estimators can be used to influence and optimize how the application is run. For our simple example, it is enough to know that even though we specified Java or Spark as options, Wayang knows that for our small data set, the Java streams option is the way to go.

Since we prime the algorithm with random data, we expect the results to be slightly different each time the script is run, but here is one output:

> Task :WhiskeyWayang:run
Centroids:
Cluster0: 2.548, 2.419, 1.613, 0.194, 0.097, 1.871, 1.742, 1.774, 1.677, 1.935, 1.806, 1.613
Cluster2: 1.464, 2.679, 1.179, 0.321, 0.071, 0.786, 1.429, 0.429, 0.964, 1.643, 1.929, 2.179
Cluster3: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
Cluster4: 1.684, 1.842, 1.211, 0.421, 0.053, 1.316, 0.632, 0.737, 1.895, 2.000, 1.842, 1.737 ...

Which, if plotted, looks like this:

WhiskeyWayang Centroid Spider Plot

If you are interested, check out the examples in the repo links at the end of this article to see the code for producing this centroid spider plot or the Jupyter/BeakerX notebook in this project's github repo.

Running with Apache Spark

spark.png

Given our small dataset size and no other customization, Wayang will choose the Java streams based solution. We could use Wayang optimization features to influence which processing platform it chooses, but to keep things simple, we'll just disable the Java streams platform in our configuration by making the following change in our code:

WhiskeyWayang_DisableJava.png
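The image shows the change; in essence (a sketch based on the context creation shown earlier), we simply stop registering the Java plugin:

def context = new WayangContext(configuration)
    .withPlugin(Spark.basicPlugin()) // Java.basicPlugin() no longer registered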

Now when we run the application, the output will be something like this (a solution similar to before but with 1000+ extra lines of Spark and Wayang log information - truncated for presentation purposes):

[main] INFO org.apache.spark.SparkContext - Running Spark version 3.3.0
[main] INFO org.apache.spark.util.Utils - Successfully started service 'sparkDriver' on port 62081.
...
Centroids:
Cluster4: 1.414, 2.448, 0.966, 0.138, 0.034, 0.862, 1.000, 0.483, 1.345, 1.690, 2.103, 2.138
Cluster0: 2.773, 2.455, 1.455, 0.000, 0.000, 1.909, 1.682, 1.955, 2.091, 2.045, 2.136, 1.818
Cluster1: 1.762, 2.286, 1.571, 0.619, 0.143, 1.714, 1.333, 0.905, 1.190, 1.952, 1.095, 1.524
Cluster2: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
Cluster3: 2.167, 2.000, 2.167, 1.000, 0.333, 0.333, 2.000, 0.833, 0.833, 1.500, 2.333, 1.667
...
[shutdown-hook-0] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called

Discussion

A goal of Apache Wayang is to allow developers to write platform-agnostic applications. While this is mostly true, the abstractions aren't perfect. As an example, if I know I am only using the streams-backed platform, I don't need to worry about making any of my classes serializable (which is a Spark requirement). In our example, we could have omitted the "implements Serializable" part of the TaggedPointCounter record, and we could have used a method reference TaggedPointCounter::average instead of our Average helper class. This isn't meant to be a criticism of Wayang; after all, if you want to write cross-platform UDFs, you might expect to have to follow some rules. Instead, it is meant to indicate that abstractions often have leaks around the edges. Sometimes those leaks can be used beneficially; other times they are traps waiting for unknowing developers.

To summarise, if using the Java streams-backed platform, you can run the application on JDK17 (which uses native records) as well as JDK11 and JDK8 (where Groovy provides emulated records). Also, we could make numerous simplifications if we desired. When using the Spark processing platform, the potential simplifications aren't applicable, and we can run on JDK8 and JDK11 (Spark isn't yet supported on JDK17).

Conclusion

We have looked at using Apache Wayang to implement a KMeans algorithm that runs either backed by the JDK streams capabilities or by Apache Spark. The Wayang API hid from us some of the complexities of writing code that works on a distributed platform and some of the intricacies of dealing with the Spark platform. The abstractions aren't perfect but they certainly aren't hard to use and provide extra protection should we wish to move between platforms. As an added bonus, they open up numerous optimization possibilities.

Apache Wayang is an incubating project at Apache and still has work to do before it graduates, but lots of work has gone on previously (it was previously known as Rheem and was started in 2015). Platform-agnostic applications are a holy grail that has been desired for many years but is hard to achieve. It should be exciting to see how far Apache Wayang progresses in achieving this goal.

More Information

  • Repo containing the source code: WhiskeyWayang
  • Repo containing similar examples using a variety of libraries including Apache Commons CSV, Weka, Smile, Tribuo and others: Whiskey
  • A similar example using Apache Spark directly but with a built-in parallelized KMeans from the spark-mllib library rather than a hand-crafted algorithm: WhiskeySpark
  • A similar example using Apache Ignite directly but with a built-in clustered KMeans from the ignite-ml library rather than a hand-crafted algorithm: WhiskeyIgnite