본문 바로가기
포트폴리오/Java rest api

Reuters 환율 처리

by NaHyungMin 2022. 11. 18.

작업 전 확인 * 로이터 환율은 유료입니다.

// https://mvnrepository.com/artifact/com.refinitiv.ema/ema
implementation group: 'com.refinitiv.ema', name: 'ema', version: '3.6.7.1'

 

ReutersApplication

/**
 * Application entry point: boots the Spring context and exposes a static
 * {@link #exit()} used by other components to terminate the process.
 */
public class ReutersApplication {
  // Assigned only after SpringApplication.run(...) has returned. CommandLineRunner beans
  // are invoked inside run(...), so this is still null while they execute.
  // volatile because exit() may be reached from a thread other than the one running main()
  // (NOTE(review): callers appear to include callback handlers - confirm their threads).
  private static volatile ConfigurableApplicationContext context;

  /**
   * Starts the Spring application and stores the resulting context for {@link #exit()}.
   *
   * @param args command-line arguments forwarded to Spring Boot
   */
  public static void main(String[] args) {
    context = SpringApplication.run(ReutersApplication.class, args);

    log.info("시스템 시작.");
  }

  /**
   * Terminates the JVM, closing the Spring context first when one has been stored.
   *
   * <p>If this is called before {@code main} has stored the context (for example from a
   * {@code CommandLineRunner}), the context is still null. {@code SpringApplication.exit}
   * rejects a null context, so in that case the JVM is stopped directly with the same
   * exit code (0) that the normal path supplies.
   */
  public static void exit() {
    ConfigurableApplicationContext current = context;

    if (current == null) {
      // Context not published yet; presumably Spring Boot's own JVM shutdown hook still
      // closes it - TODO confirm the shutdown hook is enabled for this application.
      System.exit(0);
      return;
    }

    int exitCode = SpringApplication.exit(current, () -> 0);
    System.exit(exitCode);
  }
}

 

CommandLineAppStartupRunner

@Component
public class CommandLineAppStartupRunner implements CommandLineRunner {
  @Autowired IReutersService reutersService;
  @Autowired IBiztalkUtilService biztalkUtilService;

  @Override
  public void run(String... args) throws Exception {
    try {
      reutersService.run();
    } catch (Exception ex) {
      Log.error("예외 발생 프로세스 종료 e={}", ex.getMessage());
      biztalkUtilService.sendNotification(MessageInfo.ERROR_EXCEPTION.CODE(), ex.getMessage());
      ReutersApplication.exit();
    }
  }
}

 

AppClientLocation

/**
 * Service-endpoint discovery callback that records the host and port of the
 * first endpoint whose transport is {@code "tcp"}.
 */
public class AppClientLocation implements ServiceEndpointDiscoveryClient {
  public String host;
  public String port;

  /**
   * Stores the endpoint and port of the first TCP entry in the discovery response.
   * When no TCP entry exists, {@code host} and {@code port} are left untouched.
   */
  @Override
  public void onSuccess(ServiceEndpointDiscoveryResp serviceEndpointResp, ServiceEndpointDiscoveryEvent event) {
    // Original author's note (translated; one word in it is garbled): on failure another
    // endpoint should be handled; no errors have occurred so far, so this is being monitored.
    serviceEndpointResp.serviceEndpointInfoList().stream()
        .filter(endpointInfo -> endpointInfo.transport().equals("tcp"))
        .findFirst()
        .ifPresent(tcpEndpoint -> {
          host = tcpEndpoint.endpoint();
          port = tcpEndpoint.port();
        });
  }

  /** Logs the error text reported by a failed endpoint discovery. */
  @Override
  public void onError(String errorText, ServiceEndpointDiscoveryEvent event) {
    Log.error("Failed to get endpoints : {}", errorText);
  }
}

 

AppClientConsumer

/**
 * EMA consumer callback that extracts the {@code MID_PRICE} field from refresh
 * messages and forwards it to a completion callback; status messages are
 * forwarded to an error callback.
 */
public class AppClientConsumer implements OmmConsumerClient {

  private final ICompleteCallback completeCallback;
  private final IErrorCallback errorCallback;
  public String ricName;
  public String country;

  /**
   * @param completeCallback invoked when a refresh message carries a MID_PRICE field
   * @param errorCallback invoked for every status message
   */
  public AppClientConsumer(ICompleteCallback completeCallback, IErrorCallback errorCallback) {
    this.completeCallback = completeCallback;
    this.errorCallback = errorCallback;
  }

  /**
   * Looks for the first field named MID_PRICE (case-insensitive) in a field-list payload
   * and, when found, prints it and hands its value to the completion callback.
   */
  @Override
  public void onRefreshMsg(RefreshMsg refreshMsg, OmmConsumerEvent consumerEvent) {
    // Only field-list payloads are of interest.
    if (DataType.DataTypes.FIELD_LIST != refreshMsg.payload().dataType()) {
      return;
    }

    FieldEntry midPriceEntry = null;
    for (FieldEntry candidate : refreshMsg.payload().fieldList()) {
      if (candidate.name().equalsIgnoreCase("MID_PRICE")) {
        midPriceEntry = candidate;
        break;
      }
    }

    if (midPriceEntry == null) {
      return;
    }

    System.out.print("Ric Name : " + refreshMsg.name() + ", " + midPriceEntry);
    completeCallback.run(
        refreshMsg.name(),
        this.country,
        midPriceEntry.real().asDouble(),
        consumerEvent.handle());
  }

  /** Update messages are ignored. */
  @Override
  public void onUpdateMsg(UpdateMsg updateMsg, OmmConsumerEvent consumerEvent) {
  }

  /** Every status message is reported to the error callback. */
  @Override
  public void onStatusMsg(StatusMsg statusMsg, OmmConsumerEvent consumerEvent) {
    errorCallback.error("Class", consumerEvent);
  }

  /** Generic messages are ignored. */
  @Override
  public void onGenericMsg(GenericMsg genericMsg, OmmConsumerEvent consumerEvent) {
  }

  /** Ack messages are ignored. */
  @Override
  public void onAckMsg(AckMsg ackMsg, OmmConsumerEvent consumerEvent) {
  }

  /** The catch-all message hook is ignored. */
  @Override
  public void onAllMsg(Msg msg, OmmConsumerEvent consumerEvent) {
  }
}

 

ConsumerRTO3

  // Consumer configuration produced by createAppClientLocationAndConfig() and stored by run().
  private OmmConsumerConfig consumerConfig;
  // Discovery callback object registered with the endpoint-discovery service.
  private AppClientLocation appClientLocation;
  // Endpoint-discovery service; created during configuration and uninitialized in complete().
  private ServiceEndpointDiscovery service;

/**
 * Builds the consumer configuration, creates the consumer, resets the request
 * state and sends the first request.
 */
public void run() {
    OmmConsumerConfig builtConfig = createAppClientLocationAndConfig();
    this.consumerConfig = builtConfig;

    createAppClient(builtConfig);
    sendInitialize();
    sendRequest();
  }
  
  /**
   * Registers a fresh {@code AppClientLocation} with a new endpoint-discovery service and
   * builds the consumer configuration from the resource URL, credentials and service options.
   *
   * @return the populated consumer configuration
   */
  private OmmConsumerConfig createAppClientLocationAndConfig() {
    Log.info("createAppClient Start");

    this.appClientLocation = new AppClientLocation();
    this.service = EmaFactory.createServiceEndpointDiscovery();
    this.service.registerClient(getOption(), this.appClientLocation);

    // Fetched after discovery registration, in the same order as before.
    // NOTE(review): Map is presumably the EMA Map type rather than java.util.Map - confirm.
    Map serviceOptions = getServiceOption();
    String resourcePath = getResource();

    OmmConsumerConfig consumerSettings = EmaFactory.createOmmConsumerConfig(resourcePath)
        .consumerName("Consumer_1")
        .username(this.userName)
        .password(this.password)
        .clientId(this.clientId)
        .config(serviceOptions)
        .tunnelingKeyStorePasswd("");

    Log.info("createAppClient End");

    return consumerSettings;
  }
  
  /**
   * Creates the OMM consumer from the given configuration with a new callback client,
   * and makes that client the only entry of {@code consumerList}.
   *
   * @param config consumer configuration to create the consumer from
   */
  private void createAppClient(OmmConsumerConfig config) {
    // Start from an empty list so only the client created below is tracked.
    this.consumerList.clear();

    AppClientConsumer consumerClient = new AppClientConsumer(completeCallback, errorCallback);
    consumer = EmaFactory.createOmmConsumer(config, consumerClient);
    this.consumerList.add(consumerClient);
  }
  
   /**
    * Resets the result map and enqueues every configured RIC entity for sending.
    */
   private void sendInitialize() {
    // Results of any previous pass are discarded before the queue is filled.
    resultRicMap.clear();
    reutersRicInfoEntityQueue.addAll(this.reutersRicInfoEntityList);
  }
  
   /**
    * Takes the next RIC entity from the queue, if any, and registers a one-shot request
    * for it; the returned handle is stored on the matching entry of {@code requestRicMap}.
    */
   private void sendRequest() {
    ReutersRicInfoEntity nextEntity = reutersRicInfoEntityQueue.poll();

    // Nothing left to request.
    if (nextEntity == null) {
      return;
    }

    String ric = nextEntity.getRic();

    // interestAfterRefresh(false): only the refresh is requested, no subsequent updates.
    ReqMsg request = EmaFactory.createReqMsg()
        .name(ric)
        .serviceName("ELEKTRON_DD")
        .interestAfterRefresh(false);

    AppClientConsumer ricClient = new AppClientConsumer(completeCallback, errorCallback);
    ricClient.ricName = ric;
    ricClient.country = nextEntity.getCountry();

    long handle = consumer.registerClient(request, ricClient);
    requestRicMap.get(ric).setHandle(handle);

    Log.info("send ricName : {}, handle : {}", ric, handle);
  }
  
  
  /**
   * Records the result for one RIC, then either sends the next request or, once the number
   * of results equals the size of {@code sendRicSet}, finishes the run.
   *
   * @param ricName RIC whose response arrived; used as the result-map key
   * @param country country passed through from the consumer callback
   * @param midPrice mid price passed through from the consumer callback
   * @param handle stream handle passed through from the consumer callback
   */
  private void complete(String ricName, String country, Double midPrice, long handle) {
    Log.info("Ric complete : {}", ricName);

    ReutersRicInfoDto completedRic = new ReutersRicInfoDto();
    // ..work items (omitted in the original listing)

    resultRicMap.put(ricName, completedRic);
    Log.info("Ric complete resultRicMap size : {}, reutersRicInfoEntityList size : {}", resultRicMap.size(), sendRicSet.size());

    boolean moreRicsPending = resultRicMap.size() != sendRicSet.size();
    if (moreRicsPending) {
      sendRequest();
      return;
    }

    Log.info("all complete");

    allComplete();
    timeObserver.stopRun();
    service.uninitialize();
  }
  
  // Called by complete() once every RIC has a result. The body is empty in this listing.
  private void allComplete() {
  }
  
  /**
   * Handles a timeout: logs it, sends a timeout notification and terminates the process.
   * Termination is guaranteed even when sending the notification fails.
   */
  private void timeoutCallback() {
    Log.error("Thread Time over.");

    try {
      // Error handling: notify about the timeout.
      iBiztalkUtilService.sendNotification(MessageInfo.ERROR_TIME_OUT.CODE(), null);
    } catch (RuntimeException notifyEx) {
      // A failing notification must not keep the process alive.
      Log.error("Failed to send timeout notification e={}", notifyEx.getMessage());
    } finally {
      ReutersApplication.exit();
    }
  }

  /**
   * Handles an error reported by a consumer callback: sends a notification, logs the event
   * and terminates the process. Termination is guaranteed even when the notification or
   * the JSON serialization of the event fails.
   *
   * @param errText error text forwarded to the notification
   * @param consumerEvent consumer event that accompanied the error; logged as JSON
   */
  private void errorCallback(String errText, OmmConsumerEvent consumerEvent) {
    try {
      // Error handling: notify, then log the event.
      iBiztalkUtilService.sendNotification(MessageInfo.ERROR_EXCEPTION.CODE(), errText);
      Log.error("errorCallback 발생");
      Log.error("errorCallback : {}", CommonUtil.toJson(consumerEvent));
    } catch (RuntimeException handlingEx) {
      // Neither a failed notification nor a failed serialization may block the shutdown.
      Log.error("errorCallback handling failed e={}", handlingEx.getMessage());
    } finally {
      ReutersApplication.exit();
    }
  }

refinitiv API 문서 보면서 처리. 장애가 나거나 지정해둔 시간을 넘어서면 자동으로 종료된다.

처음에는 Rest Api로 하려고 했는데, 절반만 지원(호스트..)해서 포기.

두 번째는 EMA로 등록 후 멈춤이나, 핸들 재등록을 사용해서 처리 하려고 했는데 memory leak 발생.

그래서 console server로 만들려고 했으나, 기타 처리와 같이 일하는 주니어 동료를 생각해서 스프링 부트를 올리고 초기화를 대기할 수 있는 방법을 선택.

 

만들게 된 계기는... 기존에 파이썬으로 되어 있던 프로그램이 자주 오류가 나서 불편하다는 의견이 많아 처음부터 새로 만듦.

'포트폴리오 > Java rest api' 카테고리의 다른 글

중계 서버  (0) 2022.11.18
rest api filter에서 header, body 변경  (0) 2022.09.07
rest api + jpa(jpql) + replica set  (0) 2022.08.02
자바 날씨 API  (0) 2021.06.09
Java 기본 Rest api  (0) 2020.08.11