package main import ( "container/list" "strconv" "sync" "time" ) // deferredTasks is a list of tasks that were deferred for processing to a later time. // This usually happens due to connection errors, protocol errors or per-endpoint limits. var deferredTasks *list.List // taskMutex is a mutex for safe handling of deferred task list. var taskMutex sync.Mutex func init() { deferredTasks = list.New() } // TaskEvent represents all events that can be issued on a Task. type TaskEvent int const ( TE_Generic TaskEvent = iota // undefined or no event // These should terminate a task instantly. TE_NoResponse // connect timed out TE_ReadFailed // read failed or timed out TE_NoService // endpoint does not provide selected service TE_ProtocolError // a service module reported an error during auth attempt TE_Bad // auth attempt completed successfully but credentials were wrong TE_Good // auth attempt completed successfully and the credentials are correct // TODO: proxying TE_ProxyNoResponse // cannot connect to a proxy TE_ProxyError // proxy failed during an exchange with the endpoint TE_ProxyInvalidAuth // authenticated proxy rejected our credentials // These serve as "hints" - they do not necessarily need to // terminate a task, but they can still provide useful // information about an endpoint or a service. TH_NoSuchLogin // login in this task is not present or not valid on a service TH_LoadExceeded // endpoint or service cannot handle this attempt rate TH_Banned // got banned from a service, should try another proxy or wait out the delay TH_WaitRequest // request for a grace time // These are still hints, but they occur very frequently on a Task's normal lifecycle, // effectively making these sort of "notifications" rather than hints. TN_Connected // successfully connected to an endpoint TN_ProxyConnected // successfully connected to a proxy ) func (ev TaskEvent) String() string { return [...]string{"Generic", "No response", "Read failed", "No Service", "Protocol error", "Bad", "Good", "No response from Proxy", "Error from Proxy", "Invalid auth from Proxy", "No Such login (hint)", "Load exceeded (hint)", "Banned (hint)", "Wait request (hint)", "Connected (notify)", "Connected from Proxy (notify)"}[ev] } // A Task represents a single unit of workload. // Every Task is linked to an Endpoint. type Task struct { e *Endpoint login string password string deferUntil time.Time numDeferrals int listElement *list.Element // position in list thread int // thread index } // String returns a string representation of a Task. func (task *Task) String() string { if task == nil { return "" } else { s := task.e.String() + "@" + task.login + ":" + task.password if task.thread > 0 { s = "[" + strconv.Itoa(task.thread) + "] " + s } return s } } // Defer sends a Task to the deferred queue. func (task *Task) Defer(addTime time.Duration) { task.deferUntil = time.Now().Add(addTime) task.numDeferrals++ maxDeferrals := getParamInt("task-max-deferrals") if maxDeferrals != -1 && task.numDeferrals >= maxDeferrals { log("task", 5, "giving up on task \"%v\" because it has exhausted its deferral limit (%v)", task, maxDeferrals) return } log("task", 5, "deferring task \"%v\" for %v", task, addTime) taskMutex.Lock() defer taskMutex.Unlock() if task.listElement == nil { task.listElement = deferredTasks.PushBack(task) } } // EventWithParm tells a Task (and its underlying Endpoint) that // something important has happened, or a hint has been acquired. // Returns False if an event resulted in a deletion of its Task. func (task *Task) EventWithParm(event TaskEvent, parm any) bool { log("task", 4, "task event for \"%v\": %v", task, event) if event == TE_Generic { return true // do not process generic events } endpointOk := task.e.EventWithParm(event, parm) // notify the endpoint first logIf(!endpointOk, "task", 4, "endpoint got deleted during a task event for \"%v\"", task) switch event { // on these events, defer a Task only if its Endpoint is being kept case TE_NoResponse: if endpointOk { task.Defer(getParamDurationMS("no-response-delay-ms")) } case TE_ReadFailed: if endpointOk { task.Defer(getParamDurationMS("read-error-delay-ms")) } case TE_ProtocolError: if endpointOk { task.Defer(getParamDurationMS("protocol-error-delay-ms")) } // report about a bad/good auth result case TE_Good: RegisterResult(task, true) case TE_Bad: RegisterResult(task, false) // wait request has occurred: stop processing and instantly wait on a thread case TH_WaitRequest: log("task", 4, "wait request for \"%v\": sleeping for %v", task, parm.(time.Duration)) time.Sleep(parm.(time.Duration)) } return endpointOk } // Event is a parameterless version of EventWithParm. func (task *Task) Event(event TaskEvent) bool { return task.EventWithParm(event, 0) } // GetDeferredTask retrieves a Task from the deferred queue. func GetDeferredTask() (task *Task, waitTime time.Duration) { currentTime := time.Now() if deferredTasks.Len() == 0 { log("task", 5, "deferred task list is empty") return nil, 0 } minWaitTime := time.Time{} for e := deferredTasks.Front(); e != nil; e = e.Next() { dt := e.Value.(*Task) if minWaitTime.IsZero() || (dt.deferUntil.Before(minWaitTime) && dt.deferUntil.After(currentTime)) { minWaitTime = dt.deferUntil } if dt.deferUntil.Before(currentTime) && (dt.e.delayUntil.IsZero() || dt.e.delayUntil.Before(currentTime)) { deferredTasks.Remove(e) return dt, 0 } } return nil, minWaitTime.Sub(currentTime) } // FetchTaskComponents returns all components needed to build a Task. func FetchTaskComponents() (ep *Endpoint, login string, password string, waitTime time.Duration) { var empty bool log("task", 5, "fetching components for a new task") ep, waitTime = FetchEndpoint() if ep == nil { return nil, "", "", waitTime } log("task", 5, "fetched endpoint: \"%v\"", ep) hasLogin := false for { log("task", 5, "fetching password for \"%v\"", ep) password, empty = SrcPassword.FetchOne(&ep.passwordPos, true) if !empty { log("task", 5, "got password for \"%v\": %v, fetching login", ep, password) if !hasLogin { login, empty = SrcLogin.FetchOne(&ep.loginPos, false) } break } log("task", 5, "out of passwords for \"%v\": resetting passwords and fetching new login", ep) ep.passwordPos.Reset() login, empty = SrcLogin.FetchOne(&ep.loginPos, true) hasLogin = true if empty { break } } if !empty { log("task", 5, "got login for \"%v\": %v", ep, login) return ep, login, password, 0 } else { log("task", 5, "out of logins for \"%v\": exhausting endpoint", ep) ep.Exhausted() return FetchTaskComponents() // attempt to fetch again with a new endpoint } } // CreateTask creates a new Task element. It searches through deferred queue first, // then, if nothing was found, it assembles a new (Endpoint, login, password) combination. func CreateTask(threadIdx int) (task *Task, delay time.Duration) { taskMutex.Lock() defer taskMutex.Unlock() task, delayDeferred := GetDeferredTask() if task != nil { log("task", 4, "new task (deferred): %v", task) return task, 0 } ep, login, password, delayEndpoint := FetchTaskComponents() if ep == nil { if delayDeferred == 0 && delayEndpoint == 0 { log("task", 4, "cannot build task, no endpoint") return nil, 0 } else if delayDeferred > delayEndpoint || delayDeferred == 0 { log("task", 4, "delaying task creation (by endpoint delay) for %v", delayEndpoint) return nil, delayEndpoint } else { log("task", 4, "delaying task creation (by deferred delay) for %v", delayDeferred) return nil, delayDeferred } } t := Task{e: ep, login: login, password: password, thread: threadIdx} log("task", 4, "new task: %v", &t) return &t, 0 }