Documentation
¶
Index ¶
- Constants
- Variables
- func CheckAndReturnDefaultForScopeOrCollection(key string) string
- func CheckKeyspaceExist(observer notifier.Observer, keyspace application.Keyspace) bool
- func DistributeAndWaitWork[T comparable](parallelism int, batchSize int, initialiser func(chan<- T) error, ...)
- func GetCheckpointKey(app *AppConfig, vb uint16, cType checkpointType) string
- func GetCompositeState(dStatus, pStatus bool) int8
- func GetDefaultHandlerHeaders() []string
- func GetHexToUint32(keyspaceComponentHexId string) (uint32, error)
- func GetLocalhost(ipMode IpMode) string
- func GetLogfileLocationAndName(parentDir string, locationString string) (string, string)
- func GetRand16Byte() (uint32, error)
- func GetSystemEventInfo(eventId systemeventlog.EventId) (systemeventlog.SystemEventInfo, error)
- func HexLittleEndianToUint64(hexLE []byte) uint64
- func InitialiseSystemEventLogger(baseNsserverURL string)
- func LogSystemEvent(eventId systemeventlog.EventId, severity systemeventlog.EventSeverity, ...)
- func RandomID() (string, error)
- func RandomIDFromDict(dict string) (string, error)
- func StopServer(server *http.Server) error
- func Uint32ToHex(uint32Val uint32) string
- type AppConfig
- type AppRebalanceProgress
- type AppStatus
- type AppStatusResponse
- type Application
- type AtomicTypes
- type Broadcaster
- type Bucket
- type ChangeType
- type ClusterSettings
- type CompileStatus
- type Config
- type Constant
- type CouchbaseVer
- type Credential
- type Curl
- type CursorRegistryMgr
- type CursorRegistryReader
- type CursorRegistryWriter
- type DcpStreamBoundary
- type DebuggerInstance
- type DebuggerOp
- type DepCfg
- type EventProcessingStats
- type EventingConsumer
- type EventingProducer
- type EventingServiceMgr
- type EventingSuperSup
- type FunctionScope
- type GlobalStatsCounter
- type HandlerConfig
- type HistogramStats
- func (hs *HistogramStats) Copy() *HistogramStats
- func (hs *HistogramStats) Get() map[string]uint64
- func (hs *HistogramStats) MarshalJSON() ([]byte, error)
- func (hs *HistogramStats) PercentileN(p int) int
- func (hs *HistogramStats) Reset()
- func (hs *HistogramStats) Update(delta map[string]uint64)
- func (hs *HistogramStats) UpdateWithHistogram(hs1 *HistogramStats)
- type Identity
- type Insight
- type InsightLine
- type Insights
- type IpMode
- type Key
- type Keyspace
- type KeyspaceConvertor
- type KeyspaceID
- type KeyspaceName
- type LifecycleMsg
- type MarshalledData
- type MonitorType
- type Owner
- type OwnershipRoutine
- type PlannerNodeVbMapping
- type ProcessConfig
- type RebalanceConfig
- type RebalanceProgress
- type SecuritySetting
- type Signal
- type SingleAppStatusResponse
- type Stats
- type StatsData
- type StatsInterface
- type StatsType
- type StorageEngine
- type StreamType
- type TopologyChangeMsg
- type UndeployAction
Constants ¶
View Source
const ( DcpEverything = DcpStreamBoundary("everything") DcpFromNow = DcpStreamBoundary("from_now") DcpFromPrior = DcpStreamBoundary("from_prior") )
View Source
const ( AppLocationTag = "appLocation" AppLocationsTag = "appLocations" ReasonTag = "reason" )
View Source
const ( StartRebalanceCType = ChangeType("start-rebalance") StopRebalanceCType = ChangeType("stop-rebalance") StartFailoverCType = ChangeType("start-failover") )
View Source
const ( AppState int8 = iota AppStateUndeployed AppStateEnabled AppStatePaused AppStateUnexpected )
View Source
const ( WaitingForMutation = "WaitingForMutation" // Debugger has been started and consumers are waiting to trap MutationTrapped = "MutationTrapped" // One of the consumers has trapped the mutation DebuggerTokenKey = "debugger" MetakvDebuggerPath = MetakvEventingPath + "debugger/" MetakvTempAppsPath = MetakvEventingPath + "tempApps/" MetakvCredentialsPath = MetakvEventingPath + "credentials/" )
View Source
const ( DebuggerCheckpoint checkpointType = iota Checkpoint )
View Source
const ( RandomNameDict = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" InstanceIDDict = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ*&" )
View Source
const ( Ipv4 = IpMode("ipv4") Ipv6 = IpMode("ipv6") )
View Source
const ( KiB int = 1 << ((iota + 1) * 10) // Equivalent to 2^10 MiB // Equivalent to 2^20 GiB // Equivalent to 2^30 )
View Source
const ( QueryVbMapVersion = "version" NewResponse = "newResponse" TenantID = "tenantID" )
View Source
const ( MetakvEventingPath = "/eventing" EventingTopologyPath = MetakvEventingPath + "/rebalanceToken/" // Listen to this and, based on the full path, decide whether the first one or the second one is triggered EventingFunctionPath = MetakvEventingPath + "/tempApps/" EventingConfigPath = MetakvEventingPath + "/settings/config/" EventingMetakvConfigKeepNodes = MetakvEventingPath + "/config/keepNodes" EventingTenantDistributionPath = MetakvEventingPath + "/tenantDistribution" EventingDebuggerPath = MetakvEventingPath + "/debugger/" )
View Source
const ( EventingFunctionPathTemplate = EventingFunctionPath + "%s/%d" EventingFunctionCredentialTemplate = MetakvEventingPath + "/credentials/%s" EventingConfigPathTemplate = EventingConfigPath + "%s" EventingDebuggerPathTemplate = EventingDebuggerPath + "%s" )
View Source
const ( PauseFunctionTemplate = "/api/v1/functions/%s/pause" UndeployFunctionTemplate = "/api/v1/functions/%s/undeploy" )
View Source
const ( SYSTEM_EVENT_COMPONENT = "eventing" SUB_COMPONENT_EVENTING_PRODUCER = "eventing-producer" DEFAULT_TIMEOUT_SECS = 2 )
View Source
const ( EVENTID_PRODUCER_STARTUP systemeventlog.EventId = 4096 EVENTID_CONSUMER_STARTUP systemeventlog.EventId = 4097 EVENTID_CONSUMER_CRASH systemeventlog.EventId = 4098 EVENTID_START_TRACING systemeventlog.EventId = 4099 EVENTID_STOP_TRACING systemeventlog.EventId = 4100 EVENTID_START_DEBUGGER systemeventlog.EventId = 4101 EVENTID_STOP_DEBUGGER systemeventlog.EventId = 4102 EVENTID_CREATE_FUNCTION systemeventlog.EventId = 4103 EVENTID_DELETE_FUNCTION systemeventlog.EventId = 4104 EVENTID_IMPORT_FUNCTIONS systemeventlog.EventId = 4105 EVENTID_EXPORT_FUNCTIONS systemeventlog.EventId = 4106 EVENTID_BACKUP_FUNCTION systemeventlog.EventId = 4107 EVENTID_RESTORE_FUNCTION systemeventlog.EventId = 4108 EVENTID_DEPLOY_FUNCTION systemeventlog.EventId = 4109 EVENTID_UNDEPLOY_FUNCTION systemeventlog.EventId = 4110 EVENTID_PAUSE_FUNCTION systemeventlog.EventId = 4111 EVENTID_RESUME_FUNCTION systemeventlog.EventId = 4112 EVENTID_CLEANUP_EVENTING systemeventlog.EventId = 4113 EVENTID_DIE systemeventlog.EventId = 4114 EVENTID_TRIGGER_GC systemeventlog.EventId = 4115 EVENTID_FREE_OS_MEMORY systemeventlog.EventId = 4116 EVENTID_UPDATE_CONFIG systemeventlog.EventId = 4117 EVENTID_CLEAR_STATISTICS systemeventlog.EventId = 4118 )
View Source
const (
CurlFeature uint32 = 1 << iota
)
View Source
const (
HttpCallWaitTime = 2 * time.Second
)
View Source
const (
SystemScopeName = "_system"
)
Variables ¶
View Source
var ( Couchstore = StorageEngine("couchstore") Magma = StorageEngine("magma") )
View Source
var ( ErrRetryTimeout = errors.New("retry timeout") ErrEncryptionLevelChanged = errors.New("Encryption Level changed during boostrap") ErrHandleEmpty = errors.New("Bucket handle not initialized") ErrOnDeployFail = errors.New("OnDeploy execution failed") )
View Source
var ( TransactionMutationPrefix = []byte("_txn:") SyncGatewayMutationPrefix = []byte("_sync:") SyncGatewayAttachmentPrefix = []byte("_sync:att") )
View Source
var BucketNotWatched = errors.New("Bucket not being watched")
View Source
var CouchbaseVerMap = map[string]CouchbaseVer{ "vulcan": CouchbaseVer{ // contains filtered or unexported fields }, "alice": CouchbaseVer{ // contains filtered or unexported fields }, "mad-hatter": CouchbaseVer{ // contains filtered or unexported fields }, "cheshire-cat": CouchbaseVer{ // contains filtered or unexported fields }, "6.6.2": CouchbaseVer{ // contains filtered or unexported fields }, }
View Source
var (
CrcTable = crc32.MakeTable(crc32.Castagnoli)
)
View Source
var (
DisableCurl = "disable_curl"
)
View Source
var (
ErrDecodingComponentID = errors.New("error decoding keyspaceComponentId")
)
View Source
var ErrInvalidVersion = errors.New("invalid eventing version")
View Source
var (
ErrNodeNotAvailable = errors.New("node not available")
)
View Source
var LanguageCompatibility = []string{"7.2.0", "6.6.2", "6.0.0", "6.5.0"}
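LanguageCompatibility lists the supported language compatibility values. A missing value is filled with the entry at index 0, which acts as the default.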
View Source
var MetakvMaxRetries int64 = 60
Functions ¶
func CheckKeyspaceExist ¶
func CheckKeyspaceExist(observer notifier.Observer, keyspace application.Keyspace) bool
func DistributeAndWaitWork ¶
func GetCheckpointKey ¶
func GetCompositeState ¶
func GetDefaultHandlerHeaders ¶
func GetDefaultHandlerHeaders() []string
func GetHexToUint32 ¶
func GetLocalhost ¶
func GetRand16Byte ¶
func GetSystemEventInfo ¶
func GetSystemEventInfo(eventId systemeventlog.EventId) (systemeventlog.SystemEventInfo, error)
func HexLittleEndianToUint64 ¶
func InitialiseSystemEventLogger ¶
func InitialiseSystemEventLogger(baseNsserverURL string)
func LogSystemEvent ¶
func LogSystemEvent(eventId systemeventlog.EventId, severity systemeventlog.EventSeverity, extraAttributes interface{})
func RandomIDFromDict ¶
func Uint32ToHex ¶
Types ¶
type AppConfig ¶
type AppConfig struct {
AppCode string
ParsedAppCode string
AppDeployState string
AppName string
AppLocation string
AppState string
AppVersion string
FunctionID uint32
FunctionInstanceID string
LastDeploy string
Settings map[string]interface{}
UserPrefix string
FunctionScope FunctionScope
}
AppConfig holds the application/event handler configuration.
type AppRebalanceProgress ¶
type AppStatus ¶
type AppStatus struct {
CompositeStatus string `json:"composite_status"`
Name string `json:"name"`
FunctionScope application.OldNamespace `json:"function_scope"`
NumBootstrappingNodes int `json:"num_bootstrapping_nodes"`
NumDeployedNodes int `json:"num_deployed_nodes"`
DeploymentStatus bool `json:"deployment_status"`
ProcessingStatus bool `json:"processing_status"`
RedeployRequired bool `json:"redeploy_required"`
AppState stateMachine.AppState `json:"-"`
}
type AppStatusResponse ¶
type Application ¶
type Application struct {
AppHandlers string `json:"appcode"`
DeploymentConfig DepCfg `json:"depcfg"`
EventingVersion string `json:"version"`
EnforceSchema bool `json:"enforce_schema"`
FunctionID uint32 `json:"handleruuid"`
FunctionInstanceID string `json:"function_instance_id"`
Name string `json:"appname"`
Settings map[string]interface{} `json:"settings"`
Metainfo map[string]interface{} `json:"metainfo,omitempty"`
Owner *Owner
FunctionScope FunctionScope `json:"function_scope"`
}
type AtomicTypes ¶
type AtomicTypes[T any] struct { // contains filtered or unexported fields }
func NewAtomicTypes ¶
func NewAtomicTypes[T any](val T) *AtomicTypes[T]
func (*AtomicTypes[T]) Load ¶
func (a *AtomicTypes[T]) Load() T
func (*AtomicTypes[T]) Store ¶
func (a *AtomicTypes[T]) Store(val T)
func (*AtomicTypes[T]) Swap ¶
func (a *AtomicTypes[T]) Swap(new T) T
type Broadcaster ¶
type Broadcaster interface {
Request(onlyThisNode bool, dontSkipOthersOnError bool, path string, request *pc.Request) ([][]byte, *pc.Response, error)
RequestFor(nodeUUID string, path string, request *pc.Request) ([][]byte, *pc.Response, error)
CloseBroadcaster()
}
func NewBroadcaster ¶
func NewBroadcaster(observer notifier.Observer) (Broadcaster, error)
type ChangeType ¶
type ChangeType string
type ClusterSettings ¶
type ClusterSettings struct {
LocalUsername string
LocalPassword string
LocalAddress string
AdminHTTPPort string
AdminSSLPort string
SslCAFile string
SslCertFile string
SslKeyFile string
ClientKeyFile string
ClientCertFile string
ExecutablePath string
EventingDir string
KvPort string
RestPort string
DebugPort string
UUID string
DiagDir string
MaxRunningNodes int
IpMode IpMode
NodeCompatVersion *notifier.Version
}
func (*ClusterSettings) ProtocolVer ¶
func (cs *ClusterSettings) ProtocolVer() string
func (*ClusterSettings) String ¶
func (cs *ClusterSettings) String() string
type CompileStatus ¶
type CouchbaseVer ¶
type CouchbaseVer struct {
// contains filtered or unexported fields
}
func FrameCouchbaseVerFromNsServerStreamingRestApi ¶
func FrameCouchbaseVerFromNsServerStreamingRestApi(ver string) (CouchbaseVer, error)
The version string ver is expected in the form major.minor.mpVersion-build-type.
func FrameCouchbaseVersion ¶
func FrameCouchbaseVersion(ver string) (CouchbaseVer, error)
func FrameCouchbaseVersionShort ¶
func FrameCouchbaseVersionShort(ver string) (CouchbaseVer, error)
FrameCouchbaseVersionShort is for shorthand versions of the form x.x.x.
func (CouchbaseVer) Compare ¶
func (e CouchbaseVer) Compare(need CouchbaseVer) bool
Compare reports whether e >= need.
func (CouchbaseVer) String ¶
func (e CouchbaseVer) String() string
type Credential ¶
type Curl ¶
type Curl struct {
Hostname string `json:"hostname"`
Value string `json:"value"`
AuthType string `json:"auth_type"`
Username string `json:"username"`
Password string `json:"password"`
BearerKey string `json:"bearer_key"`
AllowCookies bool `json:"allow_cookies"`
ValidateSSLCertificate bool `json:"validate_ssl_certificate"`
}
type CursorRegistryMgr ¶
type CursorRegistryMgr interface {
UpdateLimit(newlimit uint8)
CursorRegistryWriter
CursorRegistryReader
}
type CursorRegistryReader ¶
type CursorRegistryReader interface {
GetCursors(k KeyspaceName) (map[string]struct{}, bool)
PrintTree()
}
type CursorRegistryWriter ¶
type CursorRegistryWriter interface {
Register(k KeyspaceName, funcId string) bool
Unregister(k KeyspaceName, funcId string)
}
type DcpStreamBoundary ¶
type DcpStreamBoundary string
func StreamBoundary ¶
func StreamBoundary(boundary string) DcpStreamBoundary
type DebuggerInstance ¶
type DebuggerInstance struct {
Token string `json:"token"` // An ID for a debugging session
Host string `json:"host"` // The node where debugger has been spawned
Status string `json:"status"` // Possible values are WaitingForMutation, MutationTrapped
URL string `json:"url"` // Chrome-Devtools URL for debugging
NodesExternalIP []string `json:"nodes_external_ip"` // List of external IP address of the nodes in the cluster
}
type DebuggerOp ¶
type DebuggerOp uint8
const ( StartDebuggerOp DebuggerOp = iota GetDebuggerURl WriteDebuggerURL StopDebuggerOp )
func (DebuggerOp) String ¶
func (do DebuggerOp) String() string
type DepCfg ¶
type DepCfg struct {
Buckets []Bucket `json:"buckets,omitempty"`
Curl []Curl `json:"curl,omitempty"`
Constants []Constant `json:"constants,omitempty"`
SourceBucket string `json:"source_bucket"`
SourceScope string `json:"source_scope"`
SourceCollection string `json:"source_collection"`
MetadataBucket string `json:"metadata_bucket"`
MetadataScope string `json:"metadata_scope"`
MetadataCollection string `json:"metadata_collection"`
}
type EventProcessingStats ¶
type EventingConsumer ¶
type EventingConsumer interface {
BootstrapStatus() bool
CheckIfQueuesAreDrained() error
ClearEventStats()
CloseAllRunningDcpFeeds()
ConsumerName() string
DcpEventsRemainingToProcess() uint64
EventingNodeUUIDs() []string
EventsProcessedPSec() *EventProcessingStats
GetEventProcessingStats() map[string]uint64
GetExecutionStats() map[string]interface{}
GetFailureStats() map[string]interface{}
GetInsight() *Insight
GetLcbExceptionsStats() map[string]uint64
GetMetaStoreStats() map[string]uint64
HandleV8Worker() error
HostPortAddr() string
Index() int
InternalVbDistributionStats() []uint16
NodeUUID() string
NotifyClusterChange()
NotifyRebalanceStop()
NotifySettingsChange()
Pid() int
RebalanceStatus() bool
RebalanceTaskProgress() *RebalanceProgress
RemoveSupervisorToken() error
ResetBootstrapDone()
ResetCounters()
Serve()
SetConnHandle(net.Conn)
SetFeedbackConnHandle(net.Conn)
SetRebalanceStatus(status bool)
GetRebalanceStatus() bool
GetPrevRebalanceInCompleteStatus() bool
SignalBootstrapFinish()
SignalConnected()
SignalFeedbackConnected()
SignalStopDebugger() error
SpawnCompilationWorker(appCode, appContent, appName, eventingPort string, handlerHeaders, handlerFooters []string) (*CompileStatus, error)
Stop(context string)
String() string
TimerDebugStats() map[int]map[string]interface{}
NotifyPrepareTopologyChange(keepNodes, ejectNodes []string)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
UpdateWorkerQueueMemCap(quota int64)
VbDcpEventsRemainingToProcess() map[int]int64
VbEventingNodeAssignMapUpdate(map[uint16]string)
VbProcessingStats() map[uint16]map[string]interface{}
VbSeqnoStats() map[int]map[string]interface{}
WorkerVbMapUpdate(map[string][]uint16)
SendAssignedVbs()
PauseConsumer()
GetAssignedVbs(workerName string) ([]uint16, error)
NotifyWorker()
GetOwner() *Owner
SetFeatureMatrix(featureMatrix uint32)
GetSuperSup() EventingSuperSup
}
EventingConsumer is the interface that exports functions from eventing_consumer.
type EventingProducer ¶
type EventingProducer interface {
AddMetadataPrefix(key string) Key
AppendCurlLatencyStats(deltas StatsData)
AppendLatencyStats(deltas StatsData)
BootstrapStatus() bool
CfgData() string
CheckpointBlobDump() map[string]interface{}
CleanupMetadataBucket(skipCheckpointBlobs bool) error
CleanupUDSs()
ClearEventStats()
DcpFeedBoundary() string
GetAppCode() string
GetAppLog(sz int64) []string
GetDcpEventsRemainingToProcess() uint64
GetDebuggerURL() (string, error)
GetEventingConsumerPids() map[string]int
GetEventProcessingStats() map[string]uint64
GetExecutionStats() map[string]interface{}
GetFailureStats() map[string]interface{}
GetLatencyStats() StatsData
GetCurlLatencyStats() StatsData
GetInsight() *Insight
GetLcbExceptionsStats() map[string]uint64
GetMetaStoreStats() map[string]uint64
GetMetadataPrefix() string
GetNsServerPort() string
GetVbOwner(vb uint16) (string, string, error)
GetSeqsProcessed() map[int]int64
GetDebuggerToken() string
InternalVbDistributionStats() map[string]string
IsEventingNodeAlive(eventingHostPortAddr, nodeUUID string) bool
IsPlannerRunning() bool
IsTrapEvent() bool
KillAllConsumers()
KillAndRespawnEventingConsumer(consumer EventingConsumer)
KvHostPorts() []string
LenRunningConsumers() int
MetadataBucket() string
MetadataScope() string
MetadataCollection() string
NotifyInit()
NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
NotifySettingsChange()
NotifySupervisor()
NotifyTopologyChange(msg *TopologyChangeMsg)
NsServerHostPort() string
NsServerNodeCount() int
PauseProducer()
PlannerStats() []*PlannerNodeVbMapping
ResumeProducer()
RebalanceStatus() bool
RebalanceTaskProgress() *RebalanceProgress
RemoveConsumerToken(workerName string)
ResetCounters()
SignalBootstrapFinish()
SignalStartDebugger(token string) error
SignalStopDebugger() error
SetRetryCount(retryCount int64)
SpanBlobDump() map[string]interface{}
Serve()
SourceBucket() string
SourceScope() string
SourceCollection() string
GetSourceKeyspaceID() (KeyspaceID, bool)
GetMetadataKeyspaceID() (KeyspaceID, bool)
GetFunctionInstanceId() string
GetCursorAware() bool
SrcMutation() bool
Stop(context string)
StopRunningConsumers()
String() string
SetTrapEvent(value bool)
TimerDebugStats() map[int]map[string]interface{}
UndeployHandler(msg UndeployAction)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
UpdateMemoryQuota(quota int64)
UsingTimer() bool
VbDcpEventsRemainingToProcess() map[int]int64
VbDistributionStatsFromMetadata() map[string]map[string]string
VbSeqnoStats() map[int][]map[string]interface{}
WriteAppLog(log string)
WriteDebuggerURL(url string)
WriteDebuggerToken(token string, hostnames []string) error
GetOwner() *Owner
GetFuncScopeDetails() (string, uint32)
FunctionManageBucket() string
FunctionManageScope() string
SetFeatureMatrix(featureMatrix uint32)
}
EventingProducer is the interface that exports functions from eventing_producer.
type EventingServiceMgr ¶
type EventingServiceMgr interface {
UpdateBucketGraphFromMetakv(functionName string) error
ResetFailoverStatus()
GetFailoverStatus() (failoverNotifTs int64, changeId string)
CheckLifeCycleOpsDuringRebalance() bool
NotifySupervisorWaitCh()
// TODO: Replace it with getting back the whole application.
GetFunctionId(id Identity) (uint32, error)
GetPreparedApp(appLocation string) Application
GetAdminHTTPPort() string
SetSettings(appLocation string, data []byte, force bool)
}
type EventingSuperSup ¶
type EventingSuperSup interface {
PausingAppList() map[string]string
BootstrapAppList() map[string]string
BootstrapAppStatus(appName string) bool
BootstrapStatus() bool
CheckAndSwitchgocbBucket(bucketName, appName string, setting *SecuritySetting) error
CheckpointBlobDump(appName string) (interface{}, error)
ClearEventStats() []string
DcpFeedBoundary(fnName string) (string, error)
DeployedAppList() []string
GetEventProcessingStats(appName string) map[string]uint64
GetAppCode(appName string) string
GetAppLog(appName string, sz int64) []string
GetAppCompositeState(appName string) int8
GetDcpEventsRemainingToProcess(appName string) uint64
GetDebuggerURL(appName string) (string, error)
GetDeployedApps() map[string]string
GetUndeployedApps() []string
GetEventingConsumerPids(appName string) map[string]int
GetExecutionStats(appName string) map[string]interface{}
GetFailureStats(appName string) map[string]interface{}
GetLatencyStats(appName string) StatsData
GetCurlLatencyStats(appName string) StatsData
GetInsight(appName string) *Insight
GetLcbExceptionsStats(appName string) map[string]uint64
GetLocallyDeployedApps() map[string]string
GetMetaStoreStats(appName string) map[string]uint64
GetBucket(bucketName, appName string) (*couchbase.Bucket, error)
GetMetadataHandle(bucketName, scopeName, collectionName, appName string) (*gocb.Collection, error)
GetKeyspaceID(bucketName, scopeName, collectionName string) (keyspaceID KeyspaceID, err error)
GetCurrentManifestId(bucketName string) (string, error)
GetRegisteredPool() string
GetSeqsProcessed(appName string) map[int]int64
GetNumVbucketsForBucket(bucketName string) int
InternalVbDistributionStats(appName string) map[string]string
KillAllConsumers()
NotifyPrepareTopologyChange(ejectNodes, keepNodes []string, changeType service.TopologyChangeType)
TopologyChangeNotifCallback(kve metakv.KVEntry) error
PlannerStats(appName string) []*PlannerNodeVbMapping
RebalanceStatus() bool
RebalanceTaskProgress(appName string) (*RebalanceProgress, error)
RemoveProducerToken(appName string)
RestPort() string
ResetCounters(appName string) error
SetSecuritySetting(setting *SecuritySetting) bool
GetSecuritySetting() *SecuritySetting
EncryptionChangedDuringLifecycle() bool
GetGocbSubscribedApps(encryptionEnabled bool) map[string]struct{}
SignalStopDebugger(appName string) error
SpanBlobDump(appName string) (interface{}, error)
StopProducer(appName string, msg UndeployAction)
TimerDebugStats(appName string) (map[int]map[string]interface{}, error)
UpdateEncryptionLevel(enforceTLS, encryptOn bool)
VbDcpEventsRemainingToProcess(appName string) map[int]int64
VbDistributionStatsFromMetadata(appName string) map[string]map[string]string
VbSeqnoStats(appName string) (map[int][]map[string]interface{}, error)
WriteDebuggerURL(appName, url string)
WriteDebuggerToken(appName, token string, hostnames []string)
IncWorkerRespawnedCount()
WorkerRespawnedCount() uint32
CheckLifeCycleOpsDuringRebalance() bool
GetBSCSnapshot() (map[string]map[string][]string, error)
GetSystemMemoryQuota() float64
ReadOnDeployDoc(string) (string, string, string)
RemoveOnDeployLeader(string)
PublishOnDeployStatus(string, string)
WritePauseTimestamp(string, time.Time)
RemovePauseTimestampDoc(appName string)
WriteOnDeployMsgBuffer(appName, msg string)
GetOnDeployMsgBuffer(appName string) []string
ClearOnDeployMsgBuffer(appName string)
UpdateFailedOnDeployStatus(appName string)
CleanupOnDeployTimers(appName string, skipCheckpointBlobs bool) error
WatchBucket(keyspace Keyspace, appName string, mType MonitorType) error
UnwatchBucket(keyspace Keyspace, appName string)
}
type FunctionScope ¶
Needed only during the first creation of the function.
func GetFunctionScope ¶
func GetFunctionScope(identity Identity) FunctionScope
func (FunctionScope) String ¶
func (fs FunctionScope) String() string
func (*FunctionScope) ToKeyspace ¶
func (fs *FunctionScope) ToKeyspace() *Keyspace
type GlobalStatsCounter ¶
func NewGlobalStatsCounters ¶
func NewGlobalStatsCounters() *GlobalStatsCounter
type HandlerConfig ¶
type HandlerConfig struct {
N1qlPrepareAll bool
LanguageCompatibility string
AllowTransactionMutations bool
AllowSyncDocuments bool
CursorAware bool
AggDCPFeedMemCap int64
CheckpointInterval int
IdleCheckpointInterval int
CPPWorkerThrCount int
ExecuteTimerRoutineCount int
ExecutionTimeout int
CursorCheckpointTimeout int
FeedbackBatchSize int
FeedbackQueueCap int64
FeedbackReadBufferSize int
HandlerHeaders []string
LcbInstCapacity int
N1qlConsistency string
LogLevel string
SocketWriteBatchSize int
SourceKeyspace *Keyspace
StatsLogInterval int
StreamBoundary DcpStreamBoundary
TimerContextSize int64
TimerQueueMemCap uint64
TimerQueueSize uint64
UndeployRoutineCount int
WorkerCount int
WorkerQueueCap int64
WorkerQueueMemCap int64
WorkerResponseTimeout int
LcbRetryCount int
LcbTimeout int
BucketCacheSize int64
BucketCacheAge int64
NumTimerPartitions int
CurlMaxAllowedRespSize int
}
type HistogramStats ¶
type HistogramStats struct {
// contains filtered or unexported fields
}
func NewHistogramStats ¶
func NewHistogramStats() *HistogramStats
func (*HistogramStats) Copy ¶
func (hs *HistogramStats) Copy() *HistogramStats
func (*HistogramStats) Get ¶
func (hs *HistogramStats) Get() map[string]uint64
func (*HistogramStats) MarshalJSON ¶
func (hs *HistogramStats) MarshalJSON() ([]byte, error)
func (*HistogramStats) PercentileN ¶
func (hs *HistogramStats) PercentileN(p int) int
func (*HistogramStats) Reset ¶
func (hs *HistogramStats) Reset()
func (*HistogramStats) Update ¶
func (hs *HistogramStats) Update(delta map[string]uint64)
func (*HistogramStats) UpdateWithHistogram ¶
func (hs *HistogramStats) UpdateWithHistogram(hs1 *HistogramStats)
type Insight ¶
type Insight struct {
Script string `json:"script"`
Lines map[int]InsightLine `json:"lines"`
}
func NewInsight ¶
func NewInsight() *Insight
func (*Insight) Accumulate ¶
type InsightLine ¶
type InsightLine struct {
CallCount int64 `json:"call_count"`
CallTime float64 `json:"call_time"`
ExceptionCount int64 `json:"error_count"`
LastException string `json:"error_msg"`
LastLog string `json:"last_log"`
}
InsightLine represents a single line of insight
type Keyspace ¶
func (Keyspace) IsWildcard ¶
func (Keyspace) ToFunctionScope ¶
func (k Keyspace) ToFunctionScope() *FunctionScope
type KeyspaceConvertor ¶
type KeyspaceID ¶
type KeyspaceID struct {
Bid string
Cid uint32
Sid uint32
StreamType StreamType
}
func (KeyspaceID) Equals ¶
func (keyspaceID KeyspaceID) Equals(keyspaceID2 KeyspaceID) bool
type KeyspaceName ¶
type LifecycleMsg ¶
type LifecycleMsg struct {
InstanceID string
Applocation application.AppLocation
Description string
Revert bool
DeleteFunction bool
PauseFunction bool
UndeloyFunction bool
}
func (LifecycleMsg) String ¶
func (u LifecycleMsg) String() string
type MarshalledData ¶
type MarshalledData[T any] struct { // contains filtered or unexported fields }
func NewMarshalledData ¶
func NewMarshalledData[T any](val T) *MarshalledData[T]
func (*MarshalledData[T]) GetOriginalKeyspace ¶
func (m *MarshalledData[T]) GetOriginalKeyspace() T
func (*MarshalledData[T]) MarshalJSON ¶
func (m *MarshalledData[T]) MarshalJSON() ([]byte, error)
type MonitorType ¶
type MonitorType int8
const ( SrcWatch MonitorType = iota MetaWatch FunctionScopeWatch )
type OwnershipRoutine ¶
type OwnershipRoutine interface {
GetVbMap(namespace *application.KeyspaceInfo, numVb uint16) (string, []uint16, error)
IsFunctionOwnedByThisNode(namespace *application.KeyspaceInfo) (bool, error)
}
type PlannerNodeVbMapping ¶
type PlannerNodeVbMapping struct {
Hostname string `json:"host_name"`
StartVb int `json:"start_vb"`
VbsCount int `json:"vb_count"`
}
PlannerNodeVbMapping captures the vbucket distribution across all eventing nodes, as computed by the planner.
type ProcessConfig ¶
type RebalanceConfig ¶
type RebalanceProgress ¶
type SecuritySetting ¶
type Signal ¶
type Signal struct {
// contains filtered or unexported fields
}
func (*Signal) WaitResume ¶
func (n *Signal) WaitResume()
type SingleAppStatusResponse ¶
type Stats ¶
type Stats struct {
ExecutionStats map[string]interface{} `json:"execution_stats"`
FailureStats map[string]interface{} `json:"failure_stats"`
EventProcessingStats map[string]uint64 `json:"event_processing_stats"`
LCBExceptionStats map[string]uint64 `json:"lcb_exception_stats"`
FunctionScope application.Namespace `json:"function_scope"`
FunctionName string `json:"function_name"`
FunctionID uint32 `json:"function_id"`
DcpFeedBoundary string `json:"dcp_feed_boundary"`
EventRemaining map[string]uint64 `json:"events_remaining"`
LcbCredsRequestCounter uint64 `json:"lcb_creds_request_counter,omitempty"`
GoCbCredsRequestCounter uint64 `json:"gocb_creds_request_counter,omitempty"`
// full stats
InternalVbDistributionStats map[string]string `json:"internal_vb_distribution_stats,omitempty"`
WorkerPids map[string]int `json:"worker_pids,omitempty"`
LatencyPercentileStats map[string]int `json:"latency_percentile_stats"`
CurlLatency *HistogramStats `json:"curl_latency_stats,omitempty"`
LatencyHistogram *HistogramStats `json:"latency_stats,omitempty"`
// Full debug stats
ProcessStats map[string]interface{} `json:"function_process_stats,omitempty"`
CheckPointStats map[string]interface{} `json:"function_checkpoint_stats,omitempty"`
VbStats map[string]interface{} `json:"vb_stats,omitempty"`
Insight *Insight `json:"-"`
TempLatencyHistogram *HistogramStats `json:"-"`
ProcessedSeq map[uint16]uint64 `json:"-"`
}
type StatsInterface ¶
type StorageEngine ¶
type StorageEngine string
type StreamType ¶
type StreamType uint8
const ( STREAM_BUCKET StreamType = iota STREAM_SCOPE STREAM_COLLECTION STREAM_UNKNOWN )
type TopologyChangeMsg ¶
type TopologyChangeMsg struct {
CType ChangeType
MsgSource string
}
type UndeployAction ¶
type UndeployAction struct {
UpdateMetakv bool
SkipMetadataCleanup bool
DeleteFunction bool
Reason string
}
func DefaultUndeployAction ¶
func DefaultUndeployAction() UndeployAction
func (UndeployAction) String ¶
func (msg UndeployAction) String() string
Source Files
¶
Click to show internal directories.
Click to hide internal directories.