본문 바로가기

사이드프로젝트/(240808)이거왜오름?

이거왜오름?레포트생성부분 비동기(코루틴)처리하기

gpt를 통해서 api로 레포트를 가져오는부분이,동기식으로 처리되어있어서 최악의경우 각 레포트마다 10초,300초가 걸리는 문제가 있었다

물론 이 문제가 있을거라는건 알고있었다(너무뻔한 문제니까)

그래서 나중에 비동기로 변경하기쉽게 구현해뒀었고,일단 주기적으로 배치도 돌리는만큼 저정도까지 날 일은 거의없긴했지만,

그래도 저런부분은 비동기로 처리하는게 좋은만큼,비동기로 변경하기로 마음먹었다

 

일단 나는 코틀린을 사용하니 코루틴을 사용해서 비동기처리를 할 예정이다

거기다가 supervisorScope(supervisorJob)을 사용해서 어떤 한 대상에 문제가 생기더라도,무시하고 동작하게 만들생각이다

 

그렇게 어렵진않다

일단 useCase의 해당부분을 suspend로 처리해주고

interface CreateReportUseCase {
    suspend fun fetchHighReports(): List<ResponseReportDto>

    suspend fun fetchHighReport(dto: KoreanStockReportDto): ResponseReportDto
}

 

useCase를 구현한 구현서비스에 suspend를 붙인다음

    @Transactional
    override suspend fun fetchHighReports(): List<ResponseReportDto> =
        coroutineScope {
            rankFetch()
                .map { assetName ->
                    async {
                            createReport(assetName)
                    }
                }.awaitAll()
                .map { ResponseReportDto(it.getReportBody(), it.getCreateTime()) }
        }

 

 

그리고 이제 레포트생성함수에서,레포트생성부분만 async로 처리하면된다

private suspend fun createReport(assetName: String): Report =
    coroutineScope {
        if (reportCachesRepository.isCacheValid(assetName)) {
            reportCachesRepository.findOne(assetName).getMainReport()
        } else {
            val koreanStock =
                findOrCreateKoreanStockPort.findOrCreate(FindOrCreateKoreanStockDto(assetName))
            val report = async { createReportPort.createReport(koreanStock.name) } //여기
            reportCachesRepository.saveOrUpdate(report.await())
            report.await()
        }
    }

 

이제 gpt어댑터 부분에서

override suspend fun createReport(
    assetName: String,
    volatilityTime: Int,
): Report =
    withContext(Dispatchers.IO) {
        val response = fetchAsync(createReportPrompt(assetName, volatilityTime))

        responseToReport(assetName, response)
    }
private suspend fun fetchAsync(prompt: Prompt): ChatResponse =
    withContext(Dispatchers.IO) {
        val response =
            chatClient
                .prompt(prompt)
                .call()
                .chatResponse()
        response
    }

이렇게 비동기처리를 해주면된다

 

 

마지막으로 스프링은 컨트롤러에 suspend가 붙은건 알아서 자체스코프로 실행시켜주니,컨트롤러의 해당메서드에 suspend만 붙여주면된다

@RestController
@RequestMapping("/api/report")
class ReportController(
    private val reportUseCase: CreateReportUseCase,
) {
    @GetMapping("/stock/high")
    suspend fun fetchHighReports(): Result<List<ResponseReportDto>> {
        val reports =
            reportUseCase
                .fetchHighReports()

        return Result(reports)
    }

 

 

일단 이러면 비동기로 돌아가긴하는데,문제는 api처리한계다

퍼플릭시티는 분당 20회의 api호출을 지원하는데,저렇게되면 최악의경우 10초안에 30회의 입력이 들어가는게 문제다

물론 최악의경우까지 고려하면,무조건 2분이 걸리는데,배치가 있으니 저렇게 잡히진않을거고,많아봐야 변동이 3개~6개정도 걸린다고 생각하고 3개씩 처리하면 될거라고 생각했고,

이제 이걸 해결하려면 api처리량 제한을 해줘야한다,이를위해 세마포를 사용할수있다

    private val semaphore = Semaphore(3)

    @Transactional
    override suspend fun fetchHighReports(): List<ResponseReportDto> =
        coroutineScope {
            rankFetch()
                .map { assetName ->
                    async {
                        semaphore.withPermit {
                            createReport(assetName)
                        }
                    }
                }.awaitAll()
                .map { ResponseReportDto(it.getReportBody(), it.getCreateTime()) }
        }

 

이때 세마포는 클래스레벨로 잡아줘야한다,한번에 처리할수 있는 갯수는 상황에 따라서 알아서 하면된다

 

이러면 일단 동작구현은 일단락된다

이제 남은 문제는

  • 자식코루틴의 예외발생시 코루틴스코프가 끝나는현상
  • 서비스에 있는 비동기루틴이 보기싫음

두개를 더 해결해야한다

자식코루틴의 예외발생시 코루틴스코프는 전체코루틴을 종료시키는걸로 대응한다

이게싫다면 supervisorScope를 사용하면된다

 

또한 서비스에 비동기루틴이 있는게 보기싫은데,이건 확장함수를 사용해버리면된다

 

적당히 share같은 폴더에 ListExtension같은 파일을(클래스가 아니다,코틀린은 최상위 함수,즉 일급객체로서 단독으로 존재할수있다) 만들고,거기다가

package rkrk.whyprice.share.extension

import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.supervisorScope

suspend fun <T, R> List<T>.mapAsync(transform: suspend (T) -> R): List<R> =
    supervisorScope {
        this@mapAsync.map { async { transform(it) } }.awaitAll()
    }

 

이렇게 확장함수를 만들어서 사용하면된다,사용법은 그냥 map이랑 똑같이

rankFetch()
    .mapAsync {
        semaphore.withPermit { createReport(it) }
    }.map { ResponseReportDto(it.getReportBody(), it.getCreateTime()) }

 

이렇게 사용하면된다

좀 크게보면

@Service
@Transactional(readOnly = true)
class CreateReportService(
    private val createReportPort: CreateReportPort,
    private val rankFetcher: RankFetcher,
    private val reportCachesRepository: ReportCachesRepository,
    private val findOrCreateKoreanStockPort: FindOrCreateKoreanStockPort,
) : CreateReportUseCase {
    private val semaphore = Semaphore(3)

    @Transactional
    override suspend fun fetchHighReports(): List<ResponseReportDto> =
        rankFetch()
            .mapAsync {
                semaphore.withPermit { createReport(it) }
            }.map { ResponseReportDto(it.getReportBody(), it.getCreateTime()) }

 

이렇게된다

여기서 세마포도 외부로 뺄수있는데,저건 여기서 처리하는거도 나쁘지않을거같아서 유지했다