Documentation ¶
Overview ¶
Package comm implements reliable, causally-ordered network communication among multiple nodes. Vector clocks are used to ensure causality. Currently, a sending node blocks for as long as it fails to deliver an earlier message. Several message formats and the required parsers are provided to transform received marshalled CRDT messages into structured ones.
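To illustrate how vector clocks can enforce causal order, here is a minimal sketch of the textbook causal-delivery check. It is not taken from the package source: the `VClock` type and the `deliverable` function are hypothetical names, and the rule actually applied by comm may differ in detail.

```go
package main

import "fmt"

// VClock maps a node name to the number of messages applied from that node.
// Hypothetical type for illustration; comm uses map[string]int in its API.
type VClock map[string]int

// deliverable reports whether a message sent by sender and stamped with
// msgClock can be applied at a replica whose clock is local without
// violating causality.
func deliverable(local, msgClock VClock, sender string) bool {
	// The message must be the very next one expected from the sender.
	if msgClock[sender] != local[sender]+1 {
		return false
	}
	// The sender must not have seen anything from other nodes that the
	// local replica has not applied yet.
	for node, n := range msgClock {
		if node != sender && n > local[node] {
			return false
		}
	}
	return true
}

func main() {
	local := VClock{"a": 1, "b": 0}
	fmt.Println(deliverable(local, VClock{"a": 2, "b": 0}, "a")) // true: next message from a
	fmt.Println(deliverable(local, VClock{"a": 3, "b": 0}, "a")) // false: a message from a is missing
	fmt.Println(deliverable(local, VClock{"a": 1, "b": 1}, "b")) // true: first message from b
}
```

A message that is not yet deliverable would be kept back and re-checked once earlier messages have been applied.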
Index ¶
- func InitReceiver(name string, logFilePath string, vclockLogPath string, socket net.Listener, ...) (chan string, chan map[string]int, error)
- func InitSender(name string, logFilePath string, tlsConfig *tls.Config, timeout int, retry int, ...) (chan string, error)
- func ParseOp(payload string) (string, string, error)
- func ReliableConnect(remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error)
- func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls.Config, ...) error
- type AppendMsg
- type CopyMsg
- type CreateMsg
- type DeleteMsg
- type Element
- type ExpungeMsg
- type Message
- type Receiver
- func (recv *Receiver) AcceptIncMsgs() error
- func (recv *Receiver) ApplyStoredMsgs()
- func (recv *Receiver) IncVClockEntry()
- func (recv *Receiver) SaveVClockEntries() error
- func (recv *Receiver) SetVClockEntries() error
- func (recv *Receiver) Shutdown(downRecv chan struct{})
- func (recv *Receiver) StoreIncMsgs(conn net.Conn)
- func (recv *Receiver) TriggerMsgApplier()
- type RenameMsg
- type Sender
- type StoreMsg
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func InitReceiver ¶
func InitReceiver(name string, logFilePath string, vclockLogPath string, socket net.Listener, applyCRDTUpdChan chan string, doneCRDTUpdChan chan struct{}, downRecv chan struct{}, nodes []string) (chan string, chan map[string]int, error)
InitReceiver initializes a Receiver struct and sets default values. It starts the involved background routines and sends the initial channel trigger.
func InitSender ¶
func InitSender(name string, logFilePath string, tlsConfig *tls.Config, timeout int, retry int, incVClock chan string, updVClock chan map[string]int, downSender chan struct{}, nodes map[string]string) (chan string, error)
InitSender initializes a Sender struct and sets default values for most of the involved elements. It returns a channel that local processes can put CRDT changes into, so that those changes are communicated to connected nodes.
func ParseOp ¶
func ParseOp(payload string) (string, string, error)
ParseOp takes a raw incoming message payload, parses out the operation, and returns it together with the remaining part of the payload.
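The two-step parsing flow (split off the operation, then hand the remainder to an operation-specific parser) might look like the sketch below. The wire format is not documented on this page, so `splitOp` is a hypothetical stand-in for ParseOp that assumes a `|` separator and lowercase operation names; both are assumptions made only for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitOp is a hypothetical stand-in for comm.ParseOp. It ASSUMES the
// operation name is separated from the rest of the payload by the first "|".
func splitOp(payload string) (op string, rest string, err error) {
	parts := strings.SplitN(payload, "|", 2)
	if len(parts) != 2 || parts[0] == "" {
		return "", "", errors.New("malformed payload: missing operation")
	}
	return parts[0], parts[1], nil
}

// dispatch names the parser that would receive the remaining payload.
// The operation names used here are assumptions, not the package's own.
func dispatch(payload string) (string, error) {
	op, rest, err := splitOp(payload)
	if err != nil {
		return "", err
	}
	switch op {
	case "create":
		return "ParseCreate <- " + rest, nil
	case "delete":
		return "ParseDelete <- " + rest, nil
	case "rename":
		return "ParseRename <- " + rest, nil
	case "append":
		return "ParseAppend <- " + rest, nil
	case "expunge":
		return "ParseExpunge <- " + rest, nil
	case "store":
		return "ParseStore <- " + rest, nil
	default:
		return "", errors.New("unsupported operation: " + op)
	}
}

func main() {
	fmt.Println(dispatch("create|user1|INBOX"))
	fmt.Println(dispatch("no-separator"))
}
```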
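func ReliableConnect ¶
func ReliableConnect(remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error)
ReliableConnect attempts to connect to the defined remote node for as long as the error from the previous attempt is one that can be dealt with.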
func ReliableSend ¶
func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error
ReliableSend attempts to transmit a message between pluto nodes. If the first attempt fails, the node tries to reconnect and resend the message until it has been transmitted successfully.
Types ¶
type AppendMsg ¶
AppendMsg represents information needed in executing a remote APPEND operation on an ORSet CRDT.
func ParseAppend ¶
ParseAppend takes in the remaining payload from ParseOp and attempts to parse it into an AppendMsg struct.
type CopyMsg ¶
CopyMsg represents information needed in executing a remote COPY operation on an ORSet CRDT.
type CreateMsg ¶
CreateMsg represents information needed in executing a remote CREATE operation on an ORSet CRDT.
func ParseCreate ¶
ParseCreate takes in the remaining payload from ParseOp and attempts to parse it into a CreateMsg struct.
type DeleteMsg ¶
DeleteMsg represents information needed in executing a remote DELETE operation on an ORSet CRDT.
func ParseDelete ¶
ParseDelete takes in the remaining payload from ParseOp and attempts to parse it into a DeleteMsg struct.
type Element ¶
Element contains the actual ORSet two-tuple elements and possible file contents of the element.
type ExpungeMsg ¶
ExpungeMsg represents information needed in executing a remote EXPUNGE operation on an ORSet CRDT.
func ParseExpunge ¶
func ParseExpunge(payload string) (*ExpungeMsg, error)
ParseExpunge takes in the remaining payload from ParseOp and attempts to parse it into an ExpungeMsg struct.
type Message ¶
Message represents a CRDT synchronization message between nodes in a pluto system. It consists of the vector clock of the originating node and a CRDT payload to apply at receiver's CRDT replica.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver bundles all information needed to accept and process incoming CRDT downstream messages.
func (*Receiver) AcceptIncMsgs ¶
func (recv *Receiver) AcceptIncMsgs() error
AcceptIncMsgs runs in the background and waits for incoming CRDT messages. As soon as a message is received, it is dispatched to the next routine.
func (*Receiver) ApplyStoredMsgs ¶
func (recv *Receiver) ApplyStoredMsgs()
ApplyStoredMsgs waits for a signal on a channel that indicates a new available message to process, reads and updates the CRDT log file and applies the payload to the CRDT state.
func (*Receiver) IncVClockEntry ¶
func (recv *Receiver) IncVClockEntry()
IncVClockEntry waits for an incoming node name on the channel that is defined during initialization and passed on to senders. If the node is present in the vector clock map, its value is incremented by one.
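The behaviour described above (read a node name from a channel, increment only known entries) could be sketched as follows. The `clock` type, its mutex, and the `incLoop` and `get` names are hypothetical; how Receiver actually guards its vector clock is not shown on this page.

```go
package main

import (
	"fmt"
	"sync"
)

// clock is a hypothetical, simplified stand-in for the receiver's
// vector clock state.
type clock struct {
	mu      sync.Mutex
	entries map[string]int
}

// incLoop increments the entry for every node name received on names,
// ignoring names that are not present in the map. It closes done once
// names has been closed and drained.
func (c *clock) incLoop(names <-chan string, done chan<- struct{}) {
	for name := range names {
		c.mu.Lock()
		if _, ok := c.entries[name]; ok {
			c.entries[name]++
		}
		c.mu.Unlock()
	}
	close(done)
}

// get returns the current value for name (zero if absent).
func (c *clock) get(name string) int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.entries[name]
}

func main() {
	c := &clock{entries: map[string]int{"worker-1": 0, "storage": 0}}
	names := make(chan string)
	done := make(chan struct{})
	go c.incLoop(names, done)

	names <- "worker-1"
	names <- "worker-1"
	names <- "unknown" // not in the map, so it is ignored
	close(names)
	<-done

	fmt.Println(c.get("worker-1"), c.get("storage")) // 2 0
}
```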
func (*Receiver) SaveVClockEntries ¶
func (recv *Receiver) SaveVClockEntries() error
SaveVClockEntries writes the current state of the vector clock to the log file so that it can be recovered later. It expects to be the only goroutine currently operating on the receiver.
func (*Receiver) SetVClockEntries ¶
func (recv *Receiver) SetVClockEntries() error
SetVClockEntries fetches the saved vector clock entries from the log file and sets them in the internal vector clock. It expects to be the only goroutine currently operating on the receiver.
func (*Receiver) Shutdown ¶
func (recv *Receiver) Shutdown(downRecv chan struct{})
Shutdown awaits a receiver global shutdown signal and in turn instructs involved goroutines to finish and clean up.
func (*Receiver) StoreIncMsgs ¶
func (recv *Receiver) StoreIncMsgs(conn net.Conn)
StoreIncMsgs takes a received message string and saves it into the log file for incoming CRDT messages.
func (*Receiver) TriggerMsgApplier ¶ added in v0.3.0
func (recv *Receiver) TriggerMsgApplier()
TriggerMsgApplier starts a timer that triggers a msgInLog event when its duration has elapsed. It is meant to routinely prompt ApplyStoredMsgs to check for unprocessed messages in the log.
type RenameMsg ¶
type RenameMsg struct {
User string
Mailbox string
RmvMailbox []*Element
AddMailbox *Element
AddMails []*Element
}
RenameMsg represents information needed in executing a remote RENAME operation on an ORSet CRDT.
func ParseRename ¶
ParseRename takes in the remaining payload from ParseOp and attempts to parse it into a RenameMsg struct.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender bundles information needed for sending out sync messages via CRDTs.
func (*Sender) BrokerMsgs ¶
func (sender *Sender) BrokerMsgs()
BrokerMsgs awaits a CRDT message to send to downstream replicas from one of the local processes on channel inc. It stores the message for sending in a dedicated CRDT log file and passes on a signal that a new message is available.
type StoreMsg ¶
StoreMsg represents information needed in executing a remote STORE operation on an ORSet CRDT.
func ParseStore ¶
ParseStore takes in the remaining payload from ParseOp and attempts to parse it into a StoreMsg struct.