--- a/drivers/md/dm-kcopyd.c
+++ b/drivers/md/dm-kcopyd.c
@@ -49,29 +49,31 @@
 	mempool_t job_pool;
 
 	struct workqueue_struct *kcopyd_wq;
 	struct work_struct kcopyd_work;
 
 	struct dm_kcopyd_throttle *throttle;
 
 	atomic_t nr_jobs;
 
 	/*
-	 * We maintain three lists of jobs:
+	 * We maintain four lists of jobs:
 	 *
 	 * i) jobs waiting for pages
 	 * ii) jobs that have pages, and are waiting for the io to be issued.
-	 * iii) jobs that have completed.
+	 * iii) jobs that don't need to do any IO and just run a callback
+	 * iv) jobs that have completed.
 	 *
-	 * All three of these are protected by job_lock.
+	 * All four of these are protected by job_lock.
 	 */
 	spinlock_t job_lock;
+	struct list_head callback_jobs;
 	struct list_head complete_jobs;
 	struct list_head io_jobs;
 	struct list_head pages_jobs;
 };
 
 static struct page_list zero_page_list;
 
 static DEFINE_SPINLOCK(throttle_spinlock);
 
 /*
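To make the comment's four-list scheme concrete: a job that does IO moves pages_jobs -> io_jobs -> complete_jobs, while a job with no IO to do now goes straight onto the new callback_jobs list and is spliced into complete_jobs by the worker. A minimal sketch of those states (hypothetical illustration code, not the kernel's types):

enum job_list {
	PAGES_JOBS,	/* i)   waiting for pages */
	IO_JOBS,	/* ii)  has pages, waiting for the io to be issued */
	CALLBACK_JOBS,	/* iii) no IO needed, just runs a callback */
	COMPLETE_JOBS,	/* iv)  finished, callback pending */
};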
@@ -618,46 +620,51 @@
 }
 
 /*
  * kcopyd does this every time it's woken up.
  */
 static void do_work(struct work_struct *work)
 {
 	struct dm_kcopyd_client *kc = container_of(work,
 					struct dm_kcopyd_client, kcopyd_work);
 	struct blk_plug plug;
+	unsigned long flags;
 
 	/*
 	 * The order that these are called is *very* important.
 	 * complete jobs can free some pages for pages jobs.
 	 * Pages jobs when successful will jump onto the io jobs
 	 * list. io jobs call wake when they complete and it all
 	 * starts again.
 	 */
+	spin_lock_irqsave(&kc->job_lock, flags);
+	list_splice_tail_init(&kc->callback_jobs, &kc->complete_jobs);
+	spin_unlock_irqrestore(&kc->job_lock, flags);
+
 	blk_start_plug(&plug);
 	process_jobs(&kc->complete_jobs, kc, run_complete_job);
 	process_jobs(&kc->pages_jobs, kc, run_pages_job);
 	process_jobs(&kc->io_jobs, kc, run_io_job);
 	blk_finish_plug(&plug);
 }
 
 /*
  * If we are copying a small region we just dispatch a single job
  * to do the copy, otherwise the io has to be split up into many
  * jobs.
  */
 static void dispatch_job(struct kcopyd_job *job)
 {
 	struct dm_kcopyd_client *kc = job->kc;
 	atomic_inc(&kc->nr_jobs);
 	if (unlikely(!job->source.count))
-		push(&kc->complete_jobs, job);
+		push(&kc->callback_jobs, job);
 	else if (job->pages == &zero_page_list)
 		push(&kc->io_jobs, job);
 	else
 		push(&kc->pages_jobs, job);
 	wake(kc);
 }
 
 static void segment_complete(int read_err, unsigned long write_err,
 			     void *context)
 {
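The splice at the top of do_work() is the heart of the change: jobs pushed onto callback_jobs while a pass is running are picked up atomically, and in FIFO order, at the start of the next pass, so draining complete_jobs can always finish even if a callback queues more work. Below is a user-space sketch of the same pattern, with a pthread mutex standing in for the spinlock and a hand-rolled equivalent of list_splice_tail_init(); all names are illustrative, not the kernel API:

#include <pthread.h>

struct list_head { struct list_head *next, *prev; };

static void list_init(struct list_head *h) { h->next = h->prev = h; }
static int list_empty(const struct list_head *h) { return h->next == h; }

/* Move everything on src to the tail of dst and leave src empty,
 * mirroring the kernel's list_splice_tail_init(). */
static void splice_tail_init(struct list_head *src, struct list_head *dst)
{
	if (list_empty(src))
		return;
	src->next->prev = dst->prev;
	dst->prev->next = src->next;
	src->prev->next = dst;
	dst->prev = src->prev;
	list_init(src);
}

static pthread_mutex_t job_lock = PTHREAD_MUTEX_INITIALIZER;
static struct list_head callback_jobs, complete_jobs;

/* Call once before the first pass. */
static void lists_init(void)
{
	list_init(&callback_jobs);
	list_init(&complete_jobs);
}

/* One pass of the worker: hold the lock only for the splice, then run
 * the callbacks with the lock dropped, as do_work() does.  Anything a
 * callback pushes back onto callback_jobs waits for the next pass, so
 * this loop always terminates. */
static void worker_pass(void)
{
	pthread_mutex_lock(&job_lock);
	splice_tail_init(&callback_jobs, &complete_jobs);
	pthread_mutex_unlock(&job_lock);

	/* In this sketch only the worker touches complete_jobs after the
	 * splice, so no lock is needed below; the kernel still takes
	 * job_lock per pop() because IO completion pushes onto
	 * complete_jobs from other contexts. */
	while (!list_empty(&complete_jobs)) {
		struct list_head *job = complete_jobs.next;

		job->prev->next = job->next;	/* list_del(job) */
		job->next->prev = job->prev;
		/* ... run the job's callback here ... */
	}
}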
@@ -851,21 +858,21 @@
 EXPORT_SYMBOL(dm_kcopyd_prepare_callback);
 
 void dm_kcopyd_do_callback(void *j, int read_err, unsigned long write_err)
 {
 	struct kcopyd_job *job = j;
 	struct dm_kcopyd_client *kc = job->kc;
 
 	job->read_err = read_err;
 	job->write_err = write_err;
 
-	push(&kc->complete_jobs, job);
+	push(&kc->callback_jobs, job);
 	wake(kc);
 }
 EXPORT_SYMBOL(dm_kcopyd_do_callback);
 
 /*
  * Cancels a kcopyd job, eg. someone might be deactivating a
  * mirror.
  */
 #if 0
 int kcopyd_cancel(struct kcopyd_job *job, int block)
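Both producers of callback-only jobs now feed callback_jobs: dispatch_job() above for zero-length copies, and dm_kcopyd_do_callback() here. For reference, a hedged sketch of how a caller pairs the two exported entry points; the two dm_kcopyd_* calls and the notify-fn signature match dm-kcopyd.h, everything else is hypothetical:

/* Runs later from the kcopyd workqueue. */
static void copy_done(int read_err, unsigned long write_err, void *context)
{
	/* ... complete the operation, checking the error bits ... */
}

/* Hypothetical caller: reserve the job up front (the prepare step
 * allocates from the client's job mempool), do the work outside
 * kcopyd, then hand the result back so copy_done() runs from the
 * kcopyd thread. */
static void start_op(struct dm_kcopyd_client *kc, void *context)
{
	void *job = dm_kcopyd_prepare_callback(kc, copy_done, context);

	/* ... perform the non-kcopyd work ... */

	dm_kcopyd_do_callback(job, 0 /* read_err */, 0 /* write_err */);
}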
@@ -881,20 +888,21 @@
 struct dm_kcopyd_client *dm_kcopyd_client_create(struct dm_kcopyd_throttle *throttle)
 {
 	int r;
 	struct dm_kcopyd_client *kc;
 
 	kc = kzalloc(sizeof(*kc), GFP_KERNEL);
 	if (!kc)
 		return ERR_PTR(-ENOMEM);
 
 	spin_lock_init(&kc->job_lock);
+	INIT_LIST_HEAD(&kc->callback_jobs);
 	INIT_LIST_HEAD(&kc->complete_jobs);
 	INIT_LIST_HEAD(&kc->io_jobs);
 	INIT_LIST_HEAD(&kc->pages_jobs);
 	kc->throttle = throttle;
 
 	r = mempool_init_slab_pool(&kc->job_pool, MIN_JOBS, _job_cache);
 	if (r)
 		goto bad_slab;
 
 	INIT_WORK(&kc->kcopyd_work, do_work);
@@ -932,20 +940,21 @@
 
 	return ERR_PTR(r);
 }
 EXPORT_SYMBOL(dm_kcopyd_client_create);
 
 void dm_kcopyd_client_destroy(struct dm_kcopyd_client *kc)
 {
 	/* Wait for completion of all jobs submitted by this client. */
 	wait_event(kc->destroyq, !atomic_read(&kc->nr_jobs));
 
+	BUG_ON(!list_empty(&kc->callback_jobs));
 	BUG_ON(!list_empty(&kc->complete_jobs));
 	BUG_ON(!list_empty(&kc->io_jobs));
 	BUG_ON(!list_empty(&kc->pages_jobs));
 	destroy_workqueue(kc->kcopyd_wq);
 	dm_io_client_destroy(kc->io_client);
 	client_free_pages(kc);
 	mempool_exit(&kc->job_pool);
 	kfree(kc);
 }
 EXPORT_SYMBOL(dm_kcopyd_client_destroy);