package services import ( "encoding/json" "fmt" "log" "stocksearch/models" "strconv" "strings" "sync" "time" "github.com/gorilla/websocket" ) const ( kiwoomWSURL = "wss://api.kiwoom.com:10000/api/dostk/websocket" writeTimeout = 10 * time.Second // 쓰기 타임아웃 ) // KiwoomWSClient 키움증권 실시간 WebSocket 클라이언트 type KiwoomWSClient struct { tokenService *TokenService conn *websocket.Conn mu sync.Mutex // conn 보호 + 직렬화된 쓰기 보장 // 현재 구독 중인 종목 코드 집합 (재연결 시 복구용) subscribed map[string]bool // REG 배치 전송용: 짧은 시간 내 요청을 모아 단일 REG로 발송 pendingReg []string regTimer *time.Timer // 실시간 데이터 수신 콜백 onPrice func(price *models.StockPrice) onOrderBook func(ob *models.OrderBook) onProgram func(pg *models.ProgramTrading) onMarketStatus func(ms *models.MarketStatus) onMeta func(meta *models.StockMeta) } var kiwoomWSOnce sync.Once var kiwoomWSSvc *KiwoomWSClient // GetKiwoomWSClient KiwoomWS 클라이언트 싱글턴 반환 func GetKiwoomWSClient(onPrice func(*models.StockPrice)) *KiwoomWSClient { kiwoomWSOnce.Do(func() { kiwoomWSSvc = &KiwoomWSClient{ tokenService: GetTokenService(), subscribed: make(map[string]bool), onPrice: onPrice, } }) return kiwoomWSSvc } // SetCallbacks 추가 실시간 데이터 콜백 등록 func (k *KiwoomWSClient) SetCallbacks( onOrderBook func(*models.OrderBook), onProgram func(*models.ProgramTrading), onMarketStatus func(*models.MarketStatus), onMeta func(*models.StockMeta), ) { k.mu.Lock() defer k.mu.Unlock() k.onOrderBook = onOrderBook k.onProgram = onProgram k.onMarketStatus = onMarketStatus k.onMeta = onMeta } // Connect 키움 WS 서버에 연결 후 읽기 루프 시작 func (k *KiwoomWSClient) Connect() error { conn, err := k.dial() if err != nil { return err } k.mu.Lock() k.conn = conn k.mu.Unlock() stopCh := make(chan struct{}) go k.readLoop(conn, stopCh) log.Println("키움 WS 연결 완료") return nil } // dial WSS 연결 수립 후 로그인 패킷 전송 func (k *KiwoomWSClient) dial() (*websocket.Conn, error) { // HTTP 헤더 없이 연결 (키움 WS는 헤더 인증 불필요) conn, _, err := websocket.DefaultDialer.Dial(kiwoomWSURL, nil) if err != nil { return nil, err } // 연결 직후 로그인 패킷 전송 (Bearer 없이 token만) loginMsg := map[string]string{ "trnm": "LOGIN", "token": k.tokenService.GetToken(), } data, _ := json.Marshal(loginMsg) conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { conn.Close() return nil, err } // 로그인 응답 대기 conn.SetReadDeadline(time.Now().Add(10 * time.Second)) _, resp, err := conn.ReadMessage() if err != nil { conn.Close() return nil, err } var loginResp struct { Trnm string `json:"trnm"` ReturnCode int `json:"return_code"` ReturnMsg string `json:"return_msg"` } if err := json.Unmarshal(resp, &loginResp); err != nil { conn.Close() return nil, fmt.Errorf("로그인 응답 파싱 실패: %w", err) } if loginResp.Trnm != "LOGIN" || loginResp.ReturnCode != 0 { conn.Close() return nil, fmt.Errorf("키움 WS 로그인 실패 [%d]: %s", loginResp.ReturnCode, loginResp.ReturnMsg) } // 읽기 데드라인 초기화 (readLoop에서 관리) conn.SetReadDeadline(time.Time{}) // 장운영 상태(0s) 글로벌 구독 (item 빈 문자열) k.sendMarketStatusReg(conn) log.Println("키움 WS 로그인 성공") return conn, nil } // sendMarketStatusReg 장운영 상태(0s) 구독 전송 (item="", 전역) func (k *KiwoomWSClient) sendMarketStatusReg(conn *websocket.Conn) { msg := map[string]interface{}{ "trnm": "REG", "grp_no": "1", "refresh": "1", "data": []map[string]interface{}{ {"item": []string{""}, "type": []string{"0s"}}, }, } data, _ := json.Marshal(msg) conn.SetWriteDeadline(time.Now().Add(writeTimeout)) if err := conn.WriteMessage(websocket.TextMessage, data); err != nil { log.Printf("키움 WS 장운영상태 구독 실패: %v", err) } } // SubscribePair KRX + NXT 종목을 debounce 배치 REG로 구독 // 200ms 내 여러 호출이 들어오면 하나의 REG 메시지로 묶어 전송 func (k *KiwoomWSClient) SubscribePair(code string) { k.mu.Lock() defer k.mu.Unlock() nxt := code + "_NX" if !k.subscribed[code] { k.subscribed[code] = true k.pendingReg = append(k.pendingReg, code) } if !k.subscribed[nxt] { k.subscribed[nxt] = true k.pendingReg = append(k.pendingReg, nxt) } k.scheduleFlush() } // scheduleFlush REG 배치 전송 타이머 설정 (mu 보유 상태에서 호출) // 연속 호출 시 타이머를 초기화해 마지막 호출로부터 200ms 후 한 번만 전송 func (k *KiwoomWSClient) scheduleFlush() { if k.regTimer != nil { k.regTimer.Stop() } k.regTimer = time.AfterFunc(200*time.Millisecond, k.flushPendingReg) } // flushPendingReg 누적된 구독 코드를 단일 REG 메시지로 전송 func (k *KiwoomWSClient) flushPendingReg() { k.mu.Lock() defer k.mu.Unlock() if len(k.pendingReg) == 0 { return } codes := k.pendingReg k.pendingReg = nil k.sendRegBatch(codes, "1") log.Printf("키움 WS 배치 구독 전송 (%d개): %v", len(codes), codes) } // UnsubscribePair KRX + NXT 종목을 단일 REMOVE 메시지로 동시 해제 func (k *KiwoomWSClient) UnsubscribePair(code string) { k.mu.Lock() defer k.mu.Unlock() nxt := code + "_NX" var toRemove []string if k.subscribed[code] { delete(k.subscribed, code) toRemove = append(toRemove, code) } if k.subscribed[nxt] { delete(k.subscribed, nxt) toRemove = append(toRemove, nxt) } if len(toRemove) > 0 { k.sendRemoveBatch(toRemove) log.Printf("키움 WS 구독 해제: %v", toRemove) } } // sendRegBatch 여러 종목을 단일 REG 메시지로 배치 전송 (mu 보유 상태에서 호출) // 0B: KRX + NXT 전체, 0D/0H/0w/0g: KRX 코드만 func (k *KiwoomWSClient) sendRegBatch(codes []string, refresh string) { // KRX 전용 코드 추출 (_NX 등 접미사 없는 코드) krxCodes := filterKRXCodes(codes) dataItems := []map[string]interface{}{ {"item": codes, "type": []string{"0B"}}, } if len(krxCodes) > 0 { dataItems = append(dataItems, map[string]interface{}{ "item": krxCodes, "type": []string{"0D", "0H", "0w", "0g"}, }) } msg := map[string]interface{}{ "trnm": "REG", "grp_no": "1", "refresh": refresh, "data": dataItems, } if err := k.write(msg); err != nil { log.Printf("키움 WS 구독 전송 실패: %v", err) } } // sendRemoveBatch 여러 종목을 단일 REMOVE 메시지로 배치 전송 (mu 보유 상태에서 호출) func (k *KiwoomWSClient) sendRemoveBatch(codes []string) { krxCodes := filterKRXCodes(codes) dataItems := []map[string]interface{}{ {"item": codes, "type": []string{"0B"}}, } if len(krxCodes) > 0 { dataItems = append(dataItems, map[string]interface{}{ "item": krxCodes, "type": []string{"0D", "0H", "0w", "0g"}, }) } msg := map[string]interface{}{ "trnm": "REMOVE", "grp_no": "1", "data": dataItems, } if err := k.write(msg); err != nil { log.Printf("키움 WS 구독해제 전송 실패: %v", err) } } // filterKRXCodes 접미사 없는 KRX 코드만 반환 func filterKRXCodes(codes []string) []string { var krx []string for _, c := range codes { if !strings.Contains(c, "_") { krx = append(krx, c) } } return krx } // write JSON 메시지 전송 (mu 보유 상태에서 호출) func (k *KiwoomWSClient) write(v interface{}) error { if k.conn == nil { return nil } data, _ := json.Marshal(v) k.conn.SetWriteDeadline(time.Now().Add(writeTimeout)) return k.conn.WriteMessage(websocket.TextMessage, data) } // readLoop 키움 WS 메시지 수신 루프 func (k *KiwoomWSClient) readLoop(conn *websocket.Conn, stopCh chan struct{}) { defer func() { close(stopCh) k.mu.Lock() if k.conn == conn { k.conn = nil } k.mu.Unlock() conn.Close() log.Println("키움 WS 연결 끊김, 재연결 시도...") go k.reconnect() }() for { _, data, err := conn.ReadMessage() if err != nil { if !websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) { log.Printf("키움 WS 읽기 오류: %v", err) } return } k.handleMessage(data) } } // handleMessage 수신 메시지 파싱 및 콜백 호출 func (k *KiwoomWSClient) handleMessage(data []byte) { var msg struct { Trnm string `json:"trnm"` Data []struct { Type string `json:"type"` Item string `json:"item"` Values map[string]string `json:"values"` } `json:"data"` ReturnCode int `json:"return_code"` ReturnMsg string `json:"return_msg"` } if err := json.Unmarshal(data, &msg); err != nil { return } switch msg.Trnm { case "PING": // 키움 서버 PING 수신 → PONG 응답 전송 k.mu.Lock() _ = k.write(map[string]string{"trnm": "PONG"}) k.mu.Unlock() case "REG", "REMOVE": if msg.ReturnCode != 0 { log.Printf("키움 WS %s 오류: %s", msg.Trnm, msg.ReturnMsg) } case "REAL": for _, d := range msg.Data { switch d.Type { case "0B": if k.onPrice != nil { k.onPrice(parseRealPrice(d.Item, d.Values)) } case "0D": if k.onOrderBook != nil { k.onOrderBook(parseOrderBook(d.Item, d.Values)) } case "0H": // 예상체결 데이터 → StockPrice 형식으로 통합 if k.onPrice != nil { k.onPrice(parseExpectedPrice(d.Item, d.Values)) } case "0w": if k.onProgram != nil { k.onProgram(parseProgramTrading(d.Item, d.Values)) } case "0s": if k.onMarketStatus != nil { k.onMarketStatus(parseMarketStatus(d.Values)) } case "0g": if k.onMeta != nil { k.onMeta(parseStockMeta(d.Item, d.Values)) } } } } } // reconnect 재연결 및 기존 구독 복구 (지수 백오프) func (k *KiwoomWSClient) reconnect() { k.mu.Lock() codes := make([]string, 0, len(k.subscribed)) for code := range k.subscribed { codes = append(codes, code) } k.mu.Unlock() delay := 5 * time.Second for { time.Sleep(delay) log.Printf("키움 WS 재연결 시도... (%v 후 다음 시도)", delay*2) conn, err := k.dial() if err != nil { log.Printf("키움 WS 재연결 실패: %v", err) if delay < 60*time.Second { delay *= 2 } continue } k.mu.Lock() k.conn = conn // 기존 구독 복구: 모든 코드를 단일 REG 메시지로 배치 전송 if len(codes) > 0 { k.sendRegBatch(codes, "1") } k.mu.Unlock() stopCh := make(chan struct{}) go k.readLoop(conn, stopCh) log.Printf("키움 WS 재연결 성공, %d개 종목 구독 복구", len(codes)) return } } // parseRealPrice 0B 실시간 주식체결 값 → StockPrice 변환 // 20=체결시간, 10=현재가, 11=전일대비, 12=등락률, 13=누적거래량, 14=누적거래대금 // 15=거래량(체결량), 16=시가, 17=고가, 18=저가, 27=최우선매도호가, 28=최우선매수호가 // 228=체결강도, 290=장구분 // NXT 종목코드(005930_NX)는 _NX 접미사 제거 후 KRX 코드(005930)로 통일 func parseRealPrice(code string, v map[string]string) *models.StockPrice { normalized := strings.TrimPrefix(code, "A") normalized = strings.SplitN(normalized, "_", 2)[0] // _NX, _AL 등 접미사 제거 return &models.StockPrice{ Code: normalized, CurrentPrice: absInt(parseWSInt(v["10"])), ChangePrice: parseWSInt(v["11"]), ChangeRate: parseWSFloat(v["12"]), Volume: absInt(parseWSInt(v["13"])), TradeMoney: absInt(parseWSInt(v["14"])), TradeVolume: absInt(parseWSInt(v["15"])), Open: absInt(parseWSInt(v["16"])), High: absInt(parseWSInt(v["17"])), Low: absInt(parseWSInt(v["18"])), TradeTime: v["20"], AskPrice1: absInt(parseWSInt(v["27"])), BidPrice1: absInt(parseWSInt(v["28"])), CntrStr: parseWSFloat(v["228"]), MarketStatus: v["290"], UpdatedAt: time.Now(), } } // parseExpectedPrice 0H 주식예상체결 → StockPrice 변환 (장 전/후 예상체결 시 사용) func parseExpectedPrice(code string, v map[string]string) *models.StockPrice { normalized := strings.TrimPrefix(code, "A") normalized = strings.SplitN(normalized, "_", 2)[0] return &models.StockPrice{ Code: normalized, CurrentPrice: absInt(parseWSInt(v["10"])), ChangePrice: parseWSInt(v["11"]), ChangeRate: parseWSFloat(v["12"]), TradeVolume: absInt(parseWSInt(v["15"])), Volume: absInt(parseWSInt(v["13"])), TradeTime: v["20"], UpdatedAt: time.Now(), } } // parseOrderBook 0D 주식호가잔량 → OrderBook 변환 // 41~50=매도호가1~10, 61~70=매도호가수량1~10 // 51~60=매수호가1~10, 71~80=매수호가수량1~10 // 121=매도총잔량, 125=매수총잔량, 23=예상체결가, 24=예상체결수량 func parseOrderBook(code string, v map[string]string) *models.OrderBook { normalized := strings.TrimPrefix(code, "A") normalized = strings.SplitN(normalized, "_", 2)[0] ob := &models.OrderBook{ Code: normalized, AskTime: v["21"], } for i := 1; i <= 10; i++ { askKey := strconv.Itoa(40 + i) // 41..50 askVolKey := strconv.Itoa(60 + i) // 61..70 bidKey := strconv.Itoa(50 + i) // 51..60 bidVolKey := strconv.Itoa(70 + i) // 71..80 ob.Asks = append(ob.Asks, models.OrderBookEntry{ Price: absInt(parseWSInt(v[askKey])), Volume: absInt(parseWSInt(v[askVolKey])), }) ob.Bids = append(ob.Bids, models.OrderBookEntry{ Price: absInt(parseWSInt(v[bidKey])), Volume: absInt(parseWSInt(v[bidVolKey])), }) } ob.TotalAskVol = absInt(parseWSInt(v["121"])) ob.TotalBidVol = absInt(parseWSInt(v["125"])) ob.ExpectedPrc = absInt(parseWSInt(v["23"])) ob.ExpectedVol = absInt(parseWSInt(v["24"])) return ob } // parseProgramTrading 0w 종목프로그램매매 → ProgramTrading 변환 func parseProgramTrading(code string, v map[string]string) *models.ProgramTrading { normalized := strings.TrimPrefix(code, "A") normalized = strings.SplitN(normalized, "_", 2)[0] return &models.ProgramTrading{ Code: normalized, SellVolume: absInt(parseWSInt(v["202"])), SellAmount: absInt(parseWSInt(v["204"])), BuyVolume: absInt(parseWSInt(v["206"])), BuyAmount: absInt(parseWSInt(v["208"])), NetBuyVolume: parseWSInt(v["210"]), NetBuyAmount: parseWSInt(v["212"]), } } // parseMarketStatus 0s 장시작시간 → MarketStatus 변환 func parseMarketStatus(v map[string]string) *models.MarketStatus { code := v["215"] return &models.MarketStatus{ StatusCode: code, StatusName: marketStatusName(code), Time: v["20"], } } // marketStatusName 장운영구분 코드 → 한글 이름 func marketStatusName(code string) string { switch code { case "0": return "장시작전" case "2": return "장마감알림" case "3": return "장 중" case "4": return "장마감" case "8": return "정규장마감" case "9": return "전체장마감" case "a": return "시간외종가시작" case "b": return "시간외종가종료" case "c": return "시간외단일가시작" case "d": return "시간외단일가종료" default: return "장외" } } // parseStockMeta 0g 주식종목정보 → StockMeta 변환 func parseStockMeta(code string, v map[string]string) *models.StockMeta { normalized := strings.TrimPrefix(code, "A") normalized = strings.SplitN(normalized, "_", 2)[0] return &models.StockMeta{ Code: normalized, UpperLimit: absInt(parseWSInt(v["305"])), LowerLimit: absInt(parseWSInt(v["306"])), BasePrice: absInt(parseWSInt(v["307"])), } } func parseWSInt(s string) int64 { s = strings.TrimSpace(s) if s == "" { return 0 } neg := strings.HasPrefix(s, "-") s = strings.TrimLeft(s, "+-") s = strings.ReplaceAll(s, ",", "") n, _ := strconv.ParseInt(s, 10, 64) if neg { return -n } return n } func parseWSFloat(s string) float64 { s = strings.TrimSpace(s) if s == "" { return 0 } neg := strings.HasPrefix(s, "-") s = strings.TrimLeft(s, "+-") f, _ := strconv.ParseFloat(s, 64) if neg { return -f } return f } func absInt(n int64) int64 { if n < 0 { return -n } return n }