You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

595 lines
26 KiB

#package require logger
# Package bootstrap: the namespace eval returns the version value,
# which becomes the version argument to 'package provide'.
package provide shellthread [namespace eval shellthread {
variable version
set version 1.6
}]
# Thread extension is required for worker-thread creation/messaging.
package require Thread
namespace eval shellthread {
    # Return a filesystem-safe ISO8601-style timestamp (second resolution)
    # for a 'clock micros' value, or for the current time when called with
    # no argument. Raises an error when the supplied value does not have
    # the same digit count as a current 'clock micros' result (crude
    # sanity check that the caller really passed microseconds).
    proc iso8601 {{tsmicros ""}} {
        set now [clock micros]
        if {$tsmicros eq ""} {
            set tsmicros $now
        } elseif {[string length $tsmicros] != [string length $now]} {
            error "iso8601 requires 'clock micros' or empty string to create timestamp"
        }
        return [clock format [expr {$tsmicros / 1000000}] -format "%Y-%m-%d_%H-%M-%S"]
    }
}
namespace eval shellthread::worker {
# Per-worker state (each worker runs in its own thread/interp).
# settings: merged option dict from init (-raw -file -syslog -direction)
variable settings
# sysloghost_port: {host port} for UDP syslog target, or "" when disabled
variable sysloghost_port
# sock: UDP socket to the syslog target (lazily opened in _initsock)
variable sock
# logfile: path of the logfile, or "" when not logging to a file
variable logfile ""
# fd: transient filehandle for logfile (opened/closed per write in log)
variable fd
# client_ids: thread ids of client threads currently using this worker
variable client_ids [list]
# ts_start_micros: 'clock micros' at init time (supplied by the manager)
variable ts_start_micros
# errorlist: background errors captured by bgerror
variable errorlist [list]
# inpipe: channel currently read by start_pipe_read, or ""
variable inpipe ""
proc bgerror {args} {
    # Background-error hook (installed via 'interp bgerror' in init):
    # accumulate every error report so clients can retrieve them later
    # through send_errors_now.
    variable errorlist
    set errorlist [linsert $errorlist end $args]
}
proc send_errors_now {tidcli} {
# Push this worker's accumulated background errors (asynchronously) to
# client thread tidcli, which records them via
# shellthread::manager::report_worker_errors keyed by this worker's tid.
variable errorlist
thread::send -async $tidcli [list shellthread::manager::report_worker_errors [list worker_tid [thread::id] errors $errorlist]]
}
proc add_client_tid {tidcli} {
    # Register a client thread id with this worker, ignoring duplicates.
    variable client_ids
    if {[lsearch -exact $client_ids $tidcli] < 0} {
        lappend client_ids $tidcli
    }
}
proc init {tidclient start_m settingsdict} {
# Initialise this worker thread's logging state.
#   tidclient    - thread id of the first client
#   start_m      - 'clock micros' taken by the manager at creation time
#   settingsdict - options: -raw 0|1, -file path, -syslog host:port,
#                  -direction (stored but not read elsewhere in this file)
# If the udp package is unavailable, syslog output is silently disabled
# so that -file logging still works.
variable sysloghost_port
variable logfile
variable settings
# route background errors in this thread's interp to our collector
interp bgerror {} shellthread::worker::bgerror
package require overtype
variable client_ids
variable ts_start_micros
lappend client_ids $tidclient
set ts_start_micros $start_m
set defaults [list -raw 0 -file "" -syslog "" -direction out]
set settings [dict merge $defaults $settingsdict]
set syslog [dict get $settings -syslog]
if {[string length $syslog]} {
# -syslog value is host:port
lassign [split $syslog :] s_host s_port
set sysloghost_port [list $s_host $s_port]
} else {
set sysloghost_port ""
}
if {[catch {package require udp} errm]} {
#disable rather than bomb and interfere with any -file being written
set sysloghost_port ""
}
set logfile [dict get $settings -file]
}
proc start_pipe_read {source readchan args} {
# Take ownership of readchan (already thread::transfer'ed to this
# thread) and log everything read from it until EOF. Blocks in vwait
# until the pipe closes, so the manager must thread::send -async this.
#   source   - tag used in the log output
#   readchan - channel to read; its pre-transfer buffering is the
#              default read buffering unless overridden via args
#   args     - option dict: -buffering, -readbuffering, -writebuffering
#assume 1 inpipe for now
variable inpipe
variable sysloghost_port
variable logfile
# \uFFFF acts as a "not supplied" sentinel for -buffering
set defaults [dict create -buffering \uFFFF ]
set opts [dict merge $defaults $args]
if {[dict exists $opts -readbuffering]} {
set readbuffering [dict get $opts -readbuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
#get buffering setting from the channel as it was set prior to thread::transfer
set readbuffering [chan configure $readchan -buffering]
} else {
set readbuffering [dict get $opts -buffering]
chan configure $readchan -buffering $readbuffering
}
}
# writebuffering is forwarded to the log proc (controls logfile fd)
if {[dict exists $opts -writebuffering]} {
set writebuffering [dict get $opts -writebuffering]
} else {
if {[dict get $opts -buffering] eq "\uFFFF"} {
set writebuffering line
#set writebuffering [chan configure $writechan -buffering]
} else {
set writebuffering [dict get $opts -buffering]
#can configure $writechan -buffering $writebuffering
}
}
chan configure $readchan -translation lf
if {$readchan ni [chan names]} {
error "shellthread::worker::start_pipe_read - inpipe not configured. Use shellthread::manager::set_pipe_read_from_client to thread::transfer the pipe end"
}
set inpipe $readchan
#::shellthread::worker::log $inpipe 0 - $source - info "START PIPE READ HELLO\n" line
chan configure $readchan -blocking 0
#::shellthread::worker::log $inpipe 0 - $source - info "START PIPE READ HELLO2 readbuffering: $readbuffering syslog $sysloghost_port filename $logfile" line
# unique wait variable per pipe+time; set by the readable handler at EOF
set waitvar ::shellthread::worker::wait($inpipe,[clock micros])
chan event $readchan readable [list apply {{chan source waitfor readbuffering writebuffering} {
if {$readbuffering eq "line"} {
set chunksize [chan gets $chan chunk]
if {$chunksize >= 0} {
if {![chan eof $chan]} {
::shellthread::worker::log pipe 0 - $source - info $chunk\n $writebuffering
} else {
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
}
} else {
set chunk [chan read $chan]
::shellthread::worker::log pipe 0 - $source - info $chunk $writebuffering
}
if {[chan eof $chan]} {
chan event $chan readable {}
set $waitfor "pipe"
chan close $chan
}
}} $readchan $source $waitvar $readbuffering $writebuffering]
#::shellthread::worker::log $inpipe 0 - $source - info "START PIPE READ HELLO3 vwaiting on $waitvar\n" line
vwait $waitvar
}
proc start_pipe_write {source writechan args} {
    # Bridge this worker's stdin to writechan (e.g. one end of a fifo2
    # whose other end is held by a client thread). Blocks in vwait until
    # EOF on stdin, so the manager must thread::send -async this call.
    #   source    - tag, used only for error reporting here
    #   writechan - channel (already thread::transfer'ed to this thread)
    #   args      - option dict: -buffering, -readbuffering, -writebuffering
    variable outpipe
    # \uFFFF acts as a "not supplied" sentinel for -buffering
    set defaults [dict create -buffering \uFFFF ]
    set opts [dict merge $defaults $args]
    #todo!
    set readchan stdin
    if {[dict exists $opts -readbuffering]} {
        set readbuffering [dict get $opts -readbuffering]
    } else {
        if {[dict get $opts -buffering] eq "\uFFFF"} {
            set readbuffering [chan configure $readchan -buffering]
        } else {
            set readbuffering [dict get $opts -buffering]
            chan configure $readchan -buffering $readbuffering
        }
    }
    if {[dict exists $opts -writebuffering]} {
        set writebuffering [dict get $opts -writebuffering]
    } else {
        if {[dict get $opts -buffering] eq "\uFFFF"} {
            #nothing explicitly set - take from transferred channel
            set writebuffering [chan configure $writechan -buffering]
        } else {
            set writebuffering [dict get $opts -buffering]
            # fix: was 'can configure' (typo for 'chan configure') -
            # the requested buffering was never applied to writechan
            chan configure $writechan -buffering $writebuffering
        }
    }
    if {$writechan ni [chan names]} {
        error "shellthread::worker::start_pipe_write - outpipe not configured. Use shellthread::manager::set_pipe_write_to_client to thread::transfer the pipe end"
    }
    set outpipe $writechan
    chan configure $readchan -blocking 0
    chan configure $writechan -blocking 0
    # unique wait variable per pipe+time; set by the readable handler at EOF
    set waitvar ::shellthread::worker::wait($outpipe,[clock micros])
    chan event $readchan readable [list apply {{chan writechan source waitfor readbuffering} {
        if {$readbuffering eq "line"} {
            set chunksize [chan gets $chan chunk]
            if {$chunksize >= 0} {
                if {![chan eof $chan]} {
                    puts $writechan $chunk
                } else {
                    #final fragment had no trailing newline - don't add one
                    puts -nonewline $writechan $chunk
                }
            }
        } else {
            set chunk [chan read $chan]
            puts -nonewline $writechan $chunk
        }
        if {[chan eof $chan]} {
            chan event $chan readable {}
            set $waitfor "pipe"
            chan close $writechan
            if {$chan ne "stdin"} {
                chan close $chan
            }
        }
    }} $readchan $writechan $source $waitvar $readbuffering]
    vwait $waitvar
}
proc _initsock {} {
# Lazily open/configure the UDP socket to the syslog target.
# If 'fconfigure $sock' fails, sock is unset or closed, so (re)open it.
# No-op when syslog is disabled (sysloghost_port is "").
variable sysloghost_port
variable sock
if {[string length $sysloghost_port]} {
if {[catch {fconfigure $sock} state]} {
set sock [udp_open]
fconfigure $sock -buffering none -translation binary
fconfigure $sock -remote $sysloghost_port
}
}
}
proc _reconnect {} {
    # Drop any existing syslog socket and open a fresh one via _initsock.
    # Returns the new socket's fconfigure settings.
    variable sock
    if {[catch {close $sock}]} {
        # no socket yet, or already closed - nothing to tear down
    }
    _initsock
    return [fconfigure $sock]
}
proc send_info {client_tid ts_sent source msg} {
    # Entry point used by shellthread::manager::write_log.
    # Computes receive lag in fractional seconds and forwards to log
    # (level info, line buffering, islog flag set).
    set received [clock micros]
    set lag_seconds [expr {($received - $ts_sent) / 1000000.0}]
    log $client_tid $ts_sent $lag_seconds $source - info $msg line 1
}
proc log {client_tid ts_sent lag source service level msg writebuffering {islog 0}} {
# Core log writer. Unless -raw was set at init, 'cooks' msg into aligned
# columns (client-id tail | iso8601 time | +lag | source | text) while
# preserving the original trailing line-ending byte-for-byte, then writes
# the result to the UDP syslog socket and/or the -file logfile.
#   client_tid     - id shown in column 0 (or e.g. "pipe" for pipe input)
#   ts_sent        - 'clock micros' at send time, or 0 when unknown (pipe)
#   lag            - receive lag in seconds (ignored when ts_sent == 0)
#   service, level - accepted but not used in this body
#   msg            - payload; may be multi-line
#   writebuffering - buffering mode applied to the logfile fd
#   islog          - flag set by send_info; not used in this body
variable sock
variable fd
variable sysloghost_port
variable logfile
variable settings
set logchunk $msg
if {![dict get $settings -raw]} {
set tail_crlf 0
set tail_lf 0
set tail_cr 0
#for cooked - always remove the trailing newline before splitting..
#
#note that if we got our data from reading a non-line-buffered binary channel - then this naive line splitting will not split neatly for mixed line-endings.
#
#Possibly not critical as cooked is for logging and we are still preserving all \r and \n chars - but review and consider implementing a better split
#but add it back exactly as it was afterwards
#we can always split on \n - and any adjacent \r will be preserved in the rejoin
set lastchar [string range $logchunk end end]
if {[string range $logchunk end-1 end] eq "\r\n"} {
set tail_crlf 1
set logchunk [string range $logchunk 0 end-2]
} else {
if {$lastchar eq "\n"} {
set tail_lf 1
set logchunk [string range $logchunk 0 end-1]
} elseif {$lastchar eq "\r"} {
#\r line-endings are obsolete..and unlikely... and ugly as they can hide characters on the console. but we'll pass through anyway.
set tail_cr 1
set logchunk [string range $logchunk 0 end-1]
} else {
#possibly a single line with no linefeed.. or has linefeeds only in the middle
}
}
if {$ts_sent != 0} {
# recover the fractional part (micros) for display alongside iso8601
set micros [lindex [split [expr {$ts_sent / 1000000.0}] .] end]
set time_info [::shellthread::iso8601 $ts_sent].$micros
#set time_info "${time_info}+$lag"
set lagfp "+[format %f $lag]"
} else {
#from pipe - no ts_sent/lag info available
set time_info ""
set lagfp ""
}
set idtail [string range $client_tid end-8 end] ;#enough for display purposes id - mostly zeros anyway
# fixed column widths for the cooked layout (padded via overtype::left)
set col0 [string repeat " " 9]
set col1 [string repeat " " 27]
set col2 [string repeat " " 11]
set col3 [string repeat " " 20]
#do not columnize the final data column or append to tail - or we could muck up the crlf integrity
lassign [list [overtype::left $col0 $idtail] [overtype::left $col1 $time_info] [overtype::left $col2 $lagfp] [overtype::left $col3 $source]] c0 c1 c2 c3
#split on \n no matter the actual line-ending in use
#shouldn't matter as long as we don't add anything at the end of the line other than the raw data
#ie - don't quote or add spaces
set lines [split $logchunk \n]
set i 1
set outlines [list]
# only the first line of a multi-line message carries the lag column
foreach ln $lines {
if {$i == 1} {
lappend outlines "$c0 $c1 $c2 $c3 $ln"
} else {
lappend outlines "$c0 $c1 $col2 $c3 $ln"
}
incr i
}
# re-attach exactly the trailing line-ending that was stripped above
if {$tail_lf} {
set logchunk "[join $outlines \n]\n"
} elseif {$tail_crlf} {
set logchunk "[join $outlines \r\n]\r\n"
} elseif {$tail_cr} {
set logchunk "[join $outlines \r]\r"
} else {
#no trailing linefeed
set logchunk [join $outlines \n]
}
#set logchunk "[overtype::left $col0 $idtail] [overtype::left $col1 $time_info] [overtype::left $col2 "+$lagfp"] [overtype::left $col3 $source] $msg"
}
if {[string length $sysloghost_port]} {
_initsock
catch {puts -nonewline $sock $logchunk}
}
#todo - sockets etc?
if {[string length $logfile]} {
#todo - setting to maintain open filehandle and reduce io.
# possible settings for buffersize - and maybe logrotation, although this could be left to client
#for now - default to safe option of open/close each write despite the overhead.
set fd [open $logfile a]
chan configure $fd -translation auto -buffering $writebuffering
#whether line buffered or not - by now our logchunk includes newlines
puts -nonewline $fd $logchunk
close $fd
}
}
# - withdraw just this client
proc finish {tidclient} {
    # Withdraw a single client from this worker. If tidclient is the only
    # registered client, terminate the worker entirely; otherwise just
    # remove it from the client list (no-op when it was never registered).
    # fix: body previously referenced undefined $clientids (missing
    # underscore, three places), so this proc always errored at runtime.
    variable client_ids
    if {($tidclient in $client_ids) && ([llength $client_ids] == 1)} {
        terminate $tidclient
    } else {
        set posn [lsearch $client_ids $tidclient]
        if {$posn >= 0} {
            set client_ids [lreplace $client_ids $posn $posn]
        }
    }
}
#allow any client to terminate
proc terminate {tidclient} {
    # Shut down this worker on behalf of tidclient: close the syslog
    # socket (if open) and drop all registered clients.
    # Returns 1 when shutdown happened, 0 when tidclient was not a
    # registered client (request ignored).
    variable sock
    variable client_ids
    if {$tidclient ni $client_ids} {
        return 0
    }
    catch {close $sock}
    set client_ids [list]
    return 1
}
}
namespace eval shellthread::manager {
# workers: registry mapping sourcetag -> dict with keys
#   tid, list_client_tids, ts_start, ts_end_list (and errors after close)
variable workers [dict create]
# worker_errors: [list source workerinfo] entries captured by close_worker
variable worker_errors [list]
# log_threads: declared but not used anywhere in this file
variable log_threads
#new datastructure regarding workers and sourcetags required.
#one worker can service multiple sourcetags - but each sourcetag may be used by multiple threads too.
#generally each thread will use a specific sourcetag - but we may have pools doing similar things which log to same destination.
#
#As a convention we may use a sourcetag for the thread which started the worker that isn't actually used for logging - but as a common target for joins
#If the thread which started the thread calls leave_worker with that 'primary' sourcetag it means others won't be able to use that target - which seems reasonable.
#If another thread wants to maintain joinability beyond the span provided by the starting client,
#it can join with both the primary tag and a tag it will actually use for logging.
#A thread can join the logger with any existingtag - not just the 'primary'
#(which is arbitrary anyway. It will usually be the first in the list - but may be unsubscribed by clients and disappear)
proc join_worker {client_tid existingtag sourcetaglist} {
# Intended to let client_tid attach additional sourcetags to the worker
# already servicing existingtag.
#todo - allow a source to piggyback on existing worker by referencing one of the sourcetags already using the worker
# NOTE(review): stub - currently does nothing and returns "".
}
proc leave_worker {client_tid sourcetaglist} {
# Intended to unsubscribe client_tid from each tag in sourcetaglist.
#todo
#unsub this client_tid from the sourcetags in the sourcetaglist. if no more client_tids exist for sourcetag, remove sourcetag,
#if no more sourcetags - close worker
# NOTE(review): stub - currently does nothing and returns "".
}
#it is up to caller to use a unique sourcetag (e.g by prefixing with own thread::id etc)
# This allows multiple threads to more easily write to the same named sourcetag if necessary
# todo - change sourcetag for a list of tags which will be handled by the same thread. e.g for multiple threads logging to same file
#
# todo - some protection mechanism for case where target is a file to stop creation of multiple worker threads writing to same file.
# Even if we use open fd,close fd wrapped around writes.. it is probably undesirable to have multiple threads with same target
# On the other hand socket targets such as UDP can happily be written to by multiple threads.
# For now the mechanism is that a call to new_worker (rename to open_worker?) will join the same thread if a sourcetag matches..
# but, as sourcetags can get removed(unsubbed via leave_worker) this doesn't guarantee two threads with same -file settings won't fight.
# Also.. the settingsdict is ignored when joining with a tag that exists.. this is problematic.. e.g logrotation where previous file still being written by existing worker
# todo - rename 'sourcetag' concept to 'targettag' ?? the concept is a mixture of both.. it is somewhat analogous to a syslog 'facility'
# probably new_worker should disallow auto-joining and we allow different workers to handle same tags simultaneously to support overlap during logrotation etc.
proc new_worker {sourcetaglist {settingsdict {}}} {
# Create (or join) a worker thread for the first tag in sourcetaglist.
# If a live worker already services that tag, register this thread as an
# additional client and return the existing worker's tid (settingsdict is
# ignored in that case - see the caveats in the comments above).
# Otherwise spawn a new preserved thread, initialise it asynchronously
# with this interp's tcl::tm::list/auto_path (overridable via
# settingsdict keys tcl_tm_list and auto_path), record it in the
# registry, and return its tid.
#   sourcetaglist - list of tags; currently only the first element is used
#   settingsdict  - forwarded to shellthread::worker::init
variable workers
set ts_start [clock micros]
set tidclient [thread::id]
set sourcetag [lindex $sourcetaglist 0] ;#todo - use all
if {[dict exists $workers $sourcetag]} {
set winfo [dict get $workers $sourcetag]
if {[thread::exists [dict get $winfo tid]]} {
#add our client-info to existing worker thread
dict lappend winfo list_client_tids $tidclient
dict set workers $sourcetag $winfo ;#writeback
return [dict get $winfo tid]
}
}
#set ts_start [::shellthread::iso8601]
set tidworker [thread::create -preserved]
# init script is templated via string map; %sd% etc are substituted
# before the script is sent to the new thread
set init_script [string map [list %ts_start% $ts_start %mp% [tcl::tm::list] %ap% $::auto_path %tidcli% $tidclient %sd% $settingsdict] {
#set tclbase [file dirname [file dirname [info nameofexecutable]]]
#set tcllib $tclbase/lib
#if {$tcllib ni $::auto_path} {
# lappend ::auto_path $tcllib
#}
set ::settingsinfo [dict create %sd%]
#if the executable running things is something like a tclkit,
# then it's likely we will need to use the caller's auto_path and tcl::tm::list to find things
#The caller can tune the thread's package search by providing a settingsdict
if {![dict exists $::settingsinfo tcl_tm_list]} {
tcl::tm::add %mp%
} else {
tcl::tm::remove {*}[tcl::tm::list]
tcl::tm::add {*}[dict get $::settingsinfo tcl_tm_list]
}
if {![dict exists $::settingsinfo auto_path]} {
set ::auto_path [list %ap%]
} else {
set ::auto_path [dict get $::settingsinfo auto_path]
}
package require Thread
package require shellthread
if {![catch {::shellthread::worker::init %tidcli% %ts_start% $::settingsinfo} errmsg]} {
unset ::settingsinfo
set ::shellthread_init "ok"
} else {
unset ::settingsinfo
set ::shellthread_init "err $errmsg"
}
}]
thread::send -async $tidworker $init_script
#thread::send $tidworker $init_script
set winfo [dict create tid $tidworker list_client_tids [list $tidclient] ts_start $ts_start ts_end_list [list]]
dict set workers $sourcetag $winfo
return $tidworker
}
proc set_pipe_read_from_client {tag_pipename worker_tid rchan args} {
    # Hand one end of a pipe (rchan) over to the worker registered under
    # tag_pipename; the worker will read from it and log whatever arrives
    # (via start_pipe_read) until EOF.
    # Raises an error when the tag is unknown or worker_tid does not match
    # the registered worker.
    # fix: error messages previously named a nonexistent
    # 'workerthread::manager' namespace and misspelled worker_tid.
    variable workers
    if {![dict exists $workers $tag_pipename]} {
        error "shellthread::manager::set_pipe_read_from_client source/pipename $tag_pipename not found"
    }
    set match_worker_tid [dict get $workers $tag_pipename tid]
    if {$worker_tid ne $match_worker_tid} {
        error "shellthread::manager::set_pipe_read_from_client source/pipename $tag_pipename worker_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
    }
    #buffering set during channel creation will be preserved on thread::transfer
    thread::transfer $worker_tid $rchan
    #start_pipe_read will vwait - so we have to send async
    thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_read $tag_pipename $rchan]
    #client may start writing immediately - but presumably it will buffer in fifo2
}
proc set_pipe_write_to_client {tag_pipename worker_tid wchan args} {
    # Hand a writable channel (wchan) over to the worker registered under
    # tag_pipename; the worker will copy its stdin to it (via
    # start_pipe_write) until EOF.
    # Raises an error when the tag is unknown or worker_tid does not match
    # the registered worker.
    # fix: error messages previously named a nonexistent
    # 'workerthread::manager' namespace and misspelled worker_tid.
    variable workers
    if {![dict exists $workers $tag_pipename]} {
        error "shellthread::manager::set_pipe_write_to_client pipename $tag_pipename not found"
    }
    set match_worker_tid [dict get $workers $tag_pipename tid]
    if {$worker_tid ne $match_worker_tid} {
        error "shellthread::manager::set_pipe_write_to_client pipename $tag_pipename worker_tid mismatch '$worker_tid' vs existing:'$match_worker_tid'"
    }
    #buffering set during channel creation will be preserved on thread::transfer
    thread::transfer $worker_tid $wchan
    thread::send -async $worker_tid [list ::shellthread::worker::start_pipe_write $tag_pipename $wchan]
}
proc write_log {source msg args} {
# Send msg to the worker servicing 'source', auto-creating a worker if
# none is registered or its thread has died. Timestamps with
# 'clock micros' at call time so the worker can report delivery lag.
#   args: -async 1|0 (default 1: fire-and-forget thread::send)
#         -level (accepted; not forwarded to the worker in this version)
variable workers
set ts_micros_sent [clock micros]
set defaults [list -async 1 -level info]
set opts [dict merge $defaults $args]
if {[dict exists $workers $source]} {
set tidworker [dict get $workers $source tid]
if {![thread::exists $tidworker]} {
# stale registry entry - worker thread has gone; make a new one
set tidworker [new_worker $source]
}
} else {
#auto create with no requirement to call new_worker.. warn?
set tidworker [new_worker $source]
}
set client_tid [thread::id]
if {[dict get $opts -async]} {
thread::send -async $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
} else {
thread::send $tidworker [list ::shellthread::worker::send_info $client_tid $ts_micros_sent $source $msg]
}
}
proc report_worker_errors {errdict} {
    # Callback invoked (via thread::send from a worker's send_errors_now)
    # shortly before a worker terminates: store its accumulated background
    # errors against the first registry entry whose tid matches.
    #   errdict - dict with keys: worker_tid, errors
    variable workers
    set reporting_tid [dict get $errdict worker_tid]
    foreach src [dict keys $workers] {
        set srcinfo [dict get $workers $src]
        if {[dict get $srcinfo tid] ne $reporting_tid} {
            continue
        }
        dict set srcinfo errors [dict get $errdict errors]
        dict set workers $src $srcinfo ;#writeback updated
        break
    }
}
proc close_worker {source {timeout 2500}} {
    # Shut down the worker registered for 'source' (if any): ask it to
    # report its background errors, send terminate, wait up to 'timeout'
    # ms for acknowledgement, release the thread, capture any reported
    # errors into worker_errors, and remove the registry entry.
    # Repeated calls within 'timeout' ms of a pending close are debounced.
    # fix: the debounce test previously read undefined $tsnow (missing
    # underscore) and so errored whenever ts_end_list was non-empty.
    variable workers
    variable worker_errors
    set ts_now [clock micros]
    #puts stderr "close_worker $source"
    if {[dict exists $workers $source]} {
        set tidworker [dict get $workers $source tid]
        set ts_end_list [dict get $workers $source ts_end_list]
        if {[llength $ts_end_list]} {
            set last_end_ts [lindex $ts_end_list end]
            if {[expr {(($ts_now - $last_end_ts) / 1000) >= $timeout}]} {
                lappend ts_end_list $ts_now
                dict set workers $source ts_end_list $ts_end_list
            } else {
                #existing close in progress.. assume it will work
                return
            }
        }
        if {[thread::exists $tidworker]} {
            #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source still running.. terminating"
            set timeoutarr($source) 0
            # NOTE(review): 'after' and 'thread::send -async' resolve
            # timeoutarr at global scope when they fire, and vwait also
            # waits on the global - the proc-local set above appears
            # ineffective; confirm before changing.
            after $timeout [list set timeoutarr($source) 2]
            thread::send -async $tidworker [list shellthread::worker::send_errors_now [thread::id]]
            thread::send -async $tidworker [list shellthread::worker::terminate [thread::id]] timeoutarr($source)
            #thread::send -async $tidworker [string map [list %tidclient% [thread::id]] {
            #	shellthread::worker::terminate %tidclient%
            #}] timeoutarr($source)
            vwait timeoutarr($source)
            #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE1"
            thread::release $tidworker
            #puts stderr "shellthread::manager::close_worker: thread $tidworker for source $source DONE2"
            if {[dict exists $workers $source errors]} {
                set errlist [dict get $workers $source errors]
                if {[llength $errlist]} {
                    lappend worker_errors [list $source [dict get $workers $source]]
                }
            }
            dict unset workers $source
        }
    }
    #puts stdout "close_worker $source - end"
}
#worker errors only available for a source after close_worker called on that source
#It is possible for there to be multiple entries for a source because new_worker can be called multiple times with same sourcetag,
# e.g if a thread
proc get_and_clear_errors {source} {
    # Return (and remove from worker_errors) every recorded entry whose
    # first element matches 'source'. Matching uses glob semantics, the
    # same as the default mode of lsearch.
    variable worker_errors
    set matched [list]
    set remaining [list]
    foreach entry $worker_errors {
        if {[string match $source [lindex $entry 0]]} {
            lappend matched $entry
        } else {
            lappend remaining $entry
        }
    }
    set worker_errors $remaining
    return $matched
}
}