Files
stocksearch/services/kiwoom_ws_service.go
2026-03-31 19:32:59 +09:00

601 lines
16 KiB
Go

package services
import (
"encoding/json"
"fmt"
"log"
"stocksearch/models"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/websocket"
)
const (
kiwoomWSURL = "wss://api.kiwoom.com:10000/api/dostk/websocket"
writeTimeout = 10 * time.Second // 쓰기 타임아웃
)
// KiwoomWSClient 키움증권 실시간 WebSocket 클라이언트
type KiwoomWSClient struct {
tokenService *TokenService
conn *websocket.Conn
mu sync.Mutex // conn 보호 + 직렬화된 쓰기 보장
// 현재 구독 중인 종목 코드 집합 (재연결 시 복구용)
subscribed map[string]bool
// REG 배치 전송용: 짧은 시간 내 요청을 모아 단일 REG로 발송
pendingReg []string
regTimer *time.Timer
// 실시간 데이터 수신 콜백
onPrice func(price *models.StockPrice)
onOrderBook func(ob *models.OrderBook)
onProgram func(pg *models.ProgramTrading)
onMarketStatus func(ms *models.MarketStatus)
onMeta func(meta *models.StockMeta)
}
var kiwoomWSOnce sync.Once
var kiwoomWSSvc *KiwoomWSClient
// GetKiwoomWSClient KiwoomWS 클라이언트 싱글턴 반환
func GetKiwoomWSClient(onPrice func(*models.StockPrice)) *KiwoomWSClient {
kiwoomWSOnce.Do(func() {
kiwoomWSSvc = &KiwoomWSClient{
tokenService: GetTokenService(),
subscribed: make(map[string]bool),
onPrice: onPrice,
}
})
return kiwoomWSSvc
}
// SetCallbacks 추가 실시간 데이터 콜백 등록
func (k *KiwoomWSClient) SetCallbacks(
onOrderBook func(*models.OrderBook),
onProgram func(*models.ProgramTrading),
onMarketStatus func(*models.MarketStatus),
onMeta func(*models.StockMeta),
) {
k.mu.Lock()
defer k.mu.Unlock()
k.onOrderBook = onOrderBook
k.onProgram = onProgram
k.onMarketStatus = onMarketStatus
k.onMeta = onMeta
}
// Connect 키움 WS 서버에 연결 후 읽기 루프 시작
func (k *KiwoomWSClient) Connect() error {
conn, err := k.dial()
if err != nil {
return err
}
k.mu.Lock()
k.conn = conn
k.mu.Unlock()
stopCh := make(chan struct{})
go k.readLoop(conn, stopCh)
log.Println("키움 WS 연결 완료")
return nil
}
// dial WSS 연결 수립 후 로그인 패킷 전송
func (k *KiwoomWSClient) dial() (*websocket.Conn, error) {
// HTTP 헤더 없이 연결 (키움 WS는 헤더 인증 불필요)
conn, _, err := websocket.DefaultDialer.Dial(kiwoomWSURL, nil)
if err != nil {
return nil, err
}
// 연결 직후 로그인 패킷 전송 (Bearer 없이 token만)
loginMsg := map[string]string{
"trnm": "LOGIN",
"token": k.tokenService.GetToken(),
}
data, _ := json.Marshal(loginMsg)
conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
conn.Close()
return nil, err
}
// 로그인 응답 대기
conn.SetReadDeadline(time.Now().Add(10 * time.Second))
_, resp, err := conn.ReadMessage()
if err != nil {
conn.Close()
return nil, err
}
var loginResp struct {
Trnm string `json:"trnm"`
ReturnCode int `json:"return_code"`
ReturnMsg string `json:"return_msg"`
}
if err := json.Unmarshal(resp, &loginResp); err != nil {
conn.Close()
return nil, fmt.Errorf("로그인 응답 파싱 실패: %w", err)
}
if loginResp.Trnm != "LOGIN" || loginResp.ReturnCode != 0 {
conn.Close()
return nil, fmt.Errorf("키움 WS 로그인 실패 [%d]: %s", loginResp.ReturnCode, loginResp.ReturnMsg)
}
// 읽기 데드라인 초기화 (readLoop에서 관리)
conn.SetReadDeadline(time.Time{})
// 장운영 상태(0s) 글로벌 구독 (item 빈 문자열)
k.sendMarketStatusReg(conn)
log.Println("키움 WS 로그인 성공")
return conn, nil
}
// sendMarketStatusReg 장운영 상태(0s) 구독 전송 (item="", 전역)
func (k *KiwoomWSClient) sendMarketStatusReg(conn *websocket.Conn) {
msg := map[string]interface{}{
"trnm": "REG",
"grp_no": "1",
"refresh": "1",
"data": []map[string]interface{}{
{"item": []string{""}, "type": []string{"0s"}},
},
}
data, _ := json.Marshal(msg)
conn.SetWriteDeadline(time.Now().Add(writeTimeout))
if err := conn.WriteMessage(websocket.TextMessage, data); err != nil {
log.Printf("키움 WS 장운영상태 구독 실패: %v", err)
}
}
// SubscribePair KRX + NXT 종목을 debounce 배치 REG로 구독
// 200ms 내 여러 호출이 들어오면 하나의 REG 메시지로 묶어 전송
func (k *KiwoomWSClient) SubscribePair(code string) {
k.mu.Lock()
defer k.mu.Unlock()
nxt := code + "_NX"
if !k.subscribed[code] {
k.subscribed[code] = true
k.pendingReg = append(k.pendingReg, code)
}
if !k.subscribed[nxt] {
k.subscribed[nxt] = true
k.pendingReg = append(k.pendingReg, nxt)
}
k.scheduleFlush()
}
// scheduleFlush REG 배치 전송 타이머 설정 (mu 보유 상태에서 호출)
// 연속 호출 시 타이머를 초기화해 마지막 호출로부터 200ms 후 한 번만 전송
func (k *KiwoomWSClient) scheduleFlush() {
if k.regTimer != nil {
k.regTimer.Stop()
}
k.regTimer = time.AfterFunc(200*time.Millisecond, k.flushPendingReg)
}
// flushPendingReg 누적된 구독 코드를 단일 REG 메시지로 전송
func (k *KiwoomWSClient) flushPendingReg() {
k.mu.Lock()
defer k.mu.Unlock()
if len(k.pendingReg) == 0 {
return
}
codes := k.pendingReg
k.pendingReg = nil
k.sendRegBatch(codes, "1")
log.Printf("키움 WS 배치 구독 전송 (%d개): %v", len(codes), codes)
}
// UnsubscribePair KRX + NXT 종목을 단일 REMOVE 메시지로 동시 해제
func (k *KiwoomWSClient) UnsubscribePair(code string) {
k.mu.Lock()
defer k.mu.Unlock()
nxt := code + "_NX"
var toRemove []string
if k.subscribed[code] {
delete(k.subscribed, code)
toRemove = append(toRemove, code)
}
if k.subscribed[nxt] {
delete(k.subscribed, nxt)
toRemove = append(toRemove, nxt)
}
if len(toRemove) > 0 {
k.sendRemoveBatch(toRemove)
log.Printf("키움 WS 구독 해제: %v", toRemove)
}
}
// sendRegBatch 여러 종목을 단일 REG 메시지로 배치 전송 (mu 보유 상태에서 호출)
// 0B: KRX + NXT 전체, 0D/0H/0w/0g: KRX 코드만
func (k *KiwoomWSClient) sendRegBatch(codes []string, refresh string) {
// KRX 전용 코드 추출 (_NX 등 접미사 없는 코드)
krxCodes := filterKRXCodes(codes)
dataItems := []map[string]interface{}{
{"item": codes, "type": []string{"0B"}},
}
if len(krxCodes) > 0 {
dataItems = append(dataItems, map[string]interface{}{
"item": krxCodes,
"type": []string{"0D", "0H", "0w", "0g"},
})
}
msg := map[string]interface{}{
"trnm": "REG",
"grp_no": "1",
"refresh": refresh,
"data": dataItems,
}
if err := k.write(msg); err != nil {
log.Printf("키움 WS 구독 전송 실패: %v", err)
}
}
// sendRemoveBatch 여러 종목을 단일 REMOVE 메시지로 배치 전송 (mu 보유 상태에서 호출)
func (k *KiwoomWSClient) sendRemoveBatch(codes []string) {
krxCodes := filterKRXCodes(codes)
dataItems := []map[string]interface{}{
{"item": codes, "type": []string{"0B"}},
}
if len(krxCodes) > 0 {
dataItems = append(dataItems, map[string]interface{}{
"item": krxCodes,
"type": []string{"0D", "0H", "0w", "0g"},
})
}
msg := map[string]interface{}{
"trnm": "REMOVE",
"grp_no": "1",
"data": dataItems,
}
if err := k.write(msg); err != nil {
log.Printf("키움 WS 구독해제 전송 실패: %v", err)
}
}
// filterKRXCodes 접미사 없는 KRX 코드만 반환
func filterKRXCodes(codes []string) []string {
var krx []string
for _, c := range codes {
if !strings.Contains(c, "_") {
krx = append(krx, c)
}
}
return krx
}
// write JSON 메시지 전송 (mu 보유 상태에서 호출)
func (k *KiwoomWSClient) write(v interface{}) error {
if k.conn == nil {
return nil
}
data, _ := json.Marshal(v)
k.conn.SetWriteDeadline(time.Now().Add(writeTimeout))
return k.conn.WriteMessage(websocket.TextMessage, data)
}
// readLoop 키움 WS 메시지 수신 루프
func (k *KiwoomWSClient) readLoop(conn *websocket.Conn, stopCh chan struct{}) {
defer func() {
close(stopCh)
k.mu.Lock()
if k.conn == conn {
k.conn = nil
}
k.mu.Unlock()
conn.Close()
log.Println("키움 WS 연결 끊김, 재연결 시도...")
go k.reconnect()
}()
for {
_, data, err := conn.ReadMessage()
if err != nil {
if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
log.Printf("키움 WS 읽기 오류: %v", err)
}
return
}
k.handleMessage(data)
}
}
// handleMessage 수신 메시지 파싱 및 콜백 호출
func (k *KiwoomWSClient) handleMessage(data []byte) {
var msg struct {
Trnm string `json:"trnm"`
Data []struct {
Type string `json:"type"`
Item string `json:"item"`
Values map[string]string `json:"values"`
} `json:"data"`
ReturnCode int `json:"return_code"`
ReturnMsg string `json:"return_msg"`
}
if err := json.Unmarshal(data, &msg); err != nil {
return
}
switch msg.Trnm {
case "PING":
// 키움 서버 PING 수신 → PONG 응답 전송
k.mu.Lock()
_ = k.write(map[string]string{"trnm": "PONG"})
k.mu.Unlock()
case "REG", "REMOVE":
if msg.ReturnCode != 0 {
log.Printf("키움 WS %s 오류: %s", msg.Trnm, msg.ReturnMsg)
}
case "REAL":
for _, d := range msg.Data {
switch d.Type {
case "0B":
if k.onPrice != nil {
k.onPrice(parseRealPrice(d.Item, d.Values))
}
case "0D":
if k.onOrderBook != nil {
k.onOrderBook(parseOrderBook(d.Item, d.Values))
}
case "0H":
// 예상체결 데이터 → StockPrice 형식으로 통합
if k.onPrice != nil {
k.onPrice(parseExpectedPrice(d.Item, d.Values))
}
case "0w":
if k.onProgram != nil {
k.onProgram(parseProgramTrading(d.Item, d.Values))
}
case "0s":
if k.onMarketStatus != nil {
k.onMarketStatus(parseMarketStatus(d.Values))
}
case "0g":
if k.onMeta != nil {
k.onMeta(parseStockMeta(d.Item, d.Values))
}
}
}
}
}
// reconnect 재연결 및 기존 구독 복구 (지수 백오프)
func (k *KiwoomWSClient) reconnect() {
k.mu.Lock()
codes := make([]string, 0, len(k.subscribed))
for code := range k.subscribed {
codes = append(codes, code)
}
k.mu.Unlock()
delay := 5 * time.Second
for {
time.Sleep(delay)
log.Printf("키움 WS 재연결 시도... (%v 후 다음 시도)", delay*2)
conn, err := k.dial()
if err != nil {
log.Printf("키움 WS 재연결 실패: %v", err)
if delay < 60*time.Second {
delay *= 2
}
continue
}
k.mu.Lock()
k.conn = conn
// 기존 구독 복구: 모든 코드를 단일 REG 메시지로 배치 전송
if len(codes) > 0 {
k.sendRegBatch(codes, "1")
}
k.mu.Unlock()
stopCh := make(chan struct{})
go k.readLoop(conn, stopCh)
log.Printf("키움 WS 재연결 성공, %d개 종목 구독 복구", len(codes))
return
}
}
// parseRealPrice 0B 실시간 주식체결 값 → StockPrice 변환
// 20=체결시간, 10=현재가, 11=전일대비, 12=등락률, 13=누적거래량, 14=누적거래대금
// 15=거래량(체결량), 16=시가, 17=고가, 18=저가, 27=최우선매도호가, 28=최우선매수호가
// 228=체결강도, 290=장구분
// NXT 종목코드(005930_NX)는 _NX 접미사 제거 후 KRX 코드(005930)로 통일
func parseRealPrice(code string, v map[string]string) *models.StockPrice {
normalized := strings.TrimPrefix(code, "A")
normalized = strings.SplitN(normalized, "_", 2)[0] // _NX, _AL 등 접미사 제거
return &models.StockPrice{
Code: normalized,
CurrentPrice: absInt(parseWSInt(v["10"])),
ChangePrice: parseWSInt(v["11"]),
ChangeRate: parseWSFloat(v["12"]),
Volume: absInt(parseWSInt(v["13"])),
TradeMoney: absInt(parseWSInt(v["14"])),
TradeVolume: absInt(parseWSInt(v["15"])),
Open: absInt(parseWSInt(v["16"])),
High: absInt(parseWSInt(v["17"])),
Low: absInt(parseWSInt(v["18"])),
TradeTime: v["20"],
AskPrice1: absInt(parseWSInt(v["27"])),
BidPrice1: absInt(parseWSInt(v["28"])),
CntrStr: parseWSFloat(v["228"]),
MarketStatus: v["290"],
UpdatedAt: time.Now(),
}
}
// parseExpectedPrice 0H 주식예상체결 → StockPrice 변환 (장 전/후 예상체결 시 사용)
func parseExpectedPrice(code string, v map[string]string) *models.StockPrice {
normalized := strings.TrimPrefix(code, "A")
normalized = strings.SplitN(normalized, "_", 2)[0]
return &models.StockPrice{
Code: normalized,
CurrentPrice: absInt(parseWSInt(v["10"])),
ChangePrice: parseWSInt(v["11"]),
ChangeRate: parseWSFloat(v["12"]),
TradeVolume: absInt(parseWSInt(v["15"])),
Volume: absInt(parseWSInt(v["13"])),
TradeTime: v["20"],
UpdatedAt: time.Now(),
}
}
// parseOrderBook 0D 주식호가잔량 → OrderBook 변환
// 41~50=매도호가1~10, 61~70=매도호가수량1~10
// 51~60=매수호가1~10, 71~80=매수호가수량1~10
// 121=매도총잔량, 125=매수총잔량, 23=예상체결가, 24=예상체결수량
func parseOrderBook(code string, v map[string]string) *models.OrderBook {
normalized := strings.TrimPrefix(code, "A")
normalized = strings.SplitN(normalized, "_", 2)[0]
ob := &models.OrderBook{
Code: normalized,
AskTime: v["21"],
}
for i := 1; i <= 10; i++ {
askKey := strconv.Itoa(40 + i) // 41..50
askVolKey := strconv.Itoa(60 + i) // 61..70
bidKey := strconv.Itoa(50 + i) // 51..60
bidVolKey := strconv.Itoa(70 + i) // 71..80
ob.Asks = append(ob.Asks, models.OrderBookEntry{
Price: absInt(parseWSInt(v[askKey])),
Volume: absInt(parseWSInt(v[askVolKey])),
})
ob.Bids = append(ob.Bids, models.OrderBookEntry{
Price: absInt(parseWSInt(v[bidKey])),
Volume: absInt(parseWSInt(v[bidVolKey])),
})
}
ob.TotalAskVol = absInt(parseWSInt(v["121"]))
ob.TotalBidVol = absInt(parseWSInt(v["125"]))
ob.ExpectedPrc = absInt(parseWSInt(v["23"]))
ob.ExpectedVol = absInt(parseWSInt(v["24"]))
return ob
}
// parseProgramTrading 0w 종목프로그램매매 → ProgramTrading 변환
func parseProgramTrading(code string, v map[string]string) *models.ProgramTrading {
normalized := strings.TrimPrefix(code, "A")
normalized = strings.SplitN(normalized, "_", 2)[0]
return &models.ProgramTrading{
Code: normalized,
SellVolume: absInt(parseWSInt(v["202"])),
SellAmount: absInt(parseWSInt(v["204"])),
BuyVolume: absInt(parseWSInt(v["206"])),
BuyAmount: absInt(parseWSInt(v["208"])),
NetBuyVolume: parseWSInt(v["210"]),
NetBuyAmount: parseWSInt(v["212"]),
}
}
// parseMarketStatus 0s 장시작시간 → MarketStatus 변환
func parseMarketStatus(v map[string]string) *models.MarketStatus {
code := v["215"]
return &models.MarketStatus{
StatusCode: code,
StatusName: marketStatusName(code),
Time: v["20"],
}
}
// marketStatusName 장운영구분 코드 → 한글 이름
func marketStatusName(code string) string {
switch code {
case "0":
return "장시작전"
case "2":
return "장마감알림"
case "3":
return "장 중"
case "4":
return "장마감"
case "8":
return "정규장마감"
case "9":
return "전체장마감"
case "a":
return "시간외종가시작"
case "b":
return "시간외종가종료"
case "c":
return "시간외단일가시작"
case "d":
return "시간외단일가종료"
default:
return "장외"
}
}
// parseStockMeta 0g 주식종목정보 → StockMeta 변환
func parseStockMeta(code string, v map[string]string) *models.StockMeta {
normalized := strings.TrimPrefix(code, "A")
normalized = strings.SplitN(normalized, "_", 2)[0]
return &models.StockMeta{
Code: normalized,
UpperLimit: absInt(parseWSInt(v["305"])),
LowerLimit: absInt(parseWSInt(v["306"])),
BasePrice: absInt(parseWSInt(v["307"])),
}
}
func parseWSInt(s string) int64 {
s = strings.TrimSpace(s)
if s == "" {
return 0
}
neg := strings.HasPrefix(s, "-")
s = strings.TrimLeft(s, "+-")
s = strings.ReplaceAll(s, ",", "")
n, _ := strconv.ParseInt(s, 10, 64)
if neg {
return -n
}
return n
}
func parseWSFloat(s string) float64 {
s = strings.TrimSpace(s)
if s == "" {
return 0
}
neg := strings.HasPrefix(s, "-")
s = strings.TrimLeft(s, "+-")
f, _ := strconv.ParseFloat(s, 64)
if neg {
return -f
}
return f
}
func absInt(n int64) int64 {
if n < 0 {
return -n
}
return n
}