Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Add support for DROP VIEW #860

Open
wants to merge 7 commits into
base: feature/views
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *Engine) Query(
case *plan.CreateIndex:
typ = sql.CreateIndexProcess
perm = auth.ReadPerm | auth.WritePerm
case *plan.InsertInto, *plan.DeleteFrom, *plan.Update, *plan.DropIndex, *plan.UnlockTables, *plan.LockTables, *plan.CreateView:
case *plan.InsertInto, *plan.DeleteFrom, *plan.Update, *plan.DropIndex, *plan.UnlockTables, *plan.LockTables, *plan.CreateView, *plan.DropView:
perm = auth.ReadPerm | auth.WritePerm
}

Expand Down
1 change: 1 addition & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3149,6 +3149,7 @@ func TestReadOnly(t *testing.T) {
`DROP INDEX foo ON mytable`,
`INSERT INTO mytable (i, s) VALUES(42, 'yolo')`,
`CREATE VIEW myview AS SELECT i FROM mytable`,
`DROP VIEW myview`,
}

for _, query := range writingQueries {
Expand Down
4 changes: 4 additions & 0 deletions sql/analyzer/assign_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func assignCatalog(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error)
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
case *plan.DropView:
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
default:
return n, nil
}
Expand Down
6 changes: 6 additions & 0 deletions sql/analyzer/assign_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,10 @@ func TestAssignCatalog(t *testing.T) {
cv, ok := node.(*plan.CreateView)
require.True(ok)
require.Equal(c, cv.Catalog)

node, err = f.Apply(sql.NewEmptyContext(), a, plan.NewDropView(nil, false))
require.NoError(err)
dv, ok := node.(*plan.DropView)
require.True(ok)
require.Equal(c, dv.Catalog)
}
3 changes: 3 additions & 0 deletions sql/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
describeTablesRegex = regexp.MustCompile(`^(describe|desc)\s+table\s+(.*)`)
createIndexRegex = regexp.MustCompile(`^create\s+index\s+`)
createViewRegex = regexp.MustCompile(`^create\s+(or\s+replace\s+)?view\s+`)
dropViewRegex = regexp.MustCompile(`^drop\s+(if\s+exists\s+)?view\s+`)
dropIndexRegex = regexp.MustCompile(`^drop\s+index\s+`)
showIndexRegex = regexp.MustCompile(`^show\s+(index|indexes|keys)\s+(from|in)\s+\S+\s*`)
showCreateRegex = regexp.MustCompile(`^show create\s+\S+\s*`)
Expand Down Expand Up @@ -84,6 +85,8 @@ func Parse(ctx *sql.Context, query string) (sql.Node, error) {
return parseCreateIndex(ctx, s)
case createViewRegex.MatchString(lowerQuery):
return parseCreateView(ctx, s)
case dropViewRegex.MatchString(lowerQuery):
return parseDropView(ctx, s)
case dropIndexRegex.MatchString(lowerQuery):
return parseDropIndex(s)
case showIndexRegex.MatchString(lowerQuery):
Expand Down
62 changes: 62 additions & 0 deletions sql/parse/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,65 @@ func maybeList(opening, separator, closing rune, list *[]string) parseFunc {
}
}
}

// A qualifiedName represents an identifier of type "db_name.table_name"
type qualifiedName struct {
qualifier string
name string
}

// readQualifiedIdentifierList reads a comma-separated list of qualifiedNames.
// Any number of spaces between the qualified names are accepted. The qualifier
// may be empty, in which case the period is optional.
// An example of a correctly formed list is:
// "my_db.myview, db_2.mytable , aTable"
func readQualifiedIdentifierList(list *[]qualifiedName) parseFunc {
return func(rd *bufio.Reader) error {
for {
var newItem []string
err := parseFuncs{
skipSpaces,
readIdentList('.', &newItem),
skipSpaces,
}.exec(rd)

if err != nil {
return err
}

if len(newItem) < 1 || len(newItem) > 2 {
return errUnexpectedSyntax.New(
"[qualifier.]name",
strings.Join(newItem, "."),
)
}

var qualifier, name string

if len(newItem) == 1 {
qualifier = ""
name = newItem[0]
} else {
qualifier = newItem[0]
name = newItem[1]
}

*list = append(*list, qualifiedName{qualifier, name})

r, _, err := rd.ReadRune()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

switch r {
case ',':
continue
default:
return rd.UnreadRune()
}
}
}
}
57 changes: 57 additions & 0 deletions sql/parse/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,60 @@ func TestReadSpaces(t *testing.T) {
require.Equal(fixture.expectedRemaining, actualRemaining)
}
}

// Tests that readQualifiedIdentifierList correctly parses well-formed lists,
// populating the list of identifiers, and that it errors with partial lists
// and when it does not found any identifiers
func TestReadQualifiedIdentifierList(t *testing.T) {
require := require.New(t)

testFixtures := []struct {
string string
expectedList []qualifiedName
expectedError bool
expectedRemaining string
}{
{
"my_db.myview, db_2.mytable , aTable",
[]qualifiedName{{"my_db", "myview"}, {"db_2", "mytable"}, {"", "aTable"}},
false,
"",
},
{
"single_identifier -remaining",
[]qualifiedName{{"", "single_identifier"}},
false,
"-remaining",
},
{
"",
nil,
true,
"",
},
{
"partial_list,",
[]qualifiedName{{"", "partial_list"}},
true,
"",
},
}

for _, fixture := range testFixtures {
reader := bufio.NewReader(strings.NewReader(fixture.string))
var actualList []qualifiedName

err := readQualifiedIdentifierList(&actualList)(reader)

if fixture.expectedError {
require.Error(err)
} else {
require.NoError(err)
}

require.Equal(fixture.expectedList, actualList)

actualRemaining, _ := reader.ReadString('\n')
require.Equal(fixture.expectedRemaining, actualRemaining)
}
}
46 changes: 46 additions & 0 deletions sql/parse/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

var ErrMalformedViewName = errors.NewKind("the view name '%s' is not correct")
var ErrMalformedCreateView = errors.NewKind("view definition %#v is not a SELECT query")
var ErrViewsToDropNotFound = errors.NewKind("the list of views to drop must contain at least one view")

// parseCreateView parses
// CREATE [OR REPLACE] VIEW [db_name.]view_name AS select_statement
Expand Down Expand Up @@ -87,3 +88,48 @@ func parseCreateView(ctx *sql.Context, s string) (sql.Node, error) {
sql.UnresolvedDatabase(databaseName), viewName, columns, subqueryAlias, isReplace,
), nil
}

// parseDropView parses
// DROP VIEW [IF EXISTS] [db_name1.]view_name1 [, [db_name2.]view_name2, ...]
// [RESTRICT] [CASCADE]
// and returns a DropView node in case of success. As per MySQL specification,
// RESTRICT and CASCADE, if given, are parsed and ignored.
func parseDropView(ctx *sql.Context, s string) (sql.Node, error) {
r := bufio.NewReader(strings.NewReader(s))

var (
views []qualifiedName
ifExists bool
unusedBool bool
)

err := parseFuncs{
expect("drop"),
skipSpaces,
expect("view"),
skipSpaces,
multiMaybe(&ifExists, "if", "exists"),
skipSpaces,
readQualifiedIdentifierList(&views),
skipSpaces,
maybe(&unusedBool, "restrict"),
skipSpaces,
maybe(&unusedBool, "cascade"),
checkEOF,
}.exec(r)

if err != nil {
return nil, err
}

if len(views) < 1 {
return nil, ErrViewsToDropNotFound.New()
}

plans := make([]sql.Node, len(views))
for i, view := range views {
plans[i] = plan.NewSingleDropView(sql.UnresolvedDatabase(view.qualifier), view.name)
}

return plan.NewDropView(plans, ifExists), nil
}
74 changes: 74 additions & 0 deletions sql/parse/views_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,77 @@ func TestParseCreateView(t *testing.T) {
})
}
}

func TestParseDropView(t *testing.T) {
var fixtures = map[string]sql.Node{
`DROP VIEW view1`: plan.NewDropView(
[]sql.Node{plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1")},
false,
),
`DROP VIEW view1, view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view2"),
},
false,
),
`DROP VIEW db1.view1`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
},
false,
),
`DROP VIEW db1.view1, view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view2"),
},
false,
),
`DROP VIEW view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
false,
),
`DROP VIEW db1.view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
false,
),
`DROP VIEW IF EXISTS myview`: plan.NewDropView(
[]sql.Node{plan.NewSingleDropView(sql.UnresolvedDatabase(""), "myview")},
true,
),
`DROP VIEW IF EXISTS db1.view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
true,
),
`DROP VIEW IF EXISTS db1.view1, db2.view2 RESTRICT CASCADE`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
true,
),
}

for query, expectedPlan := range fixtures {
t.Run(query, func(t *testing.T) {
require := require.New(t)

ctx := sql.NewEmptyContext()
lowerquery := strings.ToLower(query)
result, err := parseDropView(ctx, lowerquery)

require.NoError(err)
require.Equal(expectedPlan, result)
})
}
}
Loading