package memcached import ( "bytes" "encoding/binary" "fmt" "io" "math" "github.com/couchbase/gomemcached" "github.com/couchbase/goutils/logging" ) // TAP protocol docs: <http://www.couchbase.com/wiki/display/couchbase/TAP+Protocol> // TapOpcode is the tap operation type (found in TapEvent) type TapOpcode uint8 // Tap opcode values. const ( TapBeginBackfill = TapOpcode(iota) TapEndBackfill TapMutation TapDeletion TapCheckpointStart TapCheckpointEnd tapEndStream ) const tapMutationExtraLen = 16 var tapOpcodeNames map[TapOpcode]string func init() { tapOpcodeNames = map[TapOpcode]string{ TapBeginBackfill: "BeginBackfill", TapEndBackfill: "EndBackfill", TapMutation: "Mutation", TapDeletion: "Deletion", TapCheckpointStart: "TapCheckpointStart", TapCheckpointEnd: "TapCheckpointEnd", tapEndStream: "EndStream", } } func (opcode TapOpcode) String() string { name := tapOpcodeNames[opcode] if name == "" { name = fmt.Sprintf("#%d", opcode) } return name } // TapEvent is a TAP notification of an operation on the server. type TapEvent struct { Opcode TapOpcode // Type of event VBucket uint16 // VBucket this event applies to Flags uint32 // Item flags Expiry uint32 // Item expiration time Key, Value []byte // Item key/value Cas uint64 } func makeTapEvent(req gomemcached.MCRequest) *TapEvent { event := TapEvent{ VBucket: req.VBucket, } switch req.Opcode { case gomemcached.TAP_MUTATION: event.Opcode = TapMutation event.Key = req.Key event.Value = req.Body event.Cas = req.Cas case gomemcached.TAP_DELETE: event.Opcode = TapDeletion event.Key = req.Key event.Cas = req.Cas case gomemcached.TAP_CHECKPOINT_START: event.Opcode = TapCheckpointStart case gomemcached.TAP_CHECKPOINT_END: event.Opcode = TapCheckpointEnd case gomemcached.TAP_OPAQUE: if len(req.Extras) < 8+4 { return nil } switch op := int(binary.BigEndian.Uint32(req.Extras[8:])); op { case gomemcached.TAP_OPAQUE_INITIAL_VBUCKET_STREAM: event.Opcode = TapBeginBackfill case gomemcached.TAP_OPAQUE_CLOSE_BACKFILL: event.Opcode = TapEndBackfill case gomemcached.TAP_OPAQUE_CLOSE_TAP_STREAM: event.Opcode = tapEndStream case gomemcached.TAP_OPAQUE_ENABLE_AUTO_NACK: return nil case gomemcached.TAP_OPAQUE_ENABLE_CHECKPOINT_SYNC: return nil default: logging.Infof("TapFeed: Ignoring TAP_OPAQUE/%d", op) return nil // unknown opaque event } case gomemcached.NOOP: return nil // ignore default: logging.Infof("TapFeed: Ignoring %s", req.Opcode) return nil // unknown event } if len(req.Extras) >= tapMutationExtraLen && (event.Opcode == TapMutation || event.Opcode == TapDeletion) { event.Flags = binary.BigEndian.Uint32(req.Extras[8:]) event.Expiry = binary.BigEndian.Uint32(req.Extras[12:]) } return &event } func (event TapEvent) String() string { switch event.Opcode { case TapBeginBackfill, TapEndBackfill, TapCheckpointStart, TapCheckpointEnd: return fmt.Sprintf("<TapEvent %s, vbucket=%d>", event.Opcode, event.VBucket) default: return fmt.Sprintf("<TapEvent %s, key=%q (%d bytes) flags=%x, exp=%d>", event.Opcode, event.Key, len(event.Value), event.Flags, event.Expiry) } } // TapArguments are parameters for requesting a TAP feed. // // Call DefaultTapArguments to get a default one. type TapArguments struct { // Timestamp of oldest item to send. // // Use TapNoBackfill to suppress all past items. Backfill uint64 // If set, server will disconnect after sending existing items. Dump bool // The indices of the vbuckets to watch; empty/nil to watch all. VBuckets []uint16 // Transfers ownership of vbuckets during cluster rebalance. Takeover bool // If true, server will wait for client ACK after every notification. SupportAck bool // If true, client doesn't want values so server shouldn't send them. KeysOnly bool // If true, client wants the server to send checkpoint events. Checkpoint bool // Optional identifier to use for this client, to allow reconnects ClientName string // Registers this client (by name) till explicitly deregistered. RegisteredClient bool } // Value for TapArguments.Backfill denoting that no past events at all // should be sent. const TapNoBackfill = math.MaxUint64 // DefaultTapArguments returns a default set of parameter values to // pass to StartTapFeed. func DefaultTapArguments() TapArguments { return TapArguments{ Backfill: TapNoBackfill, } } func (args *TapArguments) flags() []byte { var flags gomemcached.TapConnectFlag if args.Backfill != 0 { flags |= gomemcached.BACKFILL } if args.Dump { flags |= gomemcached.DUMP } if len(args.VBuckets) > 0 { flags |= gomemcached.LIST_VBUCKETS } if args.Takeover { flags |= gomemcached.TAKEOVER_VBUCKETS } if args.SupportAck { flags |= gomemcached.SUPPORT_ACK } if args.KeysOnly { flags |= gomemcached.REQUEST_KEYS_ONLY } if args.Checkpoint { flags |= gomemcached.CHECKPOINT } if args.RegisteredClient { flags |= gomemcached.REGISTERED_CLIENT } encoded := make([]byte, 4) binary.BigEndian.PutUint32(encoded, uint32(flags)) return encoded } func must(err error) { if err != nil { panic(err) } } func (args *TapArguments) bytes() (rv []byte) { buf := bytes.NewBuffer([]byte{}) if args.Backfill > 0 { must(binary.Write(buf, binary.BigEndian, uint64(args.Backfill))) } if len(args.VBuckets) > 0 { must(binary.Write(buf, binary.BigEndian, uint16(len(args.VBuckets)))) for i := 0; i < len(args.VBuckets); i++ { must(binary.Write(buf, binary.BigEndian, uint16(args.VBuckets[i]))) } } return buf.Bytes() } // TapFeed represents a stream of events from a server. type TapFeed struct { C <-chan TapEvent Error error closer chan bool } // StartTapFeed starts a TAP feed on a client connection. // // The events can be read from the returned channel. The connection // can no longer be used for other purposes; it's now reserved for // receiving the TAP messages. To stop receiving events, close the // client connection. func (mc *Client) StartTapFeed(args TapArguments) (*TapFeed, error) { rq := &gomemcached.MCRequest{ Opcode: gomemcached.TAP_CONNECT, Key: []byte(args.ClientName), Extras: args.flags(), Body: args.bytes()} err := mc.Transmit(rq) if err != nil { return nil, err } ch := make(chan TapEvent) feed := &TapFeed{ C: ch, closer: make(chan bool), } go mc.runFeed(ch, feed) return feed, nil } // TapRecvHook is called after every incoming tap packet is received. var TapRecvHook func(*gomemcached.MCRequest, int, error) // Internal goroutine that reads from the socket and writes events to // the channel func (mc *Client) runFeed(ch chan TapEvent, feed *TapFeed) { defer close(ch) var headerBuf [gomemcached.HDR_LEN]byte loop: for { // Read the next request from the server. // // (Can't call mc.Receive() because it reads a // _response_ not a request.) var pkt gomemcached.MCRequest n, err := pkt.Receive(mc.conn, headerBuf[:]) if TapRecvHook != nil { TapRecvHook(&pkt, n, err) } if err != nil { if err != io.EOF { feed.Error = err } break loop } //logging.Infof("** TapFeed received %#v : %q", pkt, pkt.Body) if pkt.Opcode == gomemcached.TAP_CONNECT { // This is not an event from the server; it's // an error response to my connect request. feed.Error = fmt.Errorf("tap connection failed: %s", pkt.Body) break loop } event := makeTapEvent(pkt) if event != nil { if event.Opcode == tapEndStream { break loop } select { case ch <- *event: case <-feed.closer: break loop } } if len(pkt.Extras) >= 4 { reqFlags := binary.BigEndian.Uint16(pkt.Extras[2:]) if reqFlags&gomemcached.TAP_ACK != 0 { if _, err := mc.sendAck(&pkt); err != nil { feed.Error = err break loop } } } } if err := mc.Close(); err != nil { logging.Errorf("Error closing memcached client: %v", err) } } func (mc *Client) sendAck(pkt *gomemcached.MCRequest) (int, error) { res := gomemcached.MCResponse{ Opcode: pkt.Opcode, Opaque: pkt.Opaque, Status: gomemcached.SUCCESS, } return res.Transmit(mc.conn) } // Close terminates a TapFeed. // // Call this if you stop using a TapFeed before its channel ends. func (feed *TapFeed) Close() { close(feed.closer) }