Created
September 18, 2020 12:12
-
-
Save damithadayananda/70c5c3556a9194e67abbf58c6e7bd6f1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main.reactiveAPI; | |
/**
 * Mutable data holder for an employee travelling through the reactive
 * pipeline: a numeric id, a first name, a surname and a full name.
 */
public class Employee {

    private int id;
    private String fname;
    private String sname;
    private String fullName;

    /**
     * Creates an employee from a first name and a surname. The id and the
     * full name are not assigned here and keep their default values until
     * the corresponding setters are called.
     *
     * @param fname first name
     * @param sname surname
     */
    public Employee(String fname, String sname) {
        this.fname = fname;
        this.sname = sname;
    }

    /** @return the numeric id */
    public int getId() {
        return this.id;
    }

    /** @param id the numeric id to store */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the first name */
    public String getFName() {
        return this.fname;
    }

    /** @param name the first name to store */
    public void setFName(String name) {
        this.fname = name;
    }

    /** @return the surname */
    public String getSname() {
        return this.sname;
    }

    /** @param sname the surname to store */
    public void setSname(String sname) {
        this.sname = sname;
    }

    /** @return the full name, or {@code null} if none has been set */
    public String getFullName() {
        return this.fullName;
    }

    /** @param fullName the full name to store */
    public void setFullName(String fullName) {
        this.fullName = fullName;
    }

    /**
     * Renders the id and the first name (labelled {@code name}); the surname
     * and the full name are not part of the text.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("Employee{");
        text.append("id=").append(id);
        text.append(", name='").append(fname).append('\'');
        text.append('}');
        return text.toString();
    }
}
package main.reactiveAPI; | |
import java.util.concurrent.Flow; | |
import java.util.concurrent.SubmissionPublisher; | |
import java.util.function.Function; | |
public class MyProcessor extends SubmissionPublisher<Employee> implements Flow.Processor<Employee, Employee> { | |
private Flow.Subscription subscription; | |
private Function<Employee, Employee> function; | |
public MyProcessor(Function<Employee, Employee> function){ | |
this.function = function; | |
} | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
this.subscription = subscription; | |
subscription.request(1); | |
} | |
@Override | |
public void onNext(Employee item) { | |
submit((Employee) function.apply(item)); | |
subscription.request(1); | |
} | |
@Override | |
public void onError(Throwable throwable) { | |
throwable.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println("Done"); | |
} | |
} | |
package main.reactiveAPI; | |
import java.util.concurrent.Flow; | |
public class MySubscriber implements Flow.Subscriber<Employee> { | |
private Flow.Subscription subscription; | |
int count =0; | |
@Override | |
public void onSubscribe(Flow.Subscription subscription) { | |
System.out.println("Subscribed for employees"); | |
this.subscription = subscription; | |
this.subscription.request(1); | |
} | |
@Override | |
public void onNext(Employee item) { | |
System.out.println("Processing employee:"+item); | |
count++; | |
this.subscription.request(1); | |
} | |
@Override | |
public void onError(Throwable e) { | |
System.out.println("Error occurred"); | |
e.printStackTrace(); | |
} | |
@Override | |
public void onComplete() { | |
System.out.println("All processing done"); | |
} | |
public int getCounter(){ | |
return count; | |
} | |
} | |
package main.reactiveAPI; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.SubmissionPublisher; | |
public class ReactiveAPIDemo { | |
public ReactiveAPIDemo() { | |
SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>(); | |
MyProcessor transformProcessor = new MyProcessor(s -> { | |
s.setFullName(s.getFName()+" "+s.getSname()); | |
return s; | |
}); | |
MySubscriber subs = new MySubscriber(); | |
publisher.subscribe(transformProcessor); | |
transformProcessor.subscribe(subs); | |
List<Employee> emps = new ArrayList<>(Arrays.asList( | |
new Employee("D","d"), | |
new Employee("A","a"), | |
new Employee("B","b"), | |
new Employee("C","c"), | |
new Employee("E", "e") | |
)); | |
System.out.println("Publishing items"); | |
emps.stream().forEach( | |
i -> publisher.submit(i)); | |
while (emps.size() != subs.getCounter()){ | |
try { | |
Thread.sleep(10); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
publisher.close(); | |
transformProcessor.close(); | |
System.out.println("done"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment