
- Generalized Log-Likelihood Test for Poisson Distribution
- How Accurate is Mahout for Summing Numbers?
- Learning to Rank, in a Very Bayesian Way
- References for On-line Algorithms
- Bayesian Bandits
- Buzzwords Keynote - Conclusion
- Buzzwords Keynote - Part 3
- Buzzwords Keynote - Part 2
- Buzzwords Keynote ... blog edition
- Buzzwords Wrapup
- The Best Illustration of a Probability Distribution
- Visit to DIMA at Technische Universitaet
- A Tour of the Multi-update For Zookeeper
- Online algorithms for boxcar-ish moving averages
- Exponentially weighted averaging for rates
- Update on exponential time-embedded averages
- Exponential weighted averages with irregular sampling
- Recent lecture on Mahout for SDForum
- Mahout 0.4 released!
- New Mahout release coming
- Why is the sum of two uniform randoms not uniform?
- Sadder things
- Word Count using Plume
- The new grool
- Hadoop user group AKA Mahout Users Anonymous

In another blog, I described how a generalized log-likelihood ratio can be used to find interesting differences in counts. In the simplest cases, we have counts for some kind of observation (A and not A) under two conditions (B and not B). The question of interest is whether the rate of A varies with whether or not condition B applies. In classical statistics, we would talk about having a test against a null hypothesis, but in practice we want to look at lots of As under lots of different conditions B. Testing so many situations makes it very hard to use the classical machinery well, so we have to look at the problem a bit differently.
For a Poisson distribution under two conditions $A$ and $\neg A$, we can observe a count for each of the conditions as well as the total time over which the count is taken. We can arrange our results this way:
This is suggestive of the way that counts from two binomial observations can be arranged to look for a difference under different conditions \cite{dunning93}.
We can investigate whether the Poisson distribution is the same under both conditions using the generalized log-likelihood ratio test. Such a test uses $\lambda$, the generalized log-likelihood ratio,
\[
\lambda = \frac{
\max_{\theta_1 = \theta_2 = \theta_0} p(k_1 | \theta_0, t_1)\,p(k_2 | \theta_0, t_2)
}{
\max_{\theta_1, \theta_2} p(k_1 | \theta_1, t_1)\,p(k_2 | \theta_2, t_2)
}
\]

As I mentioned in other blogs, we can still use this classically derived test as a way of simply ranking different A-B combinations against each other according to how interesting they are. Even without being able to interpret the score as a formal statistical test, we get useful results in practice.

The generalized log-likelihood ratio most commonly used in these situations is derived assuming we have two binomial observations. This test can be extended to compare two multinomial conditions for independence, but this is rarely done, if only because comparing two binomials is so darned useful.

With the binomial test, we look at the number of positive observations out of some total number of observations for each condition. In some situations, it is much more natural to talk about the number of positive observations not as a fraction of all observations, but as a fraction of the duration of the condition. For instance, we might talk about the number of times we noticed a particular kind of network error under different conditions. In such a case, we probably can say how long we looked for the errors under each condition, but it can be very hard to say how many observations there were without an error.

\[
\begin{array}{c|cc}
 & \text{Count} & \Delta t \\
\hline
A & k_1 & t_1 \\
\neg A & k_2 & t_2
\end{array}
\]

According to Wilks\cite{wilks1938} and also later Chernoff\cite{Chernoff1954}, the quantity $-2 \log \lambda$ is asymptotically $\chi^2$ distributed with one degree of freedom.
For the Poisson distribution,
\[
p(k | \theta, t) = \frac{(\theta t)^k e^{-\theta t}}{k!} \\
\log p(k|\theta, t) = k \log \theta t - \theta t - \log k!
\]
The maximum likelihood estimator $\hat\theta$ can be computed by maximizing the log probability

\[
\frac{\partial \log p(k | \theta, t)}{\partial \theta} = \frac{k}{\theta} - t = 0 \\
\hat \theta = \frac{k}{t}
\]
Returning to the log-likelihood ratio test, after some cancellation we get

\[
-\log \lambda =
k_1 \log k_1 +
k_2 \log k_2
- k_1 \log \frac{k_1+k_2}{t_1+t_2} t_1 - k_2 \log \frac{k_1+k_2}{t_1+t_2} t_2
\]

Some small rearrangement gives the following preferred form that is very reminiscent of the form most commonly used to compute the log-likelihood ratio test for binomials and multinomials
\[
-2 \log \lambda = 2 \left( k_1 \log \frac{k_1}{t_1} +
k_2 \log \frac{k_2}{t_2} - (k_1+k_2) \log \frac{k_1+k_2}{t_1+t_2}
\right)
\]
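This final form is easy to compute directly. Here is a small sketch in Java (my code, not from any released library), using the common convention that $k \log k$ is taken as zero when $k = 0$:

```java
public class PoissonLLR {
    // x log x, with the limit value 0 at x = 0
    static double xLogX(double x) {
        return x == 0 ? 0 : x * Math.log(x);
    }

    // -2 log lambda = 2 (k1 log(k1/t1) + k2 log(k2/t2)
    //                    - (k1+k2) log((k1+k2)/(t1+t2)))
    public static double llr(double k1, double t1, double k2, double t2) {
        return 2 * (xLogX(k1) - k1 * Math.log(t1)
                + xLogX(k2) - k2 * Math.log(t2)
                - (xLogX(k1 + k2) - (k1 + k2) * Math.log(t1 + t2)));
    }

    public static void main(String[] args) {
        // equal rates give a score near zero
        System.out.println(llr(100, 10, 200, 20));
        // very different rates give a large score
        System.out.println(llr(100, 10, 10, 20));
    }
}
```

A score near zero says the two rates are indistinguishable; large scores flag condition pairs worth a closer look.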

A question was recently posted on the Mahout mailing list suggesting that the Mahout math library was "unwashed" because it didn't use Kahan summation. My feeling is that this complaint is not well founded and that Mahout is considerably more washed than the original poster suggests. Here is why I think this.
As background, if you add up lots of numbers using a straightforward loop, you can lose precision. In the worst case the loss is \(O(n \epsilon)\), but in virtually all real examples the lossage is \(O(\epsilon \sqrt n)\). If we are summing a billion numbers, the square root is \(\approx 10^5\) so we can potentially lose 5 sig figs (out of the 17 available with double precision).
Kahan summation increases the number of floating point operations by \(4 \times\), but uses a clever trick to retain most of the bits that would otherwise be lost. Shewchuk summation uses divide and conquer to limit the lossage with \(O(\log n)\) storage and no increase in the number of flops.
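To make the divide-and-conquer idea concrete, here is a minimal sketch of pairwise summation (my code, not Mahout's); the recursion depth is what gives the \(O(\log n)\) storage mentioned above:

```java
public class PairwiseSum {
    // Recursively split the range in half and add the two halves.
    // Error grows roughly with the depth of the recursion rather
    // than with n, and no extra flops are spent per element.
    public static double sum(double[] x, int from, int to) {
        int n = to - from;
        if (n <= 8) {
            // small base case: plain naive loop
            double s = 0;
            for (int i = from; i < to; i++) {
                s += x[i];
            }
            return s;
        }
        int mid = from + n / 2;
        return sum(x, from, mid) + sum(x, mid, to);
    }

    public static void main(String[] args) {
        double[] x = new double[1000];
        for (int i = 0; i < x.length; i++) {
            x[i] = 0.1;
        }
        // close to 100, with less accumulated error than a naive loop
        System.out.println(sum(x, 0, x.length));
    }
}
```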
There are several cases to consider:
1) online algorithms such as OnlineSummarizer.
2) dot product and friends.
3) general matrix decompositions
In the first case, we can often have millions or even billions of numbers to analyze. That said, however, the input data is typically quite noisy and signal to noise ratios \(> 100\) are actually kind of rare in Mahout applications. Modified Shewchuk estimation (see below for details) could decrease summation error from a few parts in \(10^{12}\) to less than 1 part in \(10^{12}\) at minimal cost. These errors are \(10^{10}\) times smaller than the noise in our data so this seems not useful.
In the second case, we almost always are summing products of sparse vectors. Having thousands of non-zero elements is common but millions of non-zeros are quite rare. Billions of non-zeros are unheard of. This means that the errors are going to be trivial.
In the third case, we often have dense matrices, but the sizes are typically on the order of \(100 \times 100\) or less. This makes the errors even smaller than our common dot products.
To me, this seems to say that this isn't worth doing. I am happy to be corrected if you have counter-evidence.
Note that BLAS does naive summation and none of the Mahout operations are implemented using anything except double precision floating point.
Here is an experiment that tests to see how big the problem really is:
import java.util.Random;
import org.apache.mahout.common.RandomUtils;
import org.junit.Test;

@Test
public void runKahanSum() {
    Random gen = RandomUtils.getRandom();
    double ksum = 0;               // Kahan sum
    double c = 0;                  // low order bits for Kahan
    float sum = 0;                 // naive sum
    float[] vsum = new float[16];  // 16 way decomposed sum
    for (int i = 0; i < 1e9; i++) {
        float x = (float) (2 * gen.nextDouble() - 1);
        double y = x - c;
        double t = ksum + y;
        c = (t - ksum) - y;
        ksum = t;
        sum += x;
        vsum[i % 16] += x;
    }
    // now add up the decomposed pieces
    double zsum = 0;
    for (int i = 0; i < vsum.length; i++) {
        zsum += vsum[i];
    }
    System.out.printf("%.4f %.4f %.4f\n",
        ksum, 1e6 * (sum - ksum) / ksum,
        1e6 * (zsum - ksum) / ksum);
}

A typical result here is that naive summation gives results that are accurate to within 1 part in \(10^{12}\), 8 way summation manages \(< 0.05\) parts in \(10^{12}\), and 16 way summation is only slightly better than 8 way summation.

If the random numbers being summed are changed to have a mean of zero, then the relative error increases to 1.7 parts in \(10^{12}\) and 0.3 parts in \(10^{12}\), but the absolute error is much smaller.

Generally, it doesn't make sense to do the accumulation in floats because these operations are almost always memory channel bound rather than CPU bound. Changing to single precision accumulation in spite of this decreases the accuracy to about 500 parts per million and 200 parts per million respectively for naive summation and 8 way summation.

The problem of ranking comments by a crowd-sourced version of "quality" is a common one on the internet.
James Neufeld suggests that Bayesian Bandit algorithms can be applied to this problem. The basic idea is that you would define a stochastic quality metric whose distribution for each comment depends on the up and down votes that comment has received.
Normal ranking algorithms try to estimate the single best value for this quality metric. Neufeld suggests that this value should be sampled from a beta distribution which models the probability that a user would mark the comment positively given that they have marked the comment at all. To present comments to a user, the metric would be sampled independently for each comment and the comments would be sorted according to the resulting scores. Different presentations would necessarily result in different orders, but as users mark comments positively or negatively, the order should converge to one where the comments presented near the top of the list have the highest probability of being marked positively.
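As a rough sketch of this idea (the class and method names here are mine, not those of the actual implementation), ranking by sampling from per-comment beta distributions might look like this:

```java
import java.util.Arrays;
import java.util.Random;

public class ThompsonRank {
    // Sample from Beta(a, b) for integer a, b >= 1 using the fact that
    // the a-th smallest of a + b - 1 independent uniforms has a
    // Beta(a, b) distribution. Fine for a demo with modest vote counts;
    // production code would use a proper gamma-based beta sampler.
    static double sampleBeta(Random rand, int a, int b) {
        double[] u = new double[a + b - 1];
        for (int i = 0; i < u.length; i++) {
            u[i] = rand.nextDouble();
        }
        Arrays.sort(u);
        return u[a - 1];
    }

    // Order comment indices by a fresh sample of each comment's quality,
    // given up-vote and down-vote counts. Each call can give a different
    // order, but well-rated comments float to the top almost every time.
    public static Integer[] rank(Random rand, int[] up, int[] down) {
        int n = up.length;
        final double[] score = new double[n];
        for (int i = 0; i < n; i++) {
            // posterior for P(up-vote | vote) is Beta(up + 1, down + 1)
            score[i] = sampleBeta(rand, up[i] + 1, down[i] + 1);
        }
        Integer[] order = new Integer[n];
        for (int i = 0; i < n; i++) {
            order[i] = i;
        }
        Arrays.sort(order, (i, j) -> Double.compare(score[j], score[i]));
        return order;
    }

    public static void main(String[] args) {
        int[] up = {90, 40, 5};
        int[] down = {10, 60, 95};
        // comment 0 should nearly always come out on top
        System.out.println(Arrays.toString(rank(new Random(), up, down)));
    }
}
```

Note how comments with few votes have wide posteriors and so occasionally sample a high score, which is exactly what gives them a chance to be shown and rated.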
One very nice thing about this approach is that it doesn't waste any cycles on determining the ranking of low quality comments. Once the quality of these comments has been determined to be relatively lower than the best comments, no more learning need be done with those comments. This accelerates learning of the ranking of the best options dramatically.
The output is a thousand lines of numbers that you can drop into R, OmniGraphSketcher or even Excel to produce a plot like the one above.
QUICK CODE DISSECTION

This idea is interesting enough that I built a quick implementation which you can find on github. The main sample code there invents several hundred "comments" each with a uniformly sampled probability of getting a positive rating. The ideal behavior for ordering the comments would be to put the comment with the highest probability of getting a positive rating first and the one with the lowest probability last. The way that the program proceeds is that it picks a pageful of twenty comments to show and then proceeds to generate ratings for each of the comments on that page according to the underlying probability associated with the items displayed. The process of generating pages of comments to show and applying feedback is repeated and performance is measured.

Here are some results of running the program. Here we have 200 total comments, of which 20 are shown on the page that defines which comments are rated. Precision is measured here to determine how many of the best 10 comments are shown on the page. As can be seen, the system shows immediate improvement as ratings are collected. The performance rises from the initially random 10% precision and passes 50% after 30 pages of ratings.

As James demonstrated in his article and as others have demonstrated elsewhere, this class of algorithm is very effective for this sort of bandit problem. What is much less well known is how easily you can build a system like this.

TRY IT YOURSELF
To run this code, you will need git, maven and java 1.7. To download the source code and compile the system, do this

$ git clone git://github.com/tdunning/bandit-ranking.git

$ cd bandit-ranking

$ mvn package

This will download all dependencies of the code, compile the code and run some tests. To run the test program, do this
$ java -jar target/bandit-ranking-*-with-dependencies.jar

In com.mapr.bandit.BanditRanking, the main program for this demo, a BetaBayesFactory is used to construct several BayesianBandit objects (for average results later). This pattern can be used with other kinds of bandit factories.

The BayesianBandit objects allow you to do a variety of things including sampling (BayesianBandit.sample) for the current best alternative, ranking (BayesianBandit.rank) a number of alternatives, and providing training data (BayesianBandit.train). Sampling is used in a traditional multi-armed bandit setting such as with A/B testing. Ranking is used as it is here for getting a list of best alternatives, and training is used ubiquitously for feeding back training data to the bandit.

Evaluation can be done by computing precision as is done here (how many good items are in the top 20?) or by computing regret. Regret is defined as the difference between the mean payoff of the best possible choice and the mean payoff of the choice made by the bandit. For the ranking problem here, I assume that payoff of a page is the sum of the probabilities of positively rating each item on a page.
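In symbols (my notation, not from the code), the cumulative regret after $T$ choices is
\[
R_T = \sum_{t=1}^{T} \left( \mu^* - \mu_{a_t} \right)
\]
where $\mu^*$ is the mean payoff of the best possible choice and $\mu_{a_t}$ is the mean payoff of the choice the bandit actually made at step $t$.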
The BetaBayesFactory internally uses a beta-binomial distribution to model the likelihood of a positive rating for each rank. A more general alternative would be to use a gamma-normal distribution. This can be done by using the GammaNormalBayesFactory instead. This extra generality comes at a cost, however, as the graph to the left shows. Here, the beta-binomial distribution results in considerably faster convergence to perfect precision than the gamma-normal. This is to be expected since the beta-binomial starts off with the assumption that we are modeling a binary random variable that can only take on values of 0 and 1. The gamma-normal distribution has to learn about this constraint itself. That extra learning costs about 50 pages of ratings. Put another way, the cumulative regret is nearly doubled by the choice of the gamma-normal distribution.

In order to understand what the algorithm is really doing at a high level, the graph on the right is helpful. What it shows is the number of times comments that are at different ranks are shown. What is striking here is that comments that are below the fourth page get very few trials and even on the second page, the number of impressions falls precipitously relative to the first page of comments. This is what you would expect because in this experiment, it takes only a few ratings on the worst comments to know that they stand essentially no chance of being one of the best. It is this pattern of not sampling comments that don't need precise ranking that makes Bayesian Bandits so powerful.

[See also my more recent blog on this talking about using bandits for ranking rated items like comments]
I have been speaking lately about how various on-line algorithms have substantial potential for various real-time learning applications. The most notable of these algorithms are Thompson sampling for real-time handling of multi-armed bandit and contextual bandit problems and an algorithm due to Shindler, Myerson and Wang for fast $k$-means clustering of data. Since I have had a number of requests for references back to the original sources for these works, I figured a few blog posts would be a good thing. This post will describe the multi-armed bandit work and the next will describe the clustering work.
The Basic Problem - Multi-armed Bandits
For the bandit problems, there are two basic problems to be dealt with. The first and most basic is that of the multi-armed bandit. In this problem, you can sample from any of a finite number of distributions and your goal is to maximize the average of the values that you get. This can be cast into a number of practical settings in which you select which slot machine to put a quarter into, which on-line ad to present to a user, or which landing page to deliver to a user's browser when they visit a particular URL. It is common to simplify this case further by assuming a stationary distribution. Obviously, at least one of the distributions you are picking from has a mean at least as large as the mean of any alternative. Any time you take a sample from a distribution that has a smaller mean, you fall behind the theoretical best, on average, that you could have achieved by picking from (one of) the best distributions. The degree by which you fall behind is known as the regret that you incur.
The key to the multi-armed bandit problem is that you cannot know which distribution might have the largest mean. This means that you have to sample all of the distributions in order to estimate their means, but this implies that you have to sample from the lesser distributions in order to determine that they are, in fact, inferior.
There are well known bounds on how well you can actually solve this problem. There are also a number of algorithms that have regret on par with these bounds or come reasonably close to these bounds. Mostly, however, these known solutions have limitations either on the number of distributions they can consider or on the complexity of the solution.
Kuleshov and Precup provide some good examples of how to compare different bandit algorithms in their paper. This tutorial on bandits provides a wider view of different forms of multi-armed bandit problems with a number of references.

Conspicuously missing from most lists of references, however, is all the recent work using Thompson sampling. These algorithms, which I have referred to as Bayesian Bandits, have particularly nice properties of simplicity and optimality. Chapelle and Li provide an empirical look at performance with these algorithms compared to upper confidence bound (UCB) algorithms. The last paragraph of that paper laments the lack of a theoretical analysis of these algorithms, but that lack was cured shortly in this paper by Agrawal and Goyal. Scott provided a more comprehensive view of these algorithms under the name of randomized probability matching.

The idea behind Bayesian Bandits is quite simple. For each bandit, we use the observations so far to build a posterior distribution for the mean of the associated payoff distribution. For binary payoffs, it is common to use a $\beta$-binomial distribution and for other cases a $\gamma$-normal distribution works well. To pick a bandit, we sample a mean for each bandit from these posterior distributions and then pick the bandit with the largest sampled mean. The new sample from that bandit gives us more data which refines the posterior distribution for that bandit. We can repeat this process as long as desired.
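A minimal sketch of this loop for binary payoffs with a beta-binomial posterior might look like the following (illustrative code under my own names, not any published implementation):

```java
import java.util.Arrays;
import java.util.Random;

public class ThompsonDemo {
    // Beta(a, b) sample via the a-th smallest of a + b - 1 uniforms;
    // slow for long histories, but adequate for a demonstration.
    static double sampleBeta(Random rand, int a, int b) {
        double[] u = new double[a + b - 1];
        for (int i = 0; i < u.length; i++) {
            u[i] = rand.nextDouble();
        }
        Arrays.sort(u);
        return u[a - 1];
    }

    // Run the sampling loop on bandits with the given true payoff
    // probabilities and return the index of the most-played arm.
    public static int play(Random rand, double[] trueProb, int steps) {
        int n = trueProb.length;
        int[] wins = new int[n];
        int[] losses = new int[n];
        int[] pulls = new int[n];
        for (int s = 0; s < steps; s++) {
            // sample a plausible mean from each arm's posterior ...
            int best = 0;
            double bestSample = -1;
            for (int i = 0; i < n; i++) {
                double sample = sampleBeta(rand, wins[i] + 1, losses[i] + 1);
                if (sample > bestSample) {
                    bestSample = sample;
                    best = i;
                }
            }
            // ... play the arm with the largest sample, then refine
            // that arm's posterior with the observed payoff
            pulls[best]++;
            if (rand.nextDouble() < trueProb[best]) {
                wins[best]++;
            } else {
                losses[best]++;
            }
        }
        int argmax = 0;
        for (int i = 1; i < n; i++) {
            if (pulls[i] > pulls[argmax]) {
                argmax = i;
            }
        }
        return argmax;
    }

    public static void main(String[] args) {
        // nearly all pulls end up on the arm with the higher payoff
        System.out.println(play(new Random(), new double[]{0.1, 0.5}, 2000));
    }
}
```

Nothing here forces exploration explicitly; the width of each posterior does that on its own and narrows as evidence accumulates.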

Extensions To Contextual Bandits
One of the most important characteristics of the Thompson sampling approaches (aka randomized probability matching aka Bayesian Bandits) is that they can be extended to more complex situations. One setting that I have found particularly useful involves optimizing return not just from a few bandits, but from a parameterized set of bandits that could conceivably even be infinite. The transformation from the parameters to the bandit distribution is unknown, but if we could know that, we would be able to search the parameter space to find the bandit with the highest mean payoff.

This formulation is a generalization of the previous case because we can take the parameter to be an integer from $1 \ldots k$ where there are $k$ bandits and the transformation consists of the mean payoffs for each of the $k$ bandits.

The algorithm in the contextual case simply consists of sampling the transformation from some posterior distribution and then solving for the parameters of the bandit that we would like to use. Some of the parameters might be fixed by the context we are working in which is where the name contextual bandits comes in.

The paper by Scott alludes to this formulation, but the most approachable work on this that I know of is the paper by Graepel, Candela, Borchert, and Herbrich. In this paper, they describe the operation of AdPredictor, a system used by the Bing search engine to target ads using context.

The basic idea with Bayesian Bandits is to solve the problem of the explore/exploit trade-off in a multi-armed bandit by keeping distributional estimates of the probability of payoff for each bandit, but avoiding the cost of manipulating distributions by sampling from those distributions and then proceeding on each iteration as if the sample were a point estimate.
The advantage of this is that bandwidth assignments will be made according to the best estimate of the probability that each bandit could possibly be the best. This automatically causes the system to do exploration as long as it is plausible and causes the system to smoothly transition to exploitation when it becomes clear which bandit is the best. Essentially, what this gives us is a Bayesian implementation of active learning.
To illustrate this, here is a graph that shows the posterior distribution of conversion probability for two bandits where we have lots of history for one and only a little history for the other.
You can see that the red bandit with 100 conversions out of 1000 impressions most likely has a probability of conversion of 0.1, more or less a bit. The blue bandit with no conversions out of 10 impressions is very likely worse than the red bandit, but there is a small possibility that it is better. If we just picked the average or mode of these distributions, we would conclude that the blue bandit is worse and wouldn't give it any bandwidth without substantial mechanisms to over-ride this decision.
On the other hand, if we estimate a conversion probability by sampling and use that sampled estimate for targeting, then we will give the blue bandit a little bandwidth and thus a chance to redeem itself.
There are several aspects of the Bayesian Bandit algorithm that are exciting.
* exploration and exploitation are handled uniformly in the same framework
* our internal representation encodes all of the information that we have so we don't confuse evidence of failure with lack of evidence for success
* the algorithm only requires a few lines of code
* the updates for the algorithm can be expressed in terms of back-propagation or stochastic gradient descent
* the performance is really good.

As a bit of a teaser, here are a few graphs that describe how the Bayesian Bandit converges in simulated runs. The first graph shows how the average total regret for the Bayesian Bandit algorithm approaches the ideal as the number of trials increases. This experiment uses normally distributed rewards with $\sigma = 0.1$. Any algorithm that meets the optimal $O(\log n)$ convergence lower bound is said to "solve" the bandit problem.

The convergence here is very close to the ideal convergence rate. Note that this graph includes the convergence to optimal payoff, not the convergence knowing which is the better bandit. This is actually an interesting aspect of the problem since the algorithm will converge almost instantly for cases where the conversion probabilities are highly disparate which will make the payoff converge quickly. For cases where the conversion probabilities are nearly the same, it will take a long time for the algorithm to determine which is the better bandit, but exploration is not expensive in such a case so the convergence to near-optimal payoff will be even faster than the case where the conversion rates are very different.

For example, here is a graph of the probability of picking the better bandit where the conversion rates are nearly the same. As you can see, it takes quite a while for the algorithm to split these two options. The average payoff, however, only changes from 0.11 to 0.12 during this entire convergence and it has already reached 0.118 by the time it is 20% into the process so the cost of a long experiment is not that high.

Sample code for the Bayesian bandit is available at https://github.com/tdunning/storm-counts.

In the end, it is up to us to make things better. We need a way for non-Apache entities to interact with Apache productively. If we can't do that, then it is quite possible that all of the momentum and excitement that Hadoop now has will be lost.

Conclusion

The key is that we now have an eco-system, not just a community. We can make it work. Or we can elect to let it not work. Not working is the default state. We have to take positive action to avoid the default.

Apache can stay a strong voice for business friendly open source by remaining Apache. Trying to make Apache broad enough to include all of the players in Hadoop and Hadoop derivatives will simply debase the voice of Apache into the average of opposing viewpoints, i.e. into nothing.

There _are,_ however, many other players who are not part of Apache and who should probably not be part of Apache. There needs to be a way for these others to engage with the Apache viewpoint. It can't just be on the level of individuals from Apache trying to informally spread the Apache way, even though that is critical to have. It is likely to require a venue in which corporate entities can deal with something comparable to themselves. A good analogy is how Mozilla's participation in W3C has made the web a better place.

But we can make our eco-system work. It isn't what it was and it never will be again.

But it can be astonishing

Let's make it so.

In the third part of my talk, I talked a bit about where Hadoop has come from and where it is going. Importantly, this involves a choice about where Hadoop and the related companies, products, and individuals might be able to take things.

Where we are and how we got here

My second section described the rough state of the Hadoop eco-system in a slightly provocative way. In particular, I described a time when I was on a British train and, in partial compensation for delays, the operators announced that "free beer would be on sale in the galley car". Free beer for sale is a wonderful analogy for the recent state of Hadoop and related software.

That said, there are serious problems brewing. The current world of Hadoop is largely based on the assumption that the current community is all that there is. This is a problem, however, because the current (Apache-based) community presumes interaction by individuals with a relatively common agenda. More and more, however, the presence of a fundable business opportunity means that this happy world of individuals building software for the greater good has been invaded by non-human, non-individual corporations. Corporations can't share the same agenda as the individuals involved in Apache, and Apache is constitutively unable to allow corporate entities as members.

This means that the current community can no longer be the current world. What we now have is not just a community with shared values but is now an eco-system with different kinds of entities, multiple agendas, direct competition and conflicting goals. The Apache community is one piece of this eco-system.

Our choice of roads

Much as Dante once described his own situation, Hadoop now finds itself in the middle of the road of its life in a dark wood. The members of the Apache community have a large voice in the future of Hadoop and related software.

As a darker option, the community can pretend that the eco-system that now exists of human and corporate participants is really a community. If so, it is likely that the recent problems in moving Hadoop forward will continue and even get worse. Commit wars and factionalization are likely to increase as corporate entities, denied a direct voice in Apache affairs, tend to gain influence indirectly. Paralysis in development will stall forward progress of Hadoop itself, leading to death by a thousand forks. Such a dark world would let alternative frameworks such as Azure gain footholds and possibly dominate.

In the brighter alternative future, I think that there are ways to create a larger forum in which corporate voices can be heard in their true form rather than via conflicts of interest. In this scenario, Apache would be stronger because it really can be a strong voice of the open source community. Rather than being the average of conflicting views, Apache would be free to express the shared values of open source developers. Corporations would be able to express their goals, some shared, some not, in a more direct form and would not need so much to pull the strings of Apache committers. Importantly, I would hope that Hadoop could become something analogous to a reference implementation and that commercial products derived from Hadoop would have a good way to honor their lineage without finding it difficult to differentiate themselves from the original. Hopefully in this world innovation would be welcomed, but users would be able to get a more predictable experience because they would be able to pick products offering whatever innovation rate/stability trade-off they desire. Importantly, there would be many winners in such a world since different players would measure success in different terms.

We have a key task ahead of us to define just what kind of eco-system we want. It can be mercenary and driven entirely by corporate goals. This could easily happen if Apache doesn't somehow facilitate the creation of a forum for eco-system discussion. In such an eco-system, it is to be expected that the companies that have shown a strong talent for dominating standards processes and competing in often unethical ways will dominate. My first thought when I imagine such a company is Microsoft, but that is largely based on having been on the receiving end of their business practices. I have no illusions that talent for that kind of work is exclusively found in Redmond.

In my talk, I proposed some colorful cosmological metaphors for possible worlds, but the key question is how we can build a way for different kinds of entities to talk. It is important to recognize different values and viewpoints. Apache members need to understand that not everything is based on individual action, nor do corporations hold the same values. Companies need to take a strong stance to recognize the incredible debt owed to the Apache community for creating the opportunities we all see.

If we can do this, then Hadoop (and off-spring) really does have a potential to dominate business computing.

In the first part of the talk, I made the case that Apache Hadoop has lots of head-room in terms of performance. This translates into lots of opportunity both for open source developers to make Hadoop itself better, but also for companies to build products that derive from Hadoop but improve it in various ways.

The $S$ score

In honor of Steve Jobs, whose highest praise is reputedly to say "that doesn't suck", I proposed an $S$ score whose highest value is zero, but which for all real systems is negative. For a batch data processing system like Hadoop, I proposed that a good definition of $S$ was the log base 10 of the ratio of the actual performance to the performance implied by hardware limits.
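In symbols (my rendering of the verbal definition above),
\[
S = \log_{10} \frac{\text{actual performance}}{\text{performance implied by hardware limits}}
\]
so, for example, a system running 1000 times slower than the hardware allows scores $S = -3$.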

Not surprisingly, the overall score for Hadoop comes out to be somewhere between -5 and -2 depending on the desired workload (i.e. Hadoop runs programs somewhere between 100 and 100,000 times slower than the hardware would allow). For some aspects, Hadoop's $S$ score can be as good as $-0.5$, but generally there are multiple choke-points and some of these are additive. This is hardly news and isn't even a mark of discredit to Hadoop, since the developers of Hadoop have always prized getting things to work, and to work at scale, above getting things to work within an iota of the best the hardware can do at a particular scale. Another factor that drives $S$ down for Hadoop is the fact that the hardware we use has changed dramatically over the 6-7 year life of Hadoop.

In defining the value of $S$ for current Hadoop versions, I don't mean to include algorithm changes. Michael Stonebraker has become a bit famous for running down Hadoop for not doing database-like things with database-like algorithms, but I would like to stick to the question of how fast Hadoop could do what Hadoop is normally and currently used to do.

The key conclusion is that such a low $S$ combined with high demand for Hadoop-like computation represents a lot of opportunity: opportunity for the open source community to make things better, and opportunity for commercial companies to make money. The latter kind of opportunity is what is going to shake up the currently cozy Hadoop community the most.

There has been a bit of demand for an expanded version of my Buzzwords keynote from a few weeks ago. This demand has been increased by a particularly unfortunate mis-quote in a tweet that suggested that I thought there was a need for a new organization "to supersede Apache". Of course, I suggested nothing of the sort, so it is a good idea to walk through the ideas that I presented. The Buzzwords site has the video of my talk and a pdf of my slides in case you want to follow along.
The talk was divided into several sections. The first one proposed the uncontroversial thesis that Hadoop performs at a level far below the potential offered by modern hardware. A second section pointed out difficulties with the current social structure surrounding the development of Hadoop and related software. I then examined what I see as possible futures while describing how I think we will be choosing between these alternative futures. I will post each section in a separate blog entry.

As I spoke, I encouraged the audience to tweet using the hash-tag #bbuzz and to keep communal notes on a shared Google document. The tweets are a bit hard to find as befits ephemeral media, but the shared notes are still accessible.

Sections:
The S Score
Possible Futures
Conclusion
Well, Buzzwords is over and my primary conclusion is that I wish I had come last year as well as this year. Isabel and Simon are really making Buzzwords into a major open source conference and, with the demise of the European ApacheCon, Buzzwords is probably the first or second most important open source conference in Europe. If you can only choose one, I would strongly recommend geeking in Berlin. It isn't just the conference; there are bunches of related events such as informal dinners, bar camps and hackathons. Since Buzzwords makes such a strong effort to include North American participants, you may even have a better chance of connecting globally by going to Europe than by going to a conference in the US or Canada.
The conference itself consisted of two days of scheduled events anchored by keynotes each day. Doug Cutting gave the first keynote and covered a lot of the history and current state of Hadoop. As always, his talk was very well done and contained quite a bit of technical information, which is refreshing in a keynote. I gave the second keynote and talked a bit about the state and future of Hadoop, related Apache projects and the burgeoning commercial marketplace. Some of what I said stirred up a bit of talk, which is good since my primary thesis is that we aren't talking enough about how the world of Hadoop and related software is rapidly changing in ways that aren't well recognized. Stay tuned here for a blog edition of my talk.
There were quite a few excellent technical talks as well. Among the scheduled talks, Jonathan Gray gave a talk with his usual and customary dose of excellent technical information about how Facebook is using HBase. A notable moment came when he was asked about the state of Cassandra at Facebook. Check out the upcoming video for details on his answer.
Dawid Weiss gave an excellent talk on finite state automata and the difference between deterministic and non-deterministic variants. The only defect I could see in his presentation was that we couldn't see the eagles on the coins. Based on the fact that the room was packed (I sat in the aisle on the floor) and the very eager audience questions, I would say that there is a surprisingly strong market for information on foundational algorithms like finite state transducers.
The lightning talks at the end of day two also had some gems. Thomas Hall's northern accent blended charmingly with his frank assessment of some of his experiences with certain technical approaches. I can't possibly convey the tone and content, so, yet again, you will need to refer to his slides and the video on the conference web-site.
Frank Scholten also had a lightning talk that contained a very nice walk-through of Mahout document clustering. What he showed is a work in progress, but already what he has provides a highly requested set of recipes to illustrate a lot of the software in Mahout.
Outside of the conference there was an (excellent) barcamp run by Nick Burch. I think I learned as much about how to run a barcamp by watching him as anybody did from any of the technical discussions and the technical discussions were pretty excellent.
I have to say that if you want to see me next year in early June, there is a high likelihood that you will have to be in Berlin to do it.
See http://berlinbuzzwords.de/wiki/linkstoslides to get slides from talks.

I had a great visit today at the DIMA laboratory at TU Berlin. They are working on a system called Stratosphere which provides an interesting generalization of map-reduce. Of particular interest is the run-time flexibility for adapting how the flow partitions or transfers data.
They accomplish this by having a lower level abstraction layer that supports a larger repertoire of basic options beyond just map and reduce. These operations include match, cross product and co-group. Having a wider range of operations and retaining some additional flow information at that level allows them to do on-the-fly selection of the detailed algorithm for different operations based on the statistics of the data and the properties of the user-supplied functions.
Here's a picture of me answering questions about startups and log-likelihood ratio tests.

I have recently been working on some new features for Apache Zookeeper that represent probably the largest change in the Zookeeper client API in several years. Other recent changes such as observers and read-only clusters have changed the server-side semantics, but the client API is nearly unchanged since the original public release of Zookeeper several years ago. The JIRA covering the overall issue is Zookeeper-965 and you can look at the code as it is being refined into committable state in my fork of Zookeeper that I keep on github.
(related announcement)
Almost from the beginning of the Zookeeper project, there have been repeated questions on the mailing list about how to update several nodes at once. The answer has always been to consolidate the structures that you need to update atomically into the contents of a single znode and then update those contents atomically using standard methods. This update problem is often exacerbated by a desire to use ephemeral nodes so that state disappears correctly when a service crashes.
This is a pretty reasonable answer and it handles many important cases fairly well. For instance, in the common case of servers that are assigned certain roles and then in turn advertise which roles they have successfully taken on, this pattern can be used by giving each server a single file of assignments and a single file of advertised capabilities. Each of these files can be ephemerally created by the server so they will both vanish when the server disappears. Assignments can be added atomically and only the server ever modifies the advertised roles. Zookeeper works quite well in these cases.
Keeping track of a graph full of nodes with directional connections to other nodes is a good example of where Zookeeper's update model is not so nice. In this case, nodes have a list of connections to other nodes and a list of connections from other nodes. If any node has a list of outgoing connections that are not matched by the corresponding list of incoming connections on other nodes, then the graph is referentially inconsistent. Keeping the graph consistent under changes is not possible with normal Zookeeper updates unless you store the entire graph in a single file.
Lazy Operations
The new multi() method allows such a data structure to be updated with strong guarantees of consistency. This is done by currying the normal database mutation operators and then passing all of the resulting closures to Zookeeper at once for execution. Obviously this requires a bit of syntactic sugar in languages like Java and C which don't like to express closures directly.
The way that this works is that there is a new class called Op. There are sub-classes of Op called Op.Create, Op.SetData, Op.Check and Op.Delete that correspond to the operations normally invoked by calling the similarly named methods on a ZooKeeper object. In essence, these sub-classes of Op represent reified methods or closures that can be frozen at one point in time and then executed later. These sub-classes are instantiated using factory methods on the Op class. The names and signatures of these factory methods mimic the corresponding methods on ZooKeeper.
Once you have all of the operations you would like to perform, you can execute them all in a single transaction using ZooKeeper.multi(). This will either execute all of the Ops or none of them.
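The real API consists of the Op factory methods and ZooKeeper.multi() described above. To illustrate just the pattern, reifying operations as frozen objects and then executing them all-or-nothing, here is a ZooKeeper-free toy sketch. TinyOp and TinyStore are entirely hypothetical names of my own invention, not part of the ZooKeeper API:

```java
import java.util.List;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Op and ZooKeeper.multi(): each TinyOp is a
// frozen (reified) mutation that is checked and applied later, and
// TinyStore.multi() applies all of the ops or none of them.
interface TinyOp {
    void check(Map<String, String> store);   // throws if the precondition fails
    void apply(Map<String, String> store);   // performs the mutation
}

class TinyStore {
    final Map<String, String> data = new HashMap<>();

    // analogous in spirit to Op.create(): fails if the node already exists
    static TinyOp create(String path, String value) {
        return new TinyOp() {
            public void check(Map<String, String> s) {
                if (s.containsKey(path)) throw new IllegalStateException("exists: " + path);
            }
            public void apply(Map<String, String> s) { s.put(path, value); }
        };
    }

    // analogous in spirit to Op.setData(): fails if the node is missing
    static TinyOp setData(String path, String value) {
        return new TinyOp() {
            public void check(Map<String, String> s) {
                if (!s.containsKey(path)) throw new IllegalStateException("no node: " + path);
            }
            public void apply(Map<String, String> s) { s.put(path, value); }
        };
    }

    // all-or-nothing execution: validate every op first, then apply them all
    void multi(List<TinyOp> ops) {
        for (TinyOp op : ops) op.check(data);
        for (TinyOp op : ops) op.apply(data);
    }
}
```

The point of the sketch is only the shape of the pattern: the caller accumulates frozen operations a bit at a time and hands the whole batch to a single atomic call.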

An Example

I have placed an example program for doing a graph-based computation over on github. This program builds a graph consisting of 32 nodes in the form of a 5-dimensional hyper-cube and then uses a numerical technique called over-relaxation to solve for the voltages that would result if all the links in the hyper-cube were replaced by unit resistors. A picture of the graph is just to the right. In this graph, the voltage for node 0x1F is labeled as $V_5$ and the voltage for node 0x0 is labeled as $V_0$. There are lots of connections and solving this problem analytically is difficult unless you twig to the trick of using symmetry. Real-world networks generally don't exhibit such congenial properties and can be nearly impossible to solve analytically.

Normally, of course, we wouldn't actually use Zookeeper for this computation, but the point here isn't so much a realistic computation as a test-bed for the new multi() method and a demonstration of how the closure-based style associated with multi-ops makes code easier to read and understand.

The Code

To read the code, start with the class ZGraphTest. This is a JUnit test that drives all the rest of the code. In this test, a graph is constructed (named zg in the code), nodes are connected in the pattern of a hyper-cube which means that nodes are labeled with integers from 0 to 31 and a node $n_1$ is connected to another node $n_2$ if $n_1$ and $n_2$ differ in exactly one bit.
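The one-bit-difference rule is easy to express with a bit count; here is a small sketch (the names are mine, not taken from the example program):

```java
// Two hyper-cube nodes are neighbors iff their integer labels differ
// in exactly one bit position.
class HyperCube {
    static boolean isNeighbor(int n1, int n2) {
        return Integer.bitCount(n1 ^ n2) == 1;
    }

    // Every node in a d-dimensional hyper-cube has exactly d neighbors.
    static int degree(int node, int dimensions) {
        int count = 0;
        for (int other = 0; other < (1 << dimensions); other++) {
            if (isNeighbor(node, other)) count++;
        }
        return count;
    }
}
```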

In each iteration of the code, the ZGraph.reweight() method of the graph is called on a randomly selected node. This sets the value at that node to be the average of the values at the neighbors of that node. This computation converges geometrically to the correct value with errors decreasing by a factor of 5-10 with every 1000 iterations. As the actual computation proceeds, the error versus the theoretically known correct values is shown every 1000 iterations and you can see the errors go to zero to 6 significant figures at about 7000 iterations.

Internally, a ZGraph keeps a connection to Zookeeper and the name of the root directory for all of the nodes in the graph. Methods like connect(), reweight() and update() all work by defining lazy update or version check operations on Node objects and then executing all of the update or version checks at one time in a transaction. For instance, in the reweight() method, this code gets the weight of all of the neighbors of node g1:

    double mean = 0;
    Set<Integer> neighbors = g1.getIn();
    List<Op> ops = Lists.newArrayList();
    for (Integer neighbor : neighbors) {
      Node n = getNode(neighbor);
      ops.add(n.checkOp(root));
      mean += n.getWeight();
    }

As a side effect, we also collect a list of version check operations into the list ops. Then in this code we add one more operation to set the weight on g1 and add the update operation to the list ops:

    // set up the update to the node itself
    g1.setWeight(mean / neighbors.size());
    ops.add(g1.updateOp(root));

Finally, multi() is called to actually do all of the version checks and updates that we have collected,

zk.multi(ops);

The essential point here is how the list of operations was collected a bit at a time and then executed all at once. The versions of the nodes that were inputs into the operation were collected in the form of check operations. When those check operations are executed along with the update of g1, we guarantee that g1's new weight will accurately reflect the average of its neighbors, and if somebody changes the value of any of the neighbors between the time that we read the value and the time we actually do the update, we will run the update again.

Similarly, and closer to the idea of maintaining referential integrity, ZGraph.connect() collects update operations for the two nodes being connected and executes both updates together to avoid the possibility that anyone would see a situation where one node connects to a second but the second doesn't have the reverse connection. This code does the job,

    Node g1 = Node.readNode(zk, root, id1);
    Node g2 = Node.readNode(zk, root, id2);
    g1.connectTo(id2);
    g2.connectFrom(id1);
    zk.multi(Arrays.asList(g1.updateOp(root), g2.updateOp(root)), results);

Again, the closure passing style makes this operation very easy.

One additional point to note is that having each Node return closures for updates that can be executed in any context the caller likes has the effect of removing all direct Zookeeper operations from the Node other than reading or creating a node. It also removes all references to serialization of a Node from any outside code. This simplifies both the caller and the Node because the Node doesn't have to implement all plausible combinations of multiple updates and the caller doesn't have to worry about any aspect of serialization and can focus on the appropriate task of arranging the transactional semantics for updates.

Credit Where Credit is Due

Of course, this new Zookeeper capability wouldn't be possible if the Apache Zookeeper project team hadn't built Zookeeper in a very flexible way in the first place. Kudos to Ben Reed and Mahadev and the other early committers on the project.

Also, the actual Zookeeper-965 project would have stalled out if Marshall McMullen and Camille Fournier hadn't stepped in with a bit of extra momentum. I had completed the wire protocols and all of the client side work, but Marshall did all of the server side work and Camille provided really excellent advice about how the changes should be done.

The final credit goes to MapR Technologies for supporting open source by allowing capabilities like multi to be contributed back.

One problem with exponentially weighted moving averages is that the weight on older samples decays sharply, even for very recent samples.
The impulse response of such an average shows this clearly. In the graph to the right, the red line shows the impulse response of the exponentially weighted average, while the black line shows the impulse response of a different kind of moving average derived by John Canny in the early 80s.
The Canny filter puts higher weight on the events in the recent past which makes it preferable when you don't want to forget things right away, but do want to forget them before long and also want an on-line algorithm.
The cool thing about the Canny filter is that it is only twice as much work as a simple exponential moving average. The idea is that if you take the difference between two exponentially weighted averages with different time constants, you can engineer things to give you an impulse response of the sort that you would like.
The formula for such a difference looks like this
\begin{eqnarray*}
w(x) &=& k_1 e^{-x/\alpha} - k_2 e^{-x/\beta}
\end{eqnarray*}
Here $k_1$ and $k_2$ scale the two component filters in magnitude and $\alpha$ and $\beta$ are the time constants for the filters.
It is nice to set the magnitude of the filter at delay $0$ to be exactly 1. We can use this to get a value for $k_2$ in terms of $k_1$
\begin{eqnarray*}
w(0) &=& k_1 - k_2 = 1 \\
k_2 &=& k_1 -1
\end{eqnarray*}
Similarly, we can constrain the slope of the impulse response to be $0$ at delay $0$. This gives us $\beta$ in terms $\alpha$
\begin{eqnarray*}
w'(0) &=& {k_1 \over \alpha} - {k_2 \over \beta} = 0\\
{k_1 \over \alpha} &=& {k_1-1 \over \beta} \\
\beta &=& \alpha \frac{ k_1-1} { k_1}
\end{eqnarray*}
The final result is this impulse response
\begin{eqnarray*}
w(x) = k \exp \left(-{x \over \alpha}\right)-(k-1) \exp\left(-{k x\over \alpha (k-1)}\right)
\end{eqnarray*}
We can do a bit better if we note that as $k \to \infty$, the shape of the impulse response quickly converges to a constant shape with a mean delay of $\frac{3\alpha}{2}$ and a total volume of $2\alpha$.
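The composite impulse response is easy to compute directly. Here is a small Java sketch of my own (with $k$ and $\alpha$ as above) that can be used to check the design constraints $w(0)=1$ and $w'(0)=0$:

```java
// Impulse response of the Canny-style filter: the difference of two
// exponentials, w(x) = k exp(-x/alpha) - (k-1) exp(-k x / (alpha (k-1))).
class CannyFilter {
    static double weight(double x, double k, double alpha) {
        double beta = alpha * (k - 1) / k;   // chosen so that w'(0) = 0
        return k * Math.exp(-x / alpha) - (k - 1) * Math.exp(-x / beta);
    }
}
```

An on-line implementation is then just two ordinary exponential moving averages with time constants $\alpha$ and $\beta$, combined with weights $k$ and $-(k-1)$.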

In a previous post, I talked about how to produce exponentially weighted averages of time-embedded values. This is fine for cases where you really want averages, but it doesn't work for rates. Happily, the technique I presented for simple averages can be easily extended to work for rates as well.
To recap a bit, an on-line algorithm for computing a simple average works by accumulating the number of observations and the sum of the observations. There is no mention of time. For each observation $(x, t)$ we update our state $(s, w)$ according to
\begin{eqnarray*}
s_{n} &=& s_{n-1} + x \\
w_{n} &=& w_{n-1} + 1
\end{eqnarray*}
The exponentially weighted average approach that I mentioned before uses the time of each observation to discount the previous state based on the time of the previous observation. Thus, the state $(s,w,t)$ is updated according to
\begin{eqnarray*}
\pi &=& e^{-(t-t_{n-1})/\alpha} \\
s_{n} &=& \pi s_{n-1} + x \\
w_{n} &=& \pi w_{n-1} + 1 \\
t_{n} &=& t
\end{eqnarray*}
If we were to compute simple rates without discounting, then instead of interpreting $w$ as a count, we would interpret it as a time interval. Thus we would update the state $(s,w,t)$ with an observation $(x, t)$ according to
\begin{eqnarray*}
s_{n} &=& s_{n-1} + x \\
w_{n} &=& w_{n-1} + (t - t_{n-1}) \\
t_n &=& t
\end{eqnarray*}
By analogy with the discounted version of the simple average, we can produce an exponentially weighted rate average by updating the state according to this
\begin{eqnarray*}
\pi &=& e^{-(t-t_{n-1})/\alpha} \\
s_{n} &=& \pi s_{n-1} + x \\
w_{n} &=& \pi w_{n-1} + (t-t_{n-1}) \\
t_{n} &=& t
\end{eqnarray*}
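These rate updates translate directly into code; here is a sketch (the class name is mine, and it assumes an explicit start time):

```java
// Exponentially weighted rate estimate: s accumulates discounted counts,
// w accumulates discounted elapsed time, and the rate estimate is s / w.
class EwmaRate {
    private final double alpha;   // averaging time constant
    private double s = 0, w = 0, t;

    EwmaRate(double alpha, double startTime) {
        this.alpha = alpha;
        this.t = startTime;
    }

    double add(double x, double time) {
        double pi = Math.exp(-(time - t) / alpha);
        s = pi * s + x;            // discounted count of events
        w = pi * w + (time - t);   // discounted elapsed time
        t = time;
        return s / w;
    }
}
```

For a steady stream of, say, two events per time unit, the estimate is exactly 2 regardless of the time constant, since the numerator and denominator are scaled by the same geometric factors.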
This approach has a defect in that each data point should really be considered to be a report of events spread out in an uncertain way over the time since the last report. As such, the interval and the events should probably both be discounted as if they had been reported uniformly over the period in question. In practice, this correction does not matter much since the averaging time constant $\alpha$ is typically large compared to the average reporting interval. Another consideration comes up when multiple sources are reporting concurrently. If we want to do proper discounting of each observed number, then we have to keep track of the time since last report for each of the sources. Since this probably won't matter and since it considerably complicates the implementation, I would rather not do it.
See https://issues.apache.org/jira/browse/MAHOUT-634 for code. This will be in Mahout before long.

I will be adding this code to Mahout shortly.
See https://issues.apache.org/jira/browse/MAHOUT-634 for status.
Also, if you are measuring rates, then it is common for rates to be reported from multiple sources independently. Such an average can be computed pretty easily using this same framework if the sources report often relative to the averaging time constant. This simple implementation just attributes each reported count as if they occurred in the interval since the most recent report from any reporter. If the time constant is relatively long, this can work out reasonably well as long as we are careful.
If reporting intervals are longer, then the averaging is a bit trickier because we really would like to attribute the reported counts over the entire interval from the last report from the same source. This means that we have to discount some of the counts because they are effectively kind of old.
More details shortly.

Ted Yu on the HBase list wants to compute an exponentially weighted average for the number of queries per second or other statistics that characterize the performance or state of an HBase system.
The trick is that the measurements are only available at irregular intervals. If they were sampled regularly, then the standard mixing trick would work:
\[
m_{n+1} = \mu m_n + (1-\mu) x_{n+1}
\]
where $m$ is our current estimate of the mean, $x_n$ is the $n$-th sample and $\mu$ determines how much history to use.
With unequal sample times, things become a bit more complicated. If we get lots of measurements all at once, we want to give them nearly equal weight but if we have a long gap, we want to weight the very old samples much less.
In fact, we want to weight old samples according to how old they are with exponentially decreasing weight. If we sample values $\left \lbrace x_1 \ldots x_n \right \rbrace$ at times $t_1 \ldots t_n$ then we want the weighted mean defined by
\[
m_n = {\sum_{i=1}^n x_i e^{-(t_n - t_i)/\alpha} \over \sum_{i=1}^n e^{-(t_n - t_i)/\alpha} }
\]
Here $\alpha$ plays the same role as $\mu$ did before, but on a different scale. If the evenly sampled data comes at time intervals $\Delta t$ then $\mu = e^{-\Delta t / \alpha}$.
Happily, there is a very simple recurrence relationship that allows us to keep only two intermediate values while computing the value of $m_1 \ldots m_n$ in an entirely on-line fashion as the $x_i$ values arrive.
To see this, define
\begin{eqnarray*}
\pi_n &=& e^{-(t_{n+1}-t_n)/\alpha} \\
w_{n+1} &=&
\sum_{i=1}^{n+1} e^{-(t_{n+1} - t_i)/\alpha} =
1+e^{-(t_{n+1}-t_n)/\alpha} \sum_{i=1}^{n} e^{-(t_{n} - t_i)/\alpha} \\
&=& 1 + \pi_n w_n\\
s_{n+1} &=&
\sum_{i=1}^{n+1} x_i e^{-(t_{n+1} - t_i)/\alpha} =
x_{n+1}+e^{-(t_{n+1}-t_n)/\alpha} \sum_{i=1}^{n} x_i e^{-(t_{n} - t_i)/\alpha} \\
&=& x_{n+1} + \pi_n s_n
\end{eqnarray*}
Then note that
\[
m_{n+1} = {s_{n+1} \over w_{n+1}}
\]
This leads naturally to a procedure whose state consists of $t, w, s$ (with $m = s/w$ derived), updated using new values of $t_n, x_n$ according to
\begin{eqnarray*}
\pi &=& e^{-(t_{n}-t)/\alpha} \\
w &=& 1 + \pi w \\
s &=& x_n + \pi s \\
m &=& {s \over w} \\
t &=& t_{n}
\end{eqnarray*}
Isn't that a kick!
To do this right, however, we need a test. Here are some data vectors computed for $\alpha=5$:
 n      t           x          pi         w           s           m
 1  11.35718   1.5992071   1.0000000  1.000000   1.5992071   1.5992071
 2  21.54637  -1.3577032   0.1303100  1.130310  -1.1493105  -1.0168100
 3  28.91061  -0.3405638   0.2292718  1.259148  -0.6040683  -0.4797436
 4  33.03586   0.7048632   0.4382129  1.551775   0.4401527   0.2836447
 5  39.57767   0.3020558   0.2702621  1.419386   0.4210124   0.2966159
Writing a proper unit test is an exercise left to the reader (but I will be happy to help).
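As a sketch of such a test (the class name is mine), the update procedure above can be coded directly and checked against the last row of the data vectors above:

```java
// On-line exponentially weighted mean for irregularly sampled data.
// State is (t, w, s); the current mean m = s / w is derived.
class EwmaIrregular {
    private final double alpha;   // time constant of the exponential weighting
    private double w = 0, s = 0, t = 0;

    EwmaIrregular(double alpha) {
        this.alpha = alpha;
    }

    double add(double x, double time) {
        double pi = Math.exp(-(time - t) / alpha);
        w = 1 + pi * w;    // discounted total weight
        s = x + pi * s;    // discounted weighted sum
        t = time;
        return s / w;      // current estimate of the mean
    }
}
```

Note that the initial value of $t$ is harmless because $\pi$ only ever multiplies $w$ and $s$, which start at zero, so the first sample gives $w=1$, $s=x$ as required.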

I gave a lecture last night on recent additions to Mahout.
The slides are here: http://www.slideshare.net/tdunning/sdforum-11042010
I can amplify this post with answers to any questions that anybody puts in the comments.

Go to the Apache Mahout site for more info. Here is the official announcement:

We are pleased to announce release 0.4 of Mahout. Virtually every corner of the project has changed, and significantly, since 0.3. Developers are invited to use and depend on version 0.4 even as yet more change is to be expected before the next release. Highlights include:

- Model refactoring and CLI changes to improve integration and consistency

- New ClusterEvaluator and CDbwClusterEvaluator offer new ways to evaluate clustering effectiveness

- New Spectral Clustering and MinHash Clustering (still experimental)

- New VectorModelClassifier allows any set of clusters to be used for classification

- Map/Reduce job to compute the pairwise similarities of the rows of a matrix using a customizable similarity measure

- Map/Reduce job to compute the item-item-similarities for item-based collaborative filtering

- RecommenderJob has been evolved to a fully distributed item-based recommender

- Distributed Lanczos SVD implementation

- More support for distributed operations on very large matrices

- Easier access to Mahout operations via the command line

- New HMM based sequence classification from GSoC (currently as sequential version only and still experimental)

- Sequential logistic regression training framework

- New SGD classifier

- Experimental new type of NB classifier, and feature reduction options for existing one

- New vector encoding framework for high speed vectorization without a pre-built dictionary

- Additional elements of supervised model evaluation framework

- Promoted several pieces of old Colt framework to tested status (QR decomposition, in particular)

- Can now save random forests and use it to classify new data

- Many, many small fixes, improvements, refactorings and cleanup

Details on what's included can be found in the release notes.

Downloads are available from the Apache Mirrors

The vote has started for the 0.4 Mahout release. Lots of new stuff, but the part that I am excited about is a fairly comprehensive implementation of logistic regression suitable for large scale training and high speed classification, but there is a whole lot more.
With the 0.4 release, Mahout is moving along strongly towards the fabled 1.0 release. At that point, we will start paying lots of attention to backwards compatibility. That will be good, but the current wild and woolly policy is pretty handy if you have something in mind that Mahout really, really needs because we can get new things in pretty readily right now.
See http://mahout.apache.org/ for the release when it arrives and watch my twitter feed for an announcement.

Lance Norkog asks on the Mahout mailing list why adding two uniformly distributed random variables gives a pyramid-shaped distribution. I would normally answer on the mailing list, but here I can use lovely math notation. As I mentioned on-list, this is a very basic result that is related to the central limit theorem.
If we were to draw a picture of the joint distribution of these variables \(x\) and \(y\), we would get something that is 1 inside the \([0,1] \times [0,1]\) square and 0 outside that region.
For a given value \(z\) of the sum \(x + y\), there is a diagonal line segment where \(x+y=z\) with \(x\) and \(y\) in the square. Where \(z \le 0\) or \(z \ge 2\) that intersection vanishes, and for \(0 < z < 2\) it varies in length. The probability density of the sum at a particular value \(z\) is proportional to the length of that intersection. As you can imagine, the length varies linearly and reaches a maximum at \(z = 1\).
For the sum of three random variables, the situation is more complex to reason about geometrically because we need to worry about the intersection of a plane and a cube. For more variables, the geometry is not worth the trouble.
If we tackle the problem a bit more rigorously, then the easiest way to approach it is to compute the cumulative distribution of various values of sums. That leads to a convolution integral over the density functions involved. Since the densities are all 1, the integration limits are the key to the value and those limits have to be broken down into cases. Actually doing these integrals is a pretty rare activity since the limit is approximated so well after just a few iterations.
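For the two-variable case, the convolution can be carried out explicitly. With both densities equal to 1 on $[0,1]$,

\begin{eqnarray*}
f_{x+y}(z) &=& \int_{-\infty}^{\infty} f(u)\, f(z-u)\, du = \int_{\max(0,\,z-1)}^{\min(1,\,z)} du \\
&=& \begin{cases} z & 0 \le z \le 1 \\ 2-z & 1 \le z \le 2 \\ 0 & \text{otherwise} \end{cases}
\end{eqnarray*}

which is exactly the pyramidal (triangular) shape that prompted the question.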
Just how quickly that convergence happens can be seen by looking at the empirical distribution of the sum of three uniform deviates. I used something very like this R code to produce the graph below:
    hist(runif(100000) + runif(100000) + runif(100000),
         breaks = 50, prob = TRUE)
    lines(seq(-1, 4, by = 0.01),
          dnorm(seq(-1, 4, by = 0.01), 1.5, sqrt(1/4)), col = "red")
In this graph, the red curve is the normal distribution with the same mean and standard deviation. As you can see, the peak is a tiny bit too high and the tails are just a _skoshi_ too long for the normal to be a good description of the samples of the sum.

This is, however, just the sum of three random values. If you sum more values, the convergence to the normal distribution is very strong and by the time you are adding six uniform random values together, the difference between the distributions is no longer visible in a graph like this and can only be detected numerically using lots of data and clever things like a Kolmogorov-Smirnov test.

The moral here is that there isn't much way to avoid this regression to the normal distribution and distorting the data to avoid it is probably pointless.

But if you are like me, being just a little more normal always made it easier to get along.

I spent this last weekend holding a hand that grew cold in the early hours of Sunday morning.
That hand helped me through much of my life. No longer. At least not in the flesh.
Nobody who reads this blog is likely to have known my father and given how little he talked about things he had done, few who knew him would know much of the many things he did. He lived a long life and a full one. Along the way he saw things few will ever see.
In his prime, he was simply extraordinary. He could see and he could hear better than anyone I have ever known. That could be torture, as it was the time when a cat walking in the next room woke him from a deep sleep but it was what let him fly the way he did. And fly he did in planes large and small. He checked out Gary Powers in the U-2, flew P-47s and P-38 in combat and flew with me in small aircraft. We fished and walked and camped across the western US and we lived in many places.
He didn't show off his mental abilities, but there too he could do things few others could match. He passed a graduate reading exam in French without ever studying any romance language. I saw him on several occasions understand spoken German also without having studied the language. He spoke of the shape and rate of physical processes in ways that only somebody with innate ability in math could possibly do.
These faculties declined in age as they must with all of us, but even thus dimmed his candle burned bright.
But it did not last. I saw it sputter and fade. Then, between one instant and the next, it was gone.

Plume is working for toy programs!
You can get the source code at http://github.com/tdunning/Plume
Here is a quick description of how to code up the perennial map-reduce demo program for counting words. The idea is that we have lines of text that we have to tokenize and then count the words. This example is to be found in the class WordCountTest in Plume.
So we start with PCollection<String> lines for input. For each line, we split the line into words and emit them as separate records:

  PCollection<String> words = lines
      .map(new DoFn<String, String>() {
        @Override
        public void process(String x, EmitFn<String> emitter) {
          for (String word : onNonWordChar.split(x)) {
            emitter.emit(word);
          }
        }
      }, collectionOf(strings()));
Then we emit each word as a key for a PTable with a count of 1. This is just the same as most word-count implementations except that we have separated the tokenization from the emission of the original counts. We could have put them together into a single map operation, but the optimizer will do that for us (when it exists) so keeping the functions modular is probably better.
    PTable<String, Integer> wc = words
        .map(new DoFn<String, Pair<String, Integer>>() {
          @Override
          public void process(String x, EmitFn<Pair<String, Integer>> emitter) {
            emitter.emit(Pair.create(x, 1));
          }
        }, tableOf(strings(), integers()))
Then we group by word

        .groupByKey()
And do the counting. Note how we don't have to worry about the details of using a combiner or a reducer.
        .combine(new CombinerFn<Integer>() {
          @Override
          public Integer combine(Iterable<Integer> counts) {
            int sum = 0;
            for (Integer k : counts) {
              sum += k;
            }
            return sum;
          }
        });

In all, it takes 27 lines to implement a slightly more general word count than the one in the Hadoop tutorial. If we were comparing apples to apples, this code would probably be a few lines shorter. The original word-count demo took 210 lines to do the same thing.
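For readers who want to check the logic without Plume on the classpath, here is a minimal sketch of the same tokenize-then-count computation in plain Java streams. The class and method names here are mine, not part of Plume; only the onNonWordChar tokenizer idea is carried over.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class WordCount {
  // Same tokenizer idea as the Plume example: split on runs of non-word characters.
  private static final Pattern onNonWordChar = Pattern.compile("\\W+");

  public static Map<String, Integer> count(Stream<String> lines) {
    return lines
        // tokenize each line into words (the map step)
        .flatMap(line -> Arrays.stream(onNonWordChar.split(line)))
        .filter(word -> !word.isEmpty())
        // group identical words and sum their counts (the groupByKey + combine steps)
        .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum));
  }

  public static void main(String[] args) {
    System.out.println(count(Stream.of("to be or not to be")));
  }
}
```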

A few years ago, I built a prototype system I called grool in Groovy to simplify map-reduce programming. My goal was to use Groovy to handle control flow and define operations but to execute large programs using Hadoop.
Grool foundered on the difficulty of transporting closures over the network. I used a clever trick to avoid the problem, but it depended on the ability to execute a script multiple times with a different meaning each time. That is a difficult concept to convey to users, and the result was that grool really didn't work that well for ordinary folks.
A bit later, the guys at Google developed FlumeJava, which has many of the same goals and many of the same benefits that grool intended to provide. In Java, however, transporting functional objects is paradoxically much simpler than in Groovy. The difference is entirely because Java is statically compiled, so stateless functional classes can be referred to by name in different JVMs that have access to the same jar.
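That by-name trick can be sketched in a few lines. The DoubleIt class below is a hypothetical stand-in for a stateless function object: because it carries no state, the only thing that needs to cross the wire is its class name, and any JVM with the same jar on its classpath can reconstruct it by reflection.

```java
import java.util.function.Function;

public class ByName {
  // A stateless function class: there is nothing to serialize but the name.
  public static class DoubleIt implements Function<Integer, Integer> {
    public Integer apply(Integer x) {
      return 2 * x;
    }
  }

  public static void main(String[] args) throws Exception {
    // "Transport" the function by sending only its fully qualified class name...
    String wireFormat = DoubleIt.class.getName();

    // ...and reconstruct it on the other side by reflection.
    @SuppressWarnings("unchecked")
    Function<Integer, Integer> f = (Function<Integer, Integer>)
        Class.forName(wireFormat).getDeclaredConstructor().newInstance();

    System.out.println(f.apply(21));  // prints 42
  }
}
```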
Flume also provides an optimizer, which is made possible because Flume uses lazy evaluation. This makes FlumeJava programs nearly as efficient as well-optimized hand-written programs. Other systems like Pig and Cascading can rewrite their logical plans, but Pig especially struggles because it has no real access to a Turing-complete language.
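To see why lazy evaluation matters for the optimizer, consider this toy deferred pipeline (a hypothetical sketch, not how FlumeJava or Plume are actually implemented): because map merely composes the new function into the pipeline instead of running it, any chain of maps is automatically fused into a single pass over the data.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy lazy collection: map() just composes functions; nothing runs until run().
public class Lazy<A> {
  private final List<Integer> data;
  private final Function<Integer, A> pipeline;

  private Lazy(List<Integer> data, Function<Integer, A> pipeline) {
    this.data = data;
    this.pipeline = pipeline;
  }

  public static Lazy<Integer> of(List<Integer> data) {
    return new Lazy<>(data, x -> x);  // start with the identity pipeline
  }

  // Fusion: composing here means one traversal no matter how many maps are chained.
  public <B> Lazy<B> map(Function<A, B> f) {
    return new Lazy<>(data, pipeline.andThen(f));
  }

  public List<A> run() {
    List<A> out = new ArrayList<>();
    for (Integer x : data) {
      out.add(pipeline.apply(x));  // a single pass applies all fused functions
    }
    return out;
  }

  public static void main(String[] args) {
    // Two maps, but only one traversal of the data happens in run().
    System.out.println(Lazy.of(List.of(1, 2, 3)).map(x -> x + 1).map(x -> x * 10).run());
    // prints [20, 30, 40]
  }
}
```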
In addition, Flume has some interesting choices in terms of API.
All in all, Flume-like systems are definitely worth playing with. To make that easier, I just implemented an eager, sequential version of an approximate clone of FlumeJava that I call Plume. The name is a presumptuous one, anticipating that if all goes well, we will be able to build a community and bring Plume into Apache, where it would become Apache Plume. There it would provide some redress for the clear fauna bias in software names.
Check it out at http://wiki.github.com/tdunning/Plume/

At the Hadoop User Group meeting last evening, it was quite the Mahout love fest. First the Yahoo team described their spam-fighting efforts, which apparently are partially anchored by frequent itemset mining using Mahout (thanks to Robin Anil's hard work as a GSoC student). Then Ken Krugler demonstrated some of what you can do with web mining and Mahout.
These are just two of a growing number of real-world uses of Mahout, which is really beginning to grow up. Related to that, the Apache board has just decided to make Mahout a top-level project. That will take a little while to make real, but the first steps are already happening.
You can find out more about how Mahout is being used by perusing the Mahout wiki.
Slide decks available here for the HUG presentations.