Queues explained
SimpleReact Queues are a way of joining two or more JDK 8 Streams. Producing Streams add to the Queue, and Consuming Streams remove data from the Queue. Consuming Streams will block if there is no data.
SimpleReact Queues can be backed by any BlockingQueue implementation. We also supply a wrapped non-blocking queue via QueueFactories, but special care needs to be taken when using it. With an unbounded BlockingQueue, Producers can keep adding to the Queue without limit (until an OutOfMemoryError). With a bounded BlockingQueue, Producers are blocked (in queue.offer()) until Consumers remove data.
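The difference between bounded and unbounded backing queues can be seen with the JDK classes alone. The following is a minimal sketch, not SimpleReact code: the class `BoundedVersusUnbounded` and the method `offerAll` are hypothetical names, and the sketch uses a 10 ms timed `offer` so that a full queue shows up as a rejected offer rather than as an indefinite block.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; plain JDK, no SimpleReact types involved.
class BoundedVersusUnbounded {

    // Offers the values 1..attempts, waiting at most 10 ms for each one,
    // and returns how many offers the queue accepted.
    static int offerAll(BlockingQueue<Integer> queue, int attempts) {
        int accepted = 0;
        for (int i = 1; i <= attempts; i++) {
            try {
                if (queue.offer(i, 10, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No capacity given: every offer is accepted.
        System.out.println(offerAll(new LinkedBlockingQueue<>(), 5)); // prints 5
        // Capacity 2 and no consumer: only the first two offers fit.
        System.out.println(offerAll(new ArrayBlockingQueue<>(2), 5)); // prints 2
    }
}
```

With no consumer draining the bounded queue, the third and later offers time out, which is the point at which a blocking producer would simply wait.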
Stream<String> stream = Stream.of("1", "2", "3");
Queue<String> q = new Queue<>(new LinkedBlockingQueue<>());
q.fromStream(stream);
Stream<String> dq = q.stream();
Integer dequeued = q.stream().limit(3).map(it -> Integer.valueOf(it))
.reduce(0, (acc, next) -> acc + next);
In this example we populate a Queue from a finite Stream, and then build another Stream from the Queue. Both Streams are operating in the same thread.
Queues can be very useful when infinite (or large) Producing Streams provide data to infinite (or large) Consuming Streams on separate threads.
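As a rough illustration of that pattern using only the JDK (this is not how SimpleReact implements it), the sketch below runs an endless producing Stream on its own thread and lets a consuming Stream on the calling thread take a finite number of values through a `BlockingQueue`. The names `QueueJoinedStreams`, `firstValues` and `take`, and the capacity of 16, are assumptions made for the example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; plain JDK, no SimpleReact types involved.
class QueueJoinedStreams {

    // Returns the first `count` values that an endless producing Stream
    // pushes through a bounded queue from a separate thread.
    static List<Integer> firstValues(int count) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(16);

        Thread producer = new Thread(() -> {
            try {
                // Endless producing Stream: 1, 2, 3, ...
                Stream.iterate(1, i -> i + 1).forEach(i -> {
                    try {
                        queue.put(i); // blocks while the queue is full
                    } catch (InterruptedException e) {
                        throw new IllegalStateException(e); // abort the Stream
                    }
                });
            } catch (IllegalStateException stopped) {
                // The consumer interrupted us; let the thread end quietly.
            }
        });
        producer.setDaemon(true);
        producer.start();

        try {
            // Consuming Stream: blocks in take() whenever the queue is empty.
            return Stream.generate(() -> take(queue))
                         .limit(count)
                         .collect(Collectors.toList());
        } finally {
            producer.interrupt(); // stop the endless producer
        }
    }

    // Wraps the checked InterruptedException so take() can be used in a lambda.
    static Integer take(BlockingQueue<Integer> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(firstValues(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Because there is one producer and one consumer, the values arrive in the order they were produced. The bounded capacity keeps the endless producer from running far ahead of the consumer.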
Generating an infinite Stream using SimpleReact
Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>());
AtomicInteger count = new AtomicInteger(0);
new SimpleReact().react(() -> q.fromStream(
Stream.generate(() -> count.incrementAndGet())));
In this example, we are simply returning the next number in a sequence, but we could use similar code to read data from a file, a remote service, or a database. Similarly, we could populate the Stream on a listening thread triggered by external events (incoming REST calls, or messages from a message queue).
We can extract a finite set of values from the queue
q.stream().limit(1000)
.peek(it -> System.out.println(it))
.collect(Collectors.toList());
Or build an infinite lazy Stream
SimpleReact.lazy()
.fromStream(q.streamCompletableFutures())
.then(it -> "*" + it).peek(it -> incrementFound())
.peek(it -> System.out.println(it))
.run(Executors.newSingleThreadExecutor());
This will move Stream management onto a single thread (not the current thread) via .run(Executors.newSingleThreadExecutor()), while SimpleReact will react to the Stream in a multithreaded fashion using SimpleReact's ExecutorService (by default configured to have parallelism equal to the number of cores).
If a bounded Queue is full, producers will block until data is extracted by a consumer
AtomicInteger found = new AtomicInteger(0);
Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>(2));
new SimpleReact().react(() -> 1, ()->2,()->3,()->4)
.then( (it) -> { q.offer(it); return found.incrementAndGet();});
sleep(10);
assertThat(found.get(), is(2));
Removing two items from the Queue will allow SimpleReact to fill it back up again.
q.stream().limit(2).collect(Collectors.toList());
sleep(2);
assertThat(found.get(), is(4));
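The same blocking behaviour can be reproduced with a plain `LinkedBlockingQueue`, which may help when reasoning about it without SimpleReact in the picture. This is a sketch under assumptions: `BlockedProducerSketch` and `run` are hypothetical names, and `put` stands in for the blocking offer described above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; plain JDK, no SimpleReact types involved.
class BlockedProducerSketch {

    // Returns { items added before anything was consumed, items added in total }.
    static int[] run() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        AtomicInteger added = new AtomicInteger(0);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 4; i++) {
                    queue.put(i);            // blocks while the queue holds 2 items
                    added.incrementAndGet(); // counted only after the put succeeded
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        try {
            Thread.sleep(100);                // give the producer time to fill the queue
            int beforeDraining = added.get(); // can never exceed the capacity of 2
            for (int i = 0; i < 4; i++) {
                queue.take();                 // each take frees a slot for the producer
            }
            producer.join();
            return new int[] { beforeDraining, added.get() };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("added before draining: " + result[0]);
        System.out.println("added in total: " + result[1]);
    }
}
```

The first number is bounded by the queue capacity however long the producer is left running, and the second only reaches 4 once the consumer has made room.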
Queue queue = new Queue();
// thread 1: manage input
queue.fromStream(restStream);
// thread 2: manage processing stream A
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));
// thread 3: manage processing stream B
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));
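The snippet above has two processing Streams reading from one Queue. A JDK-only sketch of that shape is given below, assuming the consumers compete for items so that each item is handled by exactly one of them. `CompetingConsumersSketch`, `run` and the `END` marker are hypothetical; the marker is just one simple way to stop the consumer threads in a self-contained example.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; plain JDK, no SimpleReact types involved.
class CompetingConsumersSketch {

    private static final int END = -1; // hypothetical end-of-data marker

    // Returns { number of distinct items processed, total number of items processed }.
    static int[] run(int items, int consumerCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger processed = new AtomicInteger(0);

        // Consumer threads: all of them take from the same queue.
        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++) {
            consumers[c] = new Thread(() -> {
                try {
                    while (true) {
                        int next = queue.take();
                        if (next == END) {
                            break;
                        }
                        seen.add(next);
                        processed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[c].start();
        }

        // Producer (the calling thread): the input, then one END marker per consumer.
        for (int i = 1; i <= items; i++) {
            queue.add(i);
        }
        for (int c = 0; c < consumerCount; c++) {
            queue.add(END);
        }

        try {
            for (Thread consumer : consumers) {
                consumer.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { seen.size(), processed.get() };
    }

    public static void main(String[] args) {
        int[] result = run(100, 2);
        System.out.println(result[0] + " distinct, " + result[1] + " processed"); // prints 100 distinct, 100 processed
    }
}
```

Equal counts for distinct and total items show that no item was handled twice, even though which consumer receives a given item is not predictable.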
Queue queue = new Queue();
// thread 1: manage file stream
fileStream.map(next -> toEntity(next))
          .forEach(entity -> queue.add(entity)); // a terminal operation is needed, otherwise nothing runs
// thread 2: manage rest stream
queue.fromStream(restStream);
// thread 3: manage processing stream
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));
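The snippet above merges two producing Streams into one consuming Stream. The following JDK-only sketch shows the same shape under stated assumptions: `MergingProducersSketch`, `merge` and `take` are hypothetical names, the two input lists stand in for the file and REST sources, and the consumer knows in advance how many items to expect.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration class; plain JDK, no SimpleReact types involved.
class MergingProducersSketch {

    // Feeds two source Streams into one queue from separate threads and
    // returns everything the single consuming Stream received, sorted.
    static List<String> merge(List<String> fileLines, List<String> restBodies) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Producer 1: stands in for the file Stream.
        Thread fileThread = new Thread(() ->
                fileLines.stream().map(line -> "file:" + line).forEach(queue::add));
        // Producer 2: stands in for the REST Stream.
        Thread restThread = new Thread(() ->
                restBodies.stream().map(body -> "rest:" + body).forEach(queue::add));
        fileThread.start();
        restThread.start();

        // Consumer: arrival order depends on thread scheduling, so the
        // result is sorted to make it repeatable.
        int expected = fileLines.size() + restBodies.size();
        return Stream.generate(() -> take(queue))
                     .limit(expected)
                     .sorted()
                     .collect(Collectors.toList());
    }

    // Wraps the checked InterruptedException so take() can be used in a lambda.
    static String take(BlockingQueue<String> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(merge(Arrays.asList("a", "b"), Arrays.asList("c")));
        // prints [file:a, file:b, rest:c]
    }
}
```

Note that each producer ends with `forEach`, a terminal operation. A pipeline that ends in `peek` alone is never executed, so nothing would reach the queue.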