거래량 상위 30개의 레포트를 요청하면,대략 2분정도걸리면서 타임아웃이 나는 문제가 있었다
로직이 for돌리면서 하나씩 레포트를 생성해서 하는거라서 그렇다
이걸 해결하는 방법으로 생각나는건
- 비동기처리(코루틴사용)
- 배치로 미리 만들어두기
- 캐시만료를 없애고 무조건 마지막캐시레포트를 보내고,보낸뒤에 갱신요청
이렇게 3가지가 생각났다
캐시만료를 없애는건 딱봐도 문제가 많이생길거같아(하루전 레포트가 간다거나) 제외,
비동기처리는 괜찮아보이는데,외부api를 사용해야하는 로직 특성상 한ip에서 갑자기 부하를 잔뜩걸어버리면 일정시간 ip밴을 때릴 가능성이 있어보이고,도큐먼트상에서도 분당 20회의 요청을 처리할수있다 라고 적혀있길래 제외
결국 소거법으로 배치로 처리하기로 했다
배치 구조는
- api reader로 한투api에 접근해서 주식이름리스트 가져오기
- gpt processor로 주식이름으로 레포트 만들기
- jpa writer로 만들어진 레포트 저장하기
이렇게 진행되고,웹+di로 스프링을 쓸때 다른거대신 배치를 사용하면 좋은건,이렇게 원래 가지고있던 코드실행을 분리하는경우,그냥 복붙해서 약간 수정해서 쓰면 된다는거에 있다
그래서 필요한코드들을 다 가져와서,상속,의존관계나 이름정도 조금 건드려서 사용하면 된다
일단 apiReader부터 만들면
class HighRiserReader(
private val highRisersFetcher: HighRisersFetcher,
) : ItemReader<String> {
private var nextIndex = 0
private var data: List<String> = emptyList()
override fun read(): String? {
if (data.isEmpty()) {
data = fetchData()
}
return data.getOrNull(nextIndex++)
}
private fun fetchData(): List<String> = highRisersFetcher.fetch()
}
이런식으로 ItemReader을 상속받아서,read를 실행시키면 된다
이때 주의할건,read는 하나씩 다음거로 던져줘야해서 첫실행때 data를 만들어야하고,인덱스도 따로관리하면서 하나씩 던져주고,만약 배열크기보다 더 많이 요구하면 null을 던져서 일을 끝내주면된다
여기서 HighRisersFetcher은 상위거래량조회api클래스고,그냥 why_price에서 가져왔다
다음으로는 CreateReportProcessor다
class CreateReportProcessor(
private val repository: ReportCachesRepository,
private val createReportPort: CreateReportPort,
) : ItemProcessor<String, Report> {
override fun process(item: String): Report? {
if (repository.isCacheValid(item)) {
return null
}
return createReport(item)
}
private fun createReport(stockName: String): Report = createReportPort.createReport(stockName)
}
이거도 그냥 원본을 가져와서 ItemProcessor을 구현해서,캐시체크하고 캐시유효하면 null,유효하지않으면 생성하면된다
ReportCacheWriter도
class ReportCacheWriter(
private val repository: ReportCachesRepository,
) : ItemWriter<Report> {
override fun write(chunk: Chunk<out Report>) {
chunk.forEach { repository.saveOrUpdate(it) }
}
}
writer은 청크단위(3개,5개 이런단위)로 처리되고,저단위로 트랜잭션이 걸리고 실패시 저단위로 재시작된다는거만 주의하면 별차이없다
그리고 다만들었으면 테스트를 만들어서 돌려보고,Config에 빈등록해주면 알아서 동작한다
이때 잡과 스탭을 만들어야하는데,스탭은 저런 ItemProcessor같은것들을 묶어둔 스크립트정도라고 생각하면되고,잡은 그런 스탭들을 묶어둔 스크립트라고 생각하면된다
즉 ItemProcessor같은게 어댑터같은거고,스탭은 서비스,잡은 컨트롤러라고 생각하면 대충 맞을거같다
일단 프로세서리더라이터를 먼저 빈으로 등록하고,스탭단위에서 조회되게 @StepScope를 붙여주자
@StepScope
@Bean
fun highRiserReader(highRisersFetcher: HighRisersFetcher): ItemReader<String> = HighRiserReader(highRisersFetcher)
@Bean
fun highRisersFetcher(apiHelper: ApiHelper): HighRisersFetcher = HighRisersFetcher(apiHelper)
@StepScope
@Bean
fun createReportProcessor(
reportCachesRepository: ReportCachesRepository,
createReportPort: CreateReportPort,
): ItemProcessor<String, Report> = CreateReportProcessor(reportCachesRepository, createReportPort)
@StepScope
@Bean
fun reportCacheWriter(reportCachesRepository: ReportCachesRepository): ItemWriter<Report> = ReportCacheWriter(reportCachesRepository)
그리고 스탭을 등록하고,이땐 잡에서 검색할수있게 @JobScope를 붙여주면된다
@JobScope
@Bean
fun highRiserStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager,
highRiserReader: ItemReader<String>,
createReportProcessor: ItemProcessor<String, Report>,
reportCacheWriter: ItemWriter<Report>,
): Step =
StepBuilder("highRiserStep", jobRepository)
.chunk<String, Report>(3, transactionManager)
.reader(highRiserReader)
.processor(createReportProcessor)
.writer(reportCacheWriter)
.build()
여기서 다른건 다 이름그대로고,StepBuilder.chunk의 첫번째 매개변수가 청크의 크기,즉 트랜잭션의 크기라는거만 알면된다
잡은
@Bean
fun highRiserJob(
jobRepository: JobRepository,
highRiserStep: Step,
): Job =
JobBuilder("highRiserJob", jobRepository)
.incrementer(RunIdIncrementer())
.start(highRiserStep)
.build()
그냥 잡빌더로 생성해서 빈등록하면된다
이때 주의해야할건,스프링배치는 잡이름과 잡 파라미터가 동일한 잡은,중복실행이라고 보고 실행시키지않고 패스한다
이걸 막기위해서
.incrementer(RunIdIncrementer())
를 넣어서 id를 자동으로 1씩 증가시켜서 중복실행이 가능하게 만들어야한다
만약 이미 저거없이 한번 실행을 시켰을경우,저걸넣어도 안되니 스프링배치 테이블을 지우고 다시하던지,잡이름을 바꿔보던지,아니면 의미없는 현재시간잡파라미터를 추가하는걸 넣던지 하면된다
그리고 실행을 돌리면 스프링이 알아서 빈등록된 잡을 실행시킨다
이떄,만약 api때문에 restTemplate나 이런 라이브러리들 받는다고 spring web을 의존성에 추가했으면 웹이 켜질수있는데,이땐 application.yaml에서
spring:
main:
web-application-type: none
추가해주면 웹이 켜지지않고 배치만 실행되고
BadSqlGrammarException이 뜨고,테이블이 없다고 그러는거면
spring:
batch:
jdbc:
initialize-schema: always
를 추가해주면 스프링이 알아서 db에 메타데이터테이블을 만들어서 사용한다
뭐..이정도면 된거같은데 마지막으로 하나만 추가하자면
배치는 특성상 외부와 거의 무조건 연결되어있다
이때 당연히 외부와 연결되어있는 특성상 외부의 영향에 취약한데,그래서 외부의 영향으로 예외가 발생했을때 이걸 해결하는 로직을 넣어두는게 좋다
일반적으로는
@JobScope
@Bean
fun highRiserStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager,
highRiserReader: HighRiserReader,
createReportProcessor: CreateReportProcessor,
reportCacheWriter: ReportCacheWriter,
): Step =//주의,이코드 그대로 사용하면안됨
StepBuilder("highRiserStep", jobRepository)
.chunk<String, Report>(3, transactionManager)
.reader(highRiserReader)
.processor(createReportProcessor)
.writer(reportCacheWriter)
.faultTolerant()
.skip(NonTransientAiException::class.java)
.skipLimit(3)
.listener(NonTransientExceptionListener::class.java)
.retry(TransientAiException::class.java)
.retryLimit(3)
.listener(TransientExceptionListener::class.java)
.build()
이렇게 반복해서성공가능한것과 반복해서 성공불가능한걸 구분해서 사용하면되지만,
스프링ai의 동작방식을 알필요가 있다
스프링 ai는 통신예외를 NonTransientAiException과 TransientAiException으로 분류한다
이름그대로 NonTransientAiException은 반복불가능한예외,즉 최대토큰수보다 더 큰 토큰이 들어갔거나 했을때 나는 예외고,http로 치면 400에러라고 볼수있다,즉 반복해서 보내도 무조건 실패하는경우에 나는 예외다
TransientAiException은 일반적인 통신예외,즉 저쪽의 서버에 문제가 생겼을때 나오는 예외고,http로 치면 500에러라고 볼수있다,즉 저쪽서버문제라서 반복해서 보내면 성공할수도 있는 예외다
스프링ai는 기본적으로 TransientAiException의 경우 자체적으로 재시도를 한다(github코드링크)
@Bean
@ConditionalOnMissingBean
public RetryTemplate retryTemplate(SpringAiRetryProperties properties) {
return RetryTemplate.builder()
.maxAttempts(properties.getMaxAttempts())
.retryOn(TransientAiException.class) //여기
.exponentialBackoff(properties.getBackoff().getInitialInterval(), properties.getBackoff().getMultiplier(),
properties.getBackoff().getMaxInterval())
.withListener(new RetryListener() {
@Override
public <T extends Object, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
logger.warn("Retry error. Retry count:" + context.getRetryCount(), throwable);
};
})
.build();
}
SpringAiRetryAutoConfiguration 의 retryTemplate메서드의 retryOn을 보면 알수있다
또한 재시도할때 로깅도 여기서 처리한다
또한 기본재시도횟수는
public class RetryTemplate implements RetryOperations {
private static final String GLOBAL_STATE = "state.global";
protected final Log logger = LogFactory.getLog(this.getClass());
private volatile BackOffPolicy backOffPolicy = new NoBackOffPolicy();
private volatile RetryPolicy retryPolicy = new SimpleRetryPolicy(3); //여기
private volatile RetryListener[] listeners = new RetryListener[0];
private RetryContextCache retryContextCache = new MapRetryContextCache();
private boolean throwLastExceptionOnExhausted;
여기서 3번으로 잡혀있다
그러니 배치에서 재시도를 할 필요가 없는것이다
만약 로깅이 추가적으로 필요하거나,추가동작이 필요하다면
@AutoConfiguration
@ConditionalOnClass(RetryTemplate.class)
@EnableConfigurationProperties({ SpringAiRetryProperties.class })
public class SpringAiRetryAutoConfiguration {
private static final Logger logger = LoggerFactory.getLogger(SpringAiRetryAutoConfiguration.class);
@Bean
@ConditionalOnMissingBean
public RetryTemplate retryTemplate(SpringAiRetryProperties properties) {
return RetryTemplate.builder()
.maxAttempts(properties.getMaxAttempts())
.retryOn(TransientAiException.class)
.exponentialBackoff(properties.getBackoff().getInitialInterval(), properties.getBackoff().getMultiplier(),
properties.getBackoff().getMaxInterval())
.withListener(new RetryListener() {
@Override
public <T extends Object, E extends Throwable> void onError(RetryContext context,
RetryCallback<T, E> callback, Throwable throwable) {
logger.warn("Retry error. Retry count:" + context.getRetryCount(), throwable);
};
})
.build();
}
아까본 이클래스를 직접만들어서 configration에 수동등록해주면될거같다
난 기본동작으로도 만족하기때문에 거기까지 하진 않을거다
즉 아까 step에서,retry관련을 제외하고 skip만 남겨두면된다
사실 배치특성상 500에러가 뜰일이 있을까싶긴하지만,혹시모르니까
@JobScope
@Bean
fun highRiserStep(
jobRepository: JobRepository,
transactionManager: PlatformTransactionManager,
highRiserReader: HighRiserReader,
createReportProcessor: CreateReportProcessor,
reportCacheWriter: ReportCacheWriter,
): Step =
StepBuilder("highRiserStep", jobRepository)
.chunk<String, Report>(3, transactionManager)
.reader(highRiserReader)
.processor(createReportProcessor)
.writer(reportCacheWriter)
.faultTolerant()
.skip(NonTransientAiException::class.java)
.skipLimit(3)
.listener(NonTransientExceptionListener::class.java)
.build()
이러고 예외가 떴을때 로깅하는 NonTransientExceptionListener를 만들어주면되는데
class NonTransientExceptionListener : SkipListener<String, Report> {
private val log = LoggerFactory.getLogger(this::class.java)
override fun onSkipInProcess(
item: String,
t: Throwable,
) {
log.warn("반복불가능한 예외발생 item={$item} ")
super.onSkipInProcess(item, t)
}
}
이렇게 상속받아서 로깅하고 super을 호출하는식으로 하면된다
이러면 대충 적당히 끝난거같다
'사이드프로젝트 > (240808)이거왜오름?' 카테고리의 다른 글
이거왜오름?레포트캐시 스프링배치 도커라이징하고 스케줄링하기 (3) | 2024.10.08 |
---|---|
이거왜오름?웹앱 도커라이징하기 (0) | 2024.10.06 |
이거왜오름? 헥사고날 리팩터링 (0) | 2024.09.18 |
이거왜오름? 테스트 작성 (0) | 2024.09.12 |
이거왜오름?컨트롤러서비스레포지토리 추가 (2) | 2024.09.01 |