root/trunk/POE-Component-Pool-Thread/Thread.pm

Revision 32, 13.3 kB (checked in by scott, 2 years ago)

Incremented version number

Line 
1package POE::Component::Pool::Thread;
2# -----------------------------------------------------------------------------
3# "THE BEER-WARE LICENSE" (Revision 43) borrowed from FreeBSD's jail.c:
4# <tag@cpan.org> wrote this file.  As long as you retain this notice you
5# can do whatever you want with this stuff. If we meet some day, and you think
6# this stuff is worth it, you can buy me a beer in return.   Scott S. McCoy
7# -----------------------------------------------------------------------------
8
9use strict;
10use warnings FATAL => "all";
11 no warnings 'numeric'; # grep int hack
12use threads;
13use threads::shared;
14use Thread::Semaphore;
15use Thread::Queue;
16use IO::Handle;
17use POE qw( Pipe::OneWay Filter::Line Wheel::ReadWrite );
18use Fcntl;
19
# Circumvent warnings: run() is invoked once at compile time, before any
# session exists (presumably to silence POE's "run() was never called"
# diagnostic -- confirm against the POE::Kernel documentation).
BEGIN { POE::Kernel->run }

# Package version, installed as a reference to a constant.
*VERSION = \0.014;

# Compile-time switch for the diagnostic warn() calls in this file.
use constant DEBUG => 0;
26
# new(%options)
#
# Class method.  Creates the POE session that owns the thread pool and returns
# the value of POE::Session->create.  Dies unless called with a balanced
# key/value list.
#
# Options read by this constructor:
#   EntryPoint     coderef every worker thread calls once per task
#   CallBack       optional coderef invoked in the session with each result
#   Name           optional session alias
#   StartThreads   threads spawned during _start         (default 0)
#   MinFree        spawn another thread when fewer idle  (default 2)
#   MaxFree        retire an idle thread when more idle  (default 10)
#   MaxThreads     hard cap on threads; undefined means "no cap"
#   inline_states  extra states; user _start/_stop are chained after ours
#
# Per-thread bookkeeping lives in $heap->{thread}{$tid}:
#   busy      tasks handed to the thread and not yet reported finished
#   retiring  true once "last" has been sent to the thread
# Idleness is decided from these main-thread fields rather than by peeking at
# the worker's semaphore, so two tasks posted back-to-back are never piled
# onto the same thread and a thread that was told to exit is never handed
# more work.
sub new {
    die __PACKAGE__, "->new() requires a balanced list" unless @_ % 2;

    my ($type, %opt) = @_;

    $opt{inline_states} ||= {};
    $opt{StartThreads}  ||= 0;
    $opt{MinFree}       ||= 2;
    $opt{MaxFree}       ||= 10;

    # True when the pool already holds MaxThreads workers.  An undefined
    # MaxThreads is treated as "no limit" instead of tripping a fatal
    # uninitialized-value warning.
    my $at_capacity = sub {
        my ($threads) = @_;
        return defined $opt{MaxThreads}
            && scalar(keys %$threads) >= $opt{MaxThreads};
    };

    # Thread descriptors that have no outstanding task and are not exiting.
    my $idle_threads = sub {
        my ($threads) = @_;
        return grep { !$_->{retiring} && !$_->{busy} } values %$threads;
    };

    # Hand one task to a thread.  Thread::Queue only carries shared data, so
    # the arguments are copied into a shared array first; &share([]) is the
    # threads::shared idiom for sharing an anonymous array.
    my $assign = sub {
        my ($tdsc, @task) = @_;

        my $shared = &share([]);
        push @$shared, @task;

        $tdsc->{busy}++;
        $tdsc->{iqueue}->enqueue($shared);
        return;
    };

    # Ask a thread to leave its loop, at most once per thread.
    my $retire = sub {
        my ($tdsc) = @_;

        return if $tdsc->{retiring};
        $tdsc->{retiring} = 1;
        $tdsc->{iqueue}->enqueue("last");
        return;
    };

    return POE::Session->create
    ( inline_states => {
        %{ $opt{inline_states} },

        _start => sub {
            my ($kernel, $heap) = @_[ KERNEL, HEAP ];

            $kernel->alias_set($opt{Name}) if $opt{Name};

            $heap->{queue}  = [];
            $heap->{thread} = {};

            my ($pipe_in, $pipe_out) = POE::Pipe::OneWay->new;

            die "Unable to create pipe"
                unless defined $pipe_in and defined $pipe_out;

            # Workers reopen the write side by file descriptor number.
            $heap->{pipe_out} = $pipe_out;

            $heap->{wheel} = POE::Wheel::ReadWrite->new
                ( Handle      => $pipe_in,
                  InputEvent  => "-thread_talkback",
                  ErrorEvent  => "-thread_talkerror",
                );

            for (1 .. $opt{StartThreads}) {
                $kernel->call($_[SESSION], "-spawn_thread");
            }

            # Chain to the user's _start with the same @_.
            my $user_start = $opt{inline_states}{_start};
            goto &$user_start if $user_start;
        },

        _stop => sub {
            my ($kernel, $heap) = @_[ KERNEL, HEAP ];

            DEBUG && warn "Joining all threads";
            for my $tdsc (values %{ $heap->{thread} }) {
                $retire->($tdsc);
                $tdsc->{thread}->join;
            }

            # Chain to the user's _stop with the same @_.
            my $user_stop = $opt{inline_states}{_stop};
            goto &$user_stop if $user_stop;
        },

        _default => sub {
            die "_default caught state: ", $_[ARG0];
        },

        -thread_talkerror => sub { die $_[ARG0], $_[ARG2] },

        # One line per worker message: "<tid>: cleanup" or "<tid>: collect".
        -thread_talkback => sub {
            my ($kernel, $heap, $input) = @_[ KERNEL, HEAP, ARG0 ];
            my ($tid, $command) = ($input =~ m/(\d+): (\w+)/);

            DEBUG and warn "Recieved: $input";

            # Depending upon the settings of perlvars, it is possible we may
            # get some garbage through here; unparsable lines are ignored.
            if (defined $command) {
                if ($command eq "cleanup") {
                    $kernel->yield(-execute_cleanup => $tid);
                }
                elsif ($command eq "collect") {
                    $kernel->yield(-collect_garbage => $tid);
                }
            }
        },

        # A worker has left its loop: join it and forget its descriptor.
        -collect_garbage => sub {
            DEBUG && warn "GC Called, thread exited";

            my ($kernel, $session, $heap, $tid) =
                @_[ KERNEL, SESSION, HEAP, ARG0 ];

            my $tdsc = delete $heap->{thread}{$tid} or return;

            $tdsc->{thread}->join;

            # The wheel is released once the last thread reference is gone,
            # so it no longer keeps the session alive.
            unless ($kernel->refcount_decrement($session->ID, "thread")) {
                delete $heap->{wheel};
            }

            delete $tdsc->{$_} for keys %$tdsc;
        },

        # A worker finished one task: deliver its result, then either feed it
        # the next queued task or trim the pool.
        -execute_cleanup => sub {
            my ($kernel, $session, $heap, $tid) =
                @_[ KERNEL, SESSION, HEAP, ARG0 ];

            DEBUG && warn "GC Called, thread finished task";

            my $threads = $heap->{thread};

            # Ignore reports for threads we no longer track rather than
            # autovivifying an empty descriptor.
            my $tdsc = $threads->{$tid} or return;

            $tdsc->{busy}-- if $tdsc->{busy};

            # Always drain the result so the queue cannot grow without bound
            # when no CallBack was supplied.
            my $rqueue = $tdsc->{rqueue};
            if ($rqueue->pending) {
                my $result = $rqueue->dequeue;

                if ($opt{CallBack}) {
                    DEBUG && warn "Dispatching CallBack";
                    $opt{CallBack}->( @_[0 .. ARG0 - 1], @$result );
                }
            }

            my $queue = $heap->{queue};

            if (@$queue) {
                # A retiring thread will exit before it sees new work, so the
                # task stays queued in that case.
                $assign->($tdsc, @{ shift @$queue }) unless $tdsc->{retiring};
            }
            else {
                my @idle = $idle_threads->($threads);
                $retire->(shift @idle) if @idle > $opt{MaxFree};
            }
        },

        -spawn_thread => sub {
            my ($kernel, $session, $heap) = @_[ KERNEL, SESSION, HEAP ];

            # A thread created after shutdown would never be told to exit.
            return if $heap->{shutdown};
            return if $at_capacity->($heap->{thread});
            DEBUG && warn "Spawning a new thread";

            my $semaphore   = Thread::Semaphore->new;
            my $iqueue      = Thread::Queue->new;
            my $rqueue      = Thread::Queue->new;

            my $thread      = threads->create
                ( \&thread_entry_point,
                  $semaphore,
                  $iqueue,
                  $rqueue,
                  fileno($heap->{pipe_out}),
                  $opt{EntryPoint} );

            # One reference per live thread keeps the session running until
            # every thread has been collected.
            $kernel->refcount_increment($session->ID, "thread");

            my $tdsc = $heap->{thread}{$thread->tid} = {
                semaphore   => $semaphore,
                iqueue      => $iqueue,
                rqueue      => $rqueue,
                thread      => $thread,
                lifespan    => 0, # Not currently used
                busy        => 0,
                retiring    => 0,
            };

            my $queue = $heap->{queue};
            $assign->($tdsc, @{ shift @$queue }) if @$queue;
        },

        # Public state: run one task on an idle thread, or queue it.
        run => sub {
            my ($kernel, $heap, @arg) = @_[ KERNEL, HEAP, ARG0 .. $#_ ];

            DEBUG && warn "Assigned a task";

            my $threads = $heap->{thread};
            my @idle    = $idle_threads->($threads);

            if (@idle) {
                my $tdsc = shift @idle;

                DEBUG and warn "Enqueueing on ", $tdsc->{thread}->tid;

                $assign->($tdsc, @arg);
            }
            else {
                push @{ $heap->{queue} }, [ @arg ];
            }

            # Keep at least MinFree idle threads around, within MaxThreads.
            if (@idle < $opt{MinFree} and not $at_capacity->($threads)) {
                $kernel->yield("-spawn_thread");
            }
        },

        # Public state: ask every thread to exit and let the session wind down.
        shutdown => sub {
            my ($kernel, $heap) = @_[ KERNEL, HEAP ];

            $heap->{shutdown} = 1;
            $kernel->alias_remove($opt{Name}) if $opt{Name};

            my @workers = values %{ $heap->{thread} };
            $retire->($_) for @workers;

            # With no threads there will be no "collect" message to trigger
            # the wheel's removal, so release it here or the session would
            # never stop.
            delete $heap->{wheel} unless @workers;
        },
      },
    );
}
235
# thread_entry_point($semaphore, $iqueue, $rqueue, $pipe_fd, $task)
#
# Body of every worker thread.
#   $semaphore  Thread::Semaphore held (down) while an action is processed
#   $iqueue     Thread::Queue of incoming actions: a shared array reference
#               holding the task arguments, or the string "last" to exit
#   $rqueue     Thread::Queue receiving one shared array of return values
#               per successfully completed task
#   $pipe_fd    file descriptor number of the pipe used to notify the session
#   $task       coderef called with the arguments of each action
#
# After each processed action the line "<tid>: cleanup" is written to the
# pipe; when the loop ends "<tid>: collect" is written.  Dies if the pipe
# descriptor cannot be opened.
sub thread_entry_point {
    my ($semaphore, $iqueue, $rqueue, $pipe_fd, $task) = @_;

    my $pipe = IO::Handle->new_from_fd($pipe_fd, "a")
        or die "Unable to open pipe descriptor $pipe_fd: $!";

    my $tid = threads->self->tid;

    # Every message must be newline terminated, whatever the caller set.
    local $\ = "\n";

    # A false value from the queue also ends the loop.
    while (my $action = $iqueue->dequeue) {
        DEBUG and warn $tid, ": recieved action";
        $semaphore->down;

        if (ref $action) {
            # Results travel through Thread::Queue, so they are collected in
            # a shared array.
            my $result = &share([]);

            # A dying task must not take the worker down with it: the thread
            # would exit without sending "collect" and never be reaped.
            if (eval { push @$result, $task->(@$action); 1 }) {
                DEBUG and warn $tid, ": Enqueuing result: @$result";
                $rqueue->enqueue($result);
            }
            else {
                warn $tid, ": task died: $@";
            }
        }
        elsif ($action eq "last") {
            # Leave the semaphore down (value 0) so this exiting thread is
            # not mistaken for a free one.
            last;
        }

        DEBUG and warn $tid, ": Requesting cleanup";

        # Single-string messages are immune to the output field separator.
        $pipe->print("${tid}: cleanup");
        $pipe->flush;

        $semaphore->up;
    }

    $pipe->print("${tid}: collect");
    $pipe->flush;
    DEBUG and warn $tid, ": Requesting Destruction";
}
284
2851;
286
287=head1 NAME
288
289POE::Component::Pool::Thread - A POE Managed Boss/Worker threadpool.
290
291=head1 SYNOPSIS
292
293 use POE qw( Component::Pool::Thread );
294 
295 POE::Component::Pool::Thread->new
296    ( MinFree       => 2,
297      MaxFree       => 5,
298      MaxThreads    => 15,
299      StartThrneads => 5,
300      Name          => "ThreadPool",
301      EntryPoint    => \&thread_entry_point,
302      CallBack      => \&result_handler,
303      inline_states => {
304          _start => sub {
305               my ($kernel, $heap) = @_[ KERNEL, HEAP ];
306
307               # We are inside the component session
308               $kernel->yield(run => @arguements);
309
310               $kernel->post(ThreadPool => run => @arguements);
311          },
312      }
313    );
314
315 sub thread_entry_point {
316    my (@arguements) = @_;
317
318    return 1;
319 }
320
321 sub result_handler {
322    my ($kernel, $result) = @_[ KERNEL, ARG0 ];
323
324    $result == 1;
325 }
326
327=head1 DESCRIPTION
328
329This is an expand-on-demand thread pool managed through a POE session in a
330manner that does not interfere with cooperative multitasking.  A single pipe is
331created, and each thread communicates its state to the main process through this
332pipe.  No serialization occurs (these are threads, not child processes), so
333execution is very fast.
334
335=head1 RATIONALE
336
337Cooperative, co-routine style programming isn't always available.  Some third
338party software (dependent libraries and/or modules) and particular tasks block
339processing whether you like it or not.
340
341Creation of threads is a lot of overhead, in fact quite a bit more overhead under
342the current implementation of ithreads than fork is.  Allocating these
343resources before you need them is an obvious solution to this problem: if you
344create the threads and re-use them, they're around when you need them without
345the horrendously slow threads->create() method.
346
347Communicating the results of a thread's processing requires allowing it to exit.
348This means you will require the overhead of threads->create() next time you
349need to accomplish this task.  With a thread pool designed in this fashion, the
350main thread itself has its own process loop.  The result of each iteration is
351passed through a thread safe queue, allowing you to collect the results of a
352thread's execution without the thread exiting.
353
354=head1 CONSTRUCTOR
355
356=over 4
357
358=item new MANY THINGS
359
360The new constructor is the only package method available with this package.  It
361creates a POE thread pool session which you describe in the following
362arguments.
363
364=over 4
365
366=item EntryPoint CODE
367
368This argument describes the entry point of the thread and is required.  In the
369actual implementation, this is not actually an entry point.  This is instead a
370coderef the thread will call repeatedly.  The arguments of this subroutine
371will be the arguments received by the controlling session.  In order to pass
372references as arguments, each reference must be shared (L<threads::shared>).
373Filehandles and blessed references cannot be shared.  You will have to
374translate them yourself.  With file handles, you can simply pass the file
375descriptor and reopen it in the child thread.  With blessed references, you can
376pass the data structure only, and rebless the reference in the thread.
377
378=item CallBack CODE
379
380This argument describes the result handler, which is where the captured
381results of a thread's last execution are sent.  As with EntryPoint subroutine
382arguments, any data structures you wish to pass through return results must be
383explicitly shared (L<threads::shared>).
384
385=item Name ALIAS
386
387This argument describes the default alias your threadpool session is given.
388
389=item StartThreads INTEGER
390
391This argument describes the number of threads the component will create during
392its "_start" state, or when the POE Session is being started.  This should be a
393number greater than MinFree and less than or equal to MaxFree.
394
395=item MaxThreads INTEGER
396
397This argument describes the maximum number of threads this component will
398create for this task.  If the component is assigned more tasks than threads, it
399will place the remaining tasks in an internal FIFO queue and assign them to
400threads as they complete their tasks.
401
402=item MinFree INTEGER
403
404This argument sets the minimum number of free threads to maintain.  When the
405component is assigned a new task, if there are fewer than this number of threads
406available, it will yield a request to create a new thread at the component's
407convenience.
408
409=item MaxFree INTEGER
410
411This argument provides the maximum number of free threads to maintain.  Upon
412completion of a task, this value is checked.  If there are more free threads
413than this value available, the oldest thread is asked to shut down.
414
415=back
416
417=back
418
419=head1 INLINE STATES
420
421=over 4
422
423=item run LIST
424
425The run state assigns a task to one of the free threads in the pool, or appends
426the task to the component's internal FIFO if no threads are available and our
427thread resources are exhausted.
428
429=item shutdown
430
431This state politely asks all threads to exit and removes the session alias.  Once
432the last thread has exited and been joined, the wheel watching the one way pipe
433the threads use to communicate is deleted, allowing a clean session shutdown.
434
435=back
436
437=head1 BUGS
438
439Oh I'm pretty sure of it.  If you find some, let me know.
440
441=head1 THANKS
442
443Matt Cashner
444
445Rocco Caputo
446
447=head1 AUTHOR
448
449Scott McCoy (tag@cpan.org)
450
451=cut
Note: See TracBrowser for help on using the browser.