valid_data - Forking for Fun and Profit in Ruby
We recently released valid_data, a gem to check whether the rows in your database are valid according to your ActiveRecord validations. It worked by sequentially scanning your tables in order to check if the model instances were still valid over time. While this is pretty straightforward, we're essentially being naive in the sense that we could actually process some tables in parallel. In this post we'll be discussing the concept and implementation of parallelization that we chose to implement in the valid_data gem. We'll also show how it dramatically improves performance in certain cases over the sequential library.
What are we trying to solve?
The problem that we're trying to solve has some characteristics that we can take advantage of. Taking a closer look at the data, we can break the problem into smaller parts (tables), process them independently and then combine the solutions to show the final result.
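Below we distribute five models (M1 to M5) between 3 buckets, one for each of the processes P1 to P3. Each process then works through the models in its own bucket sequentially: P1 processes its models one after another, P2 does the same for its bucket (which includes M5), and so on and so forth.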
We could even go a step further and have 5 buckets, each containing just one model, if we know we have enough capacity for that. We'll take a look at that later in the benchmarks section.
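The bucket idea can be sketched in a few lines of Ruby. This is only an illustration: the `distribute` helper is hypothetical, and it assumes a simple round-robin assignment, which may differ from how the gem actually fills its buckets.

```ruby
# Hypothetical helper (not taken from the gem): deal the models into
# `bucket_count` buckets round-robin, one bucket per child process.
def distribute(models, bucket_count)
  buckets = Array.new(bucket_count) { [] }
  models.each_with_index do |model, index|
    buckets[index % bucket_count] << model
  end
  buckets
end

# Five models, three buckets, as in the example above.
buckets = distribute(%w[M1 M2 M3 M4 M5], 3)
buckets.each_with_index do |bucket, i|
  puts "P#{i + 1}: #{bucket.join(', ')}"
end
```

Passing 5 as the bucket count gives the one-model-per-bucket layout mentioned above.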
In Unix-like operating systems there is a concept called fork, where a process can create a copy of itself. This is a cheap and fast operation that can be used to implement parallelization. After forking, we have the original parent process and the new copy, commonly called the child process. We're not limited to creating only one child; we can create as many as we want. In general, most libraries allow users to choose how many children will be forked, making it easier to customize according to the number of CPUs in their machine. We implemented that by accepting the number of children as an argument for our task:
In the execution above, we ask it to fork 8 child processes so that the processing of models will be distributed between them. This translates roughly to the following code:
@readers = fork(num_of_processes.to_i, &worker)
The @readers variable holds the communication channels between the child processes and their parent process. The mechanism by which children can communicate with their parent is known as a "pipe."
reader, writer = IO.pipe
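As a minimal sketch of how a pipe and a fork fit together (this is not the gem's code, and the message text is made up for illustration), a single child can report back to its parent like this:

```ruby
reader, writer = IO.pipe

pid = Process.fork do
  # Child process: it only writes, so close the unused read end.
  reader.close
  writer.puts "child #{Process.pid} finished"
  writer.close
end

# Parent process: it only reads, so close its copy of the write end.
# While any write end stays open, the reader never sees end-of-file.
writer.close
message = reader.gets
reader.close

# Reap the child so it does not linger as a zombie process.
Process.wait(pid)

puts message
```

Each side closes the end of the pipe it does not use. That is the usual convention with pipes, and it is what lets the parent detect when a child is done writing.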
In order to keep the parent process aware of any message sent by its children, we create a thread that repeatedly checks whether there is a message waiting on any child's pipe. Here is a simplified version:
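Thread.new do
  loop do
    # Block until at least one child's pipe has data to read.
    io = IO.select(readers.values)
    # IO.select returns [readable, writable, errored], or nil on timeout.
    ((io && io.first) || []).each do |reader|
      yield reader
    end
  end
end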
Finally we tell the parent process to wait for its children to finish their processing with:
There's a gotcha here: we have to wait as many times as we forked. In other words, you have to call Process.wait as many times as you call Process.fork. That's why we extract the pids (process IDs) from the readers in order to wait for all child processes. You can find the complete code here.
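Putting the pieces together, here is a hedged, self-contained sketch of the whole cycle: fork several children, collect their messages with IO.select, then wait once per fork. The `fork_children` helper and the message format are hypothetical, and for brevity the parent polls in its main loop rather than in a separate thread.

```ruby
# Hypothetical helper: fork one child per bucket and return a
# pid => reader hash, similar in spirit to the readers in the post.
def fork_children(buckets)
  buckets.each_with_object({}) do |bucket, readers|
    reader, writer = IO.pipe
    pid = Process.fork do
      reader.close
      writer.puts "#{Process.pid} processed #{bucket.join(', ')}"
      writer.close
    end
    writer.close          # the parent keeps only the read end
    readers[pid] = reader
  end
end

readers = fork_children([%w[M1 M4], %w[M2 M5], %w[M3]])
messages = []
open_readers = readers.values

until open_readers.empty?
  # Block until at least one pipe is readable (or has reached EOF).
  ready, = IO.select(open_readers)
  ready.each do |reader|
    line = reader.gets
    if line
      messages << line.chomp
    else
      # nil means the child closed its write end: stop watching this pipe.
      reader.close
      open_readers.delete(reader)
    end
  end
end

# One Process.wait per Process.fork, using the pids we kept as hash keys.
readers.each_key { |pid| Process.wait(pid) }

puts messages
```

The order of the messages depends on how the operating system schedules the children, so it can change from run to run.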
After implementing parallelization with the concepts discussed above, it's time to measure whether it is indeed worth going in this direction. We've designed our schema with the following tables:
Model      | Total
-----------|-------
SubPage    | 22000
StaticPage | 22000
Essay      | 40000
News       | 40000
Page       | 40000
Article    | 496687
Post       | 572000
Each one has two columns, one of which is body. You can find the entire schema, benchmark script and machine details here. With the schema above we've run some benchmarks to compare:
- Sequential execution
- Parallel execution with 2 children
- Parallel execution with 5 children
- Parallel execution with 7 children
After 5 iterations we can see the time decrease as we increase the number of processes:
                  user     system      total         real
Sequential  555.080000   5.120000 560.200000 (606.937533)
Parallel 2x   0.830000   0.150000 359.200000 (236.735275)
Parallel 5x   0.820000   0.290000 317.530000 (185.183759)
Parallel 7x   0.760000   0.290000 320.680000 (175.603909)
In a benchmark with 10 iterations we see a similar speedup:
                   user     system       total          real
Sequential  1136.120000   9.940000 1146.060000 (1240.820179)
Parallel 2x    1.600000   0.340000  643.410000  (425.282860)
Parallel 5x    1.630000   0.470000  639.670000  (362.135289)
Parallel 7x    1.580000   0.540000  678.880000  (358.012563)
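To put a number on the improvement, the relative reduction in wall-clock time can be computed from the real column of the two benchmarks. The snippet below is just that arithmetic; the `RUNS` constant and the `reduction_percent` helper are names made up for this illustration.

```ruby
# "real" (wall-clock) seconds copied from the two benchmark runs above.
RUNS = {
  "5 iterations"  => { sequential: 606.937533,  parallel_7x: 175.603909 },
  "10 iterations" => { sequential: 1240.820179, parallel_7x: 358.012563 }
}.freeze

# Percentage of the sequential time saved by the parallel run.
def reduction_percent(before, after)
  ((before - after) / before * 100).round(1)
end

RUNS.each do |label, times|
  saved = reduction_percent(times[:sequential], times[:parallel_7x])
  puts "#{label}: #{saved}% less wall-clock time with 7 children"
end
```

Both runs come out at roughly 71%.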
We can see that the processing time has dropped by around 70% when comparing the sequential execution with the parallel execution using 7 child processes. That's really impressive, and the reason behind it is pretty straightforward. Before processing the tables we sort them by size so that the bigger tables are at the end of the list. We take advantage of this and distribute the big tables between different processes, which in turn allows them to be processed in parallel:
$ ps aux | grep [V]alid
jivago 25812 [ValidData-0.0.1] Manager [25838, 25841, 25844, 25847, 25851, 25855, 25859]
jivago 25838 [ValidData-0.0.1] Processing ["SubPage"]
jivago 25841 [ValidData-0.0.1] Processing ["StaticPage"]
jivago 25844 [ValidData-0.0.1] Processing ["Essay"]
jivago 25847 [ValidData-0.0.1] Processing ["News"]
jivago 25851 [ValidData-0.0.1] Processing ["Page"]
jivago 25855 [ValidData-0.0.1] Processing ["Article"]
jivago 25859 [ValidData-0.0.1] Processing ["Post"]
We can see above the models' distribution among processes. In this scenario, each model is being examined by a different process. After some time the smaller tables will be finished and we can see the bigger ones still being processed in parallel:
$ ps aux | grep [V]alid
jivago 25812 [ValidData-0.0.1] Manager [25855, 25859]
jivago 25855 [ValidData-0.0.1] Processing ["Article"]
jivago 25859 [ValidData-0.0.1] Processing ["Post"]
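The effect of sorting by size before distributing can be explored with a small sketch. It uses the row counts from the benchmark schema and assumes the sorted list is dealt out round-robin; the `buckets_by_size` helper is hypothetical and may not match the gem's exact strategy.

```ruby
# Row counts copied from the benchmark schema table in this post.
ROW_COUNTS = {
  "Post" => 572_000, "Article" => 496_687, "Page" => 40_000,
  "News" => 40_000, "Essay" => 40_000,
  "StaticPage" => 22_000, "SubPage" => 22_000
}.freeze

# Hypothetical helper, not the gem's code: sort ascending by size, then
# deal the models out round-robin so that neighbours in the sorted list
# (such as the two biggest tables) land in different buckets.
def buckets_by_size(row_counts, bucket_count)
  sorted = row_counts.sort_by { |_model, rows| rows }.map(&:first)
  buckets = Array.new(bucket_count) { [] }
  sorted.each_with_index { |model, i| buckets[i % bucket_count] << model }
  buckets
end

[2, 5, 7].each do |children|
  puts "#{children} children:"
  buckets_by_size(ROW_COUNTS, children).each do |bucket|
    rows = bucket.sum { |model| ROW_COUNTS[model] }
    puts "  #{bucket.join(', ')} (#{rows} rows)"
  end
end
```

Under this assumption, Article and Post end up in different buckets for every child count used in the benchmarks. The bucket holding Post stays the largest in each case, and a run cannot finish before its slowest bucket, which may be why going from 5 to 7 children changed the real time so little.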
We've seen that breaking the problem into smaller parts allowed us to process them independently with parallel execution. In our benchmarks, parallelization reduced the time by around 70%. While this schema is not a real production system, chances are you have much bigger tables, and that's exactly where this approach can be of the most benefit.
As future work, it may be worth distributing a gigantic table between processes, which in practice would mean more than one process checking different parts of the same table. Another scenario worth investigating is how parallel processing is affected when there are associations between models. In this case, while processing a table we will end up retrieving data from other tables, and that may affect the overall speed since the parts are no longer truly independent from each other. For scenarios like that, figuring out a good distribution of groups of tables may help to work around the dependency problem.
Please let us know if you have any questions or ideas to improve the solution or the code. We hope this new approach speeds up your daily work!