日期:2014-05-20  浏览次数:21041 次

在线等待 java 多线程 循环不重复处理接收到的udp数据
现在在做一个应用
流程如下:
一个程序向本机的3306用udp传输数据→我的程序监听本机的3306端口→解析3306端口的数据(每一条解析出来都需要调用存储过程,这个存储过程1300多行代码 执行一次花费的时间0.3秒左右)

问题:别人的程序向这个3306端口传输数据的量是 50--100条/秒

我用单线程的时候 每秒钟只能处理有限的数据 多余的数据就缓存了起来 每秒钟累计起来 到一定时间就会累计非常多 导致这些数据等待的时间超过了6分钟就自动丢弃了 也就是丢包了

于是我采用了多线程监听3306这个端口的数据 问题出在 当一条数据进入3306端口的时候 我的线程会出现有两个同时去处理这条数据的情况 时有时无 不知道怎么让这些线程处理的数据不要重复呢? 各位大大们 小弟很无奈啊..

单线程的时候不会出现重复的数据 也就是一条数据处理两次 多线程的时候会出现一条数据多个线程同时处理

不知道是不是小弟的多线程写法是不是有问题...
求教..

下面帖出 单线程和多线程的代码

单线程的代码如下(我的单线程和多线程就是注释那里相互切换)
Java code

package com.maphao.aisvoyage.ede.xsocket.server;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.SocketException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;


import com.maphao.aisvoyage.ede.ServerFactory;
import com.maphao.aisvoyage.ede.XServer;
import com.maphao.aisvoyage.ede.xsocket.thread.ThreadPool;

public class UDPServer implements XServer, Runnable {

    private static final Log logger = LogFactory.getLog(UDPServer.class);

    private static int port = 3344;

    private static int size = 2048;


    private boolean isStart = true;
    
    private static Thread runner = null;

    private static UDPServer server = null;

    private static DatagramSocket datagramSocket = null;

    private static DatagramPacket datagramPacket = null;
    
//    private static Thread[] threads = new Thread[10]; //10个线程

    private UDPServer() {

    }

    public static UDPServer getServer() {
        if (server == null) {
            server = new UDPServer();
//            for (int i = 0; i < threads.length; i++) {
//                threads[i] = new Thread(server);
//            }    
            runner = new Thread(server);
            byte[] buffer = new byte[size];
            try {
                datagramSocket = new DatagramSocket(ServerFactory.udpPort);
            } catch (SocketException e) {
                e.printStackTrace();
            }
            datagramPacket = new DatagramPacket(buffer, buffer.length);
        }
        return server;
    }

    @Override
    public void run() {
        try {
            logger.debug("UDP server start on "
                    + datagramSocket.getLocalAddress().getHostAddress() + ":"
                    + datagramSocket.getPort());
            while (isStart) {
                datagramSocket.receive(datagramPacket);
                UDPServerHandler handler = new UDPServerHandler();
                handler.onData(datagramPacket);
            }
        } catch (SocketException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    @Override
    public void startServer() {
        isStart = true;
        runner.start();
        // 创建线程,并启动发送
//        for (Thread t : threads) {
//            t = new Thread(server);
//            t.start();
//        }
    }

    @Override
    public void stopServer() {
        isStart = false;
        runner.interrupt();
//        for (Thread t : threads) {
//            t = new Thread(server);
//            t.interrupt();
//        }
    }

    @Override
    public void restart() {

    }

    public static void main(String[] args) {
        UDPServer.getServer().startServer();
    }

}





多线程的代码如下
Java code

package com.maphao.aisvoyage.ede.xsocket.server;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.on中






在线等待...先谢谢关注的大大们..

------解决方案--------------------
改成同步块试试 
synchronized(datagramPacket){