From 38a30476647e5126c8062c39e3caae0adb1291f4 Mon Sep 17 00:00:00 2001 From: Matthias Fulz Date: Mon, 17 Feb 2020 19:18:39 +0100 Subject: [PATCH] Working --- connection.go | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/connection.go b/connection.go index d6f3436..98d13b6 100644 --- a/connection.go +++ b/connection.go @@ -1,11 +1,28 @@ package srpc import ( + "bytes" "io" "net" "sync" + "time" ) +type RPCMsgType uint8 + +const ( + RPC_REQUEST RPCMsgType = 0 + RPC_RESPONSE RPCMsgType = 1 + RPC_HEARTBEAT RPCMsgType = 2 +) + +const RPCHeaderSize = 10 + +type RPCHeader struct { + RPCType RPCMsgType + Size uint64 +} + type RPCStatus uint8 const ( @@ -13,6 +30,9 @@ const ( RPCERR RPCStatus = 1 ) +type RPCHeartbeat struct { +} + type RPCRequest struct { FuncName string Payload []byte @@ -33,6 +53,7 @@ type IRPCConn interface { } type TCPConn struct { + sync.Mutex conn net.Conn ed IEncoderDecoder @@ -40,6 +61,79 @@ type TCPConn struct { IRPCConn } +func (tc *TCPConn) send(payload interface{}) (err error) { + var header RPCHeader + var hb, b bytes.Buffer + + if _, ok := payload.(*RPCHeartbeat); ok { + header.RPCType = RPC_HEARTBEAT + } else if _, ok := payload.(*RPCRequest); ok { + header.RPCType = RPC_REQUEST + } else if _, ok := payload.(*RPCResponse); ok { + header.RPCType = RPC_RESPONSE + } else { + return errors.New("srpc - Invalid RPC message type") + } + + enc := tc.ed.NewEncoder(&b) + if err = enc.Encode(payload); err != nil { + return errors.New("srpc - Error: '" + err.Error() + "'") + } + header.Size = uint64(len(b.Bytes())) + + enc := tc.ed.NewEncoder(&hb) + if err = enc.Encode(header); err != nil { + return errors.New("srpc - Error: '" + err.Error() + "'") + } + + data := append([]byte{}, hb.Bytes()...) + data = append(data, b.Bytes()...) + + if n, err = tc.conn.Write(data); err != nil || n != len(data) { + return errors.New("srpc - Error writing message") + } +} + +func (tc *TCPConn) receive() (payload interface{}, err error) { + var header RPCHeader + hb := make([]byte, RPCHeaderSize) + + if n, err = tc.conn.Read(hb); err != nil || n != RPCHeaderSize { + return nil, errors.New("srpc - Error receiving message") + } + + dec := tc.ed.NewDecoder(bytes.NewReader(hb)) + if err = dec.Decode(&header); err != nil { + return nil, errors.New("srpc - Error: '" + err.Error() + "'") + } + + switch header.RPCType { + case RPC_HEARTBEAT: + payload = new(RPCHeartbeat) + break + case RPC_REQUEST: + payload = new(RPCRequest) + break + case RPC_RESPONSE: + payload = new(RPCResponse) + break + default: + return nil, errors.New("srpc - Invalid RPC message type") + } + + b := make([]byte, header.Size) + if n, err = tc.conn.Read(b); err != nil || n != int(header.Size) { + return nil, errors.New("srpc - Error receiving message") + } + + dec := tc.ed.NewDecoder(bytes.NewReader(b)) + if err = dec.Decode(payload); err != nil { + return errors.New("srpc - Error: '" + err.Error() + "'") + } + + return payload, nil +} + func (tc *TCPConn) ReceiveRequest() (ret *RPCRequest, err error) { ret = new(RPCRequest)