package main import ( "container/list" "errors" "net" "strconv" "strings" "sync" "time" ) type Address struct { ip string // TODO: switch to a static 16-byte array port int v6 bool } type Endpoint struct { addr Address loginPos SourcePos passwordPos SourcePos delayUntil time.Time normalList *list.Element delayedList *list.Element goodConn int badConn int consecutiveGoodConn int consecutiveBadConn int protoErrors int consecutiveProtoErrors int readErrors int consecutiveReadErrors int mutex sync.Mutex deleted bool // set to TRUE to mark this endpoint as deleted // unused, for now rtt float32 heuristicBanAPS int heuristicBanPPS int lastPacketAt time.Time // when was the last packet sent? lastAttemptAt time.Time // same, but for attempts } var endpoints *list.List // Contains all active endpoints var delayedEndpoints *list.List // Contains endpoints that got delayed // A mutex for synchronizing Endpoint collections. var globalEndpointMutex sync.Mutex // String transforms an Endpoint to a string representation compatible with Dialer interface. func (e *Endpoint) String() string { if e.addr.v6 { return "[" + e.addr.ip + "]:" + strconv.Itoa(e.addr.port) } else { return e.addr.ip + ":" + strconv.Itoa(e.addr.port) } } // Delete deletes an endpoint from global storage. // This method assumes that Endpoint's mutex was already taken. func (e *Endpoint) Delete() { globalEndpointMutex.Lock() defer globalEndpointMutex.Unlock() e.delayUntil = time.Time{} if e.delayedList != nil { log("ep", 3, "deleting delayed endpoint \"%v\"", e) delayedEndpoints.Remove(e.delayedList) e.delayedList = nil } if e.normalList != nil { log("ep", 3, "deleting endpoint \"%v\"", e) endpoints.Remove(e.normalList) e.normalList = nil } e.deleted = true } // Delay marks an Endpoint as "delayed" with the specified time duration // and causes it to move to the delayed queue. // This method assumes that Endpoint's mutex was already taken. func (e *Endpoint) Delay(addTime time.Duration) { if e.deleted { return } log("ep", 5, "delaying endpoint \"%v\" for %v", e, addTime) e.delayUntil = time.Now().Add(addTime) e.MigrateToDelayed() } // MigrateToDelayed moves an Endpoint to a delayed queue. // Endpoint mutex is assumed to be taken. func (e *Endpoint) MigrateToDelayed() { endpointMutex.Lock() defer endpointMutex.Unlock() if e.delayedList != nil { // already in a delayed list log("ep", 5, "cannot migrate endpoint \"%v\" to delayed list: already in the list", e) } else { log("ep", 5, "migrating endpoint \"%v\" to delayed list", e) e.delayedList = delayedEndpoints.PushBack(e) if e.normalList != nil { endpoints.Remove(e.normalList) e.normalList = nil } } } // MigrateToNormal moves an Endpoint to a normal queue. // Endpoint mutex is assumed to be taken. func (e *Endpoint) MigrateToNormal() { endpointMutex.Lock() defer endpointMutex.Unlock() if e.normalList != nil { log("ep", 5, "cannot migrate endpoint \"%v\" to normal list: already in the list", e) } else { log("ep", 5, "migrating endpoint \"%v\" to normal list", e) e.normalList = endpoints.PushBack(e) if e.delayedList != nil { delayedEndpoints.Remove(e.delayedList) e.delayedList = nil } } } // SkipLogin gets the endpoint's current login, // compares it with user-defined login and skips (advances) it if // both logins are equal. func (e *Endpoint) SkipLogin(login) { // attempt to fetch next login curLogin, empty := SrcLogin.FetchOne(&e.loginPos, false) if curLogin == login && !empty { // this login has not yet been exhausted? // reset password pos e.passwordPos.Reset() // fetch but ignore result SrcLogin.FetchOne(&e.loginPos, true) log("ep", 3, "advanced to next login for \"%v\"", e) } } // NoResponse is an event handler that gets called when // an Endpoint does not respond to a connection request. func (e *Endpoint) NoResponse() bool { e.mutex.Lock() defer e.mutex.Unlock() e.badConn++ if e.consecutiveGoodConn == 0 { e.consecutiveBadConn++ } else { e.consecutiveGoodConn = 0 e.consecutiveBadConn = 1 } // 1. always bail after X consecutive bad conns if e.consecutiveBadConn >= CfgGetInt("max-bad-conn") { log("ep", 3, "deleting \"%v\" due to max-bad-conn", e) e.Delete() return false } // 2. after a good conn, always allow at most X bad conns if e.goodConn > 0 && e.consecutiveBadConn <= CfgGetInt("max-bad-after-good-conn") { log("ep", 3, "keeping \"%v\" around due to max-bad-after-good-conn", e) e.Delay(CfgGetDurationMS("no-response-delay-ms")) return true } // 3. always allow at most X bad conns if e.consecutiveBadConn < CfgGetInt("min-bad-conn") { log("ep", 3, "keeping \"%v\" around due to min-bad-conn", e) e.Delay(CfgGetDurationMS("no-response-delay-ms")) return true } // 4. bad conn/good conn ratio must not be higher than X if e.goodConn > 0 && (float64(e.badConn)/float64(e.goodConn)) <= CfgGetFloat("conn-ratio") { log("ep", 3, "keeping \"%v\" around due to conn-ratio", e) e.Delay(CfgGetDurationMS("no-response-delay-ms")) return true } // otherwise, just delete it log("ep", 3, "deleting \"%v\" due to no applicable grace conditions", e) e.Delete() return false } // ProtocolError is an event handler that gets called when // an Endpoint responds with wrong or missing data. func (e *Endpoint) ProtocolError() bool { e.mutex.Lock() defer e.mutex.Unlock() e.protoErrors++ e.consecutiveProtoErrors++ // 1. always bail after X consecutive protocol errors if e.consecutiveProtoErrors >= CfgGetInt("max-proto-errors") { log("ep", 3, "deleting \"%v\" due to max-proto-errors", e) e.Delete() return false } // 2. always allow at most X consecutive protocol errors if e.consecutiveProtoErrors < CfgGetInt("min-proto-errors") { log("ep", 3, "keeping \"%v\" around due to min-proto-errors", e) e.Delay(CfgGetDurationMS("protocol-error-delay-ms")) return true } // 3. bad conn/good conn ratio must not be higher than X if e.goodConn > 0 && (float64(e.protoErrors)/float64(e.goodConn)) <= CfgGetFloat("proto-error-ratio") { log("ep", 3, "keeping \"%v\" around due to proto-error-ratio", e) e.Delay(CfgGetDurationMS("protocol-error-delay-ms")) return true } // otherwise, just delete it log("ep", 3, "deleting \"%v\" due to no applicable grace conditions", e) e.Delete() return false } // Bad is an event handler that gets called when // an authentication attempt to an Endpoint fails. func (e *Endpoint) Bad() { e.mutex.Lock() defer e.mutex.Unlock() e.consecutiveProtoErrors = 0 // The endpoint may be in delayed queue, so push it back to the normal queue. e.MigrateToNormal() } // Good is an event handler that gets called when // an authentication attempt to an Endpoint succeeds. func (e *Endpoint) Good(login) { e.mutex.Lock() defer e.mutex.Unlock() e.consecutiveProtoErrors = 0 if !CfgGetSwitch("keep-endpoint-on-good") { e.Delete() } else { e.MigrateToNormal() e.SkipLogin(login) } } // Connected is an event handler that gets called when // a connection attempt to an Endpoint succeeds. func (e *Endpoint) Connected() { e.mutex.Lock() defer e.mutex.Unlock() e.goodConn++ if e.consecutiveBadConn == 0 { e.consecutiveGoodConn++ } else { e.consecutiveBadConn = 0 e.consecutiveGoodConn = 1 } } // NoSuchLogin is an event handler that gets called when // a service module determines that a login does not present // on an Endpoint and therefore can be excluded from processing. func (e *Endpoint) NoSuchLogin(login string) { e.mutex.Lock() defer e.mutex.Unlock() e.SkipLogin(login) } // EventWithParm tells an Endpoint that something important has happened, // or a hint has been acquired. // It is normally called from a Task handler. // Returns False if an event resulted in a deletion of its Endpoint. func (e *Endpoint) EventWithParm(event TaskEvent, parm any) bool { log("ep", 4, "endpoint event for \"%v\": %v", e, event) if event == TE_Generic { return true // do not process generic events } switch event { case TE_NoResponse: return e.NoResponse() case TE_ProtocolError: return e.ProtocolError() case TE_Good: e.Good(parm.(string)) return false case TE_Bad: e.Bad() case TN_Connected: e.Connected() case TH_NoSuchLogin: e.NoSuchLogin(parm.(string)) } return true // keep this endpoint } // Event is a parameterless version of EventWithParm. func (e *Endpoint) Event(event TaskEvent) bool { return e.EventWithParm(event, 0) } // Exhausted gets called when an endpoint no longer has any valid logins and passwords, // thus it may be deleted. func (e *Endpoint) Exhausted() { e.mutex.Lock() defer e.mutex.Unlock() e.Delete() } // GetDelayedEndpoint retrieves an Endpoint from the delayed list. func GetDelayedEndpoint() (e *Endpoint, waitTime time.Duration) { currentTime := time.Now() if delayedEndpoints.Empty() { log("ep", 5, "delayed endpoint list is empty") return nil, 0 } it := delayedEndpoints.IteratorAt(delayedEndpoints.Left()) for { k, v := it.Key().(time.Time), it.Value().(*Endpoint) if v == nil { log("ep", 5, "!!! empty delayed endpoint!!!") return nil, 0 } if k.After(currentTime) { log("ep", 5, "no delayed endpoints can be processed at this time") return nil, k.Sub(currentTime) } if k.Before(v.delayUntil) { log("ep", 5, "delayed endpoint was re-delayed: removing lingering definition") defer delayedEndpoints.Remove(k) it.Next() continue } if v.delayUntil.IsZero() { log("ep", 5, "delayed endpoint is already in normal queue: removing lingering definition") defer delayedEndpoints.Remove(k) it.Next() continue } defer delayedEndpoints.Remove(k) return v, 0 } log("ep", 5, "delayed endpoint list was holding only lingering definitions and is now empty") return nil, 0 } // FetchEndpoint retrieves an endpoint: first, a delayed RB tree is queried, // then, if nothing is found, a normal list is searched, // and (TODO) if this list is empty or will soon be emptied, // a new batch of endpoints gets created. func FetchEndpoint() (e *Endpoint, waitTime time.Duration) { endpointMutex.Lock() defer endpointMutex.Unlock() e, waitTime = GetDelayedEndpoint() if e != nil { log("ep", 4, "fetched a delayed endpoint: \"%v\"", e) return e, 0 } el := endpoints.Front() if el == nil { if waitTime == 0 { log("ep", 1, "out of endpoints") return nil, 0 } log("ep", 4, "all endpoints are delayed, waiting for %v", waitTime) return nil, waitTime } endpoints.MoveToBack(el) e = el.Value.(*Endpoint) log("ep", 4, "fetched an endpoint: \"%v\"", e) return e, 0 } // Safety feature, to avoid expanding subnets into a huge amount of IPs. const maxNetmaskSize = 22 // expands into /10 for IPv4 // RegisterEndpoint builds an Endpoint and puts it to a global list of endpoints. func RegisterEndpoint(ip string, ports []int, isIPv6 bool) int { for _, port := range ports { ep := Endpoint{addr: Address{ip: ip, port: port, v6: isIPv6}} ep.loginPos.Reset() ep.passwordPos.Reset() ep.listElement = endpoints.PushBack(&ep) log("ep", 3, "ok registered: %v", &ep) } return len(ports) } func incIP(ip net.IP) { for j := len(ip) - 1; j >= 0; j-- { ip[j]++ if ip[j] > 0 { break } } } func parseCIDR(ip string, ports []int, isIPv6 bool) int { na, nm, err := net.ParseCIDR(ip) if err != nil { log("ep", 0, "failed to parse CIDR notation for \"%v\": %v", ip, err.Error()) return 0 } mask, maskBits := nm.Mask.Size() if mask < maskBits-maxNetmaskSize { log("ep", 0, "ignoring out of safe bounds CIDR netmask for \"%v\": %v (max: %v, allowed: %v)", ip, mask, maskBits, maxNetmaskSize) return 0 } curHost := 0 maxHost := 1<<(maskBits-mask) - 1 numParsed := 0 strict := CfgGetSwitch("strict-subnets") log("ep", 2, "expanding CIDR: \"%v\" to %v hosts", ip, maxHost+1) for expIP := na.Mask(nm.Mask); nm.Contains(expIP); incIP(expIP) { if strict && (curHost == 0 || curHost == maxHost) && maskBits-mask >= 2 { log("ep", 1, "ignoring network/broadcast address due to strict-subnets: \"%v\"", expIP.String()) } else { numParsed += parseIPPorts(expIP.String(), ports, isIPv6) } curHost++ } return numParsed } func parseIPPorts(ip string, ports []int, isIPv6 bool) int { // ip may be a domain name, a CIDR subnet or an IP address // CIDR subnets must be expanded to plain IPs if strings.LastIndex(ip, "/") >= 0 { // this is a CIDR subnet return parseCIDR(ip, ports, isIPv6) } else if strings.Count(ip, "/") > 1 { // invalid CIDR notation log("ep", 0, "invalid CIDR subnet format: \"%v\", ignoring", ip) return 0 } else { // otherwise, just register return RegisterEndpoint(ip, ports, isIPv6) } } func extractIPAndPort(str string, skippedIPv6 *int) (ip string, port int, err error) { var portString string ip, portString, err = net.SplitHostPort(str) if err != nil { return "", 0, err } port, err = strconv.Atoi(portString) if err != nil { return "", 0, err } if port <= 0 || port > 65535 { return "", 0, errors.New("invalid port: " + strconv.Itoa(port)) } return ip, port, nil } func ParseEndpoints(source []string) { log("ep", 1, "parsing endpoints") totalIPv6Skipped := 0 numParsed := 0 for _, str := range source { if !strings.Contains(str, ":") { // no ":": this is an ipv4/dn without port, // parse it with all known ports numParsed += parseIPPorts(str, CfgGetIntSlice("port"), false) } else { // either ipv4/dn with port, or ipv6 with/without port isIPv6 := strings.Count(str, ":") > 1 if isIPv6 && CfgGetSwitch("no-ipv6") { log("ep", 1, "skipping ipv6 target \"%v\" due to no-ipv6", str) totalIPv6Skipped++ continue } if !strings.Contains(str, "]:") && strings.Contains(str, "::") { // ipv6 without port numParsed += parseIPPorts(str, CfgGetIntSlice("port"), true) continue } ip, port, err := extractIPAndPort(str, &totalIPv6Skipped) if err != nil { log("ep", 0, "failed to extract ip/port for \"%v\": %v", str, err.Error()) continue } ports := []int{port} // append all default ports if CfgGetSwitch("append-default-ports") { for _, port2 := range CfgGetIntSlice("port") { if port != port2 { ports = append(ports, port2) } } } numParsed += parseIPPorts(ip, ports, isIPv6) } } logIf(totalIPv6Skipped > 0, "ep", 0, "skipping %v IPv6 targets due to no-ipv6 flag", totalIPv6Skipped) log("ep", 1, "finished parsing endpoints: got %v, total %v", numParsed, endpoints.Len()) } func init() { endpoints = list.New() delayedEndpoints = list.New() CfgRegister("port", []int{8291}, "one or more default ports") CfgRegister("max-aps", 5, "maximum number of attempts per second for an endpoint") CfgRegisterSwitch("no-ipv6", "skip IPv6 entries") CfgRegisterSwitch("append-default-ports", "always append default ports even for targets in host:port format") CfgRegisterSwitch("strict-subnets", "strict subnet behaviour: ignore network and broadcast addresses in /30 and bigger subnets") CfgRegisterSwitch("keep-endpoint-on-good", "keep processing endpoint if a login/password was found") CfgRegister("conn-ratio", 0.15, "keep a failed endpoint if its bad/good connection ratio is lower than this value") CfgRegister("max-bad-after-good-conn", 5, "how many consecutive bad connections to allow after a good connection") CfgRegister("max-bad-conn", 20, "always remove endpoint after this many consecutive bad connections") CfgRegister("min-bad-conn", 2, "do not consider removing an endpoint if it does not have this many consecutive bad connections") CfgRegister("proto-error-ratio", 0.25, "keep endpoints with a protocol error if their protocol error ratio is lower than this value") CfgRegister("max-proto-errors", 20, "always remove endpoint after this many consecutive protocol errors") CfgRegister("min-proto-errors", 4, "do not consider removing an endpoint if it does not have this many consecutive protocol errors") CfgRegister("read-error-ratio", 0.25, "keep endpoints with a read error if their read error ratio is lower than this value") CfgRegister("max-read-errors", 20, "always remove endpoint after this many consecutive read errors") CfgRegister("min-read-errors", 3, "do not consider removing an endpoint if it does not have this many consecutive read errors") CfgRegister("no-response-delay-ms", 2000, "wait for this number of ms if an endpoint does not respond") CfgRegister("read-error-delay-ms", 5000, "wait for this number of ms if an endpoint returns a read error") CfgRegister("protocol-error-delay-ms", 5000, "wait for this number of ms if an endpoint returns a protocol error") }