
Introduction
Stream Gatherers, introduced as a preview feature in Java 22 in March 2024 under JEP 461, mark a significant milestone exactly 10 years after the release of the Java Stream API in Java 8. This feature empowers developers to plug custom intermediate operations into stream processing, enabling stream pipelines to transform data in ways that were previously difficult to achieve with the built-in intermediate operations.
Stream Overview
The Stream API in Java provides a powerful and efficient way to process collections of data. It allows developers to perform functional-style operations on streams of elements, such as filtering, mapping, and reducing. Streams use lazy evaluation, so operations are executed only when necessary, and they can be parallelized to take advantage of multicore architectures when processing large datasets. The Stream API distinguishes between intermediate and terminal operations. Intermediate operations, like filter(), map(), and sorted(), transform the elements of a stream and return a new stream as a result, without performing any processing until a terminal operation is invoked. Terminal operations, such as forEach(), collect(), and reduce(), trigger the processing of elements within a stream and produce a non-stream result. Once a terminal operation is executed, the stream is consumed and cannot be reused.
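As a small illustration of lazy evaluation, the filter below does no work when the pipeline is built; its print statements run only once the terminal toList() operation is invoked:

Stream<Integer> odds = Stream.of(1, 2, 3)
        .filter(n -> {
            System.out.println("filtering " + n); // runs lazily
            return n % 2 == 1;
        });
System.out.println("pipeline built, nothing processed yet");
List<Integer> result = odds.toList(); // now prints "filtering 1" .. "filtering 3"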
Why do we need custom intermediate operations?
Streams are widely used in Java for a variety of tasks, yet their fixed set of intermediate operations, such as map, reduce, and filter, limits how easily complex tasks can be expressed as stream pipelines. Let us go through a code example.
record Employee(String name, int age, String department) {}

List<Employee> employees = List.of(
        new Employee("John", 30, "HR"),
        new Employee("Alice", 25, "Engineering"),
        new Employee("Bob", 35, "Finance"),
        new Employee("Mary", 28, "Engineering"),
        new Employee("David", 40, "HR"),
        new Employee("John", 32, "Engineering"),
        new Employee("Ramesh", 31, "Engineering"),
        new Employee("Jen", 26, "Engineering")
);

List<String> engineeringEmployeeNames = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .map(Employee::name)
        .toList();
System.out.println(engineeringEmployeeNames); // [Alice, Mary, John, Ramesh, Jen]
We have a record Employee and a list of Employee objects. The stream pipeline starts by filtering employees whose department equals "Engineering", using the filter intermediate operation. Then it maps each employee to their name using map, another intermediate operation. Finally, the toList() terminal operation collects the names into a list, which is printed to the console.
In this example, if we have to pair the employees using fixed-window logic, no suitable intermediate operation is available. The best option is to place the fixed-window grouping logic in the terminal operation, by invoking collect with a custom Collector. However, we must precede the collect operation with a fixed-size limit operation, since a collector cannot signal that it is finished while new elements keep arriving, which happens forever with an infinite stream. Similarly, many operations required in business applications, like folding and scanning, are missing from the intermediate operations available on streams.
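For illustration, here is a sketch of that workaround (a simplified version using toList() rather than a full custom Collector). It buffers the entire stream before slicing it into windows, which is exactly why it cannot work on an infinite stream:

// Workaround sketch: buffer all names first, then slice into windows of 2.
List<String> names = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .map(Employee::name)
        .toList(); // must see the end of the stream
List<List<String>> windows = new ArrayList<>();
for (int i = 0; i < names.size(); i += 2) {
    windows.add(names.subList(i, Math.min(i + 2, names.size())));
}
System.out.println(windows); // [[Alice, Mary], [John, Ramesh], [Jen]]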
Stream Gatherers as the solution
Stream gatherers solve this issue by allowing custom intermediate operations to be plugged into the stream. A new method, gather, has been added to the Stream interface for this; it accepts an implementation of the Gatherer interface. We can add pairing as an intermediate operation, as shown on line 4 below.
var employeePairList = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .map(Employee::name)
        .gather(getFixedWindowGatherer(2))
        .toList();
System.out.println(employeePairList); // [[Alice, Mary], [John, Ramesh], [Jen]]
Now let us see how we can create this fixed window gatherer.
Stream Gatherer elements
A gatherer is defined by four functions that work together:
- Initializer: An optional initializer function provides an object that maintains private state while processing stream elements, enabling stateful processing. In the fixed window example, we need to keep the employees in a temporary list until the count reaches the window size and then pass the list downstream.
- Integrator: This function applies the business logic to the elements in the stream, sometimes using state, and then passes the result to the next stage in the pipeline. It can also terminate the stream, like the limit operation.
- Finisher: An optional finisher function is invoked after all elements in the stream have been processed.
- Combiner: An optional combiner function is called when using parallel streams, to combine the state from parallel runs.
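These four functions correspond directly to the methods of the Gatherer<T, A, R> interface, where T is the input element type, A the private state type, and R the output type. A simplified view of the JDK declaration (default implementations elided):

public interface Gatherer<T, A, R> {
    Supplier<A> initializer();                        // optional: has a default
    Integrator<A, T, R> integrator();                 // mandatory
    BinaryOperator<A> combiner();                     // optional: used by parallel streams
    BiConsumer<A, Downstream<? super R>> finisher();  // optional: end-of-stream action
}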
Initializer
The initializer creates the initial state at the start of stream processing. It is of type Supplier.
@FunctionalInterface
public interface Supplier<T> {
    T get();
}
In our example of grouping employees into fixed windows, we need to keep the employee names in a list until it reaches the window size, and then we can pass that list downstream. We will use an ArrayList. If our gatherer is not a stateful operation, we don't need an initializer.
Supplier<List<T>> initializer = ArrayList::new;
Integrator
This is the only mandatory component and contains the processing logic. Integrator is a functional interface with the following structure.
@FunctionalInterface
interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Downstream<? super R> downstream);
}
Here, state is the state created by the initializer, and element is the input element that the integrator function is currently processing. The parameter downstream is an object of type Downstream; when you call its push method, it passes its argument to the next stage in the pipeline.
@FunctionalInterface
interface Downstream<T> {
    boolean push(T element);
}
Let us implement our integrator now.
Gatherer.Integrator<List<T>, T, List<T>> integrator = (state, element, downstream) -> {
    state.add(element);
    if (state.size() == limit) {
        var group = List.copyOf(state);
        downstream.push(group);
        state.clear();
    }
    return true;
};
Here we keep adding input elements to the state, which is of type ArrayList, until it reaches the window limit. Once the window limit is reached, we create a copy of the state and send it to the next stage in the pipeline by calling the downstream.push method. Then we clear the state to start accumulating the next set, and return true to indicate that the next element can be sent to the gatherer. If we want to cut the pipeline short, similar to the limit operation, we can return false.
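As a sketch of that short-circuiting behaviour, here is a limit-like gatherer (limitGatherer is our own illustrative name, not part of the JDK) that returns false once n elements have been pushed:

// Sketch: returning false signals that no more elements should be sent.
// Requires java.util.concurrent.atomic.AtomicInteger for the counter state.
private static <T> Gatherer<T, AtomicInteger, T> limitGatherer(int n) {
    return Gatherer.ofSequential(
            AtomicInteger::new,
            (state, element, downstream) -> {
                downstream.push(element);
                return state.incrementAndGet() < n; // false stops the stream early
            }
    );
}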
Our gatherer is almost ready, but if we examine it carefully, there is an issue. If the number of elements is not an exact multiple of the window size, the last few elements will be skipped. In our code example, the output would be [[Alice, Mary], [John, Ramesh]]: Jen won't get added, since the accumulated state will not have reached the window limit when the stream ends.
Finisher
This is where the finisher comes into play. The finisher performs an action after the gatherer has processed all input elements; it can inspect the private state object or emit additional output elements. In our case, the few elements left in the state can be pushed downstream as a list.
BiConsumer<List<T>, Gatherer.Downstream<? super List<T>>> finisher = (state, downstream) -> {
    if (!state.isEmpty()) {
        downstream.push(List.copyOf(state));
    }
};
The final code for the gatherer looks like this.
private static <T> Gatherer<T, List<T>, List<T>> getFixedWindowGatherer(int limit) {
    Supplier<List<T>> initializer = ArrayList::new;

    Gatherer.Integrator<List<T>, T, List<T>> integrator = (state, element, downstream) -> {
        state.add(element);
        if (state.size() == limit) {
            var group = List.copyOf(state);
            downstream.push(group);
            state.clear();
        }
        return true;
    };

    BiConsumer<List<T>, Gatherer.Downstream<? super List<T>>> finisher = (state, downstream) -> {
        if (!state.isEmpty()) {
            downstream.push(List.copyOf(state));
        }
    };

    return Gatherer.ofSequential(initializer, integrator, finisher);
}
Implementing an existing intermediate operation using Gatherer
It is quite easy to implement any of the existing intermediate operations using a Gatherer. Let us implement the map operation to get a better understanding of Gatherers.
We don’t need an initializer since this is not a stateful operation like the windowing gatherer. Our function will accept a mapper function which will transform data from type T to R.
The integrator calls the mapper.apply() method to transform the data and then pushes the result downstream. Gatherer.of(integrator) creates the gatherer from just the integrator.
private static <T, R> Gatherer<T, ?, R> map(Function<T, R> mapper) {
    Gatherer.Integrator<Void, T, R> integrator = (_, element, downstream) -> {
        R newElement = mapper.apply(element);
        downstream.push(newElement);
        return true;
    };
    return Gatherer.of(integrator);
}
We can call this gatherer in our pipeline as shown below. Note how gatherers can be chained.
var employeePairList = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .gather(map(Employee::name))
        .gather(getFixedWindowGatherer(2))
        .toList();
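Similarly, the filter operation can be sketched as a stateless gatherer (again our own illustration, mirroring the map gatherer above):

private static <T> Gatherer<T, ?, T> filter(Predicate<T> predicate) {
    Gatherer.Integrator<Void, T, T> integrator = (_, element, downstream) -> {
        if (predicate.test(element)) {
            downstream.push(element); // forward only matching elements
        }
        return true; // keep consuming the stream
    };
    return Gatherer.of(integrator);
}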
Built-in Gatherers
The JDK already ships with some built-in gatherers, which can be created using factory methods in the Gatherers class.
fold(Supplier initial, BiFunction folder)
This is a many-to-one gatherer that combines all stream elements into a single one, similar to a collector.
Stream.of("a", "b", "c", "d", "e", "f", "g", "h") .gather(Gatherers.fold(()->"", (joined, element) -> STR."\{joined}\{element}")) .forEach(System.out::println);//abcdefgh
mapConcurrent(int maxConcurrency, Function mapper)
This is a one-to-one gatherer that invokes the mapper for each input element in the stream concurrently, up to the limit specified by maxConcurrency. The mapper is invoked on virtual threads, and results are emitted in encounter order.
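Here is a minimal sketch of its use; slowLookup is a hypothetical blocking call (think of a remote service lookup) that we simulate with a short sleep:

// Up to 2 slowLookup calls run concurrently on virtual threads;
// output still arrives in encounter order: A, B, C, D.
Stream.of("a", "b", "c", "d")
        .gather(Gatherers.mapConcurrent(2, element -> slowLookup(element)))
        .forEach(System.out::println);

// Hypothetical helper simulating a slow, blocking operation.
static String slowLookup(String element) {
    try {
        Thread.sleep(100);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return element.toUpperCase();
}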
scan(Supplier initial, BiFunction scanner)
Performs a prefix scan, an incremental accumulation using the provided functions.
Stream.of("a", "b", "c", "d", "e", "f", "g", "h") .gather(Gatherers.scan(()->"", (joined, element) -> STR."\{joined}\{element}")) .forEach(System.out::println);//a, ab, abc, abcd, abcde, abcdef, abcdefg, abcdefgh
windowFixed(int windowSize)
This is the same as the gatherer we implemented in this blog: a many-to-many gatherer that gathers elements into fixed-size windows.
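With the built-in version, the earlier pairing pipeline no longer needs our custom gatherer:

var employeePairList = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .map(Employee::name)
        .gather(Gatherers.windowFixed(2))
        .toList();
System.out.println(employeePairList); // [[Alice, Mary], [John, Ramesh], [Jen]]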
windowSliding(int windowSize)
Similar to windowFixed, this is a many-to-many gatherer that gathers elements into windows. However, each subsequent window contains all elements of the previous window except its first, plus the next element in the stream. The following example demonstrates this gatherer.
var employeePairListSliding = employees.stream()
        .filter(employee -> employee.department().equals("Engineering"))
        .gather(map(Employee::name))
        .gather(Gatherers.windowSliding(2))
        .toList();
System.out.println(employeePairListSliding); // [[Alice, Mary], [Mary, John], [John, Ramesh], [Ramesh, Jen]]
Conclusion
Stream gatherers provide an efficient way to add custom intermediate operations to a stream pipeline in a functional style.