Parallelization
Hi... so I know the old Parallel Processing extension is no longer functional, and I understand that you're working on parallelizing other operators. I have a simple problem: I have 50,000 examples, each of which needs to send a text attribute through an API one at a time. I need to make the API calls separately because the operator (Enrich Data by Webservice) sometimes returns an error, so I put it inside a Handle Exception operator to keep going. This all works fine, but it is SLOW: maybe 1-4 API calls per second. I can't tell where the bottleneck is, but I would like to parallelize this by splitting the data set into 10 partitions (i.e. 5,000 examples per partition) and working on all 10 partitions at the same time. See the attached process for a general framework (some stuff deleted).
With the old Parallel Processing extension, this would have worked fine. But now it still goes through each subprocess (i.e. each partition) one at a time. Ugh. So I had an idea: could I somehow trick the fancy new Cross Validation operator into doing the same thing? Thoughts?
Scott
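For readers who want the idea itself rather than the RapidMiner wiring, here is a minimal Python sketch of the approach described above. It splits the texts into 10 partitions, works on all partitions at once, and lets a failed call yield an empty result instead of stopping the run, which is the job Handle Exception does in the process. This is not RapidMiner code. `enrich` is a hypothetical stand-in for the Enrich Data by Webservice call. Threads are used on the assumption that the time is spent waiting on the API rather than on local CPU.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Optional


def split_into_partitions(items: List[str], n_partitions: int) -> List[List[str]]:
    """Split items into n_partitions contiguous chunks of (nearly) equal size."""
    size, remainder = divmod(len(items), n_partitions)
    partitions, start = [], 0
    for i in range(n_partitions):
        # The first `remainder` partitions take one extra item each.
        end = start + size + (1 if i < remainder else 0)
        partitions.append(items[start:end])
        start = end
    return partitions


def process_partition(texts: List[str],
                      enrich: Callable[[str], str]) -> List[Optional[str]]:
    """Call the (hypothetical) API once per text; a failed call yields None
    instead of aborting the loop, like Handle Exception in the process."""
    results: List[Optional[str]] = []
    for text in texts:
        try:
            results.append(enrich(text))
        except Exception:
            results.append(None)
    return results


def enrich_in_parallel(texts: List[str], enrich: Callable[[str], str],
                       n_partitions: int = 10) -> List[Optional[str]]:
    """Work on all partitions at the same time, one thread per partition,
    and return the results in the original example order."""
    partitions = split_into_partitions(texts, n_partitions)
    with ThreadPoolExecutor(max_workers=n_partitions) as pool:
        # map() yields results in partition order, so flattening them
        # restores the original order of the examples.
        per_partition = pool.map(lambda part: process_partition(part, enrich),
                                 partitions)
        return [r for part in per_partition for r in part]
```

With 50,000 texts and `n_partitions=10`, each thread handles 5,000 calls. Whether this actually speeds things up depends on the API tolerating 10 concurrent requests, which is worth checking before scaling up.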
Best Answer
JEdward (RapidMiner Certified Analyst, RapidMiner Certified Expert)
Well, okay then, Scott.
This will give you a head start on how I would approach it.
<?xml version="1.0" encoding="UTF-8"?><process version="7.3.000">
<context>
<input/>
<output/>
<macros/>
</context>
<operator activated="true" class="process" compatibility="7.3.000" expanded="true" name="Process">
<process expanded="true">
<operator activated="true" class="generate_data" compatibility="7.3.000" expanded="true" height="68" name="Generate Data" width="90" x="45" y="34"/>
<operator activated="true" class="split_data" compatibility="7.3.000" expanded="true" height="292" name="Split Data" width="90" x="179" y="34">
<enumeration key="partitions">
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
<parameter key="ratio" value="0.1"/>
</enumeration>
</operator>
<operator activated="true" class="collect" compatibility="7.3.000" expanded="true" height="292" name="Collect" width="90" x="313" y="34"/>
<operator activated="true" class="subprocess" compatibility="7.3.000" expanded="true" height="82" name="Save as Collection, Generate FakeModellingDataset" width="90" x="514" y="34">
<process expanded="true">
<operator activated="true" class="generate_direct_mailing_data" compatibility="7.3.000" expanded="true" height="68" name="FakeDataForModelling (2)" width="90" x="45" y="34"/>
<operator activated="true" class="publish_to_app" compatibility="7.3.000" expanded="true" height="68" name="Publish to App" width="90" x="45" y="187">
<parameter key="name" value="DataForAPI"/>
</operator>
<connect from_port="in 1" to_op="Publish to App" to_port="store"/>
<connect from_op="FakeDataForModelling (2)" from_port="output" to_port="out 1"/>
<portSpacing port="source_in 1" spacing="0"/>
<portSpacing port="source_in 2" spacing="0"/>
<portSpacing port="sink_out 1" spacing="0"/>
<portSpacing port="sink_out 2" spacing="0"/>
</process>
</operator>
<operator activated="true" class="concurrency:cross_validation" compatibility="7.3.000" expanded="true" height="145" name="Cross Validation" width="90" x="715" y="34">
<process expanded="true">
<operator activated="true" class="parallel_decision_tree" compatibility="7.3.000" expanded="true" height="82" name="Decision Tree" width="90" x="112" y="34"/>
<operator activated="true" class="generate_macro" compatibility="7.3.000" expanded="true" height="82" name="Generate Macro" width="90" x="112" y="238">
<list key="function_descriptions">
<parameter key="NumberOfLoop" value="eval(%{execution_count})"/>
</list>
</operator>
<operator activated="true" class="recall_from_app" compatibility="7.3.000" expanded="true" height="103" name="Recall from App" width="90" x="246" y="187">
<parameter key="name" value="DataForAPI"/>
</operator>
<operator activated="true" class="select" compatibility="7.3.000" expanded="true" height="68" name="Select" width="90" x="380" y="187">
<parameter key="index" value="%{NumberOfLoop}"/>
</operator>
<operator activated="true" class="publish_to_app" compatibility="7.3.000" expanded="true" height="68" name="Publish to App (2)" width="90" x="447" y="289">
<parameter key="name" value="MyDataSet%{NumberOfLoop}"/>
<description align="center" color="transparent" colored="false" width="126">Before this step you would put your Enrich Data by Webservice operator. It doesn't have to write to separate datasets, but I thought it was easier for demo purposes.</description>
</operator>
<connect from_port="training set" to_op="Decision Tree" to_port="training set"/>
<connect from_op="Decision Tree" from_port="model" to_port="model"/>
<connect from_op="Decision Tree" from_port="exampleSet" to_op="Generate Macro" to_port="through 1"/>
<connect from_op="Generate Macro" from_port="through 1" to_op="Recall from App" to_port="through 1"/>
<connect from_op="Recall from App" from_port="result" to_op="Select" to_port="collection"/>
<connect from_op="Select" from_port="selected" to_op="Publish to App (2)" to_port="store"/>
<portSpacing port="source_training set" spacing="0"/>
<portSpacing port="sink_model" spacing="0"/>
<portSpacing port="sink_through 1" spacing="0"/>
</process>
<process expanded="true">
<operator activated="true" class="apply_model" compatibility="7.3.000" expanded="true" height="82" name="Apply Model" width="90" x="45" y="34">
<list key="application_parameters"/>
</operator>
<operator activated="true" class="performance" compatibility="7.3.000" expanded="true" height="82" name="Performance" width="90" x="179" y="34"/>
<connect from_port="model" to_op="Apply Model" to_port="model"/>
<connect from_port="test set" to_op="Apply Model" to_port="unlabelled data"/>
<connect from_op="Apply Model" from_port="labelled data" to_op="Performance" to_port="labelled data"/>
<connect from_op="Performance" from_port="performance" to_port="performance 1"/>
<portSpacing port="source_model" spacing="0"/>
<portSpacing port="source_test set" spacing="0"/>
<portSpacing port="source_through 1" spacing="0"/>
<portSpacing port="sink_test set results" spacing="0"/>
<portSpacing port="sink_performance 1" spacing="0"/>
<portSpacing port="sink_performance 2" spacing="0"/>
</process>
</operator>
<connect from_op="Generate Data" from_port="output" to_op="Split Data" to_port="example set"/>
<connect from_op="Split Data" from_port="partition 1" to_op="Collect" to_port="input 1"/>
<connect from_op="Split Data" from_port="partition 2" to_op="Collect" to_port="input 2"/>
<connect from_op="Split Data" from_port="partition 3" to_op="Collect" to_port="input 3"/>
<connect from_op="Split Data" from_port="partition 4" to_op="Collect" to_port="input 4"/>
<connect from_op="Split Data" from_port="partition 5" to_op="Collect" to_port="input 5"/>
<connect from_op="Split Data" from_port="partition 6" to_op="Collect" to_port="input 6"/>
<connect from_op="Split Data" from_port="partition 7" to_op="Collect" to_port="input 7"/>
<connect from_op="Split Data" from_port="partition 8" to_op="Collect" to_port="input 8"/>
<connect from_op="Split Data" from_port="partition 9" to_op="Collect" to_port="input 9"/>
<connect from_op="Split Data" from_port="partition 10" to_op="Collect" to_port="input 10"/>
<connect from_op="Split Data" from_port="partition 11" to_op="Collect" to_port="input 11"/>
<connect from_op="Collect" from_port="collection" to_op="Save as Collection, Generate FakeModellingDataset" to_port="in 1"/>
<connect from_op="Save as Collection, Generate FakeModellingDataset" from_port="out 1" to_op="Cross Validation" to_port="example set"/>
<connect from_op="Cross Validation" from_port="model" to_port="result 1"/>
<portSpacing port="source_input 1" spacing="0"/>
<portSpacing port="sink_result 1" spacing="0"/>
<portSpacing port="sink_result 2" spacing="0"/>
<description align="center" color="yellow" colored="false" height="156" resized="false" width="180" x="750" y="237">After running, go to View -&gt; App Objects to see all the datasets generated by the parallel execution. Remember: Cross Validation runs its training phase 11 times when 10 folds are entered (once per fold, plus once for the final model).</description>
</process>
</operator>
</process>
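To make the trick in the process easier to follow, here is a rough Python analogue of what it does. The data is split into partitions and stored once under a shared name. Each parallel run then uses its 1-based execution count to pick "its" partition, like Select with `index=%{NumberOfLoop}`, and stores its result under `MyDataSet<n>`. This is an illustration, not RapidMiner code. `app_store`, `publish`, `recall` and `work` are hypothetical stand-ins for the App Objects store, Publish to App, Recall from App and the webservice step.

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import Callable, Dict, List

# Hypothetical stand-in for RapidMiner's App Objects store.
app_store: Dict[str, object] = {}
store_lock = Lock()


def publish(name: str, value: object) -> None:
    """Rough analogue of Publish to App."""
    with store_lock:
        app_store[name] = value


def recall(name: str) -> object:
    """Rough analogue of Recall from App."""
    with store_lock:
        return app_store[name]


def run_fold(execution_count: int, work: Callable[[list], list]) -> None:
    # Like Select with index=%{NumberOfLoop}: a 1-based pick from the collection.
    partitions = recall("DataForAPI")
    selected = partitions[execution_count - 1]
    publish(f"MyDataSet{execution_count}", work(selected))


def run_all(partitions: List[list], n_folds: int,
            work: Callable[[list], list]) -> None:
    # With n_folds folds the training subprocess runs n_folds + 1 times
    # (once per fold plus the final model), so the collection needs
    # n_folds + 1 partitions for every run to get one.
    if len(partitions) != n_folds + 1:
        raise ValueError("need exactly n_folds + 1 partitions")
    publish("DataForAPI", partitions)
    with ThreadPoolExecutor(max_workers=n_folds + 1) as pool:
        # list() forces every run to finish and re-raises any error here.
        list(pool.map(lambda i: run_fold(i, work), range(1, n_folds + 2)))
```

The `n_folds + 1` count is the point made in the yellow note in the process. It is why 11 partitions are connected to Collect for a 10-fold Cross Validation.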
Answers
I used to do something similar to this when I needed to query an API that only returned 1,000 records per request, and I needed to extract many more than that.
My basic process was like this.
You then run multiple instances of the RapidMiner process. Back in the 5.3 days, the Parallel Processing extension should have worked for this within a loop, but it needed a bit of extra thinking about macros, so I never really tried it.
Instead, I ran the above process 15 times concurrently on RapidMiner Server, plus a couple of RapidMiner Studio instances on my local machine, which cut the run time down hugely.
Great idea abusing the X-Validation operator to do it, but the above method should also work and might be more robust in case of API errors, computer power failures, etc. (You can also add timestamps to your API call log database, so that you know when a call was set to InProgress by a process and can reset it if it hasn't been updated by an expected time.)
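The original process isn't shown here. One simple way to share a paged extraction across several concurrent runs is to compute the page offsets up front and deal them out. The sketch below assumes the API pages by offset and limit, which is a hypothetical interface. It gives each of `n_workers` runs (15 in the setup above) its own list of 1,000-record pages, round-robin.

```python
from typing import Dict, List, Tuple


def assign_pages(total_records: int, page_size: int,
                 n_workers: int) -> Dict[int, List[Tuple[int, int]]]:
    """Return {worker_index: [(offset, limit), ...]} with pages dealt round-robin.

    offset/limit paging is an assumption about the API, not a given."""
    # Integer ceiling division: the last page may be partial.
    n_pages = (total_records + page_size - 1) // page_size
    assignment: Dict[int, List[Tuple[int, int]]] = {w: [] for w in range(n_workers)}
    for page in range(n_pages):
        offset = page * page_size
        limit = min(page_size, total_records - offset)
        assignment[page % n_workers].append((offset, limit))
    return assignment
```

For example, `assign_pages(2500, 1000, 2)` gives worker 0 the pages at offsets 0 and 2000 (the last one only 500 records long) and worker 1 the page at offset 1000. A static split like this is easy to set up, but a crashed run leaves its pages undone. The log-table approach in the next paragraph addresses that.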
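The API call log idea can be sketched as a small work queue. Each run atomically claims a batch of Pending rows, marks them InProgress with a timestamp, and sets them to Done when finished. A separate step resets InProgress rows that are older than a timeout back to Pending, so work from a crashed run gets picked up again. This is a minimal sketch, assuming an SQLite table. The table name, columns and status strings are illustrative, apart from "InProgress", which comes from the description above. A shared server database would need its own locking syntax.

```python
import sqlite3
from typing import List, Tuple

PENDING, IN_PROGRESS, DONE = "Pending", "InProgress", "Done"


def create_log(conn: sqlite3.Connection, payloads: List[str]) -> None:
    """Create the (hypothetical) log table with one Pending row per call."""
    conn.execute(
        "CREATE TABLE api_call_log ("
        " id INTEGER PRIMARY KEY, payload TEXT, status TEXT,"
        " worker TEXT, claimed_at REAL)"
    )
    conn.executemany(
        "INSERT INTO api_call_log (payload, status) VALUES (?, ?)",
        [(p, PENDING) for p in payloads],
    )
    conn.commit()


def claim_batch(conn: sqlite3.Connection, worker: str, batch_size: int,
                now: float) -> List[Tuple[int, str]]:
    """Mark up to batch_size Pending rows InProgress for this worker.

    The single UPDATE is atomic, so two workers cannot claim the same row."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "UPDATE api_call_log SET status = ?, worker = ?, claimed_at = ? "
            "WHERE id IN (SELECT id FROM api_call_log WHERE status = ? "
            "ORDER BY id LIMIT ?)",
            (IN_PROGRESS, worker, now, PENDING, batch_size),
        )
    return conn.execute(
        "SELECT id, payload FROM api_call_log "
        "WHERE status = ? AND worker = ? AND claimed_at = ? ORDER BY id",
        (IN_PROGRESS, worker, now),
    ).fetchall()


def mark_done(conn: sqlite3.Connection, row_id: int) -> None:
    with conn:
        conn.execute("UPDATE api_call_log SET status = ? WHERE id = ?",
                     (DONE, row_id))


def reset_stale(conn: sqlite3.Connection, timeout_s: float, now: float) -> int:
    """Return InProgress rows claimed more than timeout_s ago to Pending."""
    with conn:
        cur = conn.execute(
            "UPDATE api_call_log SET status = ?, worker = NULL, claimed_at = NULL "
            "WHERE status = ? AND claimed_at < ?",
            (PENDING, IN_PROGRESS, now - timeout_s),
        )
    return cur.rowcount
```

In use, each concurrent run loops on `claim_batch`, makes one API call per returned row, and calls `mark_done` after each successful call. `reset_stale` can run periodically from any of them.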
Wow. Thanks, John. That's a lot more complicated than I was hoping for. I was making some progress with Cross Validation and will keep hacking at that.
Scott
That is wicked. I was getting there with Split Data and keeping track of the loops, but you nailed it. THANK YOU. Works perfectly.
Scott