mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2024-12-13 11:42:23 -05:00
1121 lines
32 KiB
Go
Vendored
1121 lines
32 KiB
Go
Vendored
// go implementation of upr client.
|
|
// See https://github.com/couchbaselabs/cbupr/blob/master/transport-spec.md
|
|
// TODO
|
|
// 1. Use a pool allocator to avoid garbage
|
|
package memcached
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/couchbase/gomemcached"
|
|
"github.com/couchbase/goutils/logging"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// Judging by their names, these are the lengths of the extras section of the
// corresponding DCP/UPR packets; they are not referenced by the code in this
// file. The identifier spellings are kept unchanged.
const (
	uprMutationExtraLen                   = 30
	uprDeletetionExtraLen                 = 18
	uprDeletetionWithDeletionTimeExtraLen = 21
	uprSnapshotExtraLen                   = 20
	dcpSystemEventExtraLen                = 13
)

const (
	// bufferAckThreshold is the fraction of the requested connection buffer
	// size that uprOpen stores as the feed's maxAckBytes.
	bufferAckThreshold = 0.2

	// opaqueOpen seeds the opaque counter used for open and control messages.
	opaqueOpen = 0xBEAF0001

	// opaqueFailover is the fixed opaque sent with failover-log requests.
	opaqueFailover = 0xDEADBEEF

	// uprDefaultNoopInterval is the value sent with the "set_noop_interval"
	// control (presumably seconds -- confirm against the DCP documentation).
	uprDefaultNoopInterval = 120
)

// opaqueOpenCtrlWell is a counter on top of opaqueOpen that others can draw
// from for open and control messages.
var opaqueOpenCtrlWell uint32 = opaqueOpen
|
|
|
|
// PriorityType is a DCP connection priority. setPriority sends its string
// value as the body of a "set_priority" control message.
type PriorityType string

// high > medium > disabled > low
const (
	// PriorityDisabled is the empty value; uprOpen sends no priority control
	// when this is requested.
	PriorityDisabled PriorityType = ""
	PriorityLow      PriorityType = "low"
	PriorityMed      PriorityType = "medium"
	PriorityHigh     PriorityType = "high"
)
|
|
|
|
// DcpStreamType identifies the kind of streams a feed has been started with.
type DcpStreamType int32

// UninitializedStream marks a feed whose stream type has not been chosen yet.
var UninitializedStream DcpStreamType = -1

// Stream kinds a feed can settle on (values 0, 1 and 2 respectively).
const (
	NonCollectionStream DcpStreamType = iota
	CollectionsNonStreamId
	CollectionsStreamId
)

// String returns a human readable description of the stream type.
func (t DcpStreamType) String() string {
	if t == UninitializedStream {
		return "Un-Initialized Stream"
	}
	if t == NonCollectionStream {
		return "Traditional Non-Collection Stream"
	}
	if t == CollectionsNonStreamId {
		return "Collections Stream without StreamID"
	}
	if t == CollectionsStreamId {
		return "Collection Stream with StreamID"
	}
	return "Unknown Stream Type"
}
|
|
|
|
// UprStream is per stream data structure over an UPR Connection.
type UprStream struct {
	Vbucket  uint16 // Vbucket id
	Vbuuid   uint64 // vbucket uuid
	StartSeq uint64 // start sequence number
	EndSeq   uint64 // end sequence number
	// connected is set to true by activateStream once the stream becomes the
	// live stream for its vbucket.
	connected bool
	// StreamType is copied from the feed's stream type by activateStream when
	// collections are enabled on the feed.
	StreamType DcpStreamType
}
|
|
|
|
// FeedState is the lifecycle state of an UprFeed.
type FeedState int

// Feed lifecycle states, numbered 0, 1 and 2.
const (
	FeedStateInitial = iota
	FeedStateOpened
	FeedStateClosed
)

// String returns the name of the state, or "Unknown" for any other value.
func (fs FeedState) String() string {
	names := [...]string{
		FeedStateInitial: "Initial",
		FeedStateOpened:  "Opened",
		FeedStateClosed:  "Closed",
	}
	if fs < 0 || int(fs) >= len(names) {
		return "Unknown"
	}
	return names[fs]
}
|
|
|
|
// Compression types accepted in UprFeatures.CompressionType. The start and
// end markers bracket the valid values.
const (
	CompressionTypeStartMarker = iota // also means invalid
	CompressionTypeNone
	CompressionTypeSnappy
	CompressionTypeEndMarker // also means invalid
)
|
|
|
|
// kv_engine/include/mcbp/protocol/datatype.h
//
// Each datatype occupies its own bit: 1, 2 and 4.
const (
	JSONDataType uint8 = 1 << iota
	SnappyDataType
	XattrDataType
)
|
|
|
|
// UprFeatures lists the optional features requested when opening a feed and,
// as the value returned from uprOpen, the ones that were activated.
type UprFeatures struct {
	// Xattribute adds DCP_OPEN_INCLUDE_XATTRS to the open flags.
	Xattribute bool
	// CompressionType is one of the CompressionType* constants; Snappy makes
	// uprOpen send the "force_value_compression" control.
	CompressionType int
	// IncludeDeletionTime adds DCP_OPEN_INCLUDE_DELETE_TIMES to the open flags.
	IncludeDeletionTime bool
	// DcpPriority, when not PriorityDisabled, is sent via "set_priority".
	DcpPriority PriorityType
	// EnableExpiry makes uprOpen send the "enable_expiry_opcode" control.
	EnableExpiry bool
	// EnableStreamId makes uprOpen send the "enable_stream_id" control.
	EnableStreamId bool
}
|
|
|
|
// opaqueStreamMap is used to handle multiple concurrent calls to
// UprRequestStream() by UprFeed clients. It is expected that a client that
// calls UprRequestStream() more than once should issue different "opaque"
// (version) numbers.
type opaqueStreamMap map[uint16]*UprStream // opaque -> stream

// vbStreamNegotiator tracks stream requests that have been registered but not
// yet activated on the feed. All access to vbHandshakeMap goes through mutex.
type vbStreamNegotiator struct {
	vbHandshakeMap map[uint16]opaqueStreamMap // vbno -> opaqueStreamMap
	mutex          sync.RWMutex
}
|
|
|
|
func (negotiator *vbStreamNegotiator) initialize() {
|
|
negotiator.mutex.Lock()
|
|
negotiator.vbHandshakeMap = make(map[uint16]opaqueStreamMap)
|
|
negotiator.mutex.Unlock()
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) registerRequest(vbno, opaque uint16, vbuuid, startSequence, endSequence uint64) {
|
|
negotiator.mutex.Lock()
|
|
defer negotiator.mutex.Unlock()
|
|
|
|
var osMap opaqueStreamMap
|
|
var ok bool
|
|
if osMap, ok = negotiator.vbHandshakeMap[vbno]; !ok {
|
|
osMap = make(opaqueStreamMap)
|
|
negotiator.vbHandshakeMap[vbno] = osMap
|
|
}
|
|
|
|
if _, ok = osMap[opaque]; !ok {
|
|
osMap[opaque] = &UprStream{
|
|
Vbucket: vbno,
|
|
Vbuuid: vbuuid,
|
|
StartSeq: startSequence,
|
|
EndSeq: endSequence,
|
|
}
|
|
}
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) getStreamsCntFromMap(vbno uint16) int {
|
|
negotiator.mutex.RLock()
|
|
defer negotiator.mutex.RUnlock()
|
|
|
|
osmap, ok := negotiator.vbHandshakeMap[vbno]
|
|
if !ok {
|
|
return 0
|
|
} else {
|
|
return len(osmap)
|
|
}
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) getStreamFromMap(vbno, opaque uint16) (*UprStream, error) {
|
|
negotiator.mutex.RLock()
|
|
defer negotiator.mutex.RUnlock()
|
|
|
|
osmap, ok := negotiator.vbHandshakeMap[vbno]
|
|
if !ok {
|
|
return nil, fmt.Errorf("Error: stream for vb: %v does not exist", vbno)
|
|
}
|
|
|
|
stream, ok := osmap[opaque]
|
|
if !ok {
|
|
return nil, fmt.Errorf("Error: stream for vb: %v opaque: %v does not exist", vbno, opaque)
|
|
}
|
|
return stream, nil
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) deleteStreamFromMap(vbno, opaque uint16) {
|
|
negotiator.mutex.Lock()
|
|
defer negotiator.mutex.Unlock()
|
|
|
|
osmap, ok := negotiator.vbHandshakeMap[vbno]
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
delete(osmap, opaque)
|
|
if len(osmap) == 0 {
|
|
delete(negotiator.vbHandshakeMap, vbno)
|
|
}
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) handleStreamRequest(feed *UprFeed,
|
|
headerBuf [gomemcached.HDR_LEN]byte, pktPtr *gomemcached.MCRequest, bytesReceivedFromDCP int,
|
|
response *gomemcached.MCResponse) (*UprEvent, error) {
|
|
var event *UprEvent
|
|
|
|
if feed == nil || response == nil || pktPtr == nil {
|
|
return nil, errors.New("Invalid inputs")
|
|
}
|
|
|
|
// Get Stream from negotiator map
|
|
vbno := vbOpaque(response.Opaque)
|
|
opaque := appOpaque(response.Opaque)
|
|
|
|
stream, err := negotiator.getStreamFromMap(vbno, opaque)
|
|
if err != nil {
|
|
err = fmt.Errorf("Stream not found for vb %d: %#v", vbno, *pktPtr)
|
|
logging.Errorf(err.Error())
|
|
return nil, err
|
|
}
|
|
|
|
status, rb, flog, err := handleStreamRequest(response, headerBuf[:])
|
|
|
|
if status == gomemcached.ROLLBACK {
|
|
event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
|
|
event.Status = status
|
|
// rollback stream
|
|
logging.Infof("UPR_STREAMREQ with rollback %d for vb %d Failed: %v", rb, vbno, err)
|
|
negotiator.deleteStreamFromMap(vbno, opaque)
|
|
} else if status == gomemcached.SUCCESS {
|
|
event = makeUprEvent(*pktPtr, stream, bytesReceivedFromDCP)
|
|
event.Seqno = stream.StartSeq
|
|
event.FailoverLog = flog
|
|
event.Status = status
|
|
feed.activateStream(vbno, opaque, stream)
|
|
feed.negotiator.deleteStreamFromMap(vbno, opaque)
|
|
logging.Infof("UPR_STREAMREQ for vb %d successful", vbno)
|
|
|
|
} else if err != nil {
|
|
logging.Errorf("UPR_STREAMREQ for vbucket %d erro %s", vbno, err.Error())
|
|
event = &UprEvent{
|
|
Opcode: gomemcached.UPR_STREAMREQ,
|
|
Status: status,
|
|
VBucket: vbno,
|
|
Error: err,
|
|
}
|
|
negotiator.deleteStreamFromMap(vbno, opaque)
|
|
}
|
|
return event, nil
|
|
}
|
|
|
|
func (negotiator *vbStreamNegotiator) cleanUpVbStreams(vbno uint16) {
|
|
negotiator.mutex.Lock()
|
|
defer negotiator.mutex.Unlock()
|
|
|
|
delete(negotiator.vbHandshakeMap, vbno)
|
|
}
|
|
|
|
// UprFeed represents an UPR feed. A feed contains a connection to a single
// host and multiple vBuckets
type UprFeed struct {
	// lock for feed.vbstreams
	muVbstreams sync.RWMutex
	C           <-chan *UprEvent      // Exported channel for receiving UPR events
	negotiator  vbStreamNegotiator    // Used for pre-vbstreams, concurrent vb stream negotiation
	vbstreams   map[uint16]*UprStream // official live vb->stream mapping
	closer      chan bool             // closer; closed by Close() to signal shutdown
	conn        *Client               // connection to UPR producer
	Error       error                 // error; set by runFeed when receiving from the connection fails
	bytesRead   uint64                // total bytes read on this connection
	toAckBytes  uint32                // bytes client has read
	maxAckBytes uint32                // Max buffer control ack bytes
	stats       UprStats              // Stats for upr client
	transmitCh  chan *gomemcached.MCRequest // transmit command channel
	transmitCl  chan bool                   // closer channel for transmit go-routine
	// if flag is true, upr feed will use ack from client to determine whether/when to send ack to DCP
	// if flag is false, upr feed will track how many bytes it has sent to client
	// and use that to determine whether/when to send ack to DCP
	ackByClient bool
	// feedState is guarded by muFeedState.
	feedState   FeedState
	muFeedState sync.RWMutex
	// activatedFeatures is stored by uprOpen once the feed has been opened.
	activatedFeatures UprFeatures
	collectionEnabled bool // This is needed separately because parsing depends on this
	// DCP StreamID allows multiple filtered collection streams to share a single DCP Stream
	// It is not allowed once a regular/legacy stream was started originally
	streamsType        DcpStreamType
	initStreamTypeOnce sync.Once
}
|
|
|
|
// UprFeedIface is the exported interface of an UPR feed - to allow for
// mocking. *UprFeed is returned as this interface by NewUprFeedIface and
// NewUprFeedWithConfigIface.
type UprFeedIface interface {
	Close()
	Closed() bool
	CloseStream(vbno, opaqueMSB uint16) error
	GetError() error
	GetUprStats() *UprStats
	ClientAck(event *UprEvent) error
	GetUprEventCh() <-chan *UprEvent
	StartFeed() error
	StartFeedWithConfig(datachan_len int) error
	UprOpen(name string, sequence uint32, bufSize uint32) error
	UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error
	UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures)
	UprRequestStream(vbno, opaqueMSB uint16, flags uint32, vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error
	// Set DCP priority on an existing DCP connection. The command is sent asynchronously without waiting for a response
	SetPriorityAsync(p PriorityType) error

	// Various Collection-Type RequestStreams
	UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32, vbuuid, startSeq, endSeq, snapStart, snapEnd uint64, filter *CollectionsFilter) error
}
|
|
|
|
// UprStats holds the counters kept by a feed; GetUprStats exposes them.
type UprStats struct {
	TotalBytes         uint64 // set by runFeed from the byte count of received packets
	TotalMutation      uint64 // incremented by runFeed for each mutation, deletion or expiration
	TotalBufferAckSent uint64 // incremented by sendBufferAck
	TotalSnapShot      uint64 // incremented by runFeed for each snapshot marker
}
|
|
|
|
// error codes
|
|
var ErrorInvalidLog = errors.New("couchbase.errorInvalidLog")
|
|
|
|
func (flogp *FailoverLog) Latest() (vbuuid, seqno uint64, err error) {
|
|
if flogp != nil {
|
|
flog := *flogp
|
|
latest := flog[len(flog)-1]
|
|
return latest[0], latest[1], nil
|
|
}
|
|
return vbuuid, seqno, ErrorInvalidLog
|
|
}
|
|
|
|
func (feed *UprFeed) sendCommands(mc *Client) {
|
|
transmitCh := feed.transmitCh
|
|
transmitCl := feed.transmitCl
|
|
loop:
|
|
for {
|
|
select {
|
|
case command := <-transmitCh:
|
|
if err := mc.Transmit(command); err != nil {
|
|
logging.Errorf("Failed to transmit command %s. Error %s", command.Opcode.String(), err.Error())
|
|
// get feed to close and runFeed routine to exit
|
|
feed.Close()
|
|
break loop
|
|
}
|
|
|
|
case <-transmitCl:
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// After sendCommands exits, write to transmitCh will block forever
|
|
// when we write to transmitCh, e.g., at CloseStream(), we need to check feed closure to have an exit route
|
|
|
|
logging.Infof("sendCommands exiting")
|
|
}
|
|
|
|
// Sets the specified stream as the connected stream for this vbno, and also cleans up negotiator
|
|
func (feed *UprFeed) activateStream(vbno, opaque uint16, stream *UprStream) error {
|
|
feed.muVbstreams.Lock()
|
|
defer feed.muVbstreams.Unlock()
|
|
|
|
if feed.collectionEnabled {
|
|
stream.StreamType = feed.streamsType
|
|
}
|
|
|
|
// Set this stream as the officially connected stream for this vb
|
|
stream.connected = true
|
|
feed.vbstreams[vbno] = stream
|
|
return nil
|
|
}
|
|
|
|
func (feed *UprFeed) cleanUpVbStream(vbno uint16) {
|
|
feed.muVbstreams.Lock()
|
|
defer feed.muVbstreams.Unlock()
|
|
|
|
delete(feed.vbstreams, vbno)
|
|
}
|
|
|
|
// NewUprFeed creates a new UPR Feed.
|
|
// TODO: Describe side-effects on bucket instance and its connection pool.
|
|
func (mc *Client) NewUprFeed() (*UprFeed, error) {
|
|
return mc.NewUprFeedWithConfig(false /*ackByClient*/)
|
|
}
|
|
|
|
func (mc *Client) NewUprFeedWithConfig(ackByClient bool) (*UprFeed, error) {
|
|
feed := &UprFeed{
|
|
conn: mc,
|
|
closer: make(chan bool, 1),
|
|
vbstreams: make(map[uint16]*UprStream),
|
|
transmitCh: make(chan *gomemcached.MCRequest),
|
|
transmitCl: make(chan bool),
|
|
ackByClient: ackByClient,
|
|
collectionEnabled: mc.CollectionEnabled(),
|
|
streamsType: UninitializedStream,
|
|
}
|
|
|
|
feed.negotiator.initialize()
|
|
|
|
go feed.sendCommands(mc)
|
|
return feed, nil
|
|
}
|
|
|
|
func (mc *Client) NewUprFeedIface() (UprFeedIface, error) {
|
|
return mc.NewUprFeed()
|
|
}
|
|
|
|
func (mc *Client) NewUprFeedWithConfigIface(ackByClient bool) (UprFeedIface, error) {
|
|
return mc.NewUprFeedWithConfig(ackByClient)
|
|
}
|
|
|
|
func doUprOpen(mc *Client, name string, sequence uint32, features UprFeatures) error {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_OPEN,
|
|
Key: []byte(name),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
|
|
rq.Extras = make([]byte, 8)
|
|
binary.BigEndian.PutUint32(rq.Extras[:4], sequence)
|
|
|
|
// opens a producer type connection
|
|
flags := gomemcached.DCP_PRODUCER
|
|
if features.Xattribute {
|
|
flags = flags | gomemcached.DCP_OPEN_INCLUDE_XATTRS
|
|
}
|
|
if features.IncludeDeletionTime {
|
|
flags = flags | gomemcached.DCP_OPEN_INCLUDE_DELETE_TIMES
|
|
}
|
|
binary.BigEndian.PutUint32(rq.Extras[4:], flags)
|
|
|
|
return sendMcRequestSync(mc, rq)
|
|
}
|
|
|
|
// Synchronously send a memcached request and wait for the response
|
|
func sendMcRequestSync(mc *Client, req *gomemcached.MCRequest) error {
|
|
if err := mc.Transmit(req); err != nil {
|
|
return err
|
|
}
|
|
|
|
if res, err := mc.Receive(); err != nil {
|
|
return err
|
|
} else if req.Opcode != res.Opcode {
|
|
return fmt.Errorf("unexpected #opcode sent %v received %v", req.Opcode, res.Opaque)
|
|
} else if req.Opaque != res.Opaque {
|
|
return fmt.Errorf("opaque mismatch, sent %v received %v", req.Opaque, res.Opaque)
|
|
} else if res.Status != gomemcached.SUCCESS {
|
|
return fmt.Errorf("error %v", res.Status)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UprOpen to connect with a UPR producer.
|
|
// Name: name of te UPR connection
|
|
// sequence: sequence number for the connection
|
|
// bufsize: max size of the application
|
|
func (feed *UprFeed) UprOpen(name string, sequence uint32, bufSize uint32) error {
|
|
var allFeaturesDisabled UprFeatures
|
|
err, _ := feed.uprOpen(name, sequence, bufSize, allFeaturesDisabled)
|
|
return err
|
|
}
|
|
|
|
// UprOpen with XATTR enabled.
|
|
func (feed *UprFeed) UprOpenWithXATTR(name string, sequence uint32, bufSize uint32) error {
|
|
var onlyXattrEnabled UprFeatures
|
|
onlyXattrEnabled.Xattribute = true
|
|
err, _ := feed.uprOpen(name, sequence, bufSize, onlyXattrEnabled)
|
|
return err
|
|
}
|
|
|
|
// UprOpenWithFeatures opens the feed with the requested features. It returns
// the error from uprOpen, if any, together with the features uprOpen reports
// as activated.
func (feed *UprFeed) UprOpenWithFeatures(name string, sequence uint32, bufSize uint32, features UprFeatures) (error, UprFeatures) {
	return feed.uprOpen(name, sequence, bufSize, features)
}
|
|
|
|
func (feed *UprFeed) SetPriorityAsync(p PriorityType) error {
|
|
if !feed.isOpen() {
|
|
// do not send this command if upr feed is not yet open, otherwise it may interfere with
|
|
// feed start up process, which relies on synchronous message exchange with DCP.
|
|
return fmt.Errorf("Upr feed is not open. State=%v", feed.getState())
|
|
}
|
|
|
|
return feed.setPriority(p, false /*sync*/)
|
|
}
|
|
|
|
func (feed *UprFeed) setPriority(p PriorityType, sync bool) error {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("set_priority"),
|
|
Body: []byte(p),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
if sync {
|
|
return sendMcRequestSync(feed.conn, rq)
|
|
} else {
|
|
return feed.writeToTransmitCh(rq)
|
|
|
|
}
|
|
}
|
|
|
|
func (feed *UprFeed) uprOpen(name string, sequence uint32, bufSize uint32, features UprFeatures) (err error, activatedFeatures UprFeatures) {
|
|
mc := feed.conn
|
|
|
|
// First set this to an invalid value to state that the method hasn't gotten to executing this control yet
|
|
activatedFeatures.CompressionType = CompressionTypeEndMarker
|
|
|
|
if err = doUprOpen(mc, name, sequence, features); err != nil {
|
|
return
|
|
}
|
|
|
|
activatedFeatures.Xattribute = features.Xattribute
|
|
|
|
// send a UPR control message to set the window size for the this connection
|
|
if bufSize > 0 {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("connection_buffer_size"),
|
|
Body: []byte(strconv.Itoa(int(bufSize))),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
if err != nil {
|
|
return
|
|
}
|
|
feed.maxAckBytes = uint32(bufferAckThreshold * float32(bufSize))
|
|
}
|
|
|
|
// enable noop and set noop interval
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("enable_noop"),
|
|
Body: []byte("true"),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
rq = &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("set_noop_interval"),
|
|
Body: []byte(strconv.Itoa(int(uprDefaultNoopInterval))),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
if features.CompressionType == CompressionTypeSnappy {
|
|
activatedFeatures.CompressionType = CompressionTypeNone
|
|
rq = &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("force_value_compression"),
|
|
Body: []byte("true"),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
} else if features.CompressionType == CompressionTypeEndMarker {
|
|
err = fmt.Errorf("UPR_CONTROL Failed - Invalid CompressionType: %v", features.CompressionType)
|
|
}
|
|
if err != nil {
|
|
return
|
|
}
|
|
activatedFeatures.CompressionType = features.CompressionType
|
|
|
|
if features.DcpPriority != PriorityDisabled {
|
|
err = feed.setPriority(features.DcpPriority, true /*sync*/)
|
|
if err == nil {
|
|
activatedFeatures.DcpPriority = features.DcpPriority
|
|
} else {
|
|
return
|
|
}
|
|
}
|
|
|
|
if features.EnableExpiry {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("enable_expiry_opcode"),
|
|
Body: []byte("true"),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
if err != nil {
|
|
return
|
|
}
|
|
activatedFeatures.EnableExpiry = true
|
|
}
|
|
|
|
if features.EnableStreamId {
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CONTROL,
|
|
Key: []byte("enable_stream_id"),
|
|
Body: []byte("true"),
|
|
Opaque: getUprOpenCtrlOpaque(),
|
|
}
|
|
err = sendMcRequestSync(feed.conn, rq)
|
|
if err != nil {
|
|
return
|
|
}
|
|
activatedFeatures.EnableStreamId = true
|
|
}
|
|
|
|
// everything is ok so far, set upr feed to open state
|
|
feed.activatedFeatures = activatedFeatures
|
|
feed.setOpen()
|
|
return
|
|
}
|
|
|
|
// UprGetFailoverLog for given list of vbuckets.
|
|
func (mc *Client) UprGetFailoverLog(
|
|
vb []uint16) (map[uint16]*FailoverLog, error) {
|
|
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_FAILOVERLOG,
|
|
Opaque: opaqueFailover,
|
|
}
|
|
|
|
var allFeaturesDisabled UprFeatures
|
|
if err := doUprOpen(mc, "FailoverLog", 0, allFeaturesDisabled); err != nil {
|
|
return nil, fmt.Errorf("UPR_OPEN Failed %s", err.Error())
|
|
}
|
|
|
|
failoverLogs := make(map[uint16]*FailoverLog)
|
|
for _, vBucket := range vb {
|
|
rq.VBucket = vBucket
|
|
if err := mc.Transmit(rq); err != nil {
|
|
return nil, err
|
|
}
|
|
res, err := mc.Receive()
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to receive %s", err.Error())
|
|
} else if res.Opcode != gomemcached.UPR_FAILOVERLOG || res.Status != gomemcached.SUCCESS {
|
|
return nil, fmt.Errorf("unexpected #opcode %v", res.Opcode)
|
|
}
|
|
|
|
flog, err := parseFailoverLog(res.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to parse failover logs for vb %d", vb)
|
|
}
|
|
failoverLogs[vBucket] = flog
|
|
}
|
|
|
|
return failoverLogs, nil
|
|
}
|
|
|
|
// UprRequestStream for a single vbucket.
|
|
func (feed *UprFeed) UprRequestStream(vbno, opaqueMSB uint16, flags uint32,
|
|
vuuid, startSequence, endSequence, snapStart, snapEnd uint64) error {
|
|
|
|
return feed.UprRequestCollectionsStream(vbno, opaqueMSB, flags, vuuid, startSequence, endSequence, snapStart, snapEnd, nil)
|
|
}
|
|
|
|
func (feed *UprFeed) initStreamType(filter *CollectionsFilter) (err error) {
|
|
if filter != nil && filter.UseStreamId && !feed.activatedFeatures.EnableStreamId {
|
|
err = fmt.Errorf("Cannot use streamID based filter if the feed was not started with the streamID feature")
|
|
return
|
|
}
|
|
|
|
streamInitFunc := func() {
|
|
if feed.streamsType != UninitializedStream {
|
|
// Shouldn't happen
|
|
err = fmt.Errorf("The current feed has already been started in %v mode", feed.streamsType.String())
|
|
} else {
|
|
if !feed.collectionEnabled {
|
|
feed.streamsType = NonCollectionStream
|
|
} else {
|
|
if filter != nil && filter.UseStreamId {
|
|
feed.streamsType = CollectionsStreamId
|
|
} else {
|
|
feed.streamsType = CollectionsNonStreamId
|
|
}
|
|
}
|
|
}
|
|
}
|
|
feed.initStreamTypeOnce.Do(streamInitFunc)
|
|
return
|
|
}
|
|
|
|
func (feed *UprFeed) UprRequestCollectionsStream(vbno, opaqueMSB uint16, flags uint32,
|
|
vbuuid, startSequence, endSequence, snapStart, snapEnd uint64, filter *CollectionsFilter) error {
|
|
|
|
err := feed.initStreamType(filter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var mcRequestBody []byte
|
|
if filter != nil {
|
|
err = filter.IsValid()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mcRequestBody, err = filter.ToStreamReqBody()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
rq := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_STREAMREQ,
|
|
VBucket: vbno,
|
|
Opaque: composeOpaque(vbno, opaqueMSB),
|
|
Body: mcRequestBody,
|
|
}
|
|
|
|
rq.Extras = make([]byte, 48) // #Extras
|
|
binary.BigEndian.PutUint32(rq.Extras[:4], flags)
|
|
binary.BigEndian.PutUint32(rq.Extras[4:8], uint32(0))
|
|
binary.BigEndian.PutUint64(rq.Extras[8:16], startSequence)
|
|
binary.BigEndian.PutUint64(rq.Extras[16:24], endSequence)
|
|
binary.BigEndian.PutUint64(rq.Extras[24:32], vbuuid)
|
|
binary.BigEndian.PutUint64(rq.Extras[32:40], snapStart)
|
|
binary.BigEndian.PutUint64(rq.Extras[40:48], snapEnd)
|
|
|
|
feed.negotiator.registerRequest(vbno, opaqueMSB, vbuuid, startSequence, endSequence)
|
|
// Any client that has ever called this method, regardless of return code,
|
|
// should expect a potential UPR_CLOSESTREAM message due to this new map entry prior to Transmit.
|
|
|
|
if err = feed.conn.Transmit(rq); err != nil {
|
|
logging.Errorf("Error in StreamRequest %s", err.Error())
|
|
// If an error occurs during transmit, then the UPRFeed will keep the stream
|
|
// in the vbstreams map. This is to prevent nil lookup from any previously
|
|
// sent stream requests.
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CloseStream for specified vbucket.
|
|
func (feed *UprFeed) CloseStream(vbno, opaqueMSB uint16) error {
|
|
|
|
err := feed.validateCloseStream(vbno)
|
|
if err != nil {
|
|
logging.Infof("CloseStream for %v has been skipped because of error %v", vbno, err)
|
|
return err
|
|
}
|
|
|
|
closeStream := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_CLOSESTREAM,
|
|
VBucket: vbno,
|
|
Opaque: composeOpaque(vbno, opaqueMSB),
|
|
}
|
|
|
|
feed.writeToTransmitCh(closeStream)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetUprEventCh returns the channel on which UPR events are delivered. The
// channel is assigned by StartFeedWithConfig, so it is nil before the feed
// has been started.
func (feed *UprFeed) GetUprEventCh() <-chan *UprEvent {
	return feed.C
}
|
|
|
|
// GetError returns the feed's Error field, which runFeed sets when receiving
// from the connection fails.
func (feed *UprFeed) GetError() error {
	return feed.Error
}
|
|
|
|
func (feed *UprFeed) validateCloseStream(vbno uint16) error {
|
|
feed.muVbstreams.RLock()
|
|
nilVbStream := feed.vbstreams[vbno] == nil
|
|
feed.muVbstreams.RUnlock()
|
|
|
|
if nilVbStream && (feed.negotiator.getStreamsCntFromMap(vbno) == 0) {
|
|
return fmt.Errorf("Stream for vb %d has not been requested", vbno)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (feed *UprFeed) writeToTransmitCh(rq *gomemcached.MCRequest) error {
|
|
// write to transmitCh may block forever if sendCommands has exited
|
|
// check for feed closure to have an exit route in this case
|
|
select {
|
|
case <-feed.closer:
|
|
errMsg := fmt.Sprintf("Abort sending request to transmitCh because feed has been closed. request=%v", rq)
|
|
logging.Infof(errMsg)
|
|
return errors.New(errMsg)
|
|
case feed.transmitCh <- rq:
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// StartFeed to start the upper feed.
|
|
func (feed *UprFeed) StartFeed() error {
|
|
return feed.StartFeedWithConfig(10)
|
|
}
|
|
|
|
func (feed *UprFeed) StartFeedWithConfig(datachan_len int) error {
|
|
ch := make(chan *UprEvent, datachan_len)
|
|
feed.C = ch
|
|
go feed.runFeed(ch)
|
|
return nil
|
|
}
|
|
|
|
func parseFailoverLog(body []byte) (*FailoverLog, error) {
|
|
|
|
if len(body)%16 != 0 {
|
|
err := fmt.Errorf("invalid body length %v, in failover-log", len(body))
|
|
return nil, err
|
|
}
|
|
log := make(FailoverLog, len(body)/16)
|
|
for i, j := 0, 0; i < len(body); i += 16 {
|
|
vuuid := binary.BigEndian.Uint64(body[i : i+8])
|
|
seqno := binary.BigEndian.Uint64(body[i+8 : i+16])
|
|
log[j] = [2]uint64{vuuid, seqno}
|
|
j++
|
|
}
|
|
return &log, nil
|
|
}
|
|
|
|
func handleStreamRequest(
|
|
res *gomemcached.MCResponse,
|
|
headerBuf []byte,
|
|
) (gomemcached.Status, uint64, *FailoverLog, error) {
|
|
|
|
var rollback uint64
|
|
var err error
|
|
|
|
switch {
|
|
case res.Status == gomemcached.ROLLBACK:
|
|
logging.Infof("Rollback response. body=%v, headerBuf=%v\n", res.Body, headerBuf)
|
|
rollback = binary.BigEndian.Uint64(res.Body)
|
|
logging.Infof("Rollback seqno is %v for response with opaque %v\n", rollback, res.Opaque)
|
|
return res.Status, rollback, nil, nil
|
|
|
|
case res.Status != gomemcached.SUCCESS:
|
|
err = fmt.Errorf("unexpected status %v for response with opaque %v", res.Status, res.Opaque)
|
|
return res.Status, 0, nil, err
|
|
}
|
|
|
|
flog, err := parseFailoverLog(res.Body[:])
|
|
return res.Status, rollback, flog, err
|
|
}
|
|
|
|
// generate stream end responses for all active vb streams
|
|
func (feed *UprFeed) doStreamClose(ch chan *UprEvent) {
|
|
feed.muVbstreams.RLock()
|
|
|
|
uprEvents := make([]*UprEvent, len(feed.vbstreams))
|
|
index := 0
|
|
for vbno, stream := range feed.vbstreams {
|
|
uprEvent := &UprEvent{
|
|
VBucket: vbno,
|
|
VBuuid: stream.Vbuuid,
|
|
Opcode: gomemcached.UPR_STREAMEND,
|
|
}
|
|
uprEvents[index] = uprEvent
|
|
index++
|
|
}
|
|
|
|
// release the lock before sending uprEvents to ch, which may block
|
|
feed.muVbstreams.RUnlock()
|
|
|
|
loop:
|
|
for _, uprEvent := range uprEvents {
|
|
select {
|
|
case ch <- uprEvent:
|
|
case <-feed.closer:
|
|
logging.Infof("Feed has been closed. Aborting doStreamClose.")
|
|
break loop
|
|
}
|
|
}
|
|
}
|
|
|
|
func (feed *UprFeed) runFeed(ch chan *UprEvent) {
|
|
defer close(ch)
|
|
var headerBuf [gomemcached.HDR_LEN]byte
|
|
var pkt gomemcached.MCRequest
|
|
var event *UprEvent
|
|
|
|
mc := feed.conn.Hijack()
|
|
uprStats := &feed.stats
|
|
|
|
loop:
|
|
for {
|
|
select {
|
|
case <-feed.closer:
|
|
logging.Infof("Feed has been closed. Exiting.")
|
|
break loop
|
|
default:
|
|
bytes, err := pkt.Receive(mc, headerBuf[:])
|
|
if err != nil {
|
|
logging.Errorf("Error in receive %s", err.Error())
|
|
feed.Error = err
|
|
// send all the stream close messages to the client
|
|
feed.doStreamClose(ch)
|
|
break loop
|
|
} else {
|
|
event = nil
|
|
res := &gomemcached.MCResponse{
|
|
Opcode: pkt.Opcode,
|
|
Cas: pkt.Cas,
|
|
Opaque: pkt.Opaque,
|
|
Status: gomemcached.Status(pkt.VBucket),
|
|
Extras: pkt.Extras,
|
|
Key: pkt.Key,
|
|
Body: pkt.Body,
|
|
}
|
|
|
|
vb := vbOpaque(pkt.Opaque)
|
|
appOpaque := appOpaque(pkt.Opaque)
|
|
uprStats.TotalBytes = uint64(bytes)
|
|
|
|
feed.muVbstreams.RLock()
|
|
stream := feed.vbstreams[vb]
|
|
feed.muVbstreams.RUnlock()
|
|
|
|
switch pkt.Opcode {
|
|
case gomemcached.UPR_STREAMREQ:
|
|
event, err = feed.negotiator.handleStreamRequest(feed, headerBuf, &pkt, bytes, res)
|
|
if err != nil {
|
|
logging.Infof(err.Error())
|
|
break loop
|
|
}
|
|
case gomemcached.UPR_MUTATION,
|
|
gomemcached.UPR_DELETION,
|
|
gomemcached.UPR_EXPIRATION:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
uprStats.TotalMutation++
|
|
|
|
case gomemcached.UPR_STREAMEND:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
//stream has ended
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
logging.Infof("Stream Ended for vb %d", vb)
|
|
|
|
feed.negotiator.deleteStreamFromMap(vb, appOpaque)
|
|
feed.cleanUpVbStream(vb)
|
|
|
|
case gomemcached.UPR_SNAPSHOT:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
// snapshot marker
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
uprStats.TotalSnapShot++
|
|
|
|
case gomemcached.UPR_FLUSH:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
// special processing for flush ?
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
|
|
case gomemcached.UPR_CLOSESTREAM:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
event.Opcode = gomemcached.UPR_STREAMEND // opcode re-write !!
|
|
logging.Infof("Stream Closed for vb %d StreamEnd simulated", vb)
|
|
|
|
feed.negotiator.deleteStreamFromMap(vb, appOpaque)
|
|
feed.cleanUpVbStream(vb)
|
|
|
|
case gomemcached.UPR_ADDSTREAM:
|
|
logging.Infof("Opcode %v not implemented", pkt.Opcode)
|
|
|
|
case gomemcached.UPR_CONTROL, gomemcached.UPR_BUFFERACK:
|
|
if res.Status != gomemcached.SUCCESS {
|
|
logging.Infof("Opcode %v received status %d", pkt.Opcode.String(), res.Status)
|
|
}
|
|
|
|
case gomemcached.UPR_NOOP:
|
|
// send a NOOP back
|
|
noop := &gomemcached.MCResponse{
|
|
Opcode: gomemcached.UPR_NOOP,
|
|
Opaque: pkt.Opaque,
|
|
}
|
|
|
|
if err := feed.conn.TransmitResponse(noop); err != nil {
|
|
logging.Warnf("failed to transmit command %s. Error %s", noop.Opcode.String(), err.Error())
|
|
}
|
|
case gomemcached.DCP_SYSTEM_EVENT:
|
|
if stream == nil {
|
|
logging.Infof("Stream not found for vb %d: %#v", vb, pkt)
|
|
break loop
|
|
}
|
|
event = makeUprEvent(pkt, stream, bytes)
|
|
default:
|
|
logging.Infof("Recived an unknown response for vbucket %d", vb)
|
|
}
|
|
}
|
|
|
|
if event != nil {
|
|
select {
|
|
case ch <- event:
|
|
case <-feed.closer:
|
|
logging.Infof("Feed has been closed. Skip sending events. Exiting.")
|
|
break loop
|
|
}
|
|
|
|
feed.muVbstreams.RLock()
|
|
l := len(feed.vbstreams)
|
|
feed.muVbstreams.RUnlock()
|
|
|
|
if event.Opcode == gomemcached.UPR_CLOSESTREAM && l == 0 {
|
|
logging.Infof("No more streams")
|
|
}
|
|
}
|
|
|
|
if !feed.ackByClient {
|
|
// if client does not ack, do the ack check now
|
|
feed.sendBufferAckIfNeeded(event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// make sure that feed is closed before we signal transmitCl and exit runFeed
|
|
feed.Close()
|
|
|
|
close(feed.transmitCl)
|
|
logging.Infof("runFeed exiting")
|
|
}
|
|
|
|
// Client, after completing processing of an UprEvent, need to call this API to notify UprFeed,
|
|
// so that UprFeed can update its ack bytes stats and send ack to DCP if needed
|
|
// Client needs to set ackByClient flag to true in NewUprFeedWithConfig() call as a prerequisite for this call to work
|
|
// This API is not thread safe. Caller should NOT have more than one go rountine calling this API
|
|
func (feed *UprFeed) ClientAck(event *UprEvent) error {
|
|
if !feed.ackByClient {
|
|
return errors.New("Upr feed does not have ackByclient flag set")
|
|
}
|
|
feed.sendBufferAckIfNeeded(event)
|
|
return nil
|
|
}
|
|
|
|
// increment ack bytes if the event needs to be acked to DCP
|
|
// send buffer ack if enough ack bytes have been accumulated
|
|
func (feed *UprFeed) sendBufferAckIfNeeded(event *UprEvent) {
|
|
if event == nil || event.AckSize == 0 {
|
|
// this indicates that there is no need to ack to DCP
|
|
return
|
|
}
|
|
|
|
totalBytes := feed.toAckBytes + event.AckSize
|
|
if totalBytes > feed.maxAckBytes {
|
|
feed.toAckBytes = 0
|
|
feed.sendBufferAck(totalBytes)
|
|
} else {
|
|
feed.toAckBytes = totalBytes
|
|
}
|
|
}
|
|
|
|
// send buffer ack to dcp
|
|
func (feed *UprFeed) sendBufferAck(sendSize uint32) {
|
|
bufferAck := &gomemcached.MCRequest{
|
|
Opcode: gomemcached.UPR_BUFFERACK,
|
|
}
|
|
bufferAck.Extras = make([]byte, 4)
|
|
binary.BigEndian.PutUint32(bufferAck.Extras[:4], uint32(sendSize))
|
|
feed.writeToTransmitCh(bufferAck)
|
|
feed.stats.TotalBufferAckSent++
|
|
}
|
|
|
|
// GetUprStats returns a pointer to the feed's own stats struct, not a copy,
// so the values seen through it change as the feed updates its counters.
func (feed *UprFeed) GetUprStats() *UprStats {
	return &feed.stats
}
|
|
|
|
// composeOpaque packs opaqueMSB into the upper 16 bits and vbno into the
// lower 16 bits of a 32-bit opaque.
func composeOpaque(vbno, opaqueMSB uint16) uint32 {
	low := uint32(vbno)
	high := uint32(opaqueMSB) << 16
	return high | low
}
|
|
|
|
// getUprOpenCtrlOpaque atomically increments the package-wide
// opaqueOpenCtrlWell counter and returns the new value, for use as the opaque
// of an open or control message.
func getUprOpenCtrlOpaque() uint32 {
	return atomic.AddUint32(&opaqueOpenCtrlWell, 1)
}
|
|
|
|
// appOpaque extracts the upper 16 bits of a 32-bit opaque, i.e. the
// opaqueMSB part given to composeOpaque.
func appOpaque(opq32 uint32) uint16 {
	return uint16(opq32 >> 16)
}
|
|
|
|
// vbOpaque extracts the lower 16 bits of a 32-bit opaque, i.e. the vbno part
// given to composeOpaque.
func vbOpaque(opq32 uint32) uint16 {
	// Converting to uint16 keeps exactly the low 16 bits.
	return uint16(opq32)
}
|
|
|
|
// Close this UprFeed.
|
|
func (feed *UprFeed) Close() {
|
|
feed.muFeedState.Lock()
|
|
defer feed.muFeedState.Unlock()
|
|
if feed.feedState != FeedStateClosed {
|
|
close(feed.closer)
|
|
feed.feedState = FeedStateClosed
|
|
feed.negotiator.initialize()
|
|
}
|
|
}
|
|
|
|
// check if the UprFeed has been closed
|
|
func (feed *UprFeed) Closed() bool {
|
|
feed.muFeedState.RLock()
|
|
defer feed.muFeedState.RUnlock()
|
|
return feed.feedState == FeedStateClosed
|
|
}
|
|
|
|
// set upr feed to opened state after initialization is done
|
|
func (feed *UprFeed) setOpen() {
|
|
feed.muFeedState.Lock()
|
|
defer feed.muFeedState.Unlock()
|
|
feed.feedState = FeedStateOpened
|
|
}
|
|
|
|
func (feed *UprFeed) isOpen() bool {
|
|
feed.muFeedState.RLock()
|
|
defer feed.muFeedState.RUnlock()
|
|
return feed.feedState == FeedStateOpened
|
|
}
|
|
|
|
func (feed *UprFeed) getState() FeedState {
|
|
feed.muFeedState.RLock()
|
|
defer feed.muFeedState.RUnlock()
|
|
return feed.feedState
|
|
}
|