대략적인 프로그램의 흐름을 구상했다. 나의 생각은 일단
1. 모든 코인을 탐색하며 1차적인 조건을 만족하는(거래를 시작할) 코인을 검색
2. 해당 코인을 팔로우하며 조건을 실시간 계산, 특정 조건 부합 시 구매 요청
* 2. 를 구현할 때 멀티프로세스/멀티쓰레드/비동기(asyncio)를 고려하였고, 내가 구상한 조건에는 비동기(asyncio)를 활용한 방식이 가장 적합하다고 함.
이다. 조금 더 구체화해서 기초적인 프로세스 흐름 다이어그램을 그려보았다.
WebSocket으로 전부 사용하려 하였으나, [과거 n개의 분봉 조회] 기능은 Rest API를 통해서만 제공하는 것으로 확인했다.
나는 모든 코인을 계속 조회하면서 조건에 맞는 코인을 찾으면 비동기적으로 거래를 시작하게끔 할 생각이기에, 모든 코인을 Rest API를 활용해 조회하여야 한다. 이때에는 멀티스레드를 사용해 병렬처리하는 것이 맞다고 생각했다. (비동기적 거래 실행부분에서 조건 확인을 위해 분봉 조회 부분은 따로 추가 조회를 할 필요 없이, 메인에서 지속적으로 조회 중인 자료를 가져다 쓰면 될 것 같다)
Rest API로 분봉을 조회하는 것은 QUOTATION API이고 한도는 초당 10회이다. 종목이 150여 개정도 되기에, 스레드를 몇 개 사용하는 게 최적인지 테스트해 보기로 했다. 서버 CPU의 자원에서 병목이 발생하지 않는다는 가정하에, Rest API 분봉 조회 시 스레드 개수별 소요시간과 오류 발생 빈도를 테스트하였을 때 결과는 다음과 같다.
쓰레드 수 | 총 소요시간 | HTTP 429 오류 발생 빈도 |
1 | 32.5796 | 발생안함 |
2 | 17.9258 | 거의 발생안함 |
3 | 16.0706 | 높음 |
4 | 16.5046 | 매우 높음 |
5 | 16.3677 | 매우 높음 |
분봉을 조회하는데 32초가 소모되는 것은 리스크가 크다고 생각하고, 업비트 사이트 정보에 의하면 HTTP 429 오류 발생으로 인한 페널티는 존재하지 않는다. 따라서 2개의 스레드를 사용하는 게 가장 효율적이라고 생각한다.
import requests
import concurrent.futures
import certifi
import time
# 업비트 API 엔드포인트
MARKET_URL = "https://api.upbit.com/v1/market/all"
CANDLES_URL = "https://api.upbit.com/v1/candles/minutes/1"
# 모든 코인 티커 가져오기
def get_all_tickers():
response = requests.get(MARKET_URL, verify=certifi.where()) # ✅ 최신 CA 인증서 적용
if response.status_code == 200:
markets = response.json()
return [market["market"] for market in markets if market["market"].startswith("KRW-")]
else:
print("Error fetching tickers:", response.status_code)
return []
# 특정 코인의 1분봉 100개 데이터 가져오기 (429 오류 처리 추가)
def get_minute_candles(ticker):
params = {"market": ticker, "count": 100}
response = requests.get(CANDLES_URL, params=params, verify=certifi.where())
if response.status_code == 200:
print(f"{ticker} Candles fetched")
return ticker, response.json()
elif response.status_code == 429:
print(f"Error fetching candles for {ticker}: 429. Retry...")
return get_minute_candles(ticker,)
else:
print(f"Error fetching candles for {ticker}: {response.status_code}")
return ticker, None
# 병렬로 모든 코인의 분봉 데이터 가져오기
def get_all_coins_minute_candles():
tickers = get_all_tickers()
all_candles = {}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = {executor.submit(get_minute_candles, ticker): ticker for ticker in tickers}
for future in concurrent.futures.as_completed(futures):
ticker, candles = future.result()
if candles:
all_candles[ticker] = candles
return all_candles
def measure_execution_time(func, *args, **kwargs):
start_time = time.perf_counter() # 시작 시간 기록
result = func(*args, **kwargs) # 함수 실행
end_time = time.perf_counter() # 종료 시간 기록
elapsed_time = end_time - start_time # 실행 시간 계산
print(f"Execution Time: {elapsed_time:.4f} seconds")
return result # 함수 실행 결과 반환
if __name__ == "__main__":
all_data = measure_execution_time(get_all_coins_minute_candles) # 실행 시간 측정
print("Total coins fetched:", len(all_data))
2025.02.01 - [자동매매/Upbit] - 파이썬으로 업비트 자동매매 봇 만들기 4 - Tulip Indicators
파이썬으로 업비트 자동매매 봇 만들기 4 - Tulip Indicators
프로그램의 큰 갈래를 잡고 세부적인 내용을 조율하고 있던 도중 필요한 지표들을 직접 계산할 필요 없이 라이브러리를 활용하면 된다는 것을 찾았다. TA, TA-Lib, pandas(수동 계산), Tulip Indicators 등
chabin37.tistory.com
'API Transaction > Upbit' 카테고리의 다른 글
[파이썬] 업비트 자동매매 봇 만들기 6 - 거래중인 코인 갯수 유지(무분별한 코인 거래 방지) (1) | 2025.02.10 |
---|---|
[파이썬] 업비트 자동매매 봇 만들기 5 - asyncio, async, await, 비동기 작업 (0) | 2025.02.08 |
[Python] 업비트 자동매매 봇 만들기 4 - Tulip Indicators (1) | 2025.02.01 |
[Python] 업비트 자동매매 봇 만들기 2 - JSON통신 압축 (0) | 2025.01.29 |
[Python] 업비트 자동매매 봇 만들기 1 - 업비트의 통신 방식 (0) | 2025.01.29 |