Oren Eini

CEO of RavenDB

a NoSQL Open Source Document Database

Get in touch with me:

oren@ravendb.net +972 52-548-6969

time to read 1 min | 179 words

I just love git pull requests, but the new behavior from GitHub is beyond moronic. Take a look at a typical pull request:

[Image: screenshot of a typical pull request, showing the button that merges it directly]

The problem is that clicking on this button would actually merge the changes to the public repository. I don’t know about you, but there are very few cases where this is what I want to do.

In 99.9999% of the cases, I want to merge this locally to see what the bloody changes are, run some tests, and maybe modify the changes before I take them. In this case, this particular pull request contains a failing test. I never want to commit that to the public repo automatically.

What is worse is that I now need to manually construct the pull command in the command line, whereas GitHub previously offered the option to generate that for me, which I liked much more.
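For reference, the manual equivalent looks roughly like this (the contributor's repository URL and branch name below are placeholders):

git pull https://github.com/contributor/ravendb.git their-topic-branch

That fetches the proposed changes into my local clone, where I can inspect them, run the tests, and only then push the merge to the public repository.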

time to read 4 min | 738 words

Originally posted at 4/29/2011

Yesterday I introduced a very minimal actor “framework”, and I noted that while it was very simple, it wasn’t a very good one. The major problems in that implementation are:

  • No considerations for errors
  • No considerations for async operations

The first one seems obvious, but what about the second one? How can we not consider async operations in an actor framework?

Well, the answer to that is quite simple: our actor framework assumed that we were always going to execute synchronously. That isn’t going to work if there is a need to do things like async IO.

As it happened, that is precisely what I had in mind for this code, so I wrote this:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Func<TState, Task>> actions = new ConcurrentQueue<Func<TState, Task>>();
    private Task activeTask;

    public void Act(Func<TState, Task> action)
    {
        actions.Enqueue(action);

        lock(this)
        {
            // Someone is already draining the queue, it will pick this action up
            if (activeTask != null) 
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    public event EventHandler<UnhandledExceptionEventArgs> OnError;

    private void InvokeOnError(UnhandledExceptionEventArgs e)
    {
        var handler = OnError;
        if (handler == null) 
            throw new InvalidOperationException("An error was raised for an actor with no error handling capabilities");
        handler(this, e);
    }

    private void ExecuteActions()
    {
        Func<TState, Task> func;
        if (actions.TryDequeue(out func))
        {
            // Run the async action; when its task completes, either report the
            // error or continue draining the queue
            func(State)
                .ContinueWith(x =>
                {
                    if (x.Exception != null)
                    {
                        InvokeOnError(new UnhandledExceptionEventArgs(x.Exception, false));
                        return;
                    }
                    ExecuteActions();
                });
            return;
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}
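A minimal usage sketch (Connection and WriteAsync here are hypothetical placeholders, standing in for whatever state type you give the actor):

var actor = new Actor<Connection> { State = new Connection() };
actor.OnError += (sender, e) => Console.WriteLine(e.ExceptionObject);

// Each queued action returns a Task; the next action only starts after
// the previous Task has completed.
actor.Act(conn => conn.WriteAsync("abc"));
actor.Act(conn => conn.WriteAsync("def"));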

Thoughts?

time to read 3 min | 506 words

Originally posted at 4/29/2011

For one of our projects, we need the ability to asynchronously push changes through a socket. Since we actually care about the order of actions, we realized that we couldn’t really use purely async IO. For example, consider the following actions:

connection.Send("abc");
connection.Send("def");

I care that abc will be sent before def, and I care that all of abc will be sent before anything else is sent through that connection. What I don’t care about is whether I have anything else sent between abc and def.

All of that can be had using:

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class Actor<TState>
{
    public TState State { get; set; }

    private readonly ConcurrentQueue<Action<TState>> actions = new ConcurrentQueue<Action<TState>>();
    private Task activeTask;

    public void Act(Action<TState> action)
    {
        actions.Enqueue(action);

        // Cheap check before taking the lock
        if (activeTask != null) 
            return;

        lock(this)
        {
            if (activeTask != null) 
                return;
            activeTask = Task.Factory.StartNew(ExecuteActions);
        }
    }

    private void ExecuteActions()
    {
        // Drain the queue, executing each action against the actor's state
        Action<TState> action;
        while (actions.TryDequeue(out action))
        {
            action(State);
        }
        lock(this)
        {
            activeTask = null;
        }
    }
}

The actions will execute synchronously for each actor, and it satisfies my requirement for how to deal with this quite nicely, even if I say so myself. :-)
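A minimal usage sketch for the scenario above (the Connection type and its Send method are placeholders, not part of the actual code):

var sender = new Actor<Connection> { State = new Connection() };

// Queued in order, executed in order, one at a time per actor.
sender.Act(conn => conn.Send("abc"));
sender.Act(conn => conn.Send("def"));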

In truth, the code above isn’t really good. Can you consider ways to improve this?

time to read 4 min | 725 words

Originally posted at 4/19/2011

RavenDB Auto Sharding is an implementation of sharding on the server. As the name implies, it aims to remove all sharding concerns from the user. At its core, the basic idea is simple. You have a RavenDB node with the sharding bundle installed. You just work with it normally.

At some point you realize that the data has grown too large for a single server, so you need to shard the data across multiple servers. You bring up another RavenDB server with the sharding bundle installed. You wait for the data to re-shard (during which time you can still read / write to the servers). You are done.

At least, that is the goal. In practice, there is one step that you would have to do: you would have to tell us how to shard your data. You do that by defining a sharding document, which looks like this:

{ // Raven/Sharding/ByUserName
  "Limits": [3],
  "Replica": 2
  "Definitions": [
    {
      "EntityName": "Users",
      "Paths": ["Username"]
    },
    {
      "EntityName": "Posts",
      "Paths": ["AuthorName"]
    }
  ]
}

There are several things to note here. We define a sharding document that shards on just one key, and the shard key has a length of 3. We also define different ways to retrieve the sharding key from the documents, based on the entity name. This is important, since you want to be able to say that posts by the same user would sit on the same shard.

Based on the shard keys, we generate the sharding metadata:

{ "Id": "chunks/1", "Shards": ["http://shard1:8080", "http://shard1-backup:8080"], "Name": "ByUserName", "Range": ["aaa", "ddd"] }
{ "Id": "chunks/2", "Shards": ["http://shard1:8080", "http://shard2-backup:8080"], "Name": "ByUserName", "Range": ["ddd", "ggg"] }
{ "Id": "chunks/3", "Shards": ["http://shard2:8080", "http://shard3-backup:8080"], "Name": "ByUserName", "Range": ["ggg", "lll"] }
{ "Id": "chunks/4", "Shards": ["http://shard2:8080", "http://shard1-backup:8080"], "Name": "ByUserName", "Range": ["lll", "ppp"] }
{ "Id": "chunks/5", "Shards": ["http://shard3:8080", "http://shard2-backup:8080"], "Name": "ByUserName", "Range": ["ppp", "zzz"] }
{ "Id": "chunks/6", "Shards": ["http://shard3:8080", "http://shard3-backup:8080"], "Name": "ByUserName", "Range": ["000", "999"] }

This information gives us a way to make queries that are either directed (against a specific node, assuming we include the shard key in the query) or global (against all shards).

Note that we split the data into chunks; each chunk is going to be sitting on two different servers (because of the Replica setting above). We can determine which shard holds which chunk by using the Range data.
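As an illustration only (the Chunk class and the way the metadata is loaded are my own assumptions, not RavenDB’s actual API), resolving which chunk holds a document boils down to a range lookup on the shard key prefix:

using System;
using System.Collections.Generic;
using System.Linq;

public class Chunk
{
    public string Id { get; set; }
    public string[] Shards { get; set; }
    public string RangeStart { get; set; }
    public string RangeEnd { get; set; }
}

public static class ShardingSketch
{
    public static Chunk FindChunk(IEnumerable<Chunk> chunks, string shardKey)
    {
        // The shard key length is 3 (the Limits setting above), so we only
        // look at the first three characters of the value. Boundary handling
        // between adjacent ranges is glossed over here.
        var prefix = shardKey.Substring(0, Math.Min(3, shardKey.Length)).ToLowerInvariant();

        return chunks.FirstOrDefault(c =>
            string.CompareOrdinal(prefix, c.RangeStart) >= 0 &&
            string.CompareOrdinal(prefix, c.RangeEnd) <= 0);
    }
}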

Once a chunk grows too large (25,000 documents, by default), it will split, potentially moving to another server or servers.

Thoughts?

RavenFS & Rsync

time to read 4 min | 775 words

One of the things that I considered when starting to design RavenFS is “Do we really need this? Can’t we just use rsync?”

There were several reasons why I decided to go ahead with RavenFS. The first was that rsync isn’t really used on the Windows platform, for various reasons that I won’t get into.

The second is that rsync is meant to be a general purpose file synchronization tool. RavenFS is meant to be more than that.

More specifically, RavenFS is aimed specifically at distribution and synchronization of large files (tens of MB are considered small, hundreds of MB are common and multiple GBs are frequently used). It turns out that very large files mean a whole different kettle of fish. I am going to reference the thesis project of Andrew Tridgell quite frequently for the rest of this post (and you can find the link at the bottom of this post). Andrew is the author of rsync, so he probably knows a thing or two about synchronization problems.

In particular, he had thought about and discarded my approach for synchronizing files in his thesis:

This algorithm is very simple and meets some of the aims of our remote update algorithm, but it is useless in practice
The problem with it is that A can only find matches that are on block boundaries. If the file on A is the same as B except that one byte has been inserted at the start of the file then no block matches will be found and the algorithm will transfer the whole file.

This is absolutely correct. My approach, by design, suffers from the “inserting a single byte at the beginning of the file” problem. Why do I take such a stupid approach?

Put simply, because anything else is too expensive. The rsync approach is to compute a hash at every byte boundary. It gets away with that by using multiple hash functions: one that is very cheap to compute (with a higher probability of collisions) and another that is more expensive to compute but has a far lower probability of collisions.
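To make that concrete, here is a small sketch of the kind of cheap rolling checksum involved (my own simplification, not the actual rsync algorithm); it can be slid over the file one byte at a time at O(1) cost per position:

// Simplified rolling checksum in the spirit of rsync's weak hash.
public class RollingChecksum
{
    private uint a, b;
    private readonly int windowSize;

    public RollingChecksum(byte[] window)
    {
        windowSize = window.Length;
        for (int i = 0; i < window.Length; i++)
        {
            a += window[i];
            b += (uint)(window.Length - i) * window[i];
        }
    }

    public uint Value
    {
        get { return (b << 16) | (a & 0xffff); }
    }

    // Slide the window one byte forward: drop the oldest byte, add the newest.
    public void Roll(byte outgoing, byte incoming)
    {
        a = a - outgoing + incoming;
        b = b - (uint)windowSize * outgoing + a;
    }
}

The cheap checksum is only used to find candidate matches; the expensive hash is then used to confirm them.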

Now, go up and look at the description of RavenFS. It is meant for very large files. Are you really going to perform an operation on every single byte of the file when the file is 2 GB in size?

This is where a very interesting property of file systems comes into play. Let us assume that you have the following file, shown as a sequence of bytes:

[1,2,3,4]

And let us say that we want to add the byte 0 at the beginning of the file. For now, we will assume a buffer size of 1. We will have to issue the following commands to the file system to do so:

  • Write 0 to position 1
  • Write 1 to position 2
  • Write 2 to position 3
  • Write 3 to position 4
  • Write 4 to position 5 <—increase the file size

In other words, inserting a value (as opposed to modifying one) is an O(File.Length – File.Position) operation: the closer to the beginning of the file you are, the more expensive it is to insert a new value.
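A rough sketch of what a naive insert-at-offset has to do (illustration only; error handling and partial reads are glossed over):

using System;
using System.IO;

public static class FileInsertSketch
{
    public static void InsertAt(string path, long offset, byte[] newData)
    {
        using (var file = new FileStream(path, FileMode.Open, FileAccess.ReadWrite))
        {
            var buffer = new byte[4096];
            long tailLength = file.Length - offset;
            long position = file.Length;

            // Move everything after the offset forward, last block first,
            // to make room for the new data.
            while (tailLength > 0)
            {
                int count = (int)Math.Min(buffer.Length, tailLength);
                position -= count;
                file.Position = position;
                file.Read(buffer, 0, count);
                file.Position = position + newData.Length;
                file.Write(buffer, 0, count);
                tailLength -= count;
            }

            file.Position = offset;
            file.Write(newData, 0, newData.Length);
        }
    }
}

Every byte from the offset to the end of the file has to be read and written again, which is exactly the O(File.Length – File.Position) cost described above.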

Ponder that while considering the cost of doing something like that on files that are big. Let us assume a more reasonable buffer size of 4,096 bytes (the .NET default), and we realize that inserting a value at the beginning of a 500 MB file would require about 128 thousand file system operations (500 × 1,024 × 1,024 / 4,096 = 128,000).

In practice, this isn’t a real issue because most of the time, when we are talking about such large files, we are talking about either files that use fixed-size records (no inserting whatsoever) or appending data to the end of the file. That is mostly because there really isn’t any other choice for them, in order to preserve reasonable performance.

RavenFS is meant to take advantage of this aspect to make the “useless in practice” approach a viable option.

On a separate issue, this quote made me laugh:

The lack of such a routine was quite a surprise and prompted me to look into the whole issue of parallel sorting, totally ignoring the fact that I only wanted a parallel sorting routine in order to solve a problem that didn’t really require sorting at all.

~ Andrew Tridgell http://samba.org/~tridge/phd_thesis.pdf

time to read 6 min | 1121 words

What is Raven FS? Raven FS is a distributed file system designed to handle large file replication across WAN networks reliably.

What does it actually mean? The scenario that we have is actually quite simple. Given that we have a file in location A and we need to have that file in location B (geo distributed), how do we move the file across the WAN? Let me make the problem slightly more interesting:

  • The file is large; we are talking about hundreds of megabytes at the low range and tens of gigabytes at the high end.
  • The two locations might be connected over WAN.
  • The connection is assumed to be flakey.

Let us consider a few scenarios where this can be useful:

  • I have a set of videos that I would like to be edited in some fashion (say, putting Bang! and Wham! callouts in some places). Since I have zero ability in editing videos, I hire a firm in India to do that for me. The problem is that each video file is large, and just sending the files to India and back is a challenge. (Large file distributed collaboration problem)
  • I have a set of webservers where users can upload images. We need to send those images to background servers for processing, and then they need to be made available to the web servers again. The image sizes are too large to be sent over traditional queuing technologies. (Udi Dahan calls the problem the Data Bus).
  • I have a set of geo-distributed locations where I have a common set of files (think about something like scene information for rendering a game) that needs to be kept in sync. (Distributed file replication).

I have run into each of those problems (and others that fall into similar categories) several times in recent months. Enough to convince me that:

  • There is a need here that people would be willing to pay for.
  • It is something that we can provide a solution for.
  • There is a host of other considerations related to that set of problems that we can also provide a solution for. A simple example might be simple backup procedures.

The actual implementation will probably vary, but this is the initial design for the problem.

A RavenFS node is going to be running as an HTTP web server. That removes a lot of complexity from our life, since we can utilize a lot of pre-existing protocols and behaviors. HTTP already supports the notion of partial downloads / parallel uploads (Range, If-Range, Content-Range), so we can re-use a lot of that.

From an external implementation perspective, RavenFS node exposes the following endpoints:

  • GET /static/path/to/file <- get the file contents, optionally just a range
  • PUT /static/path/to/file <- put file contents, optionally just a range
  • DELETE /static/path/to/file  <- delete the file
  • GET /metadata/path/to/file <- get the metadata about a file
  • GET /browse/path/to/directory <- browse the content of a directory
  • GET /stats <- number of files, current replication efforts, statistics on replication, etc
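As a rough sketch of how a client might use the range support mentioned above (the node address and file path are placeholders, and this is not the actual RavenFS client API):

using System;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;

public static class RavenFsClientSketch
{
    public static async Task<byte[]> GetRange(string file, long from, long to)
    {
        using (var client = new HttpClient { BaseAddress = new Uri("http://ravenfs-node:8080") })
        {
            var request = new HttpRequestMessage(HttpMethod.Get, "/static/" + file);
            // Ask the node for just a slice of the file; resuming a broken
            // transfer is then a matter of asking for the ranges we do not have yet.
            request.Headers.Range = new RangeHeaderValue(from, to);

            var response = await client.SendAsync(request);
            response.EnsureSuccessStatusCode();
            return await response.Content.ReadAsByteArrayAsync();
        }
    }
}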

A file in RavenFS consists of:

  • The file name
  • The file length
  • A sequence of bytes that makes up the file contents
  • A set of key/value properties that contains file metadata

Internally, files are stored in a transactional store. Each file is composed of pages; each page is a maximum of 4 MB in size and is identified by its signature (actually, a pair of hash signatures, probably SHA256 & RIPEMD160, to avoid any potential for collision). The file contents are actually the list of pages that the file is composed of.

The notion of pages is pretty important for several reasons:

  • It provides us with a standard way to identify pieces of the files.
  • Each page may be part of multiple files.
  • Pages are immutable, once they are written to storage, they cannot be modified (but they can be removed if no file is referencing this page).
  • It makes it easier to chunk data to send while replicating.
  • It drastically reduces the size taken by files that share much of the same information.
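As a back-of-the-envelope sketch only (using just SHA256 and ignoring the second hash), splitting a file into pages and computing their signatures could look like this:

using System;
using System.Collections.Generic;
using System.IO;
using System.Security.Cryptography;

public static class PageSketch
{
    private const int MaxPageSize = 4 * 1024 * 1024;

    public static List<string> GetPageSignatures(Stream file)
    {
        var signatures = new List<string>();
        var buffer = new byte[MaxPageSize];
        int read;
        using (var sha = SHA256.Create())
        {
            // Assumes Read fills the buffer except at the end of the file.
            while ((read = file.Read(buffer, 0, buffer.Length)) > 0)
            {
                var hash = sha.ComputeHash(buffer, 0, read);
                // Pages with the same content get the same signature, so
                // identical pages are stored (and replicated) only once.
                signatures.Add(BitConverter.ToString(hash).Replace("-", ""));
            }
        }
        return signatures;
    }
}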

Let us try to analyze this further. Let us say that we have a 750 MB video, and we put this video file inside RavenFS. Internally, that file is chunked into 188 pages, each of them about 4 MB in size. Since we have set up replication to the RavenFS node in India, we start replicating each of those pages as soon as we are done saving it to the local RavenFS node. In other words, even while we are uploading the file to the local RavenFS node, it is being replicated to the remote RavenFS nodes, saving us the need to wait until the full file is uploaded before replication can begin. Once the entire file has been replicated to the remote node, the team in India can start editing that file.

They make changes in three different places, then save the file again to RavenFS. In total, they have modified 24 MB of content, touching 30 pages. That means that for the purpose of replicating back to the local RavenFS node, we need to send only 120 MB, instead of 750 MB.

This reduces both time and bandwidth required to handle replication. The same will happen, by the way, if we have a set of common files that have some common parts, we will not store the information twice. For that matter, the RavenFS client will be able to ask the RavenFS node about pages that are already stored, and so won’t need to even bother uploading pages that are already on the server.

Another important factor in the decision to use pages is that when replicating across an unreliable medium, sending large files around in a single chunk is a bad idea, because it is pretty common for the connection to drop, and if you need a perfect connection for the duration of the transfer of a 1.5 GB file, you are going to be in a pretty bad place very soon.

Thoughts?
