There has been a lot of excitement in the Java community since Java 8 was released. Lambdas and Streams are a massive improvement and nobody wants to go back to the old days. Today, however, I want to talk about the caveats of some uses of Java Streams.
When you start using Streams and their functional capabilities, you start seeing your code as a pipeline of computational operations. As you abstract away low-level concerns such as iteration details, you can focus on the rules (business, English-like rules) of your transformations. Then your mindset gets even more abstract (as usually happens when you are exposed to functional programming) and you realise that almost every piece of application server code you've written can be modelled as a pipeline of transformations over some data that sends messages along the way (aka side effects).
Even if we use DDD or any OOP design practice, that key principle stands. The temptation to model our whole flow (from REST adapter to DB or Event Stores) as a single Stream is then quite strong. Let's look at an example.
Let's say that we have a football league application and we want to check that the day before a match everything is ready to play. Let's see the data in different steps of the pipeline:
Subscribed players
Available players
Team ....
In 1.a we want to send a reminder email to the players who have previously marked themselves as available, and in 2.a we want to store that team in some DB. Those behaviours could and should live in different components, but conceptually that flow is the same stream. We want to use the data to execute the side effects without consuming it, as we want to keep using the stream.
peek method
peek is a method in the Java 8 Stream API that lets you use the data of a stream without consuming it. We might think that it's a good idea to use peek to send messages to EmailAdapter and TeamRepository. The problem with peek is that its action is only executed when we actually consume the stream. The Javadoc provides some clues about that:
This method exists mainly to support debugging [...] additionally performing the provided action on each element as elements are consumed from the resulting stream.
"As elements are consumed" is the key, but it's not obvious the first time you read it. In the following snippet we can see how peek is executed only when a terminal operation (forEach in this case) is called:
import java.util.stream.IntStream;

public class PeekExample {

    public static void main(String[] args) {
        IntStream stream = createAStreamAndPerformSomeSideEffectWithPeek();
        System.out.println("Second. I should be the second group of prints");
        consumeTheStream(stream);
    }

    // Only builds the pipeline: peek and map are intermediate operations, so nothing runs here.
    private static IntStream createAStreamAndPerformSomeSideEffectWithPeek() {
        return IntStream.of(1, 2, 3)
                .peek(number -> System.out.println(String.format("First. My number is %d", number)))
                .map(number -> number + 1);
    }

    // forEach is the terminal operation that pulls each element through peek, map and filter.
    private static void consumeTheStream(IntStream stream) {
        stream.filter(number -> number % 2 == 0)
                .forEach(number -> System.out.println(String.format("Third. My number is %d", number)));
    }
}
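We might expect an output like this:
    First. My number is 1
    First. My number is 2
    First. My number is 3
    Second. I should be the second group of prints
    Third. My number is 2
    Third. My number is 4
When in fact we'll have:
    Second. I should be the second group of prints
    First. My number is 1
    Third. My number is 2
    First. My number is 2
    First. My number is 3
    Third. My number is 4
Nothing is printed by peek until forEach runs, and each element then travels through the whole pipeline before the next one starts.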
A clever reader could state that the problem was the inclusion of a non-stream operation in the middle of a lazy pipeline. That could be true. For instance, when you work with asynchronous code such as promises in Node.js, it is quite important not to mix sequential code with the async chain without a lot of thought.
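To make the laziness visible without relying on the order of console output, here is a minimal sketch that records events in a list. The class name LazyPeekDemo, the player names and the sendReminder helper are hypothetical, invented for illustration; only Stream.peek, collect and forEach are real API. It contrasts a peek whose action waits for a terminal operation with a plain forEach over an already materialized list, where the side effect runs exactly where it is written.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyPeekDemo {

    // Hypothetical stand-in for an email adapter: it only records the call.
    static void sendReminder(List<String> events, String player) {
        events.add("reminder:" + player);
    }

    // peek registers the action, but nothing runs until a terminal operation.
    static List<String> withPeek() {
        List<String> events = new ArrayList<>();
        Stream<String> players = Stream.of("Ana", "Luis")
                .peek(player -> sendReminder(events, player));
        events.add("pipeline built");          // recorded first: no reminder sent yet
        players.collect(Collectors.toList());  // the terminal operation triggers peek
        return events;
    }

    // Iterating a materialized list runs the side effect at the point we choose.
    static List<String> withExplicitForEach() {
        List<String> events = new ArrayList<>();
        List<String> players = Arrays.asList("Ana", "Luis");
        players.forEach(player -> sendReminder(events, player));
        events.add("reminders done");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withPeek());
        // [pipeline built, reminder:Ana, reminder:Luis]
        System.out.println(withExplicitForEach());
        // [reminder:Ana, reminder:Luis, reminders done]
    }
}
```

Whether materializing the data is acceptable depends on its size. The point of the sketch is only that the side effect in the second method no longer depends on who consumes a stream, or when.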
Non-immediate execution using threads, RxJava, streams or any "lazy" construct can be confusing for a Java developer used to sequential flows. This is a really silly example using the Java 8 Optional API that confused me the first time I saw it:
return fetchTrackBy(device).orElse(trackRepository.saveTrackWith(metadata));
fetchTrackBy returns an Optional, so that code could be thought of as syntactic sugar. It would unroll to this:
Optional<Track> track = fetchTrackBy(device);
if (track.isPresent()) {
    return track.get();
}
return trackRepository.saveTrackWith(metadata);
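In fact, there is a bug in the first implementation. Did you spot it? Yes, we're saving the track even if we found it in the first place: the argument to orElse is an ordinary expression, so saveTrackWith runs before orElse is even called. We'd fix it with this implementation: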
return fetchTrackBy(device).orElseGet(() -> trackRepository.saveTrackWith(metadata));
As you can see, we're providing a lambda (an implementation of the Supplier functional interface) that will be evaluated lazily, only if the Optional is empty.
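The difference between the two calls can be checked with a small counter. This is a sketch under assumptions: OrElseDemo and saveTrack are hypothetical names standing in for the repository call above, and tracks are plain strings; Optional.orElse and Optional.orElseGet are the real API.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class OrElseDemo {

    // Hypothetical stand-in for trackRepository.saveTrackWith: counts how often it runs.
    static String saveTrack(AtomicInteger saves) {
        saves.incrementAndGet();
        return "new-track";
    }

    // orElse: the argument is evaluated before orElse is called, present or not.
    static int savesWithOrElse(Optional<String> found) {
        AtomicInteger saves = new AtomicInteger();
        found.orElse(saveTrack(saves));
        return saves.get();
    }

    // orElseGet: the Supplier is invoked only when the Optional is empty.
    static int savesWithOrElseGet(Optional<String> found) {
        AtomicInteger saves = new AtomicInteger();
        found.orElseGet(() -> saveTrack(saves));
        return saves.get();
    }

    public static void main(String[] args) {
        Optional<String> existing = Optional.of("existing-track");
        Optional<String> missing = Optional.empty();
        System.out.println(savesWithOrElse(existing));    // 1: saved although a track was found
        System.out.println(savesWithOrElseGet(existing)); // 0: the Supplier was never invoked
        System.out.println(savesWithOrElseGet(missing));  // 1: saved because nothing was found
    }
}
```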
Coming back to Streams, reading the Javadoc seems to be quite useful for our concerns. About side effects:
Side-effects in behavioral parameters to stream operations are, in general, discouraged, as they can often lead to unwitting violations of the statelessness requirement, as well as other thread-safety hazards.
We can see an example of a stateful lambda here:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })
If the behavioral parameters do have side-effects, unless explicitly stated, there are no guarantees as to the visibility of those side-effects to other threads, nor are there any guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread
This is really important to note, but we could argue that it is not relevant for our example, as we want to send messages to external processes with their own concurrency mechanisms. Something more like:
However, side-effects such as using println() for debugging purposes are usually harmless.
There are other problems with using a Stream as the backbone of our flows. Once a Stream is consumed it cannot be reused. That means that storing a Stream as a field of one of our domain objects is a dangerous idea, as that domain object will be useless once the field is consumed. One way to mitigate that problem is to wrap the Stream in a Supplier, generating a new stream every time we request access to that field. This is similar to an Iterable, which creates a new Iterator each time the iterator() method is called.
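Both halves of that paragraph can be sketched in a few lines. The names StreamSupplierDemo and Team, and the player names, are hypothetical and chosen only for illustration; this shows one possible shape of the Supplier idea, not a recommended domain model.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class StreamSupplierDemo {

    // A stream can be traversed once: a second terminal operation throws IllegalStateException.
    static boolean secondUseFails() {
        Stream<String> players = Stream.of("Ana", "Luis", "Marta");
        players.collect(Collectors.toList());
        try {
            players.collect(Collectors.toList());
            return false;
        } catch (IllegalStateException alreadyConsumed) {
            return true;
        }
    }

    // Hypothetical domain object: it stores a Supplier, so every call yields a fresh stream.
    static class Team {
        private final Supplier<Stream<String>> players;

        Team(List<String> names) {
            this.players = names::stream;
        }

        Stream<String> players() {
            return players.get();
        }
    }

    // Reads the same field twice; both reads see all the players.
    static boolean canReadTwice() {
        Team team = new Team(Arrays.asList("Ana", "Luis", "Marta"));
        List<String> first = team.players().collect(Collectors.toList());
        List<String> second = team.players().collect(Collectors.toList());
        return first.equals(second) && first.size() == 3;
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // true
        System.out.println(canReadTwice());   // true
    }
}
```

Note that the Supplier only helps when the underlying source can be traversed again, as the list can here.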
I would discourage passing streams around in teams with little experience with Java 8. If we decide to apply side effects during the pipeline, I would avoid any side effect that implies some kind of locking or coordination between threads. That said, I would recommend experimenting with these ideas in your spare time, as I have found that the best way of learning is by applying the core ideas of Java 8 Streams.