package websocket import ( "encoding/json" "log" "net/http" "stocksearch/models" "stocksearch/services" "time" "github.com/gorilla/websocket" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 4096, // 개발 환경에서 CORS 허용 CheckOrigin: func(r *http.Request) bool { return true }, } // SubscribeMsg 구독/해제 요청 메시지 type SubscribeMsg struct { Client *Client Code string } // Hub WebSocket 연결 및 실시간 시세 관리 type Hub struct { // 클라이언트 → 구독 종목 코드 집합 clients map[*Client]map[string]bool // 종목 코드 → 구독 클라이언트 수 (키움 WS 구독 관리용) codeCounts map[string]int // 채널 register chan *Client unregister chan *Client subscribe chan *SubscribeMsg unsubscribeCode chan *SubscribeMsg priceUpdates chan *models.StockPrice // 키움 WS에서 수신한 실시간 시세 (0B/0H) orderBookUpdates chan *models.OrderBook // 실시간 호가창 (0D) programUpdates chan *models.ProgramTrading // 프로그램 매매 (0w) marketUpdates chan *models.MarketStatus // 장운영 상태 (0s) metaUpdates chan *models.StockMeta // 종목 메타 (0g) tradeLogs chan []byte // 자동매매 로그 브로드캐스트 kiwoomWS *services.KiwoomWSClient internalSubscribe chan []string // 스캐너/자동매매 전용 구독 요청 채널 } // NewHub Hub 초기화 (키움 WS 클라이언트 주입) func NewHub() *Hub { hub := &Hub{ clients: make(map[*Client]map[string]bool), codeCounts: make(map[string]int), register: make(chan *Client), unregister: make(chan *Client), subscribe: make(chan *SubscribeMsg), unsubscribeCode: make(chan *SubscribeMsg), priceUpdates: make(chan *models.StockPrice, 256), orderBookUpdates: make(chan *models.OrderBook, 256), programUpdates: make(chan *models.ProgramTrading, 128), marketUpdates: make(chan *models.MarketStatus, 32), metaUpdates: make(chan *models.StockMeta, 64), tradeLogs: make(chan []byte, 64), internalSubscribe: make(chan []string, 32), } // 키움 WS 클라이언트 생성 (가격 수신 시 채널로 전달) hub.kiwoomWS = services.GetKiwoomWSClient(func(price *models.StockPrice) { select { case hub.priceUpdates <- price: default: // 버퍼 꽉 찼으면 드롭 } }) // 추가 실시간 데이터 콜백 등록 hub.kiwoomWS.SetCallbacks( func(ob *models.OrderBook) { select { case hub.orderBookUpdates <- ob: default: } }, func(pg *models.ProgramTrading) { select { case hub.programUpdates <- pg: default: } }, func(ms *models.MarketStatus) { select { case hub.marketUpdates <- ms: default: } }, func(meta *models.StockMeta) { select { case hub.metaUpdates <- meta: default: } }, ) return hub } // Run Hub 이벤트 루프 실행 (고루틴으로 실행) func (h *Hub) Run() { for { select { case client := <-h.register: h.clients[client] = make(map[string]bool) case client := <-h.unregister: if codes, ok := h.clients[client]; ok { for code := range codes { h.decreaseCount(code) } delete(h.clients, client) close(client.send) } case msg := <-h.subscribe: if codes, ok := h.clients[msg.Client]; ok { if !codes[msg.Code] { codes[msg.Code] = true h.increaseCount(msg.Code) log.Printf("브라우저 구독: %s", msg.Code) } } case msg := <-h.unsubscribeCode: if codes, ok := h.clients[msg.Client]; ok { if codes[msg.Code] { delete(codes, msg.Code) h.decreaseCount(msg.Code) } } case price := <-h.priceUpdates: h.broadcastToCode(price.Code, "price", price) services.GetCacheService().Set("price:"+price.Code, price, 10*time.Second) case ob := <-h.orderBookUpdates: h.broadcastToCode(ob.Code, "orderbook", ob) services.GetCacheService().Set("orderbook:"+ob.Code, ob, 10*time.Second) case codes := <-h.internalSubscribe: for _, code := range codes { h.codeCounts[code]++ if h.codeCounts[code] == 1 { h.kiwoomWS.SubscribePair(code) log.Printf("내부 구독 등록: %s", code) } } case pg := <-h.programUpdates: h.broadcastToCode(pg.Code, "program", pg) case ms := <-h.marketUpdates: h.broadcastToAll("market", ms) case meta := <-h.metaUpdates: h.broadcastToCode(meta.Code, "meta", meta) case raw := <-h.tradeLogs: // 자동매매 로그: 모든 클라이언트에 브로드캐스트 for client := range h.clients { select { case client.send <- raw: default: close(client.send) delete(h.clients, client) } } } } } // SubscribeInternal 스캐너/자동매매 전용 WS 구독 (클라이언트 없이) func (h *Hub) SubscribeInternal(codes []string) { select { case h.internalSubscribe <- codes: default: } } // BroadcastTradeLog 자동매매 로그를 모든 WS 클라이언트에 전송 func (h *Hub) BroadcastTradeLog(l models.AutoTradeLog) { msg := models.WSMessage{Type: "tradelog", Data: l} raw, err := json.Marshal(msg) if err != nil { return } select { case h.tradeLogs <- raw: default: // 버퍼 꽉 찼으면 드롭 } } // increaseCount 종목 구독 수 증가 → 0→1 시 키움 WS 구독 등록 (KRX + NXT 단일 REG) func (h *Hub) increaseCount(code string) { h.codeCounts[code]++ if h.codeCounts[code] == 1 { h.kiwoomWS.SubscribePair(code) // KRX + NXT 단일 REG } } // decreaseCount 종목 구독 수 감소 → 0 시 키움 WS 구독 해제 (KRX + NXT 단일 REMOVE) func (h *Hub) decreaseCount(code string) { h.codeCounts[code]-- if h.codeCounts[code] <= 0 { delete(h.codeCounts, code) h.kiwoomWS.UnsubscribePair(code) // KRX + NXT 단일 REMOVE } } // broadcastToCode 특정 종목 구독 클라이언트에게만 메시지 전송 func (h *Hub) broadcastToCode(code string, msgType string, data interface{}) { msg := models.WSMessage{ Type: msgType, Code: code, Data: data, } raw, err := json.Marshal(msg) if err != nil { return } for client, codes := range h.clients { if !codes[code] { continue } select { case client.send <- raw: default: // 슬로우 클라이언트 연결 해제 close(client.send) delete(h.clients, client) } } } // broadcastToAll 모든 클라이언트에게 메시지 전송 (장운영 상태 등 전역 이벤트) func (h *Hub) broadcastToAll(msgType string, data interface{}) { msg := models.WSMessage{ Type: msgType, Code: "", Data: data, } raw, err := json.Marshal(msg) if err != nil { return } for client := range h.clients { select { case client.send <- raw: default: close(client.send) delete(h.clients, client) } } } // StartKiwoomWS 키움 WS 실시간 연결 시작 func (h *Hub) StartKiwoomWS() error { return h.kiwoomWS.Connect() } // ServeWS HTTP 요청을 WebSocket으로 업그레이드 후 클라이언트 등록 func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) { conn, err := upgrader.Upgrade(w, r, nil) if err != nil { log.Printf("WebSocket 업그레이드 실패: %v", err) return } client := &Client{ hub: h, conn: conn, send: make(chan []byte, 256), } h.register <- client go client.writePump() go client.readPump() }