Started with interface / API abstraction

This commit is contained in:
Matthias Fulz 2020-02-14 19:55:35 +01:00
parent 442072cbae
commit 2fe79d214f
3 changed files with 169 additions and 91 deletions

View File

@ -2,30 +2,20 @@ package srpc
import ( import (
"bytes" "bytes"
"fmt" "errors"
"io" "io"
"net" "net"
) )
var REQUEST_HEADER_SIZE = int32(8)
type requestHeader struct {
Size int64
}
type request struct {
FuncName string
Payload []byte
}
type Client struct { type Client struct {
ed IEncoderDecoder ed IEncoderDecoder
conn net.Conn conn IRPCConn
} }
func (client *Client) Call(funcName string, args ...interface{}) (ret []byte, ok bool) { func (client *Client) Call(funcName string, args ...interface{}) (ret []byte, err error) {
defer client.conn.Close()
var b bytes.Buffer var b bytes.Buffer
ok = true
enc := client.ed.NewEncoder(&b) enc := client.ed.NewEncoder(&b)
for _, a := range args { for _, a := range args {
@ -33,26 +23,22 @@ func (client *Client) Call(funcName string, args ...interface{}) (ret []byte, ok
} }
payload := b.Bytes() payload := b.Bytes()
req := request{funcName, payload} req := RPCRequest{funcName, payload}
fmt.Println(req)
enc = client.ed.NewEncoder(client.conn) if err = client.conn.SendRequest(&req); err != nil {
enc.Encode(req) return nil, err
var header responseHeader
dec := client.ed.NewDecoder(client.conn)
dec.Decode(&header)
fmt.Println(header.Size)
fmt.Println(header.Status)
respData := make([]byte, header.Size)
dec.Decode(&respData)
if header.Status != OK {
ok = false
} }
return respData, ok var response *RPCResponse
if response, err = client.conn.ReceiveResponse(); err != nil {
return nil, err
}
if response.Status != RPCOK {
err = errors.New("srpc - Response contained error: '" + response.Error + "'")
}
return response.Payload, err
} }
func (client *Client) NewDecoder(r io.Reader) IDecoder { func (client *Client) NewDecoder(r io.Reader) IDecoder {
@ -60,5 +46,5 @@ func (client *Client) NewDecoder(r io.Reader) IDecoder {
} }
func NewClient(conn net.Conn) *Client { func NewClient(conn net.Conn) *Client {
return &Client{NewEncoderDecoder(), conn} return &Client{NewEncoderDecoder(), NewTCPConn(conn, NewEncoderDecoder())}
} }

93
connection.go Normal file
View File

@ -0,0 +1,93 @@
package srpc
import (
"net"
)
type RPCStatus uint8
const (
RPCOK RPCStatus = 0
RPCERR RPCStatus = 1
)
type RPCRequest struct {
FuncName string
Payload []byte
}
type RPCResponse struct {
Status RPCStatus
Error string
Payload []byte
}
type IRPCConn interface {
ReceiveRequest() (*RPCRequest, error)
ReceiveResponse() (*RPCResponse, error)
SendRequest(request *RPCRequest) error
SendResponse(response *RPCResponse) error
Close()
}
type TCPConn struct {
conn net.Conn
ed IEncoderDecoder
// interface
IRPCConn
}
func (tc *TCPConn) ReceiveRequest() (ret *RPCRequest, err error) {
ret = new(RPCRequest)
err = nil
dec := tc.ed.NewDecoder(tc.conn)
if err = dec.Decode(ret); err != nil {
return nil, err
}
return ret, err
}
func (tc *TCPConn) ReceiveResponse() (ret *RPCResponse, err error) {
ret = new(RPCResponse)
err = nil
dec := tc.ed.NewDecoder(tc.conn)
if err = dec.Decode(ret); err != nil {
return nil, err
}
return ret, err
}
func (tc *TCPConn) SendRequest(request *RPCRequest) (err error) {
err = nil
enc := tc.ed.NewEncoder(tc.conn)
err = enc.Encode(request)
return err
}
func (tc *TCPConn) SendResponse(response *RPCResponse) (err error) {
err = nil
enc := tc.ed.NewEncoder(tc.conn)
err = enc.Encode(response)
return err
}
func (tc *TCPConn) Close() {
tc.conn.Close()
}
func NewTCPConn(conn net.Conn, ed IEncoderDecoder) *TCPConn {
ret := &TCPConn{}
ret.conn = conn
ret.ed = ed
return ret
}

115
server.go
View File

@ -10,24 +10,8 @@ import (
"sync" "sync"
) )
type argumentType uint8 type parseInputsFunc func(in []byte) (ret []reflect.Value, err error)
type parseOutputsFunc func(in []reflect.Value) (ret []byte, err error)
type responseStatus uint8
const (
OK responseStatus = 0
ERR responseStatus = 1
)
var RESPONSE_HEADER_SIZE = int32(9)
type responseHeader struct {
Size int64
Status responseStatus
}
type parseInputsFunc func(in []byte) (ret []reflect.Value)
type parseOutputsFunc func(in []reflect.Value) (ret []byte)
type service struct { type service struct {
f reflect.Value f reflect.Value
@ -42,33 +26,48 @@ type Server struct {
func (server *Server) ServeConn(conn net.Conn) { func (server *Server) ServeConn(conn net.Conn) {
var fs service var fs service
var request *RPCRequest
var err error
dec := server.ed.NewDecoder(conn) tcpConn := NewTCPConn(conn, server.ed)
defer tcpConn.Close()
var req request if request, err = tcpConn.ReceiveRequest(); err != nil {
dec.Decode(&req) slog.LOG_ERROR("srpc - Malformed request received: '%s'\n", err.Error())
return
}
header := responseHeader{} var response RPCResponse
var respData []byte response.Status = RPCOK
response.Error = ""
if fsRaw, ok := server.serviceMap.Load(req.FuncName); !ok { if fsRaw, ok := server.serviceMap.Load(request.FuncName); !ok {
slog.LOG_ERROR("srpc - Call to unknown method: '%s'\n", req.FuncName) slog.LOG_ERROR("srpc - Call to unknown method: '%s'\n", request.FuncName)
err := fmt.Sprintf("Unknown method: '%s'", req.FuncName) err := fmt.Sprintf("Unknown method: '%s'", request.FuncName)
header.Status = ERR response.Status = RPCERR
respData = append([]byte{}, []byte(err)...) response.Error = err
} else { } else {
fs = fsRaw.(service) fs = fsRaw.(service)
inputs := fs.fin(req.Payload) if inputs, err := fs.fin(request.Payload); err != nil {
outputs := fs.f.Call(inputs) slog.LOG_ERROR("srpc - Error parsing inputs '%s'\n", err.Error())
slog.LOG_DEBUG("%v\n", outputs) response.Status = RPCERR
header.Status = OK response.Error = err.Error()
respData = fs.fout(outputs) } else {
outputs := fs.f.Call(inputs)
if payload, err := fs.fout(outputs); err != nil {
slog.LOG_ERROR("srpc - Error parsing outputs '%s'\n", err.Error())
response.Status = RPCERR
response.Error = err.Error()
} else {
slog.LOG_DEBUG("%v\n", outputs)
response.Payload = append([]byte{}, payload...)
}
}
} }
header.Size = int64(len(respData))
enc := server.ed.NewEncoder(conn) if err = tcpConn.SendResponse(&response); err != nil {
enc.Encode(header) slog.LOG_ERROR("srpc - Error sending response '%s'\n", err.Error())
enc.Encode(respData) }
} }
func (server *Server) Accept(ln net.Listener) { func (server *Server) Accept(ln net.Listener) {
@ -93,52 +92,49 @@ func (server *Server) RegisterName(name string, rcvr interface{}) (err error) {
fs := service{} fs := service{}
fs.fin = func(in []byte) (ret []reflect.Value) { fs.fin = func(in []byte) (ret []reflect.Value, err error) {
var b bytes.Buffer var b bytes.Buffer
if _, err = b.Write(in); err != nil { if _, err = b.Write(in); err != nil {
return nil return nil, err
} }
decoder := server.ed.NewDecoder(&b) decoder := server.ed.NewDecoder(&b)
ret = make([]reflect.Value, nIn) ret = make([]reflect.Value, nIn)
for i := 0; i < nIn; i++ { for i := 0; i < nIn; i++ {
in := reflect.New(ft.In(i)) arg := reflect.New(ft.In(i))
if err = decoder.Decode(in.Interface()); err != nil { if err = decoder.Decode(arg.Interface()); err != nil {
fmt.Println(err) return nil, err
return nil
} }
ret[i] = reflect.Indirect(in) ret[i] = reflect.Indirect(arg)
} }
return ret return ret, nil
} }
fs.fout = func(in []reflect.Value) (ret []byte) { fs.fout = func(in []reflect.Value) (ret []byte, err error) {
var b bytes.Buffer var b bytes.Buffer
encoder := server.ed.NewEncoder(&b) encoder := server.ed.NewEncoder(&b)
for _, v := range in { for _, v := range in {
if v.Type() == reflect.TypeOf((*error)(nil)).Elem() { if v.Type() == reflect.TypeOf((*error)(nil)).Elem() {
if v.IsNil() { if v.IsNil() {
if err := encoder.Encode(string("")); err != nil { if err = encoder.Encode(string("")); err != nil {
fmt.Println(err) return nil, err
return nil
} }
} else { } else {
if err := encoder.Encode(v.Interface().(error).Error()); err != nil { if err = encoder.Encode(v.Interface().(error).Error()); err != nil {
fmt.Println(err) fmt.Println(err)
return nil return nil, err
} }
} }
} else { } else {
if err := encoder.Encode(v.Interface()); err != nil { if err = encoder.Encode(v.Interface()); err != nil {
fmt.Println(err) return nil, err
return nil
} }
} }
} }
return b.Bytes() return b.Bytes(), nil
} }
fs.f = fv fs.f = fv
@ -163,9 +159,12 @@ func (server *Server) CallName(name string, args ...interface{}) (ret []byte, er
return nil, errors.New("srpc - Error: '" + err.Error() + "'") return nil, errors.New("srpc - Error: '" + err.Error() + "'")
} }
} }
inputs := fs.fin(b.Bytes()) if inputs, err := fs.fin(b.Bytes()); err != nil {
outputs := fs.f.Call(inputs) return nil, err
return fs.fout(outputs), nil } else {
outputs := fs.f.Call(inputs)
return fs.fout(outputs)
}
} }
func NewServer(ed IEncoderDecoder) *Server { func NewServer(ed IEncoderDecoder) *Server {