From a056f5ac9630323a0a06af401b71c12d26a3e5c8 Mon Sep 17 00:00:00 2001 From: Pasquale Congiusti Date: Thu, 9 Oct 2025 10:42:27 +0200 Subject: [PATCH] chore(quickfixj): add examples They are taken "as is" from core code base [1] since we're cleaning from there. The idea is to maintain these examples as utilities in the examples repo instead. [1] https://github.com/apache/camel/tree/24e3fdbd40e0edd66cb2115f0710bd2e5d1b29db/components/camel-quickfix/src/test/java/org/apache/camel/component/quickfixj/examples --- pom.xml | 1 + quickfixj/pom.xml | 122 +++++ .../component/quickfixj/TestSupport.java | 120 +++++ .../examples/AuthenticationExample.java | 117 +++++ .../examples/DynamicRoutingExample.java | 117 +++++ .../examples/RequestReplyExample.java | 178 +++++++ .../examples/RestartRouteExample.java | 104 ++++ .../examples/SimpleMessagingExample.java | 104 ++++ .../examples/routing/FixMessageRouter.java | 107 ++++ .../examples/trading/MarketQuoteProvider.java | 27 ++ .../trading/QuickfixjMessageListener.java | 24 + .../examples/trading/TradeExecutor.java | 457 ++++++++++++++++++ .../trading/TradeExecutorComponent.java | 167 +++++++ .../trading/TradeExecutorExample.java | 136 ++++++ .../QuickfixjEventJsonTransformer.java | 54 +++ .../QuickfixjMessageJsonPrinter.java | 36 ++ .../QuickfixjMessageJsonTransformer.java | 110 +++++ .../util/CountDownLatchDecrementer.java | 42 ++ .../test/resources/examples/gateway.qf.cfg | 59 +++ .../test/resources/examples/inprocess.qf.cfg | 46 ++ .../src/test/resources/log4j2.properties | 28 ++ 21 files changed, 2156 insertions(+) create mode 100644 quickfixj/pom.xml create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/AuthenticationExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RestartRouteExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorComponent.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjEventJsonTransformer.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonPrinter.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java create mode 100644 quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/util/CountDownLatchDecrementer.java create mode 100644 quickfixj/src/test/resources/examples/gateway.qf.cfg create mode 100644 quickfixj/src/test/resources/examples/inprocess.qf.cfg create mode 100644 quickfixj/src/test/resources/log4j2.properties diff --git a/pom.xml b/pom.xml index 51e1ecdad..7f7416fb3 100644 --- a/pom.xml +++ b/pom.xml @@ -122,6 +122,7 @@ minio mongodb netty-custom-correlation + quickfixj resume-api routeloader routetemplate diff --git a/quickfixj/pom.xml b/quickfixj/pom.xml new file mode 100644 index 000000000..cec2e21ce --- /dev/null +++ b/quickfixj/pom.xml @@ -0,0 +1,122 @@ + + + + 4.0.0 + + + org.apache.camel.example + camel-examples + 4.15.0-SNAPSHOT + + + camel-example-quickfixj + jar + Camel :: QuickFIX/J + Camel QuickFIX/J support + + + + + + + + + org.apache.camel + camel-bom + ${camel.version} + pom + import + + + + + + + org.apache.camel + camel-support + + + org.quickfixj + quickfixj-core + ${quickfixj-version} + + + org.quickfixj + quickfixj-messages-all + ${quickfixj-version} + + + + + org.apache.camel + camel-test-spring-junit5 + test + + + org.apache.camel + camel-jetty + test + + + org.apache.camel + camel-quickfix + test + + + org.apache.mina + mina-core + ${mina-version} + test + + + org.mockito + mockito-core + ${mockito-version} + test + + + org.hamcrest + hamcrest + ${hamcrest-version} + test + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + true + + true + + + + + + + diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java new file mode 100644 index 000000000..fd56a4e12 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/TestSupport.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Date; + +import org.mockito.Mockito; +import quickfix.Acceptor; +import quickfix.Application; +import quickfix.ConfigError; +import quickfix.DefaultSessionFactory; +import quickfix.LogFactory; +import quickfix.MessageFactory; +import quickfix.MessageStore; +import quickfix.MessageStoreFactory; +import quickfix.Session; +import quickfix.SessionFactory; +import quickfix.SessionID; +import quickfix.SessionSettings; +import quickfix.field.EmailThreadID; +import quickfix.field.EmailType; +import quickfix.field.Subject; +import quickfix.field.Text; +import quickfix.fix42.Email; + +public final class TestSupport { + private TestSupport() { + // Utility class + } + + public static void writeSettings(SessionSettings settings, File settingsFile) throws IOException { + FileOutputStream settingsOut = new FileOutputStream(settingsFile); + try { + settings.toStream(settingsOut); + } finally { + settingsOut.close(); + } + } + + public static void setSessionID(SessionSettings sessionSettings, SessionID sessionID) { + sessionSettings.setString(sessionID, SessionSettings.BEGINSTRING, sessionID.getBeginString()); + sessionSettings.setString(sessionID, SessionSettings.SENDERCOMPID, sessionID.getSenderCompID()); + sessionSettings.setString(sessionID, SessionSettings.TARGETCOMPID, sessionID.getTargetCompID()); + } + + public static Email createEmailMessage(String subject) { + Email email = new Email(new EmailThreadID("ID"), new EmailType(EmailType.NEW), new Subject(subject)); + Email.LinesOfText text = new Email.LinesOfText(); + text.set(new Text("Content")); + email.addGroup(text); + return email; + } + + public static Session createSession(SessionID sessionID) throws ConfigError, IOException { + MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class); + MessageStore mockMessageStore = Mockito.mock(MessageStore.class); + Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date()); + + Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore); + + DefaultSessionFactory factory = new DefaultSessionFactory( + Mockito.mock(Application.class), + mockMessageStoreFactory, + Mockito.mock(LogFactory.class)); + + SessionSettings settings = new SessionSettings(); + settings.setLong(Session.SETTING_HEARTBTINT, 10); + settings.setString(Session.SETTING_START_TIME, "00:00:00"); + settings.setString(Session.SETTING_END_TIME, "00:00:00"); + settings.setString(SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE); + settings.setBool(Session.SETTING_USE_DATA_DICTIONARY, false); + + return factory.create(sessionID, settings); + } + + public static QuickfixjEngine createEngine() throws Exception { + return createEngine(false); + } + + public static QuickfixjEngine createEngine(boolean lazy) throws Exception { + SessionID sessionID = new SessionID("FIX.4.4:SENDER->TARGET"); + + MessageStoreFactory mockMessageStoreFactory = Mockito.mock(MessageStoreFactory.class); + MessageStore mockMessageStore = Mockito.mock(MessageStore.class); + Mockito.when(mockMessageStore.getCreationTime()).thenReturn(new Date()); + Mockito.when(mockMessageStoreFactory.create(sessionID)).thenReturn(mockMessageStore); + + SessionSettings settings = new SessionSettings(); + + settings.setLong(sessionID, Session.SETTING_HEARTBTINT, 10); + settings.setString(sessionID, Session.SETTING_START_TIME, "00:00:00"); + settings.setString(sessionID, Session.SETTING_END_TIME, "00:00:00"); + settings.setString(sessionID, SessionFactory.SETTING_CONNECTION_TYPE, SessionFactory.ACCEPTOR_CONNECTION_TYPE); + settings.setLong(sessionID, Acceptor.SETTING_SOCKET_ACCEPT_PORT, 8000); + settings.setBool(sessionID, Session.SETTING_USE_DATA_DICTIONARY, false); + + return new QuickfixjEngine( + "", settings, + mockMessageStoreFactory, + Mockito.mock(LogFactory.class), + Mockito.mock(MessageFactory.class), lazy); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/AuthenticationExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/AuthenticationExample.java new file mode 100644 index 000000000..80300c478 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/AuthenticationExample.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelExchangeException; +import org.apache.camel.Exchange; +import org.apache.camel.builder.PredicateBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.FieldNotFound; +import quickfix.Message; +import quickfix.RejectLogon; +import quickfix.field.MsgType; +import quickfix.field.RawData; +import quickfix.field.RawDataLength; + +/** + * This example demonstrates several features of the QuickFIX/J component. It uses QFJ session events to synchronize + * application behavior (e.g., Session logon). + */ +public class AuthenticationExample { + private static final Logger LOG = LoggerFactory.getLogger(AuthenticationExample.class); + + public static void main(String[] args) throws Exception { + new AuthenticationExample().run(); + } + + public void run() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + + final CountDownLatch logoutLatch = new CountDownLatch(1); + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() { + // Modify the outgoing logon message to add a password + // The modified message will be sent from the FIX engine when the message exchange completes + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET").filter(PredicateBuilder.and( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AdminMessageSent), + header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON))) + .bean(new CredentialInjector("PASSWORD")); + + // Release latch when the trader received a logout message + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogoff)) + .bean(new CountDownLatchDecrementer("logout", logoutLatch)); + + // Reject all logons on market side + // Demonstrates how to validate logons + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:MARKET->TRADER").filter(PredicateBuilder.and( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AdminMessageReceived), + header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.LOGON))).bean(new LogonAuthenticator()); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + if (!logoutLatch.await(5L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logout was not received"); + } + + context.stop(); + + LOG.info("Example complete"); + } + + public static class LogonAuthenticator { + public void authenticate(Exchange exchange) throws RejectLogon, CamelExchangeException, FieldNotFound { + LOG.info("Acceptor is rejecting logon for {}", exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY)); + Message message = exchange.getIn().getMandatoryBody(Message.class); + if (message.isSetField(RawData.FIELD)) { + LOG.info("Invalid password: {}", message.getString(RawData.FIELD)); + } + throw new RejectLogon("Rejecting logon for test purposes"); + } + } + + public static class CredentialInjector { + private final String password; + + public CredentialInjector(String password) { + this.password = password; + } + + public void inject(Exchange exchange) throws CamelExchangeException { + LOG.info("Injecting password into outgoing logon message"); + Message message = exchange.getIn().getMandatoryBody(Message.class); + message.setString(RawData.FIELD, password); + message.setInt(RawDataLength.FIELD, password.length()); + } + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java new file mode 100644 index 000000000..2a3e72eec --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/DynamicRoutingExample.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.builder.PredicateBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.TestSupport; +import org.apache.camel.component.quickfixj.examples.routing.FixMessageRouter; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.field.DeliverToCompID; +import quickfix.field.MsgType; +import quickfix.fix42.Email; + +public class DynamicRoutingExample { + private static final Logger LOG = LoggerFactory.getLogger(DynamicRoutingExample.class); + + public static void main(String[] args) throws Exception { + new DynamicRoutingExample().sendMessage(); + } + + public void sendMessage() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + + final CountDownLatch logonLatch = new CountDownLatch(4); + final CountDownLatch receivedMessageLatch = new CountDownLatch(1); + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() throws Exception { + // Release latch when session logon events are received + // We expect four logon events (four sessions) + from("quickfix:examples/gateway.qf.cfg") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)) + .bean(new CountDownLatchDecrementer("logon", logonLatch)); + + // Dynamic router -- Uses FIX DeliverTo tags + from("quickfix:examples/gateway.qf.cfg") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY) + .isEqualTo(QuickfixjEventCategory.AppMessageReceived)) + .recipientList(method(new FixMessageRouter("quickfix:examples/gateway.qf.cfg"))); + + // Logger app messages as JSON + from("quickfix:examples/gateway.qf.cfg").filter(PredicateBuilder.or( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived), + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageSent))) + .bean(new QuickfixjMessageJsonPrinter()); + + // If the trader@2 session receives an email then release the latch + from("quickfix:examples/gateway.qf.cfg?sessionID=FIX.4.2:TRADER@2->GATEWAY").filter(PredicateBuilder.and( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived), + header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL))) + .bean(new CountDownLatchDecrementer("message", receivedMessageLatch)); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + // This is not strictly necessary, but it prevents the need for session + // synchronization due to app messages being sent before being logged on + if (!logonLatch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logon did not complete"); + } + + String gatewayUri = "quickfix:examples/gateway.qf.cfg?sessionID=FIX.4.2:TRADER@1->GATEWAY"; + Endpoint gatewayEndpoint = context.getEndpoint(gatewayUri); + Producer producer = gatewayEndpoint.createProducer(); + + Email email = TestSupport.createEmailMessage("Dynamic Routing Example"); + email.getHeader().setString(DeliverToCompID.FIELD, "TRADER@2"); + + LOG.info("Sending routed message"); + + Exchange exchange = producer.getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.getIn().setBody(email); + producer.process(exchange); + + if (!receivedMessageLatch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Message did not reach target"); + } + + LOG.info("Message received, shutting down Camel context"); + + context.stop(); + + LOG.info("Dynamic routing example complete"); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java new file mode 100644 index 000000000..95239c651 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RequestReplyExample.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.URL; +import java.net.URLConnection; +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Header; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.MessagePredicate; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.QuickfixjProducer; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonTransformer; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.util.IOHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.FieldNotFound; +import quickfix.SessionID; +import quickfix.field.AvgPx; +import quickfix.field.ClOrdID; +import quickfix.field.CumQty; +import quickfix.field.ExecID; +import quickfix.field.ExecTransType; +import quickfix.field.ExecType; +import quickfix.field.LeavesQty; +import quickfix.field.MsgType; +import quickfix.field.OrdStatus; +import quickfix.field.OrderID; +import quickfix.field.Side; +import quickfix.field.Symbol; +import quickfix.fix42.ExecutionReport; +import quickfix.fix42.OrderStatusRequest; + +public class RequestReplyExample { + private static final Logger LOG = LoggerFactory.getLogger(RequestReplyExample.class); + + public static void main(String[] args) throws Exception { + new RequestReplyExample().run(); + } + + public void run() throws Exception { + final CamelContext context = new DefaultCamelContext(); + final CountDownLatch logonLatch = new CountDownLatch(1); + final String orderStatusServiceUrl = "http://localhost:9123/order/status"; + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() throws Exception { + // Synchronize the logon so we don't start sending status requests too early + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)) + .bean(new CountDownLatchDecrementer("logon", logonLatch)); + + // Incoming status requests are passed to the order status service and afterwards we print out that + // order status being delivered using the json printer. + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:MARKET->TRADER&exchangePattern=InOut") + .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.ORDER_STATUS_REQUEST)) + .to("log://OrderStatusRequestLog?showAll=true&multiline=true") + .bean(new MarketOrderStatusService()) + .bean(new QuickfixjMessageJsonPrinter()); + + from("jetty:" + orderStatusServiceUrl) + .bean(new OrderStatusRequestTransformer()) + .routingSlip(method(FixSessionRouter.class, "route")) + .bean(new QuickfixjMessageJsonTransformer(), "transform(${body})"); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + if (!logonLatch.await(5L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logon did not succeed"); + } + + // Send a request to the order status web service. + // Verify that the response is a JSON response. + + URL orderStatusUrl = new URL(orderStatusServiceUrl + "?sessionID=FIX.4.2:TRADER->MARKET&orderID=abc"); + URLConnection connection = orderStatusUrl.openConnection(); + BufferedReader orderStatusReply = IOHelper.buffered(new InputStreamReader(connection.getInputStream())); + String line = orderStatusReply.readLine(); + if (!line.equals("\"message\": {")) { + throw new Exception("Don't appear to be a JSON response"); + } else { + StringBuilder sb = new StringBuilder(); + while (line != null) { + sb.append(line); + sb.append('\n'); + line = orderStatusReply.readLine(); + } + LOG.info("Web reply:\n{}", sb); + } + orderStatusReply.close(); + + LOG.info("Shutting down Camel context"); + context.stop(); + + LOG.info("Example complete"); + } + + public static class OrderStatusRequestTransformer { + private static final Logger LOG = LoggerFactory.getLogger(OrderStatusRequestTransformer.class); + + public void transform(Exchange exchange) throws FieldNotFound { + // For the reply take the reverse sessionID into the account, see org.apache.camel.component.quickfixj.MessagePredicate + String requestSessionID = exchange.getIn().getHeader("sessionID", String.class); + String replySessionID = "FIX.4.2:MARKET->TRADER"; + LOG.info("Given the requestSessionID '{}' calculated the replySessionID as '{}'", requestSessionID, replySessionID); + + String orderID = exchange.getIn().getHeader("orderID", String.class); + + OrderStatusRequest request = new OrderStatusRequest(new ClOrdID("XYZ"), new Symbol("GOOG"), new Side(Side.BUY)); + request.set(new OrderID(orderID)); + + // Look for a reply execution report back to the requester session + // and having the requested OrderID. This is a loose correlation but the best + // we can do with FIX 4.2. Newer versions of FIX have an optional explicit correlation field. + exchange.setProperty(QuickfixjProducer.CORRELATION_CRITERIA_KEY, new MessagePredicate( + new SessionID(replySessionID), MsgType.EXECUTION_REPORT).withField(OrderID.FIELD, + request.getString(OrderID.FIELD))); + + exchange.getIn().setBody(request); + } + } + + public static class MarketOrderStatusService { + private static final Logger LOG = LoggerFactory.getLogger(MarketOrderStatusService.class); + + public ExecutionReport getOrderStatus(OrderStatusRequest request) throws FieldNotFound { + LOG.info("Received order status request for orderId={}", request.getOrderID().getValue()); + return new ExecutionReport( + request.getOrderID(), + new ExecID(UUID.randomUUID().toString()), + new ExecTransType(ExecTransType.STATUS), + new ExecType(ExecType.REJECTED), + new OrdStatus(OrdStatus.REJECTED), + new Symbol("GOOG"), + new Side(Side.BUY), + new LeavesQty(100), + new CumQty(0), + new AvgPx(0)); + } + } + + public static class FixSessionRouter { + public String route(@Header("sessionID") String sessionID) { + return String.format("quickfix:examples/inprocess.qf.cfg?sessionID=%s", sessionID); + } + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RestartRouteExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RestartRouteExample.java new file mode 100644 index 000000000..2ec169910 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/RestartRouteExample.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.TestSupport; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.field.MsgType; +import quickfix.fix42.Email; + +/** + * Stopping a route should stop engine if no longer in use. And starting the route should start engine again. + */ +public class RestartRouteExample { + private static final Logger LOG = LoggerFactory.getLogger(RestartRouteExample.class); + + public static void main(String[] args) throws Exception { + new RestartRouteExample().sendMessage(); + } + + public void sendMessage() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + + final CountDownLatch receivedMessageLatch = new CountDownLatch(2); + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() { + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:MARKET->TRADER").routeId("foo") + .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL)) + .bean(new CountDownLatchDecrementer("message", receivedMessageLatch)); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + String marketUri = "quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET"; + Producer producer = context.getEndpoint(marketUri).createProducer(); + + Email email = TestSupport.createEmailMessage("Example"); + Exchange exchange = producer.getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.getIn().setBody(email); + producer.process(exchange); + + // wait a little before stopping + Thread.sleep(5000); + + // stop route + context.getRouteController().stopRoute("foo"); + + // wait a little before starting + Thread.sleep(5000); + + // start route again + context.getRouteController().startRoute("foo"); + + // wait a little before sending + Thread.sleep(5000); + + // send another email + producer = context.getEndpoint(marketUri).createProducer(); + email = TestSupport.createEmailMessage("Example2"); + exchange = producer.getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.getIn().setBody(email); + producer.process(exchange); + + if (!receivedMessageLatch.await(30L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Message did not reach market"); + } + + LOG.info("Message received, shutting down Camel context"); + + context.stop(); + + LOG.info("Example complete"); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java new file mode 100644 index 000000000..45f2a7660 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/SimpleMessagingExample.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.builder.PredicateBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.TestSupport; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.field.MsgType; +import quickfix.fix42.Email; + +/** + * This example demonstrates several features of the QuickFIX/J component. It uses QFJ session events to synchronize + * application behavior (e.g., Session logon). + */ +public class SimpleMessagingExample { + private static final Logger LOG = LoggerFactory.getLogger(SimpleMessagingExample.class); + + public static void main(String[] args) throws Exception { + new SimpleMessagingExample().sendMessage(); + } + + public void sendMessage() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + + final CountDownLatch logonLatch = new CountDownLatch(2); + final CountDownLatch receivedMessageLatch = new CountDownLatch(1); + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() throws Exception { + // Release latch when session logon events are received + // We expect two events, one for the trader session and one for the market session + from("quickfix:examples/inprocess.qf.cfg") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)) + .bean(new CountDownLatchDecrementer("logon", logonLatch)); + + // For all received messages, print the JSON-formatted message to stdout + from("quickfix:examples/inprocess.qf.cfg").filter(PredicateBuilder.or( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AdminMessageReceived), + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived))) + .bean(new QuickfixjMessageJsonPrinter()); + + // If the market session receives an email then release the latch + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:MARKET->TRADER") + .filter(header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EMAIL)) + .bean(new CountDownLatchDecrementer("message", receivedMessageLatch)); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + if (!logonLatch.await(5L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logon did not succeed"); + } + + String marketUri = "quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET"; + Producer producer = context.getEndpoint(marketUri).createProducer(); + + Email email = TestSupport.createEmailMessage("Example"); + Exchange exchange = producer.getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.getIn().setBody(email); + producer.process(exchange); + + if (!receivedMessageLatch.await(5L, TimeUnit.SECONDS)) { + throw new IllegalStateException("Message did not reach market"); + } + + LOG.info("Message received, shutting down Camel context"); + + context.stop(); + + LOG.info("Example complete"); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java new file mode 100644 index 000000000..a14f37ed9 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/routing/FixMessageRouter.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.routing; + +import org.apache.camel.Exchange; +import org.apache.camel.RuntimeCamelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.FieldMap; +import quickfix.Message; +import quickfix.Message.Header; +import quickfix.SessionID; +import quickfix.field.BeginString; +import quickfix.field.DeliverToCompID; +import quickfix.field.DeliverToLocationID; +import quickfix.field.DeliverToSubID; +import quickfix.field.OnBehalfOfCompID; +import quickfix.field.OnBehalfOfLocationID; +import quickfix.field.OnBehalfOfSubID; +import quickfix.field.SenderCompID; +import quickfix.field.SenderLocationID; +import quickfix.field.SenderSubID; +import quickfix.field.TargetCompID; +import quickfix.field.TargetLocationID; +import quickfix.field.TargetSubID; + +/** + * Routes exchanges based on FIX-specific routing fields in the message. + */ +public class FixMessageRouter { + private static final Logger LOG = LoggerFactory.getLogger(FixMessageRouter.class); + + private final String engineUri; + + public FixMessageRouter(String engineUri) { + this.engineUri = engineUri; + } + + public String route(Exchange exchange) { + Message message = exchange.getIn().getBody(Message.class); + if (message != null) { + SessionID destinationSession = getDestinationSessionID(message); + if (destinationSession != null) { + String destinationUri = String.format("%s?sessionID=%s", engineUri, destinationSession); + LOG.debug("Routing destination: {}", destinationUri); + return destinationUri; + } + } + return null; + } + + private SessionID getDestinationSessionID(Message message) { + Header header = message.getHeader(); + String fixVersion = getField(header, BeginString.FIELD); + String destinationCompId = getField(header, DeliverToCompID.FIELD); + if (destinationCompId != null) { + String destinationSubId = getField(header, DeliverToSubID.FIELD); + String destinationLocationId = getField(header, DeliverToLocationID.FIELD); + + header.removeField(DeliverToCompID.FIELD); + header.removeField(DeliverToSubID.FIELD); + header.removeField(DeliverToLocationID.FIELD); + + String gatewayCompId = getField(header, TargetCompID.FIELD); + String gatewaySubId = getField(header, TargetSubID.FIELD); + String gatewayLocationId = getField(header, TargetLocationID.FIELD); + + header.setString(OnBehalfOfCompID.FIELD, getField(header, SenderCompID.FIELD)); + if (header.isSetField(SenderSubID.FIELD)) { + header.setString(OnBehalfOfSubID.FIELD, getField(header, SenderSubID.FIELD)); + } + if (header.isSetField(SenderLocationID.FIELD)) { + header.setString(OnBehalfOfLocationID.FIELD, getField(header, SenderLocationID.FIELD)); + } + + return new SessionID( + fixVersion, gatewayCompId, gatewaySubId, gatewayLocationId, + destinationCompId, destinationSubId, destinationLocationId, null); + } + return null; + } + + private String getField(FieldMap fieldMap, int tag) { + if (fieldMap.isSetField(tag)) { + try { + return fieldMap.getString(tag); + } catch (Exception e) { + throw RuntimeCamelException.wrapRuntimeCamelException(e); + } + } + return null; + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java new file mode 100644 index 000000000..121d5ee04 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/MarketQuoteProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.trading; + +/** + * Trivial market data provider interface to allow plugins for alternative market data sources. + * + */ +public interface MarketQuoteProvider { + double getBid(String symbol); + + double getAsk(String symbol); +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java new file mode 100644 index 000000000..b69579fdd --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/QuickfixjMessageListener.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.trading; + +import quickfix.Message; +import quickfix.SessionID; + +public interface QuickfixjMessageListener { + void onMessage(SessionID sessionID, Message message) throws Exception; +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java new file mode 100644 index 000000000..0cefd2c2d --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutor.java @@ -0,0 +1,457 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.trading; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; + +import org.apache.camel.RuntimeCamelException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.DataDictionary; +import quickfix.DataDictionaryProvider; +import quickfix.FieldNotFound; +import quickfix.FixVersions; +import quickfix.IncorrectTagValue; +import quickfix.LogUtil; +import quickfix.Message; +import quickfix.MessageUtils; +import quickfix.Session; +import quickfix.SessionID; +import quickfix.SessionNotFound; +import quickfix.field.ApplVerID; +import quickfix.field.AvgPx; +import quickfix.field.CumQty; +import quickfix.field.ExecID; +import quickfix.field.ExecTransType; +import quickfix.field.ExecType; +import quickfix.field.LastPx; +import quickfix.field.LastQty; +import quickfix.field.LastShares; +import quickfix.field.LeavesQty; +import quickfix.field.OrdStatus; +import quickfix.field.OrdType; +import quickfix.field.OrderID; +import quickfix.field.OrderQty; +import quickfix.field.Price; +import quickfix.field.Side; +import quickfix.field.Symbol; + +/** + * Trade executor based on QFJ example "executor" (No Camel dependencies) + */ +public class TradeExecutor { + private static final Logger LOG = LoggerFactory.getLogger(TradeExecutor.class); + + private boolean alwaysFillLimitOrders; + private Set validOrderTypes = new HashSet<>(); + private MarketQuoteProvider marketQuoteProvider; + + private List listeners = new CopyOnWriteArrayList<>(); + + private int orderID; + private int execID; + + public TradeExecutor() { + setAlwaysFillLimitOrders(true); + + Set validOrderTypes = new HashSet<>(); + validOrderTypes.add(OrdType.LIMIT + ""); + validOrderTypes.add(OrdType.MARKET + ""); + setValidOrderTypes(validOrderTypes); + + setMarketQuoteProvider(new DefaultMarketQuoteProvider(10.00)); + } + + public void setAlwaysFillLimitOrders(boolean alwaysFillLimitOrders) { + this.alwaysFillLimitOrders = alwaysFillLimitOrders; + } + + public void setMarketQuoteProvider(MarketQuoteProvider marketQuoteProvider) { + this.marketQuoteProvider = marketQuoteProvider; + } + + public void setValidOrderTypes(String validOrderTypes) { + setValidOrderTypes(new HashSet<>(Arrays.asList(validOrderTypes.split("\\s*,\\s*")))); + } + + public void setValidOrderTypes(Set validOrderTypes) { + this.validOrderTypes = validOrderTypes; + } + + public void addListener(QuickfixjMessageListener listener) { + listeners.add(listener); + } + + public void removeListener(QuickfixjMessageListener listener) { + listeners.remove(listener); + } + + public void execute(final Message message) { + final SessionID sessionID = MessageUtils.getSessionID(message); + + try { + if (message instanceof quickfix.fix40.NewOrderSingle) { + onMessage((quickfix.fix40.NewOrderSingle) message, sessionID); + } else if (message instanceof quickfix.fix41.NewOrderSingle) { + onMessage((quickfix.fix41.NewOrderSingle) message, sessionID); + } else if (message instanceof quickfix.fix42.NewOrderSingle) { + onMessage((quickfix.fix42.NewOrderSingle) message, sessionID); + } else if (message instanceof quickfix.fix43.NewOrderSingle) { + onMessage((quickfix.fix43.NewOrderSingle) message, sessionID); + } else if (message instanceof quickfix.fix44.NewOrderSingle) { + onMessage((quickfix.fix44.NewOrderSingle) message, sessionID); + } else if (message instanceof quickfix.fix50.NewOrderSingle) { + onMessage((quickfix.fix50.NewOrderSingle) message, sessionID); + } + } catch (Exception e) { + LOG.error("Error submitting execution task", e); + } + } + + private void onMessage(quickfix.fix40.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + + Price price = getPrice(order); + + quickfix.fix40.ExecutionReport accept = new quickfix.fix40.ExecutionReport( + genOrderID(), genExecID(), + new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.NEW), order.getSymbol(), order.getSide(), + orderQty, new LastShares(0), new LastPx(0), new CumQty(0), new AvgPx(0)); + + accept.set(order.getClOrdID()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix40.ExecutionReport fill = new quickfix.fix40.ExecutionReport( + genOrderID(), genExecID(), + new ExecTransType(ExecTransType.NEW), new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order + .getSide(), + orderQty, new LastShares(orderQty.getValue()), new LastPx(price.getValue()), + new CumQty(orderQty.getValue()), new AvgPx(price.getValue())); + + fill.set(order.getClOrdID()); + + sendMessage(sessionID, fill); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + private boolean isOrderExecutable(Message order, Price price) throws FieldNotFound { + if (order.getChar(OrdType.FIELD) == OrdType.LIMIT) { + BigDecimal limitPrice = new BigDecimal(order.getString(Price.FIELD)); + char side = order.getChar(Side.FIELD); + BigDecimal thePrice = new BigDecimal(price.getValue()); + + return side == Side.BUY && thePrice.compareTo(limitPrice) <= 0 + || (side == Side.SELL || side == Side.SELL_SHORT) && thePrice.compareTo(limitPrice) >= 0; + } + return true; + } + + private Price getPrice(Message message) throws FieldNotFound { + Price price; + if (message.getChar(OrdType.FIELD) == OrdType.LIMIT && alwaysFillLimitOrders) { + price = new Price(message.getDouble(Price.FIELD)); + } else { + if (marketQuoteProvider == null) { + throw new RuntimeCamelException("No market data provider specified for market order"); + } + char side = message.getChar(Side.FIELD); + if (side == Side.BUY) { + price = new Price(marketQuoteProvider.getAsk(message.getString(Symbol.FIELD))); + } else if (side == Side.SELL || side == Side.SELL_SHORT) { + price = new Price(marketQuoteProvider.getBid(message.getString(Symbol.FIELD))); + } else { + throw new RuntimeCamelException("Invalid order side: " + side); + } + } + return price; + } + + private void sendMessage(SessionID sessionID, Message message) { + try { + Session session = Session.lookupSession(sessionID); + if (session == null) { + throw new SessionNotFound(sessionID.toString()); + } + + DataDictionaryProvider provider = session.getDataDictionaryProvider(); + if (provider != null) { + try { + ApplVerID applVerID = getApplVerID(session, message); + DataDictionary appDataDictionary = provider.getApplicationDataDictionary(applVerID); + appDataDictionary.validate(message, true); + } catch (Exception e) { + LogUtil.logThrowable(sessionID, "Outgoing message failed validation: " + + e.getMessage(), + e); + return; + } + } + + for (QuickfixjMessageListener listener : listeners) { + try { + listener.onMessage(sessionID, message); + } catch (Throwable e) { + LogUtil.logThrowable(sessionID, "Error while dispatching message", e); + } + } + + } catch (SessionNotFound e) { + LOG.error(e.getMessage(), e); + } + } + + private ApplVerID getApplVerID(Session session, Message message) { + String beginString = session.getSessionID().getBeginString(); + if (FixVersions.BEGINSTRING_FIXT11.equals(beginString)) { + return new ApplVerID(ApplVerID.FIX50); + } else { + return MessageUtils.toApplVerID(beginString); + } + } + + private void onMessage(quickfix.fix41.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + Price price = getPrice(order); + + quickfix.fix41.ExecutionReport accept = new quickfix.fix41.ExecutionReport( + genOrderID(), genExecID(), + new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order + .getSymbol(), + order.getSide(), orderQty, new LastShares(0), new LastPx(0), new LeavesQty(0), + new CumQty(0), new AvgPx(0)); + + accept.set(order.getClOrdID()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix41.ExecutionReport executionReport = new quickfix.fix41.ExecutionReport( + genOrderID(), + genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL), new OrdStatus( + OrdStatus.FILLED), + order.getSymbol(), order.getSide(), orderQty, new LastShares( + orderQty + .getValue()), + new LastPx(price.getValue()), new LeavesQty(0), new CumQty( + orderQty + .getValue()), + new AvgPx(price.getValue())); + + executionReport.set(order.getClOrdID()); + + sendMessage(sessionID, executionReport); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + private void onMessage(quickfix.fix42.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + Price price = getPrice(order); + + quickfix.fix42.ExecutionReport accept = new quickfix.fix42.ExecutionReport( + genOrderID(), genExecID(), + new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), order + .getSymbol(), + order.getSide(), new LeavesQty(0), new CumQty(0), new AvgPx(0)); + + accept.set(order.getClOrdID()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix42.ExecutionReport executionReport = new quickfix.fix42.ExecutionReport( + genOrderID(), + genExecID(), new ExecTransType(ExecTransType.NEW), new ExecType(ExecType.FILL), + new OrdStatus(OrdStatus.FILLED), order.getSymbol(), order.getSide(), new LeavesQty(0), + new CumQty(orderQty.getValue()), new AvgPx(price.getValue())); + + executionReport.set(order.getClOrdID()); + executionReport.set(orderQty); + executionReport.set(new LastShares(orderQty.getValue())); + executionReport.set(new LastPx(price.getValue())); + + sendMessage(sessionID, executionReport); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + private void validateOrder(Message order) throws IncorrectTagValue, FieldNotFound { + OrdType ordType = new OrdType(order.getChar(OrdType.FIELD)); + if (!validOrderTypes.contains(Character.toString(ordType.getValue()))) { + LOG.error("Order type not in ValidOrderTypes setting"); + throw new IncorrectTagValue(ordType.getField()); + } + if (ordType.getValue() == OrdType.MARKET && marketQuoteProvider == null) { + LOG.error("DefaultMarketPrice setting not specified for market order"); + throw new IncorrectTagValue(ordType.getField()); + } + } + + private void onMessage(quickfix.fix43.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + Price price = getPrice(order); + + quickfix.fix43.ExecutionReport accept = new quickfix.fix43.ExecutionReport( + genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), + order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0)); + + accept.set(order.getClOrdID()); + accept.set(order.getSymbol()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix43.ExecutionReport executionReport = new quickfix.fix43.ExecutionReport( + genOrderID(), + genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(), + new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue())); + + executionReport.set(order.getClOrdID()); + executionReport.set(order.getSymbol()); + executionReport.set(orderQty); + executionReport.set(new LastQty(orderQty.getValue())); + executionReport.set(new LastPx(price.getValue())); + + sendMessage(sessionID, executionReport); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + private void onMessage(quickfix.fix44.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + Price price = getPrice(order); + + quickfix.fix44.ExecutionReport accept = new quickfix.fix44.ExecutionReport( + genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), + order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0), new AvgPx(0)); + + accept.set(order.getClOrdID()); + accept.set(order.getSymbol()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix44.ExecutionReport executionReport = new quickfix.fix44.ExecutionReport( + genOrderID(), + genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), order.getSide(), + new LeavesQty(0), new CumQty(orderQty.getValue()), new AvgPx(price.getValue())); + + executionReport.set(order.getClOrdID()); + executionReport.set(order.getSymbol()); + executionReport.set(orderQty); + executionReport.set(new LastQty(orderQty.getValue())); + executionReport.set(new LastPx(price.getValue())); + + sendMessage(sessionID, executionReport); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + private void onMessage(quickfix.fix50.NewOrderSingle order, SessionID sessionID) + throws FieldNotFound, IncorrectTagValue { + try { + validateOrder(order); + + OrderQty orderQty = order.getOrderQty(); + Price price = getPrice(order); + + quickfix.fix50.ExecutionReport accept = new quickfix.fix50.ExecutionReport( + genOrderID(), genExecID(), new ExecType(ExecType.NEW), new OrdStatus(OrdStatus.NEW), + order.getSide(), new LeavesQty(order.getOrderQty().getValue()), new CumQty(0)); + + accept.set(order.getClOrdID()); + accept.set(order.getSymbol()); + sendMessage(sessionID, accept); + + if (isOrderExecutable(order, price)) { + quickfix.fix50.ExecutionReport executionReport = new quickfix.fix50.ExecutionReport( + genOrderID(), genExecID(), new ExecType(ExecType.FILL), new OrdStatus(OrdStatus.FILLED), + order.getSide(), new LeavesQty(0), new CumQty(orderQty.getValue())); + + executionReport.set(order.getClOrdID()); + executionReport.set(order.getSymbol()); + executionReport.set(orderQty); + executionReport.set(new LastQty(orderQty.getValue())); + executionReport.set(new LastPx(price.getValue())); + executionReport.set(new AvgPx(price.getValue())); + + sendMessage(sessionID, executionReport); + } + } catch (RuntimeException e) { + LogUtil.logThrowable(sessionID, e.getMessage(), e); + } + } + + public OrderID genOrderID() { + return new OrderID(Integer.valueOf(++orderID).toString()); + } + + public ExecID genExecID() { + return new ExecID(Integer.valueOf(++execID).toString()); + } + + private static class DefaultMarketQuoteProvider implements MarketQuoteProvider { + private double defaultMarketPrice; + + DefaultMarketQuoteProvider(double defaultMarketPrice) { + this.defaultMarketPrice = defaultMarketPrice; + } + + @Override + public double getAsk(String symbol) { + return defaultMarketPrice; + } + + @Override + public double getBid(String symbol) { + return defaultMarketPrice; + } + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorComponent.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorComponent.java new file mode 100644 index 000000000..b1076248a --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorComponent.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.trading; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +import org.apache.camel.Consumer; +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.converter.QuickfixjConverters; +import org.apache.camel.support.DefaultComponent; +import org.apache.camel.support.DefaultConsumer; +import org.apache.camel.support.DefaultEndpoint; +import org.apache.camel.support.DefaultProducer; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.Message; +import quickfix.Message.Header; +import quickfix.SessionID; +import quickfix.field.SenderCompID; +import quickfix.field.SenderLocationID; +import quickfix.field.SenderSubID; +import quickfix.field.TargetCompID; +import quickfix.field.TargetLocationID; +import quickfix.field.TargetSubID; + +/** + * Adapts the TradeExecutor for use as a Camel endpoint. + * + * @see TradeExecutor + */ +public class TradeExecutorComponent extends DefaultComponent { + private static final Logger LOG = LoggerFactory.getLogger(TradeExecutorComponent.class); + + private Map endpoints = new HashMap<>(); + private final Executor executor; + + public TradeExecutorComponent() { + this(Executors.newCachedThreadPool(new TradeExecutorThreadFactory())); + } + + private static class TradeExecutorThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable r) { + Thread thread = new Thread(r, "Trade Executor"); + thread.setDaemon(true); + return thread; + } + } + + public TradeExecutorComponent(Executor executor) { + this.executor = executor; + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map parameters) throws Exception { + synchronized (endpoints) { + Endpoint endpoint = endpoints.get(uri); + if (endpoint == null) { + endpoint = new TradeExecutorEndpoint(uri, new TradeExecutor()); + endpoints.put(uri, (TradeExecutorEndpoint) endpoint); + LOG.info("Created trade executor: {}", uri); + } + return endpoint; + } + } + + private class TradeExecutorEndpoint extends DefaultEndpoint { + private final TradeExecutor tradeExecutor; + private List processors = new CopyOnWriteArrayList<>(); + + TradeExecutorEndpoint(String uri, TradeExecutor tradeExecutor) { + super(uri, TradeExecutorComponent.this); + this.tradeExecutor = tradeExecutor; + tradeExecutor.addListener(new QuickfixjMessageListener() { + @Override + public void onMessage(SessionID sessionID, Message message) throws Exception { + // Inject session ID into message so producer will know where to send it + Header header = message.getHeader(); + setOptionalField(header, sessionID, SenderCompID.FIELD, sessionID.getTargetCompID()); + setOptionalField(header, sessionID, SenderSubID.FIELD, sessionID.getTargetSubID()); + setOptionalField(header, sessionID, SenderLocationID.FIELD, sessionID.getTargetLocationID()); + setOptionalField(header, sessionID, TargetCompID.FIELD, sessionID.getSenderCompID()); + setOptionalField(header, sessionID, TargetSubID.FIELD, sessionID.getSenderSubID()); + setOptionalField(header, sessionID, TargetLocationID.FIELD, sessionID.getSenderLocationID()); + + Exchange exchange = QuickfixjConverters.toExchange( + TradeExecutorEndpoint.this, sessionID, message, + QuickfixjEventCategory.AppMessageReceived); + + for (Processor processor : processors) { + processor.process(exchange); + } + } + + private void setOptionalField(Header header, SessionID sessionID, int tag, String value) { + if (!ObjectHelper.isEmpty(value)) { + header.setString(tag, value); + } + } + }); + } + + @Override + public Producer createProducer() { + return new DefaultProducer(this) { + @Override + public void process(final Exchange exchange) { + executor.execute(new Runnable() { + @Override + public void run() { + try { + tradeExecutor.execute(exchange.getIn().getMandatoryBody(Message.class)); + } catch (Exception e) { + LOG.error("Error during trade execution", e); + } + } + }); + } + }; + } + + @Override + public Consumer createConsumer(Processor processor) { + return new DefaultConsumer(this, processor) { + @Override + protected void doStart() { + processors.add(getProcessor()); + } + + @Override + protected void doStop() { + processors.remove(getProcessor()); + } + }; + } + + @Override + public boolean isSingleton() { + return false; + } + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java new file mode 100644 index 000000000..03f682d05 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/trading/TradeExecutorExample.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.trading; + +import java.time.LocalDateTime; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.Endpoint; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangePattern; +import org.apache.camel.Producer; +import org.apache.camel.builder.PredicateBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import org.apache.camel.component.quickfixj.QuickfixjEventCategory; +import org.apache.camel.component.quickfixj.examples.transform.QuickfixjMessageJsonPrinter; +import org.apache.camel.component.quickfixj.examples.util.CountDownLatchDecrementer; +import org.apache.camel.impl.DefaultCamelContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.field.ClOrdID; +import quickfix.field.HandlInst; +import quickfix.field.MsgType; +import quickfix.field.OrdType; +import quickfix.field.OrderQty; +import quickfix.field.Price; +import quickfix.field.Side; +import quickfix.field.Symbol; +import quickfix.field.TransactTime; +import quickfix.fix42.NewOrderSingle; + +public class TradeExecutorExample { + private static final Logger LOG = LoggerFactory.getLogger(TradeExecutorExample.class); + + public static void main(String[] args) throws Exception { + new TradeExecutorExample().sendMessage(); + } + + public void sendMessage() throws Exception { + DefaultCamelContext context = new DefaultCamelContext(); + context.addComponent("trade-executor", new TradeExecutorComponent()); + + final CountDownLatch logonLatch = new CountDownLatch(2); + final CountDownLatch executionReportLatch = new CountDownLatch(2); + + RouteBuilder routes = new RouteBuilder() { + @Override + public void configure() throws Exception { + // Release latch when session logon events are received + from("quickfix:examples/inprocess.qf.cfg") + .filter(header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.SessionLogon)) + .bean(new CountDownLatchDecrementer("logon", logonLatch)); + + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:MARKET->TRADER").filter( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived)) + .to("trade-executor:market"); + + from("trade-executor:market").to("quickfix:examples/inprocess.qf.cfg"); + + // Logger app messages as JSON + from("quickfix:examples/inprocess.qf.cfg").filter(PredicateBuilder.or( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived), + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageSent))) + .bean(new QuickfixjMessageJsonPrinter()); + + // Release latch when trader receives execution report + from("quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET").filter(PredicateBuilder.and( + header(QuickfixjEndpoint.EVENT_CATEGORY_KEY).isEqualTo(QuickfixjEventCategory.AppMessageReceived), + header(QuickfixjEndpoint.MESSAGE_TYPE_KEY).isEqualTo(MsgType.EXECUTION_REPORT))) + .bean(new CountDownLatchDecrementer("execution report", executionReportLatch)); + } + }; + + context.addRoutes(routes); + + LOG.info("Starting Camel context"); + context.start(); + + // This is not strictly necessary, but it prevents the need for session + // synchronization due to app messages being sent before being logged on + if (!logonLatch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Logon did not complete"); + } + + String gatewayUri = "quickfix:examples/inprocess.qf.cfg?sessionID=FIX.4.2:TRADER->MARKET"; + Endpoint gatewayEndpoint = context.getEndpoint(gatewayUri); + Producer producer = gatewayEndpoint.createProducer(); + + LOG.info("Sending order"); + + NewOrderSingle order = createNewOrderMessage(); + Exchange exchange = producer.getEndpoint().createExchange(ExchangePattern.InOnly); + exchange.getIn().setBody(order); + producer.process(exchange); + + if (!executionReportLatch.await(5, TimeUnit.SECONDS)) { + throw new IllegalStateException("Did not receive execution reports"); + } + + LOG.info("Message received, shutting down Camel context"); + + context.stop(); + + LOG.info("Order execution example complete"); + } + + private NewOrderSingle createNewOrderMessage() { + NewOrderSingle order = new NewOrderSingle( + new ClOrdID("CLIENT_ORDER_ID"), + new HandlInst('1'), + new Symbol("GOOG"), + new Side(Side.BUY), + new TransactTime(LocalDateTime.now()), + new OrdType(OrdType.LIMIT)); + + order.set(new OrderQty(10)); + order.set(new Price(300.00)); + + return order; + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjEventJsonTransformer.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjEventJsonTransformer.java new file mode 100644 index 000000000..a78ebe86b --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjEventJsonTransformer.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.transform; + +import org.apache.camel.Exchange; +import org.apache.camel.component.quickfixj.QuickfixjEndpoint; +import quickfix.DataDictionary; +import quickfix.Message; +import quickfix.Session; +import quickfix.SessionID; + +public class QuickfixjEventJsonTransformer { + private final QuickfixjMessageJsonTransformer renderer; + + public QuickfixjEventJsonTransformer() { + renderer = new QuickfixjMessageJsonTransformer(); + } + + public String transform(Exchange exchange) { + SessionID sessionID = exchange.getIn().getHeader(QuickfixjEndpoint.SESSION_ID_KEY, SessionID.class); + Session session = Session.lookupSession(sessionID); + DataDictionary dataDictionary = session.getDataDictionary(); + + if (dataDictionary == null) { + throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session"); + } + + StringBuilder sb = new StringBuilder(); + sb.append("\"event\": {\n"); + + org.apache.camel.Message in = exchange.getIn(); + for (String key : in.getHeaders().keySet()) { + sb.append(" \"").append(key).append("\": ").append(in.getHeader(key)).append(",\n"); + } + + sb.append(renderer.transform(in.getBody(Message.class), " ", dataDictionary)).append("\n"); + sb.append("}\n"); + return sb.toString(); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonPrinter.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonPrinter.java new file mode 100644 index 000000000..2b70acfc0 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonPrinter.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.transform; + +import org.apache.camel.Exchange; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import quickfix.ConfigError; + +public class QuickfixjMessageJsonPrinter { + + private static final Logger LOG = LoggerFactory.getLogger(QuickfixjMessageJsonPrinter.class); + private QuickfixjEventJsonTransformer formatter; + + public QuickfixjMessageJsonPrinter() throws ConfigError { + formatter = new QuickfixjEventJsonTransformer(); + } + + public void print(Exchange exchange) { + LOG.info(formatter.transform(exchange)); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java new file mode 100644 index 000000000..4a0ee3aac --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/transform/QuickfixjMessageJsonTransformer.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.transform; + +import java.util.Iterator; + +import quickfix.DataDictionary; +import quickfix.Field; +import quickfix.FieldMap; +import quickfix.FieldType; +import quickfix.Group; +import quickfix.Message; +import quickfix.MessageUtils; +import quickfix.Session; +import quickfix.SessionID; + +public class QuickfixjMessageJsonTransformer { + + public String transform(Message message) { + SessionID sessionID = MessageUtils.getSessionID(message); + Session session = Session.lookupSession(sessionID); + DataDictionary dataDictionary = session.getDataDictionary(); + + if (dataDictionary == null) { + throw new IllegalStateException("No Data Dictionary. Exchange must reference an existing session"); + } + + return transform(message, dataDictionary); + } + + public String transform(Message message, DataDictionary dataDictionary) { + return transform(message, "", dataDictionary); + } + + public String transform(Message message, String indent, DataDictionary dd) { + StringBuilder sb = new StringBuilder(); + sb.append(indent).append("\"message\": "); + if (message == null) { + sb.append("null"); + } else { + sb.append("{\n"); + String contentIndent = indent + " "; + + transform("header", message.getHeader(), sb, contentIndent, dd); + sb.append("\n"); + + transform("body", message, sb, contentIndent, dd); + sb.append("\n"); + + transform("trailer", message.getTrailer(), sb, contentIndent, dd); + sb.append("\n"); + + sb.append(indent).append("}"); + } + return sb.toString(); + } + + private void transform(String name, FieldMap fieldMap, StringBuilder sb, String indent, DataDictionary dd) { + sb.append(indent).append("\"").append(name).append("\": {\n"); + int fieldCount = 0; + Iterator> fieldIterator = fieldMap.iterator(); + while (fieldIterator.hasNext()) { + if (fieldCount > 0) { + sb.append(",\n"); + } + Field field = fieldIterator.next(); + sb.append(indent).append(" \"").append(dd.getFieldName(field.getField())).append("\": "); + if (dd.hasFieldValue(field.getField())) { + int tag = field.getField(); + sb.append("[ \"").append(field.getObject().toString()).append("\", \"") + .append(dd.getValueName(tag, field.getObject().toString())).append("\" ]"); + } else { + FieldType fieldType = dd.getFieldType(field.getField()); + if (Number.class.isAssignableFrom(fieldType.getJavaType())) { + sb.append(field.getObject()); + } else { + sb.append("\"").append(field.getObject().toString()).append("\""); + } + } + fieldCount++; + } + + sb.append("\n"); + + Iterator groupKeys = fieldMap.groupKeyIterator(); + while (groupKeys.hasNext()) { + int groupTag = groupKeys.next(); + for (Group group : fieldMap.getGroups(groupTag)) { + String groupName = dd.getFieldName(groupTag); + transform(groupName, group, sb, indent + " ", dd); + } + } + + sb.append(indent).append("}").append("\n"); + } +} diff --git a/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/util/CountDownLatchDecrementer.java b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/util/CountDownLatchDecrementer.java new file mode 100644 index 000000000..4938ede42 --- /dev/null +++ b/quickfixj/src/test/java/org/apache/camel/component/quickfixj/examples/util/CountDownLatchDecrementer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.quickfixj.examples.util; + +import java.util.concurrent.CountDownLatch; + +import org.apache.camel.Exchange; +import org.apache.camel.Handler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CountDownLatchDecrementer { + private static final Logger LOG = LoggerFactory.getLogger(CountDownLatchDecrementer.class); + + private String label; + private CountDownLatch latch; + + public CountDownLatchDecrementer(String label, CountDownLatch latch) { + this.label = label; + this.latch = latch; + } + + @Handler + public void decrement(Exchange exchange) { + LOG.info("Decrementing latch count: {}", label); + latch.countDown(); + } +} diff --git a/quickfixj/src/test/resources/examples/gateway.qf.cfg b/quickfixj/src/test/resources/examples/gateway.qf.cfg new file mode 100644 index 000000000..5e6b4605a --- /dev/null +++ b/quickfixj/src/test/resources/examples/gateway.qf.cfg @@ -0,0 +1,59 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +# +# Creates an initiators and acceptors that communicate within a +# VM (no external socket connections). +# +[default] +SocketAcceptProtocol=VM_PIPE +SocketAcceptPort=7001 +SocketConnectProtocol=VM_PIPE +SocketConnectPort=7001 +FileLogPath=target/log + +# +# Initiator for simulating trader #1 +# +[session] +ConnectionType=initiator +BeginString=FIX.4.2 +SenderCompID=TRADER@1 +TargetCompID=GATEWAY + +# +# Initiator for simulating trader #2 +# +[session] +ConnectionType=initiator +BeginString=FIX.4.2 +SenderCompID=TRADER@2 +TargetCompID=GATEWAY + +# +# Acceptor for simulating the routing gateway +# +[session] +ConnectionType=acceptor +BeginString=FIX.4.2 +SenderCompID=GATEWAY +TargetCompID=TRADER@1 + +[session] +ConnectionType=acceptor +BeginString=FIX.4.2 +SenderCompID=GATEWAY +TargetCompID=TRADER@2 diff --git a/quickfixj/src/test/resources/examples/inprocess.qf.cfg b/quickfixj/src/test/resources/examples/inprocess.qf.cfg new file mode 100644 index 000000000..5a3bd5987 --- /dev/null +++ b/quickfixj/src/test/resources/examples/inprocess.qf.cfg @@ -0,0 +1,46 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- +# +# Creates an initiator and acceptor that communicate within a +# VM (no external socket connections). +# +[default] +UseJmx=Y +SocketAcceptProtocol=VM_PIPE +SocketConnectProtocol=VM_PIPE +HeartBtInt=120 +FileLogPath=target/log + +# +# Initiator for simulating a trader +# +[session] +ConnectionType=initiator +BeginString=FIX.4.2 +SenderCompID=TRADER +TargetCompID=MARKET +SocketConnectPort=7001 + +# +# Acceptor for simulating the market +# +[session] +ConnectionType=acceptor +BeginString=FIX.4.2 +SenderCompID=MARKET +TargetCompID=TRADER +SocketAcceptPort=7001 \ No newline at end of file diff --git a/quickfixj/src/test/resources/log4j2.properties b/quickfixj/src/test/resources/log4j2.properties new file mode 100644 index 000000000..7337c3a33 --- /dev/null +++ b/quickfixj/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-quickfix-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file