diff --git a/client.go b/client.go index 4971906..e6017bc 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,6 @@ package srpc import ( - "bytes" "errors" "fmt" "io" @@ -208,6 +207,8 @@ func (c *RPCClient) Call(request interface{}) (response interface{}, err error) err = requestCall.Error requestCall.state = FREE requestCall.Unlock() + fmt.Println(response) + fmt.Println(err) return response, err } @@ -310,8 +311,6 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c defer func() { done <- err }() enc := c.Ed.NewEncoder(conn) - b := bytes.Buffer{} - enc2 := c.Ed.NewEncoder(&b) for { err = nil @@ -320,8 +319,6 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c select { case requestCall = <-c.callChan: requestCall.Lock() - enc2.Encode(requestCall.Request) - fmt.Println(b.Bytes()) if err = enc.Encode(requestCall.Request); err != nil { if !serverDisconnect(err) && !clientStop(c.stopChan) { requestCall.Error = errors.New("srpc - '%s'=>'%s': Cannot encode request: '%s'\n") @@ -332,13 +329,52 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c return } } + requestCall.Unlock() case <-stopChan: return } } } -func clientReader(c *RPCClient, conn io.Reader, done <-chan error) { +func clientReader(c *RPCClient, conn io.Reader, done chan<- error) { + var err error + defer func() { done <- err }() + + dec := c.Ed.NewDecoder(conn) + + for { + var response serverMessage + response.Data = c.ResponseDataHandler() + + if err = dec.Decode(&response); err != nil { + if serverDisconnect(err) || clientStop(c.stopChan) { + return + } + } + + fmt.Println(response) + ok := false + for _, e := range c.calls { + fmt.Println(e.Request.ID) + e.Lock() + fmt.Println(e.Request.ID) + if e.Request.ID == response.ID { + e.Response = response + if e.Response.Error != "" { + e.Error = errors.New(e.Response.Error) + } + ok = true + e.done <- true + e.Unlock() + break + } + e.Unlock() + } + + if !ok { + c.LogError("srpc - Client response for unknown request ID '%d'\n", response.ID) + } + } } func serverDisconnect(err error) bool { diff --git a/server.go b/server.go index 68f76f1..0faef2f 100644 --- a/server.go +++ b/server.go @@ -1,7 +1,6 @@ package srpc import ( - "bytes" "errors" "fmt" "io" @@ -95,7 +94,6 @@ func (s *RPCServer) Start() (err error) { s.Listener = &netListener{} } if err = s.Listener.Init(s.Addr); err != nil { - err = fmt.Errorf("srpc - '%s' cannot listen to: '%s'", s.Addr, err) s.LogError("%s\n", err) return err } @@ -180,21 +178,14 @@ func serverHandleConnection(s *RPCServer, conn io.ReadWriteCloser, clientAddr st var err error var msg clientMessage msg.Data = s.RequestDataHandler() - fmt.Println(msg) - p := make([]byte, 62) - conn.Read(p) - fmt.Println(p) - - dec := s.Ed.NewDecoder(bytes.NewBuffer(p)) - //dec := s.Ed.NewDecoder(conn) + dec := s.Ed.NewDecoder(conn) enc := s.Ed.NewEncoder(conn) for { if err = dec.Decode(&msg); err != nil { if !clientDisconnect(err) && !serverStop(s.stopChan) { s.LogError("srpc - '%s'=>'%s': Cannot decode request: '%s'\n", clientAddr, s.Addr, err) - fmt.Println(msg) } conn.Close() return @@ -205,6 +196,7 @@ func serverHandleConnection(s *RPCServer, conn io.ReadWriteCloser, clientAddr st response.ClientAddr = msg.ClientAddr response.Data = s.RequestHandler(msg.ClientAddr, msg.Data) + fmt.Println(response.Data) if err = enc.Encode(response); err != nil { if !clientDisconnect(err) && !serverStop(s.stopChan) {