292 lines
7.2 KiB
Go
292 lines
7.2 KiB
Go
package websocket
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log"
|
|
"net/http"
|
|
"stocksearch/models"
|
|
"stocksearch/services"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
)
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 4096,
|
|
// 개발 환경에서 CORS 허용
|
|
CheckOrigin: func(r *http.Request) bool { return true },
|
|
}
|
|
|
|
// SubscribeMsg 구독/해제 요청 메시지
|
|
type SubscribeMsg struct {
|
|
Client *Client
|
|
Code string
|
|
}
|
|
|
|
// Hub WebSocket 연결 및 실시간 시세 관리
|
|
type Hub struct {
|
|
// 클라이언트 → 구독 종목 코드 집합
|
|
clients map[*Client]map[string]bool
|
|
|
|
// 종목 코드 → 구독 클라이언트 수 (키움 WS 구독 관리용)
|
|
codeCounts map[string]int
|
|
|
|
// 채널
|
|
register chan *Client
|
|
unregister chan *Client
|
|
subscribe chan *SubscribeMsg
|
|
unsubscribeCode chan *SubscribeMsg
|
|
priceUpdates chan *models.StockPrice // 키움 WS에서 수신한 실시간 시세 (0B/0H)
|
|
orderBookUpdates chan *models.OrderBook // 실시간 호가창 (0D)
|
|
programUpdates chan *models.ProgramTrading // 프로그램 매매 (0w)
|
|
marketUpdates chan *models.MarketStatus // 장운영 상태 (0s)
|
|
metaUpdates chan *models.StockMeta // 종목 메타 (0g)
|
|
tradeLogs chan []byte // 자동매매 로그 브로드캐스트
|
|
|
|
kiwoomWS *services.KiwoomWSClient
|
|
internalSubscribe chan []string // 스캐너/자동매매 전용 구독 요청 채널
|
|
}
|
|
|
|
// NewHub Hub 초기화 (키움 WS 클라이언트 주입)
|
|
func NewHub() *Hub {
|
|
hub := &Hub{
|
|
clients: make(map[*Client]map[string]bool),
|
|
codeCounts: make(map[string]int),
|
|
register: make(chan *Client),
|
|
unregister: make(chan *Client),
|
|
subscribe: make(chan *SubscribeMsg),
|
|
unsubscribeCode: make(chan *SubscribeMsg),
|
|
priceUpdates: make(chan *models.StockPrice, 256),
|
|
orderBookUpdates: make(chan *models.OrderBook, 256),
|
|
programUpdates: make(chan *models.ProgramTrading, 128),
|
|
marketUpdates: make(chan *models.MarketStatus, 32),
|
|
metaUpdates: make(chan *models.StockMeta, 64),
|
|
tradeLogs: make(chan []byte, 64),
|
|
internalSubscribe: make(chan []string, 32),
|
|
}
|
|
|
|
// 키움 WS 클라이언트 생성 (가격 수신 시 채널로 전달)
|
|
hub.kiwoomWS = services.GetKiwoomWSClient(func(price *models.StockPrice) {
|
|
select {
|
|
case hub.priceUpdates <- price:
|
|
default:
|
|
// 버퍼 꽉 찼으면 드롭
|
|
}
|
|
})
|
|
|
|
// 추가 실시간 데이터 콜백 등록
|
|
hub.kiwoomWS.SetCallbacks(
|
|
func(ob *models.OrderBook) {
|
|
select {
|
|
case hub.orderBookUpdates <- ob:
|
|
default:
|
|
}
|
|
},
|
|
func(pg *models.ProgramTrading) {
|
|
select {
|
|
case hub.programUpdates <- pg:
|
|
default:
|
|
}
|
|
},
|
|
func(ms *models.MarketStatus) {
|
|
select {
|
|
case hub.marketUpdates <- ms:
|
|
default:
|
|
}
|
|
},
|
|
func(meta *models.StockMeta) {
|
|
select {
|
|
case hub.metaUpdates <- meta:
|
|
default:
|
|
}
|
|
},
|
|
)
|
|
|
|
return hub
|
|
}
|
|
|
|
// Run Hub 이벤트 루프 실행 (고루틴으로 실행)
|
|
func (h *Hub) Run() {
|
|
for {
|
|
select {
|
|
case client := <-h.register:
|
|
h.clients[client] = make(map[string]bool)
|
|
|
|
case client := <-h.unregister:
|
|
if codes, ok := h.clients[client]; ok {
|
|
for code := range codes {
|
|
h.decreaseCount(code)
|
|
}
|
|
delete(h.clients, client)
|
|
close(client.send)
|
|
}
|
|
|
|
case msg := <-h.subscribe:
|
|
if codes, ok := h.clients[msg.Client]; ok {
|
|
if !codes[msg.Code] {
|
|
codes[msg.Code] = true
|
|
h.increaseCount(msg.Code)
|
|
log.Printf("브라우저 구독: %s", msg.Code)
|
|
}
|
|
}
|
|
|
|
case msg := <-h.unsubscribeCode:
|
|
if codes, ok := h.clients[msg.Client]; ok {
|
|
if codes[msg.Code] {
|
|
delete(codes, msg.Code)
|
|
h.decreaseCount(msg.Code)
|
|
}
|
|
}
|
|
|
|
case price := <-h.priceUpdates:
|
|
h.broadcastToCode(price.Code, "price", price)
|
|
services.GetCacheService().Set("price:"+price.Code, price, 10*time.Second)
|
|
|
|
case ob := <-h.orderBookUpdates:
|
|
h.broadcastToCode(ob.Code, "orderbook", ob)
|
|
services.GetCacheService().Set("orderbook:"+ob.Code, ob, 10*time.Second)
|
|
|
|
case codes := <-h.internalSubscribe:
|
|
for _, code := range codes {
|
|
h.codeCounts[code]++
|
|
if h.codeCounts[code] == 1 {
|
|
h.kiwoomWS.SubscribePair(code)
|
|
log.Printf("내부 구독 등록: %s", code)
|
|
}
|
|
}
|
|
|
|
case pg := <-h.programUpdates:
|
|
h.broadcastToCode(pg.Code, "program", pg)
|
|
|
|
case ms := <-h.marketUpdates:
|
|
h.broadcastToAll("market", ms)
|
|
|
|
case meta := <-h.metaUpdates:
|
|
h.broadcastToCode(meta.Code, "meta", meta)
|
|
|
|
case raw := <-h.tradeLogs:
|
|
// 자동매매 로그: 모든 클라이언트에 브로드캐스트
|
|
for client := range h.clients {
|
|
select {
|
|
case client.send <- raw:
|
|
default:
|
|
close(client.send)
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// SubscribeInternal 스캐너/자동매매 전용 WS 구독 (클라이언트 없이)
|
|
func (h *Hub) SubscribeInternal(codes []string) {
|
|
select {
|
|
case h.internalSubscribe <- codes:
|
|
default:
|
|
}
|
|
}
|
|
|
|
// BroadcastTradeLog 자동매매 로그를 모든 WS 클라이언트에 전송
|
|
func (h *Hub) BroadcastTradeLog(l models.AutoTradeLog) {
|
|
msg := models.WSMessage{Type: "tradelog", Data: l}
|
|
raw, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
select {
|
|
case h.tradeLogs <- raw:
|
|
default:
|
|
// 버퍼 꽉 찼으면 드롭
|
|
}
|
|
}
|
|
|
|
// increaseCount 종목 구독 수 증가 → 0→1 시 키움 WS 구독 등록 (KRX + NXT 단일 REG)
|
|
func (h *Hub) increaseCount(code string) {
|
|
h.codeCounts[code]++
|
|
if h.codeCounts[code] == 1 {
|
|
h.kiwoomWS.SubscribePair(code) // KRX + NXT 단일 REG
|
|
}
|
|
}
|
|
|
|
// decreaseCount 종목 구독 수 감소 → 0 시 키움 WS 구독 해제 (KRX + NXT 단일 REMOVE)
|
|
func (h *Hub) decreaseCount(code string) {
|
|
h.codeCounts[code]--
|
|
if h.codeCounts[code] <= 0 {
|
|
delete(h.codeCounts, code)
|
|
h.kiwoomWS.UnsubscribePair(code) // KRX + NXT 단일 REMOVE
|
|
}
|
|
}
|
|
|
|
// broadcastToCode 특정 종목 구독 클라이언트에게만 메시지 전송
|
|
func (h *Hub) broadcastToCode(code string, msgType string, data interface{}) {
|
|
msg := models.WSMessage{
|
|
Type: msgType,
|
|
Code: code,
|
|
Data: data,
|
|
}
|
|
raw, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for client, codes := range h.clients {
|
|
if !codes[code] {
|
|
continue
|
|
}
|
|
select {
|
|
case client.send <- raw:
|
|
default:
|
|
// 슬로우 클라이언트 연결 해제
|
|
close(client.send)
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
}
|
|
|
|
// broadcastToAll 모든 클라이언트에게 메시지 전송 (장운영 상태 등 전역 이벤트)
|
|
func (h *Hub) broadcastToAll(msgType string, data interface{}) {
|
|
msg := models.WSMessage{
|
|
Type: msgType,
|
|
Code: "",
|
|
Data: data,
|
|
}
|
|
raw, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for client := range h.clients {
|
|
select {
|
|
case client.send <- raw:
|
|
default:
|
|
close(client.send)
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
}
|
|
|
|
// StartKiwoomWS 키움 WS 실시간 연결 시작
|
|
func (h *Hub) StartKiwoomWS() error {
|
|
return h.kiwoomWS.Connect()
|
|
}
|
|
|
|
// ServeWS HTTP 요청을 WebSocket으로 업그레이드 후 클라이언트 등록
|
|
func (h *Hub) ServeWS(w http.ResponseWriter, r *http.Request) {
|
|
conn, err := upgrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Printf("WebSocket 업그레이드 실패: %v", err)
|
|
return
|
|
}
|
|
|
|
client := &Client{
|
|
hub: h,
|
|
conn: conn,
|
|
send: make(chan []byte, 256),
|
|
}
|
|
h.register <- client
|
|
|
|
go client.writePump()
|
|
go client.readPump()
|
|
}
|