Sunday, March 24, 2013

Delimiter-less network protocol with JSON

Recently, an "opportunity" came up to optimize a certain network application.  This service lets users send queries in JSON format and receive a list of results in reply, also as JSON; a query with no results just yields [].  This worked fine until the number of queries grew and reports started coming in that it was taking a really long time (over an hour) to get work done.  I first optimized some DB queries so that not all searches are done upfront; instead, if an interesting match is found, an additional search can be sent, banking on the fact that most queries will be misses.  This sped up processing considerably, since all it does now is a lookup on indexed data in a single table with no joins.  The question is, can this be made even faster?

The current implementation is an nginx web server sitting on top of uwsgi, sitting on top of django, sitting on top of a MySQL database.  I wondered if there may be any way to optimize this stack.  The obvious choice is perhaps to get rid of django.  However, with uwsgi spawning ten workers and each worker being good for five thousand connections, forking a process per request is probably not the overhead I am looking for.  It also does not appear that uwsgi even bothers to reread my python code from file on every request, so again not an issue.  That leaves HTTP and MySQL over a network connection.  The obvious thing to do is perhaps to move MySQL closer to the service, cache results until the next DB reload, or maybe even get rid of MySQL and do something very simple.  The problem is that a few other components of this application rely on MySQL + django.  So the next thought was: even if this accomplishes little, wouldn't it be fun to take the web server and HTTP out of the equation and implement something a little more lightweight?

About seven years ago a buddy of mine and I attempted to write a chat application in Java.  The client and server were both in Java and the protocol used a typical TLV scheme, so decoding such a beast was very simple.  Message boundaries, or the lack thereof, were very clearly demarcated, and taking apart messages was simple.  In this case I was hoping not to use any delimiters aside from the JSON itself, which already provides enough structure.  In addition, I wanted to give the user the option to keep a connection open, send multiple JSON messages and get multiple replies.  This is slightly more complicated than an HTTP POST, since you are not guaranteed that all the data will arrive in the same packet or that the entire message will be reassembled by a higher-level protocol.  Keeping the connection open for multiple requests also means that the end of one JSON structure and the beginning of the next may arrive in the same packet, further complicating the handling.
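
For contrast, here is roughly what TLV-style framing looks like in python.  This is only a sketch: the 1-byte type and 4-byte big-endian length header, and the names encode_tlv and decode_tlv, are assumptions made for illustration and not the format of that old chat application.

```python
import struct

# Assumed header layout: 1-byte message type, 4-byte big-endian payload length
HEADER = struct.Struct("!BI")


def encode_tlv(msg_type, payload):
    """Prefix a bytes payload with its type and length."""
    return HEADER.pack(msg_type, len(payload)) + payload


def decode_tlv(buffer):
    """Split buffer into complete (type, payload) messages plus leftover bytes."""
    messages = []
    offset = 0
    while len(buffer) - offset >= HEADER.size:
        msg_type, length = HEADER.unpack_from(buffer, offset)
        start = offset + HEADER.size
        if len(buffer) - start < length:
            break  # the value has not fully arrived yet
        messages.append((msg_type, buffer[start:start + length]))
        offset = start + length
    return messages, buffer[offset:]
```

The length field tells the receiver exactly where each message ends, so a split packet just means keeping the leftover bytes and waiting for more.  Without it, the boundary has to be discovered from the JSON text itself.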

It seemed like a recursive descent parser would not work here, so I decided to write my own streaming JSON parser (think SAX vs DOM) that scans input character by character and knows when a full message has been received.  Luckily, as mentioned before, JSON is self-delimiting, so I did not have to work too hard.  My parser also does not have to be very smart; it just needs to know when to hand the resulting string to python's json.loads.
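
To see why the boundaries have to be found before json.loads is called, here is a small check (written for Python 3; try_parse is a hypothetical helper name) of what happens when a read contains two messages or only part of one.

```python
import json


def try_parse(text):
    """Return ('ok', value) or ('error', message) for a candidate JSON string."""
    try:
        return ("ok", json.loads(text))
    except ValueError as exc:  # json.JSONDecodeError is a ValueError subclass
        return ("error", str(exc))


complete = try_parse('["a", "b"]')               # exactly one message
two_messages = try_parse('["a", "b"]{"a": 1}')   # two messages in one read
partial = try_parse('{"a": ')                    # message split across reads
```

Only the first call succeeds.  The other two fail, so json.loads can only be handed a string that holds exactly one complete structure.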

So I came up with the following constraints:
  • messages can either start with [ or {
  • messages cannot start with a scalar value such as an alphanumeric character
  • messages cannot start with ] or }
  • It should parse escaped characters, but I can worry about that later
In addition a simple design:
  • incoming data is scanned character by character and written to a String Buffer (StringIO in python).  This is done because strings are immutable and creating new strings through string concatenation every time is costly
  • a stack is kept onto which the opening characters [, " and { are pushed; when a closing ], " or } arrives and it matches the last character on the stack, we pop.
  • We need a control flag to know when we are inside a string ("string"), because there [{]} must be treated as nothing but parts of the string.
  • If a parsing error is encountered, the existing accumulator is dumped along with the stack, and parsing proceeds from where it left off
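
The design above, including the escaped-character handling that was deferred, can be sketched as follows.  This is a simplified Python 3 illustration and not the JSONAccumulator code shown later: the class name JSONSplitter and its methods are hypothetical, strings are tracked with a flag only instead of also being pushed on the stack, and errors are handled by silently dropping input rather than reporting a status.

```python
import io
import json

PAIRS = {"]": "[", "}": "{"}  # closing character -> expected opening character


class JSONSplitter:
    """Hypothetical sketch: find message boundaries in a stream of JSON text."""

    def __init__(self):
        self.reset()

    def reset(self):
        self.buffer = io.StringIO()  # characters of the message being assembled
        self.stack = []              # currently open [ and { characters
        self.in_string = False
        self.escaped = False         # True right after a backslash in a string

    def feed(self, data):
        """Yield each complete JSON text; state persists between calls."""
        for char in data:
            if not self.stack and char not in "[{":
                continue  # simplification: skip text outside a structure
            self.buffer.write(char)
            if self.in_string:
                if self.escaped:
                    self.escaped = False   # this character was escaped
                elif char == "\\":
                    self.escaped = True
                elif char == '"':
                    self.in_string = False
            elif char == '"':
                self.in_string = True
            elif char in "[{":
                self.stack.append(char)
            elif char in PAIRS:
                if self.stack[-1] != PAIRS[char]:
                    self.reset()  # mismatched closer: drop the partial message
                    continue
                self.stack.pop()
                if not self.stack:
                    message = self.buffer.getvalue()
                    self.reset()
                    yield message


splitter = JSONSplitter()
decoded = [json.loads(text) for text in splitter.feed('["a", "b]"]{"a": 1}')]
```

Because the state lives on the object, a message that is split across two calls to feed is completed by the second call.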
I picked twisted python as my framework because I always wanted to play with it more and I have yet to write a project from scratch using it.  Supposedly event loops are much faster than having multiple workers, and I also just wanted to do something new.


Packet processor:
The first thing I wrote was the parser, which I called JSONAccumulator.  Overall it is not very interesting except for how the data is returned to the caller.  At first I wrote it to return results, but then realized that I should not return from my data processor until the entire data packet is processed.  Otherwise the processor drops the remaining data on the floor whenever it returns to signal completion or an error.

Generator to the rescue:
Luckily python has a great feature that allows a function to hand control over to the caller along with some data, and later resume where it left off.  And so I rewrote the processor as a generator.

So now when the processor wants to tell the caller something, it simply yields a result.  The result is now yielded when either an error occurs, JSON assembly is complete or the data packet has been exhausted.
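
The difference between returning and yielding can be shown with a toy example.  It is deliberately simplified: it assumes every message is a flat array that ends at the first ], and the function names are made up for illustration.

```python
def first_message(packet):
    """Return as soon as one message is complete; the rest of the packet is lost."""
    current = ""
    for char in packet:
        current += char
        if char == "]":
            return current
    return None


def all_messages(packet):
    """Yield every complete message, resuming the scan after each one."""
    current = ""
    for char in packet:
        current += char
        if char == "]":
            yield current
            current = ""


packet = "[1][2][3]"
returned = first_message(packet)      # only the first message survives
yielded = list(all_messages(packet))  # all three messages are delivered
```

The generator version keeps its position in the packet between results, which is exactly what the processor needs.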

Here is the full code of the packet processor:
1:    def process_data(self, data):  
2:      for char in data:  
3:        self.char_buffer.write(char)  
4:        self.data_ptr += 1  
5:    
6:        # JSON structures should only start with [ or {  
7:        if len(self.ctrl_stack) == 0 and char != '[' and char != '{':  
8:          self.status.current_status = self.status.ERROR  
9:          self.status.error_str = "Must start with { or ["  
10:          self.reinit()  
11:          # return here do not place more char in the buffer  
12:          yield {'STATUS': self.status.current_status, 'ERROR_STR': self.status.error_str}  
13:            
14:        elif char == '"':  
15:          if self.in_string == True and self.ctrl_stack[-1] == '"':  
16:            # we've matched a string  
17:            self.ctrl_stack.pop()  
18:            self.in_string = False  
19:          else:  
20:            # We're in a string  
21:            self.ctrl_stack.append(char)  
22:            self.in_string = True  
23:              
24:        elif self.in_string == False and (char == '[' or char == '{'):  
25:          self.ctrl_stack.append(char)  
26:        elif self.in_string == False and (char == ']' or char == '}'):  
27:          # do something when control_stack is 0  
28:          if len(self.ctrl_stack) and self.ctrl_stack[-1] == self.closing_char[char]:  
29:            self.ctrl_stack.pop()  
30:          else:  
31:            # some sort of parsing error  
32:            self.status.current_status = self.status.ERROR  
33:            self.status.error_str = "Unmatched closing control '{char}'".format(char=char)  
34:            self.reinit()  
35:            # return here do not place more char in the buffer  
36:            yield {'STATUS': self.status.current_status, 'ERROR_STR': self.status.error_str}  
37:        if len(self.ctrl_stack) == 0:  
38:          # we've assembled a json structure  
39:          self.status.current_status = self.status.COMPLETE  
40:          self.status.error_str = ""  
41:          assembled_json = self.char_buffer.getvalue()  
42:          self.reinit()  
43:          yield {'STATUS': self.status.current_status,  
44:              'ERROR_STR': self.status.error_str,  
45:              'JSON': assembled_json}  
46:    
47:      self.status.current_status = self.status.MORE_DATA  
48:      self.status.error_str = ""  
49:      yield {'STATUS': self.status.current_status,  
50:          'ERROR_STR': self.status.error_str}  
51:      return  

This is rather straightforward, and only a few things warrant attention here.  The reinit() function is called to reset state; it is called in JSONAccumulator's constructor, when JSON assembly is complete, and when an error is encountered.

Reinit code:
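    # requires `import StringIO` at the top of the module (Python 2)
    def reinit(self):
      self.char_buffer = StringIO.StringIO()  # text of the message being assembled
      self.ctrl_stack = []                    # currently open [, { and " characters
      self.in_string = False
      self.data_ptr = -1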

A few more things: ctrl_stack is the stack where opening characters are placed, and char_buffer is the string buffer where characters are written and then extracted from when JSON assembly is complete.

A few lines of process_data are worth mentioning:
Line 7 is a check to make sure that a structure opens with { or [
Line 14 handles " since the closing and opening characters for a string are the same
Line 24 handles opening characters and pushes them onto the stack
Line 26 checks if a closing character matches an opening character and pops the matching opening character off the stack
Line 37 checks whether JSON assembly is complete

The twisted part:
This being quite a simple protocol, it turned out there was not a whole lot to do as far as twisted is concerned:

  from twisted.internet import protocol, reactor

  # JSONAccumulator is the packet processor class described above

  class JSONProtocol(protocol.Protocol):
    def __init__(self):
      self.json_accumulator = JSONAccumulator()

    def dataReceived(self, data):
      # a for loop ends cleanly when the generator is exhausted,
      # so StopIteration does not need to be caught
      for yvalue in self.json_accumulator.process_data(data):
        if yvalue['STATUS'] in (JSONAccumulator.status.ERROR,
                                JSONAccumulator.status.COMPLETE):
          self.transport.write(str(yvalue) + "\n")
        # MORE_DATA needs no reply; the next packet continues the message

  class JSONFactory(protocol.ServerFactory):
    protocol = JSONProtocol


  def main():
    reactor.listenTCP(8000, JSONFactory())
    reactor.run()


  if __name__ == '__main__':
    main()

Sample output:

levb@levb-desktop:~/DEV$ echo -n '["a", "b", "c"]{"a":1}' | nc -q 2 localhost 8000
{'STATUS': 'COMPLETE', 'JSON': '["a", "b", "c"]', 'ERROR_STR': ''}
{'STATUS': 'COMPLETE', 'JSON': '{"a":1}', 'ERROR_STR': ''}


levb@levb-desktop:~/DEV$ echo -n '"c"]{"a":1}' | nc -q 2 localhost 8000
{'STATUS': 'ERROR', 'ERROR_STR': 'Must start with { or ['}
{'STATUS': 'COMPLETE', 'JSON': '', 'ERROR_STR': ''}
{'STATUS': 'ERROR', 'ERROR_STR': 'Must start with { or ['}
{'STATUS': 'COMPLETE', 'JSON': '', 'ERROR_STR': ''}
{'STATUS': 'ERROR', 'ERROR_STR': 'Must start with { or ['}
{'STATUS': 'COMPLETE', 'JSON': '', 'ERROR_STR': ''}
{'STATUS': 'ERROR', 'ERROR_STR': 'Must start with { or ['}
{'STATUS': 'COMPLETE', 'JSON': '', 'ERROR_STR': ''}
{'STATUS': 'COMPLETE', 'JSON': '{"a":1}', 'ERROR_STR': ''}

In the second run each of the four rejected characters ("c"]) produces an ERROR followed by a COMPLETE with empty JSON.  The error resets the state, so the stack is empty when the completeness check on line 37 runs for the same character.


What else?
Right now there is no actual work being done; I still have to teach this code to query a database.  But even though I got distracted from my main purpose, this turned out to be an interesting project.  I am sure at some point it might also be fun to add some sort of authentication and privacy on top of this channel, perhaps TLS or GSSAPI.  I am also sure this code can be cleaned up and shortened a bit.

Code:
Full code is available at https://github.com/pu239ppy/json-stream-parser