This post is a plea for help. I’ve made something pretty cool, in a commercial context, that’s small and contained and useful and which I want to open source. I’ve got no experience in successfully creating open sourced code that other people might want to interact with, so I’m looking for guidance from someone who does, particularly in the Python and/or AppEngine communities.
AppEngine is a great platform, one I’ve been working in now, using Python, for a couple of years.
One of the ongoing annoyances for me has been the low-level nature of the abstractions available for distributed programming, particularly using task queues to kick off background tasks (which might in turn want to kick off more background tasks, and so on).
At the most basic, you can create tasks on a queue. The queue is processed, and the task runs eventually. The task is a web handler (a “post” handler by default). It’s simple, but messy to set up if you want to do complex things: lots of boilerplate, lots of messing around with routes and so on.
Then there’s the excellent deferred library. It lets you kick off a function with some arguments in a task, while hiding all of the task queue messiness. It makes tasks highly useful and usable. But there are still niggles.
Firstly, deferred functions are passed by name. Actually, it’s more complex than this; deferred takes a callable, which might be an object (must be picklable, is pickled), or a function (passed by name for the most part I think, maybe something else going on with built in functions). But in any case we have the following restrictions in the doc:
The following callables can NOT be used as tasks:
1) Nested functions or closures
2) Nested classes or objects of them
3) Lambda functions
4) Static methods
i.e. you can’t use the really interesting stuff.
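These restrictions follow from how Python's standard pickle module treats functions: it stores a reference to an importable name rather than the code itself. The sketch below uses only the standard library (no AppEngine), and the helper names `make_closure` and `can_pickle` are made up for illustration.

```python
import json
import pickle


def make_closure(offset):
    """Return a nested function that closes over `offset`."""
    def add_offset(x):
        return x + offset
    return add_offset


def can_pickle(obj):
    """Report whether the standard pickle module accepts obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        return False


# A function that can be found again by module and name pickles fine.
print(can_pickle(json.dumps))        # prints True
# Nested functions and lambdas have no importable name, so they fail.
print(can_pickle(make_closure(10)))  # prints False
print(can_pickle(lambda x: x * 2))   # prints False
```

Anything that cannot be found again by module and name on the receiving side is rejected, which is why closures and lambdas are on the list.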
The second problem is that you launch a process, but then what? Does it ever execute? How do you know when it’s complete? How can you compose these calls?
As a higher level alternative to using deferred, I’ve made a library that provides distributed promises.
It lets you do things like this:
```python
import logging

# PromiseSpace comes from the promises library described in this post.

def MyLongRunningFunction(resolve):
    try:
        # we're in a separate task. Do something long running!
        result = 42  # placeholder for whatever result you made
        # eventually call resolve(result)
        resolve(result)
    except Exception as ex:
        resolve(ex)

def MyFunctionToRunAfterThat(result):
    try:
        value = result.value
        # we now have the value passed to resolve. Do something with it.
    except Exception as ex:
        # if resolve() was passed an exception, it'll arrive here when
        # we access result.value
        logging.exception("failure running long running thing")

PromiseSpace().when(MyLongRunningFunction).then(MyFunctionToRunAfterThat)
```
The interesting thing here is that both the functions are run in (separate) background tasks. If the first one times out, the second one will receive a timeout error as the result.
You’ll recognise promises more or less from the JavaScript world. These are similar, but structured a little differently.
JavaScript’s promises are used for handling an asynchronous environment inside a single process. So you can kick off something asynchronous, and have the handler for that be a closure over the resolve method, meaning you can signal the job is done in a finished handler, without needing to “return” from the originating function.
For Python on AppEngine, if you want to manage asynchronous behaviour inside a single process, try ndb tasklets.
My distributed promises for AppEngine are instead for managing distributed processing; that is, algorithms coordinating multiple tasks to do something.
The function you pass to your promise (via when, then, etc) will run in a separate task, and can signal it is done by calling resolve() which will trigger further tasks (eg: as above) if necessary.
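To make that flow concrete, here is a toy, single-process imitation. It is not the library's code: `ToyPromise`, `when`, `run_all` and the `pending` queue are hypothetical stand-ins, handlers receive the raw outcome rather than a result object, and `then()` simply returns the same promise. The point is only the ordering: the caller returns first, then each function runs as its own queued "task".

```python
from collections import deque

pending = deque()  # stand-in for the task queue: callables waiting to run


class ToyPromise(object):
    """In-process imitation of a promise; not the library's real class."""

    def __init__(self):
        self.followers = []

    def resolve(self, outcome):
        # Each follower becomes its own queued "task", as a then() handler
        # would in the distributed version.
        for follower in self.followers:
            pending.append(lambda f=follower: f(outcome))

    def then(self, follower):
        self.followers.append(follower)
        return self


def when(func, *args):
    """Queue func(resolve, *args) as a task and return its promise."""
    promise = ToyPromise()
    pending.append(lambda: func(promise.resolve, *args))
    return promise


def run_all():
    """Drain the queue one task at a time, like a single worker."""
    while pending:
        pending.popleft()()


log = []


def slow_sum(resolve, numbers):
    log.append("task 1 running")
    resolve(sum(numbers))


def report(total):
    log.append("task 2 got %d" % total)


when(slow_sum, [1, 2, 3]).then(report)
log.append("caller returned")
run_all()
print(log)  # prints ['caller returned', 'task 1 running', 'task 2 got 6']
```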
Also, to make promises really work and be interesting, I’ve included the ability to pass inner functions and closures and so on through to the promises. So while deferred takes only callables that are picklable or referenceable by name, promises take functions (must be functions, not other callables) which are completely serialised with their entire closure context as necessary, and reconstructed in the task in which they will run. So you can do things like this:
```python
def DoABigThing(aListOfNumbers, callWhenSucceededF, callWhenFailedF):
    def DoTheThing(resolve, initialValue):
        theSum = sum(aListOfNumbers) + initialValue
        # theSum += ... also get some stuff from a webservice or datastore
        # or something else slow?
        resolve(theSum)

    def DoneTheThing(result):
        try:
            callWhenSucceededF(result.value)
        except Exception as ex:
            callWhenFailedF(ex)

    PromiseSpace().when(DoTheThing, 47).then(DoneTheThing)
```
Note the functions are referring to names from outside their own scope. Full closures are allowed. Also you can pass lambda functions.
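How a function can travel with its closure is worth a small illustration. The following is one possible approach using only the standard library, and not necessarily how this library does it: `dump_function`, `load_function` and `_make_cell` are hypothetical helpers. It assumes the closed-over values are themselves picklable, so a closure over another inner function would need recursive handling that is left out here.

```python
import marshal
import pickle
import types


def dump_function(func):
    """Serialise a function's code object plus the values it closes over."""
    cells = func.__closure__ or ()
    return pickle.dumps({
        "code": marshal.dumps(func.__code__),
        "name": func.__name__,
        "defaults": func.__defaults__,
        "closure": [cell.cell_contents for cell in cells],
    })


def _make_cell(value):
    # Portable way to build a closure cell that holds value.
    return (lambda: value).__closure__[0]


def load_function(payload, global_names):
    """Rebuild the function; global_names supplies its global namespace."""
    data = pickle.loads(payload)
    code = marshal.loads(data["code"])
    closure = tuple(_make_cell(v) for v in data["closure"]) or None
    return types.FunctionType(code, global_names, data["name"],
                              data["defaults"], closure)


def make_adder(numbers):
    def add_to(initial):
        return sum(numbers) + initial  # refers to a name from the outer scope
    return add_to


payload = dump_function(make_adder([1, 2, 3]))  # bytes, could cross a task boundary
rebuilt = load_function(payload, globals())
print(rebuilt(47))  # prints 53
```

Note that marshal output is specific to the Python version that produced it, so a scheme like this only works when the sending and receiving tasks run the same interpreter.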
The when() function returns a promise. The then() function returns a promise. There’s an all() function which also returns a promise (as well as variations such as allwhen and allthen). These functions are completely chainable.
The all() functions allow you to do something when a bunch of other things finish, ie:
promiseSpace.all([promises...]).then(somethingfinal)
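One way to picture what all() has to do is a countdown: collect each outcome and fire only when none are outstanding. The sketch below is a synchronous, in-process toy with hypothetical names (`LocalPromise`, `all_of`). A distributed version would have to keep the countdown somewhere shared, which this does not attempt.

```python
class LocalPromise(object):
    """Synchronous, in-process stand-in for a promise (hypothetical)."""

    def __init__(self):
        self.followers = []
        self.done = False
        self.outcome = None

    def resolve(self, outcome):
        self.done, self.outcome = True, outcome
        for follower in self.followers:
            follower(outcome)

    def then(self, follower):
        # Run immediately if already resolved, otherwise remember for later.
        if self.done:
            follower(self.outcome)
        else:
            self.followers.append(follower)
        return self


def all_of(promises):
    """Return a promise that resolves once every input promise has."""
    combined = LocalPromise()
    outcomes = [None] * len(promises)
    remaining = [len(promises)]  # one-element list so collect() can update it

    def collector(index):
        def collect(outcome):
            outcomes[index] = outcome
            remaining[0] -= 1
            if remaining[0] == 0:
                combined.resolve(list(outcomes))
        return collect

    for index, promise in enumerate(promises):
        promise.then(collector(index))
    if not promises:
        combined.resolve([])
    return combined


first, second, third = LocalPromise(), LocalPromise(), LocalPromise()
finals = []
all_of([first, second, third]).then(finals.append)

second.resolve("b")  # resolution order does not matter
third.resolve("c")
print(finals)        # prints [] because first is still pending
first.resolve("a")
print(finals)        # prints [['a', 'b', 'c']]
```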
Python has great exception handling, so I’ve dispensed with JavaScript’s separate resolve and reject methods; to reject, just pass an exception to resolve. Also dispensed with are separate success / fail handlers; instead you get a result object with a value property, which raises the exception if the result was actually an exception.
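A result object of that shape takes only a few lines. This is a minimal sketch of the idea, with a hypothetical class name `Result`, rather than the library's actual implementation.

```python
class Result(object):
    """Hypothetical result wrapper: holds either a value or an exception."""

    def __init__(self, outcome):
        self._outcome = outcome

    @property
    def value(self):
        # Re-raise if the producer resolved with an exception instance.
        if isinstance(self._outcome, Exception):
            raise self._outcome
        return self._outcome


def handle(result):
    """A single handler covers both success and failure."""
    try:
        return "ok: %s" % result.value
    except Exception as ex:
        return "failed: %s" % ex


print(handle(Result(42)))                       # prints ok: 42
print(handle(Result(ValueError("bad input"))))  # prints failed: bad input
```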
Here’s a more red-blooded example of a function for mapping over the datastore, an example of using that to make a count function, and an example of calling that function:
```python
import logging

from google.appengine.ext import ndb

# PromiseSpace comes from the promises library described in this post.

# This is a generic map function for the datastore.
# Suitable for passing to the PromiseSpace.when() method.
# outerresolve: this function is called when the mapping is complete, with the full result.
# promiseSpace: a promise space, to be used for making promises.
# dsQuery: an ndb datastore query.
# mapF: an optional map function. Takes an object or key (depending on keyOnly) and returns something.
# reduceF: an optional reduce function. Takes a list of results of mapping and a previous return value
#          from reduceF, and returns a new value
# keyOnly: determines whether the queries fetch objects or just keys
# pageSize: size of the pages fetched from the query
# returns:
#   if no mapF, no reduceF:
#     returns a list of items or keys fetched from the query. Be careful not to get too many!
#   if has mapF but no reduceF:
#     returns a list of results from mapF. Be careful not to get too many!
#   if has no mapF, but has reduceF:
#     returns last result returned by calling reduceF on lists of keys/items
#   if has both mapF and reduceF:
#     returns last result returned by calling reduceF on lists of mapF results
def MapDS(outerresolve, promiseSpace, dsQuery, mapF=None, reduceF=None, keyOnly=False, pageSize=100):
    def ProcessPage(resolve, cumulativeResult, cursor):
        try:
            if cursor:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, start_cursor=cursor, keys_only=keyOnly)
            else:
                litems, lnextCursor, lmore = dsQuery.fetch_page(pageSize, keys_only=keyOnly)
            # a list (not a generator), so it can be extended and passed between tasks
            llocalResult = [mapF(litem) if mapF else litem for litem in litems]
            if reduceF:
                resolve((reduceF(llocalResult, cumulativeResult), lmore, lnextCursor))
            else:
                if cumulativeResult is None:
                    cumulativeResult = llocalResult
                else:
                    cumulativeResult.extend(llocalResult)
                resolve((cumulativeResult, lmore, lnextCursor))
        except Exception as ex:
            logging.exception("error in ProcessPage")
            resolve(ex)

    def NextPage(result):
        try:
            lcumulativeResult, lmore, lcursor = result.value
            if not lmore:
                outerresolve(lcumulativeResult)
            else:
                lpromise = promiseSpace.when(ProcessPage, lcumulativeResult, lcursor)
                lpromise.then(NextPage)
        except Exception as ex:
            logging.exception("error in NextPage")
            outerresolve(ex)

    lpromise = promiseSpace.when(ProcessPage, None, None)
    lpromise.then(NextPage)

# This is a generic counting function, which can count arbitrarily large data sets.
# Suitable for passing to the PromiseSpace.when() method.
# outerresolve: this function is called when the counting is complete, with the count.
# promiseSpace: a promise space, to be used for making promises.
# dsQuery: an ndb datastore query to count elements of
# progressF: if present, is called periodically, with the subtotal.
def CountDS(outerresolve, promiseSpace, dsQuery, progressF=None):
    def CountMap(itemkey):
        return 1

    def CountReduce(counts, subtotal):
        result = (subtotal if subtotal else 0) + sum(counts)
        if progressF:
            progressF(result)
        return result

    return MapDS(outerresolve, promiseSpace, dsQuery, CountMap, CountReduce, True)

# An example of how you might use CountDS
class CountResult(ndb.Model):
    countname = ndb.StringProperty()
    result = ndb.IntegerProperty()

    def Count(self, dsQuery):
        def UpdateSelf(result):
            try:
                # must reload self, will be old serialised version.
                lself = self.key.get()
                lself.result = result.value
                lself.put()
            except Exception as ex:
                logging.exception("Counting failed")

        promiseSpace = PromiseSpace(self.countname)
        # CountDS expects the promise space right after resolve, so pass it explicitly
        promiseSpace.when(CountDS, promiseSpace, dsQuery).then(UpdateSelf)

lcountResult = CountResult(countname="MyNdbObjectCount")
lcountResult.put()  # store first, so that self.key exists inside Count
lcountResult.Count(MyNdbObject.query())
```
Start at the bottom and work upwards to see how a simple counting method is implemented.
There might be much better ways of writing this; I’m not especially good at using these promises yet!
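Stripped of the task machinery, the paging pattern in MapDS and CountDS comes down to a loop that carries a cursor and a running subtotal. The sketch below runs in a single process over a plain list. `fetch_page` here is a hypothetical stand-in that mimics the (items, cursor, more) shape returned by ndb's `fetch_page`, and in the distributed version each pass of the loop would be a separate task.

```python
def fetch_page(items, page_size, cursor=0):
    """Stand-in for a datastore page fetch: returns (page, next_cursor, more)."""
    page = items[cursor:cursor + page_size]
    next_cursor = cursor + len(page)
    return page, next_cursor, next_cursor < len(items)


def count_in_pages(items, page_size, progress=None):
    """Count items one page at a time, carrying a running subtotal."""
    subtotal, cursor, more = 0, 0, True
    while more:
        page, cursor, more = fetch_page(items, page_size, cursor)
        subtotal += sum(1 for _ in page)  # map each item to 1, then reduce
        if progress:
            progress(subtotal)
    return subtotal


subtotals = []
total = count_in_pages(list(range(250)), 100, subtotals.append)
print(total)      # prints 250
print(subtotals)  # prints [100, 200, 250]
```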
So, in closing, if you can see some value in this and you could help me with how to best publish this code, please contact me.