package main // thread.go: handling of worker threads import ( "sync" "time" ) // maxSafeThreads is a safeguard to prevent creation of too many threads at once. const maxSafeThreads = 5000 // ThreadService creates, starts up and waits for all threads. func ThreadService() { numThreads := getParamInt("threads") failIf(numThreads > maxSafeThreads, "too many threads (max %v)", maxSafeThreads) log("thread", 0, "initializing %v threads", numThreads) c := make(chan bool) var wg sync.WaitGroup for i := 1; i <= numThreads; i++ { wg.Add(1) go threadEntryPoint(c, i, &wg) } threadDelay := getParamDurationMS("thread-delay-ms") log("thread", 0, "starting %v threads", numThreads) for i := 1; i <= numThreads; i++ { c <- true if threadDelay > 0 { time.Sleep(threadDelay) } } log("thread", 1, "waiting for threads") wg.Wait() log("thread", 1, "finished waiting for threads") } // threadEntryPoint is the main entrypoint for a work thread. func threadEntryPoint(c chan bool, threadIdx int, wg *sync.WaitGroup) { <-c log("thread", 3, "starting loop for thread %v", threadIdx) for threadWork() { } log("thread", 3, "exiting thread %v", threadIdx) wg.Done() } // threadWork processes a single work item for a thread. func threadWork() bool { task, delay := CreateTask() if task == nil { if delay > 0 { log("thread", 3, "no active endpoints available, sleeping for %v", delay) time.Sleep(delay) return true } else { log("thread", 3, "no endpoints available (active and deferred), stopping thread loop") return false } } conn, err := NewConnection(task.e) if err != nil { task.EventWithParm(TE_NoResponse, err) return true } task.Event(TN_Connected) log("thread", 2, "trying %v", task) res, err := TryLogin(task, conn) if err != nil { task.Event(TE_ProtocolError) } else { if res && err == nil { task.Event(TE_Good) } else { task.Event(TE_Bad) } } return true }