Wednesday 17 April 2019

arcpy - How to implement multiprocessing with multiple geoprocesses?


I am relatively new to Python and thought I would take a stab at multiprocessing. I have a script that runs well in IDLE or as an ArcMap toolbox script. After perusing these forums and the Python docs, I made an attempt at converting my working script into a multiprocessing one. However similar the working examples on this forum are, none of them address data processing the way I would like to. I hope it is feasible.



Basically, the script moves through a list of elevation rasters (ERDAS IMG format), extracts the cells below a threshold, and finally merges the results together. I am currently running the script from the command prompt, as everything else either opens new windows or crashes trying to. The script gives the illusion of working fine, except that it seems to move on to the final merge without waiting for the workers to finish.


I have looked at several examples, and few seem to have more than a couple of processes in the worker function; none of those are arcpy geoprocesses. So my questions are essentially: 1) Should I be using something other than pool.apply_async, such as pool.map or pool.apply? 2) Am I properly returning the path of the final polygon to resultList?


Any criticism is welcome and greatly appreciated. Thank you in advance.


# Import modules
import arcpy, os, math
from arcpy import env
from arcpy.sa import *
import multiprocessing
import time


# Check out licenses
arcpy.CheckOutExtension("spatial")

# Define functions
def worker_bee(inputRaster, scratch, addNum):
    (path, lName) = os.path.split(inputRaster)
    (sName, ext) = os.path.splitext(lName)
    nameParts = sName.split("_")
    nameNumber = nameParts[-1]

    # Create scratch subfolder if it does not already exist
    subFolder = scratch + "\\" + nameNumber + "_output"
    if not os.path.exists(subFolder):
        os.makedirs(subFolder)

    # Set workspace to subfolder
    arcpy.env.workspace = subFolder
    arcpy.env.overwriteOutput = True
    arcpy.env.extent = "MAXOF"

    # Local variables
    Expression = "Shape_Area >= 100"

    poly1 = subFolder + "\\poly1.shp"
    poly2 = subFolder + "\\poly2.shp"
    poly3 = subFolder + "\\poly3.shp"
    poly5 = subFolder + "\\poly5.shp"
    poly6 = subFolder + "\\poly6.shp"
    poly7 = subFolder + "\\poly7.shp"
    outName = scratch + "\\ABL_" + nameNumber + ".shp"

    ### Perform calculations ###
    # Map algebra (replace -9999 with 9999)
    inRasterCon = Con(inputRaster, 9999, inputRaster, "Value = -9999")
    # Filter DEM to smooth out low outliers
    filterOut = Filter(inRasterCon, "LOW", "DATA")
    # Determine raster MINIMUM value and calculate threshold
    filterMinResult = arcpy.GetRasterProperties_management(filterOut, "MINIMUM")
    filterMin = filterMinResult.getOutput(0)
    threshold = float(filterMin) + float(addNum)
    # Map algebra (values under threshold)
    outCon = Con(filterOut <= threshold, 1, "")
    arcpy.RasterToPolygon_conversion(outCon, poly1, "SIMPLIFY", "Value")
    # Dissolve parts
    arcpy.Dissolve_management(poly1, poly2, "", "", "SINGLE_PART", "DISSOLVE_LINES")
    # Select parts larger than 100 sq m
    arcpy.Select_analysis(poly2, poly3, Expression)
    # Eliminate polygon parts
    arcpy.EliminatePolygonPart_management(poly3, poly5, "PERCENT", "0 SquareMeters", "10", "CONTAINED_ONLY")
    # Select parts larger than 100 sq m
    arcpy.Select_analysis(poly5, poly6, Expression)
    # Simplify polygon
    arcpy.SimplifyPolygon_cartography(poly6, poly7, "BEND_SIMPLIFY", "3 Meters", "3000 SquareMeters", "RESOLVE_ERRORS", "KEEP_COLLAPSED_POINTS")
    # Smooth polygon
    outShape = arcpy.SmoothPolygon_cartography(poly7, outName, "PAEK", "3 Meters", "FIXED_ENDPOINT", "FLAG_ERRORS").getOutput(0)
    ### Calculations complete ###

    # Delete scratch subfolder
    arcpy.Delete_management(subFolder)

    print("Completed " + outShape + "...")

    return outShape

resultList = []
def log_result(result):
    resultList.append(result)

if __name__ == "__main__":
    arcpy.env.overwriteOutput = True

    # Read in parameters
    inFolder = raw_input("Input Folder: ")  # arcpy.GetParameterAsText(0)
    addElev = raw_input("Number of elevation units to add to minimum: ")

    # Create scratch folder workspace
    scratchFolder = inFolder + "\\scratch"
    if not os.path.exists(scratchFolder):
        os.makedirs(scratchFolder)

    # Local variables
    dec_num = str(float(addElev) - int(float(addElev)))[1:]
    outNameNum = dec_num.replace(".", "")
    outMerge = inFolder + "\\ABL_" + outNameNum + ".shp"

    # Print core usage
    cores = multiprocessing.cpu_count()
    print("Using " + str(cores) + " cores...")

    # Start timing
    start = time.clock()

    # List input tiles
    arcpy.env.workspace = inFolder
    inTiles = arcpy.ListRasters("*", "IMG")
    tileList = []
    for tile in inTiles:
        tileList.append(inFolder + "\\" + tile)

    # Create a pool of subprocesses
    pool = multiprocessing.Pool(cores)

    print("Adding jobs to multiprocessing pool...")
    for tile in tileList:
        # Add the job to the multiprocessing pool asynchronously
        pool.apply_async(worker_bee, (tile, scratchFolder, addElev), callback=log_result)

    # Clean up worker pool; waits for all jobs to finish
    pool.close()
    pool.join()

    # Merge the temporary outputs (paths to successfully computed breakline polygons)
    print("Merging temporary outputs into shapefile " + outMerge + "...")
    arcpy.Merge_management(resultList, outMerge)

    # Clean up temporary data
    print("Deleting temporary data ...")
    for result in resultList:
        try:
            arcpy.Delete_management(result)
        except:
            pass

    # Stop timing and report duration
    end = time.clock()
    duration = end - start
    hours, remainder = divmod(duration, 3600)
    minutes, seconds = divmod(remainder, 60)
    print("Completed in %d:%d:%f" % (hours, minutes, seconds))


