C, PHP, VB, .NET

Дневникът на Филип Петров


* Монади с Java 8 Stream

Публикувано на 16 март 2015 в раздел ПИК3 Java.

Във функционалното програмиране понятието "монада" означава изчисление, което се провежда в поредица от стъпки. Това може да са вержини операции или поредица от операции създадена чрез генератори на функции (примери и за двете показахме по-рано). В тази статия ще се фокусираме върху едно от сериозните нововъведения в Java 8 - клас Stream, чрез който могат да се реализират монади върху списъци и масиви. Още в началото трябва да се отбележи, че под "Stream" нямаме предвид познатите InputStream и OutputStream, а един изцяло нов клас, който се помещава в пакета java.util.stream.

Stream работи поредица от елементи, върху които могат да се прилагат два вида верижни операции:

  • Междинни (intermediate) - извиква се метод от класа Stream, който извършва поредица от действия върху текущия поток от елементи и връща като резултат отново обект от тип Stream. Наричаме ги междинни, защото върху тях можем да продължим да извикваме нов метод от класа Stream;
  • Крайни (terminal) - последния метод във веригата от действия, която дава краен резултат, т.е. извикване на метод на обект от клас Stream, който или е void, или връща нещо различно от обект от тип Stream.

Почти всички методи в клас Stream приемат като параметър ламбда израз, т.е. те работят с функционални интерфейси. Класификацията на методите с оглед на въздействието им върху останалите елементи в потока, с които работят, е следната:

  • Независещи от останалите елементи в потока (stateless) методи извършват операции върху текущите елементи без да се интересуват от останалите елементи в потока. Например метод map (променя текущия елемент по определени от подадения ламбда израз правила) извършва операции, които не зависят от останалите елементи в потока;
  • Зависещи от останалите елементи в потока (stateful) методи зависят от всички елементи в дадения поток. Например sorted е stateful - трябват ви всички елементи на потока, за да може да го сортирате.
  • Прекъсващи преждевременно (short-circuit) методи са тези, които спират изпълнението си при изпълняване на определено условие, без да са били приложени върху елементите от целия поток, а само върху част от тях. Така например метод limit (взима само определен брой елементи от потока) ще прекъсне при агрегирането на определения му брой (и ще върне нов поток само с агрегираните елементи), дори потока да е безкраен.

Има и още една важна класификация, свързана с операциите, които извършваме вътре в ламбда изразите, които подаваме като параметри на методите:

  • Непроменящи (non interfering) са тези операции, които не променят оригиналните данни, от които произхожда потока;
  • Променящи (interfering) са операции, които модифицират оригиналните данни;

В документацията на Oracle е изрично споменато, че ламбда изразите НЕ трябва да променят оригиналните данни, от които произхожда потока, Причината за това е, че обектите от клас Stream не работят със свои собствени копия на елементите, а директно с елементите от списъка, на базата на който е генериран потока, т.е. методите на потоците работят със затваряния (closures) на оригиналните данни. В една многонишкова среда това би давало напълно непредвидими резултати. Но дори да не правим многонишкова програма, промяна на елементите пак може да доведе до непредвиден резултат защото потоците са "lazy evaluated", т.е. нищо не се върши във верижните операции преди крайната (terminal) операция да бъде достигната. Поради тази причина е препоръчително да работите с копия на данните, а не с оригиналните данни. С други думи - дори обектите, с които работите, да са mutable, вие трябва да се отнасяте с тях както бихте се отнасяли към immutable такива.

На този етап може би най-често ще използвате потоците върху списъци (List и други Collections). Всеки списък в Java 8 има метод "stream()" и "parralelStream()", които връщат обект от тип Stream. Нека дадем нашия първи пример:

import java.util.stream.Stream;
import java.util.List;
import java.util.LinkedList;

public class StreamExample{
  public static void main(String[] args){
    List<Employee> list = new LinkedList<>();
    try{
      list.add(new Employee("Petar", 1200, 23));
      list.add(new Employee("Ivaylo", 900, 20));
      list.add(new Employee("Maria", 950, 31));
      list.add(new Employee("Ivan", 1000, 32));
    }
    catch(Exception e){ return; }
    
    list.stream()
      .filter(e -> e.getSalary() >= 1000)
      .sorted((e1, e2) -> e1.getName().compareTo(e2.getName()))
      .forEach(e -> System.out.println(e.name + " has " + e.getSalary()));
  }
}

class Employee{
  String name;
  private int salary;
  private int age;
  public Employee(String name, int salary, int age) throws Exception{
    this.name = name;
    this.setSalary(salary);
    this.setAge(age);
  }
  int getSalary(){ return this.salary; }
  int getAge(){ return this.age; }
  void setSalary(int salary) throws Exception{
    if(salary < 340){
      throw new Exception("Salary lower than the lowest");
    }
    this.salary = salary;
  }
  void setAge(int age) throws Exception{
    if(age < 16){
      throw new Exception("Too young for employee");
    }
    this.age = age;
  }
  public boolean equals(Object o){
    if(!(o instanceof Employee)) return false;
    Employee e = (Employee)o;
    return this.name.equals(e.getName()) &&
      this.salary == e.getSalary() &&
      this.age == e.getAge();
  }
  public int hashCode(){
    int result = 17;
    result = result*37 + this.name.hashCode();
    result = result*37 + this.age;
    result = result*37 + this.salary;
    return result;
  }
}

В горния пример методите filter() и sorted() са междинни, а forEach() е краен. Виждате, че първо извикваме метод stream() от списъка - той връща обект от тип Stream. На този обект извикваме метод "filter", който приема за входен параметър предикат. Резултатът от метод filter е нов обект от тип Stream, чиито елементи отговарят на условието на предиката. На този обект извикваме метод sorted, който приема за параметър обект от тип Comparator. Този метод сортира елементите на потока и връща нов обект от тип Stream. Финално крайния метод forEach извършва операция "отпечатване на екрана на име и заплата" на всеки елемент от така получения поток.

Нека разгледаме първо основните методи, които осъществяват междинни операции върху потоци с обекти. Всички методи връщат нов обект от тип Stream:

  • distinct() - премахва дублиращите се елементи от потока. За целта използва метод "equals" на елементите от него. За да работи тази операция, обектите върху които работите трябва да имат дефинирани както метод equals, така и метод hashcode (казвам го специално, защото не е споменато в документацията на Oracle)! Методът е stateful;
  • filter(Predicate<T>) - премахва елементи от потока, които не отговарят на условието на подадения предикат. Методът е stateless;
  • flatMap(Function<T, Stream>) - този метод ни позволява да трансформираме един поток в друг. Функцията приема елемент от текущия Stream и на базата на него генерира нов поток - той сам по себе си може да има 0, 1 или колкото си искаме на брой елементи. Потоците генерирани от всички обекти накрая се обединяват в един общ поток.. Именно така ние трансформираме съществуващия поток в друг поток (който може да притежава не само различен брой елементи, но и различен тип елементи). Методът е stateless;
  • limit(long) - генерира нов поток с толкова на брой елементи от текущия, колкото са указани от входния параметър на метода. Този метод се използва често при работа с безкрайни потоци (да, потоци с безкраен на брой елементи - звучи странно на този етап, но ще дадем пример в следваща статия). Методът е stateful и short-circuit;
  • map(Function<T, R>) - по простия вариант на flatMap. При map ние просто трансформираме елементите на текущия поток от един тип в друг. Броят на елементите в потока се запазва (с което map може да се каже, че е по-прост и с по-малко възможности от flatMap). Методът е stateless;
  • peek(Consumer<T>) - връща Stream, който притежава всички елементи на текущия ("изконсумирани са от подадения Consumer, който сам по себе си връща Stream), като допълнително са извършени определените от подадения ламбда израз операции. Това е опростен вариант на map (на елемент от текущия поток реално съпоставя елемент от същия тип, докато при map типа може да се смени) и както него отново е stateless;
  • skip(long) - пропуска определен брой елементи от списъка и генерира нов поток с останалите. Методът е stateful;
  • sorted() и sorted(Comparator) - сортира всички обекти в потока. Методът е stateful.

Крайни операции върху потоци с обекти са следните:

  • boolean allMatch(Predicate<T>) - връща true ако всички елементи на потока отговарят на условието на предиката или false ако поне един елемент не отговаря на това уславие. Понеже при първо срещане на false условие метода прекъсва това е short-circuit метод. Все пак той не е подходящ при работа с безкрайни потоци, защото ако всички елементи на безкрайния поток са true, метода няма никога да прекъсне изпълнението си;
  • boolean anyMatch(Predicate<T>) - ще върне true ако поне един елемент отговаря на условието на предиката и false ако нито един не отговаря на него. Методът е short-circuit;
  • <R, A> collect(Collector<T, A, R>) - един от най-важните интерфейси, който се използва за конвертиране на Stream до друг тип множество - например List, Set или Map. Както виждате от дефиницията интерфейсът Collector е доста сложен, но за щастие има редица готови Collector-генератори, които са налични от статични методи в клас Collectors, например Collectors.toList() (конвертира Stream в List). Ще разгледаме примери по-надолу в статията;
  • long count() - връща броя на елементите в потока;
  • Optional<T> findAny() - връща някой елемент от потока. Точният термин е "някой", а не "произволен", защото реално Java ще избере най-удобния за себе си елемент (например този, с който е работила последно и към който пази текуща референция). В този смисъл findAny се използва тогава, когато ни трябва просто някакъв елемент от потока, без значение кой е той. Така печелим производителност спрямо например метод findFirst. Този метод е short-circuit;
  • Optional<T> findFIrst() - връща винаги първия елемент на потока. Този метод е short-circuit;
  • void forEach(Consumer<T>) - вече сме говорили за този метод по друг повод. Извършва подадените действия за всеки елемент от потока;
  • void forEachOrdered(Consumer<T>) - разликата с forEach е, че тук е гарантирана поредността на действията - първо ще се обработи първия елемент, след това втория и т.н. При forEach при паралелни потоци това не е гарантирано;
  • boolean noneMatch(Predicate<T>) - обратния метод на allMatch. Методът е short-circuit;
  • Optional<T> max(Comparator<T>) - намира най-големия елемент от потока в зависимост от подаденото условие;
  • Optional<T> min(Comparator<T>) - намира най-малкия елемент от потока в зависимост от подаденото условие;
  • Optional<T> reduce(BinaryOperator <T>) - комбинира всички елементи на поток в един единствен резултат. Всъщност има три вида reduce методи според входните им параметри - най-простия е указания, иначе е възможно да се променят и типовете на обектите (чрез подаване на BiFunction вместо BinaryOperator). Ще дадем пример по-долу за използване на reduce, при който се генерира нов "агрегиращ" данни от всички елементи на списъка обект - T reduce(T, BinaryOperator<T>). Пълният вариант на reduce включва identify, accumulator и combiner елементи и на този етап няма да даваме пример с него, защото е по-сложен;
  • Object[] toArray() или T[] toArray(T[]::new) - когато искате да превърнете потока в обикновен масив от обекти.

Също така е изключително важно в какъв ред провеждате операциите. Размяната на поредността на два метода в общия случай ще води до различни резултати (като производителност или като краен резултат).

Важно: потоците не могат да се преизползват! Веднъж като извикате крайна операция, потокът е буквално загубен и вече не може да го използвате.

Нека разгледаме няколко задачи свързани с примерния код от началото на статията:

Задача 1. Намерете служителите със заплата по-голяма или равна на 1000 лева. Изведете името и годините им, като сортирате по години и изведете имената им само с главни букви.

Решение: Първо филтрираме само хората, които имат заплата над 1000 - така следващите операции ще се изпълнят върху по-малък брой елементи. След това ги сортираме по години, както е казано в условието. Третия метод съпоставя на всеки елемент от потока (Employee) обект от тип String, който ще е връщания резултат. Накрая с forEach отпечатваме резултата на екрана:

list.stream()
.filter(e -> e.getSalary() >= 1000)
.sorted((e1, e2) -> e1.getAge()>=e2.getAge()?1:-1)
.map(e -> e.name.toUpperCase() + " has " + e.getSalary())
.forEach(System.out::println);

Задача 2. Направете нов списък, който да включва само служителите над 30 години.

Решение: Първо филтрираме служителите с filter, след това използваме collect, за да направим списък:

List<Employee> empsAbove30 = list.stream()
 .filter(e -> e.getAge()>30)
 .collect(java.util.stream.Collectors.toList());

Задача 3. Потърсете дали има служител над 30 години, който получава под 1000 лева.

Решение: Тук няма нужда да се филтрира - направо използваме крайната функция anyMatch:

boolean hasOldUnderpaid = list.stream()
 .anyMatch(e -> e.getAge()>30 && e.getSalary()<1000);

Задача 4. Намерете най-възрастния служител.

Решение: Тук ще използваме метод reduce. Той връща Optional - не сме говорили още за този клас, но неговата употреба е доста интуитивна. Употребата му е предназначена за случаи, когато е възможно да се получи NullPointerException. Няма да се спираме подробно на него, а просто ще демонстрираме един от неговите често употребявани методи - ifPresent - извършва даденото действие ако има резултат (иначе не прави нищо). Именно с ifPresent предотвратяваме появата на NullPointerException ако се опитаме да извършим действие върху обект, който не е инициализиран:

java.util.Optional<Employee> oldest = list.stream()
 .reduce((e1, e2) -> e1.getAge()>e2.getAge()?e1:e2);
 
oldest.ifPresent(e -> System.out.println(e.name + " is the oldest"));

Ето и едно по-добро решение от това - директно използване на метод max:

Optional<Employee> oldest = list.stream()
 .max((e1, e2) -> {
     if(e1.getAge()>e2.getAge()) return 1;
     else if(e1.getAge()<e2.getAge()) return -1;
     else return 0;
 });
 
oldest.ifPresent(e -> System.out.println(e.name + " is the oldest"));

Задача 5. Изведете списък с имената на служителите започващи с буквата "I" разделени със запетая.

Решение: Първо филтрираме, след това на всеки обект Employee съпоставяме String с неговото име и накрая извършваме reduce:

java.util.Optional<String> namesStartingWithI = list.stream()
 .filter(e -> e.name.startsWith("I"))
 .map(e -> e.name)
 .reduce((e1, e2) -> e1 + ", " + e2);
 
namesStartingWithI.ifPresent(System.out::println);

Задача 6. Намерете служителите, които са по-млади от 30 години. Генерирайте нов обект от тип Employee, в който името е конкатенацията от техните имена, разделени със запетая, заплатата е сумата от техните заплати, а годините е средната им възраст.

Решение: Първо ще намерим средното аритметично на годините на служителите, а след това с втори поток ще генерираме нов обект от тип Employee, в който ще акумулираме резултатите. Тук отново ще използваме метод reduce, но този път ще използваме вторият му вариант, който приема два параметъра - identity function (връщания обект) и BinaryOperator (по подобие на вече познатия от предишния пример):

Double averageUnder30Age = list.stream()
 .filter(emp -> emp.getAge()<30)
 .collect(java.util.stream.Collectors.averagingInt(p -> p.getAge()));
 
int averageUnder30AgeRoundedDown = (int) averageUnder30Age.doubleValue();
 
Employee e = null;
try{
  e = list.stream()
    .filter(emp -> emp.getAge()<30)
    // Връщания резултат ще е нов обект от тип Employee
    // По принуда започваме с такъв с 340 заплата и 16 г.
    .reduce(new Employee("", 340, 16), (e1, e2) -> {
       try{
          // ако името на е1 е празно, работим с новия
          // обект, който създадохме. В този случай
          // не акумулираме заплатата му - просто я
          // заменяме с тази на следващия обект е2
          if(e1.name.equals("")){
             e1.name += e2.name;
             e1.setAge(averageUnder30AgeRoundedDown);
             e1.setSalary(e2.getSalary());
          }
          // ако вече сме записали нещо в името, акумулираме
          // новото име и заплата от следващия обект
          else{
             e1.setSalary(e1.getSalary() + e2.getSalary());
             e1.name += ", " + e2.name;
          }
       }
       catch(Exception agesalex){} // заради set методите
       return e1;
    });
}
catch(Exception ex){} // това е за new Employee в reduce
 
System.out.println(e.name + " get " + e.getSalary()
    + " sumed salary and " + e.getAge() + " average age");

В този пример демонстрирахме и още един метод на клас Collectors - averagingInt. Както говори името му, този метод намира средно аритметично от подадени числа.

Задача 7. Нека за всеки служител пазим допълнително списък с езиците за програмиране, които владее. Направете масив с всички езици за програмиране, които нашите програмисти владеят, който да е сортиран по азбучен ред.

Решение: ще дадем пълния код на програмата с всички нейни промени. С flatMap съпоставяме на всеки служител неговия списък с езици за програмиране (от тип String ще са). След това извършваме последователно distinct, за да махнем повторенията и sorted, за да сортираме:

import java.util.stream.Stream;
import java.util.List;
import java.util.LinkedList;

public class StreamExample{
  public static void main(String[] args){
    List<Employee> list = new LinkedList<>();
    try{
      list.add(new Employee("Petar", 1200, 23));
      list.add(new Employee("Ivaylo", 900, 20));
      list.add(new Employee("Maria", 950, 31));
      list.add(new Employee("Ivan", 1000, 32));
      list.add(new Employee("Pencho", 1100, 29));
    }
    catch(Exception e){ return; }
    
    list.get(0).programmingLanguages.add("Java");
    list.get(0).programmingLanguages.add("C++");
    list.get(0).programmingLanguages.add("C#");
    list.get(1).programmingLanguages.add("C++");
    list.get(1).programmingLanguages.add("Python");
    list.get(1).programmingLanguages.add("Java");
    list.get(2).programmingLanguages.add("Perl");
    list.get(3).programmingLanguages.add("Java");
    list.get(3).programmingLanguages.add("Python");
    list.get(3).programmingLanguages.add("Ruby");
    // за get(5) нарочно оставяме да няма нищо
    
    String[] langs = list.stream()
      .flatMap(e -> e.programmingLanguages.stream())
      .distinct()
      .sorted()
      .toArray(String[]::new);
    
    for(String lang: langs) System.out.println(lang);
  }
}

class Employee{
  String name;
  private int salary;
  private int age;
  List<String> programmingLanguages;
  public Employee(String name, int salary, int age) throws Exception{
    this.name = name;
    this.setSalary(salary);
    this.setAge(age);
    programmingLanguages = new LinkedList<String>();
  }
  String getName(){ return this.name; }
  int getSalary(){ return this.salary; }
  int getAge(){ return this.age; }
  void setSalary(int salary) throws Exception{
    if(salary < 340){
      throw new Exception("Salary lower than the lowest");
    }
    this.salary = salary;
  }
  void setAge(int age) throws Exception{
    if(age < 16){
      throw new Exception("Too young for employee");
    }
    this.age = age;
  }
  public boolean equals(Object o){
    if(!(o instanceof Employee)) return false;
    Employee e = (Employee)o;
    return this.name.equals(e.getName()) &&
      this.salary == e.getSalary() &&
      this.age == e.getAge();
  }
  public int hashCode(){
    int result = 17;
    result = result*37 + this.name.hashCode();
    result = result*37 + this.age;
    result = result*37 + this.salary;
    return result;
  }
}

В следваща статия ще покажем как Stream може да работи с примитивни типове данни и какви допълнителни функции е възможно да прилагаме чрез тях.

 



Добави коментар

Адресът на електронната поща няма да се публикува


*