Skip to content
This repository was archived by the owner on Jul 20, 2020. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions API/api/src/main/java/tc/oc/api/message/AbstractMessageService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package tc.oc.api.message;

import java.util.HashSet;
import java.util.Set;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.concurrent.Executor;
import java.util.logging.Logger;
import javax.annotation.Nullable;
import javax.inject.Inject;

import com.google.common.reflect.TypeToken;
import tc.oc.commons.core.exception.ExceptionHandler;
import tc.oc.commons.core.logging.Loggers;
import tc.oc.commons.core.reflect.Methods;
import tc.oc.commons.core.reflect.Types;
import tc.oc.commons.core.util.CachingTypeMap;

public abstract class AbstractMessageService implements MessageService {

protected static class RegisteredHandler<T extends Message> {
final @Nullable MessageListener listener;
final MessageHandler<T> handler;
final @Nullable Executor executor;

private RegisteredHandler(@Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
this.listener = listener;
this.handler = handler;
this.executor = executor;
}
}

protected Logger logger;
@Inject protected MessageRegistry messageRegistry;
@Inject protected ExceptionHandler exceptionHandler;

protected final Set<RegisteredHandler<?>> handlers = new HashSet<>();
public final CachingTypeMap<Message, RegisteredHandler<?>> handlersByType = CachingTypeMap.create();
protected volatile boolean suspended;

@Inject void init(Loggers loggers) {
logger = loggers.get(getClass());
}

@Override
public <T extends Message> void subscribe(TypeToken<T> messageType, MessageHandler<T> handler, @Nullable Executor executor) {
subscribe(messageType, null, handler, executor);
}

private <T extends Message> void subscribe(TypeToken<T> messageType, @Nullable MessageListener listener, MessageHandler<T> handler, @Nullable Executor executor) {
logger.fine("Subscribing handler " + handler);
synchronized(handlers) {
final RegisteredHandler<T> registered = new RegisteredHandler<>(listener, handler, executor);
handlers.add(registered);
handlersByType.put(messageType, registered);
handlersByType.invalidate();
}
}

private TypeToken<? extends Message> getMessageType(TypeToken decl, Method method) {
if(method.getParameterTypes().length != 1) {
throw new IllegalStateException("Message handler method must take 1 parameter");
}

final TypeToken<Message> base = new TypeToken<Message>(){};

for(Type param : method.getGenericParameterTypes()) {
final TypeToken paramToken = decl.resolveType(param);
Types.assertFullySpecified(paramToken);
if(base.isAssignableFrom(paramToken)) {
messageRegistry.typeName(paramToken.getRawType()); // Verify message type is registered
return paramToken;
}
}

throw new IllegalStateException("Message handler has no message parameter");
}

@Override
public void subscribe(final MessageListener listener, @Nullable Executor executor) {
logger.fine("Subscribing listener " + listener);

final TypeToken<? extends MessageListener> listenerType = TypeToken.of(listener.getClass());
Methods.declaredMethodsInAncestors(listener.getClass()).forEach(method -> {
final MessageListener.HandleMessage annot = method.getAnnotation(MessageListener.HandleMessage.class);
if(annot != null) {
method.setAccessible(true);
final TypeToken<? extends Message> messageType = getMessageType(listenerType, method);

logger.fine(" dispatching " + messageType.getRawType().getSimpleName() + " to method " + method.getName());

MessageHandler handler = new MessageHandler() {
@Override
public void handleDelivery(Message message, TypeToken type) {
try {
method.invoke(listener, message);
} catch(IllegalAccessException e) {
throw new IllegalStateException(e);
} catch(InvocationTargetException e) {
if(e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new IllegalStateException(e);
}
}
}

@Override
public String toString() {
return listener + "." + method.getName();
}
};

subscribe(messageType, listener, handler, executor);
}
});
}

@Override
public void unsubscribe(MessageHandler<?> handler) {
synchronized(handlers) {
handlers.removeIf(registered -> registered.handler == handler);
handlersByType.entries().removeIf(registered -> registered.getValue().handler == handler);
}
}

@Override
public void unsubscribe(MessageListener listener) {
if(listener == null) return;
synchronized(handlers) {
handlers.removeIf(registered -> registered.listener == listener);
handlersByType.entries().removeIf(registered -> registered.getValue().listener == listener);
}
}

protected void dispatchMessage(Message message, TypeToken<? extends Message> type) {
final Collection<RegisteredHandler<?>> matchingHandlers;
synchronized (handlers) {
matchingHandlers = handlersByType.allAssignableFrom(type);
}
if (matchingHandlers.isEmpty()) return;

for (final RegisteredHandler handler : matchingHandlers) {
if (suspended && handler.listener != null &&
!handler.listener.listenWhileSuspended()) continue;

logger.fine("Dispatching " + type.getType() + " to " + handler.handler.getClass());
if (handler.executor == null) {
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type));
} else {
handler.executor.execute(() -> {
synchronized (handlers) {
// Double check from the handler's executor that it is still registered.
// This makes it much less likely to dispatch a message to a handler
// after it unsubs. It should work perfectly if the handler unsubs on
// the same thread it handles messages on.
if (!handlers.contains(handler)) return;
}
exceptionHandler.run(() -> handler.handler.handleDelivery(message, type));
});
}
}
}
}
21 changes: 21 additions & 0 deletions API/api/src/main/java/tc/oc/api/message/LocalMessageService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package tc.oc.api.message;

import com.google.common.reflect.TypeToken;

import javax.inject.Singleton;

@Singleton
public class LocalMessageService extends AbstractMessageService {

@Override
public void bind(Class<? extends Message> type) {}

public void receive(Message message, TypeToken<? extends Message> type) {
dispatchMessage(message, type);
}

public void receive(Message message) {
dispatchMessage(message, TypeToken.of(message.getClass()));
}

}
4 changes: 1 addition & 3 deletions API/api/src/main/java/tc/oc/api/message/MessageHandler.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package tc.oc.api.message;

import com.google.common.reflect.TypeToken;
import tc.oc.api.queue.Delivery;
import tc.oc.api.queue.Metadata;

public interface MessageHandler<T extends Message> {
void handleDelivery(T message, TypeToken<? extends T> type, Metadata properties, Delivery delivery);
void handleDelivery(T message, TypeToken<? extends T> type);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import com.google.common.reflect.TypeToken;

public interface MessageQueue {
public interface MessageService {

/**
* Tell the queue to receive messages of the given type
Expand Down Expand Up @@ -35,4 +35,5 @@ default void subscribe(MessageListener listener) {
void unsubscribe(MessageHandler<?> handler);

void unsubscribe(MessageListener listener);

}
6 changes: 4 additions & 2 deletions API/api/src/main/java/tc/oc/api/message/MessagesManifest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public class MessagesManifest extends HybridManifest {
public void configure() {
bindAndExpose(MessageRegistry.class);

publicBinder().forOptional(MessageQueue.class)
.setDefault().to(NullMessageQueue.class);
publicBinder().forOptional(MessageService.class)
.setDefault().to(LocalMessageService.class);

bindAndExpose(LocalMessageService.class);

final MessageBinder messages = new MessageBinder(publicBinder());

Expand Down
24 changes: 0 additions & 24 deletions API/api/src/main/java/tc/oc/api/message/NullMessageQueue.java

This file was deleted.

5 changes: 3 additions & 2 deletions API/api/src/main/java/tc/oc/api/model/ModelStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import tc.oc.api.docs.virtual.Model;
import tc.oc.api.document.DocumentGenerator;
import tc.oc.api.message.MessageListener;
import tc.oc.api.message.MessageQueue;
import tc.oc.api.message.MessageService;
import tc.oc.api.message.types.FindMultiResponse;
import tc.oc.api.message.types.FindRequest;
import tc.oc.api.message.types.ModelDelete;
Expand All @@ -42,7 +42,8 @@ public abstract class ModelStore<T extends Model> implements MessageListener, Co

protected Logger logger;
protected @Inject QueryService<T> queryService;
protected @Inject MessageQueue primaryQueue;
protected @Inject
MessageService primaryQueue;
protected @Inject @ModelSync ExecutorService modelSync;
protected @Inject ModelDispatcher dispatcher;

Expand Down
Loading