

The workqueue code currently has a notion of a per-cpu queue being "busy". 
flush_scheduled_work()'s responsibility is to wait for a queue to be not busy.

Problem is, flush_scheduled_work() can easily hang up.

- The workqueue is deemed "busy" when there are pending delayed
  (timer-based) works.  But if someone repeatedly schedules new delayed work
  in the callback, the queue will never fall idle, and flush_scheduled_work()
  will not terminate.

- If someone reschedules work (not delayed work) in the work function, that
  too will cause the queue to never go idle, and flush_scheduled_work() will
  not terminate.

So what this patch does is:

- Create a new "cancel_delayed_work()" which will try to kill off any
  timer-based delayed works.

- Change flush_scheduled_work() so that it is immune to people re-adding
  work in the work callout handler.

  We can do this by recognising that the caller does *not* want to wait
  until the workqueue is "empty".  The caller merely wants to wait until all
  works which were pending at the time flush_scheduled_work() was called have
  completed.

  The patch uses a couple of sequence numbers for that.

So now, if someone wants to reliably remove delayed work they should do:


	/*
	 * Make sure that my work-callback will no longer schedule new work
	 */
	my_driver_is_shutting_down = 1;

	/*
	 * Kill off any pending delayed work
	 */
	cancel_delayed_work(&my_work);

	/*
	 * OK, there will be no new works scheduled.  But there may be one
	 * currently queued or in progress.  So wait for that to complete.
	 */
	flush_scheduled_work();


The patch also changes the flush_workqueue() sleep to be uninterruptible. 
We cannot legally bale out if a signal is delivered anyway.



 include/linux/workqueue.h |   10 +++++
 kernel/workqueue.c        |   83 +++++++++++++++++++++++-----------------------
 2 files changed, 53 insertions(+), 40 deletions(-)

diff -puN kernel/workqueue.c~flush_workqueue-hang-fix kernel/workqueue.c
--- 25/kernel/workqueue.c~flush_workqueue-hang-fix	2003-04-12 16:20:33.000000000 -0700
+++ 25-akpm/kernel/workqueue.c	2003-04-12 16:20:33.000000000 -0700
@@ -27,13 +27,21 @@
 #include <linux/slab.h>
 
 /*
- * The per-CPU workqueue:
+ * The per-CPU workqueue.
+ *
+ * The sequence counters are for flush_scheduled_work().  It wants to wait
+ * until until all currently-scheduled works are completed, but it doesn't
+ * want to be livelocked by new, incoming ones.  So it waits until
+ * remove_sequence is >= the insert_sequence which pertained when
+ * flush_scheduled_work() was called.
  */
 struct cpu_workqueue_struct {
 
 	spinlock_t lock;
 
-	atomic_t nr_queued;
+	long remove_sequence;	/* Least-recently added (next to run) */
+	long insert_sequence;	/* Next to add */
+
 	struct list_head worklist;
 	wait_queue_head_t more_work;
 	wait_queue_head_t work_done;
@@ -71,10 +79,9 @@ int queue_work(struct workqueue_struct *
 
 		spin_lock_irqsave(&cwq->lock, flags);
 		list_add_tail(&work->entry, &cwq->worklist);
-		atomic_inc(&cwq->nr_queued);
-		spin_unlock_irqrestore(&cwq->lock, flags);
-
+		cwq->insert_sequence++;
 		wake_up(&cwq->more_work);
+		spin_unlock_irqrestore(&cwq->lock, flags);
 		ret = 1;
 	}
 	put_cpu();
@@ -93,11 +100,13 @@ static void delayed_work_timer_fn(unsign
 	 */
 	spin_lock_irqsave(&cwq->lock, flags);
 	list_add_tail(&work->entry, &cwq->worklist);
+	cwq->insert_sequence++;
 	wake_up(&cwq->more_work);
 	spin_unlock_irqrestore(&cwq->lock, flags);
 }
 
-int queue_delayed_work(struct workqueue_struct *wq, struct work_struct *work, unsigned long delay)
+int queue_delayed_work(struct workqueue_struct *wq,
+			struct work_struct *work, unsigned long delay)
 {
 	int ret = 0, cpu = get_cpu();
 	struct timer_list *timer = &work->timer;
@@ -107,18 +116,11 @@ int queue_delayed_work(struct workqueue_
 		BUG_ON(timer_pending(timer));
 		BUG_ON(!list_empty(&work->entry));
 
-		/*
-		 * Increase nr_queued so that the flush function
-		 * knows that there's something pending.
-		 */
-		atomic_inc(&cwq->nr_queued);
 		work->wq_data = cwq;
-
 		timer->expires = jiffies + delay;
 		timer->data = (unsigned long)work;
 		timer->function = delayed_work_timer_fn;
 		add_timer(timer);
-
 		ret = 1;
 	}
 	put_cpu();
@@ -135,7 +137,8 @@ static inline void run_workqueue(struct 
 	 */
 	spin_lock_irqsave(&cwq->lock, flags);
 	while (!list_empty(&cwq->worklist)) {
-		struct work_struct *work = list_entry(cwq->worklist.next, struct work_struct, entry);
+		struct work_struct *work = list_entry(cwq->worklist.next,
+						struct work_struct, entry);
 		void (*f) (void *) = work->func;
 		void *data = work->data;
 
@@ -146,14 +149,9 @@ static inline void run_workqueue(struct 
 		clear_bit(0, &work->pending);
 		f(data);
 
-		/*
-		 * We only wake up 'work done' waiters (flush) when
-		 * the last function has been fully processed.
-		 */
-		if (atomic_dec_and_test(&cwq->nr_queued))
-			wake_up(&cwq->work_done);
-
 		spin_lock_irqsave(&cwq->lock, flags);
+		cwq->remove_sequence++;
+		wake_up(&cwq->work_done);
 	}
 	spin_unlock_irqrestore(&cwq->lock, flags);
 }
@@ -223,37 +221,41 @@ static int worker_thread(void *__startup
  * Forces execution of the workqueue and blocks until its completion.
  * This is typically used in driver shutdown handlers.
  *
- * NOTE: if work is being added to the queue constantly by some other
- * context then this function might block indefinitely.
+ * This function will sample each workqueue's current insert_sequence number and
+ * will sleep until the head sequence is greater than or equal to that.  This
+ * means that we sleep until all works which were queued on entry have been
+ * handled, but we are not livelocked by new incoming ones.
+ *
+ * This function used to run the workqueues itself.  Now we just wait for the
+ * helper threads to do it.
  */
 void flush_workqueue(struct workqueue_struct *wq)
 {
 	struct cpu_workqueue_struct *cwq;
 	int cpu;
 
+	might_sleep();
+
 	for (cpu = 0; cpu < NR_CPUS; cpu++) {
+		DEFINE_WAIT(wait);
+		long sequence_needed;
+
 		if (!cpu_online(cpu))
 			continue;
 		cwq = wq->cpu_wq + cpu;
 
-		if (atomic_read(&cwq->nr_queued)) {
-			DECLARE_WAITQUEUE(wait, current);
+		spin_lock_irq(&cwq->lock);
+		sequence_needed = cwq->insert_sequence;
 
-			if (!list_empty(&cwq->worklist))
-				run_workqueue(cwq);
-
-			/*
-			 * Wait for helper thread(s) to finish up
-			 * the queue:
-			 */
-			set_task_state(current, TASK_INTERRUPTIBLE);
-			add_wait_queue(&cwq->work_done, &wait);
-			if (atomic_read(&cwq->nr_queued))
-				schedule();
-			else
-				set_task_state(current, TASK_RUNNING);
-			remove_wait_queue(&cwq->work_done, &wait);
+		while (sequence_needed - cwq->remove_sequence > 0) {
+			prepare_to_wait(&cwq->work_done, &wait,
+					TASK_UNINTERRUPTIBLE);
+			spin_unlock_irq(&cwq->lock);
+			schedule();
+			spin_lock_irq(&cwq->lock);
 		}
+		finish_wait(&cwq->work_done, &wait);
+		spin_unlock_irq(&cwq->lock);
 	}
 }
 
@@ -279,7 +281,8 @@ struct workqueue_struct *create_workqueu
 		spin_lock_init(&cwq->lock);
 		cwq->wq = wq;
 		cwq->thread = NULL;
-		atomic_set(&cwq->nr_queued, 0);
+		cwq->insert_sequence = 0;
+		cwq->remove_sequence = 0;
 		INIT_LIST_HEAD(&cwq->worklist);
 		init_waitqueue_head(&cwq->more_work);
 		init_waitqueue_head(&cwq->work_done);
diff -puN include/linux/workqueue.h~flush_workqueue-hang-fix include/linux/workqueue.h
--- 25/include/linux/workqueue.h~flush_workqueue-hang-fix	2003-04-12 16:20:33.000000000 -0700
+++ 25-akpm/include/linux/workqueue.h	2003-04-12 16:20:33.000000000 -0700
@@ -63,5 +63,15 @@ extern int current_is_keventd(void);
 
 extern void init_workqueues(void);
 
+/*
+ * Kill off a pending schedule_delayed_work().  Note that the work callback
+ * function may still be running on return from cancel_delayed_work().  Run
+ * flush_scheduled_work() to wait on it.
+ */
+static inline int cancel_delayed_work(struct work_struct *work)
+{
+	return del_timer_sync(&work->timer);
+}
+
 #endif
 

_
