Skip to content
This repository has been archived by the owner on Jan 28, 2021. It is now read-only.

Add support for DROP VIEW #860

Open
wants to merge 7 commits into
base: feature/views
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (e *Engine) Query(
case *plan.CreateIndex:
typ = sql.CreateIndexProcess
perm = auth.ReadPerm | auth.WritePerm
case *plan.InsertInto, *plan.DeleteFrom, *plan.Update, *plan.DropIndex, *plan.UnlockTables, *plan.LockTables, *plan.CreateView:
case *plan.InsertInto, *plan.DeleteFrom, *plan.Update, *plan.DropIndex, *plan.UnlockTables, *plan.LockTables, *plan.CreateView, *plan.DropView:
perm = auth.ReadPerm | auth.WritePerm
}

Expand Down
1 change: 1 addition & 0 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3149,6 +3149,7 @@ func TestReadOnly(t *testing.T) {
`DROP INDEX foo ON mytable`,
`INSERT INTO mytable (i, s) VALUES(42, 'yolo')`,
`CREATE VIEW myview AS SELECT i FROM mytable`,
`DROP VIEW myview`,
}

for _, query := range writingQueries {
Expand Down
4 changes: 4 additions & 0 deletions sql/analyzer/assign_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func assignCatalog(ctx *sql.Context, a *Analyzer, n sql.Node) (sql.Node, error)
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
case *plan.DropView:
nc := *node
nc.Catalog = a.Catalog
return &nc, nil
default:
return n, nil
}
Expand Down
6 changes: 6 additions & 0 deletions sql/analyzer/assign_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,4 +81,10 @@ func TestAssignCatalog(t *testing.T) {
cv, ok := node.(*plan.CreateView)
require.True(ok)
require.Equal(c, cv.Catalog)

node, err = f.Apply(sql.NewEmptyContext(), a, plan.NewDropView(nil, false))
require.NoError(err)
dv, ok := node.(*plan.DropView)
require.True(ok)
require.Equal(c, dv.Catalog)
}
3 changes: 3 additions & 0 deletions sql/parse/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ var (
describeTablesRegex = regexp.MustCompile(`^(describe|desc)\s+table\s+(.*)`)
createIndexRegex = regexp.MustCompile(`^create\s+index\s+`)
createViewRegex = regexp.MustCompile(`^create\s+(or\s+replace\s+)?view\s+`)
dropViewRegex = regexp.MustCompile(`^drop\s+(if\s+exists\s+)?view\s+`)
dropIndexRegex = regexp.MustCompile(`^drop\s+index\s+`)
showIndexRegex = regexp.MustCompile(`^show\s+(index|indexes|keys)\s+(from|in)\s+\S+\s*`)
showCreateRegex = regexp.MustCompile(`^show create\s+\S+\s*`)
Expand Down Expand Up @@ -84,6 +85,8 @@ func Parse(ctx *sql.Context, query string) (sql.Node, error) {
return parseCreateIndex(ctx, s)
case createViewRegex.MatchString(lowerQuery):
return parseCreateView(ctx, s)
case dropViewRegex.MatchString(lowerQuery):
return parseDropView(ctx, s)
case dropIndexRegex.MatchString(lowerQuery):
return parseDropIndex(s)
case showIndexRegex.MatchString(lowerQuery):
Expand Down
62 changes: 62 additions & 0 deletions sql/parse/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,3 +516,65 @@ func maybeList(opening, separator, closing rune, list *[]string) parseFunc {
}
}
}

// A QualifiedName represents an identifier of type "db_name.table_name"
type QualifiedName struct {
agarciamontoro marked this conversation as resolved.
Show resolved Hide resolved
qualifier string
name string
}

// readQualifiedIdentifierList reads a comma-separated list of QualifiedNames.
// Any number of spaces between the qualified names are accepted. The qualifier
// may be empty, in which case the period is optional.
// An example of a correctly formed list is:
// "my_db.myview, db_2.mytable , aTable"
func readQualifiedIdentifierList(list *[]QualifiedName) parseFunc {
return func(rd *bufio.Reader) error {
for {
var newItem []string
err := parseFuncs{
skipSpaces,
readIdentList('.', &newItem),
skipSpaces,
}.exec(rd)

if err != nil {
return err
}

if len(newItem) < 1 || len(newItem) > 2 {
return errUnexpectedSyntax.New(
"[qualifier.]name",
strings.Join(newItem, "."),
)
}

var qualifier, name string

if len(newItem) == 1 {
qualifier = ""
name = newItem[0]
} else {
qualifier = newItem[0]
name = newItem[1]
}

*list = append(*list, QualifiedName{qualifier, name})

r, _, err := rd.ReadRune()
if err != nil {
if err == io.EOF {
return nil
}
return err
}

switch r {
case ',':
continue
default:
return rd.UnreadRune()
}
}
}
}
57 changes: 57 additions & 0 deletions sql/parse/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,3 +465,60 @@ func TestReadSpaces(t *testing.T) {
require.Equal(fixture.expectedRemaining, actualRemaining)
}
}

// Tests that readQualifiedIdentifierList correctly parses well-formed lists,
// populating the list of identifiers, and that it errors with partial lists
// and when it does not found any identifiers
func TestReadQualifiedIdentifierList(t *testing.T) {
require := require.New(t)

testFixtures := []struct {
string string
expectedList []QualifiedName
expectedError bool
expectedRemaining string
}{
{
"my_db.myview, db_2.mytable , aTable",
[]QualifiedName{{"my_db", "myview"}, {"db_2", "mytable"}, {"", "aTable"}},
false,
"",
},
{
"single_identifier -remaining",
[]QualifiedName{{"", "single_identifier"}},
false,
"-remaining",
},
{
"",
nil,
true,
"",
},
{
"partial_list,",
[]QualifiedName{{"", "partial_list"}},
true,
"",
},
}

for _, fixture := range testFixtures {
reader := bufio.NewReader(strings.NewReader(fixture.string))
var actualList []QualifiedName

err := readQualifiedIdentifierList(&actualList)(reader)

if fixture.expectedError {
require.Error(err)
} else {
require.NoError(err)
}

require.Equal(fixture.expectedList, actualList)

actualRemaining, _ := reader.ReadString('\n')
require.Equal(fixture.expectedRemaining, actualRemaining)
}
}
46 changes: 46 additions & 0 deletions sql/parse/views.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

var ErrMalformedViewName = errors.NewKind("the view name '%s' is not correct")
var ErrMalformedCreateView = errors.NewKind("view definition %#v is not a SELECT query")
var ErrViewsToDropNotFound = errors.NewKind("the list of views to drop must contain at least one view")

// parseCreateView parses
// CREATE [OR REPLACE] VIEW [db_name.]view_name AS select_statement
Expand Down Expand Up @@ -87,3 +88,48 @@ func parseCreateView(ctx *sql.Context, s string) (sql.Node, error) {
sql.UnresolvedDatabase(databaseName), viewName, columns, subqueryAlias, isReplace,
), nil
}

// parseDropView parses
// DROP VIEW [IF EXISTS] [db_name1.]view_name1 [, [db_name2.]view_name2, ...]
// [RESTRICT] [CASCADE]
// and returns a DropView node in case of success. As per MySQL specification,
// RESTRICT and CASCADE, if given, are parsed and ignored.
func parseDropView(ctx *sql.Context, s string) (sql.Node, error) {
r := bufio.NewReader(strings.NewReader(s))

var (
views []QualifiedName
ifExists bool
unusedBool bool
)

err := parseFuncs{
expect("drop"),
skipSpaces,
expect("view"),
skipSpaces,
multiMaybe(&ifExists, "if", "exists"),
skipSpaces,
readQualifiedIdentifierList(&views),
skipSpaces,
maybe(&unusedBool, "restrict"),
skipSpaces,
maybe(&unusedBool, "cascade"),
checkEOF,
}.exec(r)

if err != nil {
return nil, err
}

if len(views) < 1 {
return nil, ErrViewsToDropNotFound.New()
}

plans := make([]sql.Node, len(views))
for i, view := range views {
plans[i] = plan.NewSingleDropView(sql.UnresolvedDatabase(view.qualifier), view.name)
}

return plan.NewDropView(plans, ifExists), nil
}
74 changes: 74 additions & 0 deletions sql/parse/views_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,77 @@ func TestParseCreateView(t *testing.T) {
})
}
}

func TestParseDropView(t *testing.T) {
var fixtures = map[string]sql.Node{
`DROP VIEW view1`: plan.NewDropView(
[]sql.Node{plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1")},
false,
),
`DROP VIEW view1, view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view2"),
},
false,
),
`DROP VIEW db1.view1`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
},
false,
),
`DROP VIEW db1.view1, view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view2"),
},
false,
),
`DROP VIEW view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase(""), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
false,
),
`DROP VIEW db1.view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
false,
),
`DROP VIEW IF EXISTS myview`: plan.NewDropView(
[]sql.Node{plan.NewSingleDropView(sql.UnresolvedDatabase(""), "myview")},
true,
),
`DROP VIEW IF EXISTS db1.view1, db2.view2`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
true,
),
`DROP VIEW IF EXISTS db1.view1, db2.view2 RESTRICT CASCADE`: plan.NewDropView(
[]sql.Node{
plan.NewSingleDropView(sql.UnresolvedDatabase("db1"), "view1"),
plan.NewSingleDropView(sql.UnresolvedDatabase("db2"), "view2"),
},
true,
),
}

for query, expectedPlan := range fixtures {
t.Run(query, func(t *testing.T) {
require := require.New(t)

ctx := sql.NewEmptyContext()
lowerquery := strings.ToLower(query)
result, err := parseDropView(ctx, lowerquery)

require.NoError(err)
require.Equal(expectedPlan, result)
})
}
}
Loading