diff --git a/src/main/java/ua/net/uid/utils/concurrent/Promise.java b/src/main/java/ua/net/uid/utils/concurrent/Promise.java new file mode 100644 index 0000000..556c5f3 --- /dev/null +++ b/src/main/java/ua/net/uid/utils/concurrent/Promise.java @@ -0,0 +1,373 @@ +/* + * Copyright 2020 nightfall. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ua.net.uid.utils.concurrent; + +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; +import ua.net.uid.utils.Function; + +/** + * !WIP! not for use + * + * @author nightfall + * @param + */ +public class Promise { + //////////////////////////////////////////////////////////////////////////// + public enum State { PENDING, FULFILLED, REJECTED } + //////////////////////////////////////////////////////////////////////////// + public interface Resolver { + void accept(V value); + void reject(Throwable error); + } + public interface Performer { + void apply(Resolver resolver) throws Throwable; + } + //////////////////////////////////////////////////////////////////////////// + public static Promise resolve(V value) { + return new Promise<>(value, 0); + } + public static Promise reject(Throwable error) { + return new Promise<>(error, false); + } + //////////////////////////////////////////////////////////////////////////// + /* + public static Promise> all(Promise ... promises) { + return all(Arrays.asList(promises)); + } + public static Promise> all(Iterable> promises) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + public static Promise> allSettled(Promise ... promises) { + return allSettled(Arrays.asList(promises)); + } + public static Promise> allSettled(Iterable> promises) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + public static Promise> any(Promise ... promises) { + return any(Arrays.asList(promises)); + } + public static Promise> any(Iterable> promises) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + + public static Promise> race(Promise ... promises) { + return race(Arrays.asList(promises)); + } + public static Promise> race(Iterable> promises) { + throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. + } + */ + //////////////////////////////////////////////////////////////////////////// + private volatile Internal internal; + //////////////////////////////////////////////////////////////////////////// + private Promise(V value, int any) { + internal = new Fulfilled(value); + } + private Promise(Throwable error, boolean any) { + internal = new Rejected(error); + } + + public Promise(Callable task) { + this((resolver) -> { + try { + resolver.accept(task.call()); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + } + public Promise(Performer performer) { + internal = new Pending(performer); + internal.run(); + } + //////////////////////////////////////////////////////////////////////////// + public State getState() { + return internal.getState(); + } + public V getValue() { + internal.sync(); + return internal.getValue(); + } + public Throwable getError() { + internal.sync(); + return internal.getError(); + } + public Promise sync() { + internal.sync(); + return this; + } + public Promise then(Function callback) { + return internal.then(callback); + } + public Promise then(U value) { + return internal.then(value); + } + public Promise except(Function callback) { + return internal.except(callback); + } + public Promise except(U value) { + return internal.except(value); + } + public Promise anyway(Function callback) { + return internal.anyway(callback); + } + public Promise anyway(U value) { + return internal.anyway(value); + } + private void run() { + internal.run(); + } + //////////////////////////////////////////////////////////////////////////// + private interface Internal { + State getState(); + V getValue(); + Throwable getError(); + void run(); + void sync(); + Promise then(Function callback); + Promise then(U value); + Promise except(Function callback); + Promise except(U value); + Promise anyway(Function callback); + Promise anyway(U value); + } + //////////////////////////////////////////////////////////////////////////// + private final class Fulfilled implements Internal { + private final V value; + public Fulfilled(V value) { this.value = value; } + @Override + public State getState() { return State.FULFILLED; } + @Override + public V getValue() { return value; } + @Override + public Throwable getError() { return null; } + @Override + public void run() {} + @Override + public void sync() {} + + @Override + public Promise then(Function callback) { + return new Promise<>((resolver) -> { + try { + resolver.accept(callback.call(value)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + } + @Override + public Promise then(U value) { + return Promise.resolve(value); + } + @Override + public Promise except(Function callback) { + return (Promise) Promise.this; + } + @Override + public Promise except(U value) { + return (Promise) Promise.this; + } + @Override + public Promise anyway(Function callback) { + return new Promise<>((resolver) -> { + try { + resolver.accept(callback.call(value)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + } + @Override + public Promise anyway(U value) { + return Promise.resolve(value); + } + } + //////////////////////////////////////////////////////////////////////////// + private final class Rejected implements Internal { + private final Throwable error; + public Rejected(Throwable error) { this.error = error; } + @Override + public State getState() { return State.REJECTED; } + @Override + public V getValue() { return null; } + @Override + public Throwable getError() { return error; } + @Override + public void run() {} + @Override + public void sync() {} + + @Override + public Promise then(Function callback) { + return (Promise) Promise.this; + } + @Override + public Promise then(U value) { + return (Promise) Promise.this; + } + @Override + public Promise except(Function callback) { + return new Promise<>((resolver) -> { + try { + resolver.accept(callback.call(error)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + } + @Override + public Promise except(U value) { + return Promise.resolve(value); + } + @Override + public Promise anyway(Function callback) { + return new Promise<>((resolver) -> { + try { + resolver.accept(callback.call(error)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + } + @Override + public Promise anyway(U value) { + return Promise.resolve(value); + } + } + //////////////////////////////////////////////////////////////////////////// + private final class Pending implements Internal, Resolver { + private final ForkJoinTask task; + private final Queue onFulfill = new ConcurrentLinkedQueue<>(); + private final Queue onReject = new ConcurrentLinkedQueue<>(); + private Object value = null; + public Pending(Performer performer) { + task = ForkJoinTask.adapt(() -> { + try { + performer.apply(Pending.this); + } catch (Throwable error) { + Pending.this.reject(error); + } + }); + } + @Override + public State getState() { return State.PENDING; } + @Override + public V getValue() { throw new UnsupportedOperationException(); } + @Override + public Throwable getError() { throw new UnsupportedOperationException(); } + @Override + public void run() { + ForkJoinPool.commonPool().submit(task); + } + @Override + public void sync() { + try { + task.get(); + } catch (InterruptedException | ExecutionException ex) { + reject(ex); + } + } + @Override + public Promise then(Function callback) { + Promise promise = new Promise<>((resolver) -> { + try { + resolver.accept(callback.call((V) Pending.this.value)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + onFulfill.add(promise); + return promise; + } + @Override + public Promise then(U value) { + Promise promise = Promise.resolve(value); + onFulfill.add(promise); + return promise; + } + @Override + public Promise except(Function callback) { + Promise promise = new Promise<>((resolver) -> { + try { + resolver.accept(callback.call((Throwable) Pending.this.value)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + onReject.add(promise); + return promise; + } + @Override + public Promise except(U value) { + Promise promise = Promise.resolve(value); + onReject.add(promise); + return promise; + } + @Override + public Promise anyway(Function callback) { + Promise promise = new Promise<>((resolver) -> { + try { + resolver.accept(callback.call(Pending.this.value)); + } catch (Throwable ex) { + resolver.reject(ex); + } + }); + onFulfill.add(promise); + onReject.add(promise); + return promise; + } + @Override + public Promise anyway(U value) { + Promise promise = Promise.resolve(value); + onFulfill.add(promise); + onReject.add(promise); + return promise; + } + @Override + public void accept(V value) { + internal = new Fulfilled(value); + this.value = value; + if (!onFulfill.isEmpty()) { + ForkJoinPool.commonPool().execute(() -> { + do { + onFulfill.peek().run(); + } while (!onFulfill.isEmpty()); + }); + } + } + @Override + public void reject(Throwable error) { + internal = new Rejected(error); + this.value = error; + if (!onReject.isEmpty()) { + ForkJoinPool.commonPool().execute(() -> { + do { + onReject.peek().run(); + } while (!onReject.isEmpty()); + }); + } + } + } + //////////////////////////////////////////////////////////////////////////// +} diff --git a/src/test/java/ua/net/uid/utils/concurrent/PromiseTest.java b/src/test/java/ua/net/uid/utils/concurrent/PromiseTest.java new file mode 100644 index 0000000..640bfdb --- /dev/null +++ b/src/test/java/ua/net/uid/utils/concurrent/PromiseTest.java @@ -0,0 +1,82 @@ +/* + * Copyright 2020 nightfall. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package ua.net.uid.utils.concurrent; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +/** + * + * @author nightfall + */ +public class PromiseTest { + + public PromiseTest() { + } + + @Test + public void testResolveState() { + Promise promise = Promise.resolve(123); + assertSame(Promise.State.FULFILLED, promise.getState()); + + assertEquals(Integer.valueOf(123), promise.getValue()); + assertNull(promise.getError()); + } + + @Test + public void testRejectState() { + final Throwable error = new UnsupportedOperationException("test exception #1"); + Promise promise = Promise.reject(error); + assertSame(Promise.State.REJECTED, promise.getState()); + + assertNull(promise.getValue()); + assertEquals(error, promise.getError()); + } + + @Test + public void testPendingState() { + Exception error = new UnsupportedOperationException("test throw"); + + Promise promise1 = new Promise<>(() -> { + Thread.sleep(1); + return 213; + }); + Promise promise2 = new Promise<>(() -> { + Thread.sleep(1); + throw error; + }); + + assertSame(Promise.State.PENDING, promise1.getState()); + assertSame(Promise.State.PENDING, promise2.getState()); + + assertEquals(Integer.valueOf(213), promise1.getValue()); + assertNull(promise1.getError()); + assertSame(Promise.State.FULFILLED, promise1.getState()); + + assertNull(promise2.getValue()); + assertEquals(error, promise2.getError()); + assertSame(Promise.State.REJECTED, promise2.getState()); + } + + + + + + + + + +}