Commit fb0d18f5 authored by pysaumont's avatar pysaumont
Browse files

Chapter 14 revised

parent d7e59945
......@@ -225,7 +225,7 @@ project(':fpinjava-io-solutions') {
}
}
project(':fpinjava-actors-exercises') {
project(':fpinjava-actors') {
dependencies {
compile project(':fpinjava-common')
......@@ -233,23 +233,7 @@ project(':fpinjava-actors-exercises') {
}
}
project(':fpinjava-actors-solutions') {
dependencies {
compile project(':fpinjava-common')
testCompile group: 'junit', name: 'junit', version: '4.+'
}
}
project(':fpinjava-applications-exercises') {
dependencies {
compile project(':fpinjava-common')
testCompile group: 'junit', name: 'junit', version: '4.+'
}
}
project(':fpinjava-applications-solutions') {
project(':fpinjava-applications') {
dependencies {
compile project(':fpinjava-common')
......
package com.fpinjava.actor;
import org.junit.Test;
public class ActorTest {
@Test
public void test() {}
}
package com.fpinjava.actors.listing14_01;
import com.fpinjava.actors.listing14_02.ActorContext;
import com.fpinjava.common.Result;
public interface Actor<T> {
static <T> Result<Actor<T>> noSender() {
return Result.empty();
}
Result<Actor<T>> self();
ActorContext<T> getContext();
default void tell(T message) {
tell(message, self());
}
void tell(T message, Result<Actor<T>> sender);
void shutdown();
default void tell(T message, Actor<T> sender) {
tell(message, Result.of(sender));
}
enum Type {SERIAL, PARALLEL} // <7>
}
package com.fpinjava.actors.listing14_02;
public interface ActorContext<T> {
void become(MessageProcessor<T> behavior);
MessageProcessor<T> getBehavior();
}
package com.fpinjava.actors.listing14_02;
import com.fpinjava.actors.listing14_01.Actor;
import com.fpinjava.common.Result;
public interface MessageProcessor<T> {
void process(T t, Result<Actor<T>> sender);
}
package com.fpinjava.actors.listing14_03;
import com.fpinjava.actors.DaemonThreadFactory;
import com.fpinjava.actors.listing14_01.Actor;
import com.fpinjava.actors.listing14_02.ActorContext;
import com.fpinjava.actors.listing14_02.MessageProcessor;
import com.fpinjava.common.Result;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
public abstract class AbstractActor<T> implements Actor<T> {
private final ActorContext<T> context;
protected final String id;
private final ExecutorService executor;
public AbstractActor(String id, Actor.Type type) {
super();
this.id = id;
this.executor = type == Type.SERIAL
? Executors.newSingleThreadExecutor(new DaemonThreadFactory())
: Executors.newCachedThreadPool(new DaemonThreadFactory());
this.context = new ActorContext<T>() {
private MessageProcessor<T> behavior =
AbstractActor.this::onReceive;
@Override
public synchronized void become(MessageProcessor<T> behavior) {
this.behavior = behavior;
}
@Override
public MessageProcessor<T> getBehavior() {
return behavior;
}
};
}
public abstract void onReceive(T message, Result<Actor<T>> sender);
public Result<Actor<T>> self() {
return Result.success(this);
}
public ActorContext<T> getContext() {
return this.context;
}
@Override
public void shutdown() {
this.executor.shutdown();
}
public synchronized void tell(final T message, Result<Actor<T>> sender) {
executor.execute(() -> {
try {
context.getBehavior().process(message, sender);
} catch (RejectedExecutionException e) {
/*
* This is probably normal and means all pending tasks
* were canceled because the actor was stopped.
*/
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
package com.fpinjava.actors.listing14_04;
import com.fpinjava.actors.listing14_01.Actor;
import com.fpinjava.actors.listing14_03.AbstractActor;
import com.fpinjava.common.Result;
public class PingPong {
static class Player extends AbstractActor<Integer> {
private final String sound;
private final Actor<Integer> referee;
public Player(String id, String sound, Actor<Integer> referee) {
super(id, Actor.Type.SERIAL);
this.referee = referee;
this.sound = sound;
}
@Override
public void onReceive(Integer message, Result<Actor<Integer>> sender) {
System.out.println(sound + " - " + message);
if (message >= 10) {
referee.tell(message, sender);
} else {
sender.forEachOrFail(actor -> actor.tell(message + 1, self()))
.forEach(ignore -> referee.tell(message, sender));
}
}
}
}
package com.fpinjava.actors.listing14_05;
import com.fpinjava.actors.DaemonThreadFactory;
import com.fpinjava.common.Result;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
public abstract class AbstractActor<T> implements Actor<T> {
private final ActorContext<T> context;
protected final String id;
private final ExecutorService executor;
public AbstractActor(String id, Type type) {
super();
this.id = id;
this.executor = type == Type.SERIAL
? Executors.newSingleThreadExecutor(new DaemonThreadFactory())
: Executors.newCachedThreadPool(new DaemonThreadFactory());
this.context = new ActorContext<T>() {
private MessageProcessor<T> behavior =
AbstractActor.this::onReceive;
@Override
public synchronized void become(MessageProcessor<T> behavior) {
this.behavior = behavior;
}
@Override
public MessageProcessor<T> getBehavior() {
return behavior;
}
};
}
public abstract void onReceive(T message, Result<Actor<T>> sender);
public Result<Actor<T>> self() {
return Result.success(this);
}
public ActorContext<T> getContext() {
return this.context;
}
@Override
public void shutdown() {
this.executor.shutdown();
}
public synchronized void tell(final T message, Result<Actor<T>> sender) {
executor.execute(() -> {
try {
context.getBehavior().process(message, sender);
} catch (RejectedExecutionException e) {
/*
* This is probably normal and means all pending tasks
* were canceled because the actor was stopped.
*/
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}
package com.fpinjava.actors.listing14_05;
import com.fpinjava.common.Result;
public interface Actor<T> {
static <T> Result<Actor<T>> noSender() {
return Result.empty();
}
Result<Actor<T>> self();
ActorContext<T> getContext();
default void tell(T message) {
tell(message, self());
}
void tell(T message, Result<Actor<T>> sender);
void shutdown();
default void tell(T message, Actor<T> sender) {
tell(message, Result.of(sender));
}
enum Type {SERIAL, PARALLEL} // <7>
}
package com.fpinjava.actors.listing14_05;
public interface ActorContext<T> {
void become(MessageProcessor<T> behavior);
MessageProcessor<T> getBehavior();
}
package com.fpinjava.actors.listing14_05;
import com.fpinjava.common.Result;
public interface MessageProcessor<T> {
void process(T t, Result<Actor<T>> sender);
}
package com.fpinjava.actors.listing14_05;
import com.fpinjava.common.Result;
import java.util.concurrent.Semaphore;
public class PingPong {
private static final Semaphore semaphore = new Semaphore(1); // <1>
public static void main(String... args) throws InterruptedException {
Actor<Integer> referee =
new AbstractActor<Integer>("Referee", Actor.Type.SERIAL) {
@Override
public void onReceive(Integer message, Result<Actor<Integer>> sender) {
System.out.println("Game ended after " + message + " shots");
semaphore.release();
}
};
Actor<Integer> player1 = new Player("Player1", "Ping", referee);
Actor<Integer> player2 = new Player("Player2", "Pong", referee);
semaphore.acquire();
player1.tell(1, Result.success(player2));
semaphore.acquire();
}
static class Player extends AbstractActor<Integer> {
private final String sound;
private final Actor<Integer> referee;
public Player(String id, String sound, Actor<Integer> referee) {
super(id, Actor.Type.SERIAL);
this.referee = referee;
this.sound = sound;
}
@Override
public void onReceive(Integer message, Result<Actor<Integer>> sender) {
System.out.println(sound + " - " + message);
if (message >= 10) {
referee.tell(message, sender);
} else {
sender.forEachOrFail(actor -> actor.tell(message + 1, self()))
.forEach(ignore -> referee.tell(message, sender));
}
}
}
}
package com.fpinjava.actors.listing14_06;
import com.fpinjava.actors.listing14_05.AbstractActor;
import com.fpinjava.actors.listing14_05.Actor;
import com.fpinjava.common.Result;
import com.fpinjava.common.TailCall;
public class Worker extends AbstractActor<Integer> {
public Worker(String id, Type type) {
super(id, type);
}
@Override
public void onReceive(Integer message, Result<Actor<Integer>> sender) {
sender.forEach(a -> a.tell(fibo(message), self()));
}
private static int fibo(int number) {
return fibo_(0, 1, number).eval();
}
private static TailCall<Integer> fibo_(int acc1, int acc2, int x) {
if (x == 0) {
return TailCall.ret(1);
} else if (x == 1) {
return TailCall.ret(acc1 + acc2);
} else {
return TailCall.sus(() -> fibo_(acc2, acc1 + acc2, x - 1));
}
}
}
package com.fpinjava.actors.listing14_07_08_09;
import com.fpinjava.actors.listing14_05.AbstractActor;
import com.fpinjava.actors.listing14_05.Actor;
import com.fpinjava.actors.listing14_05.MessageProcessor;
import com.fpinjava.actors.listing14_06.Worker;
import com.fpinjava.common.*;
public class Manager extends AbstractActor<Integer> {
private final Actor<Result<List<Integer>>> client;
private final int workers;
private final List<Tuple<Integer, Integer>> initial;
private final List<Integer> workList;
private final List<Integer> resultList;
private final Function<Manager, Function<Behavior,
Effect<Integer>>> managerFunction;
public Manager(String id, List<Integer> list,
Actor<Result<List<Integer>>> client, int workers) {
super(id, Type.SERIAL);
this.client = client;
this.workers = workers;
Tuple<List<Integer>, List<Integer>> splitLists =
list.splitAt(this.workers);
this.initial = splitLists._1.zipWithPosition();
this.workList = splitLists._2;
this.resultList = List.list();
managerFunction = manager -> behavior -> i -> {
List<Integer> result = behavior.resultList.cons(i);
if (result.length() == list.length()) {
this.client.tell(Result.success(result.reverse()));
} else {
manager.getContext()
.become(new Behavior(behavior.workList
.tailOption()
.getOrElse(List.list()), result));
}
};
}
public void start() {
onReceive(0, self());
initial.sequence(this::initWorker)
.forEachOrFail(this::initWorkers)
.forEach(this::tellClientEmptyResult);
}
private Result<Executable> initWorker(Tuple<Integer, Integer> t) {
return Result.success(() ->
new Worker("Worker " + t._2, Type.SERIAL).tell(t._1, self()));
}
private void initWorkers(List<Executable> lst) {
lst.forEach(Executable::exec);
}
private void tellClientEmptyResult(String string) {
client.tell(Result.failure(string + " caused by empty input list."));
}
@Override
public void onReceive(Integer message, Result<Actor<Integer>> sender) {
getContext().become(new Behavior(workList, resultList));
}
class Behavior implements MessageProcessor<Integer> {
private final List<Integer> workList;
private final List<Integer> resultList;
private Behavior(List<Integer> workList, List<Integer> resultList) {
this.workList = workList;
this.resultList = resultList;
}
@Override
public void process(Integer i, Result<Actor<Integer>> sender) {
managerFunction.apply(Manager.this).apply(Behavior.this).apply(i);
sender.forEach(a -> workList.headOption().forEachOrFail(x -> a.tell(x, self()))
.forEach(x -> a.shutdown()));
}
}
}
package com.fpinjava.actors.listing14_10;
import com.fpinjava.actors.DaemonThreadFactory;
import com.fpinjava.common.Result;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
public abstract class AbstractActor<T> implements Actor<T> {
private final ActorContext<T> context;
protected final String id;
private final ExecutorService executor;
public AbstractActor(String id, Type type) {
super();
this.id = id;
this.executor = type == Type.SERIAL
? Executors.newSingleThreadExecutor(new DaemonThreadFactory())
: Executors.newCachedThreadPool(new DaemonThreadFactory());
this.context = new ActorContext<T>() {
private MessageProcessor<T> behavior =
AbstractActor.this::onReceive;
@Override
public synchronized void become(MessageProcessor<T> behavior) {
this.behavior = behavior;
}
@Override
public MessageProcessor<T> getBehavior() {
return behavior;
}
};
}
public abstract void onReceive(T message, Result<Actor<T>> sender);