#package require logger

# shellthread - worker threads that receive log messages or pipe data from
# client threads and write them to a file and/or a syslog (udp) target.
#
#   shellthread::worker   - procs that run inside the worker thread
#   shellthread::manager  - procs called by client threads to create, use and shut down workers

package provide shellthread [namespace eval shellthread {
    variable version
    set version 1.6.1
}]

package require Thread

namespace eval shellthread {

    # Format a 'clock micros' timestamp as YYYY-mm-dd_HH-MM-SS (seconds resolution).
    #  tsmicros - value from 'clock micros', or empty string to use the current time.
    # Raises an error if tsmicros does not have the same number of digits as the current 'clock micros' value.
    proc iso8601 {{tsmicros ""}} {
        if {$tsmicros eq ""} {
            set tsmicros [clock micros]
        } else {
            set microsnow [clock micros]
            if {[string length $tsmicros] != [string length $microsnow]} {
                error "iso8601 requires 'clock micros' or empty string to create timestamp"
            }
        }
        set seconds [expr {$tsmicros / 1000000}]
        return [clock format $seconds -format "%Y-%m-%d_%H-%M-%S"]
    }
}

namespace eval shellthread::worker {
    variable settings
    variable sysloghost_port
    variable sock
    variable logfile ""
    variable fd
    variable client_ids [list]
    variable ts_start_micros
    variable errorlist [list]
    variable inpipe ""

    # Background-error handler installed by init - accumulates the error arguments in errorlist.
    proc bgerror {args} {
        variable errorlist
        lappend errorlist $args
    }

    # Asynchronously send the accumulated errorlist to shellthread::manager::report_worker_errors in thread tidcli.
    proc send_errors_now {tidcli} {
        variable errorlist
        thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
    }

    # Register tidcli as a client of this worker (no duplicates).
    proc add_client_tid {tidcli} {
        variable client_ids
        if {$tidcli ni $client_ids} {
            lappend client_ids $tidcli
        }
    }

    # Initialise the worker.
    #  tidclient    - thread id of the client that created the worker
    #  start_m      - creation timestamp (clock micros) supplied by the client
    #  settingsdict - overrides for the defaults: -raw 0 -file "" -syslog "" -direction out
    #                 -syslog is host:port. Syslog output is disabled if the udp package cannot be loaded.
    proc init {tidclient start_m settingsdict} {
        variable sysloghost_port
        variable logfile
        variable settings
        variable client_ids
        variable ts_start_micros

        interp bgerror {} shellthread::worker::bgerror
        #package require overtype ;#overtype uses tcllib textutil, punk::char etc - currently too heavyweight in terms of loading time for use in threads.

        lappend client_ids $tidclient
        set ts_start_micros $start_m

        set defaults [list -raw 0 -file "" -syslog "" -direction out]
        set settings [dict merge $defaults $settingsdict]

        set syslog [dict get $settings -syslog]
        if {[string length $syslog]} {
            lassign [split $syslog :] s_host s_port
            set sysloghost_port [list $s_host $s_port]
            if {[catch {package require udp} errm]} {
                #disable rather than bomb and interfere with any -file being written
                #review - log/notify?
                set sysloghost_port ""
            }
        } else {
            set sysloghost_port ""
        }
        set logfile [dict get $settings -file]
    }

    # Read from a transferred pipe end and log everything read until eof.
    #  source   - source tag used in the log output
    #  readchan - channel already transferred to this thread (see shellthread::manager::set_pipe_read_from_client)
    #  args     - options: -buffering, -readbuffering, -writebuffering
    # Blocks in vwait (servicing events) until the pipe reaches eof.
    proc start_pipe_read {source readchan args} {
        #assume 1 inpipe for now
        variable inpipe
        variable sysloghost_port
        variable logfile

        #check first - otherwise the chan configure calls below fail with a less helpful message
        if {$readchan ni [chan names]} {
            error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
        }

        #\uFFFF is a sentinel meaning -buffering was not supplied
        set defaults [dict create -buffering \uFFFF ]
        set opts [dict merge $defaults $args]

        if {[dict exists $opts -readbuffering]} {
            set readbuffering [dict get $opts -readbuffering]
        } else {
            if {[dict get $opts -buffering] eq "\uFFFF"} {
                #get buffering setting from the channel as it was set prior to thread::transfer
                set readbuffering [chan configure $readchan -buffering]
            } else {
                set readbuffering [dict get $opts -buffering]
                chan configure $readchan -buffering $readbuffering
            }
        }
        if {[dict exists $opts -writebuffering]} {
            set writebuffering [dict get $opts -writebuffering]
        } else {
            if {[dict get $opts -buffering] eq "\uFFFF"} {
                set writebuffering line
                #set writebuffering [chan configure $writechan -buffering]
            } else {
                set writebuffering [dict get $opts -buffering]
                #chan configure $writechan -buffering $writebuffering
            }
        }

        chan configure $readchan -translation lf

        set inpipe $readchan
        chan configure $readchan -blocking 0
        set waitvar ::shellthread::worker::wait($inpipe,[clock micros])

        #tcl::chan::fifo2 based pipe seems slower to establish events upon than Memchan
        chan event $readchan readable [list ::shellthread::worker::pipe_read $readchan $source $waitvar $readbuffering $writebuffering]
        vwait $waitvar
    }

    # Readable-event handler for start_pipe_read.
    # Logs one line (line buffering) or whatever is available (otherwise);
    # at eof removes the handler, sets the variable named by waitfor and closes the channel.
    proc pipe_read {chan source waitfor readbuffering writebuffering} {
        if {$readbuffering eq "line"} {
            set chunksize [chan gets $chan chunk]
            if {$chunksize >= 0} {
                if {![chan eof $chan]} {
                    #chan gets stripped the newline - restore it
                    ::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
                } else {
                    #final partial line had no newline
                    ::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
                }
            }
        } else {
            set chunk [chan read $chan]
            ::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
        }
        if {[chan eof $chan]} {
            chan event $chan readable {}
            set $waitfor "pipe"
            chan close $chan
        }
    }

    # Copy stdin to a transferred pipe end until stdin reaches eof.
    #  source    - source tag (passed to the event handler; not otherwise used)
    #  writechan - channel already transferred to this thread (see shellthread::manager::set_pipe_write_to_client)
    #  args      - options: -buffering, -readbuffering, -writebuffering
    # Blocks in vwait (servicing events) until eof on the read side.
    proc start_pipe_write {source writechan args} {
        variable outpipe

        #check first - otherwise the chan configure calls below fail with a less helpful message
        if {$writechan ni [chan names]} {
            error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
        }

        #\uFFFF is a sentinel meaning -buffering was not supplied
        set defaults [dict create -buffering \uFFFF ]
        set opts [dict merge $defaults $args]

        #todo!
        set readchan stdin

        if {[dict exists $opts -readbuffering]} {
            set readbuffering [dict get $opts -readbuffering]
        } else {
            if {[dict get $opts -buffering] eq "\uFFFF"} {
                set readbuffering [chan configure $readchan -buffering]
            } else {
                set readbuffering [dict get $opts -buffering]
                chan configure $readchan -buffering $readbuffering
            }
        }
        if {[dict exists $opts -writebuffering]} {
            set writebuffering [dict get $opts -writebuffering]
        } else {
            if {[dict get $opts -buffering] eq "\uFFFF"} {
                #nothing explicitly set - take from transferred channel
                set writebuffering [chan configure $writechan -buffering]
            } else {
                set writebuffering [dict get $opts -buffering]
                chan configure $writechan -buffering $writebuffering
            }
        }

        set outpipe $writechan
        chan configure $readchan -blocking 0
        chan configure $writechan -blocking 0
        set waitvar ::shellthread::worker::wait($outpipe,[clock micros])

        chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
            if {$readbuffering eq "line"} {
                set chunksize [chan gets $chan chunk]
                if {$chunksize >= 0} {
                    if {![chan eof $chan]} {
                        puts $writechan $chunk
                    } else {
                        puts -nonewline $writechan $chunk
                    }
                }
            } else {
                set chunk [chan read $chan]
                puts -nonewline $writechan $chunk
            }
            if {[chan eof $chan]} {
                chan event $chan readable {}
                set $waitfor "pipe"
                chan close $writechan
                if {$chan ne "stdin"} {
                    chan close $chan
                }
            }
        }} $readchan $writechan $source $waitvar $readbuffering]

        vwait $waitvar
    }

    # Open and configure the udp socket if a syslog target is set and no usable socket exists yet.
    proc _initsock {} {
        variable sysloghost_port
        variable sock
        if {[string length $sysloghost_port]} {
            #fconfigure fails if sock is unset or closed
            if {[catch {fconfigure $sock} state]} {
                set sock [udp_open]
                fconfigure $sock -buffering none -translation binary
                fconfigure $sock -remote $sysloghost_port
            }
        }
    }

    # Close any existing socket, re-initialise it, and return its configuration.
    proc _reconnect {} {
        variable sock
        catch {close $sock}
        _initsock
        return [fconfigure $sock]
    }

    # Entry point for messages sent by shellthread::manager::write_log.
    # Calculates the lag between send and receipt and logs the message at level info.
    proc send_info {client_tid ts_sent source msg} {
        set ts_received [clock micros]
        set lag_micros [expr {$ts_received - $ts_sent}]
        set lag [expr {$lag_micros / 1000000.0}] ;#lag as x.xxxxxx seconds

        log $client_tid $ts_sent $lag $source - info $msg line 1
    }

    # Write msg to the syslog socket and/or logfile configured by init.
    #  client_tid     - id of the sending thread (last 9 chars are displayed)
    #  ts_sent        - clock micros at send time, or 0 when not available (pipe data)
    #  lag            - seconds between send and receipt (only used when ts_sent is non-zero)
    #  source         - source tag
    #  service level islog - accepted but not used by this proc
    #  writebuffering - -buffering value applied to the logfile channel
    # Unless the -raw setting is true, each line of msg is prefixed with id, timestamp, lag and source columns.
    # The original trailing line-ending (lf, crlf, cr or none) is preserved.
    proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
        variable sock
        variable fd
        variable sysloghost_port
        variable logfile
        variable settings

        set logchunk $msg

        if {![dict get $settings -raw]} {
            set tail_crlf 0
            set tail_lf 0
            set tail_cr 0
            #for cooked - always remove the trailing newline before splitting..
            #but add it back exactly as it was afterwards
            #we can always split on \n - and any adjacent \r will be preserved in the rejoin
            #
            #note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
            #Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
            set lastchar [string range $logchunk end end]
            if {[string range $logchunk end-1 end] eq "\r\n"} {
                set tail_crlf 1
                set logchunk [string range $logchunk 0 end-2]
            } elseif {$lastchar eq "\n"} {
                set tail_lf 1
                set logchunk [string range $logchunk 0 end-1]
            } elseif {$lastchar eq "\r"} {
                #\r line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console. but we'll pass through anyway.
                set tail_cr 1
                set logchunk [string range $logchunk 0 end-1]
            } else {
                #possibly a single line with no linefeed.. or has linefeeds only in the middle
            }

            if {$ts_sent != 0} {
                #fractional part of the timestamp in seconds
                set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
                set time_info [::shellthread::iso8601 $ts_sent].$micros
                #set time_info "${time_info}+$lag"
                set lagfp "+[format %f $lag]"
            } else {
                #from pipe - no ts_sent/lag info available
                set time_info ""
                set lagfp ""
            }

            set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway

            #column widths (previously done with overtype::left - see note in init regarding load time)
            set w0 9
            set w1 27
            set w2 11
            set w3 22 ;#review - this can truncate source name without indication tail is missing
            #do not columnize the final data column or append to tail - or we could muck up the crlf integrity
            set c0 [format %-${w0}s $idtail]
            set c1 [format %-${w1}s $time_info]
            set c2 [format %-${w2}s $lagfp]
            set c3 [format %-${w3}s $source]
            set c2_blank [string repeat " " $w2]

            #split on \n no matter the actual line-ending in use
            #shouldn't matter as long as we don't add anything at the end of the line other than the raw data
            #ie - don't quote or add spaces
            set outlines [list]
            set is_first 1
            foreach ln [split $logchunk \n] {
                if {$is_first} {
                    lappend outlines "$c0 $c1 $c2 $c3 $ln"
                    set is_first 0
                } else {
                    #lag is only shown on the first line of a multi-line message
                    lappend outlines "$c0 $c1 $c2_blank $c3 $ln"
                }
            }
            if {$tail_lf} {
                set logchunk "[join $outlines \n]\n"
            } elseif {$tail_crlf} {
                set logchunk "[join $outlines \r\n]\r\n"
            } elseif {$tail_cr} {
                set logchunk "[join $outlines \r]\r"
            } else {
                #no trailing linefeed
                set logchunk [join $outlines \n]
            }
        }

        if {[string length $sysloghost_port]} {
            _initsock
            #best effort - a failed syslog write must not interfere with the file write below
            catch {puts -nonewline $sock $logchunk}
        }
        #todo - sockets etc?

        if {[string length $logfile]} {
            #todo - setting to maintain open filehandle and reduce io.
            # possible settings for buffersize - and maybe logrotation, although this could be left to client
            #for now - default to safe option of open/close each write despite the overhead.
            set fd [open $logfile a]
            if {[catch {
                chan configure $fd -translation auto -buffering $writebuffering
                #whether line buffered or not - by now our logchunk includes newlines
                puts -nonewline $fd $logchunk
            } errmsg erropts]} {
                #don't leak the file handle when configure/write fails
                catch {close $fd}
                return -options $erropts $errmsg
            }
            close $fd
        }
    }

    # Withdraw just this client. If it is the only remaining client the worker is terminated.
    proc finish {tidclient} {
        variable client_ids
        if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
            terminate $tidclient
        } else {
            set posn [lsearch -exact $client_ids $tidclient]
            if {$posn >= 0} {
                set client_ids [lreplace $client_ids $posn $posn]
            }
        }
    }

    # Allow any registered client to terminate the worker.
    # Returns this thread's id if tidclient is a registered client, otherwise empty string.
    proc terminate {tidclient} {
        variable sock
        variable fd
        variable client_ids
        if {$tidclient in $client_ids} {
            catch {close $sock}
            catch {close $fd}
            set client_ids [list]
            #review use of thread::release -wait
            #docs indicate deprecated for regular use, and that we should use thread::join
            #however.. how can we set a timeout on a thread::join ?
            #by telling the thread to release itself - we can wait on the thread::send variable
            # This needs review - because it's unclear that -wait even works on self
            # (what does it mean to wait for the target thread to exit if the target is self??)
            thread::release -wait
            return [thread::id]
        } else {
            return ""
        }
    }
}

namespace eval shellthread::manager {
    variable workers [dict create]
    variable worker_errors [list]
    variable timeouts
    variable free_threads [list]
    #variable log_threads

    # Return the value at the key path in dictValue, or the supplied default if the path does not exist.
    #  usage: dict_getdef dictValue ?key ...? key default
    proc dict_getdef {dictValue args} {
        if {[llength $args] < 2} {
            error {wrong # args: should be "dict_getdef dictValue ?key ...? key default"}
        }
        set keys [lrange $args 0 end-1]
        if {[dict exists $dictValue {*}$keys]} {
            return [dict get $dictValue {*}$keys]
        } else {
            return [lindex $args end]
        }
    }

    #new datastructure regarding workers and sourcetags required.
    #one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
    #generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
    #
    #As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
    #If the thread which started the thread calls leave_worker with that 'primary' sourcetag it means others won't be able to use that target - which seems reasonable.
    #If another thread wants to maintain joinability beyond the span provided by the starting client,
    #it can join with both the primary tag and a tag it will actually use for logging.
    #A thread can join the logger with any existingtag - not just the 'primary'
    #(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
    proc join_worker {existingtag sourcetaglist} {
        set client_tid [thread::id]
        #todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
    }

    # Same as new_worker but forces -workertype pipe.
    # Raises an error if settingsdict specifies a different -workertype.
    proc new_pipe_worker {sourcetaglist {settingsdict {}}} {
        if {[dict exists $settingsdict -workertype]} {
            if {[string tolower [dict get $settingsdict -workertype]] ne "pipe"} {
                error "new_pipe_worker error: -workertype ne 'pipe'. Set to 'pipe' or leave empty"
            }
        }
        dict set settingsdict -workertype pipe
        new_worker $sourcetaglist $settingsdict
    }

    #it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
    # This allows multiple threads to more easily write to the same named sourcetag if necessary
    # todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
    #
    # todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
    # Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
    # On the other hand socket targets such as UDP can happily be written to by multiple threads.
    # For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches..
    # but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
    # Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
    # todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
    # probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
    #
    # Returns the worker thread id - or "noop" for a message worker that has neither -syslog nor -file configured.
    # Only the first tag in sourcetaglist is currently used.
    proc new_worker {sourcetaglist {settingsdict {}}} {
        variable workers
        variable free_threads

        set ts_start [clock micros]
        set tidclient [thread::id]
        set sourcetag [lindex $sourcetaglist 0] ;#todo - use all

        set defaults [dict create\
            -workertype message\
        ]
        set settingsdict [dict merge $defaults $settingsdict]

        set workertype [string tolower [dict get $settingsdict -workertype]]
        set known_workertypes [list pipe message]
        if {$workertype ni $known_workertypes} {
            error "new_worker - unknown -workertype $workertype. Expected one of '$known_workertypes'"
        }

        if {[dict exists $workers $sourcetag]} {
            set winfo [dict get $workers $sourcetag]
            if {[dict get $winfo tid] ne "noop" && [thread::exists [dict get $winfo tid]]} {
                #add our client-info to existing worker thread
                dict lappend winfo list_client_tids $tidclient
                dict set workers $sourcetag $winfo ;#writeback
                return [dict get $winfo tid]
            }
        }

        #noop fake worker for empty syslog and empty file
        if {$workertype eq "message"} {
            if {[dict_getdef $settingsdict -syslog ""] eq "" && [dict_getdef $settingsdict -file ""] eq ""} {
                set winfo [dict create tid noop list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype "message"]
                dict set workers $sourcetag $winfo
                return noop
            }
        }

        #check if there is an existing unsubscribed thread first
        #don't use free_threads for pipe workertype for now..
        if {$workertype ne "pipe"} {
            if {[llength $free_threads]} {
                #todo - re-use from tail - as most likely to have been doing similar work?? review
                set free_threads [lassign $free_threads tidworker]
                #todo - keep track of real ts_start of free threads... kill when too old
                set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype [dict get $settingsdict -workertype]]
                #puts stderr "shellfilter::new_worker Re-using free worker thread: $tidworker with tag $sourcetag"
                dict set workers $sourcetag $winfo
                return $tidworker
            }
        }

        #set ts_start [::shellthread::iso8601]
        set tidworker [thread::create -preserved]
        set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
            #set tclbase [file dirname [file dirname [info nameofexecutable]]]
            #set tcllib $tclbase/lib
            #if {$tcllib ni $::auto_path} {
            #    lappend ::auto_path $tcllib
            #}

            set ::settingsinfo [dict create %sd%]
            #if the executable running things is something like a tclkit,
            # then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
            #The caller can tune the thread's package search by providing a settingsdict
            if {![dict exists $::settingsinfo tcl_tm_list]} {
                ::tcl::tm::add %mp%
            } else {
                tcl::tm::remove {*}[tcl::tm::list]
                ::tcl::tm::add {*}[dict get $::settingsinfo tcl_tm_list]
            }
            if {![dict exists $::settingsinfo auto_path]} {
                set ::auto_path [list %ap%]
            } else {
                set ::auto_path [dict get $::settingsinfo auto_path]
            }

            package require Thread
            package require shellthread
            if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
                unset ::settingsinfo
                set ::shellthread_init "ok"
            } else {
                unset ::settingsinfo
                set ::shellthread_init "err $errmsg"
            }
        }]

        thread::send -async $tidworker $init_script
        #thread::send $tidworker $init_script
        #record workertype here too - consistent with the noop and re-used worker records above
        set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list] workertype $workertype]
        dict set workers $sourcetag $winfo
        return $tidworker
    }

    # Transfer the read end of a pipe to the worker registered for tag_pipename and start it reading.
    # Raises an error if the tag is unknown or worker_tid doesn't match the registered worker.
    proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
        variable workers
        if {![dict exists $workers $tag_pipename]} {
            error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
        }
        set match_worker_tid [dict get $workers $tag_pipename tid]
        if {$worker_tid ne $match_worker_tid} {
            error "workerthread::manager::set_pipe_read_from_client source/pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
        }
        #buffering set during channel creation will be preserved on thread::transfer
        thread::transfer $worker_tid $rchan
        #start_pipe_read will vwait - so we have to send async
        thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
        #client may start writing immediately - but presumably it will buffer in fifo2
    }

    # Transfer the write end of a pipe to the worker registered for tag_pipename and start it writing.
    # Raises an error if the tag is unknown or worker_tid doesn't match the registered worker.
    proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
        variable workers
        if {![dict exists $workers $tag_pipename]} {
            error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
        }
        set match_worker_tid [dict get $workers $tag_pipename tid]
        if {$worker_tid ne $match_worker_tid} {
            error "workerthread::manager::set_pipe_write_to_client pipename $tag_pipename workert_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
        }
        #buffering set during channel creation will be preserved on thread::transfer
        thread::transfer $worker_tid $wchan
        #start_pipe_write will vwait - so we have to send async
        thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
    }

    # Send msg to the worker registered for source.
    #  args - options: -async (default 1), -level (default info - currently unused)
    # Does nothing for noop workers. Raises an error if no worker has been opened for source.
    proc write_log {source msg args} {
        variable workers
        set ts_micros_sent [clock micros]
        set defaults [list -async 1 -level info]
        set opts [dict merge $defaults $args]

        if {![dict exists $workers $source]} {
            #auto create with no requirement to call new_worker.. warn?
            # -syslog -file ?
            error "write_log no log opened for source: $source"
        }
        set tidworker [dict get $workers $source tid]
        if {$tidworker eq "noop"} {
            return
        }
        if {![thread::exists $tidworker]} {
            # -syslog -file ?
            #new_worker takes a list of tags - wrap so that a tag containing spaces stays intact
            set tidworker [new_worker [list $source]]
            if {$tidworker eq "noop"} {
                #re-created without settings - nothing to write to
                return
            }
        }

        set client_tid [thread::id]
        set cmd [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
        if {[dict get $opts -async]} {
            thread::send -async $tidworker $cmd
        } else {
            thread::send $tidworker $cmd
        }
    }

    # Called (via thread::send) by a worker to report its accumulated errors.
    #  errdict - dict with keys worker_tid and errors
    # The errors are stored in the workers record of the first source serviced by that worker.
    proc report_worker_errors {errdict} {
        variable workers
        set reporting_tid [dict get $errdict worker_tid]
        dict for {src srcinfo} $workers {
            if {[dict get $srcinfo tid] eq $reporting_tid} {
                dict set srcinfo errors [dict get $errdict errors]
                dict set workers $src $srcinfo ;#writeback updated
                break
            }
        }
    }

    #aka leave_worker
    #Note that the tags may be on separate workertids, or some tags may share workertids
    # Removes the calling thread from each tag's client list.
    # Workers left with no subscribers at all (and not shutting down) are added to free_threads for re-use.
    proc unsubscribe {sourcetaglist} {
        variable workers
        #workers structure: dict of sourcetag -> dict with keys tid list_client_tids ts_start ts_end_list workertype ?errors?
        variable free_threads
        set mytid [thread::id] ;#caller of shellthread::manager::xxx is the client thread

        set subscriberless_tags [list]
        foreach source $sourcetaglist {
            if {[dict exists $workers $source]} {
                set list_client_tids [dict get $workers $source list_client_tids]
                if {[set posn [lsearch $list_client_tids $mytid]] >= 0} {
                    set list_client_tids [lreplace $list_client_tids $posn $posn]
                    dict set workers $source list_client_tids $list_client_tids
                }
                if {![llength $list_client_tids]} {
                    lappend subscriberless_tags $source
                }
            }
        }

        #we've removed our own tid from all the tags - possibly across multiple workertids, and possibly leaving some workertids with no subscribers for a particular tag - or no subscribers at all.
        set subscriberless_workers [list]
        set shuttingdown_workers [list]
        foreach deadtag $subscriberless_tags {
            set workertid [dict get $workers $deadtag tid]
            set worker_tags [get_worker_tagstate $workertid]
            set subscriber_count 0
            set kill_count 0 ;#number of ts_end_list entries - even one indicates thread is doomed
            foreach taginfo $worker_tags {
                incr subscriber_count [llength [dict get $taginfo list_client_tids]]
                incr kill_count [llength [dict get $taginfo ts_end_list]]
            }
            if {$subscriber_count == 0} {
                lappend subscriberless_workers $workertid
            }
            if {$kill_count > 0} {
                lappend shuttingdown_workers $workertid
            }
        }

        #if worker isn't shutting down - add it to free_threads list
        foreach workertid $subscriberless_workers {
            if {$workertid ni $shuttingdown_workers} {
                if {$workertid ni $free_threads && $workertid ne "noop"} {
                    lappend free_threads $workertid
                }
            }
        }

        #todo
        #unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
        #if no more sourcetags - add worker to free_threads
    }

    # Return the list of workers records (one per sourcetag) serviced by workertid.
    proc get_worker_tagstate {workertid} {
        variable workers
        set taginfo_list [list]
        dict for {source sourceinfo} $workers {
            if {[dict get $sourceinfo tid] eq $workertid} {
                lappend taginfo_list $sourceinfo
            }
        }
        return $taginfo_list
    }

    #finalisation
    # Ask every thread in free_threads to terminate, waiting up to timeout milliseconds in total for the replies.
    # Returns empty string if there are no free threads, otherwise a dict with keys:
    #  existed  - thread ids that were asked to terminate
    #  ended    - replies received
    #  timedout - 1 if the timeout expired before all replies arrived
    proc shutdown_free_threads {{timeout 2500}} {
        variable free_threads
        if {![llength $free_threads]} {
            return
        }

        upvar ::shellthread::manager::timeouts timeoutarr
        if {[info exists timeoutarr(shutdown_free_threads)]} {
            #already called
            return false
        }

        set ::shellthread::waitfor waiting
        #the timer must *write* the variable for vwait to notice it
        set timer_id [after $timeout [list set ::shellthread::waitfor timed-out]]

        set waiting_for [list]
        set ended [list]
        set timedout 0
        foreach tid $free_threads {
            if {[thread::exists $tid]} {
                lappend waiting_for $tid
                thread::send -async $tid [list shellthread::worker::terminate [thread::id]] ::shellthread::waitfor
            }
        }
        if {[llength $waiting_for]} {
            for {set i 0} {$i < [llength $waiting_for]} {incr i} {
                vwait ::shellthread::waitfor
                if {$::shellthread::waitfor eq "timed-out"} {
                    set timedout 1
                    break
                } else {
                    lappend ended $::shellthread::waitfor
                }
            }
        }
        after cancel $timer_id
        set free_threads [list]
        return [dict create existed $waiting_for ended $ended timedout $timedout]
    }

    #TODO - important.
    #REVIEW!
    #since moving to the unsubscribe mechanism - close_worker $source isn't being called
    # - we need to set a limit to the number of free threads and shut down excess when detected during unsubscription

    #instruction to shut-down the thread that has this source.
    #  timeout - milliseconds to wait for the worker to acknowledge termination
    proc close_worker {source {timeout 2500}} {
        variable workers
        variable worker_errors
        variable free_threads
        variable timeouts
        set ts_now [clock micros]
        #puts stderr "close_worker $source"
        if {[dict exists $workers $source]} {
            set tidworker [dict get $workers $source tid]
            if {$tidworker in $free_threads} {
                #make sure a thread that is being closed is removed from the free_threads list
                set posn [lsearch -exact $free_threads $tidworker]
                set free_threads [lreplace $free_threads $posn $posn]
            }
            set mytid [thread::id]
            set client_tids [dict get $workers $source list_client_tids]
            if {[set posn [lsearch $client_tids $mytid]] >= 0} {
                set client_tids [lreplace $client_tids $posn $posn]
                #remove self from list of clients
                dict set workers $source list_client_tids $client_tids
            }
            set ts_end_list [dict get $workers $source ts_end_list] ;#ts_end_list is just a list of timestamps of closing calls for this source - only one is needed to close, but they may all come in a flurry.
            #NOTE(review): nothing records a timestamp when ts_end_list is empty, so the first close call is never registered - confirm whether that is intended
            if {[llength $ts_end_list]} {
                set last_end_ts [lindex $ts_end_list end]
                if {(($ts_now - $last_end_ts) / 1000) >= $timeout} {
                    lappend ts_end_list $ts_now
                    dict set workers $source ts_end_list $ts_end_list
                } else {
                    #existing close in progress.. assume it will work
                    return
                }
            }

            #noop is not a real thread id - don't hand it to thread::exists
            if {$tidworker ne "noop" && [thread::exists $tidworker]} {
                #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"

                #after, vwait and the thread::send result variable all resolve names at global scope,
                #so use one fully qualified variable for all of them
                set waitvar ::shellthread::manager::timeouts($source)
                set $waitvar 0
                set timer_id [after $timeout [list set $waitvar 2]]

                thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
                thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] $waitvar

                vwait $waitvar
                #a stale timer could otherwise cut short a later close_worker call for the same source
                after cancel $timer_id
                #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"

                thread::release $tidworker
                #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
                if {[dict exists $workers $source errors]} {
                    set errlist [dict get $workers $source errors]
                    if {[llength $errlist]} {
                        lappend worker_errors [list $source [dict get $workers $source]]
                    }
                }
                dict unset workers $source
            } else {
                #thread may have been closed by call to close_worker with another source with same worker
                #clear workers record for this source
                #REVIEW - race condition for re-creation of source with new workerid?
                #check that record is subscriberless to avoid this
                if {[llength [dict get $workers $source list_client_tids]] == 0} {
                    dict unset workers $source
                }
            }
        }
        #puts stdout "close_worker $source - end"
    }

    #worker errors only available for a source after close_worker called on that source
    #It is possible for there to be multiple entries for a source because new_worker can be called multiple times with same sourcetag,
    # Returns the list of {source workerinfo} entries recorded for source and removes them from worker_errors.
    proc get_and_clear_errors {source} {
        variable worker_errors
        #-exact: source tags are plain strings, not glob patterns
        set source_errors [lsearch -all -inline -exact -index 0 $worker_errors $source]
        set worker_errors [lsearch -all -inline -exact -index 0 -not $worker_errors $source]
        return $source_errors
    }
}