Wednesday, 30 September 2015

Handling Redis messages in parallel

In the last post it looked like the rate of submission and completion of tasks was constant in time, but that the rate of completion of workflows was not.

There are two end member scenarios: if tasks were completed in strict workflow order then there’d be a steady progression of workflows completing, giving a constant rate; alternatively, if the first 8 tasks in every workflow were completed first, and the last task only started when every other workflow had been touched, then all workflows would complete almost instantaneously right at the end of the run. What I saw in the last results might represent a balance between the two.

The workflow completion behaviour might also be controlled or modified by the messages in flight from Redis, and by the way these are both published and picked up by the client (StackExchange.Redis). I’m assuming that Redis will not stall while it passes a given message to every consumer subscribed, and I’m assuming that StackExchange.Redis will make a separately-threaded callback into my code whenever a message is received, probably subject to some throttling.

There are a couple of things there that I can check. One is the number of running tasks at each moment; if we’re handling these in a multithreaded way (via StackExchange.Redis – there’s no such behaviour in the benchmark app yet) then it should be observable. The other thing to check is the size of the submitted queue.

I added some instrumentation to my benchmark app to output these and, voila:


The submitted queue size reaches a plateau then hovers there, which intuitively means that across all workflows we’re finishing one task and replacing it with another in the submitted queue. It tails off roughly in line with the workflow completion curve.

One very interesting point to take away there is that the number of running tasks at any time is 1.

That indicates that somewhere the processing of tasks is either bottlenecked or significantly throttled. I’m not doing anything in my app. Is StackExchange.Redis serializing the handling of the returned messages?

It turns out that it is. By default, StackExchange.Redis forces subscribed message handling to be sequential, principally because it’s safer: it stops clients having to worry about thread conflicts when processing the messages.

However, when you want to scale out it makes sense to let go and account for possible threading issues by design. In this case, each workflow task is intended to be a unit independent of any other, and so they’re embarrassingly parallelisable. In fact, currently the creation of the multiplexer is in the hand of the client, so they can make the choice themselves.

If I set ConnectionMultiplexer.PreserveAsyncOrder = false in the benchmark app, things improve considerably. Task churn leaps to 4k per second; the whole run completes sooner (210 seconds) with the push only finishing at 168 seconds.


There’s a lot less dispersion in the message handling – the workflow complete queues are much tighter, and the count of tasks completing follows the submitted count much more closely. After a suppressed start the system completes workflows at a steady rate.

For the bulk of the run task submission and completion, and workflow completion, proceed at a constant rate. That indicates that this solution might scale out with an increasing number of tasks or workflows.

It’s not all sunshine and rainbows. When repeated I’ve seen occasional timeouts which currently bring the workflows down. Superficially they seem to be correlated with the points at which Redis forks and saves out its logs – typically I see error messages like

Timeout performing EVAL, inst: 1, mgr: ExecuteSelect, err: never, queue: 22, qu: 0, qs: 22, qc: 0, wr: 0, wq: 0, in: 0, ar: 0, IOCP: (Busy=0,Free=1000,Min=4,Max=1000), WORKER: (Busy=20,Free=2027,Min=4,Max=2047), clientName: TIM-LAPTOP

This seems to indicate that 22 commands have been sent to the server (qs), but no response has been received (in); consequently it might worth increasing the timeout. If it is related to the Redis background fork-and-save, perhaps there’s something I can do there too.

Monday, 28 September 2015

First look at performance for workflow processing

I’ve set up a simple benchmarking app to get a look at how my new Redis-based workflow processing system was going, and used it to create 100k workflows, each containing 9 tasks. The workflow structure is identical for each: just a simple chain. I ran it and a Redis instance on my dev laptop, and gathered data about task submit and completion time, and workflow completion time – both actual, and when I received the Redis pub-sub message to confirm completion.


It took ~350 seconds to run through 900k tasks (each as close to a no-op as possible) giving an overall churn around 1k tasks per second. It took ~60 seconds to submit 100k workflows with initial tasks, or about 1.5k workflows per second.

Completion of tasks was noticeably suppressed during workflow submission, and as a result the number of completed tasks lags behind the number of submitted tasks. However, it looks like the rate of task submission and completion – the key parts of workflow turnover – are independent of the number of tasks to run, suggesting that this behaviour scales.

There are some features that I don’t immediately understand, however. The rate of workflow completion is not linear; completion is delayed, as you might expect, but the rate increases with time.

Additionally, there’s a delay between actual workflow completion and the delivery of the message that signals that a workflow has completed. The rate of delivery looks consistent with the rate of completion, but the delay is consistently about 50 seconds. Two things immediately occur to me; as the system’s using message passing to flag submission of new tasks that they might also be affected by a delay; and that arguably we might not need to wait for a message to tell us that a workflow has completed. As I’m marking tasks complete synchronously from the client, it could potentially tell me that there are either more tasks submitted as a result, or none, in which case I could reasonably take some sort of action (although that action would need to be shared across all task handling processes, if I had such).

In fact, there’s a problem with the way I’ve labelled the curves – it’s not obvious that workflow completion time is nonlinearly related to the number of remaining tasks. I’m making an assumption.

Next things to check: I’m going to add a periodic sampling of the queue size to add to this chart, and see if its possible to analyse the number of messages being posted. 

Wednesday, 23 September 2015

Pausing for thought

This post relates to code changes in this commit to my redis workflow project on GitHub, under this issue .

In my workflow project it’s time to consider how to pause a given workflow, and the first thing is to decide on what pause actually means.


At first sight it means “hold everything that’s running” and “stop anything new from starting”, but that turns out to be difficult to implement effectively generally, and has some particular wrinkles when using Redis.

It looks easy to stop anything new from starting: I can move all tasks for a given workflow to a state that means they won’t get picked up; let’s say that state is labelled paused, because frankly, this gets complicated enough. To release the tasks all I’ll need to do is move all the paused tasks for this workflow back to the submitted state.

Except: the fact I’m asking for all the submitted (or paused) tasks for a specific workflow means we have a decision to make. At the point of time this functionality was added all these tasks were together in a global submitted state. I could loop through every id in the submitted set, use those ids to lookup tasks and fetch their workflow ids, then decide whether each one needs to be acted on.

Alternatively I could populate an additional set when tasks transition to each new state, keyed by workflow id. When tasks in workflow 1 move to the submitted state, for example, we could add them to a distinct set with key submitted:1. That makes finding tasks in a specific state for a specific workflow much easier, and is the approach I’m taking. It means S x W new sets at worst, where S is the number of states and W the number of workflows. At the scale I’m aiming for that means ~600,000 sets, assuming no clean-up after workflows.

So, when someone asks to pause a workflow we’ll just shift all tasks from submitted state to paused, yes? And on release, we should just move everything from paused to submitted? Not quite.

I’m deciding to let running tasks run to completion. As currently running tasks complete (or fail) they might cause further tasks to be submitted, which isn’t the behaviour we want. It makes sense then to have the completion logic know that if the completed task has moved to the paused state then it shouldn’t cause its dependencies to be submitted, although it can be transitioned to is final desired end state (complete, or failed).

However, when it comes to releasing the workflow again, what should happen to the children of those completed tasks, if all their parents have completed? If I don’t do something then they won’t get picked up when someone asks to release the workflow – there’s nothing left to complete and trigger their submission.

Instead, if a paused task completes and its children become eligible for submission they should be placed in the paused state themselves. Then, when someone requests that the workflow be released, they’ll shift back to the submitted state.

On release, then, that takes care of ensuring I don’t lose track of any tasks, assuming that the release command comes after all outstanding running tasks have completed.

If the release request comes while tasks are still running down, however, releasing the workflow  will be difficult. We won’t know whether to move the tasks to the submitted or running state, because there’s no differentiation in source state for tasks in the paused set. We’ve lost that information. If we shift a task that’s being executed somewhere to the submitted state on release then it might get run twice in parallel, or get confused when it comes to submit and finds it’s not in an expected state.


The (hopefully) final puzzle piece is then to record the last state of each task, so when a release request is issued we can move it back to the appropriate state. Running tasks should get moved back to running, submitted tasks to submitted. Tasks that have completed already won’t be in the paused state, so won’t get considered for release, and we’ll need to tag newly runnable tasks with a fake last state of submitted.


That should means we don’t lose any information about the state of the workflow during pause and release operations. For such a simple thing it’s still quite possible to get wrong; making a few assumptions and decisions explicitly helps a great deal. I decided to:

1. Stop anything new from starting.

2. Let executing tasks run to completion.

3. Create lookups to answer specific questions.

4. Avoid duplicate task execution.

all of which seem reasonable currently; let’s see how the next chunk of functionality changes that.

Friday, 11 September 2015

Picking up where you left off

One thing is certain in any live system: something will break and leave the system in an inconsistent state. If your system isn’t designed with this as a core assumption – not that things will work, but that things will break – then your design is wrong.

Less catastrophically, there are occasions where you want to turn part or all of a system off. Without thought this will have the same effect: the system will be left in an inconsistent state.

In a distributed workflow this generally means that tasks are unofficially abandoned, as the agent responsible for running them can’t complete them and update their state. If a system is deliberately stopped then you have a chance to manage that state properly. You could:

  1. wait for all running tasks to complete before stopping the system, while preventing any new tasks from running, or
  2. transition any running tasks to an appropriate pre-running state.

Typically any final solution comprises some element of the second plan; under the first plan you might still want to be careful about giving tasks a maximum timeout duration, and if tasks reach that timeout then you have no option but to reset them to a pre-running state.

If your tasks define work that’s to be undertaken on a separate system – say a distinct compute cloud – then resetting tasks you might still have some underlying problems to solve if that compute infrastructure doesn’t allow you to cancel running work.

In each case, however, you have a chance to transition tasks to some desired state before agents stop. If an agent has failed you typically don’t get that chance, and you need to find some other way of managing the state. There are a few options for doing that, among them:

  1. just forget; have some backstop process gather up tasks that haven’t had a state update within a certain duration and automatically reset them to a pre-running state, or
  2. make it possible for agents to reconnect to their previous state and decide what to do with abandoned tasks.

The first option there is quite simple, but has implicit problems. How do you choose the duration after which task state gets reset? You run this risk of either resetting in-flight tasks, or leaving tasks abandoned for so long that their results are no longer required. At worst, this backstop is a manual process.

The second presents more explicit problems: how does the system know which state is or was associated with which agent? Two things are needed: first each agent needs to have an identity that’s maintained in time (or at any time there needs to be an agent that’s acting under a given identity), and second that identity needs to be associated with task state when it takes responsibility for it, and dissociated when it gives up responsibility – when it starts and stops the task.

Maintaining a system of agents with distinct identities is relatively straightforward. The trick is to have any new agent process assume one from a pool of identities when it starts up. The actual technique chosen is heavily influenced by the mechanism you use to manage agent processes, but could be as simple as an argument passed in on a command line – “you are agent 1, the first thing you need to do is work out what the last agent 1 was doing”.

Associating an identity with task state is a little more complicated, as there are some options for where that association state could sit. It could be stored locally by the agent; every time it takes a task, it adds the tasks identifier to its local store. If another agent takes its place, it can use that stored task state to reconnect with what was running. Alternatively, it could be stored alongside the task state in whatever state store’s being used.

In each case information from one domain is leaking into another as responsibility is taken and given up for task state. The important thing is to notice that an exchange of information is taking place – when a task is taken, task data is being exchanged for the an agent identity – and that that exchange must happen in a transactional manner so that information isn’t lost.

For example – an agent could pop a task from the task state store, and then store that task information in a local sqllite database. If the agent dies between popping the task and storing the data in a local database then information is completely lost.

That’s bad, and something to be avoided.

If the agent presents its identity information to the state store as part of the task pop process however, the state store can associate that identity with the now-running task as part of the same transaction –- safely.

The latter approach is what I’ve taken with the workflows-on-Redis project I’ve been working on recently in order to enable the recovery of tasks, and ensure that none of that information is lost when taking tasks in the first place.

Thursday, 10 September 2015

It turns out I'm not a good blogger :)

Embarrassingly I've had a good bundle of comments waiting for moderation for some time! Apologies to everyone who's got in touch -- I know full well how frustrating it is to reach out to someone's blog and be met with slence -- and thanks for the interest, comments and definitely for the corrections!