Communication basics done

This commit is contained in:
Matthias Fulz 2020-03-25 16:01:56 +01:00
parent 067e2a7129
commit a252437585
2 changed files with 44 additions and 16 deletions

View File

@ -1,7 +1,6 @@
package srpc
import (
"bytes"
"errors"
"fmt"
"io"
@ -208,6 +207,8 @@ func (c *RPCClient) Call(request interface{}) (response interface{}, err error)
err = requestCall.Error
requestCall.state = FREE
requestCall.Unlock()
fmt.Println(response)
fmt.Println(err)
return response, err
}
@ -310,8 +311,6 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c
defer func() { done <- err }()
enc := c.Ed.NewEncoder(conn)
b := bytes.Buffer{}
enc2 := c.Ed.NewEncoder(&b)
for {
err = nil
@ -320,8 +319,6 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c
select {
case requestCall = <-c.callChan:
requestCall.Lock()
enc2.Encode(requestCall.Request)
fmt.Println(b.Bytes())
if err = enc.Encode(requestCall.Request); err != nil {
if !serverDisconnect(err) && !clientStop(c.stopChan) {
requestCall.Error = errors.New("srpc - '%s'=>'%s': Cannot encode request: '%s'\n")
@ -332,13 +329,52 @@ func clientWriter(c *RPCClient, conn io.Writer, stopChan <-chan struct{}, done c
return
}
}
requestCall.Unlock()
case <-stopChan:
return
}
}
}
func clientReader(c *RPCClient, conn io.Reader, done <-chan error) {
func clientReader(c *RPCClient, conn io.Reader, done chan<- error) {
var err error
defer func() { done <- err }()
dec := c.Ed.NewDecoder(conn)
for {
var response serverMessage
response.Data = c.ResponseDataHandler()
if err = dec.Decode(&response); err != nil {
if serverDisconnect(err) || clientStop(c.stopChan) {
return
}
}
fmt.Println(response)
ok := false
for _, e := range c.calls {
fmt.Println(e.Request.ID)
e.Lock()
fmt.Println(e.Request.ID)
if e.Request.ID == response.ID {
e.Response = response
if e.Response.Error != "" {
e.Error = errors.New(e.Response.Error)
}
ok = true
e.done <- true
e.Unlock()
break
}
e.Unlock()
}
if !ok {
c.LogError("srpc - Client response for unknown request ID '%d'\n", response.ID)
}
}
}
func serverDisconnect(err error) bool {

View File

@ -1,7 +1,6 @@
package srpc
import (
"bytes"
"errors"
"fmt"
"io"
@ -95,7 +94,6 @@ func (s *RPCServer) Start() (err error) {
s.Listener = &netListener{}
}
if err = s.Listener.Init(s.Addr); err != nil {
err = fmt.Errorf("srpc - '%s' cannot listen to: '%s'", s.Addr, err)
s.LogError("%s\n", err)
return err
}
@ -180,21 +178,14 @@ func serverHandleConnection(s *RPCServer, conn io.ReadWriteCloser, clientAddr st
var err error
var msg clientMessage
msg.Data = s.RequestDataHandler()
fmt.Println(msg)
p := make([]byte, 62)
conn.Read(p)
fmt.Println(p)
dec := s.Ed.NewDecoder(bytes.NewBuffer(p))
//dec := s.Ed.NewDecoder(conn)
dec := s.Ed.NewDecoder(conn)
enc := s.Ed.NewEncoder(conn)
for {
if err = dec.Decode(&msg); err != nil {
if !clientDisconnect(err) && !serverStop(s.stopChan) {
s.LogError("srpc - '%s'=>'%s': Cannot decode request: '%s'\n", clientAddr, s.Addr, err)
fmt.Println(msg)
}
conn.Close()
return
@ -205,6 +196,7 @@ func serverHandleConnection(s *RPCServer, conn io.ReadWriteCloser, clientAddr st
response.ClientAddr = msg.ClientAddr
response.Data = s.RequestHandler(msg.ClientAddr, msg.Data)
fmt.Println(response.Data)
if err = enc.Encode(response); err != nil {
if !clientDisconnect(err) && !serverStop(s.stopChan) {