comm

package
v0.4.1 Latest
Warning

This package is not in the latest version of its module.

Published: May 11, 2017 License: GPL-3.0 Imports: 14 Imported by: 0

Documentation

Overview

Package comm implements reliable, causally ordered network communication among multiple nodes. Vector clocks are used to ensure causality. Currently, a sending node blocks when it fails to deliver an earlier message. Several message formats and the required parsers are provided to transform received marshalled CRDT messages into structured ones.
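To illustrate how vector clocks can enforce causal order, the following is a minimal sketch of the standard causal delivery check. It is not taken from this package: the VClock type and the deliverable function are hypothetical names, and the package's own bookkeeping may differ.

```go
package main

import "fmt"

// VClock maps a node name to the number of updates seen from that node.
// Hypothetical type for illustration only.
type VClock map[string]int

// deliverable reports whether a message from sender carrying clock msg may be
// applied at a replica whose clock is local. The sender's entry must be
// exactly one ahead of the local view, and no other entry may be ahead.
func deliverable(local, msg VClock, sender string) bool {
	if msg[sender] != local[sender]+1 {
		return false
	}
	for node, count := range msg {
		if node != sender && count > local[node] {
			return false
		}
	}
	return true
}

func main() {
	local := VClock{"worker-1": 2, "storage": 1}
	next := VClock{"worker-1": 3, "storage": 1}
	early := VClock{"worker-1": 4, "storage": 1}

	fmt.Println(deliverable(local, next, "worker-1"))  // true: next message in order
	fmt.Println(deliverable(local, early, "worker-1")) // false: an earlier message is missing
}
```

A message that is not yet deliverable would be kept back until the missing earlier messages have been applied.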

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func InitReceiver

func InitReceiver(name string, logFilePath string, vclockLogPath string, socket net.Listener, applyCRDTUpdChan chan string, doneCRDTUpdChan chan struct{}, downRecv chan struct{}, nodes []string) (chan string, chan map[string]int, error)

InitReceiver initializes the Receiver struct and sets default values. It starts the involved background routines and sends the initial channel trigger.

func InitSender

func InitSender(name string, logFilePath string, tlsConfig *tls.Config, timeout int, retry int, incVClock chan string, updVClock chan map[string]int, downSender chan struct{}, nodes map[string]string) (chan string, error)

InitSender initializes the Sender struct and sets default values for most of the involved elements. It returns a channel that local processes can put CRDT changes into, so that those changes are communicated to connected nodes.

func ParseOp

func ParseOp(payload string) (string, string, error)

ParseOp takes in a raw incoming message payload, parses out the operation, and returns it together with the remaining part of the payload.
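The payload format is not documented on this page. As a rough sketch of the idea, the example below assumes payloads of the form "<operation>|<rest>"; the "|" delimiter and the splitOp helper are assumptions made for illustration, not the package's actual wire format or code.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// splitOp is a hypothetical stand-in for ParseOp. It assumes the operation
// and the remaining payload are separated by the first "|" character.
func splitOp(payload string) (op string, rest string, err error) {
	parts := strings.SplitN(payload, "|", 2)
	if len(parts) != 2 || parts[0] == "" {
		return "", "", errors.New("payload does not contain an operation and arguments")
	}
	return parts[0], parts[1], nil
}

func main() {
	op, rest, err := splitOp("create|alice|INBOX")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	// The remaining payload would then go to the parser matching op,
	// for example ParseCreate for a CREATE operation.
	fmt.Printf("op=%s rest=%s\n", op, rest)
}
```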

func ReliableConnect

func ReliableConnect(remoteAddr string, tlsConfig *tls.Config, retry int) (*tls.Conn, error)

ReliableConnect attempts to connect to the defined remote node as long as the error from previous attempts can be dealt with.
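A retry loop of this kind might look like the sketch below. Everything in it is an assumption for illustration: the dial step is passed in as a function so the loop can run without a network (in real use it could wrap tls.Dial), the retryable predicate decides which errors can be dealt with, and maxAttempts is a bound added here that the documented signature does not necessarily have.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"time"
)

// errTemporary marks failures that are worth retrying in this sketch.
var errTemporary = errors.New("temporary dial failure")

func isTemporary(err error) bool { return errors.Is(err, errTemporary) }

// connectWithRetry calls dial until it succeeds. It stops early on an error
// that retryable rejects and gives up after maxAttempts tries.
func connectWithRetry(dial func() (net.Conn, error), retryable func(error) bool, wait time.Duration, maxAttempts int) (net.Conn, error) {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		conn, err := dial()
		if err == nil {
			return conn, nil
		}
		if !retryable(err) {
			return nil, err
		}
		lastErr = err
		time.Sleep(wait)
	}
	return nil, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, lastErr)
}

// flakyDialer returns a dial function that fails the given number of times
// and then hands out one end of an in-memory pipe. calls counts invocations.
func flakyDialer(failures int, calls *int) func() (net.Conn, error) {
	return func() (net.Conn, error) {
		*calls++
		if *calls <= failures {
			return nil, errTemporary
		}
		client, server := net.Pipe()
		server.Close()
		return client, nil
	}
}

func main() {
	calls := 0
	conn, err := connectWithRetry(flakyDialer(2, &calls), isTemporary, 10*time.Millisecond, 5)
	if err != nil {
		fmt.Println("connect failed:", err)
		return
	}
	defer conn.Close()
	fmt.Println("connected after", calls, "attempts")
}
```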

func ReliableSend

func ReliableSend(conn *tls.Conn, text string, remoteAddr string, tlsConfig *tls.Config, timeout int, retry int) error

ReliableSend attempts to transmit a message between pluto nodes. If the first attempt fails, the node will try to reconnect and resend the message until it is successfully transmitted.

Types

type AppendMsg

type AppendMsg struct {
	User    string
	Mailbox string
	AddMail *Element
}

AppendMsg represents information needed in executing a remote APPEND operation on an ORSet CRDT.

func ParseAppend

func ParseAppend(payload string) (*AppendMsg, error)

ParseAppend takes in the remaining payload from ParseOp and attempts to parse it into an AppendMsg struct.

type CopyMsg

type CopyMsg struct {
	User     string
	Mailbox  string
	AddMails []*Element
}

CopyMsg represents information needed in executing a remote COPY operation on an ORSet CRDT.

func ParseCopy

func ParseCopy(payload string) (*CopyMsg, error)

ParseCopy takes in the remaining payload from ParseOp and attempts to parse it into a CopyMsg struct.

type CreateMsg

type CreateMsg struct {
	User       string
	Mailbox    string
	AddMailbox *Element
}

CreateMsg represents information needed in executing a remote CREATE operation on an ORSet CRDT.

func ParseCreate

func ParseCreate(payload string) (*CreateMsg, error)

ParseCreate takes in the remaining payload from ParseOp and attempts to parse it into a CreateMsg struct.

type DeleteMsg

type DeleteMsg struct {
	User       string
	Mailbox    string
	RmvMailbox []*Element
}

DeleteMsg represents information needed in executing a remote DELETE operation on an ORSet CRDT.

func ParseDelete

func ParseDelete(payload string) (*DeleteMsg, error)

ParseDelete takes in the remaining payload from ParseOp and attempts to parse it into a DeleteMsg struct.

type Element

type Element struct {
	Value    string
	Tag      string
	Contents string
}

Element contains the actual ORSet two-tuple elements and possible file contents of the element.
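For readers unfamiliar with observed-remove sets, the sketch below shows why each element carries a (value, tag) pair: a removal only deletes the tags it names, so an add with a fresh tag that happened concurrently survives. The element and orSet types here are hypothetical mirrors written for this example and are not the package's CRDT implementation.

```go
package main

import "fmt"

// element mirrors the shape of Element for illustration only.
type element struct {
	Value    string
	Tag      string
	Contents string
}

// orSet is a minimal observed-remove set keyed by unique tag.
type orSet struct {
	tags map[string]string // tag -> value
}

func newORSet() *orSet { return &orSet{tags: make(map[string]string)} }

// add records the (value, tag) pair of e.
func (s *orSet) add(e *element) { s.tags[e.Tag] = e.Value }

// remove deletes exactly the tags named in es and nothing else.
func (s *orSet) remove(es []*element) {
	for _, e := range es {
		delete(s.tags, e.Tag)
	}
}

// lookup reports whether value is present under at least one tag.
func (s *orSet) lookup(value string) bool {
	for _, v := range s.tags {
		if v == value {
			return true
		}
	}
	return false
}

func main() {
	s := newORSet()
	first := &element{Value: "INBOX", Tag: "tag-1"}
	second := &element{Value: "INBOX", Tag: "tag-2"}

	s.add(first)
	s.add(second)                // concurrent add of the same value
	s.remove([]*element{first})  // removal that only observed tag-1
	fmt.Println(s.lookup("INBOX")) // true: tag-2 was not observed by the removal
}
```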

type ExpungeMsg

type ExpungeMsg struct {
	User    string
	Mailbox string
	RmvMail []*Element
}

ExpungeMsg represents information needed in executing a remote EXPUNGE operation on an ORSet CRDT.

func ParseExpunge

func ParseExpunge(payload string) (*ExpungeMsg, error)

ParseExpunge takes in the remaining payload from ParseOp and attempts to parse it into an ExpungeMsg struct.

type Message

type Message struct {
	Sender  string
	VClock  map[string]int
	Payload string
}

Message represents a CRDT synchronization message between nodes in a pluto system. It consists of the vector clock of the originating node and a CRDT payload to apply at the receiver's CRDT replica.

func InitMessage

func InitMessage() *Message

InitMessage returns a fresh Message variable.

func Parse

func Parse(msg string) (*Message, error)

Parse takes in the supplied string representing a received message and parses it back into Message struct form.

func (*Message) String

func (m *Message) String() string

String marshals the given Message m into its string representation so that it can be sent out on the TLS connection.
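The exact wire format produced by String and consumed by Parse is not given on this page. The round trip below is a sketch under an invented format, "sender|node:count;node:count|payload"; the message type, marshal, and unmarshal are hypothetical names and should not be read as the package's encoding.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// message mirrors the fields of Message for illustration only.
type message struct {
	Sender  string
	VClock  map[string]int
	Payload string
}

// marshal renders m in the assumed format, with clock entries sorted by
// node name so the output is deterministic.
func (m *message) marshal() string {
	nodes := make([]string, 0, len(m.VClock))
	for node := range m.VClock {
		nodes = append(nodes, node)
	}
	sort.Strings(nodes)
	entries := make([]string, 0, len(nodes))
	for _, node := range nodes {
		entries = append(entries, node+":"+strconv.Itoa(m.VClock[node]))
	}
	return m.Sender + "|" + strings.Join(entries, ";") + "|" + m.Payload
}

// unmarshal parses the assumed format back into a message. The payload may
// itself contain "|" because only the first two separators are split on.
func unmarshal(s string) (*message, error) {
	parts := strings.SplitN(s, "|", 3)
	if len(parts) != 3 {
		return nil, fmt.Errorf("expected 3 fields, got %d", len(parts))
	}
	m := &message{Sender: parts[0], VClock: make(map[string]int), Payload: parts[2]}
	if parts[1] == "" {
		return m, nil
	}
	for _, entry := range strings.Split(parts[1], ";") {
		kv := strings.SplitN(entry, ":", 2)
		if len(kv) != 2 {
			return nil, fmt.Errorf("malformed vector clock entry %q", entry)
		}
		count, err := strconv.Atoi(kv[1])
		if err != nil {
			return nil, fmt.Errorf("malformed counter in %q: %v", entry, err)
		}
		m.VClock[kv[0]] = count
	}
	return m, nil
}

func main() {
	out := &message{
		Sender:  "worker-1",
		VClock:  map[string]int{"worker-1": 3, "storage": 1},
		Payload: "create|alice|INBOX",
	}
	wire := out.marshal()
	in, err := unmarshal(wire)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(wire)
	fmt.Println(in.Sender, in.VClock["worker-1"], in.Payload)
}
```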

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver bundles all information needed to accept and process incoming CRDT downstream messages.

func (*Receiver) AcceptIncMsgs

func (recv *Receiver) AcceptIncMsgs() error

AcceptIncMsgs runs in the background and waits for incoming CRDT messages. As soon as a message is received, it is dispatched to the next routine.

func (*Receiver) ApplyStoredMsgs

func (recv *Receiver) ApplyStoredMsgs()

ApplyStoredMsgs waits for a signal on a channel that indicates a new available message to process, reads and updates the CRDT log file and applies the payload to the CRDT state.

func (*Receiver) IncVClockEntry

func (recv *Receiver) IncVClockEntry()

IncVClockEntry waits for the name of a node to arrive on the channel that was defined during initialization and passed on to senders. If the node is present in the vector clock map, its value is incremented by one.
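The channel types in the InitReceiver and InitSender signatures (chan string and chan map[string]int) suggest a request and reply exchange. The sketch below shows one way such a loop could work; the clock type, newClock, and incLoop are hypothetical, and replying with a copy of the clock after every increment is an assumption rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// clock guards a vector clock map with a mutex. Hypothetical type.
type clock struct {
	mu      sync.Mutex
	entries map[string]int
}

// newClock creates a clock with a zero entry for every known node.
func newClock(nodes ...string) *clock {
	c := &clock{entries: make(map[string]int)}
	for _, n := range nodes {
		c.entries[n] = 0
	}
	return c
}

// incLoop increments the entry of every known node name received on names
// and answers each request with a copy of the clock on updated. Unknown
// names leave the clock unchanged.
func (c *clock) incLoop(names <-chan string, updated chan<- map[string]int) {
	for name := range names {
		c.mu.Lock()
		if _, ok := c.entries[name]; ok {
			c.entries[name]++
		}
		snapshot := make(map[string]int, len(c.entries))
		for k, v := range c.entries {
			snapshot[k] = v
		}
		c.mu.Unlock()
		updated <- snapshot
	}
	close(updated)
}

func main() {
	c := newClock("worker-1", "storage")
	names := make(chan string)
	updated := make(chan map[string]int)
	go c.incLoop(names, updated)

	names <- "worker-1"
	fmt.Println(<-updated)
	close(names)
}
```

Sending a copy instead of the map itself keeps the caller from racing with later increments.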

func (*Receiver) SaveVClockEntries

func (recv *Receiver) SaveVClockEntries() error

SaveVClockEntries writes the current state of the vector clock to the log file so that it can be recovered later. It expects to be the only goroutine currently operating on the receiver.

func (*Receiver) SetVClockEntries

func (recv *Receiver) SetVClockEntries() error

SetVClockEntries fetches the saved vector clock entries from the log file and sets them in the internal vector clock. It expects to be the only goroutine currently operating on the receiver.
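Persisting and restoring a vector clock can be sketched as follows. The on-disk format used by SaveVClockEntries and SetVClockEntries is not described here, so this example simply uses JSON; saveVClock, loadVClock, and roundTrip are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// saveVClock writes the clock to path, replacing any previous content.
func saveVClock(path string, vclock map[string]int) error {
	data, err := json.Marshal(vclock)
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o600)
}

// loadVClock reads a clock previously written by saveVClock.
func loadVClock(path string) (map[string]int, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	vclock := make(map[string]int)
	if err := json.Unmarshal(data, &vclock); err != nil {
		return nil, err
	}
	return vclock, nil
}

// roundTrip saves vclock to a temporary file and loads it again, as a
// receiver would do across a restart.
func roundTrip(vclock map[string]int) (map[string]int, error) {
	dir, err := os.MkdirTemp("", "vclock")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "vclock.log")
	if err := saveVClock(path, vclock); err != nil {
		return nil, err
	}
	return loadVClock(path)
}

func main() {
	restored, err := roundTrip(map[string]int{"worker-1": 3, "storage": 1})
	if err != nil {
		fmt.Println("recovery failed:", err)
		return
	}
	fmt.Println(restored["worker-1"], restored["storage"])
}
```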

func (*Receiver) Shutdown

func (recv *Receiver) Shutdown(downRecv chan struct{})

Shutdown awaits a receiver global shutdown signal and in turn instructs involved goroutines to finish and clean up.

func (*Receiver) StoreIncMsgs

func (recv *Receiver) StoreIncMsgs(conn net.Conn)

StoreIncMsgs takes a received message string and saves it into the incoming CRDT message log file.

func (*Receiver) TriggerMsgApplier added in v0.3.0

func (recv *Receiver) TriggerMsgApplier()

TriggerMsgApplier starts a timer that triggers a msgInLog event when its duration has elapsed. It is meant to routinely poke ApplyStoredMsgs into checking for unprocessed messages in the log.
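A periodic trigger of this kind is commonly built from a ticker and a non-blocking channel send. The sketch below assumes that shape; pokeEvery and its parameters are hypothetical, and the interval used by the package is not stated on this page.

```go
package main

import (
	"fmt"
	"time"
)

// pokeEvery sends a signal on trigger each time interval elapses, until done
// is closed. The send is non-blocking: if a signal is already pending, the
// consumer will check the log anyway, so the extra signal is dropped.
func pokeEvery(interval time.Duration, trigger chan<- struct{}, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			select {
			case trigger <- struct{}{}:
			default:
			}
		case <-done:
			return
		}
	}
}

func main() {
	trigger := make(chan struct{}, 1)
	done := make(chan struct{})
	go pokeEvery(20*time.Millisecond, trigger, done)

	for i := 0; i < 3; i++ {
		<-trigger
		fmt.Println("checking log for unprocessed messages")
	}
	close(done)
}
```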

type RenameMsg

type RenameMsg struct {
	User       string
	Mailbox    string
	RmvMailbox []*Element
	AddMailbox *Element
	AddMails   []*Element
}

RenameMsg represents information needed in executing a remote RENAME operation on an ORSet CRDT.

func ParseRename

func ParseRename(payload string) (*RenameMsg, error)

ParseRename takes in the remaining payload from ParseOp and attempts to parse it into a RenameMsg struct.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender bundles information needed for sending out sync messages via CRDTs.

func (*Sender) BrokerMsgs

func (sender *Sender) BrokerMsgs()

BrokerMsgs awaits a CRDT message to send to downstream replicas from one of the local processes on channel inc. It stores the message for sending in a dedicated CRDT log file and passes on a signal that a new message is available.

func (*Sender) SendMsgs

func (sender *Sender) SendMsgs()

SendMsgs waits for a signal indicating that a message is waiting in the log file to be sent out and sends it to all downstream nodes.
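The interplay of BrokerMsgs and SendMsgs can be pictured as a producer that appends to a log and signals, and a consumer that drains the log on each signal. The sketch below is a simplified model: an in-memory slice stands in for the CRDT log file, the queue type and the deliver callback are hypothetical, and a message is removed before delivery, whereas a reliable sender would presumably keep it until transmission succeeds.

```go
package main

import (
	"fmt"
	"sync"
)

// queue models the sender-side log and its "message available" signal.
type queue struct {
	mu       sync.Mutex
	log      []string      // stands in for the on-disk CRDT log file
	msgInLog chan struct{} // signals that at least one message is waiting
}

func newQueue() *queue { return &queue{msgInLog: make(chan struct{}, 1)} }

// broker stores each message arriving on inc and signals the sender.
func (q *queue) broker(inc <-chan string) {
	for msg := range inc {
		q.mu.Lock()
		q.log = append(q.log, msg)
		q.mu.Unlock()
		select {
		case q.msgInLog <- struct{}{}:
		default: // a signal is already pending
		}
	}
	close(q.msgInLog)
}

// send drains the log on every signal and passes each message, in order,
// to deliver once per downstream node.
func (q *queue) send(nodes []string, deliver func(node, msg string)) {
	for range q.msgInLog {
		for {
			q.mu.Lock()
			if len(q.log) == 0 {
				q.mu.Unlock()
				break
			}
			msg := q.log[0]
			q.log = q.log[1:]
			q.mu.Unlock()
			for _, node := range nodes {
				deliver(node, msg)
			}
		}
	}
}

func main() {
	q := newQueue()
	inc := make(chan string)
	done := make(chan struct{})

	go q.broker(inc)
	go func() {
		q.send([]string{"storage", "worker-2"}, func(node, msg string) {
			fmt.Printf("to %s: %s\n", node, msg)
		})
		close(done)
	}()

	inc <- "create|alice|INBOX"
	inc <- "delete|alice|Trash"
	close(inc)
	<-done
}
```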

func (*Sender) Shutdown

func (sender *Sender) Shutdown(downSender chan struct{})

Shutdown awaits a sender global shutdown signal and in turn instructs involved goroutines to finish and clean up open files.

type StoreMsg

type StoreMsg struct {
	User    string
	Mailbox string
	RmvMail []*Element
	AddMail *Element
}

StoreMsg represents information needed in executing a remote STORE operation on an ORSet CRDT.

func ParseStore

func ParseStore(payload string) (*StoreMsg, error)

ParseStore takes in the remaining payload from ParseOp and attempts to parse it into a StoreMsg struct.
