포트폴리오/Java rest api
Reuters 환율 처리
NaHyungMin
2022. 11. 18. 16:09
작업 전 확인 * 로이터 환율은 유료입니다.
// https://mvnrepository.com/artifact/com.refinitiv.ema/ema
implementation group: 'com.refinitiv.ema', name: 'ema', version: '3.6.7.1'
ReutersApplication
/**
 * Application entry point.
 *
 * <p>Keeps the Spring application context in a static field so that other
 * components can shut the process down through {@link #exit()}.
 */
public class ReutersApplication {
    private static ConfigurableApplicationContext context;

    /**
     * Boots the Spring application and remembers the resulting context.
     *
     * @param args command-line arguments forwarded to Spring Boot
     */
    public static void main(String[] args) {
        context = SpringApplication.run(ReutersApplication.class, args);
        log.info("시스템 시작.");
    }

    /**
     * Closes the Spring context (when one has been stored) and terminates the JVM.
     *
     * <p>{@code context} is only assigned once {@code SpringApplication.run(...)}
     * has returned. A caller that runs during startup (for example a
     * {@code CommandLineRunner}) therefore still sees {@code null} here, and
     * {@code SpringApplication.exit} rejects a null context. In that case the
     * context close is skipped and the JVM is terminated directly.
     */
    public static void exit() {
        int exitCode = 0;
        if (context != null) {
            exitCode = SpringApplication.exit(context, () -> 0);
        }
        System.exit(exitCode);
    }
}
CommandLineAppStartupRunner
/**
 * Startup runner that kicks off the Reuters service once the Spring context is ready.
 *
 * <p>Any exception thrown by the service is logged, reported through the
 * notification service, and then the process is terminated.
 */
@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {
    @Autowired IReutersService reutersService;
    @Autowired IBiztalkUtilService biztalkUtilService;

    /**
     * Runs the Reuters service; on failure notifies and exits the application.
     *
     * @param args command-line arguments supplied by Spring Boot (unused)
     * @throws Exception declared by {@link CommandLineRunner}
     */
    @Override
    public void run(String... args) throws Exception {
        try {
            reutersService.run();
        } catch (Exception ex) {
            Log.error("예외 발생 프로세스 종료 e={}", ex.getMessage());
            try {
                biztalkUtilService.sendNotification(MessageInfo.ERROR_EXCEPTION.CODE(), ex.getMessage());
            } finally {
                // Terminate even when the notification itself fails; otherwise
                // the process would stay up after a fatal error.
                ReutersApplication.exit();
            }
        }
    }
}
AppClientLocation
/**
 * Service-endpoint discovery callback that remembers the first endpoint
 * whose transport is {@code "tcp"}.
 */
public class AppClientLocation implements ServiceEndpointDiscoveryClient {
    /** Endpoint of the first TCP entry seen in the discovery response. */
    public String host;
    /** Port of the first TCP entry seen in the discovery response. */
    public String port;

    /**
     * Stores host and port of the first TCP endpoint in the response.
     * Leaves both fields untouched when the response has no TCP endpoint.
     */
    @Override
    public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
        // Only the first TCP entry is used. Failing over to another endpoint
        // on outage is not implemented; no errors have been observed so far,
        // so this is being monitored.
        serviceEndpointResp.serviceEndpointInfoList().stream()
                .filter(candidate -> candidate.transport().equals("tcp"))
                .findFirst()
                .ifPresent(tcpEndpoint -> {
                    host = tcpEndpoint.endpoint();
                    port = tcpEndpoint.port();
                });
    }

    /** Logs the discovery failure text. */
    @Override
    public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
        Log.error("Failed to get endpoints : {}", errorText);
    }
}
AppClientConsumer
/**
 * EMA consumer callback for a single item request.
 *
 * <p>On a refresh carrying a field list it looks up the {@code MID_PRICE}
 * field and hands the value to the completion callback; every status message
 * is forwarded to the error callback. All other message kinds are ignored.
 */
public class AppClientConsumer implements OmmConsumerClient {
    private ICompleteCallback completeCallback;
    private IErrorCallback errorCallback;
    public String ricName;
    public String country;

    /**
     * @param completeCallback invoked when a refresh contains a MID_PRICE field
     * @param errorCallback    invoked for every status message
     */
    public AppClientConsumer(ICompleteCallback completeCallback, IErrorCallback errorCallback) {
        this.completeCallback = completeCallback;
        this.errorCallback = errorCallback;
    }

    @Override
    public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
        // Anything other than a field-list payload is ignored.
        if (refreshMsg.payload().dataType() != DataType.DataTypes.FIELD_LIST) {
            return;
        }

        FieldEntry midPriceEntry = refreshMsg.payload().fieldList().stream()
                .filter(entry -> entry.name().equalsIgnoreCase("MID_PRICE"))
                .findFirst()
                .orElse(null);
        if (midPriceEntry == null) {
            // No MID_PRICE in this refresh: nothing is reported.
            return;
        }

        System.out.print("Ric Name : " + refreshMsg.name() + ", " + midPriceEntry);
        completeCallback.run(refreshMsg.name(), this.country, midPriceEntry.real().asDouble(), consumerEvent.handle());
    }

    @Override
    public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
    }

    @Override
    public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
        // Every status message is treated as an error and handed to the error callback.
        errorCallback.error("Class", consumerEvent);
    }

    @Override
    public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
    }

    @Override
    public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
    }

    @Override
    public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
    }
}
ConsumerRTO3
// Consumer configuration produced by createAppClientLocationAndConfig() and stored by run().
private OmmConsumerConfig consumerConfig;
// Discovery callback registered with the endpoint-discovery service.
private AppClientLocation appClientLocation;
// Endpoint-discovery service; uninitialized in complete() once every RIC has a result.
private ServiceEndpointDiscovery service;
/**
 * Builds the consumer configuration, creates the consumer, resets the
 * per-run state and sends the first item request.
 */
public void run() {
    OmmConsumerConfig builtConfig = createAppClientLocationAndConfig();
    this.consumerConfig = builtConfig;

    createAppClient(builtConfig);
    sendInitialize();
    sendRequest();
}
/**
 * Registers a fresh {@link AppClientLocation} with a new endpoint-discovery
 * service and builds the consumer configuration.
 *
 * @return the populated consumer configuration
 */
private OmmConsumerConfig createAppClientLocationAndConfig() {
    Log.info("createAppClient Start");

    // Discovery is registered before the option and resource helpers are called.
    this.appClientLocation = new AppClientLocation();
    this.service = EmaFactory.createServiceEndpointDiscovery();
    this.service.registerClient(getOption(), this.appClientLocation);

    Map programmaticConfig = getServiceOption();
    String configPath = getResource();

    OmmConsumerConfig consumerCfg = EmaFactory.createOmmConsumerConfig(configPath);
    consumerCfg.consumerName("Consumer_1")
            .username(this.userName)
            .password(this.password)
            .clientId(this.clientId)
            .config(programmaticConfig)
            .tunnelingKeyStorePasswd("");

    Log.info("createAppClient End");
    return consumerCfg;
}
/**
 * Creates the OMM consumer with a new callback client and tracks that
 * client as the only entry of {@code consumerList}.
 *
 * @param config consumer configuration to create the consumer from
 */
private void createAppClient(OmmConsumerConfig config) {
    consumerList.clear();

    AppClientConsumer rootClient = new AppClientConsumer(completeCallback, errorCallback);
    this.consumer = EmaFactory.createOmmConsumer(config, rootClient);

    consumerList.add(rootClient);
}
/**
 * Resets the collected results and enqueues every configured RIC for sending.
 */
private void sendInitialize() {
    // Drop results from any previous run.
    resultRicMap.clear();
    // Queue up all RICs; sendRequest() polls them one at a time.
    reutersRicInfoEntityQueue.addAll(this.reutersRicInfoEntityList);
}
/**
 * Sends the request for the next queued RIC, if any.
 *
 * <p>Polls one entry from the queue, registers a non-streaming request
 * ({@code interestAfterRefresh(false)}) for it with a dedicated callback
 * client, and records the returned handle on the matching entry of
 * {@code requestRicMap}. Does nothing when the queue is empty.
 */
private void sendRequest() {
    ReutersRicInfoEntity nextRic = reutersRicInfoEntityQueue.poll();
    if (nextRic == null) {
        return;
    }

    String ric = nextRic.getRic();

    ReqMsg req = EmaFactory.createReqMsg();
    req.name(ric);
    req.serviceName("ELEKTRON_DD");
    req.interestAfterRefresh(false);

    AppClientConsumer appClientConsumer = new AppClientConsumer(completeCallback, errorCallback);
    appClientConsumer.ricName = ric;
    appClientConsumer.country = nextRic.getCountry();

    long handle = consumer.registerClient(req, appClientConsumer);

    // The request is already registered at this point, so a missing map entry
    // must not abort with a NullPointerException; report it and carry on.
    ReutersRicInfoDto reutersRicInfoDto = requestRicMap.get(ric);
    if (reutersRicInfoDto != null) {
        reutersRicInfoDto.setHandle(handle);
    } else {
        Log.error("requestRicMap has no entry for ricName : {}", ric);
    }

    Log.info("send ricName : {}, handle : {}", ric, handle);
}
/**
 * Completion callback for a single RIC.
 *
 * <p>Stores a result entry for {@code ricName}. When the number of results
 * equals the size of {@code sendRicSet}, it runs the all-complete hook, stops
 * the time observer and uninitializes the discovery service. Otherwise it
 * sends the next queued request.
 *
 * @param ricName  RIC the result belongs to
 * @param country  country associated with the RIC
 * @param midPrice mid price taken from the refresh message
 * @param handle   handle of the item stream that produced the result
 */
private void complete(String ricName, String country, Double midPrice, long handle) {
    Log.info("Ric complete : {}", ricName);

    ReutersRicInfoDto reutersRicInfoDto = new ReutersRicInfoDto();
    // ... work items (not shown in this view)
    resultRicMap.put(ricName, reutersRicInfoDto);

    // The second value is sendRicSet.size(), so the label names that collection.
    Log.info("Ric complete resultRicMap size : {}, sendRicSet size : {}", resultRicMap.size(), sendRicSet.size());

    if (resultRicMap.size() != sendRicSet.size()) {
        sendRequest();
        return;
    }

    Log.info("all complete");
    allComplete();
    timeObserver.stopRun();
    service.uninitialize();
}
// Hook invoked by complete() once a result exists for every sent RIC.
// The body is empty in this view.
private void allComplete() {
}
/**
 * Timeout handler: logs the timeout, sends a timeout notification and
 * terminates the application.
 */
private void timeoutCallback() {
    Log.error("Thread Time over.");
    try {
        // Report the timeout.
        iBiztalkUtilService.sendNotification(MessageInfo.ERROR_TIME_OUT.CODE(), null);
    } finally {
        // Always terminate, even when the notification fails; otherwise the
        // process would stay up after the timeout.
        ReutersApplication.exit();
    }
}
/**
 * Error handler: sends an error notification, logs the consumer event and
 * terminates the application.
 *
 * @param errText       error text forwarded to the notification
 * @param consumerEvent event that accompanied the error; logged as JSON
 */
private void errorCallback(String errText, OmmConsumerEvent consumerEvent) {
    try {
        // Report the error.
        iBiztalkUtilService.sendNotification(MessageInfo.ERROR_EXCEPTION.CODE(), errText);
        Log.error("errorCallback 발생");
        Log.error("errorCallback : {}", CommonUtil.toJson(consumerEvent));
    } finally {
        // Always terminate, even when notifying or serializing the event fails.
        ReutersApplication.exit();
    }
}
refinitiv API 문서 보면서 처리. 장애가 나거나 지정해둔 시간을 넘어서면 자동으로 종료된다.
처음에는 Rest Api로 하려고 했는데, 절반만 지원(호스트..)해서 포기.
두 번째는 EMA로 등록 후 멈춤이나 핸들 재등록을 사용해서 처리하려고 했는데 memory leak이 발생.
그래서 console server로 만들려고 했으나, 기타 처리와 같이 일하는 주니어 동료를 생각해서 스프링 부트를 올리고 초기화를 대기할 수 있는 방법을 선택.
만들게 된 계기는... 기존 파이썬으로 되어 있는 프로그램이 자주 오류가 나서 불편하다는 내용이 많아 처음부터 만듦.