Skip to content
12 changes: 11 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.openhim</groupId>
<artifactId>mediator-xds</artifactId>
<version>1.0.3</version>
<version>1.0.4-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
Expand Down Expand Up @@ -80,6 +80,16 @@
</executions>
</plugin>
</plugins>
<resources>
<resource>
<directory>src/main/resources-filtered</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<filtering>false</filtering>
</resource>
</resources>
</build>
<dependencies>
<dependency>
Expand Down
22 changes: 19 additions & 3 deletions src/main/java/org/openhim/mediator/dsub/DsubActor.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import javax.xml.ws.wsaddressing.W3CEndpointReference;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URL;
import java.net.URI;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
Expand All @@ -65,7 +67,7 @@ public DsubActor(MediatorConfig config) {

PullPointFactory pullPointFactory = new PullPointFactory(mongoDb);
SubscriptionRepository subRepo = new MongoSubscriptionRepository(mongoDb, log);
SubscriptionNotifier subNotifier = new SoapSubscriptionNotifier(config);
SubscriptionNotifier subNotifier = new SoapSubscriptionNotifier(config, log);

dsubService = new DsubServiceImpl(pullPointFactory, subRepo,
subNotifier, log);
Expand Down Expand Up @@ -149,7 +151,11 @@ private void handleSubscriptionMessage(Subscribe subscribeRequest) {
}
}
}
dsubService.createSubscription(uri, null, terminationDate);
if (parseUrl(uri) != null) {
dsubService.createSubscription(uri, null, terminationDate);
} else {
log.error("Subscription not registered. Invalid url: " + uri);
}
}

private Object parseMessage(MediatorHTTPRequest request) {
Expand All @@ -172,4 +178,14 @@ private <T> T getProperty(Object object, String name) {
throw new RuntimeException("Unable to read field: " + name, e);
}
}
}

private URL parseUrl(String url) {
try {
URI uri = new URL(url).toURI();
return uri.toURL();
}
catch (Exception e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ public interface DsubService {
void newDocumentForPullPoint(String docId, String facilityId);

List<String> getDocumentsForPullPoint(String facilityId);

Boolean subscriptionExists(String url, String facility);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ public void createSubscription(String url, String facilityQuery, Date terminateA
Subscription subscription = new Subscription(url,
terminateAt, facilityQuery);

subscriptionRepository.saveSubscription(subscription);
if (subscriptionExists(url, facilityQuery) == false) {
subscriptionRepository.saveSubscription(subscription);
} else {
log.error("unable to create subscription. Another one already exists for: " + url);
}
}

@Override
Expand All @@ -49,8 +53,15 @@ public void notifyNewDocument(String docId, String facilityId) {
List<Subscription> subscriptions = subscriptionRepository
.findActiveSubscriptions(facilityId);

log.info("Active subscriptions: {}", subscriptions.size());
for (Subscription sub : subscriptions) {
subscriptionNotifier.notifySubscription(sub, docId);
log.info("URL: {}", sub.getUrl());

try {
subscriptionNotifier.notifySubscription(sub, docId);
} catch (Exception ex) {
log.error("Error occured while sending notification. Unable to notify subscriber: " + sub.getUrl());
}
}
}

Expand All @@ -65,4 +76,22 @@ public List<String> getDocumentsForPullPoint(String locationId) {
PullPoint pullPoint = pullPointFactory.get(locationId);
return pullPoint.getDocumentIds();
}

@Override
public Boolean subscriptionExists(String url, String facility) {
Boolean subcriptionFound = false;
List<Subscription> subscriptions = subscriptionRepository
.findActiveSubscriptions(facility);

log.info("Active subscriptions: {}", subscriptions.size());
for (Subscription sub : subscriptions) {
log.info("URL: {}", sub.getUrl());
if (url.equals(sub.getUrl())) {
subcriptionFound = true;
break;
}
}

return subcriptionFound;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.event.LoggingAdapter;

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
Expand All @@ -17,10 +19,11 @@ public class SoapSubscriptionNotifier implements SubscriptionNotifier {

private MediatorConfig config;

private static final Logger LOGGER = LoggerFactory.getLogger(SoapSubscriptionNotifier.class);
private final LoggingAdapter logA;

public SoapSubscriptionNotifier(MediatorConfig config) {
public SoapSubscriptionNotifier(MediatorConfig config, LoggingAdapter logA) {
this.config = config;
this.logA = logA;
}

@Override
Expand All @@ -36,7 +39,7 @@ public void notifySubscription(Subscription subscription, String documentId) {
private void sendMessage(String url, byte[] body) {
HttpURLConnection con = null;
try {

logA.info("Connecting to: {}", url);
URL myurl = new URL(url);

con = (HttpURLConnection) myurl.openConnection();
Expand All @@ -50,7 +53,7 @@ private void sendMessage(String url, byte[] body) {
}

StringBuilder content;

try (BufferedReader in = new BufferedReader(
new InputStreamReader(con.getInputStream()))) {

Expand All @@ -63,9 +66,9 @@ private void sendMessage(String url, byte[] body) {
}
}

System.out.println(content.toString());
logA.info(content.toString());
} catch (IOException exception) {
LOGGER.error(exception.getMessage());
logA.error(exception, exception.getMessage());
} finally {
con.disconnect();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ public class ITI53NotifyMessage {
private String documentId;

private static final String TEMPLATE =
"------OPENHIM\n" +
"Content-Type: application/xop+xml; charset=utf-8; type=\"application/soap+xml\"\n" +
"\n" +
"<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
"<s:Envelope xmlns:s=\"http://www.w3.org/2003/05/soap-envelope\" " +
"xmlns:a=\"http://www.w3.org/2005/08/addressing\" " +
"xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" " +
"xmlns:wsnt=\"http://docs.oasis-open.org/wsn/b-2\" " +
"xmlns:xds=\"urn:ihe:iti:xds-b:2007\" " +
"xmlns:rim=\"urn:oasis:names:tc:ebxml-regrep:xsd:rim:3.0\" " +
"xmlns:lcm=\"urn:oasis:names:tc:ebxml-regrep:xsd:lcm:3.0\" " +
"xsi:schemaLocation=\"http://www.w3.org/2003/05/soap-envelope http://www.w3.org/2003/05/soapenvelope " +
"http://www.w3.org/2005/08/addressing http://www.w3.org/2005/08/addressing/ws-addr.xsd " +
"http://docs.oasis-open.org/wsn/b-2 http://docs.oasis-open.org/wsn/b-2.xsd urn:ihe:iti:xds-b:2007 " +
Expand Down Expand Up @@ -55,8 +53,7 @@ public class ITI53NotifyMessage {
"</wsnt:NotificationMessage>" +
"</wsnt:Notify>" +
"</s:Body>" +
"</s:Envelope>" +
"------OPENHIM--";
"</s:Envelope>";

public ITI53NotifyMessage(String recipientServerAddress, String brokerServerAddress, String documentId) {
this.messageId = UUID.randomUUID().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ private void forwardRequestToRepository() {

private void finalizeResponse(MediatorHTTPResponse response) {
if (StringUtil.isNotBlank(labOrderDocumentId)) {
log.info("Notifying DBUS {}", labOrderDocumentId);
NotifyNewDocument msg = new NotifyNewDocument(labOrderDocumentId);
dsubActor.tell(msg, getSelf());
}
Expand All @@ -273,6 +274,7 @@ public void onReceive(Object msg) throws Exception {
} else if (msg instanceof OrchestrateProvideAndRegisterRequestResponse) {
processProvideAndRegisterResponse((OrchestrateProvideAndRegisterRequestResponse) msg);
} else if (msg instanceof MediatorHTTPResponse) {
log.info("Finalizing response");
finalizeResponse((MediatorHTTPResponse) msg);
} else {
unhandled(msg);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"urn": "urn:mediator:xds",
"version": "1.0.2-SNAPSHOT",
"version": "${version}",
"name": "OpenHIE XDS.b Mediator",
"description": "An XDS.b mediator for OpenHIE integration",
"endpoints": [
Expand Down