
Reading and Writing CSV files with Groovy

by paulk


Posted on Monday July 25, 2022 at 02:26PM in Technology


In this post, we'll look at reading and writing CSV files using Groovy.

Aren't CSV files just text files?

For simple cases, we can treat CSV files no differently than we would other text files. Suppose we have the following data that we would like to write to a CSV file:

def data = [
    ['place', 'firstname', 'lastname', 'team'],
    ['1', 'Lorena', 'Wiebes', 'Team DSM'],
    ['2', 'Marianne', 'Vos', 'Team Jumbo Visma'],
    ['3', 'Lotte', 'Kopecky', 'Team SD Worx']
]

Groovy uses File or Path objects similar to Java. We'll use a File object here and, for our purposes, we'll just use a temporary file since we are just going to read it back in and check it against our data. Here is how to create a temporary file:

def file = File.createTempFile('FemmesStage1Podium', '.csv')

Writing our CSV (in this simple example) is as simple as joining the data with commas and the lines with line separator character(s):

file.text = data*.join(',').join(System.lineSeparator())

Here we "wrote" the entire file contents in one go but there are options for writing a line or character or byte at a time.
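For instance, writing a line at a time might look like the following sketch, which uses Groovy's withWriter together with the GDK writeLine method:

file.withWriter { w ->
    data.each { row -> w.writeLine(row.join(',')) }    // one CSV row per line
}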

Reading the data in is just as simple. We read the lines and split on commas:

assert file.readLines()*.split(',') == data

In general, we might want to further process the data. Groovy provides nice options for this too. Suppose we have the following existing CSV file:
[Image: HommesOverall.png — contents of the existing CSV file]
We can read in the file and select various columns of interest with code like below:

def file = new File('HommesStageWinners.csv')
def rows = file.readLines().tail()*.split(',')
int total = rows.size()
Set names = rows.collect { it[1] + ' ' + it[2] }
Set teams = rows*.getAt(3)
Set countries = rows*.getAt(4)
String result = "Across $total stages, ${names.size()} riders from " +
"${teams.size()} teams and ${countries.size()} countries won stages."
assert result == 'Across 21 stages, 15 riders from 10 teams and 9 countries won stages.'

Here, the tail() method skips over the header line. Column 0 contains the stage number which we ignore. Column 1 contains the first name, column 2 the last name, column 3 the team, and column 4 the country of the rider. We store away the full names, teams and countries in sets to remove duplicates. We then create an overall result message using the size of those sets.

While the coding was fairly simple for this example, it isn't recommended to hand-process CSV files in this fashion. The details for CSV can quickly get messy. What if the values themselves contain commas or newlines? Perhaps we can surround values in double quotes, but then what if a value contains a double quote? And so forth. For this reason, using a CSV library is recommended.
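To see why, consider a made-up row where the rider's name contains a comma and the team name contains escaped double quotes; a naive split falls apart:

def tricky = '1,"Van Vleuten, Annemiek","Team ""Movistar"""'
assert tricky.split(',').size() == 4   // naive split gives 4 pieces, but the row really has 3 fields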

We'll look at three shortly, but first let's summarise some of the highlights of the tour by looking at multiple winners. Here is some code which summarises our CSV data:

def byValueDesc = { -it.value }
def bySize = { k, v -> [k, v.size()] }
def isMultiple = { it.value > 1 }
def multipleWins = { Closure select -> rows
    .groupBy(select)
    .collectEntries(bySize)
    .findAll(isMultiple)
    .sort(byValueDesc)
    .entrySet()
    .join(', ')
}
println 'Multiple wins by country:\n' + multipleWins{ it[4] }
println 'Multiple wins by rider:\n' + multipleWins{ it[1] + ' ' + it[2] }
println 'Multiple wins by team:\n' + multipleWins{ it[3] }

This summary has nothing in particular to do with CSV files but is included in honour of the great riding during the tour! Here's the output:

[Image: MultipleWins.png — output listing multiple wins by country, rider and team]

Okay, now let's look at our three CSV libraries.

Commons CSV

The Apache Commons CSV library makes writing and parsing CSV files easier. Here is the code for writing our CSV which makes use of the CSVPrinter class:

file.withWriter { w ->
    new CSVPrinter(w, CSVFormat.DEFAULT).printRecords(data)
}

And here is the code for reading it back in which uses the RFC4180 parser factory singleton:

file.withReader { r ->
    assert RFC4180.parse(r).records*.toList() == data
}

There are other singleton factories for tab-separated values and other common formats and builders to let you set a whole variety of options like escape characters, quote options, whether to use an enum to define header names, and whether to ignore empty lines or nulls.
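As a sketch (not from the original post), configuring a tab-separated format with a few of those options tweaked might look like this:

import static org.apache.commons.csv.CSVFormat.TDF

def tsvFormat = TDF.builder()          // start from the tab-separated singleton
    .setQuote('"' as Character)        // quote character
    .setEscape('\\' as Character)      // escape character
    .setIgnoreEmptyLines(true)         // skip blank lines when parsing
    .build()
// tsvFormat could then be used with parse/printRecords just like RFC4180 or CSVFormat.DEFAULT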

For our more elaborate example, we have a tiny bit more work to do. We'll use the builder to tell the parser to skip the header row. We could have chosen to use the tail() trick we used earlier but we decided to use the parser features instead. The code would look like this:

file.withReader { r ->
    def rows = RFC4180.builder()
        .setHeader()
        .setSkipHeaderRecord(true)
        .build()
        .parse(r)
        .records
    assert rows.size() == 21
    assert rows.collect { it.firstname + ' ' + it.lastname }.toSet().size() == 15
    assert rows*.team.toSet().size() == 10
    assert rows*.country.toSet().size() == 9
}

You can see here that we have used column names rather than column numbers during our processing. Using column names is another advantage of using the CSV library; it would be quite a lot of work to do that aspect by hand. Also note that, for simplicity, we didn't create the entire result message as in the earlier example. Instead, we just checked the size of all of the relevant sets that we calculated previously.

OpenCSV

The OpenCSV library handles the messy CSV details when needed but doesn't get in the way for simple cases. For our first example, the CSVReader and CSVWriter classes will be suitable. Here is the code for writing our CSV file in the same way as earlier:

file.withWriter { w ->
    new CSVWriter(w).writeAll(data.collect{ it as String[] })
}

And here is the code for reading data:

file.withReader { r ->
    assert new CSVReader(r).readAll() == data
}

If we look at the produced file, it is already a little fancier than earlier with double quotes around all data:

[Image: FemmesPodiumStage1.png — the produced CSV file with values double quoted]

If we want to do more elaborate processing, the CSVReaderHeaderAware class is aware of the initial header row and its column names. Here is our more elaborate example which processes some of the data further:

file.withReader { r ->
    def rows = []
    def reader = new CSVReaderHeaderAware(r)
    while ((next = reader.readMap())) rows << next
    assert rows.size() == 21
    assert rows.collect { it.firstname + ' ' + it.lastname }.toSet().size() == 15
    assert rows*.team.toSet().size() == 10
    assert rows*.country.toSet().size() == 9
}

You can see here that we have again used column names rather than column numbers during our processing. For simplicity, we followed the same style as in the Commons CSV example and just checked the size of all of the relevant sets that we calculated previously.

OpenCSV also supports transforming CSV files into JavaBean instances. First, we define our target class (or annotate an existing domain class):

class Cyclist {
    @CsvBindByName(column = 'firstname')
    String first
    @CsvBindByName(column = 'lastname')
    String last
    @CsvBindByName
    String team
    @CsvBindByName
    String country
}

For two of the columns, we've indicated that the column name in the CSV file doesn't match our class property. The annotation attribute caters for that scenario.

Then, we can use this code to convert our CSV file into a list of domain objects:

file.withReader { r ->
    List<Cyclist> rows = new CsvToBeanBuilder(r).withType(Cyclist).build().parse()
    assert rows.size() == 21
    assert rows.collect { it.first + ' ' + it.last }.toSet().size() == 15
    assert rows*.team.toSet().size() == 10
    assert rows*.country.toSet().size() == 9
}

OpenCSV has many options we didn't show. When writing files, you can specify the separator and quote characters; when reading CSV, you can specify column positions and types, and validate data.
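As one sketch of the writing options (assuming the constants available in recent OpenCSV versions), we could switch to a semicolon separator with no quoting:

import com.opencsv.CSVWriter

file.withWriter { w ->
    new CSVWriter(w, ';' as char, CSVWriter.NO_QUOTE_CHARACTER,
            CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END)
            .writeAll(data.collect { it as String[] })
}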

Jackson Databind CSV

The Jackson Databind library supports the CSV format (as well as many others).

Writing CSV files from existing data is simple, as shown here for our running example:

file.withWriter { w ->
    new CsvMapper().writeValue(w, data)
}

This writes the data into our temporary file as we saw with previous examples. One minor difference is that, by default, only the values containing spaces will be double quoted but, like the other libraries, there are many configuration options to tweak such settings.
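For instance, if we wanted every value double quoted (a sketch assuming a recent jackson-dataformat-csv version), we could enable the relevant generator feature:

import com.fasterxml.jackson.dataformat.csv.CsvGenerator
import com.fasterxml.jackson.dataformat.csv.CsvMapper

def quotingMapper = new CsvMapper().enable(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS)
file.withWriter { w ->
    quotingMapper.writeValue(w, data)   // every string value now gets double quoted
}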

Reading the data can be achieved using the following code:

def mapper = new CsvMapper().readerForListOf(String).with(CsvParser.Feature.WRAP_AS_ARRAY)
file.withReader { r ->
    assert mapper.readValues(r).readAll() == data
}

Our more elaborate example is done in a similar way:

def schema = CsvSchema.emptySchema().withHeader()
def mapper = new CsvMapper().readerForMapOf(String).with(schema)
file.withReader { r ->
    def rows = mapper.readValues(r).readAll()
    assert rows.size() == 21
    assert rows.collect { it.firstname + ' ' + it.lastname }.toSet().size() == 15
    assert rows*.team.toSet().size() == 10
    assert rows*.country.toSet().size() == 9
}

Here, we tell the library to make use of our header row and store each row of data in a map.

Jackson Databind also supports writing to classes including JavaBeans as well as records. Let's create a record to hold our cyclist information:

@JsonCreator
record Cyclist(
        @JsonProperty('stage') int stage,
        @JsonProperty('firstname') String first,
        @JsonProperty('lastname') String last,
        @JsonProperty('team') String team,
        @JsonProperty('country') String country) {
    String full() { "$first $last" }
}

Note that again we can indicate where our record component names may not match the names used in the CSV file; we simply supply the alternate name when specifying the property. There are other options, like indicating that a field is required or giving its column position, but we don't need those options for our example. We've also added a full() helper method to return the full name of the cyclist.

Groovy will use native records on platforms that support them (JDK16+) or emulated records on earlier platforms.

Now we can write our code for record deserialization:

def schema = CsvSchema.emptySchema().withHeader()
def mapper = new CsvMapper().readerFor(Cyclist).with(schema)
file.withReader { r ->
    List<Cyclist> records = mapper.readValues(r).readAll()
    assert records.size() == 21
    assert records*.full().toSet().size() == 15
    assert records*.team.toSet().size() == 10
    assert records*.country.toSet().size() == 9
}

Conclusion

We have looked at writing and reading CSV files using plain Strings, domain classes, and records. We had a look at handling simple cases by hand and also looked at the OpenCSV, Commons CSV and Jackson Databind CSV libraries.

Code for these examples:
https://github.com/paulk-asert/CsvGroovy

Code for other examples of using Groovy for Data Science:
https://github.com/paulk-asert/groovy-data-science




Comparators and Sorting in Groovy

by paulk


Posted on Thursday July 21, 2022 at 03:51PM in Technology


This blog post is inspired by the Comparator examples in the excellent Collections Refuelled talk and blog by Stuart Marks. That blog from 2017 highlights improvements in the Java collections library in Java 8 and 9 including numerous Comparator improvements. It is now 5 years old but still highly recommended for anyone using the Java collections library.

Rather than have a Student class as per the original blog example, we'll have a Celebrity class (and later record) which has the same first and last name fields and an additional age field. We'll compare initially by last name with nulls before non-nulls and then by first name and lastly by age.

As with the original blog, we'll cater for nulls, e.g. a celebrity known by a single name.

The Java comparator story recap

Our Celebrity class, if we wrote it in Java, would look something like this:

public class Celebrity {                    // Java
    private String firstName;
    private String lastName;
    private int age;

    public Celebrity(String firstName, int age) {
        this(firstName, null, age);
    }

    public Celebrity(String firstName, String lastName, int age) {
        this.firstName = firstName;
        this.lastName = lastName;
        this.age = age;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Celebrity{" +
                "firstName='" + firstName +
                (lastName == null ? "" : "', lastName='" + lastName) +
                "', age=" + age +
                '}';
    }
}

It would look much nicer as a Java record (JDK16+) but we'll keep with the spirit of the original blog example for now. This is fairly standard boilerplate and in fact was mostly generated by IntelliJ IDEA. The only slightly interesting aspect is that we tweaked the toString method to not display null last names.

On JDK 8 with the old-style comparator coding, a main application which created and sorted some celebrities might look like this:

import java.util.ArrayList;            // Java
import java.util.Collections;
import java.util.List;

public class Main {
    public static void main(String[] args) {
        List<Celebrity> celebrities = new ArrayList<>();
        celebrities.add(new Celebrity("Cher", "Wang", 63));
        celebrities.add(new Celebrity("Cher", "Lloyd", 28));
        celebrities.add(new Celebrity("Alex", "Lloyd", 47));
        celebrities.add(new Celebrity("Alex", "Lloyd", 37));
        celebrities.add(new Celebrity("Cher", 76));
        Collections.sort(celebrities, (c1, c2) -> {
            String f1 = c1.getLastName();
            String f2 = c2.getLastName();
            int r1;
            if (f1 == null) {
                r1 = f2 == null ? 0 : -1;
            } else {
                r1 = f2 == null ? 1 : f1.compareTo(f2);
            }
            if (r1 != 0) {
                return r1;
            }
            int r2 = c1.getFirstName().compareTo(c2.getFirstName());
            if (r2 != 0) {
                return r2;
            }
            return Integer.compare(c1.getAge(), c2.getAge());
        });
        System.out.println("Celebrities:");
        celebrities.forEach(System.out::println);
    }
}

When we run this example, the output looks like this:

Celebrities:
Celebrity{firstName='Cher', age=76}
Celebrity{firstName='Alex', lastName='Lloyd', age=37}
Celebrity{firstName='Alex', lastName='Lloyd', age=47}
Celebrity{firstName='Cher', lastName='Lloyd', age=28}
Celebrity{firstName='Cher', lastName='Wang', age=63}

As pointed out in the original blog, this code is rather tedious and error-prone and can be improved greatly with comparator improvements in JDK8:

import java.util.Arrays;             // Java
import java.util.List;

import static java.util.Comparator.comparing;
import static java.util.Comparator.naturalOrder;
import static java.util.Comparator.nullsFirst;

public class Main {
    public static void main(String[] args) {
        List<Celebrity> celebrities = Arrays.asList(
                new Celebrity("Cher", "Wang", 63),
                new Celebrity("Cher", "Lloyd", 28),
                new Celebrity("Alex", "Lloyd", 47),
                new Celebrity("Alex", "Lloyd", 37),
                new Celebrity("Cher", 76));
        celebrities.sort(comparing(Celebrity::getLastName, nullsFirst(naturalOrder())).
                thenComparing(Celebrity::getFirstName).thenComparing(Celebrity::getAge));
        System.out.println("Celebrities:");
        celebrities.forEach(System.out::println);
    }
}

The original blog also points out the convenience factory methods from JDK9 for list creation which you might be tempted to consider here. For our case, we will be sorting in place, so the immutable lists returned by those methods won't help us here but Arrays.asList isn't much longer than List.of and works well for this example.

As well as being much shorter, the comparing and thenComparing methods and built-in comparators like nullsFirst and naturalOrder allow for far simpler composability. The sort built into ArrayList is also more efficient than the sort that would have been used with the Collections.sort method on earlier JDKs. The output when running the example is the same as previously.

The Groovy comparator story

At about the same time that Java was evolving its comparator story, Groovy added some complementary features to tackle many of the same problems. We'll look at some of those features and also how the JDK improvements we saw above can be used instead if preferred.

First off, let's create a Groovy Celebrity record:

@Sortable(includes = 'last,first,age')
@ToString(ignoreNulls = true, includeNames = true)
record Celebrity(String first, String last = null, int age) {}

And create our list of celebrities:

var celebrities = [
    new Celebrity("Cher", "Wang", 63),
    new Celebrity("Cher", "Lloyd", 28),
    new Celebrity("Alex", "Lloyd", 47),
    new Celebrity("Alex", "Lloyd", 37),
    new Celebrity(first: "Cher", age: 76)
]

The record definition is nice and concise. It would look good in recent Java versions too. A nice aspect of the Groovy solution is that it will provide emulated records on earlier JDKs and it also has some nice declarative transforms to tweak the record definition. We could leave off the @ToString annotation and we'd get a standard record-style toString. Or we could add a toString method to our record definition similar to what was done in the Java example. Using @ToString allows us to remove null last names from the toString in a declarative way. We'll cover the @Sortable annotation a little later.

Groovy's spaceship operator <=> allows us to write a nice compact version of the "tedious" code in the first Java version. It looks like this:

celebrities.sort { c1, c2 ->
    c1.last <=> c2.last ?: c1.first <=> c2.first ?: c1.age <=> c2.age
}
println 'Celebrities:\n' + celebrities.join('\n')

And the output looks like this:

Celebrities:
Celebrity(first:Cher, age:76)
Celebrity(first:Alex, last:Lloyd, age:37)
Celebrity(first:Alex, last:Lloyd, age:47)
Celebrity(first:Cher, last:Lloyd, age:28)
Celebrity(first:Cher, last:Wang, age:63)

We'd have a tiny bit more work to do if we wanted nulls last but the defaults work well for the example at hand.
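If we did want nulls last, one option (a sketch, not from the original post) is to compare on nullness before comparing the values themselves:

celebrities.sort { c1, c2 ->
    (c1.last == null) <=> (c2.last == null) ?:    // non-null last names sort before null ones
        c1.last <=> c2.last ?: c1.first <=> c2.first ?: c1.age <=> c2.age
}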

Alternatively, we can make use of the "new in JDK8" methods mentioned earlier:

celebrities.sort(comparing(Celebrity::last, nullsFirst(naturalOrder())).
        thenComparing(c -> c.first).thenComparing(c -> c.age))

But this is where we should come back and further explain the @Sortable annotation. That annotation is associated with an Abstract Syntax Tree (AST) transformation, or just transform for short, which provides us with an automatic compareTo method that takes into account the record's properties (and likewise if it were a class). Since we provided an includes annotation attribute with a list of property names, the order of those names determines the priority of the properties used in the comparator. We could equally include just some of the names in that list, or alternatively provide an excludes annotation attribute and just mention the properties we don't want included.
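For illustration (not from the original post), an excludes variant that sorts by last name and then age only might look like this:

@Sortable(excludes = 'first')                           // remaining properties (last, age) keep declaration order
@ToString(ignoreNulls = true, includeNames = true)
record Celebrity(String first, String last = null, int age) {}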

The transform also adds Comparable<Celebrity> to the list of implemented interfaces for our record. So, what does all this mean? It means we can just write:

celebrities.sort()

The transform associated with the @Sortable annotation also provides some additional comparators for us. To sort by age, we can use one of those comparators:

celebrities.sort(Celebrity.comparatorByAge())

Which gives this output:

Celebrities:
Celebrity(first:Cher, last:Lloyd, age:28)
Celebrity(first:Alex, last:Lloyd, age:37)
Celebrity(first:Alex, last:Lloyd, age:47)
Celebrity(first:Cher, last:Wang, age:63)
Celebrity(first:Cher, age:76)

In addition to the sort method, Groovy provides a toSorted method which sorts a copy of the list, leaving the original unchanged. So, to create a list sorted by first name we can use this code:

var celebritiesByFirst = celebrities.toSorted(Celebrity.comparatorByFirst())

Which if output in a similar way to previous examples gives:

Celebrities:
Celebrity(first:Alex, last:Lloyd, age:37)
Celebrity(first:Alex, last:Lloyd, age:47)
Celebrity(first:Cher, last:Lloyd, age:28)
Celebrity(first:Cher, last:Wang, age:63)
Celebrity(first:Cher, age:76)

If you are a fan of functional-style programming, you might consider using List.of to define the original list and then only toSorted method calls in further processing.
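That might look something like the following sketch (same data, but an immutable source list):

var celebrities = List.of(
    new Celebrity("Cher", "Wang", 63),
    new Celebrity("Cher", "Lloyd", 28),
    new Celebrity("Alex", "Lloyd", 47),
    new Celebrity("Alex", "Lloyd", 37),
    new Celebrity(first: "Cher", age: 76)
)
var byNaturalOrder = celebrities.toSorted()                     // ordering from @Sortable
var byAge = celebrities.toSorted(Celebrity.comparatorByAge())   // original list never mutated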

Mixing in some language integrated queries

Groovy also has a GQuery (aka GINQ) capability which provides a SQL-inspired DSL for working with collections. We can use GQueries to examine and order our collection. Here is an example:

println GQ {
    from c in celebrities
    select c.first, c.last, c.age
}

Which has this output:

+-------+-------+-----+
| first | last  | age |
+-------+-------+-----+
| Cher  |       | 76  |
| Alex  | Lloyd | 37  |
| Alex  | Lloyd | 47  |
| Cher  | Lloyd | 28  |
| Cher  | Wang  | 63  |
+-------+-------+-----+

In this case, it's using the natural ordering which @Sortable gives us.

Or we can sort by age:

println GQ {
    from c in celebrities
    orderby c.age
    select c.first, c.last, c.age
}

Which has this output:

+-------+-------+-----+
| first | last  | age |
+-------+-------+-----+
| Cher  | Lloyd | 28  |
| Alex  | Lloyd | 37  |
| Alex  | Lloyd | 47  |
| Cher  | Wang  | 63  |
| Cher  |       | 76  |
+-------+-------+-----+

Or we can sort by last name descending and then age:

println GQ {
    from c in celebrities
    orderby c.last in desc, c.age
    select c.first, c.last, c.age
}

Which has this output:

+-------+-------+-----+
| first | last  | age |
+-------+-------+-----+
| Cher  | Wang  | 63  |
| Cher  | Lloyd | 28  |
| Alex  | Lloyd | 37  |
| Alex  | Lloyd | 47  |
| Cher  |       | 76  |
+-------+-------+-----+

Conclusion

We have seen a little example of using comparators in Groovy. All the great JDK capabilities are available as well as the spaceship operator, the sort and toSorted methods, and the @Sortable AST transformation.


Using Groovy with Apache Wayang and Apache Spark

by paulk


Posted on Sunday June 19, 2022 at 01:01PM in Technology


Apache Wayang (incubating) is an API for big data cross-platform processing. It provides an abstraction over other platforms like Apache Spark and Apache Flink as well as a default built-in stream-based "platform". The goal is to provide a consistent developer experience when writing code regardless of whether a light-weight or highly-scalable platform may eventually be required. Execution of the application is specified in a logical plan which is again platform agnostic. Wayang will transform the logical plan into a set of physical operators to be executed by specific underlying processing platforms.

Whiskey Clustering

We'll take a look at using Apache Wayang with Groovy to help us in the quest to find the perfect single-malt Scotch whiskey. The whiskies produced from 86 distilleries have been ranked by expert tasters according to 12 criteria (Body, Sweetness, Malty, Smoky, Fruity, etc.). We'll use a KMeans algorithm to calculate the centroids. This is similar to the KMeans example in the Wayang documentation but instead of 2 dimensions (x and y coordinates), we have 12 dimensions corresponding to our criteria. The main point is that it is illustrative of typical data science and machine learning algorithms involving iteration (the typical map, filter, reduce style of processing).


KMeans is a standard data-science clustering technique. In our case, it groups whiskies with similar characteristics (according to the 12 criteria) into clusters. If we have a favourite whiskey, chances are we can find something similar by looking at other instances in the same cluster. If we are feeling like a change, we can look for a whiskey in some other cluster. The centroid is the notional "point" in the middle of the cluster. For us, it reflects the typical measure of each criterion for a whiskey in that cluster.

Implementation Details

We'll start with defining a Point record:

record Point(double[] pts) implements Serializable {
    static Point fromLine(String line) { new Point(line.split(',')[2..-1]*.toDouble() as double[]) }
}

We've made it Serializable (more on that later) and included a fromLine factory method to help us make points from a CSV file. We'll do that ourselves rather than rely on other libraries which could assist. It's not a 2D or 3D point for us but 12D corresponding to the 12 criteria. We just use a double array, so any dimension would be supported but the 12 comes from the number of columns in our data file.

We'll define a related TaggedPointCounter record. It's like a Point but tracks a cluster Id and count used when clustering the "points":

record TaggedPointCounter(double[] pts, int cluster, long count) implements Serializable {
    TaggedPointCounter plus(TaggedPointCounter that) {
        new TaggedPointCounter((0..<pts.size()).collect{ pts[it] + that.pts[it] } as double[], cluster, count + that.count)
    }

    TaggedPointCounter average() {
        new TaggedPointCounter(pts.collect{ double d -> d/count } as double[], cluster, 0)
    }
}

We have plus and average methods which will be helpful in the map/reduce parts of the algorithm.
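As a quick sanity check (with made-up two-dimensional values), the two methods compose like this:

def a = new TaggedPointCounter([1d, 2d] as double[], 0, 1)
def b = new TaggedPointCounter([3d, 4d] as double[], 0, 1)
assert (a + b).average().pts().toList() == [2d, 3d]   // plus maps onto Groovy's + operator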

Another aspect of the KMeans algorithm is assigning points to the cluster associated with their nearest centroid. For 2 dimensions, recalling Pythagoras' theorem, the distance would be the square root of x squared plus y squared, where x and y are the distances of a point from the centroid in the x and y dimensions respectively. We'll do the same across all dimensions and define the following helper class to capture this part of the algorithm:

class SelectNearestCentroid implements ExtendedSerializableFunction<Point, TaggedPointCounter> {
    Iterable<TaggedPointCounter> centroids

    void open(ExecutionContext context) {
        centroids = context.getBroadcast("centroids")
    }

    TaggedPointCounter apply(Point p) {
        def minDistance = Double.POSITIVE_INFINITY
        def nearestCentroidId = -1
        for (c in centroids) {
            def distance = sqrt((0..<p.pts.size()).collect{ p.pts[it] - c.pts[it] }.sum{ it ** 2 } as double)
            if (distance < minDistance) {
                minDistance = distance
                nearestCentroidId = c.cluster
            }
        }
        new TaggedPointCounter(p.pts, nearestCentroidId, 1)
    }
}

In Wayang parlance, the SelectNearestCentroid class is a UDF, a User-Defined Function. It represents some chunk of functionality where an optimization decision can be made about where to run the operation.

Once we get to using Spark, the classes in the map/reduce part of our algorithm will need to be serializable. Method closures in dynamic Groovy aren't serializable. We have a few options to avoid using them. I'll show one approach here which is to use some helper classes in places where we might typically use method references. Here are the helper classes:

class Cluster implements SerializableFunction<TaggedPointCounter, Integer> {
    Integer apply(TaggedPointCounter tpc) { tpc.cluster() }
}

class Average implements SerializableFunction<TaggedPointCounter, TaggedPointCounter> {
    TaggedPointCounter apply(TaggedPointCounter tpc) { tpc.average() }
}

class Plus implements SerializableBinaryOperator<TaggedPointCounter> {
    TaggedPointCounter apply(TaggedPointCounter tpc1, TaggedPointCounter tpc2) { tpc1.plus(tpc2) }
}

Now we are ready for our KMeans script:

int k = 5
int iterations = 20

// read in data from our file
def url = WhiskeyWayang.classLoader.getResource('whiskey.csv').file
def pointsData = new File(url).readLines()[1..-1].collect{ Point.fromLine(it) }
def dims = pointsData[0].pts().size()

// create some random points as initial centroids
def r = new Random()
def initPts = (1..k).collect { (0..<dims).collect { r.nextGaussian() + 2 } as double[] }

// create planbuilder with Java and Spark enabled
def configuration = new Configuration()
def context = new WayangContext(configuration)
    .withPlugin(Java.basicPlugin())
    .withPlugin(Spark.basicPlugin())
def planBuilder = new JavaPlanBuilder(context, "KMeans ($url, k=$k, iterations=$iterations)")

def points = planBuilder
    .loadCollection(pointsData).withName('Load points')

def initialCentroids = planBuilder
    .loadCollection((0..<k).collect{ idx -> new TaggedPointCounter(initPts[idx], idx, 0) })
    .withName("Load random centroids")

def finalCentroids = initialCentroids
    .repeat(iterations, currentCentroids ->
        points.map(new SelectNearestCentroid())
            .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
            .reduceByKey(new Cluster(), new Plus()).withName("Add up points")
            .map(new Average()).withName("Average points")
            .withOutputClass(TaggedPointCounter)).withName("Loop").collect()

println 'Centroids:'
finalCentroids.each { c ->
    println "Cluster$c.cluster: ${c.pts.collect{ sprintf('%.3f', it) }.join(', ')}"
}

Here, k is the desired number of clusters, and iterations is the number of times to iterate through the KMeans loop. The pointsData variable is a list of Point instances loaded from our data file. We'd use the readTextFile method instead of loadCollection if our data set was large. The initPts variable is some random starting positions for our initial centroids. Being random, and given the way the KMeans algorithm works, it is possible that some of our clusters may have no points assigned.
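Returning to the loading step for a moment, a rough sketch (not from the original project) of reading a larger file with readTextFile might look like the following, where LineToPoint is a hypothetical serializable helper in the style of the earlier helper classes, and the header row would still need to be skipped:

class LineToPoint implements SerializableFunction<String, Point> {
    Point apply(String line) { Point.fromLine(line) }
}

def points = planBuilder
    .readTextFile('file:///some/path/whiskey.csv').withName('Load points')
    .map(new LineToPoint()).withName('Create points')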

Our algorithm works by assigning, at each iteration, all the points to their closest current centroid and then calculating the new centroids given those assignments. Finally, we output the results.

Running with the Java streams-backed platform

As we mentioned earlier, Wayang selects which platform(s) will run our application. It has numerous capabilities whereby cost functions and load estimators can be used to influence and optimize how the application is run. For our simple example, it is enough to know that even though we specified Java or Spark as options, Wayang knows that for our small data set, the Java streams option is the way to go.

Since we prime the algorithm with random data, we expect the results to be slightly different each time the script is run, but here is one output:

> Task :WhiskeyWayang:run
Centroids:
Cluster0: 2.548, 2.419, 1.613, 0.194, 0.097, 1.871, 1.742, 1.774, 1.677, 1.935, 1.806, 1.613
Cluster2: 1.464, 2.679, 1.179, 0.321, 0.071, 0.786, 1.429, 0.429, 0.964, 1.643, 1.929, 2.179
Cluster3: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
Cluster4: 1.684, 1.842, 1.211, 0.421, 0.053, 1.316, 0.632, 0.737, 1.895, 2.000, 1.842, 1.737 ...

Which if plotted looks like this:

[Image: WhiskeyWayang centroid spider plot]

If you are interested, check out the examples in the repo links at the end of this article to see the code for producing this centroid spider plot or the Jupyter/BeakerX notebook in this project's github repo.

Running with Apache Spark

Given our small dataset size and no other customization, Wayang will choose the Java streams-based solution. We could use Wayang optimization features to influence which processing platform it chooses, but to keep things simple, we'll just disable the Java streams platform in our configuration by making the following change in our code:

[Image: WhiskeyWayang_DisableJava.png — code change disabling the Java streams plugin]
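In essence (a sketch of the likely change), we stop registering the Java plugin so that only Spark remains as a candidate platform:

def context = new WayangContext(configuration)
    .withPlugin(Spark.basicPlugin())   // Java.basicPlugin() no longer registered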

Now when we run the application, the output will be something like this (a solution similar to before but with 1000+ extra lines of Spark and Wayang log information - truncated for presentation purposes):

[main] INFO org.apache.spark.SparkContext - Running Spark version 3.3.0
[main] INFO org.apache.spark.util.Utils - Successfully started service 'sparkDriver' on port 62081.
...
Centroids:
Cluster4: 1.414, 2.448, 0.966, 0.138, 0.034, 0.862, 1.000, 0.483, 1.345, 1.690, 2.103, 2.138
Cluster0: 2.773, 2.455, 1.455, 0.000, 0.000, 1.909, 1.682, 1.955, 2.091, 2.045, 2.136, 1.818
Cluster1: 1.762, 2.286, 1.571, 0.619, 0.143, 1.714, 1.333, 0.905, 1.190, 1.952, 1.095, 1.524
Cluster2: 3.250, 1.500, 3.250, 3.000, 0.500, 0.250, 1.625, 0.375, 1.375, 1.375, 1.250, 0.250
Cluster3: 2.167, 2.000, 2.167, 1.000, 0.333, 0.333, 2.000, 0.833, 0.833, 1.500, 2.333, 1.667
...
[shutdown-hook-0] INFO org.apache.spark.SparkContext - Successfully stopped SparkContext
[shutdown-hook-0] INFO org.apache.spark.util.ShutdownHookManager - Shutdown hook called

Discussion

A goal of Apache Wayang is to allow developers to write platform-agnostic applications. While this is mostly true, the abstractions aren't perfect. As an example, if I know I am only using the streams-backed platform, I don't need to worry about making any of my classes serializable (which is a Spark requirement). In our example, we could have omitted the "implements Serializable" part of the TaggedPointCounter record, and we could have used a method reference TaggedPointCounter::average instead of our Average helper class. This isn't meant to be a criticism of Wayang; after all, if you want to write cross-platform UDFs, you might expect to have to follow some rules. Instead, it is meant to just indicate that abstractions often have leaks around the edges. Sometimes those leaks can be beneficially used, other times they are traps waiting for unknowing developers.
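For instance, if we only ever ran on the streams-backed platform, the loop body could use method references directly (a sketch; this variant won't serialize for Spark):

def finalCentroids = initialCentroids
    .repeat(iterations, currentCentroids ->
        points.map(new SelectNearestCentroid())
            .withBroadcast(currentCentroids, "centroids").withName("Find nearest centroid")
            .reduceByKey(TaggedPointCounter::cluster, TaggedPointCounter::plus).withName("Add up points")
            .map(TaggedPointCounter::average).withName("Average points")
            .withOutputClass(TaggedPointCounter)).withName("Loop").collect()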

To summarise, if using the Java streams-backed platform, you can run the application on JDK17 (which uses native records) as well as JDK11 and JDK8 (where Groovy provides emulated records). Also, we could make numerous simplifications if we desired. When using the Spark processing platform, the potential simplifications aren't applicable, and we can run on JDK8 and JDK11 (Spark isn't yet supported on JDK17).

Conclusion

We have looked at using Apache Wayang to implement a KMeans algorithm that runs either backed by the JDK streams capabilities or by Apache Spark. The Wayang API hid from us some of the complexities of writing code that works on a distributed platform and some of the intricacies of dealing with the Spark platform. The abstractions aren't perfect but they certainly aren't hard to use and provide extra protection should we wish to move between platforms. As an added bonus, they open up numerous optimization possibilities.

Apache Wayang is an incubating project at Apache and still has work to do before it graduates, but lots of work has gone on previously (it was previously known as Rheem and was started in 2015). Platform-agnostic applications are a holy grail that has been desired for many years but is hard to achieve. It should be exciting to see how far Apache Wayang progresses in achieving this goal.

More Information

  • Repo containing the source code: WhiskeyWayang
  • Repo containing similar examples using a variety of libraries including Apache Commons CSV, Weka, Smile, Tribuo and others: Whiskey
  • A similar example using Apache Spark directly but with a built-in parallelized KMeans from the spark-mllib library rather than a hand-crafted algorithm: WhiskeySpark
  • A similar example using Apache Ignite directly but with a built-in clustered KMeans from the ignite-ml library rather than a hand-crafted algorithm: WhiskeyIgnite