Chapter 5. Parallel Streams
Chp 5.1: Develop the code that use parallel streams
Aggregate operations and parallel streams enable you to implement parallelism with non-thread-safe collections provided that you do not modify the collection while you are operating on it.
Use .parallelStream()
instead of .stream()
or use .stream().parallel()
use .sequential()
method on a stream to convert a parallel stream into a sequential stream
Parallel streams uses the fork/join
framework that was added in Java 7.
When a stream executes in parallel, the Java runtime partitions the stream into multiple substreams.
Aggregate operations iterate over and process these substreams in parallel and then combine the results.
Differences:
- Uses ConcurrentMap instead of Map
- Uses groupingByConcurrent instead of groupingBy
- the operation Collectors.toConcurrentMap performs better with parallel streams than the operation Collectors.toMap.
Ordering: Pipeline methods
The ordering will be totally random here incase of parallel streams.
-
forEachOrdered
-> processes the elements of the stream in the order specified by its source. You will loose the benefits of parallelism.
Interference
- Lambda expressions in stream operations should not interfere. Interference occurs when the source of a stream is modified while a pipeline processes the stream.
Stateful Lambda Expressions
- Avoid using stateful lambda expressions as parameters in stream operations.
- A stateful lambda expression is one whose result depends on any state that might change during the execution of a pipeline
Make sure you use a concurrent version or synchronized version of the Map/List for parallel stream when you add to it. otherwise, you’ll get incorrect results since multiple threads access and modify the collection.
Chap 5.2 Implement decomposition, reduction, in streams
Reduction Operations:
- Reduce to one value or groups of values
- Many terminal operations (such as average, sum, min, max, and count) return one value by combining the contents of a stream.
- Some terminal operations (such as reduce and collect) return a collection by finding the average of values or grouping elements into categories.
Stream.reduce
-
identity
: initial value of the reduction or the default result if no elements -
accumulator
: a partial result of the reduction and the next element of the stream
reduce(BinaryOperator<T> accumulator)
Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any.
T reduce(T identity, BinaryOperator<T> accumulator)
Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value.
U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)
Performs a reduction on the elements of this stream, using the provided identity, accumulation and combining functions.
The reduce operation always returns a new value. The accumulator function also returns a new value every time it processes an element of a stream
Example1:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static void reduce(List<Person> roster) {
Integer totalAge = roster
.stream()
.mapToInt(Person::getAge)
.sum();
System.out.println("TotalAge: " + totalAge);
Integer totalAgeReduce = roster
.stream()
.map(Person::getAge)
.reduce(
0,
(a, b) -> a + b);
System.out.println("TotalAge: " + totalAgeReduce);
}
Example2:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static class Averager implements IntConsumer {
private int total = 0;
private int count = 0;
public double average() {
return count > 0 ? ((double) total) / count : 0;
}
public void accept(int i) {
total += i;
count++;
}
public void combine(Averager other) {
total += other.total;
count += other.count;
}
}
static void collect(List<Person> roster) {
Averager averageCollect = roster.stream()
.map(Person::getAge)
.collect(Averager::new, Averager::accept, Averager::combine);
System.out.println("Average age of all members: " + averageCollect.average());
}
public static void main(String[] args) {
List<Person> persons = Arrays.asList(new Person("Jane", 25, false), new Person("Alice", 28, false),
new Person("Bob", 42, true), new Person("Tina", 19, false));
collect(persons);
}
Output
Average age of all members: 28.5
Stream.collect
Unlike the reduce method, which always creates a new value when it processes an element, the collect method modifies, or mutates, an existing value.
<R,A> R collect(Collector<? super T,A,R> collector)
Performs a mutable reduction operation on the elements of this stream using a Collector.
<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)
Performs a mutable reduction operation on the elements of this stream.
The collect operation is best suited for collections.
Example:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
Map<Person.Sex, Integer> totalAgeByGender =
persons
.stream()
.collect(
Collectors.groupingBy(
Person::getGender,
Collectors.reducing(
0,
Person::getAge,
Integer::sum)));
System.out.println("TotalAgeByGender: " + totalAgeByGender);
>> TotalAgeByGender: {FEMALE=72, MALE=42} // for input above.
Know all the above methods very well - the input arguments and the return types.
Next Chapter - Lambda Cookbook