FennelParallelExecStreamScheduler

From LucidDB Wiki

Overview

The Fennel executor design docs describe a simple non-parallel reference implementation of the stream scheduler. This page documents a parallelized implementation which can be used instead to take advantage of SMP machines.

The implementation described (ParallelExecStreamScheduler) provides vertical parallelism (executing different parts of a stream graph concurrently) rather than horizontal parallelism (concurrently executing copies of the same stream graph on partitioned data sets). However, the implementation could be used to execute a graph after it has been copied for horizontal parallelism. This could also effectively combine the two approaches, e.g. to allow one horizontal-parallelism copy to "catch up" with others in the presence of skew or fluctuating processor availability--however, refinements to the scheduling algorithm are likely to be necessary for this to work well.

An important constraint from the scheduling framework is that adjacent streams can never be executing simultaneously. This allows buffer accessors to be invoked without synchronization (minimizing overhead by isolating all synchronization to the scheduler, at the granularity of per-quantum rather than per-row).

Data Structures

ParallelExecStreamScheduler maintains the following data structures:

  • a thread pool; the task queue inside of this thread pool represents the runnable queue (each entry is a reference to an ExecStream for which a quantum should be executed)
  • a map from stream ID to stream state:
    • SLEEPING: this stream is not currently in a state where it would make sense to execute it, implying one of
      • an input is in underflow
      • an output is in overflow
      • the stream has not yet been reached from a neighbor's execution, and a lazy execution policy is being followed
      • the stream has produced EOS
    • RUNNING: this stream has been submitted to the thread pool and has not yet completed execution (we don't currently distinguish between runnable and running)
    • INHIBITED: this stream is prevented from executing because at least one adjacent stream is currently in state RUNNING
      • for this state, we also keep track of an inhibition count (how many adjacent streams are running)
  • a queue of inhibited streams
  • a result queue of tasks which have completed and are waiting for the scheduler to update itself based on their results (each entry is a reference to an ExecStream which was run, along with an ExecStreamResult code)
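
As a rough illustration of the bookkeeping listed above, here is a minimal C++ sketch. Only the three state names come from the text; every type, field, and helper name (SchedulerStateSketch, StreamEntry, neighborStarted, and so on) is hypothetical, and the thread pool and all locking are left out.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <map>

using StreamId = std::uint32_t;

// The three per-stream states named in the text.
enum class StreamState { Sleeping, Running, Inhibited };

// Hypothetical stand-in for ExecStreamResult (the real enum may differ).
enum class ResultCode { BufUnderflow, BufOverflow, Eos, QuantumExpired };

struct StreamEntry {
    StreamState state = StreamState::Sleeping;
    int inhibitionCount = 0;  // number of adjacent streams currently running
};

// One result-queue entry: which stream ran, and how its quantum ended.
struct CompletedTask {
    StreamId streamId;
    ResultCode rc;
};

struct SchedulerStateSketch {
    std::map<StreamId, StreamEntry> streamStates;
    std::deque<StreamId> inhibitedQueue;
    std::deque<CompletedTask> resultQueue;  // shared with workers in real use

    // A neighbor of `id` was made runnable.
    void neighborStarted(StreamId id) { ++streamStates[id].inhibitionCount; }

    // A neighbor of `id` finished its quantum.
    void neighborFinished(StreamId id) {
        StreamEntry &e = streamStates[id];
        assert(e.inhibitionCount > 0);
        --e.inhibitionCount;
    }

    // True if no adjacent stream is running and `id` is not already running.
    bool mayRun(StreamId id) const {
        auto it = streamStates.find(id);
        return it != streamStates.end()
            && it->second.inhibitionCount == 0
            && it->second.state != StreamState::Running;
    }
};
```

Keeping the inhibition count next to the state means the "no adjacent stream is running" check is a single integer comparison.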

Note that instances of ExecStreamGraph contain special sentinel vertices representing output buffers which can be accessed via readStream; these have no associated ExecStream instances, but do have stream map entries to track whether a readStream request is currently being processed.

We may eventually want the thread pool to be shared/reused across statements, but currently it is private to the statement's own scheduler. (For extension projects where a global scheduler is used, a common pool is already implied.)

When executing in a JVM, it's necessary for native threads to be attached and detached correctly, so ThreadPool takes a ThreadTracker callback interface for this purpose and invokes it on thread start/end. The default implementation (e.g. for Fennel unit tests) does nothing, but when Fennel is loaded as part of Farrago, a JavaThreadTracker implementation is supplied which makes the correct JNI calls.

Initialization

When the scheduler is started, it initializes most streams as SLEEPING in the state map. However, inhibition counts for producers feeding into the output sentinel node are set to 1 because, except during readStream calls, the output buffer is directly accessible by callers. Each corresponding buffer accessor also receives a requestProduction invocation; this applies the initial "suction" on the graph which will result in upstream execution being scheduled as needed to satisfy readStream requests.

After data structure initialization, the following additional start steps are carried out:

  1. The thread pool is started.
  2. A manager task is submitted to the thread pool. This is a task which runs until the scheduler is stopped (never yielding), so it ties up one thread from the pool. For this reason, when the thread pool is started, the number of threads requested is degreeOfParallelism+1.

Scheduling

We take the approach that only the manager thread can read and write global scheduler state; worker threads in the thread pool just do their assigned work and synchronize access only to the runnable queue and the result queue. This keeps contention to a minimum. (In an earlier design, worker threads shared the work of scheduler state maintenance, leading to prohibitive contention.) The downside of this design is that the manager thread can become a bottleneck; we may want to experiment with boosting its priority. Think of a manager with a number of workers; as each worker completes a task, he or she comes back to the manager for a new one; as long as the project schedule isn't excessively complicated or bottlenecked, and the manager doesn't get distracted, throughput remains high. This tends to work best with larger page sizes (each quantum is longer, meaning scheduling overhead is a smaller percentage of the overall work performed, and also less chance for contention on queues).

[Figure: ParallelExecStreamScheduler.png]

The manager task is a loop which waits for events in the result queue, and for each one, carries out the following processCompletedTask procedure:

  1. The completed stream's state is set to SLEEPING.
  2. The inhibition counts for the stream's neighbors are decremented by 1.
  3. Further processing takes place based on the stream execution's result code:
    • EOS, underflow, or overflow: for any output buffer in a state other than underflow, the corresponding consumer is made runnable (if not inhibited); and for any input buffer in the underflow state, the corresponding producer is made runnable (if not inhibited)
      • making a stream runnable implies incrementing the inhibition count of its neighbors
      • for consumers which are output sentinels, further signalSentinel processing is carried out as described later on
    • quantum expired: the executed stream is made runnable again (since it still has work to do)
  4. The inhibited queue is processed, and any streams which have lost their inhibitions (due to the completion of this execution) are made runnable.
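
The four steps above can be sketched as a single-threaded model. This is a simplification under stated assumptions, not the actual Fennel code: buffer-state inspection is replaced by a caller-supplied toWake list, the thread pool's task queue is a plain deque, and all names other than processCompletedTask are hypothetical.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <vector>

enum class State { Sleeping, Running, Inhibited };
enum class Result { BufUnderflow, BufOverflow, Eos, QuantumExpired };

struct Entry {
    State state = State::Sleeping;
    int inhibitionCount = 0;     // how many adjacent streams are running
    std::vector<int> neighbors;  // adjacent stream ids
};

struct ManagerSketch {
    std::map<int, Entry> streams;
    std::deque<int> runnable;   // stands in for the thread pool's task queue
    std::deque<int> inhibited;

    void addEdge(int a, int b) {
        streams[a].neighbors.push_back(b);
        streams[b].neighbors.push_back(a);
    }

    // Schedule a stream, or park it if an adjacent stream is running.
    void makeRunnable(int id) {
        Entry &e = streams[id];
        if (e.state == State::Running) return;
        if (e.inhibitionCount > 0) {
            if (e.state != State::Inhibited) {
                e.state = State::Inhibited;
                inhibited.push_back(id);
            }
            return;
        }
        e.state = State::Running;
        for (int n : e.neighbors) ++streams[n].inhibitionCount;
        runnable.push_back(id);
    }

    // toWake: neighbors whose buffer states call for execution (assumed input).
    void processCompletedTask(int id, Result rc, const std::vector<int> &toWake) {
        Entry &e = streams[id];
        assert(e.state == State::Running);
        e.state = State::Sleeping;                      // step 1
        for (int n : e.neighbors) {                     // step 2
            assert(streams[n].inhibitionCount > 0);
            --streams[n].inhibitionCount;
        }
        if (rc == Result::QuantumExpired) {             // step 3
            makeRunnable(id);
        } else {
            for (int n : toWake) makeRunnable(n);
        }
        std::deque<int> pending;                        // step 4
        pending.swap(inhibited);
        for (int p : pending) {
            if (streams[p].state != State::Inhibited) continue;  // already scheduled
            streams[p].state = State::Sleeping;
            makeRunnable(p);  // re-parks itself if still inhibited
        }
    }
};
```

Step 4 rescans the whole inhibited queue on every completion, which is the scaling concern raised in the comment below.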

Note that the current algorithm is lazy, in that it does not schedule streams until they are known to be related to the output. An eager algorithm might cause all streams to start pumping out data regardless of demand by consumers.

--Jvs 22:39, 30 July 2008 (PDT): I'm not sure how well the inhibited queue processing is going to scale up with large stream graphs; a more optimized data structure would track inhibition dependencies.

Result Processing

Special handling is required for the output vertices, which are accessed by outside callers. The readStream call is implemented as follows:

  1. If the output buffer is in overflow or EOS, the call returns immediately.
  2. If the output buffer is currently empty, requestProduction is called on its buffer accessor so that the scheduler will invoke upstream nodes.
  3. The output vertex is marked as SLEEPING (otherwise no progress could be made, as producers would be inhibited by the RUNNING state of the output vertex).
  4. A special entry is added to the result queue to signal the manager that the output vertex is now in underflow and needs attention. This entry has the ID of the sentinel vertex, and a result code of EXECRC_BUF_UNDERFLOW. From the perspective of the manager, it looks just like the completion event for a real stream; this eliminates one special case.
  5. readStream waits on a condition variable (sentinelCondition, with the completion condition that the sentinel state must be RUNNING again); this is signaled once one of the following happens:
    • the manager detects a result event for a producer feeding into the sentinel, or
    • an exception is encountered (in this case, readStream terminates by re-throwing the exception, as described in a later section)
  6. Once readStream wakes up without an exception, it returns the accessor for the filled output buffer.

As mentioned previously, signalSentinel is carried out by the manager when it detects a result event related to the sentinel; here are the steps:

  1. increment the inhibition count on the producers for the sentinel
  2. change the sentinel state from SLEEPING to RUNNING
  3. signal the sentinelCondition condition variable
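
The handshake between readStream and signalSentinel can be sketched with standard C++ primitives. This is only an illustration: it guards everything with one mutex (the real scheduler confines synchronization to its queues), it skips exception handling and the actual producer execution, and all names except sentinelCondition, readStream, and signalSentinel are hypothetical.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <thread>

enum class SentinelState { Sleeping, Running };

constexpr int kSentinelId = -1;  // hypothetical id for the sentinel vertex

struct SentinelSketch {
    std::mutex mutex;
    std::condition_variable sentinelCondition;  // wakes readStream
    std::condition_variable queueCondition;     // wakes the manager
    std::deque<int> resultQueue;                // ids of pending result events
    SentinelState state = SentinelState::Running;
    int producerInhibitionCount = 1;            // initial value per the text

    // Caller side: steps 3 to 5 of readStream.
    void readStream() {
        std::unique_lock<std::mutex> lock(mutex);
        state = SentinelState::Sleeping;
        resultQueue.push_back(kSentinelId);  // looks like a completion event
        queueCondition.notify_one();
        sentinelCondition.wait(
            lock, [this] { return state == SentinelState::Running; });
    }

    // Manager side; the caller must already hold the mutex.
    void signalSentinel() {
        assert(state == SentinelState::Sleeping);
        ++producerInhibitionCount;
        state = SentinelState::Running;
        sentinelCondition.notify_all();
    }

    // Manager side: handle one sentinel request from the result queue.
    void manageOneRequest() {
        std::unique_lock<std::mutex> lock(mutex);
        queueCondition.wait(lock, [this] { return !resultQueue.empty(); });
        resultQueue.pop_front();
        --producerInhibitionCount;  // sentinel is asleep, so the producer may run
        // A real manager would schedule the producer here and call
        // signalSentinel only after seeing the producer's result event.
        signalSentinel();
    }
};

// Runs one readStream call against a manager thread.
bool demoRoundTrip() {
    SentinelSketch s;
    std::thread manager([&s] { s.manageOneRequest(); });
    s.readStream();
    manager.join();
    return s.state == SentinelState::Running && s.producerInhibitionCount == 1;
}
```

Both waits use a predicate, so the sketch behaves the same whichever of the two threads reaches its wait first.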

Why have a dedicated manager thread (i.e. why not just let readStream carry out this role)? There are two reasons:

  • in an ExecStreamGraph for a statement containing UDX invocations, there may be concurrent calls to readStream (since each UDX invocation gets its own thread), and our goal is to avoid synchronization in the manager
  • by making the manager autonomous, processing for queries which produce large result sets can be pipelined beyond the scheduler; the scheduler can be prefetching the next batch of results while the readStream caller is processing the previous batch

Applicability

The scheduling algorithm above is useful for stream graphs with a shape like this:

           ___stream1__
          /            \
         /              \
splitter -----stream2----barrier
         \              /
          \___stream3__/

stream1/2/3 can all be executed in parallel. An example would be a LucidDB column-store load, with each stream loading one column. Parallel UNION ALL has a shape similar to the right half of the graph above.

The scheduling algorithm can also yield "bucket brigade" pipelined parallelism for a graph with a shape like this:


stream1--->membuf1--->stream2--->membuf2--->stream3

To achieve pipelined parallelism, ScratchBufferExecStream cannot be used for the memory buffers, since it would only allow one side (either producer or consumer, but not both) to execute at once. The reason for this limitation is that the implementation only has a single buffer page, and ExecStreamBufAccessor does not currently implement circular buffer access. To avoid the complexity of byte-level circular buffer access, we use a simpler page-level approach:

  • introduce new class DoubleBufferExecStream as an alternative for ScratchBufferExecStream
  • DoubleBufferExecStream is similar to ScratchBufferExecStream, but it allocates two buffer pages instead of one, and flips between them--standard double-buffering technique
  • DoubleBufferExecStream sets its input edges to underflow AND its output edges to overflow simultaneously to indicate that both producer and consumer can execute in parallel (since they are accessing different buffers)
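
The page-flipping idea can be shown with a minimal sketch. It is not the DoubleBufferExecStream implementation: pages are modeled as byte vectors, "filled" simply means non-empty, and the class and method names are hypothetical.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

class DoubleBufferSketch {
public:
    // Page the upstream producer is currently writing.
    std::vector<char> &producerPage() { return pages_[producerIndex_]; }

    // Page the downstream consumer is currently reading.
    std::vector<char> &consumerPage() { return pages_[1 - producerIndex_]; }

    // Swap roles once the producer has written data and the consumer has
    // drained (cleared) its page; otherwise leave both sides as they are.
    bool tryFlip() {
        if (producerPage().empty() || !consumerPage().empty()) return false;
        producerIndex_ = 1 - producerIndex_;
        assert(producerPage().empty());  // the producer always gets a drained page
        return true;
    }

private:
    std::array<std::vector<char>, 2> pages_;
    std::size_t producerIndex_ = 0;
};
```

Between flips the two sides touch different pages, which is why they can run concurrently without synchronizing on the buffer.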

For non-parallel schedulers, we continue to use ScratchBufferExecStream to avoid wasting buffer pool pages. This is possible since ExecStreamScheduler::createBufferProvisionAdapter is a virtual method, so each scheduler can choose how to instantiate buffer ExecStreams.

Shutdown

The scheduler implements the stop interface method as follows:

  1. Signal the manager to quit
  2. Wait for the manager to shut itself down
  3. Call stop on the thread pool, which waits for any executing threads to complete their work
  4. Clear any pending exception
  5. Discard leftover requests and events from queues

Degree of Parallelism

The number of threads in the pool is controlled by a parameter known as the degree of parallelism. When Fennel is loaded as part of Farrago, this can be controlled from the SQL level by session parameter "degreeOfParallelism":

alter session set "degreeOfParallelism" = 4;

The default value is 1, causing the non-parallel scheduler implementation to be plugged in; any value greater than 1 will result in the parallel scheduler being used.
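
Expressed as code, the selection rule and the pool sizing from the Initialization section look roughly like this. The function and enum names are hypothetical; only the threshold (greater than 1) and the degreeOfParallelism+1 thread count come from the text.

```cpp
#include <cassert>

enum class SchedulerKind { NonParallel, Parallel };

// Any value greater than 1 selects the parallel scheduler.
SchedulerKind chooseScheduler(int degreeOfParallelism) {
    return degreeOfParallelism > 1 ? SchedulerKind::Parallel
                                   : SchedulerKind::NonParallel;
}

// The manager task ties up one pool thread, hence the extra thread.
int threadPoolSize(int degreeOfParallelism) {
    assert(degreeOfParallelism > 1);  // only the parallel scheduler has a pool
    return degreeOfParallelism + 1;
}
```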

--Jvs 14:06, 30 July 2008 (PDT): there is a problem here with statement caching; the DOP gets burned into the XMI for the Fennel stream defs, and affects the resulting stream graph construction, but it does not appear as part of the cache key. So, one session might set DOP=4, and compile a statement, leaving the parallelized plan in the cache; then another session might attempt to execute the same statement with DOP=1, find the cached entry, and accidentally execute the parallel stream graph.

--Rchen 17:50, 19 August 2008 (PDT): Is the run-time DOP static? Or can it take into consideration the "load" of the system?

--Jvs 13:27, 21 August 2008 (PDT): currently it is static; if we move to a thread pool shared across statements (or shared scheduler), then we can address global contention, fairness and governance issues

Exception Handling

If a stream encounters an exception while executing, it will be thrown in the pooled thread context rather than the calling thread context. Consequently, the scheduler needs to propagate the exception back to the calling thread context. It does this by wrapping stream execution in a try block, catching the exception, saving it in a state variable pPendingExcn and then signalling the condition variable. When readStream wakes up and sees this state of affairs, it will rethrow the saved exception, now in the caller's context.

(Due to the way exception handling works in C++, it is necessary to clone the original exception object and ask it to re-throw itself; there is no way to re-throw the original exception object in the new context without slicing it, and we can't do that since it may be a Java exception which needs to be preserved as such.)
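
The clone-and-rethrow pattern described above can be sketched as follows. The class names are hypothetical and do not reflect Fennel's actual exception hierarchy; only pPendingExcn and the "first one wins" rule are taken from this page.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Base class: each exception can copy itself and throw itself by its
// dynamic type, which avoids slicing when it crosses thread contexts.
class CloneableExcn {
public:
    virtual ~CloneableExcn() = default;
    virtual std::unique_ptr<CloneableExcn> clone() const = 0;
    virtual void rethrow() const = 0;
    virtual std::string message() const = 0;
};

class StreamExcn : public CloneableExcn {
public:
    explicit StreamExcn(std::string msg) : msg_(std::move(msg)) {}
    std::unique_ptr<CloneableExcn> clone() const override {
        return std::make_unique<StreamExcn>(*this);
    }
    void rethrow() const override { throw *this; }  // static type is StreamExcn
    std::string message() const override { return msg_; }

private:
    std::string msg_;
};

class PendingExcnSlot {
public:
    // Worker side: save a copy; the first report wins, later ones are ignored.
    void report(const CloneableExcn &e) {
        std::lock_guard<std::mutex> guard(mutex_);
        if (!pPendingExcn_) pPendingExcn_ = e.clone();
        assert(pPendingExcn_ != nullptr);
    }

    // Caller side: throw the saved exception, if any, in this thread.
    void rethrowIfPending() {
        std::lock_guard<std::mutex> guard(mutex_);
        if (pPendingExcn_) pPendingExcn_->rethrow();
    }

private:
    std::mutex mutex_;
    std::unique_ptr<CloneableExcn> pPendingExcn_;
};
```

Because rethrow is virtual, a caller can still catch the saved exception by its concrete type. In C++11 and later, std::exception_ptr offers a standard way to move an exception between threads, but this sketch follows the clone-based approach the text describes.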

Streams executing in parallel will carry on with their current quanta, but subsequent attempts to make streams runnable will be treated as no-ops once an exception is pending, causing the scheduler to quiesce itself.

If multiple streams encounter exceptions, the first one wins (subsequent exception reports are ignored).

Asynchronous abort requests are implemented by simply setting pPendingExcn to a new AbortException and signalling the condition variable.

--Jvs 14:06, 30 July 2008 (PDT): for error conditions which are logged rather than thrown, are there any synchronization issues which need to be dealt with? Probably not, since we already have accidental parallelism from UDX threading, but worth reviewing.

Unit Tests

The parallel scheduler is currently unit tested by class ParallelExecStreamSchedulerTest, which executes all of the pre-existing stream tests from ExecStreamTestSuite with degree of parallelism set to 4 by default (this is parameterized, e.g. you can run ParallelExecStreamSchedulerTest degreeOfParallelism=8). Much more is needed to exercise real parallel-relevant scenarios and streams (including exceptions and abort requests), as well as to verify that concurrent execution is actually taking place.

Open Issues

  • tracing needs to be thought out in the parallel context
    • a simple way to deal with it is to only trace from the readStream thread; however, for high-verbosity trace levels, this will drastically alter the scheduling, leading to heisenbugs (think of a manager who spends most of the time reading and writing book-length status reports instead of telling people what to do)
  • stream implementations which rely on dynamic parameters may not work with the parallel executor depending on their access patterns; Julian has been adding generic support for declaring dynamic parameter dependencies from streams, so we may need to figure out how to build on that
  • stream implementations which rely on restart may not work with the parallel executor, since the scheduler is not aware of the restart calls, and they usually are not safe to be run concurrently with execution; if restart is only called when everything upstream is guaranteed to be EOS already, this may not be a problem, but early restarts could spell trouble
  • stream implementations which block for I/O may prevent full utilization
  • there may be cases where additional buffering would improve throughput
    • in some cases, this could mean a generalization of DoubleBufferExecStream (cycling through multiple buffers instead of just two); or a version of SegBufferExecStream which allows for concurrent reads and writes on an arbitrary-length buffer (i.e. disk-backed as needed)
    • in other cases, this could mean inserting buffers where currently there are none in the graph (e.g. in between a splitter and a downstream column loader)

--Rchen 18:16, 19 August 2008 (PDT): Could adjacent non-blocking streams be scheduled to one worker? I.e. this "block" of streams would share one scheduling state and always be assigned a thread together.

--Jvs 13:29, 21 August 2008 (PDT): Currently the unit of scheduling is one ExecStream, but the possibility of scheduling trains of ExecStreams is being discussed; it could help with minimizing contention on scheduler queues
  • boosting the priority of the manager thread seems to help a little bit; need to investigate this further, and if worthwhile, come up with global priority scheme for all threads; but probably that way madness lies
  • need some sequence/statechart diagrams in this doc
