日期:2014-05-16  浏览次数:20603 次

在MongoDB里实现循环序列功能

环境是这样的:服务器是用Java做的, 数据库是MongoDB

?

需求是这样的:我们的系统里要生成一个唯一ID,前面的部分有一定的格式,并和时间关联, 精确到微秒,考虑到同一微秒内有可能存在并发情况, 所以后面在加两位序列号, 系统需要定义为1毫秒内的并发小于100个,所以后面两位就够用了。 Java服务器端有多台机器都可以用来生成这个唯一ID,所以需要在不同的机器上不能生成相同的序列号,所以需要在某一点上做全局的范围同步来保存这序列号的唯一性。 其实如果不考虑需求里的唯一ID是有一定意义的格式的, 用UUID或MongoDB的ObjectId都是更好的选择,完全不需要在某一点上进行同步,性能会更好。

?

这个可以生成序列号的点, 我们可以做一个序列号生成服务器来对应, 也可以用数据库来对应。 单单为这个简单的功能准备一个服务器来做显然不合适。 但是我们用的MongoDB并没有类似于MySQL或Oracle中的SELECT FOR UPDATE这样的锁机制。 所以没有办法简单的对这个序列号做原子操作。 但是MongoDB的对单个document进行update操作中有很是具有原子性的, 例如

  • $set
  • $unset
  • $inc
  • $push
  • $pushAll
  • $pull
  • $pullAll

我们可以利用这些原子操作,在数据库层以乐观锁的形式来实现循环序列字段。为了方便调用我把这段逻辑做成数据库中的Javascript函数。 类似与MySQL中的存储过程。

?

首先我们需要一个collection来存放序列号,并对需要的需要的序列号进行初始化。我们叫它counters。

db.counters.save({_id:"SerialNo1", val:0, maxval:99})

?

然后我们想system.js里添加一个Javascript函数

db.system.js.save({_id:"getNextUniqueSeq",
value:function (keyName) {
    var seqObj = db.counters.findOne({_id:keyName});
    if (seqObj == null) {
		print("can not find record with key: " + keyName);
        return -1;
    }
	
    // the max value of sequence
    var maxVal = seqObj.maxval;
    // the current value of sequence
    var curVal = seqObj.val;
	
	while(true){
		// if curVal reach max, reset it
		if(curVal >= maxVal){
			db.counters.update({_id : keyName, val : curVal}, { $set : { val : 0 }}, false, false);
			var err = db.getLastErrorObj();
			if( err && err.code ) {
				print( "unexpected error reset data: " + tojson( err ) );
                return -2;
	        } else if (err.n == 0){
				// fail to reset value, may be reseted by others
				print("fail to reset value: ");
			} 

			// get current value again.
			seqObj = db.counters.findOne({_id:keyName});
			maxVal = seqObj.maxval;
			curVal = seqObj.val;
			continue;
		} 
		
		// if curVal not reach the max, inc it;
		// increase 
		db.counters.update({_id : keyName, val : curVal}, { $inc : { val : 1 }}, false, false);
		var err = db.getLastErrorObj();
		if( err && err.code ) {
			print( "unexpected error inc val: " + tojson( err ) );
               return -3;
        } else if (err.n == 0){
			// fail to reset value, may be increased by others
			print("fail to inc value: ");
			
			// get current value again.
			seqObj = db.counters.findOne({_id:keyName});
			maxVal = seqObj.maxval;
			curVal = seqObj.val;
			continue;
		} else {
			var retVal = curVal + 1;
			print("success to get seq : " + retVal);
			// increase successful
			return retVal;
		}
	}
}
});

上面这段会把指定的序列号的val值+1,如果val达到上限则从0开始。所以叫循环序列。

?

其实上面的实现在原理上和Java里的AtomicInteger系列的功能实现是类似的,利用循环重试和原子性的CAS来实现。这种实现方式在多线程的环境里由于锁(Monitor)的范围很小,所以并发性上比排他锁要好一些。

?

下面我们用Java来测试一下这个函数的正确性。 即在多线程的情况下会不会得到重复的序列号。

?

第一个测试,val=0, maxval=2000, Java端20个线程每个线程循环调用100次。 共2000次。 所以正确的情况下,从0到1999应该每个数字只出现一次。

?

    @Test
    public void testGetNextUniqueSeq1() throws Exception {

        final int THREAD_COUNT = 20;
        final int LOOP_COUNT = 100;

        Mongo mongoClient = new Mongo("172.17.2.100", 27017);
        DB db = mongoClient.getDB("im");
        db.authenticate("imadmin", "imadmin".toCharArray());
        BasicDBObject q = new BasicDBObject();
        q.put("_id", "UNIQUE_KEY");

        BasicDBObject upd = new BasicDBObject();
        BasicDBObject set = new BasicDBObject();
        set.put("val", 0);
        set.put("maxval", THREAD_COUNT * LOOP_COUNT);
        upd.put("$set", set);

        db.getCollection("counters").update(q, upd);

        Thread[] threads = new Thread[THREAD_COUNT];
        final int[][] results = new int[THREAD_COUNT][LOOP_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int temp_i = i