Susumu Yata
null+****@clear*****
Fri Jun 9 09:39:36 JST 2017
Susumu Yata 2017-06-09 09:39:36 +0900 (Fri, 09 Jun 2017) New Revision: 5380ae8d86db59449b39a2c270ed970b320c0815 https://github.com/groonga/grnci/commit/5380ae8d86db59449b39a2c270ed970b320c0815 Message: Put the base implementations of v2. The v2 provides GQTP and HTTP clients without libgroonga. The v2/libgrn provides GQTP clients and local DB handles using libgroonga. Added files: v2/README.md v2/address_test.go v2/doc.go v2/error_test.go v2/gqtp_test.go v2/http_test.go v2/libgrn/client_test.go v2/libgrn/conn.go v2/libgrn/conn_test.go v2/libgrn/response.go v2/request_test.go v2/rule.go Removed files: v2/argument.go v2/client.go v2/libgrn/handle.go Modified files: v2/address.go v2/error.go v2/gqtp.go v2/http.go v2/libgrn/client.go v2/libgrn/libgrn.go v2/request.go v2/response.go Added: v2/README.md (+2 -0) 100644 =================================================================== --- /dev/null +++ v2/README.md 2017-06-09 09:39:36 +0900 (e4e420c) @@ -0,0 +1,2 @@ +# grnci +Development repository for grnci ver. 2 Modified: v2/address.go (+142 -74) =================================================================== --- v2/address.go 2017-05-17 19:23:04 +0900 (024a332) +++ v2/address.go 2017-06-09 09:39:36 +0900 (64275ab) @@ -1,17 +1,15 @@ package grnci import ( - "fmt" "net" "strconv" "strings" ) -// Address represents a parsed address. +// Address is a parsed address. // The expected address format is -// [scheme://][username[:password]@]host[:port][path][?query][#fragment]. +// [scheme://][username[:password]@][host][:port][path][?query][#fragment]. type Address struct { - Raw string Scheme string Username string Password string @@ -22,113 +20,95 @@ type Address struct { Fragment string } -// String assembles the address fields except Raw into an address string. -func (a *Address) String() string { - var url string - if a.Scheme != "" { - url += a.Scheme + "://" - } - if a.Password != "" { - url += a.Username + ":" + a.Password + "@" - } else if a.Username != "" { - url += a.Username + "@" - } - url += a.Host - if a.Port != 0 { - url += ":" + strconv.Itoa(a.Port) - } - url += a.Path - if a.Query != "" { - url += "?" + a.Query - } - if a.Fragment != "" { - url += "#" + a.Fragment - } - return url -} - +// Address default settings. const ( - gqtpScheme = "gqtp" - gqtpDefaultHost = "localhost" - gqtpDefaultPort = 10043 + DefaultScheme = "gqtp" + DefaultHost = "localhost" + GQTPDefaultPort = 10043 + HTTPDefaultPort = 10041 + HTTPDefaultPath = "/d/" ) // fillGQTP checks fields and fills missing fields in a GQTP address. func (a *Address) fillGQTP() error { + if a.Scheme == "" { + a.Scheme = "gqtp" + } if a.Username != "" { - return fmt.Errorf("invalid username: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "username": a.Username, + "error": "GQTP does not accept username.", + }) } if a.Password != "" { - return fmt.Errorf("invalid password: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "password": a.Password, + "error": "GQTP does not accept password.", + }) } if a.Host == "" { - a.Host = gqtpDefaultHost + a.Host = DefaultHost } if a.Port == 0 { - a.Port = gqtpDefaultPort + a.Port = GQTPDefaultPort } if a.Path != "" { - return fmt.Errorf("invalid path: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "path": a.Path, + "error": "GQTP does not accept path.", + }) } if a.Query != "" { - return fmt.Errorf("invalid query: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "query": a.Query, + "error": "GQTP does not accept query.", + }) } if a.Fragment != "" { - return fmt.Errorf("invalid fragment: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "fragment": a.Fragment, + "error": "GQTP does not accept fragment.", + }) } return nil } -const ( - httpScheme = "http" - httpsScheme = "https" - httpDefaultHost = "localhost" - httpDefaultPort = 10041 - httpDefaultPath = "/d/" -) - // fillHTTP checks fields and fills missing fields in an HTTP address. func (a *Address) fillHTTP() error { + if a.Scheme == "" { + a.Scheme = "http" + } if a.Host == "" { - a.Host = httpDefaultHost + a.Host = DefaultHost } if a.Port == 0 { - a.Port = httpDefaultPort + a.Port = HTTPDefaultPort } if a.Path == "" { - a.Path = httpDefaultPath - } - if a.Query != "" { - return fmt.Errorf("invalid query: raw = %s", a.Raw) - } - if a.Fragment != "" { - return fmt.Errorf("invalid fragment: raw = %s", a.Raw) + a.Path = HTTPDefaultPath } return nil } -const ( - defaultScheme = gqtpScheme -) - // fill checks fields and fills missing fields. func (a *Address) fill() error { if a.Scheme == "" { - a.Scheme = defaultScheme - } else { - a.Scheme = strings.ToLower(a.Scheme) + a.Scheme = DefaultScheme } - switch a.Scheme { - case gqtpScheme: + switch strings.ToLower(a.Scheme) { + case "gqtp": if err := a.fillGQTP(); err != nil { return err } - case httpScheme, httpsScheme: + case "http", "https": if err := a.fillHTTP(); err != nil { return err } default: - return fmt.Errorf("invalid scheme: raw = %s", a.Raw) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "scheme": a.Scheme, + "error": "The scheme is not supported.", + }) } return nil } @@ -142,7 +122,10 @@ func (a *Address) parseHostPort(s string) error { if s[0] == '[' { i := strings.IndexByte(s, ']') if i == -1 { - return fmt.Errorf("missing ']': s = %s", s) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "address": s, + "error": "IPv6 address must be enclosed in [].", + }) } a.Host = s[:i+1] rest := s[i+1:] @@ -150,7 +133,10 @@ func (a *Address) parseHostPort(s string) error { return nil } if rest[0] != ':' { - return fmt.Errorf("missing ':' after ']': s = %s", s) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "address": s, + "error": "IPv6 address and port must be separated by ':'.", + }) } portStr = rest[1:] } else { @@ -165,18 +151,21 @@ func (a *Address) parseHostPort(s string) error { if portStr != "" { port, err := net.LookupPort("tcp", portStr) if err != nil { - return fmt.Errorf("net.LookupPort failed: %v", err) + return NewError(StatusInvalidAddress, map[string]interface{}{ + "port": portStr, + "error": err.Error(), + }) } a.Port = port } return nil } -// ParseAddress parses an address. +// parseAddress parses an address. // The expected address format is -// [scheme://][username[:password]@]host[:port][path][?query][#fragment]. -func ParseAddress(s string) (*Address, error) { - a := &Address{Raw: s} +// [scheme://][username[:password]@]host[:port][path]. +func parseAddress(s string) (*Address, error) { + a := new(Address) if i := strings.IndexByte(s, '#'); i != -1 { a.Fragment = s[i+1:] s = s[:i] @@ -207,8 +196,87 @@ func ParseAddress(s string) (*Address, error) { if err := a.parseHostPort(s); err != nil { return nil, err } + return a, nil +} + +// ParseAddress parses an address. +// The expected address format is +// [scheme://][username[:password]@][host][:port][path][?query][#fragment]. +func ParseAddress(s string) (*Address, error) { + a, err := parseAddress(s) + if err != nil { + return nil, err + } if err := a.fill(); err != nil { return nil, err } return a, nil } + +// ParseGQTPAddress parses a GQTP address. +// The expected address format is [scheme://][host][:port]. +func ParseGQTPAddress(s string) (*Address, error) { + a, err := parseAddress(s) + if err != nil { + return nil, err + } + switch strings.ToLower(a.Scheme) { + case "", "gqtp": + default: + return nil, NewError(StatusInvalidAddress, map[string]interface{}{ + "scheme": a.Scheme, + "error": "The scheme is not supported.", + }) + } + if err := a.fillGQTP(); err != nil { + return nil, err + } + return a, nil +} + +// ParseHTTPAddress parses an HTTP address. +// The expected address format is +// [scheme://][username[:password]@][host][:port][path][?query][#fragment]. +func ParseHTTPAddress(s string) (*Address, error) { + a, err := parseAddress(s) + if err != nil { + return nil, err + } + switch strings.ToLower(a.Scheme) { + case "", "http", "https": + default: + return nil, NewError(StatusInvalidAddress, map[string]interface{}{ + "scheme": a.Scheme, + "error": "The scheme is not supported.", + }) + } + if err := a.fillHTTP(); err != nil { + return nil, err + } + return a, nil +} + +// String assembles the fields into an address. +func (a *Address) String() string { + var url string + if a.Scheme != "" { + url += a.Scheme + "://" + } + if a.Password != "" { + url += a.Username + ":" + a.Password + "@" + } else if a.Username != "" { + url += a.Username + "@" + } + url += a.Host + if a.Port != 0 { + url += ":" + strconv.Itoa(a.Port) + } + url += a.Path + if a.Query != "" { + url += "?" + a.Query + } + if a.Fragment != "" { + url += "#" + a.Fragment + } + return url +} Added: v2/address_test.go (+116 -0) 100644 =================================================================== --- /dev/null +++ v2/address_test.go 2017-06-09 09:39:36 +0900 (e1d371a) @@ -0,0 +1,116 @@ +package grnci + +import ( + "fmt" + "sort" + "testing" +) + +func TestParseAddress(t *testing.T) { + data := map[string]string{ + "": fmt.Sprintf("%s://%s:%d%s", + DefaultScheme, DefaultHost, GQTPDefaultPort, ""), + "gqtp://": fmt.Sprintf("%s://%s:%d%s", + DefaultScheme, DefaultHost, GQTPDefaultPort, ""), + "http://": fmt.Sprintf("%s://%s:%d%s", + "http", DefaultHost, HTTPDefaultPort, HTTPDefaultPath), + "https://": fmt.Sprintf("%s://%s:%d%s", + "https", DefaultHost, HTTPDefaultPort, HTTPDefaultPath), + "example.com": fmt.Sprintf("%s://%s:%d%s", + DefaultScheme, "example.com", GQTPDefaultPort, ""), + ":8080": fmt.Sprintf("%s://%s:%d%s", + DefaultScheme, DefaultHost, 8080, ""), + } + var keys []string + for key := range data { + keys = append(keys, key) + } + sort.Strings(keys) + for _, src := range keys { + want := data[src] + addr, err := ParseAddress(src) + actual := addr.String() + if err != nil { + t.Fatalf("ParseAddress failed: src = %s, actual = %s, err = %v", + src, actual, err) + } + if addr.String() != want { + t.Fatalf("ParseAddress failed: src = %s, actual = %s, want = %s", + src, actual, want) + } + } +} + +func TestParseGQTPAddress(t *testing.T) { + data := map[string]string{ + "": fmt.Sprintf("%s://%s:%d%s", + "gqtp", DefaultHost, GQTPDefaultPort, ""), + "gqtp://": fmt.Sprintf("%s://%s:%d%s", + "gqtp", DefaultHost, GQTPDefaultPort, ""), + "example.com": fmt.Sprintf("%s://%s:%d%s", + "gqtp", "example.com", GQTPDefaultPort, ""), + ":8080": fmt.Sprintf("%s://%s:%d%s", + "gqtp", DefaultHost, 8080, ""), + "example.com:8080": fmt.Sprintf("%s://%s:%d%s", + "gqtp", "example.com", 8080, ""), + } + var keys []string + for key := range data { + keys = append(keys, key) + } + sort.Strings(keys) + for _, src := range keys { + want := data[src] + addr, err := ParseGQTPAddress(src) + actual := addr.String() + if err != nil { + t.Fatalf("ParseGQTPAddress failed: src = %s, actual = %s, err = %v", + src, actual, err) + } + if addr.String() != want { + t.Fatalf("ParseGQTPAddress failed: src = %s, actual = %s, want = %s", + src, actual, want) + } + } +} + +func TestParseHTTPAddress(t *testing.T) { + data := map[string]string{ + "": fmt.Sprintf("%s://%s:%d%s", + "http", DefaultHost, HTTPDefaultPort, HTTPDefaultPath), + "https://": fmt.Sprintf("%s://%s:%d%s", + "https", DefaultHost, HTTPDefaultPort, HTTPDefaultPath), + "example.com": fmt.Sprintf("%s://%s:%d%s", + "http", "example.com", HTTPDefaultPort, HTTPDefaultPath), + ":8080": fmt.Sprintf("%s://%s:%d%s", + "http", DefaultHost, 8080, HTTPDefaultPath), + "http://example.com": fmt.Sprintf("%s://%s:%d%s", + "http", "example.com", HTTPDefaultPort, HTTPDefaultPath), + "http://example.com:8080": fmt.Sprintf("%s://%s:%d%s", + "http", "example.com", 8080, HTTPDefaultPath), + "http://example.com:8080/": fmt.Sprintf("%s://%s:%d%s", + "http", "example.com", 8080, "/"), + "http://:8080": fmt.Sprintf("%s://%s:%d%s", + "http", DefaultHost, 8080, HTTPDefaultPath), + "http://:8080/": fmt.Sprintf("%s://%s:%d%s", + "http", DefaultHost, 8080, "/"), + } + var keys []string + for key := range data { + keys = append(keys, key) + } + sort.Strings(keys) + for _, src := range keys { + want := data[src] + addr, err := ParseHTTPAddress(src) + actual := addr.String() + if err != nil { + t.Fatalf("ParseHTTPAddress failed: src = %s, actual = %s, err = %v", + src, actual, err) + } + if addr.String() != want { + t.Fatalf("ParseHTTPAddress failed: src = %s, actual = %s, want = %s", + src, actual, want) + } + } +} Deleted: v2/argument.go (+0 -60) 100644 =================================================================== --- v2/argument.go 2017-05-17 19:23:04 +0900 (8862bcd) +++ /dev/null @@ -1,60 +0,0 @@ -package grnci - -import ( - "errors" - "fmt" -) - -// Argument stores a command argument. -// -// If Key != "", it is a named argument. -// Otherwise, it is an unnamed argument. -// Note that the order of unnamed arguments is important. -type Argument struct { - Key string - Value string -} - -// isDigit checks if c is a digit. -func isDigit(c byte) bool { - return c >= '0' && c <= '9' -} - -// isAlpha checks if c is an alphabet. -func isAlpha(c byte) bool { - return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') -} - -// isAlnum checks if c is a digit or alphabet. -func isAlnum(c byte) bool { - return isDigit(c) || isAlpha(c) -} - -// checkArgumentKey checks if s is valid as an argument key. -func checkArgumentKey(s string) error { - if s == "" { - return errors.New("invalid format: s = ") - } - if !isAlpha(s[0]) { - return fmt.Errorf("invalid format: s = %s", s) - } - for i := 1; i < len(s); i++ { - if isAlnum(s[i]) { - continue - } - switch s[i] { - case '#', '@', '-', '_', '.', '[', ']': - default: - return fmt.Errorf("invalid format: s = %s", s) - } - } - return nil -} - -// Check checks if arg is valid. -func (a *Argument) Check() error { - if err := checkArgumentKey(a.Key); err != nil { - return fmt.Errorf("checkArgumentKey failed: %v", err) - } - return nil -} Deleted: v2/client.go (+0 -88) 100644 =================================================================== --- v2/client.go 2017-05-17 19:23:04 +0900 (9f46eaa) +++ /dev/null @@ -1,88 +0,0 @@ -package grnci - -import ( - "fmt" - "net" - "net/http" -) - -// iClient is a Groonga client interface. -type iClient interface { - Close() error - Query(*Request) (*Response, error) -} - -// Client is a Groonga client. -type Client struct { - impl iClient -} - -// NewClient returns a new Client using an existing client. -func NewClient(c iClient) *Client { - return &Client{impl: c} -} - -// NewClientByAddress returns a new Client to access a Groonga server. -func NewClientByAddress(addr string) (*Client, error) { - a, err := ParseAddress(addr) - if err != nil { - return nil, err - } - switch a.Scheme { - case gqtpScheme: - c, err := dialGQTP(a) - if err != nil { - return nil, err - } - return NewClient(c), nil - case httpScheme, httpsScheme: - c, err := newHTTPClient(a, nil) - if err != nil { - return nil, err - } - return NewClient(c), nil - default: - return nil, fmt.Errorf("invalid scheme: raw = %s", a.Raw) - } -} - -// NewGQTPClient returns a new Client using an existing connection. -func NewGQTPClient(conn net.Conn) (*Client, error) { - c, err := newGQTPClient(conn) - if err != nil { - return nil, err - } - return NewClient(c), nil -} - -// NewHTTPClient returns a new Client using an existing HTTP client. -// If client is nil, NewHTTPClient uses http.DefaultClient. -func NewHTTPClient(addr string, client *http.Client) (*Client, error) { - a, err := ParseAddress(addr) - if err != nil { - return nil, err - } - if client == nil { - client = http.DefaultClient - } - switch a.Scheme { - case httpScheme, httpsScheme: - default: - return nil, fmt.Errorf("invalid scheme: raw = %s", a.Raw) - } - c, err := newHTTPClient(a, client) - if err != nil { - return nil, err - } - return NewClient(c), nil -} - -// Close closes a client. -func (c *Client) Close() error { - return c.impl.Close() -} - -// Query sends a request and receives a response. -func (c *Client) Query(req *Request) (*Response, error) { - return c.impl.Query(req) -} Added: v2/doc.go (+7 -0) 100644 =================================================================== --- /dev/null +++ v2/doc.go 2017-06-09 09:39:36 +0900 (129abb0) @@ -0,0 +1,7 @@ +/* +Package grnci provides Groonga clients. + +This package is experimental and supports only a subset of Groonga commands. +See http://groonga.org/docs/reference/command.html for details about Groonga commands. +*/ +package grnci Modified: v2/error.go (+227 -20) =================================================================== --- v2/error.go 2017-05-17 19:23:04 +0900 (876af5f) +++ v2/error.go 2017-06-09 09:39:36 +0900 (2f50085) @@ -1,42 +1,249 @@ package grnci -import "encoding/json" - -// Code is an error code. -type Code int +import ( + "encoding/json" + "net/http" +) -// List of error codes. +// Error status codes. const ( - CodeSuccess = Code(0) + StatusInvalidAddress = 1000 + iota + StatusInvalidCommand + StatusInvalidOperation + StatusInvalidResponse + StatusNetworkError + StatusUnknownError ) -// String returns a string which briefly describes an error. -func (c Code) String() string { - switch c { - case CodeSuccess: - return "Success" +// StatusText returns a status text. +func StatusText(status int) string { + text := http.StatusText(status) + if text != "" { + return text + } + switch status { + case 0: + return "GRN_SUCCESS" + case 1: + return "GRN_END_OF_DATA" + case -1: + return "GRN_UNKNOWN_ERROR" + case -2: + return "GRN_OPERATION_NOT_PERMITTED" + case -3: + return "GRN_NO_SUCH_FILE_OR_DIRECTORY" + case -4: + return "GRN_NO_SUCH_PROCESS" + case -5: + return "GRN_INTERRUPTED_FUNCTION_CALL" + case -6: + return "GRN_INPUT_OUTPUT_ERROR" + case -7: + return "GRN_NO_SUCH_DEVICE_OR_ADDRESS" + case -8: + return "GRN_ARG_LIST_TOO_LONG" + case -9: + return "GRN_EXEC_FORMAT_ERROR" + case -10: + return "GRN_BAD_FILE_DESCRIPTOR" + case -11: + return "GRN_NO_CHILD_PROCESSES" + case -12: + return "GRN_RESOURCE_TEMPORARILY_UNAVAILABLE" + case -13: + return "GRN_NOT_ENOUGH_SPACE" + case -14: + return "GRN_PERMISSION_DENIED" + case -15: + return "GRN_BAD_ADDRESS" + case -16: + return "GRN_RESOURCE_BUSY" + case -17: + return "GRN_FILE_EXISTS" + case -18: + return "GRN_IMPROPER_LINK" + case -19: + return "GRN_NO_SUCH_DEVICE" + case -20: + return "GRN_NOT_A_DIRECTORY" + case -21: + return "GRN_IS_A_DIRECTORY" + case -22: + return "GRN_INVALID_ARGUMENT" + case -23: + return "GRN_TOO_MANY_OPEN_FILES_IN_SYSTEM" + case -24: + return "GRN_TOO_MANY_OPEN_FILES" + case -25: + return "GRN_INAPPROPRIATE_I_O_CONTROL_OPERATION" + case -26: + return "GRN_FILE_TOO_LARGE" + case -27: + return "GRN_NO_SPACE_LEFT_ON_DEVICE" + case -28: + return "GRN_INVALID_SEEK" + case -29: + return "GRN_READ_ONLY_FILE_SYSTEM" + case -30: + return "GRN_TOO_MANY_LINKS" + case -31: + return "GRN_BROKEN_PIPE" + case -32: + return "GRN_DOMAIN_ERROR" + case -33: + return "GRN_RESULT_TOO_LARGE" + case -34: + return "GRN_RESOURCE_DEADLOCK_AVOIDED" + case -35: + return "GRN_NO_MEMORY_AVAILABLE" + case -36: + return "GRN_FILENAME_TOO_LONG" + case -37: + return "GRN_NO_LOCKS_AVAILABLE" + case -38: + return "GRN_FUNCTION_NOT_IMPLEMENTED" + case -39: + return "GRN_DIRECTORY_NOT_EMPTY" + case -40: + return "GRN_ILLEGAL_BYTE_SEQUENCE" + case -41: + return "GRN_SOCKET_NOT_INITIALIZED" + case -42: + return "GRN_OPERATION_WOULD_BLOCK" + case -43: + return "GRN_ADDRESS_IS_NOT_AVAILABLE" + case -44: + return "GRN_NETWORK_IS_DOWN" + case -45: + return "GRN_NO_BUFFER" + case -46: + return "GRN_SOCKET_IS_ALREADY_CONNECTED" + case -47: + return "GRN_SOCKET_IS_NOT_CONNECTED" + case -48: + return "GRN_SOCKET_IS_ALREADY_SHUTDOWNED" + case -49: + return "GRN_OPERATION_TIMEOUT" + case -50: + return "GRN_CONNECTION_REFUSED" + case -51: + return "GRN_RANGE_ERROR" + case -52: + return "GRN_TOKENIZER_ERROR" + case -53: + return "GRN_FILE_CORRUPT" + case -54: + return "GRN_INVALID_FORMAT" + case -55: + return "GRN_OBJECT_CORRUPT" + case -56: + return "GRN_TOO_MANY_SYMBOLIC_LINKS" + case -57: + return "GRN_NOT_SOCKET" + case -58: + return "GRN_OPERATION_NOT_SUPPORTED" + case -59: + return "GRN_ADDRESS_IS_IN_USE" + case -60: + return "GRN_ZLIB_ERROR" + case -61: + return "GRN_LZ4_ERROR" + case -62: + return "GRN_STACK_OVER_FLOW" + case -63: + return "GRN_SYNTAX_ERROR" + case -64: + return "GRN_RETRY_MAX" + case -65: + return "GRN_INCOMPATIBLE_FILE_FORMAT" + case -66: + return "GRN_UPDATE_NOT_ALLOWED" + case -67: + return "GRN_TOO_SMALL_OFFSET" + case -68: + return "GRN_TOO_LARGE_OFFSET" + case -69: + return "GRN_TOO_SMALL_LIMIT" + case -70: + return "GRN_CAS_ERROR" + case -71: + return "GRN_UNSUPPORTED_COMMAND_VERSION" + case -72: + return "GRN_NORMALIZER_ERROR" + case -73: + return "GRN_TOKEN_FILTER_ERROR" + case -74: + return "GRN_COMMAND_ERROR" + case -75: + return "GRN_PLUGIN_ERROR" + case -76: + return "GRN_SCORER_ERROR" + case -77: + return "GRN_CANCEL" + case -78: + return "GRN_WINDOW_FUNCTION_ERROR" + case -79: + return "GRN_ZSTD_ERROR" + + case StatusInvalidAddress: + return "invalid address" + case StatusInvalidCommand: + return "invalid command" + case StatusInvalidOperation: + return "invalid operation" + case StatusInvalidResponse: + return "invalid response" + case StatusNetworkError: + return "network error" + case StatusUnknownError: + return "unknown error" + default: - return "Unknown error" + return "undefined error" } } -// Error stores details of an error. +// Error stores an error. type Error struct { - Code Code - Message string - Data interface{} + Code int `json:"code"` + Text string `json:"text"` + Data map[string]interface{} `json:"data,omitempty"` } -// NewError creates an error object. -func NewError(code Code, data interface{}) error { +// NewError returns a new Error. +func NewError(code int, data map[string]interface{}) *Error { return &Error{ Code: code, + Text: StatusText(code), Data: data, } } -// Error returns a string which describes an error. -func (e Error) Error() string { +// EnhanceError adds data to err and returns it. +// Note that the arguments err and data may be modified. +func EnhanceError(err error, data map[string]interface{}) *Error { + if err, ok := err.(*Error); ok { + if err.Data == nil { + err.Data = data + } else { + for k, v := range data { + err.Data[k] = v + } + } + return err + } + if data == nil { + data = map[string]interface{}{ + "error": err.Error(), + } + } else if _, ok := data["error"]; !ok { + data["error"] = err.Error() + } + return NewError(StatusUnknownError, data) +} + +// Error returns a string which describes the Error. +func (e *Error) Error() string { b, _ := json.Marshal(e) return string(b) } Added: v2/error_test.go (+46 -0) 100644 =================================================================== --- /dev/null +++ v2/error_test.go 2017-06-09 09:39:36 +0900 (2ab83c5) @@ -0,0 +1,46 @@ +package grnci + +import "testing" + +func TestNewError(t *testing.T) { + err := NewError(StatusInvalidAddress, map[string]interface{}{ + "key": "value", + }) + if err.Code != StatusInvalidAddress { + t.Fatalf("NewError failed: Code: actual = %d, want = %d", + err.Code, StatusInvalidAddress) + } + if err.Text != StatusText(StatusInvalidAddress) { + t.Fatalf("NewError failed: Text: actual = %s, want = %s", + err.Text, StatusText(StatusInvalidAddress)) + } + if err.Data["key"] != "value" { + t.Fatalf("NewError failed: Data[\"key\"]: actual = %s, want = %s", + err.Data["key"], "value") + } +} + +func TestEnhanceError(t *testing.T) { + err := NewError(StatusInvalidAddress, map[string]interface{}{ + "key": "value", + }) + err = EnhanceError(err, map[string]interface{}{ + "newKey": "newValue", + }) + if err.Code != StatusInvalidAddress { + t.Fatalf("NewError failed: Code: actual = %d, want = %d", + err.Code, StatusInvalidAddress) + } + if err.Text != StatusText(StatusInvalidAddress) { + t.Fatalf("NewError failed: Text: actual = %s, want = %s", + err.Text, StatusText(StatusInvalidAddress)) + } + if err.Data["key"] != "value" { + t.Fatalf("NewError failed: Data[\"key\"]: actual = %s, want = %s", + err.Data["key"], "value") + } + if err.Data["newKey"] != "newValue" { + t.Fatalf("NewError failed: Data[\"newKey\"]: actual = %s, want = %s", + err.Data["newKey"], "newValue") + } +} Modified: v2/gqtp.go (+433 -15) =================================================================== --- v2/gqtp.go 2017-05-17 19:23:04 +0900 (3075594) +++ v2/gqtp.go 2017-06-09 09:39:36 +0900 (6173c25) @@ -1,43 +1,461 @@ package grnci import ( + "encoding/binary" "fmt" + "io" + "io/ioutil" "net" + "strings" + "time" ) -// gqtpClient is a GQTP client. -type gqtpClient struct { - conn net.Conn +// Constants for gqtpHeader. +const ( + gqtpProtocol = byte(0xc7) + + // gqtpQueryTypeNone = byte(0) + // gqtpQueryTypeTSV = byte(1) + // gqtpQueryTypeJSON = byte(2) + // gqtpQueryTypeXML = byte(3) + // gqtpQueryTypeMsgPack = byte(4) + + // gqtpFlagMore = byte(0x01) + gqtpFlagTail = byte(0x02) + // gqtpFlagHead = byte(0x04) + // gqtpFlagQuiet = byte(0x08) + // gqtpFlagQuit = byte(0x10) +) + +const ( + gqtpMaxChunkSize = 1 << 30 // Maximum chunk size + gqtpDefaultBufferSize = 1 << 16 // Default buffer size + gqtpMaxIdleConns = 2 // Maximum number of idle connections +) + +// gqtpHeader is a GQTP header. +type gqtpHeader struct { + Protocol byte // Must be 0xc7 + QueryType byte // Body type + KeyLength uint16 // Unused + Level byte // Unused + Flags byte // Flags + Status uint16 // Return code + Size uint32 // Body size + Opaque uint32 // Unused + CAS uint64 // Unused } -// dialGQTP returns a new gqtpClient connected to a GQTP server. -func dialGQTP(a *Address) (*gqtpClient, error) { - conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", a.Host, a.Port)) +// gqtpResponse is a GQTP response. +type gqtpResponse struct { + client *GQTPClient // Client + conn *GQTPConn // Connection + head gqtpHeader // Current header + start time.Time // Start time + elapsed time.Duration // Elapsed time + err error // Error response + left int // Number of bytes left in the current chunk + broken bool // Whether or not the connection is broken + closed bool // Whether or not the response is closed +} + +// newGQTPResponse returns a new GQTP response. +func newGQTPResponse(conn *GQTPConn, head gqtpHeader, start time.Time, name string) *gqtpResponse { + resp := &gqtpResponse{ + conn: conn, + head: head, + start: start, + elapsed: time.Now().Sub(start), + left: int(head.Size), + } + if head.Status > 32767 { + status := int(head.Status) - 65536 + resp.err = NewError(status, nil) + if _, ok := CommandRules[name]; !ok { + data, err := ioutil.ReadAll(resp) + if err != nil { + resp.broken = true + } else { + resp.err = EnhanceError(resp.err, map[string]interface{}{ + "error": string(data), + }) + } + } + } + return resp +} + +func (r *gqtpResponse) Status() int { + if err, ok := r.err.(*Error); ok { + return err.Code + } + return 0 +} + +func (r *gqtpResponse) Start() time.Time { + return r.start +} + +func (r *gqtpResponse) Elapsed() time.Duration { + return r.elapsed +} + +func (r *gqtpResponse) Read(p []byte) (int, error) { + if r.closed { + return 0, io.EOF + } + for r.left == 0 { + if r.head.Flags&gqtpFlagTail != 0 { + return 0, io.EOF + } + head, err := r.conn.recvHeader() + if err != nil { + r.broken = true + return 0, err + } + r.head = head + r.left = int(head.Size) + } + if r.left < len(p) { + p = p[:r.left] + } + n, err := r.conn.conn.Read(p) + r.left -= n + if err == io.EOF { + return n, io.EOF + } + if err != nil { + r.broken = true + return n, NewError(StatusNetworkError, map[string]interface{}{ + "method": "net.Conn.Read", + "n": n, + "error": err.Error(), + }) + } + return n, nil +} + +func (r *gqtpResponse) Close() error { + if r.closed { + return nil + } + var err error + if _, e := io.CopyBuffer(ioutil.Discard, r, r.conn.getBuffer()); e != nil { + r.broken = true + err = NewError(StatusNetworkError, map[string]interface{}{ + "method": "io.CopyBuffer", + "error": err.Error(), + }) + } + r.closed = true + if err == nil { + r.conn.ready = true + } + if r.client != nil { + // Broken connections are closed. + if r.broken { + if e := r.conn.Close(); e != nil && err != nil { + err = e + } + } + select { + case r.client.idleConns <- r.conn: + default: + if e := r.conn.Close(); e != nil && err != nil { + err = e + } + } + } + return err +} + +func (r *gqtpResponse) Err() error { + return r.err +} + +// GQTPConn is a thread-unsafe GQTP client. +type GQTPConn struct { + conn net.Conn // Connection to a GQTP server + buf []byte // Copy buffer + bufSize int // Copy buffer size + ready bool // Whether or not Exec and Query are ready +} + +// DialGQTP returns a new GQTPConn connected to a GQTP server. +// The expected address format is [scheme://][host][:port]. +func DialGQTP(addr string) (*GQTPConn, error) { + a, err := ParseGQTPAddress(addr) if err != nil { return nil, err } - return newGQTPClient(conn) + conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", a.Host, a.Port)) + if err != nil { + return nil, NewError(StatusNetworkError, map[string]interface{}{ + "host": a.Host, + "port": a.Port, + "error": err.Error(), + }) + } + return NewGQTPConn(conn), nil } -// newGQTPClient returns a new gqtpClient using an existing connection. -func newGQTPClient(conn net.Conn) (*gqtpClient, error) { - return &gqtpClient{conn: conn}, nil +// NewGQTPConn returns a new GQTPConn using an existing connection. +func NewGQTPConn(conn net.Conn) *GQTPConn { + return &GQTPConn{ + conn: conn, + bufSize: gqtpDefaultBufferSize, + ready: true, + } } // Close closes the connection. -func (c *gqtpClient) Close() error { +func (c *GQTPConn) Close() error { if err := c.conn.Close(); err != nil { + return NewError(StatusNetworkError, map[string]interface{}{ + "method": "net.Conn.Close", + "error": err.Error(), + }) + } + return nil +} + +// SetBufferSize updates the size of the copy buffer. +func (c *GQTPConn) SetBufferSize(n int) { + if n <= 0 || n > gqtpMaxChunkSize { + n = gqtpDefaultBufferSize + } + c.bufSize = n +} + +// getBuffer returns the copy buffer. +func (c *GQTPConn) getBuffer() []byte { + if len(c.buf) != c.bufSize { + c.buf = make([]byte, c.bufSize) + } + return c.buf +} + +// sendHeader sends a GQTP header. +func (c *GQTPConn) sendHeader(flags byte, size int) error { + head := gqtpHeader{ + Protocol: gqtpProtocol, + Flags: flags, + Size: uint32(size), + } + if err := binary.Write(c.conn, binary.BigEndian, head); err != nil { + return NewError(StatusNetworkError, map[string]interface{}{ + "method": "binary.Write", + "error": err.Error(), + }) + } + return nil +} + +// sendChunkBytes sends data with flags. +func (c *GQTPConn) sendChunkBytes(data []byte, flags byte) error { + if err := c.sendHeader(flags, len(data)); err != nil { return err } + if _, err := c.conn.Write(data); err != nil { + return NewError(StatusNetworkError, map[string]interface{}{ + "method": "net.Conn.Write", + "error": err.Error(), + }) + } + return nil +} + +// sendChunkString sends data with flags. +func (c *GQTPConn) sendChunkString(data string, flags byte) error { + if err := c.sendHeader(flags, len(data)); err != nil { + return err + } + if _, err := io.WriteString(c.conn, data); err != nil { + return NewError(StatusNetworkError, map[string]interface{}{ + "method": "io.WriteString", + "error": err.Error(), + }) + } return nil } +// recvHeader receives a GQTP header. +func (c *GQTPConn) recvHeader() (gqtpHeader, error) { + var head gqtpHeader + if err := binary.Read(c.conn, binary.BigEndian, &head); err != nil { + return head, NewError(StatusNetworkError, map[string]interface{}{ + "method": "binary.Read", + "error": err.Error(), + }) + } + return head, nil +} + +// exec sends a command without body and receives a response. +func (c *GQTPConn) exec(cmd string) (Response, error) { + start := time.Now() + name := strings.TrimLeft(cmd, " \t\r\n") + if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { + name = name[:idx] + } + if err := c.sendChunkString(cmd, gqtpFlagTail); err != nil { + return nil, err + } + head, err := c.recvHeader() + if err != nil { + return nil, err + } + return newGQTPResponse(c, head, start, name), nil +} + +// execBody sends a command with body and receives a response. +func (c *GQTPConn) execBody(cmd string, body io.Reader) (Response, error) { + start := time.Now() + name := strings.TrimLeft(cmd, " \t\r\n") + if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { + name = name[:idx] + } + if err := c.sendChunkString(cmd, 0); err != nil { + return nil, err + } + head, err := c.recvHeader() + if err != nil { + return nil, err + } + if head.Status != 0 || head.Size != 0 { + return newGQTPResponse(c, head, start, name), nil + } + n := 0 + buf := c.getBuffer() + for { + m, err := body.Read(buf[n:]) + n += m + if err != nil { + if err := c.sendChunkBytes(buf[:n], gqtpFlagTail); err != nil { + return nil, err + } + head, err = c.recvHeader() + if err != nil { + return nil, err + } + return newGQTPResponse(c, head, start, name), nil + } + if n == len(buf) { + if err := c.sendChunkBytes(buf, 0); err != nil { + return nil, err + } + head, err = c.recvHeader() + if err != nil { + return nil, err + } + if head.Status != 0 || head.Size != 0 { + return newGQTPResponse(c, head, start, name), nil + } + n = 0 + } + } +} + +// Exec sends a request and receives a response. +// It is the caller's responsibility to close the response. +// The GQTPConn should not be used until the response is closed. +func (c *GQTPConn) Exec(cmd string, body io.Reader) (Response, error) { + if !c.ready { + return nil, NewError(StatusInvalidOperation, map[string]interface{}{ + "error": "The connection is not ready to send a request.", + }) + } + if len(cmd) > gqtpMaxChunkSize { + return nil, NewError(StatusInvalidCommand, map[string]interface{}{ + "length": len(cmd), + "error": "The command is too long.", + }) + } + c.ready = false + if body == nil { + return c.exec(cmd) + } + return c.execBody(cmd, body) +} + // Query sends a request and receives a response. -func (c *gqtpClient) Query(req *Request) (*Response, error) { - if err := req.Check(); err != nil { +func (c *GQTPConn) Query(req *Request) (Response, error) { + cmd, body, err := req.GQTPRequest() + if err != nil { + return nil, err + } + return c.Exec(cmd, body) +} + +// GQTPClient is a thread-safe GQTP client. +type GQTPClient struct { + addr *Address + idleConns chan *GQTPConn +} + +// NewGQTPClient returns a new GQTPClient connected to a GQTP server. +// The expected address format is [scheme://][host][:port]. +func NewGQTPClient(addr string) (*GQTPClient, error) { + a, err := ParseGQTPAddress(addr) + if err != nil { + return nil, err + } + conn, err := DialGQTP(addr) + if err != nil { + return nil, err + } + conns := make(chan *GQTPConn, gqtpMaxIdleConns) + conns <- conn + return &GQTPClient{ + addr: a, + idleConns: conns, + }, nil +} + +// Close closes the idle connections. +// Close should be called after all responses are closed. +// Otherwise, connections will be leaked. +func (c *GQTPClient) Close() error { + var err error + for { + select { + case conn := <-c.idleConns: + if e := conn.Close(); e != nil && err == nil { + err = e + } + default: + return err + } + } +} + +// Exec sends a request and receives a response. +// It is the caller's responsibility to close the response. +func (c *GQTPClient) Exec(cmd string, body io.Reader) (Response, error) { + var conn *GQTPConn + var err error + select { + case conn = <-c.idleConns: + default: + conn, err = DialGQTP(c.addr.String()) + if err != nil { + return nil, err + } + } + resp, err := conn.Exec(cmd, body) + if err != nil { + conn.Close() return nil, err } + resp.(*gqtpResponse).client = c + return resp, nil +} - // TODO - return nil, nil +// Query calls Exec with req.GQTPRequest and returns the result. +func (c *GQTPClient) Query(req *Request) (Response, error) { + cmd, body, err := req.GQTPRequest() + if err != nil { + return nil, err + } + return c.Exec(cmd, body) } Added: v2/gqtp_test.go (+103 -0) 100644 =================================================================== --- /dev/null +++ v2/gqtp_test.go 2017-06-09 09:39:36 +0900 (1d9080b) @@ -0,0 +1,103 @@ +package grnci + +import ( + "io" + "io/ioutil" + "log" + "strings" + "testing" +) + +func TestGQTPConn(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + conn, err := DialGQTP("") + if err != nil { + t.Skipf("DialGQTP failed: %v", err) + } + defer conn.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := conn.Exec(pair.Command, body) + if err != nil { + t.Fatalf("conn.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} + +func TestGQTPClient(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + client, err := NewGQTPClient("") + if err != nil { + t.Skipf("NewGQTPClient failed: %v", err) + } + defer client.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := client.Exec(pair.Command, body) + if err != nil { + t.Fatalf("conn.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} Modified: v2/http.go (+348 -37) =================================================================== --- v2/http.go 2017-05-17 19:23:04 +0900 (14e2fbf) +++ v2/http.go 2017-06-09 09:39:36 +0900 (9c5e32d) @@ -1,73 +1,384 @@ package grnci import ( - "fmt" + "bytes" + "encoding/json" + "io" "io/ioutil" + "math" "net/http" "net/url" "path" + "time" ) -// httpClient is an HTTP client. -type httpClient struct { +const ( + httpBufferSize = 1024 // Enough size to store the response header +) + +// httpResponse is an HTTP response. +type httpResponse struct { + resp *http.Response // HTTP response + plain bool // Whether or not the response is plain + start time.Time // Start time + elapsed time.Duration // Elapsed time + err error // Error response + left []byte // Data left in buf + buf [1]byte // Buffer for the next byte +} + +// extractHTTPResponseHeader extracts the HTTP resonse header. +func extractHTTPResponseHeader(data []byte) (head, left []byte, err error) { + left = bytes.TrimLeft(data[1:], " \t\r\n") + if !bytes.HasPrefix(left, []byte("[")) { + err = NewError(StatusInvalidResponse, map[string]interface{}{ + "error": "The response does not contain a header.", + }) + return + } + var i int + stack := []byte{']'} +Loop: + for i = 1; i < len(left); i++ { + switch left[i] { + case '[': + stack = append(stack, ']') + case '{': + stack = append(stack, '}') + case ']', '}': + if left[i] != stack[len(stack)-1] { + err = NewError(StatusInvalidResponse, map[string]interface{}{ + "error": "The response header is broken.", + }) + return + } + stack = stack[:len(stack)-1] + if len(stack) == 0 { + break Loop + } + case '"': + for i++; i < len(left); i++ { + if left[i] == '\\' { + i++ + continue + } + if left[i] == '"' { + break + } + } + } + } + if len(stack) != 0 { + err = NewError(StatusInvalidResponse, map[string]interface{}{ + "error": "The response header is too long or broken.", + }) + return + } + head = left[:i+1] + left = bytes.TrimLeft(left[i+1:], " \t\r\n") + if bytes.HasPrefix(left, []byte(",")) { + left = bytes.TrimLeft(left[1:], " \t\r\n") + } + return +} + +// parseHTTPResponseHeaderError parses the error information in the HTTP resonse header. +func parseHTTPResponseHeaderError(status int, elems []interface{}) error { + err := NewError(status, nil) + if len(elems) >= 1 { + err = EnhanceError(err, map[string]interface{}{ + "message": elems[0], + }) + } + if len(elems) >= 2 { + if locs, ok := elems[1].([]interface{}); ok { + if len(locs) >= 1 { + if grnLocs, ok := locs[0].([]interface{}); ok { + if len(grnLocs) >= 1 { + if f, ok := grnLocs[0].(string); ok { + err = EnhanceError(err, map[string]interface{}{ + "function": f, + }) + } + } + if len(grnLocs) >= 2 { + if f, ok := grnLocs[1].(string); ok { + err = EnhanceError(err, map[string]interface{}{ + "file": f, + }) + } + } + if len(grnLocs) >= 3 { + if f, ok := grnLocs[2].(float64); ok { + err = EnhanceError(err, map[string]interface{}{ + "line": int(f), + }) + } + } + } + } + } + } + return err +} + +// parseHTTPResponseHeader parses the HTTP resonse header. +func parseHTTPResponseHeader(resp *http.Response, data []byte) (*httpResponse, error) { + head, left, err := extractHTTPResponseHeader(data) + if err != nil { + return nil, err + } + + var elems []interface{} + if err := json.Unmarshal(head, &elems); err != nil { + return nil, NewError(StatusInvalidResponse, map[string]interface{}{ + "method": "json.Unmarshal", + "error": err.Error(), + }) + } + if len(elems) < 3 { + return nil, NewError(StatusInvalidResponse, map[string]interface{}{ + "method": "json.Unmarshal", + "error": "Too few elements in the response header.", + }) + } + f, ok := elems[0].(float64) + if !ok { + return nil, NewError(StatusInvalidResponse, map[string]interface{}{ + "status": elems[0], + "error": "status must be a number.", + }) + } + status := int(f) + f, ok = elems[1].(float64) + if !ok { + return nil, NewError(StatusInvalidResponse, map[string]interface{}{ + "start": elems[1], + "error": "start must be a number.", + }) + } + i, f := math.Modf(f) + start := time.Unix(int64(i), int64(math.Floor(f*1000000+0.5))*1000).Local() + f, ok = elems[2].(float64) + if !ok { + return nil, NewError(StatusInvalidResponse, map[string]interface{}{ + "elapsed": elems[2], + "error": "elapsed must be a number.", + }) + } + elapsed := time.Duration(f * float64(time.Second)) + + if status != 0 { + err = parseHTTPResponseHeaderError(status, elems[3:]) + } + + return &httpResponse{ + resp: resp, + start: start, + elapsed: elapsed, + err: err, + left: left, + }, nil +} + +// newHTTPResponse returns a new httpResponse. +func newHTTPResponse(resp *http.Response, start time.Time) (*httpResponse, error) { + buf := make([]byte, httpBufferSize) + n := 0 + for n < len(buf) { + m, err := resp.Body.Read(buf[n:]) + n += m + if err == io.EOF { + break + } + if err != nil { + return nil, NewError(StatusNetworkError, map[string]interface{}{ + "method": "http.Response.Body.Read", + "error": err.Error(), + }) + } + } + data := bytes.TrimLeft(buf[:n], " \t\r\n") + if bytes.HasPrefix(data, []byte("[")) { + return parseHTTPResponseHeader(resp, data) + } + var err error + if resp.StatusCode != http.StatusOK { + err = NewError(resp.StatusCode, map[string]interface{}{ + "note": "The response format is not JSON.", + }) + } + return &httpResponse{ + resp: resp, + plain: true, + start: start, + elapsed: time.Now().Sub(start), + err: err, + left: data, + }, nil +} + +func (r *httpResponse) Status() int { + if err, ok := r.err.(*Error); ok { + return err.Code + } + return 0 +} + +func (r *httpResponse) Start() time.Time { + return r.start +} + +func (r *httpResponse) Elapsed() time.Duration { + return r.elapsed +} + +func (r *httpResponse) Read(p []byte) (n int, err error) { + if len(r.left) != 0 { + n = copy(p, r.left) + r.left = r.left[n:] + if len(r.left) != 0 { + return + } + } + var m int + if n < len(p) { + m, err = r.resp.Body.Read(p[n:]) + n += m + if err != nil { + if !r.plain && n > 0 && p[n-1] == ']' { + n-- + } + if err != io.EOF { + err = NewError(StatusNetworkError, map[string]interface{}{ + "method": "http.Response.Body.Read", + "error": err.Error(), + }) + } + return + } + } + if r.plain || n == 0 || p[n-1] != ']' { + return + } + m, err = r.resp.Body.Read(r.buf[:]) + if err == nil { + r.left = r.buf[:m] + return + } + if m == 0 { + n-- + } + if err != io.EOF { + err = NewError(StatusNetworkError, map[string]interface{}{ + "method": "http.Response.Body.Read", + "error": err.Error(), + }) + } + return +} + +func (r *httpResponse) Close() error { + io.Copy(ioutil.Discard, r.resp.Body) + if err := r.resp.Body.Close(); err != nil { + return NewError(StatusNetworkError, map[string]interface{}{ + "method": "http.Response.Body.Close", + "error": err.Error(), + }) + } + return nil +} + +func (r *httpResponse) Err() error { + return r.err +} + +// HTTPClient is a thread-safe HTTP client. +type HTTPClient struct { url *url.URL client *http.Client } -// newHTTPClient returns a new httpClient. -func newHTTPClient(a *Address, client *http.Client) (*httpClient, error) { +// NewHTTPClient returns a new HTTPClient. +// The expected address format is +// [scheme://][username[:password]@][host][:port][path][?query][#fragment]. +// If client is nil, NewHTTPClient uses http.DefaultClient. +func NewHTTPClient(addr string, client *http.Client) (*HTTPClient, error) { + a, err := ParseHTTPAddress(addr) + if err != nil { + return nil, err + } url, err := url.Parse(a.String()) if err != nil { - return nil, fmt.Errorf("url.Parse failed: %v", err) + return nil, NewError(StatusInvalidAddress, map[string]interface{}{ + "url": a.String(), + "method": "url.Parse", + "error": err.Error(), + }) } if client == nil { client = http.DefaultClient } - return &httpClient{ + return &HTTPClient{ url: url, client: client, }, nil } -// Close closes a client. -func (c *httpClient) Close() error { +// Close does nothing. +func (c *HTTPClient) Close() error { return nil } -// Query sends a request and receives a response. -func (c *httpClient) Query(req *Request) (*Response, error) { - if err := req.Check(); err != nil { - return nil, err +// exec sends a request and receives a response. +func (c *HTTPClient) exec(cmd string, params map[string]string, body io.Reader) (*http.Response, error) { + url := *c.url + url.Path = path.Join(url.Path, cmd) + if len(params) != 0 { + query := url.Query() + for k, v := range params { + query.Add(k, v) + } + url.RawQuery = query.Encode() } - - u := *c.url - u.Path = path.Join(u.Path, req.Command) - if len(req.Arguments) != 0 { - q := u.Query() - for _, arg := range req.Arguments { - q.Set(arg.Key, arg.Value) + if body == nil { + resp, err := c.client.Get(url.String()) + if err != nil { + return nil, NewError(StatusNetworkError, map[string]interface{}{ + "url": url.String(), + "method": "http.Client.Get", + "error": err.Error(), + }) } - u.RawQuery = q.Encode() + return resp, nil } - addr := u.String() + resp, err := c.client.Post(url.String(), "application/json", body) + if err != nil { + return nil, NewError(StatusNetworkError, map[string]interface{}{ + "url": url.String(), + "method": "http.Client.Post", + "error": err.Error(), + }) + } + return resp, nil +} - var resp *http.Response - var err error - if req.Body == nil { - if resp, err = c.client.Get(addr); err != nil { - return nil, fmt.Errorf("c.client.Get failed: %v", err) - } - } else { - if resp, err = c.client.Post(addr, "application/json", req.Body); err != nil { - return nil, fmt.Errorf("c.client.Post failed: %v", err) - } +// Exec sends a command and returns a response. +// It is the caller's responsibility to close the response. +func (c *HTTPClient) Exec(cmd string, params map[string]string, body io.Reader) (Response, error) { + start := time.Now() + resp, err := c.exec(cmd, params, body) + if err != nil { + return nil, err } - defer resp.Body.Close() + return newHTTPResponse(resp, start) +} - // TODO: parse the response. - respBytes, err := ioutil.ReadAll(resp.Body) +// Query calls Exec with req.HTTPRequest and returns the result. +func (c *HTTPClient) Query(req *Request) (Response, error) { + cmd, params, body, err := req.HTTPRequest() if err != nil { - return nil, fmt.Errorf("ioutil.ReadAll failed: %v", err) + return nil, err } - return &Response{Bytes: respBytes}, nil + return c.Exec(cmd, params, body) } Added: v2/http_test.go (+60 -0) 100644 =================================================================== --- /dev/null +++ v2/http_test.go 2017-06-09 09:39:36 +0900 (eca1bfd) @@ -0,0 +1,60 @@ +package grnci + +import ( + "io" + "io/ioutil" + "log" + "strings" + "testing" +) + +func TestHTTPClient(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + client, err := NewHTTPClient("", nil) + if err != nil { + t.Skipf("NewHTTPClient failed: %v", err) + } + defer client.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + req, err := ParseRequest(pair.Command, body) + if err != nil { + t.Fatalf("ParseRequest failed: %v", err) + } + log.Printf("command = %s", pair.Command) + resp, err := client.Query(req) + if err != nil { + t.Fatalf("conn.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} Modified: v2/libgrn/client.go (+109 -15) =================================================================== --- v2/libgrn/client.go 2017-05-17 19:23:04 +0900 (e08e8fc) +++ v2/libgrn/client.go 2017-06-09 09:39:36 +0900 (eddaa5c) @@ -1,27 +1,121 @@ package libgrn -// #cgo pkg-config: groonga -// #include <groonga.h> -// #include <stdlib.h> -import "C" -import "github.com/groonga/grnci/v2" +import ( + "io" -// Client is a GQTP client. + "github.com/groonga/grnci/v2" +) + +const ( + maxIdleConns = 2 // Maximum number of idle connections +) + +// Client is a thread-safe GQTP client or DB handle. type Client struct { - ctx *grnCtx + addr *grnci.Address + baseConn *Conn + idleConns chan *Conn +} + +// DialClient returns a new Client connected to a GQTP server. +// The expected address format is [scheme://][host][:port]. +func DialClient(addr string) (*Client, error) { + a, err := grnci.ParseGQTPAddress(addr) + if err != nil { + return nil, err + } + conn, err := Dial(addr) + if err != nil { + return nil, err + } + conns := make(chan *Conn, maxIdleConns) + conns <- conn + return &Client{ + addr: a, + idleConns: conns, + }, nil } -// Connect establishes a connection with a GQTP server. -func Connect(addr string) (*grnci.Client, error) { - return nil, nil +// OpenClient opens an existing DB and returns a new Client. +func OpenClient(path string) (*Client, error) { + conn, err := Open(path) + if err != nil { + return nil, err + } + return &Client{ + baseConn: conn, + }, nil } -// Close closes a client. +// CreateClient creates a new DB and returns a new Client. +func CreateClient(path string) (*Client, error) { + conn, err := Create(path) + if err != nil { + return nil, err + } + return &Client{ + baseConn: conn, + }, nil +} + +// Close closes the idle connections. +// Close should be called after all responses are closed. +// Otherwise, connections will be leaked. func (c *Client) Close() error { - return nil + var err error +Loop: + for { + select { + case conn := <-c.idleConns: + if e := conn.Close(); e != nil && err == nil { + err = e + } + default: + break Loop + } + } + if c.baseConn != nil { + if e := c.baseConn.Close(); e != nil { + err = e + } + } + return err +} + +// Exec sends a request and receives a response. +// It is the caller's responsibility to close the response. +func (c *Client) Exec(cmd string, body io.Reader) (grnci.Response, error) { + var conn *Conn + var err error + select { + case conn = <-c.idleConns: + default: + if c.addr != nil { + conn, err = Dial(c.addr.String()) + if err != nil { + return nil, err + } + } else { + conn, err = c.baseConn.Dup() + if err != nil { + return nil, err + } + } + } + resp, err := conn.Exec(cmd, body) + if err != nil { + conn.Close() + return nil, err + } + resp.(*response).client = c + return resp, nil } -// Query sends a request and receives a response. -func (c *Client) Query(req *grnci.Request) (*grnci.Response, error) { - return nil, nil +// Query calls Exec with req.GQTPRequest and returns the result. +func (c *Client) Query(req *grnci.Request) (grnci.Response, error) { + cmd, body, err := req.GQTPRequest() + if err != nil { + return nil, err + } + return c.Exec(cmd, body) } Added: v2/libgrn/client_test.go (+103 -0) 100644 =================================================================== --- /dev/null +++ v2/libgrn/client_test.go 2017-06-09 09:39:36 +0900 (74b93a6) @@ -0,0 +1,103 @@ +package libgrn + +import ( + "io" + "io/ioutil" + "log" + "strings" + "testing" +) + +func TestClientGQTP(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + client, err := DialClient("") + if err != nil { + t.Skipf("DialClient failed: %v", err) + } + defer client.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := client.Exec(pair.Command, body) + if err != nil { + t.Fatalf("client.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} + +func TestClientDB(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + client, err := OpenClient("/tmp/db/db") + if err != nil { + t.Skipf("OpenClient failed: %v", err) + } + defer client.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := client.Exec(pair.Command, body) + if err != nil { + t.Fatalf("client.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} Added: v2/libgrn/conn.go (+294 -0) 100644 =================================================================== --- /dev/null +++ v2/libgrn/conn.go 2017-06-09 09:39:36 +0900 (f7413de) @@ -0,0 +1,294 @@ +package libgrn + +// #cgo pkg-config: groonga +// #include <groonga.h> +// #include <stdlib.h> +import "C" +import ( + "io" + "strings" + "time" + "unsafe" + + "github.com/groonga/grnci/v2" +) + +const ( + maxChunkSize = 1 << 30 // Maximum chunk size + defaultBufferSize = 1 << 16 // Default buffer size +) + +// Conn is a thread-unsafe GQTP client or DB handle. +type Conn struct { + ctx *grnCtx // C.grn_ctx + db *grnDB // C.grn_obj + buf []byte // Copy buffer + bufSize int // Copy buffer size + ready bool // Whether or not Exec is ready +} + +// newConn returns a new Conn. +func newConn(ctx *grnCtx, db *grnDB) *Conn { + return &Conn{ + ctx: ctx, + db: db, + bufSize: defaultBufferSize, + ready: true, + } +} + +// Dial returns a new Conn connected to a GQTP server. +func Dial(addr string) (*Conn, error) { + a, err := grnci.ParseGQTPAddress(addr) + if err != nil { + return nil, err + } + ctx, err := newGrnCtx() + if err != nil { + return nil, err + } + cHost := C.CString(a.Host) + defer C.free(unsafe.Pointer(cHost)) + // C.grn_ctx_connect always returns ctx.ctx.rc. + C.grn_ctx_connect(ctx.ctx, cHost, C.int(a.Port), 0) + if err := ctx.Err("C.grn_ctx_connect"); err != nil { + ctx.Close() + return nil, err + } + return newConn(ctx, nil), nil +} + +// Open opens an existing DB and returns a new Conn as its handle. +func Open(path string) (*Conn, error) { + ctx, err := newGrnCtx() + if err != nil { + return nil, err + } + db, err := openGrnDB(ctx, path) + if err != nil { + ctx.Close() + return nil, err + } + return newConn(ctx, db), nil +} + +// Create creates a new DB and returns a new Conn as its handle. +func Create(path string) (*Conn, error) { + ctx, err := newGrnCtx() + if err != nil { + return nil, err + } + db, err := createGrnDB(ctx, path) + if err != nil { + ctx.Close() + return nil, err + } + return newConn(ctx, db), nil +} + +// Dup duplicates the Conn if it is a DB handle. +func (c *Conn) Dup() (*Conn, error) { + if c.db == nil { + return nil, grnci.NewError(grnci.StatusInvalidOperation, map[string]interface{}{ + "error": "GQTP clients do not support Dup.", + }) + } + ctx, err := c.db.Dup() + if err != nil { + return nil, err + } + return newConn(ctx, c.db), nil +} + +// Close closes the Conn. +func (c *Conn) Close() error { + var err error + if c.db != nil { + if e := c.db.Close(c.ctx); e != nil { + err = e + } + } + if e := c.ctx.Close(); e != nil { + if err == nil { + err = e + } + } + return err +} + +// SetBufferSize updates the size of the copy buffer. +func (c *Conn) SetBufferSize(n int) { + if n <= 0 || n > maxChunkSize { + n = defaultBufferSize + } + c.bufSize = n +} + +// getBuffer returns the copy buffer. +func (c *Conn) getBuffer() []byte { + if len(c.buf) != c.bufSize { + c.buf = make([]byte, c.bufSize) + } + return c.buf +} + +// execGQTP sends a command and receives a response. +func (c *Conn) execGQTP(cmd string) (grnci.Response, error) { + start := time.Now() + name := strings.TrimLeft(cmd, " \t\r\n") + if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { + name = name[:idx] + } + if err := c.ctx.Send([]byte(cmd), flagTail); err != nil { + return nil, err + } + data, flags, err := c.ctx.Recv() + if err != nil && len(data) == 0 { + return nil, err + } + return newGQTPResponse(c, start, name, data, flags, err), nil +} + +// execDB executes a command and receives a response. +func (c *Conn) execDB(cmd string) (grnci.Response, error) { + start := time.Now() + if err := c.ctx.Send([]byte(cmd), flagTail); err != nil { + data, flags, _ := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil + } + data, flags, err := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil +} + +// exec sends a command without body and receives a response. +func (c *Conn) exec(cmd string) (grnci.Response, error) { + if c.db == nil { + return c.execGQTP(cmd) + } + return c.execDB(cmd) +} + +// execBodyGQTP sends a command and receives a response. +func (c *Conn) execBodyGQTP(cmd string, body io.Reader) (grnci.Response, error) { + start := time.Now() + name := strings.TrimLeft(cmd, " \t\r\n") + if idx := strings.IndexAny(name, " \t\r\n"); idx != -1 { + name = name[:idx] + } + if err := c.ctx.Send([]byte(cmd), 0); err != nil { + return nil, err + } + data, flags, err := c.ctx.Recv() + if len(data) != 0 { + return newGQTPResponse(c, start, name, data, flags, err), nil + } + if err != nil { + return nil, err + } + n := 0 + buf := c.getBuffer() + for { + m, err := body.Read(buf[n:]) + n += m + if err != nil { + if err := c.ctx.Send(buf[:n], flagTail); err != nil { + return nil, err + } + data, flags, err := c.ctx.Recv() + if len(data) != 0 || err == nil { + return newGQTPResponse(c, start, name, data, flags, err), nil + } + return nil, err + } + if n == len(buf) { + if err := c.ctx.Send(buf, 0); err != nil { + return nil, err + } + n = 0 + data, flags, err = c.ctx.Recv() + if len(data) != 0 { + return newGQTPResponse(c, start, name, data, flags, err), nil + } + if err != nil { + return nil, err + } + } + } +} + +// execBodyDB sends a command and receives a response. +func (c *Conn) execBodyDB(cmd string, body io.Reader) (grnci.Response, error) { + start := time.Now() + if err := c.ctx.Send([]byte(cmd), 0); err != nil { + data, flags, _ := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil + } + data, flags, err := c.ctx.Recv() + if len(data) != 0 || err != nil { + return newDBResponse(c, start, data, flags, err), nil + } + n := 0 + buf := c.getBuffer() + for { + m, err := body.Read(buf[n:]) + n += m + if err != nil { + if err := c.ctx.Send(buf[:n], flagTail); err != nil { + data, flags, _ := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil + } + data, flags, err := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil + } + if n == len(buf) { + if err := c.ctx.Send(buf, 0); err != nil { + data, flags, _ := c.ctx.Recv() + return newDBResponse(c, start, data, flags, err), nil + } + n = 0 + data, flags, err = c.ctx.Recv() + if len(data) != 0 || err != nil { + return newDBResponse(c, start, data, flags, err), nil + } + } + } +} + +// execBody sends a command with body and receives a response. +func (c *Conn) execBody(cmd string, body io.Reader) (grnci.Response, error) { + if c.db == nil { + return c.execBodyGQTP(cmd, body) + } + return c.execBodyDB(cmd, body) +} + +// Exec sends a request and receives a response. +// It is the caller's responsibility to close the response. +// The Conn should not be used until the response is closed. +func (c *Conn) Exec(cmd string, body io.Reader) (grnci.Response, error) { + if !c.ready { + return nil, grnci.NewError(grnci.StatusInvalidOperation, map[string]interface{}{ + "error": "The connection is not ready to send a request.", + }) + } + if len(cmd) > maxChunkSize { + return nil, grnci.NewError(grnci.StatusInvalidCommand, map[string]interface{}{ + "length": len(cmd), + "error": "The command is too long.", + }) + } + c.ready = false + if body == nil { + return c.exec(cmd) + } + return c.execBody(cmd, body) +} + +// Query calls Exec with req.GQTPRequest and returns the result. +func (c *Conn) Query(req *grnci.Request) (grnci.Response, error) { + cmd, body, err := req.GQTPRequest() + if err != nil { + return nil, err + } + return c.Exec(cmd, body) +} Added: v2/libgrn/conn_test.go (+103 -0) 100644 =================================================================== --- /dev/null +++ v2/libgrn/conn_test.go 2017-06-09 09:39:36 +0900 (fd8d110) @@ -0,0 +1,103 @@ +package libgrn + +import ( + "io" + "io/ioutil" + "log" + "strings" + "testing" +) + +func TestConnGQTP(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + conn, err := Dial("") + if err != nil { + t.Skipf("Dial failed: %v", err) + } + defer conn.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := conn.Exec(pair.Command, body) + if err != nil { + t.Fatalf("conn.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} + +func TestConnDB(t *testing.T) { + type Pair struct { + Command string + Body string + } + pairs := []Pair{ + Pair{"no_such_command", ""}, + Pair{"status", ""}, + Pair{`table_create Tbl TABLE_PAT_KEY ShortText`, ""}, + Pair{`column_create Tbl Col COLUMN_SCALAR Int32`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test"]]'`, ""}, + Pair{`load --table Tbl --values '[["_key"],["test" invalid_format]]'`, ""}, + Pair{"load --table Tbl", `[["_key"],["test"]]`}, + Pair{"load --table Tbl", `[["_key"],["test" invalid_format]]`}, + Pair{"select --table Tbl", ""}, + Pair{"dump", ""}, + } + + conn, err := Open("/tmp/db/db") + if err != nil { + t.Skipf("Open failed: %v", err) + } + defer conn.Close() + + for _, pair := range pairs { + var body io.Reader + if pair.Body != "" { + body = strings.NewReader(pair.Body) + } + log.Printf("command = %s", pair.Command) + resp, err := conn.Exec(pair.Command, body) + if err != nil { + t.Fatalf("conn.Exec failed: %v", err) + } + result, err := ioutil.ReadAll(resp) + if err != nil { + t.Fatalf("ioutil.ReadAll failed: %v", err) + } + log.Printf("status = %d, err = %v", resp.Status(), resp.Err()) + log.Printf("start = %v, elapsed = %v", resp.Start(), resp.Elapsed()) + log.Printf("result = %s", result) + if err := resp.Close(); err != nil { + t.Fatalf("resp.Close failed: %v", err) + } + } +} Deleted: v2/libgrn/handle.go (+0 -134) 100644 =================================================================== --- v2/libgrn/handle.go 2017-05-17 19:23:04 +0900 (ce133dc) +++ /dev/null @@ -1,134 +0,0 @@ -package libgrn - -// #cgo pkg-config: groonga -// #include <groonga.h> -// #include <stdlib.h> -import "C" -import ( - "fmt" - "io/ioutil" - "unsafe" - - "github.com/groonga/grnci/v2" -) - -// Handle is a handle for a local DB. -type Handle struct { - ctx *grnCtx - db *grnDB -} - -// Open opens a local DB and returns a handle. -func Open(path string) (*Handle, error) { - ctx, err := newGrnCtx() - if err != nil { - return nil, err - } - db, err := openGrnDB(ctx, path) - if err != nil { - ctx.Close() - return nil, err - } - return &Handle{ - ctx: ctx, - db: db, - }, nil -} - -// Create creates a local DB and returns a hendle. -func Create(path string) (*Handle, error) { - ctx, err := newGrnCtx() - if err != nil { - return nil, err - } - db, err := createGrnDB(ctx, path) - if err != nil { - ctx.Close() - return nil, err - } - return &Handle{ - ctx: ctx, - db: db, - }, nil -} - -// Close closes a handle. -func (h *Handle) Close() error { - if err := h.db.Close(h.ctx); err != nil { - return err - } - if err := h.ctx.Close(); err != nil { - return err - } - return nil -} - -// Dup duplicates a handle. -func (h *Handle) Dup() (*Handle, error) { - ctx, err := h.db.Dup() - if err != nil { - return nil, err - } - return &Handle{ - ctx: ctx, - db: h.db, - }, nil -} - -// send sends data. -func (h *Handle) send(data []byte) error { - var p *C.char - if len(data) != 0 { - p = (*C.char)(unsafe.Pointer(&data[0])) - } - rc := C.grn_rc(C.grn_ctx_send(h.ctx.ctx, p, C.uint(len(data)), C.int(0))) - if (rc != C.GRN_SUCCESS) || (h.ctx.ctx.rc != C.GRN_SUCCESS) { - return fmt.Errorf("C.grn_ctx_send failed: rc = %d", rc) - } - return nil -} - -// recv receives data. -func (h *Handle) recv() ([]byte, error) { - var resp *C.char - var respLen C.uint - var respFlags C.int - rc := C.grn_rc(C.grn_ctx_recv(h.ctx.ctx, &resp, &respLen, &respFlags)) - if (rc != C.GRN_SUCCESS) || (h.ctx.ctx.rc != C.GRN_SUCCESS) { - return nil, fmt.Errorf("C.grn_ctx_recv failed: rc = %d", rc) - } - return C.GoBytes(unsafe.Pointer(resp), C.int(respLen)), nil -} - -// Query sends a request and receives a response. -// -// TODO: error handling -func (h *Handle) Query(req *grnci.Request) (*grnci.Response, error) { - cmd, err := req.Assemble() - if err != nil { - return nil, err - } - if err := h.send(cmd); err != nil { - respBytes, _ := h.recv() - resp, _ := grnci.NewResponse(respBytes) - return resp, err - } - respBytes, err := h.recv() - if (req.Body == nil) || (err != nil) { - resp, _ := grnci.NewResponse(respBytes) - return resp, err - } - if len(respBytes) != 0 { - resp, _ := grnci.NewResponse(respBytes) - return resp, fmt.Errorf("unexpected response") - } - body, _ := ioutil.ReadAll(req.Body) - if err := h.send(body); err != nil { - respBytes, _ := h.recv() - resp, _ := grnci.NewResponse(respBytes) - return resp, err - } - respBytes, _ = h.recv() - resp, _ := grnci.NewResponse(respBytes) - return resp, nil -} Modified: v2/libgrn/libgrn.go (+112 -53) =================================================================== --- v2/libgrn/libgrn.go 2017-05-17 19:23:04 +0900 (a984236) +++ v2/libgrn/libgrn.go 2017-06-09 09:39:36 +0900 (07e3edf) @@ -1,4 +1,4 @@ -// Package libgrn provides Client using libgroonga. +// Package libgrn provides GQTP clients and DB handles using libgroonga. package libgrn // #cgo pkg-config: groonga @@ -6,11 +6,19 @@ package libgrn // #include <stdlib.h> import "C" import ( - "errors" - "fmt" "reflect" "sync" "unsafe" + + "github.com/groonga/grnci/v2" +) + +const ( + flagMore = byte(0x01) + flagTail = byte(0x02) + flagHead = byte(0x04) + flagQuiet = byte(0x08) + flagQuit = byte(0x10) ) var ( @@ -18,6 +26,8 @@ var ( libCount int // libMutex is used for exclusion control in Init and Fin. libMutex sync.Mutex + // initFinDisabled represents whether or not Init and Fin are disabled. + initFinDisabled bool ) // Init initializes libgroonga. @@ -26,13 +36,16 @@ var ( // Otherwise, Init only increments the internal counter. // // There is no need to call Init explicitly. -// libgrn calls Init when it creates a Client. +// libgrn calls Init when it creates a Conn. func Init() error { libMutex.Lock() defer libMutex.Unlock() - if libCount == 0 { + if !initFinDisabled && libCount == 0 { if rc := C.grn_init(); rc != C.GRN_SUCCESS { - return fmt.Errorf("C.grn_init failed: rc = %d", rc) + return grnci.NewError(int(rc), map[string]interface{}{ + "method": "C.grn_init", + "error": "Failed to initialize libgroonga.", + }) } } libCount++ @@ -45,23 +58,36 @@ func Init() error { // Otherwise, Fin only decrements the internal counter. // // There is no need to call Fin explicitly. -// libgrn calls Fin when it closes a Client. +// libgrn calls Fin when it closes a Conn. func Fin() error { libMutex.Lock() defer libMutex.Unlock() - if libCount == 0 { - return fmt.Errorf("libCount = 0") + if libCount <= 0 { + return grnci.NewError(grnci.StatusInvalidOperation, map[string]interface{}{ + "libCount": libCount, + "error": "libCount must be greater than 0.", + }) } libCount-- - if libCount == 0 { + if !initFinDisabled && libCount == 0 { if rc := C.grn_fin(); rc != C.GRN_SUCCESS { - return fmt.Errorf("C.grn_fin failed: rc = %d", rc) + return grnci.NewError(int(rc), map[string]interface{}{ + "method": "C.grn_fin", + "error": "Failed to finalize libgroonga.", + }) } } return nil } -// ctx is a Groonga context. +// DisableInitFin disables Init and Fin. +// If another package initializes and finalizes libgroonga, +// DisableInitFin must be called before creation of the first Conn. +func DisableInitFin() { + initFinDisabled = true +} + +// grnCtx wraps C.grn_ctx. type grnCtx struct { ctx *C.grn_ctx } @@ -69,71 +95,91 @@ type grnCtx struct { // newGrnCtx returns a new grnCtx. func newGrnCtx() (*grnCtx, error) { if err := Init(); err != nil { - return nil, fmt.Errorf("Init failed: %v", err) + return nil, err } ctx := C.grn_ctx_open(C.int(0)) if ctx == nil { Fin() - return nil, errors.New("C.grn_ctx_open failed") + return nil, grnci.NewError(grnci.StatusUnknownError, map[string]interface{}{ + "method": "C.grn_ctx_open", + }) } return &grnCtx{ctx: ctx}, nil } -// Close closes a grnCtx. +// Close closes the grnCtx. func (c *grnCtx) Close() error { if rc := C.grn_ctx_close(c.ctx); rc != C.GRN_SUCCESS { - return fmt.Errorf("C.grn_ctx_close failed: %s", rc) + return grnci.NewError(int(rc), map[string]interface{}{ + "method": "C.grn_ctx_close", + }) } if err := Fin(); err != nil { - return fmt.Errorf("Fin failed: %v", err) + return err } return nil } -// TODO -func (c *grnCtx) Err() error { +// Err returns the stored error. +func (c *grnCtx) Err(method string) error { if c.ctx.rc == C.GRN_SUCCESS { return nil } - return fmt.Errorf("rc = %s: %s", c.ctx.rc, C.GoString(&c.ctx.errbuf[0])) + data := map[string]interface{}{ + "method": method, + } + if c.ctx.errline != 0 { + data["line"] = int(c.ctx.errline) + } + if c.ctx.errfile != nil { + data["file"] = C.GoString(c.ctx.errfile) + } + if c.ctx.errfunc != nil { + data["function"] = C.GoString(c.ctx.errfunc) + } + if c.ctx.errbuf[0] != 0 { + data["error"] = C.GoString(&c.ctx.errbuf[0]) + } + return grnci.NewError(int(c.ctx.rc), data) } -// Send sends data. -func (c *grnCtx) Send(data []byte, flags int) error { +// Send sends data with flags. +// The behavior depends on the grnCtx. +func (c *grnCtx) Send(data []byte, flags byte) error { var p *C.char if len(data) != 0 { p = (*C.char)(unsafe.Pointer(&data[0])) } - rc := C.grn_rc(C.grn_ctx_send(c.ctx, p, C.uint(len(data)), C.int(flags))) - if (rc != C.GRN_SUCCESS) || (c.ctx.rc != C.GRN_SUCCESS) { - return fmt.Errorf("C.grn_ctx_send failed: rc = %d", rc) + // C.grn_ctx_send always returns 0. + C.grn_ctx_send(c.ctx, p, C.uint(len(data)), C.int(flags)) + if err := c.Err("C.grn_ctx_send"); err != nil { + return err } return nil } -// Recv receives data. -// -// Note that data will be desrtoyed by the next operation on the same context. -func (c *grnCtx) Recv() (data []byte, flags int, err error) { +// Recv receives data with flags. +// The data will be destroyed by the next operation on the grnCtx. +func (c *grnCtx) Recv() (data []byte, flags byte, err error) { var cPtr *C.char var cLen C.uint var cFlags C.int - rc := C.grn_rc(C.grn_ctx_recv(c.ctx, &cPtr, &cLen, &cFlags)) - if (rc != C.GRN_SUCCESS) || (c.ctx.rc != C.GRN_SUCCESS) { - return nil, 0, fmt.Errorf("C.grn_ctx_recv failed: rc = %s", rc) - } + // C.grn_ctx_recv always returns 0 if c.ctx is not nil. + C.grn_ctx_recv(c.ctx, &cPtr, &cLen, &cFlags) head := (*reflect.SliceHeader)(unsafe.Pointer(&data)) head.Data = uintptr(unsafe.Pointer(cPtr)) head.Len = int(cLen) head.Cap = int(cLen) - flags = int(cFlags) + if err = c.Err("C.grn_ctx_recv"); err != nil { + return + } + flags = byte(cFlags) return } -// grnDB is a DB handle. +// grnDB wraps a C.grn_obj referring to a DB object. type grnDB struct { obj *C.grn_obj - path string count int mutex sync.Mutex } @@ -144,14 +190,15 @@ func createGrnDB(ctx *grnCtx, path string) (*grnDB, error) { defer C.free(unsafe.Pointer(cPath)) obj := C.grn_db_create(ctx.ctx, cPath, nil) if obj == nil { - return nil, fmt.Errorf("C.grn_db_create failed: %v", ctx.Err()) - } - if cAbsPath := C.grn_obj_path(ctx.ctx, obj); cAbsPath != nil { - path = C.GoString(cAbsPath) + if err := ctx.Err("C.grn_db_create"); err != nil { + return nil, err + } + return nil, grnci.NewError(grnci.StatusUnknownError, map[string]interface{}{ + "method": "C.grn_db_create", + }) } return &grnDB{ obj: obj, - path: path, count: 1, }, nil } @@ -162,45 +209,57 @@ func openGrnDB(ctx *grnCtx, path string) (*grnDB, error) { defer C.free(unsafe.Pointer(cPath)) obj := C.grn_db_open(ctx.ctx, cPath) if obj == nil { - return nil, fmt.Errorf("C.grn_db_create failed: %v", ctx.Err()) - } - if cAbsPath := C.grn_obj_path(ctx.ctx, obj); cAbsPath != nil { - path = C.GoString(cAbsPath) + if err := ctx.Err("C.grn_db_open"); err != nil { + return nil, err + } + return nil, grnci.NewError(grnci.StatusUnknownError, map[string]interface{}{ + "method": "C.grn_db_open", + }) } return &grnDB{ obj: obj, - path: path, count: 1, }, nil } -// Close closes a DB. +// Close closes the grnDB. func (db *grnDB) Close(ctx *grnCtx) error { db.mutex.Lock() defer db.mutex.Unlock() if db.count <= 0 { - return fmt.Errorf("underflow: count = %d", db.count) + return grnci.NewError(grnci.StatusInvalidOperation, map[string]interface{}{ + "count": db.count, + "error": "count must be greater than 0.", + }) } db.count-- if db.count == 0 { if rc := C.grn_obj_close(ctx.ctx, db.obj); rc != C.GRN_SUCCESS { - return fmt.Errorf("C.grn_obj_close failed: rc = %s", rc) + if err := ctx.Err("C.grn_obj_close"); err != nil { + return grnci.EnhanceError(err, map[string]interface{}{ + "rc": int(rc), + }) + } + return grnci.NewError(int(rc), map[string]interface{}{ + "method": "C.grn_obj_close", + }) } db.obj = nil } return nil } -// Dup duplicates a DB handle. +// Dup returns a new grnCtx to handle the grnDB. func (db *grnDB) Dup() (*grnCtx, error) { ctx, err := newGrnCtx() if err != nil { - return nil, fmt.Errorf("newGrnCtx failed: %v", err) + return nil, err } + // C.grn_ctx_use returns ctx.ctx.rc. C.grn_ctx_use(ctx.ctx, db.obj) - if err := ctx.Err(); err != nil { + if err := ctx.Err("C.grn_ctx_use"); err != nil { ctx.Close() - return nil, fmt.Errorf("C.grn_ctx_use failed: %v", err) + return nil, err } db.mutex.Lock() db.count++ Added: v2/libgrn/response.go (+141 -0) 100644 =================================================================== --- /dev/null +++ v2/libgrn/response.go 2017-06-09 09:39:36 +0900 (f73e353) @@ -0,0 +1,141 @@ +package libgrn + +import ( + "io" + "io/ioutil" + "time" + + "github.com/groonga/grnci/v2" +) + +// response is a response. +type response struct { + client *Client + conn *Conn + start time.Time + elapsed time.Duration + left []byte + flags byte + err error + broken bool + closed bool +} + +// newGQTPResponse returns a new GQTP response. +func newGQTPResponse(conn *Conn, start time.Time, name string, data []byte, flags byte, err error) *response { + resp := &response{ + conn: conn, + start: start, + elapsed: time.Now().Sub(start), + left: data, + flags: flags, + err: err, + } + if resp.err != nil { + if _, ok := grnci.CommandRules[name]; !ok { + data, err := ioutil.ReadAll(resp) + resp.err = grnci.EnhanceError(resp.err, map[string]interface{}{ + "error": string(data), + }) + if err != nil { + resp.broken = true + } + } + } + return resp +} + +// newDBResponse returns a new DB response. +func newDBResponse(conn *Conn, start time.Time, data []byte, flags byte, err error) *response { + return &response{ + conn: conn, + start: start, + elapsed: time.Now().Sub(start), + left: data, + flags: flags, + err: err, + } +} + +// Status returns the status code. +func (r *response) Status() int { + if err, ok := r.err.(*grnci.Error); ok { + return err.Code + } + return 0 +} + +// Start returns the start time. +func (r *response) Start() time.Time { + return r.start +} + +// Elapsed returns the elapsed time. +func (r *response) Elapsed() time.Duration { + return r.elapsed +} + +// Read reads the response body at most len(p) bytes into p. +// The return value n is the number of bytes read. +func (r *response) Read(p []byte) (n int, err error) { + if r.closed { + return 0, io.EOF + } + for len(r.left) == 0 { + if r.flags&flagMore == 0 { + return 0, io.EOF + } + data, flags, err := r.conn.ctx.Recv() + if err != nil { + r.broken = true + return 0, err + } + r.left = data + r.flags = flags + } + n = copy(p, r.left) + r.left = r.left[n:] + return +} + +// Close closes the response body. +func (r *response) Close() error { + if r.closed { + return nil + } + var err error + if !r.broken { + if _, err = io.CopyBuffer(ioutil.Discard, r, r.conn.getBuffer()); err != nil { + r.broken = true + err = grnci.NewError(grnci.StatusNetworkError, map[string]interface{}{ + "method": "io.CopyBuffer", + "error": err.Error(), + }) + } + } + r.closed = true + if !r.broken { + r.conn.ready = true + } + if r.client != nil { + // Broken connections are closed. + if r.broken { + if e := r.conn.Close(); e != nil && err != nil { + err = e + } + } + select { + case r.client.idleConns <- r.conn: + default: + if e := r.conn.Close(); e != nil && err != nil { + err = e + } + } + } + return err +} + +// Err returns the stored error. +func (r *response) Err() error { + return r.err +} Modified: v2/request.go (+271 -47) =================================================================== --- v2/request.go 2017-05-17 19:23:04 +0900 (d7bfa2c) +++ v2/request.go 2017-06-09 09:39:36 +0900 (c984c0e) @@ -1,82 +1,306 @@ package grnci import ( - "errors" "fmt" "io" + "sort" + "strings" ) -// Request stores a Groonga command with arguments. +// Request is a request. type Request struct { - Command string - Arguments []Argument - Body io.Reader + Command string // Command name + CommandRule *CommandRule // Command rule + Params map[string]string // Command parameters + NAnonParams int // Number of unnamed parameters + Body io.Reader // Body (nil is allowed) } -// checkCommand checks if s is valid as a command name. -func checkCommand(s string) error { - if s == "" { - return errors.New("invalid name: s = ") +// NewRequest returns a new Request. +func NewRequest(cmd string, params map[string]string, body io.Reader) (*Request, error) { + if err := checkCommand(cmd); err != nil { + return nil, err } - if s[0] == '_' { - return fmt.Errorf("invalid name: s = %s", s) + cr := GetCommandRule(cmd) + paramsCopy := make(map[string]string) + for k, v := range params { + if err := cr.CheckParam(k, v); err != nil { + return nil, EnhanceError(err, map[string]interface{}{ + "command": cmd, + }) + } + paramsCopy[k] = v } - for i := 0; i < len(s); i++ { - if !(s[i] >= 'a' && s[i] <= 'z') && s[i] != '_' { - return fmt.Errorf("invalid name: s = %s", s) + return &Request{ + Command: cmd, + CommandRule: cr, + Params: paramsCopy, + Body: body, + }, nil +} + +// unescapeCommandByte returns an unescaped byte. +func unescapeCommandByte(b byte) byte { + switch b { + case 'b': + return '\b' + case 't': + return '\t' + case 'r': + return '\r' + case 'n': + return '\n' + default: + return b + } +} + +// tokenizeCommand tokenizes s as a command. +func tokenizeCommand(s string) []string { + var tokens []string + var token []byte + for { + s = strings.TrimLeft(s, " \t\r\n") + if len(s) == 0 { + break } + switch s[0] { + case '"', '\'': + i := 1 + for ; i < len(s); i++ { + if s[i] == s[0] { + i++ + break + } + if s[i] != '\\' { + token = append(token, s[i]) + continue + } + i++ + if i == len(s) { + break + } + token = append(token, unescapeCommandByte(s[i])) + } + s = s[i:] + default: + i := 0 + Loop: + for ; i < len(s); i++ { + switch s[i] { + case ' ', '\t', '\r', '\n', '"', '\'': + break Loop + case '\\': + i++ + if i == len(s) { + break Loop + } + token = append(token, unescapeCommandByte(s[i])) + default: + token = append(token, s[i]) + } + } + s = s[i:] + } + tokens = append(tokens, string(token)) + token = token[:0] } - return nil + return tokens } -// Check checks if req is valid. -func (r *Request) Check() error { - if err := checkCommand(r.Command); err != nil { - return fmt.Errorf("checkCommand failed: %v", err) +// ParseRequest parses a request. +func ParseRequest(cmd string, body io.Reader) (*Request, error) { + tokens := tokenizeCommand(cmd) + if len(tokens) == 0 { + return nil, NewError(StatusInvalidCommand, map[string]interface{}{ + "tokens": tokens, + "error": "len(tokens) must not be 0.", + }) } - for _, arg := range r.Arguments { - if err := arg.Check(); err != nil { - return fmt.Errorf("arg.Check failed: %v", err) + if err := checkCommand(tokens[0]); err != nil { + return nil, err + } + cr := GetCommandRule(tokens[0]) + r := &Request{ + Command: tokens[0], + CommandRule: cr, + Body: body, + } + for i := 1; i < len(tokens); i++ { + var k, v string + if strings.HasPrefix(tokens[i], "--") { + k = tokens[i][2:] + i++ + if i < len(tokens) { + v = tokens[i] + } + } else { + v = tokens[i] + } + if err := r.AddParam(k, v); err != nil { + return nil, err } } + return r, nil +} + +// AddParam adds a parameter. +// AddParam assumes that Command is already set. +func (r *Request) AddParam(key, value string) error { + if r.CommandRule == nil { + r.CommandRule = GetCommandRule(r.Command) + } + if key == "" { + if r.NAnonParams >= len(r.CommandRule.ParamRules) { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": r.Command, + "error": fmt.Sprintf("The command accepts at most %d unnamed parameters.", + len(r.CommandRule.ParamRules)), + }) + } + pr := r.CommandRule.ParamRules[r.NAnonParams] + if err := pr.CheckValue(value); err != nil { + return EnhanceError(err, map[string]interface{}{ + "command": r.Command, + "key": key, + }) + } + if r.Params == nil { + r.Params = make(map[string]string) + } + r.Params[pr.Key] = value + r.NAnonParams++ + return nil + } + if err := r.CommandRule.CheckParam(key, value); err != nil { + return EnhanceError(err, map[string]interface{}{ + "command": r.Command, + }) + } + if r.Params == nil { + r.Params = make(map[string]string) + } + r.Params[key] = value return nil } -// Assemble assembles Command and Arguments into command bytes. +// GQTPRequest returns components for a GQTP request. +// If the request is invalid, GQTPRequest returns an error. // -// The command format is -// Command --Arguments[i].Key 'Arguments[i].Value' ... -func (r *Request) Assemble() ([]byte, error) { - if err := r.Check(); err != nil { - return nil, err +// GQTPRequest assembles Command and Params into a string. +// Parameters in the string are sorted in key order. +func (r *Request) GQTPRequest() (cmd string, body io.Reader, err error) { + if err = r.Check(); err != nil { + return } size := len(r.Command) - for _, arg := range r.Arguments { - if len(arg.Key) != 0 { - size += len(arg.Key) + 3 - } - size += len(arg.Value)*2 + 3 + for k, v := range r.Params { + size += len(k) + 3 + size += len(v)*2 + 3 } buf := make([]byte, 0, size) buf = append(buf, r.Command...) - for _, arg := range r.Arguments { - buf = append(buf, ' ') - if len(arg.Key) != 0 { - buf = append(buf, "--"...) - buf = append(buf, arg.Key...) - } - buf = append(buf, '\'') - for i := 0; i < len(arg.Value); i++ { - switch arg.Value[i] { - case '\'': - buf = append(buf, '\'') - case '\\': + keys := make([]string, 0, len(r.Params)) + for k := range r.Params { + keys = append(keys, k) + } + sort.Strings(keys) + for _, k := range keys { + v := r.Params[k] + buf = append(buf, " --"...) + buf = append(buf, k...) + buf = append(buf, " '"...) + for i := 0; i < len(v); i++ { + switch v[i] { + case '\'', '\\', '\b', '\t', '\r', '\n': buf = append(buf, '\'') } - buf = append(buf, arg.Value[i]) + buf = append(buf, v[i]) } buf = append(buf, '\'') + } + cmd = string(buf) + body = r.Body + return +} + +// HTTPRequest returns components for an HTTP request. +// If the request is invalid, HTTPRequest returns an error. +func (r *Request) HTTPRequest() (cmd string, params map[string]string, body io.Reader, err error) { + if err = r.Check(); err != nil { + return + } + cmd = r.Command + params = r.Params + body = r.Body + return +} +// NeedBody returns whether or not the request requires a body. +func (r *Request) NeedBody() bool { + switch r.Command { + case "load": + _, ok := r.Params["values"] + return !ok + default: + return false } - return buf, nil +} + +// Check checks whether or not the request is valid. +func (r *Request) Check() error { + if err := checkCommand(r.Command); err != nil { + return err + } + cr := r.CommandRule + if cr == nil { + cr = GetCommandRule(r.Command) + } + for k, v := range r.Params { + if err := cr.CheckParam(k, v); err != nil { + return EnhanceError(err, map[string]interface{}{ + "command": r.Command, + }) + } + } + for _, pr := range cr.ParamRules { + if pr.Required { + if _, ok := r.Params[pr.Key]; !ok { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": r.Command, + "key": pr.Key, + "error": "The parameter is required.", + }) + } + } + } + switch r.Command { + case "load": + if _, ok := r.Params["values"]; ok { + if r.Body != nil { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": r.Command, + "hasValues": true, + "hasBody": true, + "error": "The command does not accept a body.", + }) + } + } else if r.Body == nil { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": r.Command, + "hasValues": false, + "hasBody": false, + "error": "The command requires a body.", + }) + } + default: + if r.Body != nil { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": r.Command, + "hasBody": true, + "error": "The command does not accept a body.", + }) + } + } + return nil } Added: v2/request_test.go (+166 -0) 100644 =================================================================== --- /dev/null +++ v2/request_test.go 2017-06-09 09:39:36 +0900 (813843a) @@ -0,0 +1,166 @@ +package grnci + +import ( + "testing" +) + +func TestNewRequest(t *testing.T) { + params := map[string]string{ + "table": "Tbl", + "filter": "value < 100", + "sort_keys": "value", + } + req, err := NewRequest("select", params, nil) + if err != nil { + t.Fatalf("NewRequest failed: %v", err) + } + if req.Command != "select" { + t.Fatalf("ParseRequest failed: cmd = %s, want = %s", + req.Command, "select") + } + for key, value := range params { + if req.Params[key] != value { + t.Fatalf("ParseRequest failed: params[\"%s\"] = %s, want = %s", + key, req.Params[key], value) + } + } +} + +func TestParseRequest(t *testing.T) { + req, err := ParseRequest(`select Tbl --query "\"apple juice\"" --filter 'price < 100'`, nil) + if err != nil { + t.Fatalf("ParseRequest failed: %v", err) + } + if req.Command != "select" { + t.Fatalf("ParseRequest failed: command: actual = %s, want = %s", + req.Command, "select") + } + if req.Params["table"] != "Tbl" { + t.Fatalf("ParseRequest failed: params[\"table\"] = %s, want = %s", + req.Params["table"], "Tbl") + } + if req.Params["query"] != "\"apple juice\"" { + t.Fatalf("ParseRequest failed: params[\"query\"] = %s, want = %s", + req.Params["query"], "apple juice") + } + if req.Params["filter"] != "price < 100" { + t.Fatalf("ParseRequest failed: params[\"filter\"] = %s, want = %s", + req.Params["filter"], "price < 100") + } +} + +func TestRequestAddParam(t *testing.T) { + params := map[string]string{ + "table": "Tbl", + "filter": "value < 100", + "sort_keys": "value", + } + req, err := NewRequest("select", nil, nil) + if err != nil { + t.Fatalf("NewRequest failed: %v", err) + } + for key, value := range params { + if err := req.AddParam(key, value); err != nil { + t.Fatalf("req.AddParam failed: %v", err) + } + } + if req.Command != "select" { + t.Fatalf("ParseRequest failed: cmd = %s, want = %s", + req.Command, "select") + } + for key, value := range params { + if req.Params[key] != value { + t.Fatalf("ParseRequest failed: params[\"%s\"] = %s, want = %s", + key, req.Params[key], value) + } + } +} + +func TestRequestCheck(t *testing.T) { + data := map[string]bool{ + "status": true, + "select Tbl": true, + "select --123 xyz": false, + "_select --table Tbl": true, + "load --table Tbl": false, + "load --table Tbl --values []": true, + } + for cmd, want := range data { + req, err := ParseRequest(cmd, nil) + if err != nil { + t.Fatalf("ParseRequest failed: %v", err) + } + err = req.Check() + actual := err == nil + if actual != want { + t.Fatalf("req.Check failed: cmd = %s, actual = %v, want = %v, err = %v", + cmd, actual, want, err) + } + } +} + +func TestRequestGQTPRequest(t *testing.T) { + params := map[string]string{ + "table": "Tbl", + "filter": "value < 100", + "sort_keys": "value", + } + req, err := NewRequest("select", params, nil) + if err != nil { + t.Fatalf("NewRequest failed: %v", err) + } + actual, _, err := req.GQTPRequest() + if err != nil { + t.Fatalf("req.GQTPRequest failed: %v", err) + } + want := "select --filter 'value < 100' --sort_keys 'value' --table 'Tbl'" + if actual != want { + t.Fatalf("req.GQTPRequest failed: actual = %s, want = %s", + actual, want) + } +} + +func TestRequestHTTPRequest(t *testing.T) { + req, err := ParseRequest(`select Tbl --query "\"apple juice\"" --filter 'price < 100'`, nil) + if err != nil { + t.Fatalf("ParseRequest failed: %v", err) + } + cmd, params, _, err := req.HTTPRequest() + if err != nil { + t.Fatalf("req.HTTPRequest failed: %v", err) + } + if cmd != "select" { + t.Fatalf("req.HTTPRequest failed: cmd = %s, want = %s", cmd, "select") + } + if params["table"] != "Tbl" { + t.Fatalf("ParseRequest failed: params[\"table\"] = %s, want = %s", + params["table"], "Tbl") + } + if params["query"] != "\"apple juice\"" { + t.Fatalf("ParseRequest failed: params[\"query\"] = %s, want = %s", + params["query"], "apple juice") + } + if params["filter"] != "price < 100" { + t.Fatalf("ParseRequest failed: params[\"filter\"] = %s, want = %s", + params["filter"], "price < 100") + } +} + +func TestRequestNeedBody(t *testing.T) { + data := map[string]bool{ + "status": false, + "select Tbl": false, + "load --table Tbl": true, + "load --table Tbl --values []": false, + } + for cmd, want := range data { + req, err := ParseRequest(cmd, nil) + if err != nil { + t.Fatalf("ParseRequest failed: %v", err) + } + actual := req.NeedBody() + if actual != want { + t.Fatalf("req.NeedBody failed: actual = %v, want = %v", actual, want) + } + } +} Modified: v2/response.go (+19 -9) =================================================================== --- v2/response.go 2017-05-17 19:23:04 +0900 (c5323e6) +++ v2/response.go 2017-06-09 09:39:36 +0900 (dac839d) @@ -4,14 +4,24 @@ import ( "time" ) -// Response stores a response of Groonga. -type Response struct { - Bytes []byte - Error error - Time time.Time - Elapsed time.Duration -} +// Response is an interface for responses. +type Response interface { + // Status returns the status code. + Status() int + + // Start returns the start time. + Start() time.Time + + // Elapsed returns the elapsed time. + Elapsed() time.Duration + + // Read reads the response body at most len(p) bytes into p. + // The return value n is the number of bytes read. + Read(p []byte) (n int, err error) + + // Close closes the response body. + Close() error -func NewResponse([]byte) (*Response, error) { - return nil, nil + // Err returns an error. + Err() error } Added: v2/rule.go (+523 -0) 100644 =================================================================== --- /dev/null +++ v2/rule.go 2017-06-09 09:39:36 +0900 (3496045) @@ -0,0 +1,523 @@ +package grnci + +// TODO: add functions to check parameters. + +// checkParamKeyDefault is the default function to check parameter keys. +func checkParamKeyDefault(k string) error { + if k == "" { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "key": k, + "error": "len(key) must not be 0.", + }) + } + for i := 0; i < len(k); i++ { + switch { + case k[i] >= '0' && k[i] <= '9': + case k[i] >= 'a' && k[i] <= 'z': + case k[i] >= 'A' && k[i] <= 'Z': + default: + switch k[i] { + case '#', '@', '-', '_', '.', '[', ']': + default: + return NewError(StatusInvalidCommand, map[string]interface{}{ + "key": k, + "error": "key must consist of [0-9a-zA-Z#@-_.[]].", + }) + } + } + } + return nil +} + +// checkParamValueDefault is the default function to check parameter values. +func checkParamValueDefault(v string) error { + return nil +} + +// checkParamDefault is the default function to check parameters. +func checkParamDefault(k, v string) error { + if err := checkParamKeyDefault(k); err != nil { + return EnhanceError(err, map[string]interface{}{ + "value": v, + }) + } + if err := checkParamValueDefault(v); err != nil { + return EnhanceError(err, map[string]interface{}{ + "key": k, + }) + } + return nil +} + +// checkCommand checks whether s is valid as a command. +func checkCommand(s string) error { + if _, ok := CommandRules[s]; ok { + return nil + } + if s == "" { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": s, + "error": "len(command) must not be 0.", + }) + } + for i := 0; i < len(s); i++ { + if !(s[i] >= 'a' && s[i] <= 'z') && s[i] != '_' { + return NewError(StatusInvalidCommand, map[string]interface{}{ + "command": s, + "error": "command must consist of [a-z_].", + }) + } + } + return nil +} + +// ParamRule is a parameter rule. +type ParamRule struct { + Key string // Parameter key + ValueChecker func(v string) error // Function to check parameter values + Required bool // Whether the parameter is required +} + +// NewParamRule returns a new ParamRule. +func NewParamRule(key string, valueChecker func(v string) error, required bool) *ParamRule { + return &ParamRule{ + Key: key, + ValueChecker: valueChecker, + Required: required, + } +} + +// CheckValue checks a parameter value. +func (pr *ParamRule) CheckValue(v string) error { + if pr.ValueChecker != nil { + return pr.ValueChecker(v) + } + return checkParamValueDefault(v) +} + +// CommandRule is a command rule. +type CommandRule struct { + ParamChecker func(k, v string) error // Function to check uncommon parameters + ParamRules []*ParamRule // Ordered common parameters + ParamRulesMap map[string]*ParamRule // Index for ParamRules +} + +// GetCommandRule returns the command rule for the specified command. +func GetCommandRule(cmd string) *CommandRule { + if cr := CommandRules[cmd]; cr != nil { + return cr + } + return DefaultCommandRule +} + +// NewCommandRule returns a new CommandRule. +func NewCommandRule(paramChecker func(k, v string) error, prs ...*ParamRule) *CommandRule { + prMap := make(map[string]*ParamRule) + for _, pr := range prs { + prMap[pr.Key] = pr + } + return &CommandRule{ + ParamChecker: paramChecker, + ParamRules: prs, + ParamRulesMap: prMap, + } +} + +// CheckParam checks a parameter. +func (cr *CommandRule) CheckParam(k, v string) error { + if cr, ok := cr.ParamRulesMap[k]; ok { + if err := cr.CheckValue(v); err != nil { + return EnhanceError(err, map[string]interface{}{ + "key": k, + }) + } + return nil + } + if cr.ParamChecker != nil { + return cr.ParamChecker(k, v) + } + return checkParamDefault(k, v) +} + +// commandRules is provided to hide CommandRules in doc. +var commandRules = map[string]*CommandRule{ + "cache_limit": NewCommandRule( + nil, + NewParamRule("max", nil, false), + ), + "check": NewCommandRule( + nil, + NewParamRule("obj", nil, true), + ), + "clearlock": NewCommandRule( + nil, + NewParamRule("objname", nil, true), + ), + "column_copy": NewCommandRule( + nil, + NewParamRule("from_table", nil, true), + NewParamRule("from_name", nil, true), + NewParamRule("to_table", nil, true), + NewParamRule("to_name", nil, true), + ), + "column_create": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("name", nil, true), + NewParamRule("flags", nil, true), + NewParamRule("type", nil, true), + NewParamRule("source", nil, false), + ), + "column_list": NewCommandRule( + nil, + NewParamRule("table", nil, true), + ), + "column_remove": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("name", nil, true), + ), + "column_rename": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("name", nil, true), + NewParamRule("new_name", nil, true), + ), + "config_delete": NewCommandRule( + nil, + NewParamRule("key", nil, true), + ), + "config_get": NewCommandRule( + nil, + NewParamRule("key", nil, true), + ), + "config_set": NewCommandRule( + nil, + NewParamRule("key", nil, true), + NewParamRule("value", nil, true), + ), + "database_unmap": NewCommandRule( + nil, + ), + "define_selector": NewCommandRule( + nil, + NewParamRule("name", nil, true), + NewParamRule("table", nil, true), + NewParamRule("match_columns", nil, false), + NewParamRule("query", nil, false), + NewParamRule("filter", nil, false), + NewParamRule("scorer", nil, false), + NewParamRule("sortby", nil, false), + NewParamRule("output_columns", nil, false), + NewParamRule("offset", nil, false), + NewParamRule("limit", nil, false), + NewParamRule("drilldown", nil, false), + NewParamRule("drilldown_sortby", nil, false), + NewParamRule("drilldown_output_columns", nil, false), + NewParamRule("drilldown_offset", nil, false), + NewParamRule("drilldown_limit", nil, false), + ), + "defrag": NewCommandRule( + nil, + NewParamRule("objname", nil, true), + NewParamRule("threshold", nil, true), + ), + "delete": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("key", nil, false), + NewParamRule("id", nil, false), + NewParamRule("filter", nil, false), + ), + "dump": NewCommandRule( + nil, + NewParamRule("tables", nil, false), + NewParamRule("dump_plugins", nil, false), + NewParamRule("dump_schema", nil, false), + NewParamRule("dump_records", nil, false), + NewParamRule("dump_indexes", nil, false), + ), + "io_flush": NewCommandRule( + nil, + NewParamRule("target_name", nil, false), + NewParamRule("recursive", nil, false), + ), + "load": NewCommandRule( + nil, + NewParamRule("values", nil, false), // values may be passes as a body. + NewParamRule("table", nil, true), + NewParamRule("columns", nil, false), + NewParamRule("ifexists", nil, false), + NewParamRule("input_type", nil, false), + ), + "lock_acquire": NewCommandRule( + nil, + NewParamRule("target_name", nil, false), + ), + "lock_clear": NewCommandRule( + nil, + NewParamRule("target_name", nil, false), + ), + "lock_release": NewCommandRule( + nil, + NewParamRule("target_name", nil, false), + ), + "log_level": NewCommandRule( + nil, + NewParamRule("level", nil, true), + ), + "log_put": NewCommandRule( + nil, + NewParamRule("level", nil, true), + NewParamRule("message", nil, true), + ), + "log_reopen": NewCommandRule( + nil, + ), + "logical_count": NewCommandRule( + nil, + NewParamRule("logical_table", nil, true), + NewParamRule("shard_key", nil, true), + NewParamRule("min", nil, false), + NewParamRule("min_border", nil, false), + NewParamRule("max", nil, false), + NewParamRule("max_border", nil, false), + NewParamRule("filter", nil, false), + ), + "logical_parameters": NewCommandRule( + nil, + NewParamRule("range_index", nil, false), + ), + "logical_range_filter": NewCommandRule( + nil, + NewParamRule("logical_table", nil, true), + NewParamRule("shard_key", nil, true), + NewParamRule("min", nil, false), + NewParamRule("min_border", nil, false), + NewParamRule("max", nil, false), + NewParamRule("max_border", nil, false), + NewParamRule("order", nil, false), + NewParamRule("filter", nil, false), + NewParamRule("offset", nil, false), + NewParamRule("limit", nil, false), + NewParamRule("output_columns", nil, false), + NewParamRule("use_range_index", nil, false), + ), + "logical_select": NewCommandRule( + nil, + NewParamRule("logical_table", nil, true), + NewParamRule("shard_key", nil, true), + NewParamRule("min", nil, false), + NewParamRule("min_border", nil, false), + NewParamRule("max", nil, false), + NewParamRule("max_border", nil, false), + NewParamRule("filter", nil, false), + NewParamRule("sortby", nil, false), + NewParamRule("output_columns", nil, false), + NewParamRule("offset", nil, false), + NewParamRule("limit", nil, false), + NewParamRule("drilldown", nil, false), + NewParamRule("drilldown_sortby", nil, false), + NewParamRule("drilldown_output_columns", nil, false), + NewParamRule("drilldown_offset", nil, false), + NewParamRule("drilldown_limit", nil, false), + NewParamRule("drilldown_calc_types", nil, false), + NewParamRule("drilldown_calc_target", nil, false), + NewParamRule("sort_keys", nil, false), + NewParamRule("drilldown_sort_keys", nil, false), + NewParamRule("match_columns", nil, false), + NewParamRule("query", nil, false), + NewParamRule("drilldown_filter", nil, false), + ), + "logical_shard_list": NewCommandRule( + nil, + NewParamRule("logical_table", nil, true), + ), + "logical_table_remove": NewCommandRule( + nil, + NewParamRule("logical_table", nil, true), + NewParamRule("shard_key", nil, true), + NewParamRule("min", nil, false), + NewParamRule("min_border", nil, false), + NewParamRule("max", nil, false), + NewParamRule("max_border", nil, false), + NewParamRule("dependent", nil, false), + NewParamRule("force", nil, false), + ), + "normalize": NewCommandRule( + nil, + NewParamRule("normalizer", nil, true), + NewParamRule("string", nil, true), + NewParamRule("flags", nil, false), + ), + "normalizer_list": NewCommandRule( + nil, + ), + "object_exist": NewCommandRule( + nil, + NewParamRule("name", nil, true), + ), + "object_inspect": NewCommandRule( + nil, + NewParamRule("name", nil, false), + ), + "object_list": NewCommandRule( + nil, + ), + "object_remove": NewCommandRule( + nil, + NewParamRule("name", nil, true), + NewParamRule("force", nil, false), + ), + "plugin_register": NewCommandRule( + nil, + NewParamRule("name", nil, true), + ), + "plugin_unregister": NewCommandRule( + nil, + NewParamRule("name", nil, true), + ), + "query_expand": NewCommandRule( + nil, + ), // TODO + "quit": NewCommandRule( + nil, + ), + "range_filter": NewCommandRule( + nil, + ), // TODO + "register": NewCommandRule( + nil, + NewParamRule("path", nil, true), + ), + "reindex": NewCommandRule( + nil, + NewParamRule("target_name", nil, false), + ), + "request_cancel": NewCommandRule( + nil, + NewParamRule("id", nil, true), + ), + "ruby_eval": NewCommandRule( + nil, + NewParamRule("script", nil, true), + ), + "ruby_load": NewCommandRule( + nil, + NewParamRule("path", nil, true), + ), + "schema": NewCommandRule( + nil, + ), + "select": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("match_columns", nil, false), + NewParamRule("query", nil, false), + NewParamRule("filter", nil, false), + NewParamRule("scorer", nil, false), + NewParamRule("sortby", nil, false), + NewParamRule("output_columns", nil, false), + NewParamRule("offset", nil, false), + NewParamRule("limit", nil, false), + NewParamRule("drilldown", nil, false), + NewParamRule("drilldown_sortby", nil, false), + NewParamRule("drilldown_output_columns", nil, false), + NewParamRule("drilldown_offset", nil, false), + NewParamRule("drilldown_limit", nil, false), + NewParamRule("cache", nil, false), + NewParamRule("match_escalation_threshold", nil, false), + NewParamRule("query_expansion", nil, false), + NewParamRule("query_flags", nil, false), + NewParamRule("query_expander", nil, false), + NewParamRule("adjuster", nil, false), + NewParamRule("drilldown_calc_types", nil, false), + NewParamRule("drilldown_calc_target", nil, false), + NewParamRule("drilldown_filter", nil, false), + NewParamRule("sort_keys", nil, false), + NewParamRule("drilldown_sort_keys", nil, false), + ), + "shutdown": NewCommandRule( + nil, + NewParamRule("mode", nil, false), + ), + "status": NewCommandRule( + nil, + ), + "suggest": NewCommandRule( + nil, + NewParamRule("types", nil, true), + NewParamRule("table", nil, true), + NewParamRule("column", nil, true), + NewParamRule("query", nil, true), + NewParamRule("sortby", nil, false), + NewParamRule("output_columns", nil, false), + NewParamRule("offset", nil, false), + NewParamRule("limit", nil, false), + NewParamRule("frequency_threshold", nil, false), + NewParamRule("conditional_probability_threshold", nil, false), + NewParamRule("prefix_search", nil, false), + ), + "table_copy": NewCommandRule( + nil, + NewParamRule("from_name", nil, true), + NewParamRule("to_name", nil, true), + ), + "table_create": NewCommandRule( + nil, + NewParamRule("name", nil, true), + NewParamRule("flags", nil, false), + NewParamRule("key_type", nil, false), + NewParamRule("value_type", nil, false), + NewParamRule("default_tokenizer", nil, false), + NewParamRule("normalizer", nil, false), + NewParamRule("token_filters", nil, false), + ), + "table_list": NewCommandRule( + nil, + ), + "table_remove": NewCommandRule( + nil, + NewParamRule("name", nil, true), + NewParamRule("dependent", nil, false), + ), + "table_rename": NewCommandRule( + nil, + NewParamRule("name", nil, true), + NewParamRule("new_name", nil, true), + ), + "table_tokenize": NewCommandRule( + nil, + NewParamRule("table", nil, true), + NewParamRule("string", nil, true), + NewParamRule("flags", nil, false), + NewParamRule("mode", nil, false), + NewParamRule("index_column", nil, false), + ), + "thread_limit": NewCommandRule( + nil, + NewParamRule("max", nil, false), + ), + "tokenize": NewCommandRule( + nil, + NewParamRule("tokenizer", nil, true), + NewParamRule("string", nil, true), + NewParamRule("normalizer", nil, false), + NewParamRule("flags", nil, false), + NewParamRule("mode", nil, false), + NewParamRule("token_filters", nil, false), + ), + "tokenizer_list": NewCommandRule( + nil, + ), + "truncate": NewCommandRule( + nil, + NewParamRule("target_name", nil, true), + ), +} + +// CommandRules is a map of command rules. +var CommandRules = commandRules + +// DefaultCommandRule is applied to commands not listed in CommandRules. +var DefaultCommandRule = NewCommandRule(nil)