linxpsoft / delphi-iocp-framework

Automatically exported from code.google.com/p/delphi-iocp-framework
0 stars 0 forks source link

侦听超过62个端口 #8

Open GoogleCodeExporter opened 8 years ago

GoogleCodeExporter commented 8 years ago
soulawing:
  很高兴看到你增加了一个线程侦听多个端口的功能,但限制了62个端口,
这是我的一个程序,目的是能够侦听超过62个端口,希望能合
并这个功能到你的版本中:

  // Kind of control request handed to the accept thread. Execute reads
  // FAcceptEventType and runs DoAddListen / DoDelListen / DoShutDown accordingly.
  TIocpMultiAcceptEventType = (etAdd, etDel, etShutdown);

  // Accept thread that watches several listening sockets at once by waiting on
  // one FD_ACCEPT event per listener plus one control event.
  TIocpMultiAcceptThread = class(TIocpAcceptThread)
  private
    // Factory that owns this thread; used to post AcceptEx requests and close sockets.
    FOwner: TTcpConnectionServerFactory;
    // FListenSocket: array of TIocpMultiSocket;
    // Registered listeners: 63 slots (indices 0..62).
    FServers: array [0..62] of TTcpConnectionServer;
    // Wait handles: 64 slots. Slot i holds the FD_ACCEPT event of FServers[i];
    // DoAddListen places the control event (FAcceptEvent) right after the last
    // listener event, i.e. at index FServerCount.
    FAcceptEvents: array [0..63] of THandle;
    // Number of listeners currently stored in FServers.
    FServerCount:Integer;
//    FInitAcceptNum: Integer;
    // Address family passed to the factory when posting new accepts.
    FAiFamily: Integer;
    // Server the pending control request (add / delete) refers to.
    FAcceptEventServer: TTcpConnectionServer;
    // Which control request is pending.
    FAcceptEventType: TIocpMultiAcceptEventType;
    // NOTE(review): only referenced from commented-out code in the visible source.
    FAcceptEventSocketContext: Pointer;
    // Control event signalled by AddListen / DelListen to wake the thread.
    FAcceptEvent: THandle;
    // Entered by AddListen / DelListen and left by DoAddListen / DoDelListen, so a
    // request stays locked from submission until it has been processed.
    // NOTE(review): if submission and processing happen on different threads the
    // lock is released by a non-owner thread - confirm this is intended.
    FAcceptEventLocker: TCriticalSection;
  protected
    // Thread body: the wait/dispatch loop.
    procedure Execute; override;
    // Handlers for the three control requests.
    procedure DoAddListen;
    procedure DoDelListen;
    procedure DoShutDown;
    // Called by Execute after the wait loop ends.
    procedure DoDelAllListen;
  public
    constructor Create(IocpSocket: TTcpConnectionServerFactory; AiFamily, InitAcceptNum: Integer); reintroduce;
    destructor Destroy; override;
    // Queues aServer for registration; returns False when this thread is full.
    function AddListen(aServer: TTcpConnectionServer): Boolean;
    // Queues aServer for removal.
    procedure DelListen(aServer: TTcpConnectionServer);
    procedure Quit; override;
  end;

{ Thread body. Waits on FAcceptEvents[0..FServerCount]: slots 0..FServerCount-1
  are the FD_ACCEPT events of the registered listeners, slot FServerCount is the
  control event signalled by AddListen / DelListen.

  - Control event: dispatches on FAcceptEventType to DoAddListen, DoDelListen
    or DoShutDown.
  - Listener event: posts FParam.InitAcceptNum fresh accepts for that listener,
    or queues the listener for removal when the event reports an error.

  The loop ends when the thread is terminated or the wait fails; DoDelAllListen
  is then called and the control event is closed. Exceptions are logged, not
  re-raised.

  NOTE(review): DelListen is called from this thread on error paths. It enters
  FAcceptEventLocker; if another thread is holding that lock for a pending
  request, this thread would block and could no longer service it - confirm. }
procedure TIocpMultiAcceptThread.Execute;
var
  N, i: Integer;
  aSocket: TSocket;
  LastErr: Integer;
  RetEvents: TWSANetworkEvents;
  dwRet: DWORD;
begin
  FServerCount := 0;
  // The handle waited on in the control slot must be the very handle that
  // AddListen / DelListen signal (FAcceptEvent); create it here only if it
  // does not exist yet.
  if FAcceptEvent = 0 then
    FAcceptEvent := CreateEvent(nil, True, False, nil);
  FAcceptEvents[0] := FAcceptEvent;
  try
    try
      while not Terminated do
      begin
        // Wait for a quit/control request or an ACCEPT notification.
        dwRet := WSAWaitForMultipleEvents(FServerCount + 1, @FAcceptEvents[0], False, INFINITE, True);
        if (dwRet = WSA_WAIT_FAILED) or Terminated then
          Break;

        // Index of the event that fired.
        N := dwRet - WSA_WAIT_EVENT_0;

        // The wait is alertable, so it can also return a value that is not an
        // event index (I/O completion routine ran). Never index with it.
        if (N < 0) or (N > FServerCount) then
          Continue;

        // The control event always sits right after the last listener event.
        if N = FServerCount then
        begin
          // Manual-reset event: clear it, otherwise the next wait returns at once.
          ResetEvent(FAcceptEvents[N]);
          case FAcceptEventType of
            etAdd:
              DoAddListen;
            etDel:
              DoDelListen;
            etShutdown:
              DoShutDown;
          end;
          Continue;
        end;

        aSocket := FServers[N].FListenSocket;
        // Read (and reset) the network event state of this listener.
        if (WSAEnumNetworkEvents(aSocket, FAcceptEvents[N], @RetEvents) = SOCKET_ERROR) then
        begin
          LastErr := WSAGetLastError;
          AppendLog('%s.WSAEnumNetworkEvents失败, ERROR %d=%s', [ClassName, LastErr, SysErrorMessage(LastErr)], ltWarning);
          // The failed call did not reset the event. Clear it by hand so the
          // wait can reach the control event instead of re-reporting slot N.
          ResetEvent(FAcceptEvents[N]);
          DelListen(FServers[N]);
          Continue;
        end;

        // FD_ACCEPT fired: the listener ran out of pending accepts, post a new batch.
        if (RetEvents.lNetworkEvents and FD_ACCEPT = FD_ACCEPT) then
        begin
          if (RetEvents.iErrorCode[FD_ACCEPT_BIT] <> 0) then
          begin
            // The error belongs to the event record, not to the last Winsock call.
            LastErr := RetEvents.iErrorCode[FD_ACCEPT_BIT];
            AppendLog('%s.WSAEnumNetworkEvents失败, ERROR %d=%s', [ClassName, LastErr, SysErrorMessage(LastErr)], ltWarning);
            DelListen(FServers[N]);
            Continue;
          end;
          for i := 1 to FServers[N].FParam.InitAcceptNum do
          begin
            if not FOwner.PostNewAcceptEx2(FServers[N], FAiFamily) then
              Break;
          end;
        end;
      end; // End of While
      DoDelAllListen;
    finally
      // Close the control event and forget the handle so it cannot be reused.
      CloseHandle(FAcceptEvent);
      FAcceptEvent := 0;
    end;
  except
    on e: Exception do
      AppendLog('%s.Execute, %s=%s', [ClassName, e.ClassName, e.Message], ltException);
  end;
end;

下面是添加侦听:
{ Opens a listening socket for every address that Param.ListenAddress /
  Param.ListenPort resolves to (all local addresses when ListenAddress is
  empty), associates it with the completion port and hands it to an accept
  thread that still has capacity, creating a new accept thread when none has.

  Returns the server object created for the last address processed, or nil
  when nothing could be set up. Failures are logged and end the function;
  exceptions are logged, not re-raised.

  NOTE(review): an existing accept thread is reused regardless of the address
  family it was created with, while new threads are created with Ptr.ai_family
  - confirm mixing families on one thread is intended.
  NOTE(review): the accept thread reads FParam.InitAcceptNum from the server,
  but FParam is not assigned in this function - confirm it is set elsewhere. }
function TTcpConnectionServerFactory.AddListen(const Param: ITcpConnectionServerParam): TTcpConnectionServer;
const
  IPV6_V6ONLY = 27;
var
  HostName: WideString;
  PHost: PWideChar;
  ListenSocket: TSocket;
  InAddrInfo: TAddrInfoW;
  POutAddrInfo, Ptr: PAddrInfoW;
  LastErr: Integer;
  B: Boolean;
  i: Integer;
  T: TIocpMultiAcceptThread;
begin
  Result := nil;

  try
    // Listen on the given address when one was passed, otherwise on all
    // local addresses.
    if (Param.ListenAddress = '') then
      PHost := nil
    else
    begin
      // Keep the converted string in a local so PHost stays valid for the
      // whole getaddrinfo call instead of pointing into a temporary.
      HostName := WideString(AnsiString(Param.ListenAddress));
      PHost := PWideChar(HostName);
    end;

    FillChar(InAddrInfo, SizeOf(TAddrInfoW), 0);
    InAddrInfo.ai_flags    := AI_PASSIVE;
    InAddrInfo.ai_family   := AF_UNSPEC;
    InAddrInfo.ai_socktype := SOCK_STREAM;
    InAddrInfo.ai_protocol := IPPROTO_TCP;
    if (getaddrinfo(PHost, PWideChar(IntToStr(Param.ListenPort)), @InAddrInfo, @POutAddrInfo) <> 0) then
    begin
      LastErr := WSAGetLastError;
      AppendLog('%s.getaddrinfo失败, ERR=%d,%s', [ClassName, LastErr, SysErrorMessage(LastErr)], ltWarning);
      Exit;
    end;

    try
      Ptr := POutAddrInfo;
      while (Ptr <> nil) do
      begin
        ListenSocket := WSASocket(Ptr.ai_family, Ptr.ai_socktype, Ptr.ai_protocol, nil, 0, WSA_FLAG_OVERLAPPED);
        if (ListenSocket = INVALID_SOCKET) then
        begin
          // Log the failure like every other error path does.
          LastErr := WSAGetLastError;
          AppendLog('%s.创建监听套接字(%d)失败, ERR=%d,%s', [ClassName, Param.ListenPort, LastErr, SysErrorMessage(LastErr)], ltWarning);
          Exit;
        end;

        // no := 0;
        // setsockopt(ListenSocket, IPPROTO_IPV6, IPV6_V6ONLY, PAnsiChar(@no), sizeof(no));

        if (bind(ListenSocket, Ptr.ai_addr, Ptr.ai_addrlen) = SOCKET_ERROR) then
        begin
          LastErr := WSAGetLastError;
          Iocp.Winsock2.CloseSocket(ListenSocket);
          AppendLog('%s.绑定监听端口(%d)失败, ERR=%d,%s', [ClassName, Param.ListenPort, LastErr, SysErrorMessage(LastErr)], ltWarning);
          Exit;
        end;

        if (Iocp.Winsock2.listen(ListenSocket, SOMAXCONN) = SOCKET_ERROR) then
        begin
          LastErr := WSAGetLastError;
          Iocp.Winsock2.CloseSocket(ListenSocket);
          AppendLog('%s.启动监听端口(%d)失败, ERR=%d,%s', [ClassName, Param.ListenPort, LastErr, SysErrorMessage(LastErr)], ltWarning);
          Exit;
        end;

        Result := TTcpConnectionServer.Create;
        if not AssociateSocketWithCompletionPort2(ListenSocket, Result) then
        begin
          Result.Free;
          // Do not hand a freed object back to the caller.
          Result := nil;
          Iocp.Winsock2.CloseSocket(ListenSocket);
          AppendLog('%s.绑定监听端口(%d)到IOCP失败', [ClassName, Param.ListenPort], ltWarning);
          Exit;
        end;

        Result.FTcpSocket          := Self;
        Result.FListenSocket       := ListenSocket;
        Result.FContext            := Param.Context;
        Result.FInfo.FStartTime    := Now;
        Result.FInfo.FLocalAddress := Param.ListenAddress;
        Result.FInfo.FLocalPort    := Param.ListenPort;

        // Acquire before the try so Leave only runs when the lock is held.
        FListenThreadsLocker.Enter;
        try
          // Must start as False: with no accept threads yet the loop body
          // never runs and a new thread has to be created.
          B := False;
          for i := 0 to FListenThreads.Count - 1 do
          begin
            B := TIocpMultiAcceptThread(FListenThreads[i]).AddListen(Result);
            if B then
              Break;
          end;
          if not B then
          begin
            // Every existing thread is full (or there is none): start another.
            T := TIocpMultiAcceptThread.Create(Self, Ptr.ai_family, Param.InitAcceptNum);
            FListenThreads.Add(T);
            T.AddListen(Result);
          end;
        finally
          FListenThreadsLocker.Leave;
        end;

        Ptr := Ptr.ai_next;
      end;
    finally
      freeaddrinfo(POutAddrInfo);
    end;

    // Result := True;
  except
    on e: Exception do
      AppendLog('%s.Listen ERROR %s=%s', [ClassName, e.ClassName, e.Message], ltException);
  end;
end;

Original issue reported on code.google.com by Hezihang...@gmail.com on 27 May 2013 at 7:39

GoogleCodeExporter commented 8 years ago
{ Queues aServer for registration with this accept thread.

  Returns True when the request was queued (the control event is signalled and
  DoAddListen performs the actual registration), False when this thread has no
  free listener slot.

  On success FAcceptEventLocker is deliberately left held; DoAddListen releases
  it once the request has been processed.
  NOTE(review): that means the lock may be released by a different thread than
  the one that entered it - confirm this hand-off is safe. }
function TIocpMultiAcceptThread.AddListen(aServer: TTcpConnectionServer): Boolean;
begin
  FAcceptEventLocker.Enter;
  // Capacity is the size of FServers. The wait array has exactly one more
  // slot, which is reserved for the control event.
  Result := FServerCount < Length(FServers);
  if Result then
  begin
    FAcceptEventType   := etAdd;
    FAcceptEventServer := aServer;
    SetEvent(FAcceptEvent);
  end
  else
    FAcceptEventLocker.Leave;
end;

{ Processes a queued etAdd request: posts the initial accepts for
  FAcceptEventServer, creates its FD_ACCEPT event, stores server and event in
  slot FServerCount and moves the control event to the slot after it.

  When the table is full or the event cannot be set up, the failure is logged
  and the tables are left untouched, so the control event keeps its slot.
  FAcceptEventLocker (entered by AddListen) is always released on exit.

  NOTE(review): the loop posts InitAcceptNum - 1 accepts, one fewer than the
  replenish loop in Execute - confirm this is intended. }
procedure TIocpMultiAcceptThread.DoAddListen;
var
  i: Integer;
  LastErr: Integer;
  NewEvent: THandle;
begin
  try
    // Never write past FServers / FAcceptEvents.
    if FServerCount >= Length(FServers) then
    begin
      AppendLog('%s.DoAddListen失败, 监听数量已达上限(%d)', [ClassName, Length(FServers)], ltWarning);
      Exit;
    end;

    for i := 1 to FAcceptEventServer.FParam.InitAcceptNum - 1 do
      FOwner.PostNewAcceptEx(FAcceptEventServer.FListenSocket, FAiFamily);

    // Build the event in a local first: slot FServerCount still holds the
    // control event and must keep it if anything below fails.
    NewEvent := WSACreateEvent;
    if NewEvent = 0 then
    begin
      LastErr := WSAGetLastError;
      AppendLog('%s.WSACreateEvent失败, ERROR %d=%s', [ClassName, LastErr, SysErrorMessage(LastErr)], ltWarning);
      Exit;
    end;

    // Bind the listener's ACCEPT notification to the event; it fires when
    // there are not enough pending accept sockets.
    if WSAEventSelect(FAcceptEventServer.FListenSocket, NewEvent, FD_ACCEPT) = SOCKET_ERROR then
    begin
      LastErr := WSAGetLastError;
      CloseHandle(NewEvent);
      AppendLog('%s.WSAEventSelect失败, ERROR %d=%s', [ClassName, LastErr, SysErrorMessage(LastErr)], ltWarning);
      Exit;
    end;

    FServers[FServerCount] := FAcceptEventServer;
    // FListenSocket[High(FServers)].FContext:=FAcceptEventSocketContext;
    FAcceptEvents[FServerCount] := NewEvent;
    Inc(FServerCount);
    // The control event always follows the last listener event.
    FAcceptEvents[FServerCount] := FAcceptEvent;
  finally
    FAcceptEventLocker.Leave;
  end;
end;

{ Queues aServer for removal from this accept thread and wakes the thread via
  the control event. FAcceptEventLocker is entered here and not left;
  DoDelListen releases it after processing the request.
  NOTE(review): the lock may therefore be released by a different thread than
  the one that entered it - confirm this hand-off is safe. }
procedure TIocpMultiAcceptThread.DelListen(aServer: TTcpConnectionServer);
begin
  FAcceptEventLocker.Enter;
  // Describe the request first, then signal the accept thread.
  FAcceptEventServer := aServer;
  FAcceptEventType   := etDel;
  SetEvent(FAcceptEvent);
end;

{ Processes a queued etDel request: finds FAcceptEventServer in FServers,
  closes its listening socket and FD_ACCEPT event, stops and frees the server,
  then closes the gap in both tables so that the control event again sits
  directly after the last listener event.

  Does nothing when the server is not registered here. FAcceptEventLocker
  (entered by DelListen) is always released on exit. }
procedure TIocpMultiAcceptThread.DoDelListen;
var
  i, j: Integer;
begin
  try
    for i := 0 to FServerCount - 1 do
    begin
      if FServers[i] = FAcceptEventServer then
      begin
        FOwner.CloseSocket(FAcceptEventServer.FListenSocket);
        CloseHandle(FAcceptEvents[i]);
        FServers[i].DoStop;
        FServers[i].Free;

        // Shift the remaining entries down one element at a time. Move()
        // counts bytes, not elements, so plain assignments are used here.
        for j := i to FServerCount - 2 do
          FServers[j] := FServers[j + 1];
        FServers[FServerCount - 1] := nil;

        // Slots i+1..FServerCount: the remaining listener events plus the
        // control event stored at index FServerCount.
        for j := i to FServerCount - 1 do
          FAcceptEvents[j] := FAcceptEvents[j + 1];

        Dec(FServerCount);
        Break;
      end;
    end;
  finally
    FAcceptEventLocker.Leave;
  end;
end;

Original comment by Hezihang...@gmail.com on 27 May 2013 at 2:46

GoogleCodeExporter commented 8 years ago
你的程序在添加Listen或删除Listen可能会存在问题,因为
      dwRet := WSAWaitForMultipleEvents(FServerCount+1, @FAcceptEvents[0], False, INFINITE, True);
并没有及时把新的Listen加入Wait,或者没有及时把已删除Listen的Event
从FAcceptEvents删除,
我的程序不会有这个问题,请参考我的程序.

Original comment by Hezihang...@gmail.com on 27 May 2013 at 2:52

GoogleCodeExporter commented 8 years ago
[deleted comment]
GoogleCodeExporter commented 8 years ago
1、等空余时间多一点,这个我会考虑
2、你更新最新的代码看看,应该没这个问题

Original comment by soulaw...@gmail.com on 31 May 2013 at 1:37

GoogleCodeExporter commented 8 years ago
在IOAcceptThread中,Quit和New Listen 
事件可以用一个Event,这样一个线程可以多侦听一个端口,另
外IoAcceptThread的Reset和AddListen仍然可能存在被多个线程同时访
问,导致数据错误。
你可以参考看一下上面我的程序中的AddListen,DelListen等实现方
式。

Original comment by Hezihang...@gmail.com on 1 Jun 2013 at 1:30