mirror of
https://github.com/golang/go
synced 2024-11-23 09:00:04 -07:00
148 lines
2.5 KiB
Go
148 lines
2.5 KiB
Go
|
// compile
|
||
|
|
||
|
// Copyright 2019 The Go Authors. All rights reserved.
|
||
|
// Use of this source code is governed by a BSD-style
|
||
|
// license that can be found in the LICENSE file.
|
||
|
|
||
|
// This code failed on arm64 in the register allocator.
|
||
|
// See issue 33355.
|
||
|
|
||
|
package server
|
||
|
|
||
|
import (
|
||
|
"bytes"
|
||
|
"sync"
|
||
|
)
|
||
|
|
||
|
type client struct {
|
||
|
junk [4]int
|
||
|
mu sync.Mutex
|
||
|
srv *Server
|
||
|
gw *gateway
|
||
|
msgb [100]byte
|
||
|
}
|
||
|
|
||
|
type gateway struct {
|
||
|
cfg *gatewayCfg
|
||
|
outsim *sync.Map
|
||
|
}
|
||
|
|
||
|
type gatewayCfg struct {
|
||
|
replyPfx []byte
|
||
|
}
|
||
|
|
||
|
type Account struct {
|
||
|
Name string
|
||
|
}
|
||
|
|
||
|
type Server struct {
|
||
|
gateway *srvGateway
|
||
|
}
|
||
|
|
||
|
type srvGateway struct {
|
||
|
outo []*client
|
||
|
}
|
||
|
|
||
|
type subscription struct {
|
||
|
queue []byte
|
||
|
client *client
|
||
|
}
|
||
|
|
||
|
type outsie struct {
|
||
|
ni map[string]struct{}
|
||
|
sl *Sublist
|
||
|
qsubs int
|
||
|
}
|
||
|
|
||
|
type Sublist struct {
|
||
|
}
|
||
|
|
||
|
type SublistResult struct {
|
||
|
psubs []*subscription
|
||
|
qsubs [][]*subscription
|
||
|
}
|
||
|
|
||
|
var subPool = &sync.Pool{}
|
||
|
|
||
|
func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) {
|
||
|
var gws []*client
|
||
|
gw := c.srv.gateway
|
||
|
for i := 0; i < len(gw.outo); i++ {
|
||
|
gws = append(gws, gw.outo[i])
|
||
|
}
|
||
|
var (
|
||
|
subj = string(subject)
|
||
|
queuesa = [512]byte{}
|
||
|
queues = queuesa[:0]
|
||
|
mreply []byte
|
||
|
dstPfx []byte
|
||
|
checkReply = len(reply) > 0
|
||
|
)
|
||
|
|
||
|
sub := subPool.Get().(*subscription)
|
||
|
|
||
|
if subjectStartsWithGatewayReplyPrefix(subject) {
|
||
|
dstPfx = subject[:8]
|
||
|
}
|
||
|
for i := 0; i < len(gws); i++ {
|
||
|
gwc := gws[i]
|
||
|
if dstPfx != nil {
|
||
|
gwc.mu.Lock()
|
||
|
ok := bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx)
|
||
|
gwc.mu.Unlock()
|
||
|
if !ok {
|
||
|
continue
|
||
|
}
|
||
|
} else {
|
||
|
qr := gwc.gatewayInterest(acc.Name, subj)
|
||
|
queues = queuesa[:0]
|
||
|
for i := 0; i < len(qr.qsubs); i++ {
|
||
|
qsubs := qr.qsubs[i]
|
||
|
queue := qsubs[0].queue
|
||
|
add := true
|
||
|
for _, qn := range qgroups {
|
||
|
if bytes.Equal(queue, qn) {
|
||
|
add = false
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
if add {
|
||
|
qgroups = append(qgroups, queue)
|
||
|
}
|
||
|
}
|
||
|
if len(queues) == 0 {
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
if checkReply {
|
||
|
checkReply = false
|
||
|
mreply = reply
|
||
|
}
|
||
|
mh := c.msgb[:10]
|
||
|
mh = append(mh, subject...)
|
||
|
if len(queues) > 0 {
|
||
|
mh = append(mh, mreply...)
|
||
|
mh = append(mh, queues...)
|
||
|
}
|
||
|
sub.client = gwc
|
||
|
}
|
||
|
subPool.Put(sub)
|
||
|
}
|
||
|
|
||
|
func subjectStartsWithGatewayReplyPrefix(subj []byte) bool {
|
||
|
return len(subj) > 8 && string(subj[:4]) == "foob"
|
||
|
}
|
||
|
|
||
|
func (c *client) gatewayInterest(acc, subj string) *SublistResult {
|
||
|
ei, _ := c.gw.outsim.Load(acc)
|
||
|
var r *SublistResult
|
||
|
e := ei.(*outsie)
|
||
|
r = e.sl.Match(subj)
|
||
|
return r
|
||
|
}
|
||
|
|
||
|
func (s *Sublist) Match(subject string) *SublistResult {
|
||
|
return nil
|
||
|
}
|
||
|
|