Parallelization

sgenzer (Community Manager) · edited November 2018

Hi - so I know the old parallel processing extension is no longer functional, and I understand that you're working on parallelizing other operators.  I have a simple problem: I have 50,000 examples, each of which needs to send a text attribute through an API, one at a time.  I need to make the API calls separately because the operator (Enrich Data by Webservice) sometimes returns an error, so I wrapped it in a Handle Exception operator to keep going.  This all works fine, but it is SLOW - maybe 1-4 API calls/second.  I can't tell where the bottleneck is, but I would like to parallelize this by splitting the data set into 10 partitions (i.e. 5,000 examples per partition) and working on all 10 partitions at the same time.  See the attached process for a general framework (some stuff deleted).
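For context, outside RapidMiner the idea would look something like this sketch: split the examples into 10 partitions and enrich all partitions concurrently, catching per-call errors so one bad response doesn't stop a partition.  API_URL and enrich_text are hypothetical stand-ins for the Enrich Data by Webservice call, not anything from the attached process.

    # Sketch: 10 partitions enriched concurrently, errors caught per call.
    from concurrent.futures import ThreadPoolExecutor
    import requests

    API_URL = "https://example.com/enrich"    # placeholder endpoint

    def enrich_text(text):
        # One API call; the try/except plays the role of Handle Exception.
        try:
            r = requests.post(API_URL, json={"text": text}, timeout=30)
            r.raise_for_status()
            return r.json()
        except requests.RequestException:
            return None                       # record the failure, keep going

    def enrich_all(texts, n_partitions=10):
        # Split into roughly equal partitions and work on all of them at once.
        parts = [texts[i::n_partitions] for i in range(n_partitions)]
        with ThreadPoolExecutor(max_workers=n_partitions) as pool:
            results = pool.map(lambda part: [enrich_text(t) for t in part], parts)
        return [item for part in results for item in part]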


With the old parallel processing extension, this would work fine.  But now it still goes through each subprocess (i.e. each partition) one at a time.  Ugh.  So I had an idea - could I somehow trick the fancy new Cross Validation operator into basically doing the same thing?  Thoughts?


Scott

foo.rmp 70.2K

Best Answer

  • JEdward (RapidMiner Certified Expert)
    Solution Accepted

    Well okay then Scott. 

    This will give you a headstart on how I would approach it. 


    <?xml version="1.0" encoding="UTF-8"?><process version="7.3.000">
      <context>
        <input/>
        <output/>
        <macros/>
      </context>
      <operator activated="true" class="process" compatibility="7.3.000" expanded="true" name="Process">
        <process expanded="true">
          <operator activated="true" class="generate_data" compatibility="7.3.000" expanded="true" height="68" name="Generate Data" width="90" x="45" y="34"/>
          <operator activated="true" class="split_data" compatibility="7.3.000" expanded="true" height="292" name="Split Data" width="90" x="179" y="34">
            <enumeration key="partitions">
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
              <parameter key="ratio" value="0.1"/>
            </enumeration>
          </operator>
          <operator activated="true" class="collect" compatibility="7.3.000" expanded="true" height="292" name="Collect" width="90" x="313" y="34"/>
          <operator activated="true" class="subprocess" compatibility="7.3.000" expanded="true" height="82" name="Save as Collection, Generate FakeModellingDataset" width="90" x="514" y="34">
            <process expanded="true">
              <operator activated="true" class="generate_direct_mailing_data" compatibility="7.3.000" expanded="true" height="68" name="FakeDataForModelling (2)" width="90" x="45" y="34"/>
              <operator activated="true" class="publish_to_app" compatibility="7.3.000" expanded="true" height="68" name="Publish to App" width="90" x="45" y="187">
                <parameter key="name" value="DataForAPI"/>
              </operator>
              <connect from_port="in 1" to_op="Publish to App" to_port="store"/>
              <connect from_op="FakeDataForModelling (2)" from_port="output" to_port="out 1"/>
              <portSpacing port="source_in 1" spacing="0"/>
              <portSpacing port="source_in 2" spacing="0"/>
              <portSpacing port="sink_out 1" spacing="0"/>
              <portSpacing port="sink_out 2" spacing="0"/>
            </process>
          </operator>
          <operator activated="true" class="concurrency:cross_validation" compatibility="7.3.000" expanded="true" height="145" name="Cross Validation" width="90" x="715" y="34">
            <process expanded="true">
              <operator activated="true" class="parallel_decision_tree" compatibility="7.3.000" expanded="true" height="82" name="Decision Tree" width="90" x="112" y="34"/>
              <operator activated="true" class="generate_macro" compatibility="7.3.000" expanded="true" height="82" name="Generate Macro" width="90" x="112" y="238">
                <list key="function_descriptions">
                  <parameter key="NumberOfLoop" value="eval(%{execution_count})"/>
                </list>
              </operator>
              <operator activated="true" class="recall_from_app" compatibility="7.3.000" expanded="true" height="103" name="Recall from App" width="90" x="246" y="187">
                <parameter key="name" value="DataForAPI"/>
              </operator>
              <operator activated="true" class="select" compatibility="7.3.000" expanded="true" height="68" name="Select" width="90" x="380" y="187">
                <parameter key="index" value="%{NumberOfLoop}"/>
              </operator>
              <operator activated="true" class="publish_to_app" compatibility="7.3.000" expanded="true" height="68" name="Publish to App (2)" width="90" x="447" y="289">
                <parameter key="name" value="MyDataSet%{NumberOfLoop}"/>
                <description align="center" color="transparent" colored="false" width="126">Before this step you would put your Enhance By WebService Operator. It doesn't have to write to separate datasets, but I thought it's easier for demo purposes</description>
              </operator>
              <connect from_port="training set" to_op="Decision Tree" to_port="training set"/>
              <connect from_op="Decision Tree" from_port="model" to_port="model"/>
              <connect from_op="Decision Tree" from_port="exampleSet" to_op="Generate Macro" to_port="through 1"/>
              <connect from_op="Generate Macro" from_port="through 1" to_op="Recall from App" to_port="through 1"/>
              <connect from_op="Recall from App" from_port="result" to_op="Select" to_port="collection"/>
              <connect from_op="Select" from_port="selected" to_op="Publish to App (2)" to_port="store"/>
              <portSpacing port="source_training set" spacing="0"/>
              <portSpacing port="sink_model" spacing="0"/>
              <portSpacing port="sink_through 1" spacing="0"/>
            </process>
            <process expanded="true">
              <operator activated="true" class="apply_model" compatibility="7.3.000" expanded="true" height="82" name="Apply Model" width="90" x="45" y="34">
                <list key="application_parameters"/>
              </operator>
              <operator activated="true" class="performance" compatibility="7.3.000" expanded="true" height="82" name="Performance" width="90" x="179" y="34"/>
              <connect from_port="model" to_op="Apply Model" to_port="model"/>
              <connect from_port="test set" to_op="Apply Model" to_port="unlabelled data"/>
              <connect from_op="Apply Model" from_port="labelled data" to_op="Performance" to_port="labelled data"/>
              <connect from_op="Performance" from_port="performance" to_port="performance 1"/>
              <portSpacing port="source_model" spacing="0"/>
              <portSpacing port="source_test set" spacing="0"/>
              <portSpacing port="source_through 1" spacing="0"/>
              <portSpacing port="sink_test set results" spacing="0"/>
              <portSpacing port="sink_performance 1" spacing="0"/>
              <portSpacing port="sink_performance 2" spacing="0"/>
            </process>
          </operator>
          <connect from_op="Generate Data" from_port="output" to_op="Split Data" to_port="example set"/>
          <connect from_op="Split Data" from_port="partition 1" to_op="Collect" to_port="input 1"/>
          <connect from_op="Split Data" from_port="partition 2" to_op="Collect" to_port="input 2"/>
          <connect from_op="Split Data" from_port="partition 3" to_op="Collect" to_port="input 3"/>
          <connect from_op="Split Data" from_port="partition 4" to_op="Collect" to_port="input 4"/>
          <connect from_op="Split Data" from_port="partition 5" to_op="Collect" to_port="input 5"/>
          <connect from_op="Split Data" from_port="partition 6" to_op="Collect" to_port="input 6"/>
          <connect from_op="Split Data" from_port="partition 7" to_op="Collect" to_port="input 7"/>
          <connect from_op="Split Data" from_port="partition 8" to_op="Collect" to_port="input 8"/>
          <connect from_op="Split Data" from_port="partition 9" to_op="Collect" to_port="input 9"/>
          <connect from_op="Split Data" from_port="partition 10" to_op="Collect" to_port="input 10"/>
          <connect from_op="Split Data" from_port="partition 11" to_op="Collect" to_port="input 11"/>
          <connect from_op="Collect" from_port="collection" to_op="Save as Collection, Generate FakeModellingDataset" to_port="in 1"/>
          <connect from_op="Save as Collection, Generate FakeModellingDataset" from_port="out 1" to_op="Cross Validation" to_port="example set"/>
          <connect from_op="Cross Validation" from_port="model" to_port="result 1"/>
          <portSpacing port="source_input 1" spacing="0"/>
          <portSpacing port="sink_result 1" spacing="0"/>
          <portSpacing port="sink_result 2" spacing="0"/>
          <description align="center" color="yellow" colored="false" height="156" resized="false" width="180" x="750" y="237">After running go to View -&gt; App Objects to see all the datasets generated by the parallel execution. Remember, XValidation runs 11 times in the training phase when 10 is entered.</description>
        </process>
      </operator>
    </process>

Answers

  • JEdward (RapidMiner Certified Expert)

    I used to do something similar when I needed to extract a large number of records from an API that would only return 1,000 records per request. 

    My basic process was like this (a rough code sketch follows the list):


    1. Create a database table to track all the calls that you want to make to the API.  Ensure there is a status field with the following values:
      • Completed
      • InProgress
      • Pending
      • Failed
    2. In your process read from this database and filter to records that are pending and select one of them. 
      • I recommend rules around the selection of the record to process so that you can keep conflicts to a minimum.  Strategies include random selection and rule-based selection (where the rule is based on the number of process threads you are running).
    3. Update the status to 'InProgress'
    4. Make the API call. 
      • Make sure to handle failing API calls smoothly, so that they are either marked with a status change of 'Failed' or simply moved back to 'Pending' to be picked up by another thread later.
    5. Import the result into the target destination, ensuring there is deduplication in case of multiple API calls for the same record.
    6. On success, update the status to 'Completed'.
    7. Return to step 2.
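
    A minimal sketch of one such worker, assuming a SQLite queue.  The api_queue(id, payload, status, updated_at) table, the results table, and call_api() are hypothetical names, not anything from the attached process; the status check inside the UPDATE is what keeps two concurrent workers from claiming the same row:

        # One queue worker: claim a Pending row, call the API, record the result.
        import datetime
        import sqlite3

        def claim_record(conn):
            # Steps 2+3: pick a random Pending row and atomically flip it to
            # InProgress; if another worker got there first, the UPDATE touches
            # zero rows and we simply pick another record.
            while True:
                row = conn.execute(
                    "SELECT id, payload FROM api_queue WHERE status='Pending' "
                    "ORDER BY RANDOM() LIMIT 1").fetchone()
                if row is None:
                    return None          # queue drained
                claimed = conn.execute(
                    "UPDATE api_queue SET status='InProgress', updated_at=? "
                    "WHERE id=? AND status='Pending'",
                    (datetime.datetime.now(datetime.timezone.utc).isoformat(),
                     row[0]))
                conn.commit()
                if claimed.rowcount == 1:
                    return row           # we won the race

        def worker(conn, call_api):
            # call_api is a hypothetical stand-in for the web-service request.
            while True:
                row = claim_record(conn)
                if row is None:
                    break
                rec_id, payload = row
                try:
                    result = call_api(payload)                # step 4
                    conn.execute(                             # step 5: dedup on id
                        "INSERT OR REPLACE INTO results (id, result) VALUES (?, ?)",
                        (rec_id, result))
                    conn.execute(                             # step 6
                        "UPDATE api_queue SET status='Completed' WHERE id=?",
                        (rec_id,))
                except Exception:
                    conn.execute(                             # failed call: requeue
                        "UPDATE api_queue SET status='Pending' WHERE id=?",
                        (rec_id,))
                conn.commit()

    Several of these workers can run at once (e.g. one per Studio or Server process), because the claim step serializes on the database rather than on any one process.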

    You then run multiple RapidMiner processes against this queue.  Back in the 5.3 days the parallel plugin should have worked for this inside a loop, but it needed a bit of extra thinking on macros, so I didn't really try it. 
    Instead I ran the above 15 times concurrently on RapidMiner Server, plus a couple of RM Studio instances open on my local machine, which reduced the total runtime hugely. 


    Great idea on abusing the X-Validation operator to do it, but the above method should also work and might be more robust in case of API errors, power failures, etc.  (You can also add timestamps to your API-call log DB so that you know when a call was set to 'InProgress' by a process, and reset it if it hasn't been updated by an expected time.)
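
    That timestamp-based reset could be a small scheduled query.  A sketch, using the same hypothetical api_queue table and updated_at column as above:

        # Requeue calls stuck in 'InProgress' for more than 15 minutes.
        import datetime
        import sqlite3

        conn = sqlite3.connect("api_queue.db")   # placeholder database file
        cutoff = (datetime.datetime.now(datetime.timezone.utc)
                  - datetime.timedelta(minutes=15)).isoformat()
        conn.execute(
            "UPDATE api_queue SET status='Pending' "
            "WHERE status='InProgress' AND updated_at < ?", (cutoff,))
        conn.commit()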


  • sgenzer (Community Manager)

    wow.  Thanks, John.  That's a lot more complicated than I was hoping for.  I was making some progress with Cross Validation and will keep hacking at that.


    Scott


  • sgenzer (Community Manager)

    That is wicked.  I was getting there with Split Data and keeping track of loops, but you nailed it.  THANK YOU.  Works perfectly.


    Scott
