ðâ%Oberon10.Scn.Fnt9Syntax10.Scn.Fnt4Oberon8b.Scn.Fnt Syntax8b.Scn.Fnt+USyntax10b.Scn.Fnt*            5 J 5, S#>@  ,  . r  ?   " %         : A# Oberon10i.Scn.Fnt     ; \  å*  3!* "! Ã     H  "  "A I        %!H5 ³    ; K   "  %  " $  7+  9$ ’ 63 -"& 0` `N*zº >/E‚ =%: A 4 ).  )  1  D  "Š;{* "  2   -)7  -,X ".P" I@!)2  ; [ > ;       "   E/  $% 4 " +w 4,(>    Is 2#u fs J ¹7J$#3;©$4J %        #    6 d(   ;!& 4V9  /Š . $   2,#      0    55P(* Aos Runtime: AosTCP, Copyright 2005, Emil J. Zeller *) (* Aos, Copyright 2001, Pieter Muller, ETH Zurich *) MODULE AosTCP; (** AUTHOR "pjm, mvt"; PURPOSE "TCP protocol"; *) IMPORT SYSTEM, WSock32, AosModules, AosKernel, AosIO, AosNet, AosIP, AosActive, AosOut; CONST trace = FALSE; NilPort* = 0; HashTableSize = 1024 * 16; (* size of connection lookup hash table *) (** Error codes *) Ok* = 0; ConnectionRefused* = 3701; NotConnected* = 3705; TimedOut* = 3704; (** TCP connection states *) (** TCP connection states *) NumStates* = 12; Closed* = 0; Listen* = 1; SynSent* = 2; SynReceived* = 3; Established* = 4; CloseWait* = 5; FinWait1* = 6; Closing* = 7; LastAck* = 8; FinWait2* = 9; TimeWait* = 10; Unused* = 11; (* no real state, only used in this implementation *) OpenStates* = {Listen, SynReceived, Established, CloseWait, FinWait1, FinWait2}; ClosedStates* = {Unused, Closed, Closing, LastAck, TimeWait}; HalfClosedStates* = ClosedStates + {FinWait1, FinWait2}; FinStates* = {Unused, Closed, CloseWait, Closing, LastAck, TimeWait}; Timeout = 14; (*AckNow = 0; *) (* send Ack immediately *) (* DelAck = 1;*) (* send Ack, but try to delay it *) NoDelay = 2; (* don't delay packets tocoalesce (disable Nagle algorithm) *) DoKeepAlive = 3; (* enable keep-alive timer *) BufSize = 8192; MaxReceive = 8 * 8192; MaxSend = 4096; TYPE NetProc = PROCEDURE {DELEGATE} ( VAR buf: ARRAY OF SYSTEM.BYTE; size: LONGINT ): LONGINT; TYPE BufSeg = OBJECT VAR data: ARRAY BufSize OF CHAR; ofs, len: LONGINT; next: BufSeg (* prev: needed?*) END BufSeg; Receiver = OBJECT 
VAR
			first, last: BufSeg;   (* chain of buffer segments: Get consumes at first, Receive appends at last *)
			available: LONGINT;   (* bytes buffered and not yet returned by Get *)
			recv: NetProc;   (* delegate called to fetch raw bytes *)
			run: BOOLEAN;   (* TRUE while the active body should keep calling recv *)
			terminate: BOOLEAN;   (* set by Terminate; makes the active body leave its loop *)

		(* Create an empty one-segment buffer chain.  The receiver stays idle until Start is called. *)
		PROCEDURE & Init( recv: NetProc );
		BEGIN
			NEW( first );  last := first;  SELF.recv := recv;  run := FALSE;  terminate := FALSE;
		END Init;

		(* Rewind the read offset once the single remaining segment has been drained, so it can be refilled
			from its start.  Caller must hold the object lock. *)
		PROCEDURE CleanUp;
		BEGIN
			IF (first = last) & (first.len = 0) THEN first.ofs := 0 END;
		END CleanUp;

		(* Begin (or resume) receiving with a fresh, empty buffer chain. *)
		PROCEDURE Start*;
		BEGIN {EXCLUSIVE}
			NEW( first );  last := first;
			(* the old segments are dropped, so the byte counter must be dropped with them; otherwise Get would
				try to copy "available" bytes out of an empty segment and never terminate *)
			available := 0;
			run := TRUE;
		END Start;

		(* Ask the active body to stop; also releases a Get that is waiting for data. *)
		PROCEDURE Terminate;
		BEGIN {EXCLUSIVE}
			terminate := TRUE;
		END Terminate;

		(* TRUE while the receiver has been started and recv has not yet reported an error or end of data. *)
		PROCEDURE Running( ): BOOLEAN;
		BEGIN {EXCLUSIVE}
			RETURN run;
		END Running;

		(* Body of the active object: repeatedly call recv and append the result to the segment chain.
			recv is called outside the object lock; only the bookkeeping is done under EXCLUSIVE. *)
		PROCEDURE Receive;
		VAR inbuf: ARRAY BufSize OF CHAR;  puflen, len, ret, offset: LONGINT;
		BEGIN
			LOOP
				BEGIN {EXCLUSIVE}
					AWAIT( run OR terminate );
				END;
				IF terminate THEN EXIT END;
				ret := recv( inbuf, LEN( inbuf ) );
				(* now transfer bytes to buffers *)
				BEGIN {EXCLUSIVE}
					IF ret < 0 THEN
						IF trace THEN AosOut.String( "AosTCP.Receiver.Receive: " );  WSock32.DispError;  END;
						run := FALSE;
					ELSIF (ret = 0) THEN run := FALSE
					ELSE
						offset := 0;
						WHILE (ret > 0) DO
							puflen := BufSize - last.len - last.ofs;   (* free room behind the data of the last segment *)
							IF puflen = 0 THEN
								NEW( last.next );  last := last.next;  last.len := 0;  last.ofs := 0;  puflen := BufSize;
							END;
							IF puflen < ret THEN len := puflen ELSE len := ret END;
							SYSTEM.MOVE( SYSTEM.ADR( inbuf[offset] ), SYSTEM.ADR( last.data[last.ofs + last.len] ), len );
							INC( last.len, len );  INC( available, len );  DEC( ret, len );  DEC( puflen, len );  INC( offset, len );
						END;
						(* back-pressure: do not read more until the consumer has drained below MaxReceive *)
						AWAIT( (available < MaxReceive) OR (~run) OR (terminate) );
					END;
				END;
			END;
		END Receive;

		(* Copy buffered bytes to memory address adr.  Blocks until at least min bytes are buffered or the
			receiver is terminated, then copies at most max bytes.  Returns the number of bytes copied. *)
		PROCEDURE Get( adr: LONGINT;  min, max: LONGINT ): LONGINT;
		VAR size, size1, clen: LONGINT;
		BEGIN {EXCLUSIVE}
			AWAIT( (available >= min) OR terminate );
			IF max < available THEN size := max ELSE size := available END;
			size1 := size;
			WHILE (size > 0) DO
				IF (first # last) & (first.len = 0) THEN first := first.next END;   (* skip a fully consumed segment *)
				clen := first.len;
				IF size < clen THEN clen := size END;
				SYSTEM.MOVE( SYSTEM.ADR( first.data[first.ofs] ), adr, clen );
				INC( first.ofs, clen );  DEC( first.len, clen );  DEC( size, clen );  INC( adr, clen );  DEC( available, clen );
			END;
			CleanUp;  RETURN size1;
		END Get;

		(* Number of bytes currently buffered. *)
		PROCEDURE Available( ): LONGINT;
		BEGIN {EXCLUSIVE}
			RETURN available;
		END Available;

	BEGIN {ACTIVE}
		Receive;
	END Receiver;

	(* Buffered sender: Put appends data to a chain of segments, Send/Send0 push the chain through the send delegate. *)
	Sender = OBJECT
		VAR
			first, last: BufSeg;   (* chain of buffer segments: Send0 consumes at first, Put appends at last *)
			send: NetProc;   (* delegate called to transmit raw bytes *)
			available: LONGINT;   (* bytes buffered and not yet accepted by send *)

		(* Rewind the read offset once the single remaining segment has been drained.  Caller must hold the object lock. *)
		PROCEDURE CleanUp;
		BEGIN
			IF (first = last) & (first.len = 0) THEN first.ofs := 0 END;
		END CleanUp;

		(* Transmit buffered data until the buffer is empty or send reports a result <= 0.
			Returns the last result of send, or 1 if nothing had to be sent.  Caller must hold the object lock. *)
		PROCEDURE Send0( ): LONGINT;
		VAR ret: LONGINT;
		BEGIN
			ret := 1;
			WHILE (available > 0) & (ret > 0) DO
				IF (first.len = 0) & (first # last) THEN first := first.next END;   (* skip a fully transmitted segment *)
				ret := send( (first.data[first.ofs]), first.len );
				IF ret > 0 THEN DEC( available, ret );  INC( first.ofs, ret );  DEC( first.len, ret );  END;
			END;
			CleanUp;  RETURN ret;
		END Send0;

		(* Flush the buffer.  Returns 0 if it was already empty, otherwise the result of Send0. *)
		PROCEDURE Send( ): LONGINT;
		BEGIN {EXCLUSIVE}
			IF available > 0 THEN RETURN Send0();  ELSE RETURN 0 END;
		END Send;

		(* Append size bytes found at memory address adr to the buffer.  Whenever more than MaxSend bytes are
			pending the buffer is flushed through Send0.  Returns 1, or the result of the last Send0 call;
			copying stops early as soon as Send0 returns a value <= 0. *)
		PROCEDURE Put( adr, size: LONGINT ): LONGINT;
		VAR clen, res: LONGINT;
		BEGIN {EXCLUSIVE}
			res := 1;
			WHILE (size > 0) & (res > 0) DO
				IF (last.ofs + last.len = BufSize) THEN   (* last segment is full: chain a new one *)
					NEW( last.next );  last := last.next;  last.len := 0;  last.ofs := 0;
				END;
				clen := BufSize - last.ofs - last.len;
				IF size < clen THEN clen := size END;
				SYSTEM.MOVE( adr, SYSTEM.ADR( last.data[last.ofs + last.len] ), clen );
				INC( last.len, clen );  DEC( size, clen );  INC( adr, clen );  INC( available, clen );
				IF available > MaxSend THEN res := Send0();  END;
			END;
			RETURN res;
		END Put;

		(* Create an empty one-segment buffer chain and remember the transmit delegate. *)
		PROCEDURE & Init( send: NetProc );
		BEGIN
			NEW( first );  last := first;  SELF.send := send;
		END Init;

	END Sender;

	(** Connection object. NOTE: Only one process should access a Connection!
*)
	(* The connection wraps one WSock32 socket handle.  Of the read-only exported fields below, the visible code of
		this module assigns only int, lport, fip, fport and state; the sequence number, window and round-trip fields
		are declared but never written here. *)
	Connection* = OBJECT (AosIO.Connection)
		VAR
			(* assigned interface *)
			int-: AosIP.Interface;
			(* local protocol address *)
			lport-: LONGINT;
			(* foreign protocol address *)
			fip-: AosIP.Adr;
			fport-: LONGINT;
			state-: SHORTINT;   (* TCP state *)
			(* send sequence *)
			sndnxt-: LONGINT;   (* send next *)
			iss-: LONGINT;   (* initial send sequence number *)
			(* receive sequence *)
			rcvnxt-: LONGINT;   (* receive next *)
			irs-: LONGINT;   (* initial receive sequence number *)
			socket: WSock32.Socket;   (* underlying socket handle; WSock32.InvalidSocket while none is allocated *)
			sndwnd-: LONGINT;   (* send window *)
			sndcwnd-: LONGINT;   (* congestion-controlled window *)
			sndcc-: LONGINT;   (* number of bytes in send buffer *)
			rcvwnd-: LONGINT;   (* receive window *)
			srtt-: LONGINT;   (* smoothed round trip time *)
			(* receiver: Receiver;  *)
			(* sender: Sender;  *)
			(* lip: AosIP.Adr;  *)
			timeout: AosActive.Timer;   (* allocated on first use by AwaitState *)
			flags: SET;   (* holds the Timeout, NoDelay and DoKeepAlive bits *)

		(* Initialization for internal use only.  Marks the object as Unused and without a socket; the socket
			itself is created by Open, or assigned by Accept for an incoming client. *)
		PROCEDURE & Init;
		BEGIN
			state := Unused;  socket := WSock32.InvalidSocket;
			(* NEW(inbuf,receive,send); *)
			(*NEW( receiver, SELF.receive );  *)
			(*NEW( sender, SELF.send );  *)
		END Init;

		(** Open a TCP connection (only use once per Connection instance). Use AosTCP.NilPort for lport to automatically assign an unused local port.
*)
		(* If both fip and fport are given, this is an active open: the socket is optionally bound to lport and then
			connected; on success state becomes Established.  Otherwise it is a passive open: the socket is bound to
			lport and put into listening mode; on success state becomes Listen.
			res returns Ok or NotConnected.  On every failure the state becomes Closed and the socket handle is
			closed, invalidated and taken out of the pool, so that neither Close nor the finalizer touches a handle
			that is stale or still open behind a Closed state. *)
		PROCEDURE Open*( lport: LONGINT;  fip: AosIP.Adr;  fport: LONGINT;  VAR res: LONGINT );
		VAR adr: WSock32.sockaddrIn;  err: LONGINT;  str: ARRAY 64 OF CHAR;
		BEGIN {EXCLUSIVE}
			IF trace THEN
				AosOut.Enter;  AosOut.String( "Open connection: lport=" );  AosOut.Int( lport, 1 );  AosOut.String( " ,fip=" );
				AosIP.AdrToStr( fip, str );  AosOut.String( str );  AosOut.String( " ,fport=" );  AosOut.Int( fport, 1 );
				AosOut.Exit;
			END;
			ASSERT ( (state = Unused) & (lport >= 0) & (lport < 10000H) & (fport >= 0) & (fport < 10000H) );
			IF socket = WSock32.InvalidSocket THEN
				socket := WSock32.socket( WSock32.AFINet, WSock32.SockStream, WSock32.IPProtoTCP );
				ASSERT ( socket # WSock32.InvalidSocket );
				pool.Add( SELF, SELF.Finalize )
			END;
			IF (fip # AosIP.NilAdr) & (fport # NilPort) THEN  (* active open (connect) *)
				int := AosIP.InterfaceByDstIP( fip );  SELF.lport := lport;  SELF.fip := fip;  SELF.fport := fport;
				IF lport # NilPort THEN   (* caller asked for a specific local port *)
					adr.sinFamily := WSock32.PFINet;  adr.sinAddr := 0;  adr.sinPort := WSock32.htons( SHORT( lport ) );
					err := WSock32.bind( socket, adr, SIZE( WSock32.sockaddrIn ) );
					IF err # 0 THEN
						(* report the error before closesocket gets a chance to overwrite it *)
						res := NotConnected;  state := Closed;  WSock32.DispError;  DropSocket;  RETURN
					END
				END;
				adr.sinFamily := WSock32.PFINet;  adr.sinAddr := (fip);  adr.sinPort := WSock32.htons( SHORT( fport ) );
				err := WSock32.connect( socket, adr, SIZE( WSock32.sockaddrIn ) );
				IF err # 0 THEN
					res := NotConnected;  WSock32.DispError;  state := Closed;  DropSocket
				ELSE
					res := Ok;  state := Established;  SetPortAndIp();
				END
			ELSE  (* passive open (listen) *)
				ASSERT ( (fport = NilPort) & (fip = AosIP.NilAdr) );
				SELF.int := NIL;  SELF.lport := lport;  SELF.fip := AosIP.NilAdr;  SELF.fport := NilPort;
				adr.sinFamily := WSock32.PFINet;  adr.sinAddr := 0;  adr.sinPort := WSock32.htons( SHORT( lport ) );
				err := WSock32.bind( socket, adr, SIZE( WSock32.sockaddrIn ) );
				IF err = 0 THEN err := WSock32.listen( socket, WSock32.SOMaxConn ) END;
				IF err # 0 THEN
					res := NotConnected;  state := Closed;  DropSocket
				ELSE
					SetPortAndIp();  res := Ok;  state := Listen
				END
			END;
			IF trace THEN
				AosOut.Enter;  AosOut.String( "Open connection, result = " );  AosOut.Int( res, 1 );  AosOut.Exit;
			END;
		END Open;

		(* Close and forget the socket handle after a failed Open.  Must be called with the object lock held. *)
		PROCEDURE DropSocket;
		VAR ignore: LONGINT;
		BEGIN
			IF socket # WSock32.InvalidSocket THEN
				ignore := WSock32.closesocket( socket );  socket := WSock32.InvalidSocket;  pool.Remove( SELF )
			END
		END DropSocket;

		(** Send data on a TCP connection.
			Transmits len bytes of data starting at index ofs and repeats the socket call until everything has been
			accepted, because a single call may take only part of the data.  res returns AosIO.Ok, or NotConnected
			if the connection is closed or the socket reports an error.  The propagate parameter is ignored: data is
			always handed to the socket immediately. *)
		PROCEDURE Send*( VAR data: ARRAY OF CHAR;  ofs, len: LONGINT;  propagate: BOOLEAN;  VAR res: LONGINT );
		VAR sent: LONGINT;
		BEGIN
			IF state = Closed THEN res := NotConnected;  RETURN
			ELSIF state # Closing THEN
				ASSERT ( (state = Established) & (socket # WSock32.InvalidSocket) );
			END;
			ASSERT ( (ofs >= 0) & (ofs + len <= LEN( data )) );   (* index check *)
			res := AosIO.Ok;
			WHILE (len > 0) & (res = AosIO.Ok) DO
				sent := WSock32.send( socket, data[ofs], len, {} );
				IF sent > 0 THEN
					INC( ofs, sent );  DEC( len, sent )
				ELSE   (* error, or nothing accepted although data was pending *)
					AosOut.String( "AosTCP.Send :" );  WSock32.DispError;  res := NotConnected;
				END
			END;
			(* a send attempted after the remote side has shut down completes the transition to Closed *)
			IF state = Closing THEN
				BEGIN {EXCLUSIVE}
					state := Closed;
				END;
			END;
		END Send;

		(* Raw receive on the socket, with the signature of NetProc.  Returns the result of WSock32.recv. *)
		PROCEDURE receive( VAR buf: ARRAY OF SYSTEM.BYTE;  size: LONGINT ): LONGINT;
		BEGIN
			RETURN WSock32.recv( socket, buf, size, {} );   (* return 0 -> Connection has been gracefully closed *)
		END receive;

		(* Raw send on the socket, with the signature of NetProc.  Returns the result of WSock32.send. *)
		PROCEDURE send( VAR buf: ARRAY OF SYSTEM.BYTE;  size: LONGINT ): LONGINT;
		BEGIN
			RETURN WSock32.send( socket, buf, size, {} );
		END send;

		(** Receive data on a TCP connection.
The data parameter specifies the buffer.  The ofs parameter specifies the position in the buffer where data
			should be received (usually 0), and the size parameter specifies how many bytes of data can be received in
			the buffer.  The min parameter specifies the minimum number of bytes to receive before Receive returns and
			must be <= size.  The len parameter returns the number of bytes received, and the res parameter returns 0 if
			ok, or a non-zero error code otherwise (e.g. if the connection is closed by the communication partner, or by
			a call of the Close method).
			With min = 0 the call does not block: if Available reports no data, it returns at once with len = 0. *)
		PROCEDURE Receive*( VAR data: ARRAY OF CHAR;  ofs, size, min: LONGINT;  VAR len, res: LONGINT );
		VAR ret: LONGINT;
		BEGIN
			len := 0;   (* defined on every exit path, including the early returns below *)
			BEGIN {EXCLUSIVE}
				IF state = Closed THEN res := NotConnected;  RETURN END;
				ASSERT ( ((state = Established) OR (state = Closing)) );
			END;
			IF socket = WSock32.InvalidSocket THEN res := NotConnected;  RETURN END;
			IF (min = 0) & (Available() = 0) THEN res := AosIO.Ok;  RETURN END;
			ASSERT ( (ofs >= 0) & (ofs + size <= LEN( data )) & (min <= size) );   (* parameter consistency check *)
			res := AosIO.Ok;
			(* nothing can be stored: do not call recv, whose result 0 would be mistaken for a remote shutdown *)
			IF size <= 0 THEN RETURN END;
			REPEAT
				(* continue behind the bytes already received and never ask for more than still fits *)
				ret := WSock32.recv( socket, data[ofs + len], size - len, {} );
				IF ret > 0 THEN INC( len, ret ) END;
			UNTIL (len >= min) OR (ret <= 0);
			IF ret < 0 THEN
				IF trace THEN AosOut.String( "AosTCP.Receiver.Receive: " );  WSock32.DispError;  END;
				BEGIN {EXCLUSIVE}
					state := Closing;  res := AosIO.EOF
				END;
			ELSIF ret = 0 THEN
				(* connection has been gracefully shut down by remote side, otherwise recv would block *)
				IF trace THEN
					AosOut.Enter;  AosOut.String( "AosTCP.Connection.Receive, graceful shutdown by remote side " );  AosOut.Exit;
				END;
				BEGIN {EXCLUSIVE}
					state := Closing;  res := AosIO.EOF
				END;
			END;
		END Receive;

		(** Return connection state.
*)
		PROCEDURE State*( ): LONGINT;
		BEGIN {EXCLUSIVE}
			RETURN state
		END State;

		(* Timer callback installed by AwaitState: records that the waiting period has elapsed. *)
		PROCEDURE HandleTimeout;
		BEGIN {EXCLUSIVE}
			INCL( flags, Timeout )
		END HandleTimeout;

		(** Wait until the state is a member of good or bad, or until ms milliseconds have passed (ms = -1 waits
			without a time limit).  res returns Ok for a good state, NotConnected for a bad state and TimedOut otherwise. *)
		PROCEDURE AwaitState*( good, bad: SET;  ms: LONGINT;  VAR res: LONGINT );
		BEGIN {EXCLUSIVE}
			IF ~(state IN (good + bad)) THEN
				IF ms # -1 THEN
					IF timeout = NIL THEN NEW( timeout ) END;
					AosActive.SetTimeout( timeout, SELF.HandleTimeout, ms )
				END;
				(* safe after SetTimeout: HandleTimeout is EXCLUSIVE and cannot run before the AWAIT releases the lock *)
				EXCL( flags, Timeout );
				AWAIT( (state IN (good + bad)) OR (Timeout IN flags) );
				IF ms # -1 THEN AosActive.CancelTimeout( timeout ) END
			END;
			IF state IN good THEN res := Ok
			ELSIF state IN bad THEN res := NotConnected
			ELSE res := TimedOut
			END
		END AwaitState;

		(** Close a TCP connection (half-close).
			The socket is shut down in both directions, closed and removed from the pool; the state becomes Closed.
			Does nothing if the state is already Closed.
			NOTE(review): Send and Available can set the state to Closed without releasing the socket; in that case
			this early return leaves the handle to the finalizer.  Confirm whether that is intended. *)
		PROCEDURE Close*;
		VAR res: LONGINT;
		BEGIN {EXCLUSIVE}
			IF state = Closed THEN RETURN END;
			IF trace THEN
				AosOut.Enter;  AosOut.String( "AosTCP.Connection.Close, state= " );  AosOut.Int( state, 1 );  AosOut.Exit;
			END;
			IF socket # WSock32.InvalidSocket THEN
				res := WSock32.shutdown( socket, WSock32.SDboth );
				res := WSock32.closesocket( socket );   (* result ignored: the call can fail while the system is going down *)
				socket := WSock32.InvalidSocket;  pool.Remove( SELF )
			END;
			state := Closed
		END Close;

		(* Refresh lport from the local name of the socket, and fip/fport from its peer name.  A field is left
			unchanged when the corresponding query fails (as it does for the peer of a listening socket).
			NOTE(review): fip is passed through ntohl here, whereas Open hands the caller's fip to the socket
			address unconverted; confirm which byte order AosIP.Adr is supposed to have. *)
		PROCEDURE SetPortAndIp( );
		VAR sockname: WSock32.sockaddrIn;  lensockname: LONGINT;  res: LONGINT;
		BEGIN
			lensockname := SIZE( WSock32.sockaddrIn );
			res := WSock32.getsockname( socket, sockname, lensockname );
			IF res = Ok THEN lport := WSock32.ntohs( sockname.sinPort ) END;
			lensockname := SIZE( WSock32.sockaddrIn );
			res := WSock32.getpeername( socket, sockname, lensockname );
			IF res = Ok THEN
				fip := sockname.sinAddr;  fport := WSock32.ntohs( sockname.sinPort );  fip := WSock32.ntohl( fip );
			END;
		END SetPortAndIp;

		(** Accept a client waiting on a listening connection.  Blocks until a client is available or the connection is closed.
			On success client is a new Connection in state Established and res is Ok; otherwise client is NIL and
			res is ConnectionRefused. *)
		PROCEDURE Accept*( VAR client: Connection;  VAR res: LONGINT );
		VAR s: WSock32.Socket;  adr: WSock32.sockaddrIn;  adrlen: LONGINT;  str: ARRAY 64 OF CHAR;
		BEGIN
			IF trace THEN AosOut.Enter;  AosOut.String( "Accepting connections" );  AosOut.Exit;  END;
			ASSERT ( (state = Listen) & (socket # WSock32.InvalidSocket) );
			adr.sinFamily := WSock32.PFINet;  adrlen := SIZE( WSock32.sockaddrIn );
			(* blocks!  Done outside the object lock so that other methods stay usable meanwhile. *)
			s := WSock32.accept( socket, adr, adrlen );
			BEGIN {EXCLUSIVE}
				IF s # WSock32.InvalidSocket THEN
					NEW( client );  client.lport := NilPort;
					IF (adrlen = SIZE( WSock32.sockaddrIn )) & (adr.sinFamily = WSock32.PFINet) THEN
						client.fip := adr.sinAddr;  client.fport := WSock32.ntohs( adr.sinPort )
					ELSE
						client.fip := AosIP.NilAdr;  client.fport := NilPort
					END;
					client.int := AosIP.InterfaceByDstIP( client.fip );
					pool.Add( client, client.Finalize );
					client.socket := s;  client.state := Established;  res := Ok;
					client.SetPortAndIp();   (* overwrites lport, fip and fport with what the socket itself reports *)
					IF trace THEN
						AosOut.Enter;  AosOut.String( "Accepted connection: lport=" );  AosOut.Int( client.lport, 1 );
						AosOut.String( " ,fip=" );  AosIP.AdrToStr( client.fip, str );  AosOut.String( str );
						AosOut.String( " ,fport=" );  AosOut.Int( client.fport, 1 );  AosOut.Exit;  AosOut.Ln;
					END;
				ELSE
					client := NIL;  res := ConnectionRefused
				END;
			END;
		END Accept;

		(** Record whether small packets may be delayed (enable = TRUE) or not.  Only the NoDelay flag is updated;
			the visible code does not set any option on the socket. *)
		PROCEDURE DelaySend*( enable: BOOLEAN );
		BEGIN {EXCLUSIVE}
			IF enable THEN EXCL( flags, NoDelay );  ELSE INCL( flags, NoDelay );  END;
		END DelaySend;

		(** Record whether keep-alive is wanted.  Only the DoKeepAlive flag is updated; the visible code does not set
			any option on the socket. *)
		PROCEDURE KeepAlive*( enable: BOOLEAN );
		BEGIN {EXCLUSIVE}
			IF enable THEN INCL( flags, DoKeepAlive );  ELSE EXCL( flags, DoKeepAlive );  END;
		END KeepAlive;

		(** Discard the connection.  In this implementation identical to Close. *)
		PROCEDURE Discard*;
		BEGIN
			Close;
		END Discard;

		(** Always returns FALSE in this implementation. *)
		PROCEDURE Requested*( ): BOOLEAN;
		BEGIN {EXCLUSIVE}
			RETURN FALSE;
		END Requested;

		(** Return the number of bytes that can be received without blocking; 0 if there is none or the connection
			has no socket.
			When the socket reports no pending data, it is probed with select and a peeking recv in order to notice
			a shutdown: the state becomes Closing on a graceful shutdown by the remote side and Closed on an error. *)
		PROCEDURE Available*( ): LONGINT;
		VAR ret, res: LONGINT;  fdset: WSock32.FDSet;  data: ARRAY 256 OF CHAR;
		BEGIN
			(* after Close there is nothing to query; avoids a spurious error report on an invalid handle *)
			IF socket = WSock32.InvalidSocket THEN RETURN 0 END;
			res := 0;
			ret := WSock32.ioctlsocket( socket, WSock32.FIONRead, res );
			IF ret # 0 THEN
				AosOut.String( "AosTCP.Available " );  WSock32.DispError;
				res := 0   (* never act on a byte count that the failed call may have left undefined *)
			END;
			IF res = 0 THEN  (* check socket for shutdown *)
				fdset.fdcount := 1;  fdset.socket[0] := socket;
				(* NOTE(review): the last argument is presumably the timeout; confirm that 0 polls instead of blocking *)
				ret := WSock32.select( 0, fdset, NIL , NIL , 0 );
				IF ret = 1 THEN  (* nothing available but we can receive, try it: *)
					(* flag set {1} has the numeric value 2, which is MSG_PEEK in Winsock: look without consuming *)
					res := WSock32.recv( socket, data, 256, {1} );
					IF res = 0 THEN
						BEGIN {EXCLUSIVE}
							state := Closing;
						END;
						IF trace THEN
							AosOut.Enter;  AosOut.String( "AosTCP.Connection.Available: graceful shutdown by remote side." );  AosOut.Exit;
						END;
					ELSIF res < 0 THEN
						IF trace THEN AosOut.String( "AosTCP.Receiver.Receive: " );  WSock32.DispError;  END;
						res := 0;
						BEGIN {EXCLUSIVE}
							state := Closed;
						END;
					END;
				END;
			END;
			RETURN res;
		END Available;

		(* Finalize the Connection object: close the socket if it is still open and mark the object Unused.
			Registered with the pool by Open and Accept; removal from the pool is left to the caller. *)
		PROCEDURE Finalize( ptr: PTR );
		VAR res: LONGINT;
		BEGIN {EXCLUSIVE}
			IF trace THEN AosOut.Enter;  AosOut.String( "AosTCP.Finalize " );  AosOut.Exit END;
			ASSERT ( ptr = SELF );
			IF socket # WSock32.InvalidSocket THEN
				res := WSock32.closesocket( socket );   (* result deliberately ignored *)
				socket := WSock32.InvalidSocket;
			END;
			state := Unused
		END Finalize;

	END Connection;

VAR
	pool*: AosKernel.FinalizedCollection;   (* pool of all Connections *)

	(* Create the connection pool. *)
	PROCEDURE Init;
	BEGIN
		NEW( pool )
	END Init;

	(* Enumerator callback: finalize one pooled connection and continue with the next. *)
	PROCEDURE Finalize( obj: PTR;  VAR cont: BOOLEAN );
	BEGIN
		obj( Connection ).Finalize( obj );  cont := TRUE
	END Finalize;

	(* Module termination handler: close the sockets of all connections still in the pool. *)
	PROCEDURE Cleanup;
	BEGIN
		pool.Enumerate( Finalize )
	END Cleanup;

BEGIN
	Init();  AosModules.InstallTermHandler( Cleanup )
END AosTCP.