package main import ( rbt "github.com/emirpasic/gods/trees/redblacktree" rbtUtils "github.com/emirpasic/gods/utils" "net" "sync" "time" ) // TaskEvent represents all events that can be issued on a Task. type TaskEvent int const ( TE_Generic TaskEvent = iota // undefined or no event // These should terminate a task instantly. TE_NoResponse // connect timed out TE_ReadFailed // read failed or timed out TE_NoService // endpoint does not provide selected service TE_ProtocolError // a service module reported an error during auth attempt TE_Bad // auth attempt completed successfully but credentials were wrong TE_Good // auth attempt completed successfully and the credentials are correct // TODO: proxying TE_ProxyNoResponse // cannot connect to a proxy TE_ProxyError // proxy failed during an exchange with the endpoint TE_ProxyInvalidAuth // authenticated proxy rejected our credentials // These serve as "hints" - they do not necessarily need to // terminate a task, but they can still provide useful // information about an endpoint or a service. TH_NoSuchLogin // login in this task is not present or not valid on a service TH_LoadExceeded // endpoint or service cannot handle this attempt rate TH_Banned // got banned from a service, should try another proxy or wait out the delay TH_WaitRequest // request for a grace time // These are still hints, but they occur very frequently on a Task's normal lifecycle, // effectively making these sort of "notifications" rather than hints. 
TN_Connected // successfully connected to an endpoint TN_ProxyConnected // successfully connected to a proxy ) func (ev TaskEvent) String() string { return [...]string{"Generic", "No response", "Read failed", "No Service", "Protocol error", "Bad", "Good", "No response from Proxy", "Error from Proxy", "Invalid auth from Proxy", "No Such login (hint)", "Load exceeded (hint)", "Banned (hint)", "Wait request (hint)", "Connected (notify)", "Connected from Proxy (notify)"}[ev] } // A Task represents a single unit of workload. // Every Task is linked to an Endpoint. type Task struct { e *Endpoint login, password string deferUntil time.Time numDeferrals int // this should not be in Task struct! conn net.Conn // ??? do we even need this here good bool } // maxSafeThreads is a safeguard to prevent creation of too many threads at once. const maxSafeThreads = 5000 // deferredTasks is a list of tasks that were deferred for processing to a later time. // This usually happens due to connection errors, protocol errors or per-endpoint limits. var deferredTasks *rbt.Tree // taskMutex is a mutex for safe handling of RB tree. var taskMutex sync.Mutex // String returns a string representation of a Task. func (task *Task) String() string { if task == nil { return "" } else { return task.e.String() + "@" + task.login + ":" + task.password } } // Defer sends a Task to the deferred queue. 
func (task *Task) Defer(addTime time.Duration) { task.deferUntil = time.Now().Add(addTime) task.numDeferrals++ // tell the endpoint that we got deferred, // so it won't be selected until the deferral time has passed // task.e.SetDeferralTime(task.deferUntil) // FIXME: this isn't needed, endpoints can handle their own delays maxDeferrals := CfgGetInt("task-max-deferrals") if maxDeferrals != -1 && task.numDeferrals >= maxDeferrals { log("task", 5, "giving up on task \"%v\" because it has exhausted its deferral limit (%v)", task, maxDeferrals) return } log("task", 5, "deferring task \"%v\" for %v", task, addTime) taskMutex.Lock() defer taskMutex.Unlock() deferredTasks.Put(task.deferUntil, task) } // EventWithParm tells a Task (and its underlying Endpoint) that // something important has happened, or a hint has been acquired. // Returns False if an event resulted in a deletion of its Task. func (task *Task) EventWithParm(event TaskEvent, parm any) bool { log("task", 4, "task event for \"%v\": %v", task, event) if event == TE_Generic { return true // do not process generic events } res := task.e.EventWithParm(event, parm) // notify the endpoint first switch event { // on these events, defer a Task only if its Endpoint is being kept case TE_NoResponse: if res { task.Defer(CfgGetDurationMS("no-response-delay-ms")) } case TE_ReadFailed: if res { task.Defer(CfgGetDurationMS("read-error-delay-ms")) } case TE_ProtocolError: if res { task.Defer(CfgGetDurationMS("protocol-error-delay-ms")) } // report about a bad/good auth result case TE_Good: RegisterResult(task, true) case TE_Bad: RegisterResult(task, false) // wait request has occurred: stop processing and instantly wait on a thread case TH_WaitRequest: log("task", 4, "wait request for \"%v\": sleeping for %v", task, parm.(time.Duration)) time.Sleep(parm.(time.Duration)) } return res } // Event is a parameterless version of EventWithParm. 
//
// It passes the int 0 as the event parameter.
func (task *Task) Event(event TaskEvent) bool {
	return task.EventWithParm(event, 0)
}

// GetDeferredTask retrieves a Task from the deferred queue.
//
// It scans entries in deadline order. It returns the first entry that is due,
// is not superseded by a newer Defer of the same task, and whose Endpoint is
// not currently delayed. That entry is removed from the queue and (task, 0) is
// returned. Superseded entries met along the way are removed as well.
//
// When nothing is ready it returns (nil, wait). wait is the shortest known time
// until either the next deferred task becomes due or a delayed endpoint of an
// already-due task is released. It is 0 when the queue is empty or no such
// time is known.
//
// The caller must hold taskMutex; CreateTask does.
func GetDeferredTask() (task *Task, waitTime time.Duration) {
	if deferredTasks.Empty() {
		log("task", 5, "deferred task list is empty")
		return nil, 0
	}
	currentTime := time.Now()

	// sooner returns the smaller positive duration of a and b, treating values <= 0 as "unknown".
	sooner := func(a, b time.Duration) time.Duration {
		if a <= 0 || (b > 0 && b < a) {
			return b
		}
		return a
	}

	// The tree must not be modified while it is being iterated, so keys to drop
	// are collected here and removed once the scan has returned.
	var toRemove []time.Time
	defer func() {
		for _, k := range toRemove {
			deferredTasks.Remove(k)
		}
	}()

	var endpointWait time.Duration
	it := deferredTasks.Iterator()
	for it.Next() {
		k, v := it.Key().(time.Time), it.Value().(*Task)
		if k.After(currentTime) {
			// Keys are time-ordered, so no later entry is due either.
			log("task", 5, "deferred tasks cannot yet be processed at this time")
			return nil, sooner(endpointWait, k.Sub(currentTime))
		}
		if k.Before(v.deferUntil) {
			log("task", 5, "deferred task was re-deferred: removing its previous definition")
			toRemove = append(toRemove, k)
			continue
		}
		if until := v.e.delayUntil; !until.IsZero() && until.After(currentTime) {
			// skip this task: deferred task is OK, but its endpoint is delayed
			endpointWait = sooner(endpointWait, until.Sub(currentTime))
			continue
		}
		toRemove = append(toRemove, k)
		return v, 0
	}

	log("task", 5, "deferred tasks are OK for processing but their endpoints cannot yet be processed at this time")
	return nil, endpointWait
}

// FetchTaskComponents returns all components needed to build a Task.
func FetchTaskComponents() (ep *Endpoint, login string, password string, waitTime time.Duration) { var empty bool log("task", 5, "fetching new endpoint") ep, waitTime = FetchEndpoint() if ep == nil { return nil, "", "", waitTime } log("task", 5, "fetched endpoint: \"%v\"", ep) for { log("task", 5, "fetching password for \"%v\"", ep) password, empty = SrcPassword.FetchOne(&ep.passwordPos, true) if !empty { break } log("task", 5, "out of passwords for \"%v\": resetting and fetching new login", ep) ep.passwordPos.Reset() login, empty = SrcLogin.FetchOne(&ep.loginPos, true) } log("task", 5, "got password for \"%v\": %v, fetching login", ep, password) login, empty = SrcLogin.FetchOne(&ep.loginPos, false) if !empty { log("task", 5, "got login for \"%v\": %v", ep, login) return ep, login, password, 0 } else { log("task", 5, "out of logins for \"%v\": exhausting endpoint", ep) ep.Exhausted() return FetchTaskComponents() // attempt to fetch again } } // CreateTask creates a new Task element. It searches through deferred queue first, // then, if nothing was found, it assembles a new (Endpoint, login, password) combination. 
func CreateTask() (task *Task, delay time.Duration) { taskMutex.Lock() defer taskMutex.Unlock() task, delayDeferred := GetDeferredTask() if task != nil { log("task", 4, "new task (deferred): %v", task) task.conn = nil task.good = false return task, 0 } ep, login, password, delaySource := FetchTaskComponents() if ep == nil { if delayDeferred == 0 && delaySource == 0 { log("task", 4, "cannot build task, no endpoint") return nil, 0 } else if delayDeferred > delaySource || delayDeferred == 0 { log("task", 4, "delaying task creation (by source delay) for %v", delaySource) return nil, delaySource } else { log("task", 4, "delaying task creation (by deferred delay) for %v", delayDeferred) return nil, delayDeferred } } t := Task{} t.e = ep t.login = login t.password = password t.good = false log("task", 4, "new task: %v", &t) return &t, 0 } func init() { deferredTasks = rbt.NewWith(rbtUtils.TimeComparator) CfgRegister("threads", 3, "how many threads to use") CfgRegister("thread-delay-ms", 10, "separate threads at startup for this amount of ms") CfgRegister("connect-timeout-ms", 3000, "") CfgRegister("read-timeout-ms", 2000, "") // using a very high limit for now, but this should actually be set to -1 CfgRegister("task-max-deferrals", 30000, "how many deferrals are allowed for a single task. -1 to disable") CfgRegisterAlias("t", "threads") }