package main

import (
	"container/list"
	"strconv"
	"sync"
	"time"
)

// Global endpoint storage. Both lists hold *Endpoint values.
var (
	endpoints           *list.List // Contains all active and ready endpoints
	delayedEndpoints    *list.List // Contains endpoints that are active, but not ready
	globalEndpointMutex sync.Mutex // A mutex for synchronizing Endpoint collections
)

// init creates the two endpoint lists and registers every endpoint-related
// parameter and switch together with its default value and help text.
func init() {
	endpoints, delayedEndpoints = list.New(), list.New()

	// general endpoint settings
	registerParam("port", []int{8291}, "one or more default ports")
	registerParam("max-aps", 5, "maximum number of attempts per second for an endpoint")
	registerSwitch("keep-endpoint-on-good", "keep processing endpoint even if a good login/password was found")

	// connection failure thresholds
	registerParam("conn-ratio", 0.15, "keep a failed endpoint if its bad/good connection ratio is lower than this value")
	registerParam("max-bad-after-good-conn", 5, "how many consecutive bad connections to allow after a good connection")
	registerParam("max-bad-conn", 20, "always remove endpoint after this many consecutive bad connections")
	registerParam("min-bad-conn", 1, "do not consider removing an endpoint if it does not have this many consecutive bad connections")

	// protocol error thresholds
	registerParam("proto-error-ratio", 0.25, "keep endpoints with a protocol error if their protocol error ratio is lower than this value")
	registerParam("max-proto-errors", 20, "always remove endpoint after this many consecutive protocol errors")
	registerParam("min-proto-errors", 4, "do not consider removing an endpoint if it does not have this many consecutive protocol errors")

	// read error thresholds
	registerParam("read-error-ratio", 0.25, "keep endpoints with a read error if their read error ratio is lower than this value")
	registerParam("max-read-errors", 20, "always remove endpoint after this many consecutive read errors")
	registerParam("min-read-errors", 3, "do not consider removing an endpoint if it does not have this many consecutive read errors")

	// delays applied after a failure
	registerParam("no-response-delay-ms", 2000, "wait for this number of ms if an endpoint does not respond")
	registerParam("read-error-delay-ms", 5000, "wait for this number of ms if an endpoint returns a read error")
	registerParam("protocol-error-delay-ms", 5000, "wait for this number of ms if an endpoint returns a protocol error")

	// host discovery settings
	registerParam("discover-percentage", 80, "percentage of threads that should be running host discovery")
	registerParam("discover-max-total-aps", 200, "max total attempts per second when discovery is not yet finished")
	registerParam("discover-max-endpoint-aps", 10, "max attempts per second for endpoints when discovery is not yet finished")
}

// FetchEndpoint hands out the next endpoint to work on.
//
// The delayed list is consulted first; if it yields nothing, the front of the
// normal list is used and moved to the back of that list.
// When no endpoint is available the result is (nil, waitTime): a non-zero
// waitTime means every remaining endpoint is delayed, a zero waitTime means
// there are no endpoints at all.
// The function holds globalEndpointMutex for its whole duration and panics
// if the normal list contains an endpoint in the ES_Deleted state.
func FetchEndpoint() (e *Endpoint, waitTime time.Duration) {
	globalEndpointMutex.Lock()
	defer globalEndpointMutex.Unlock()

	log("ep", 4, "fetching an endpoint")

	delayed, wait := GetDelayedEndpoint()
	if delayed != nil {
		log("ep", 4, "fetched a delayed endpoint: \"%v\"", delayed)
		return delayed, 0
	}

	front := endpoints.Front()
	switch {
	case front == nil && wait == 0:
		log("ep", 4, "out of endpoints")
		return nil, 0
	case front == nil:
		log("ep", 4, "all endpoints are delayed, waiting for %v", wait)
		return nil, wait
	}

	// Move the fetched element to the back so that the following call
	// starts from the next endpoint in the list.
	endpoints.MoveToBack(front)
	next := front.Value.(*Endpoint)
	if next.state == ES_Deleted {
		panic("fetched a deleted endpoint")
	}

	log("ep", 4, "fetched a normal endpoint: \"%v\"", next)
	return next, 0
}

// Event is a parameterless version of EventWithParm:
// it forwards the event with a parameter of 0.
func (e *Endpoint) Event(event TaskEvent) bool {
	return e.EventWithParm(event, 0)
}

// EventWithParm tells an Endpoint that something important has happened,
// or a hint has been acquired.
// It is normally called from a Task handler.
// Returns False if an event resulted in a deletion of its Endpoint.
func (e *Endpoint) EventWithParm(event TaskEvent, parm any) bool { log("ep", 4, "endpoint event for \"%v\": %v", e, event) if event == TE_Generic { return true // do not process generic events } e.TakeMutex() defer e.ReleaseMutex() switch event { case TE_NoResponse: return e.NoResponse() case TE_ProtocolError: return e.ProtocolError() case TE_Good: e.Good(parm.(string)) return false case TE_Bad: e.Bad() case TN_Connected: e.Connected() case TH_NoSuchLogin: e.NoSuchLogin(parm.(string)) } return true // keep this endpoint } func (state EndpointState) String() string { switch state { case ES_Normal: return "normal" case ES_Delayed: return "delayed" case ES_Deleted: return "deleted" } return "unknown" } func (state EndpointState) GetList() *list.List { switch state { case ES_Normal: return endpoints case ES_Delayed: return delayedEndpoints } return nil } // String transforms an Endpoint to a string representation compatible with Dialer interface. func (e *Endpoint) String() string { if e.addr.v6 { return "[" + e.addr.ip + "]:" + strconv.Itoa(e.addr.port) } else { return e.addr.ip + ":" + strconv.Itoa(e.addr.port) } } func (e *Endpoint) GetList() *list.List { return e.state.GetList() } // Delete deletes an endpoint from global storage. // This method assumes that Endpoint's mutex was already taken. func (e *Endpoint) Delete() { globalEndpointMutex.Lock() defer globalEndpointMutex.Unlock() list := e.GetList() if list == nil { log("ep", 3, "cannot delete endpoint \"%v\", not in the list", e) } else { log("ep", 3, "deleting endpoint \"%v\"", e) } e.delayUntil = time.Time{} e.SetStateEx(ES_Deleted, false) } // SetState changes an endpoint's state. 
func (e *Endpoint) SetStateEx(newState EndpointState, takeMutex bool) { if e.state == newState { log("ep", 5, "ignoring state change for an endpoint \"%v\": already in state \"%v\"", e, e.state) return } oldList := e.GetList() newList := newState.GetList() if takeMutex { globalEndpointMutex.Lock() defer globalEndpointMutex.Unlock() } if e.listElement != nil && oldList != nil { oldList.Remove(e.listElement) } if newList == nil { e.listElement = nil } else { e.listElement = newList.PushBack(e) } e.state = newState } func (e *Endpoint) SetState(newState EndpointState) { e.SetStateEx(newState, true) } // Delay marks an Endpoint as "delayed" for a certain duration // and migrates it to the delayed queue. // This method assumes that Endpoint's mutex was already taken. func (e *Endpoint) Delay(addTime time.Duration) { if e.state == ES_Normal { log("ep", 5, "delaying endpoint \"%v\" for %v", e, addTime) e.delayUntil = time.Now().Add(addTime) e.SetState(ES_Delayed) } else if e.state == ES_Delayed { // endpoints that are already delayed can have their delay time extended further tm := time.Now().Add(addTime) if e.delayUntil.Before(tm) { log("ep", 5, "extending delay deadline for endpoint \"%v\" from %v to %v", e, e.delayUntil, tm) e.delayUntil = tm } } } // SkipLogin gets the endpoint's current login, // compares it with user-defined login and skips (advances) it if // both logins are equal. func (e *Endpoint) SkipLogin(login string) { // attempt to fetch next login curLogin, empty := SrcLogin.FetchOne(&e.loginPos, false) if curLogin == login && !empty { // this login has not yet been exhausted? // reset password pos e.passwordPos.Reset() // fetch but ignore result SrcLogin.FetchOne(&e.loginPos, true) log("ep", 3, "advanced to next login for \"%v\"", e) } } // NoResponse is an event handler that gets called when // an Endpoint does not respond to a connection request. 
func (e *Endpoint) NoResponse() bool { e.badConn++ if e.consecutiveGoodConn == 0 { e.consecutiveBadConn++ } else { e.consecutiveGoodConn = 0 e.consecutiveBadConn = 1 } // 1. always bail after X consecutive bad conns if e.consecutiveBadConn >= getParamInt("max-bad-conn") { log("ep", 3, "deleting \"%v\" due to max-bad-conn", e) e.Delete() return false } // 2. after a good conn, always allow at most X bad conns if e.goodConn > 0 && e.consecutiveBadConn <= getParamInt("max-bad-after-good-conn") { log("ep", 3, "keeping \"%v\" around due to max-bad-after-good-conn", e) e.Delay(getParamDurationMS("no-response-delay-ms")) return true } // 3. always allow at most X bad conns if e.consecutiveBadConn < getParamInt("min-bad-conn") { log("ep", 3, "keeping \"%v\" around due to min-bad-conn", e) e.Delay(getParamDurationMS("no-response-delay-ms")) return true } // 4. bad conn/good conn ratio must not be higher than X if e.goodConn > 0 && (float64(e.badConn)/float64(e.goodConn)) <= getParamFloat("conn-ratio") { log("ep", 3, "keeping \"%v\" around due to conn-ratio", e) e.Delay(getParamDurationMS("no-response-delay-ms")) return true } // otherwise, just delete it log("ep", 3, "deleting \"%v\" due to no applicable grace conditions", e) e.Delete() return false } // ProtocolError is an event handler that gets called when // an Endpoint responds with wrong or missing data. func (e *Endpoint) ProtocolError() bool { e.protoErrors++ e.consecutiveProtoErrors++ // 1. always bail after X consecutive protocol errors if e.consecutiveProtoErrors >= getParamInt("max-proto-errors") { log("ep", 3, "deleting \"%v\" due to max-proto-errors", e) e.Delete() return false } // 2. always allow at most X consecutive protocol errors if e.consecutiveProtoErrors < getParamInt("min-proto-errors") { log("ep", 3, "keeping \"%v\" around due to min-proto-errors", e) e.Delay(getParamDurationMS("protocol-error-delay-ms")) return true } // 3. 
bad conn/good conn ratio must not be higher than X if e.goodConn > 0 && (float64(e.protoErrors)/float64(e.goodConn)) <= getParamFloat("proto-error-ratio") { log("ep", 3, "keeping \"%v\" around due to proto-error-ratio", e) e.Delay(getParamDurationMS("protocol-error-delay-ms")) return true } // otherwise, just delete it log("ep", 3, "deleting \"%v\" due to no applicable grace conditions", e) e.Delete() return false } // Bad is an event handler that gets called when // an authentication attempt to an Endpoint fails. func (e *Endpoint) Bad() { e.consecutiveProtoErrors = 0 // The endpoint may be in delayed queue, so push it back to the normal queue. e.SetState(ES_Normal) } // Good is an event handler that gets called when // an authentication attempt to an Endpoint succeeds. func (e *Endpoint) Good(login string) { e.consecutiveProtoErrors = 0 if !getParamSwitch("keep-endpoint-on-good") { e.Delete() } else { e.SetState(ES_Normal) e.SkipLogin(login) } } // Connected is an event handler that gets called when // a connection attempt to an Endpoint succeeds. func (e *Endpoint) Connected() { e.goodConn++ if e.consecutiveBadConn == 0 { e.consecutiveGoodConn++ } else { e.consecutiveBadConn = 0 e.consecutiveGoodConn = 1 } } // NoSuchLogin is an event handler that gets called when // a service module determines that a login does not present // on an Endpoint and therefore can be excluded from processing. func (e *Endpoint) NoSuchLogin(login string) { e.SkipLogin(login) } // Exhausted gets called when an endpoint no longer has any valid logins and passwords, // thus it may be deleted. func (e *Endpoint) Exhausted() { e.TakeMutex() defer e.ReleaseMutex() e.Delete() } // GetDelayedEndpoint retrieves an Endpoint from the delayed list. // globalEndpointMutex must be already taken. 
func GetDelayedEndpoint() (e *Endpoint, waitTime time.Duration) { currentTime := time.Now() if delayedEndpoints.Len() == 0 { log("ep", 5, "delayed endpoint list is empty") return nil, 0 } minWaitTime := time.Time{} for e := delayedEndpoints.Front(); e != nil; e = e.Next() { dt := e.Value.(*Endpoint) if minWaitTime.IsZero() || (dt.delayUntil.Before(minWaitTime) && dt.delayUntil.After(currentTime)) { minWaitTime = dt.delayUntil } if dt.delayUntil.Before(currentTime) { dt.delayUntil = time.Time{} dt.SetStateEx(ES_Normal, false) return dt, 0 } } if minWaitTime.Before(currentTime) { return nil, 0 } else { return nil, minWaitTime.Sub(currentTime) } } func (e *Endpoint) TakeMutex() { e.mutex.Lock() } func (e *Endpoint) ReleaseMutex() { e.mutex.Unlock() } func (e *Endpoint) RegisterRTT(rtt time.Duration) { const rttAverage = 8 if e.rttCount == 0 { e.rtt = rtt } else { e.rtt = e.rtt*(rttAverage-1)/rttAverage + rtt/rttAverage } e.rttCount++ } type Address struct { ip string // TODO: switch to a static 16-byte array port int v6 bool } type EndpointState int const ( ES_Normal EndpointState = iota ES_Delayed ES_Deleted ) // An Endpoint represents a remote target and stores its persistent data between multiple connections. type Endpoint struct { addr Address // IP address of an endpoint loginPos SourcePos passwordPos SourcePos // login/password cursors listElement *list.Element // position in list state EndpointState // which state an endpoint is in delayUntil time.Time // when this endpoint can be used again // endpoint stats goodConn, badConn, protoErrors, readErrors int consecutiveGoodConn, consecutiveBadConn, consecutiveProtoErrors, consecutiveReadErrors int mutex sync.Mutex // sync primitive rtt time.Duration rttCount uint lastSentAt time.Time lastAttemptAt time.Time lastReceivedAt time.Time }