How to connect to an Apache Cassandra cluster from Go using GoCQL driver

October 5, 2020

In this post we will learn how easy it is to connect to Apache Cassandra from Go using the GoCQL driver.
As an example we will use a users schema storing user info and having a login requirement using email and password.

Run Cassandra inside Docker

We will be using the official cassandra Docker image.

Create a Cassandra cluster locally:

# Network
docker network create -d bridge cassandra-network

# Node 1
docker run --name cassandra1 --network cassandra-network -d cassandra:3

# Node 2
docker run --name cassandra2 --network cassandra-network -e CASSANDRA_SEEDS=cassandra1 -d cassandra:3

Notice: Adding the second node may eat all your RAM!

View Cassandra logs:
docker logs -ft cassandra1

Create the data model

Connect to Cassandra from cqlsh (Cassandra Query Language Shell):
docker run -it --network cassandra-network --rm cassandra cqlsh cassandra1

The database server has no password setup so we can freely connect to it.

From inside a cqlsh prompt we can run the DESCRIBE keyspaces command to see available keyspaces/schemas.

By default we should only see the ones created by the system.

system_traces  system_schema  system_auth  system  system_distributed

Keyspace

Create test app keyspace:


-- single node cluster
CREATE KEYSPACE IF NOT EXISTS app WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};

or

-- multi node cluster
CREATE KEYSPACE IF NOT EXISTS app WITH REPLICATION = {'class': 'NetworkTopologyStrategy', 'datacenter1': 1};

Table model

For demonstration purposes we will be creating a users model for performing authentication and then looking up user information.


-- Query user by id: SELECT * FROM app.users WHERE userid = ?;
-- Profile data, one row per user. userid is the only primary key column,
-- so it is also the partition key used by the lookup above.
CREATE TABLE IF NOT EXISTS app.users (
    userid uuid PRIMARY KEY,
    firstname text, 
    lastname text,
    email text,
    active boolean,
    created_at timestamp
);

-- Query user by email: SELECT password_hash, password_salt, userid FROM app.user_credentials WHERE email = ?;
-- Login lookup table keyed by email. It stores the password hash and salt
-- together with the userid needed to fetch the matching row in app.users.
CREATE TABLE IF NOT EXISTS app.user_credentials (
    email text PRIMARY KEY,
    password_hash text,
    password_salt text,
    userid uuid
);

-- Query by user's login time: SELECT email, time FROM app.user_login_history WHERE userid = ? ORDER BY time DESC;
-- One row per login. userid is the partition key and time the clustering
-- column, which is what allows the ORDER BY time in the query above.
CREATE TABLE IF NOT EXISTS app.user_login_history (
    userid uuid,
    email text,
    time timestamp,
    PRIMARY KEY (userid, time)
);

-- Check what the schema looks like after table creation
DESCRIBE app;

GoCQL

To connect from a Go program to Cassandra database we need a client library that has a driver which implements the communication protocol.

For this we will use the gocql package, which “implements a fast and robust Cassandra client for the Go programming language”.

Install

go mod init example.org/go-cassandra  # replace the module path with your own project's path
go get -u github.com/gocql/gocql

Demo

The demo showcases how an application might use the schema defined above to perform the following use cases:

  • User logs into the website: Find a user by an email to validate that user exists and password is valid
  • After login, show information about the user: Find a user by id
  • Keep history of every successful login performed by the user
main package
package main

import (
	"fmt"
	"log"
	"math/rand"
	"time"

	"github.com/gocql/gocql"
	"gitlab.com/alibitek-go/go-cassandra/pkg/model"
	"gitlab.com/alibitek-go/go-cassandra/pkg/repository"
	"gitlab.com/alibitek-go/go-cassandra/pkg/security"
)

var testPassword = "topsecret"

// getDemoUser builds a throwaway user for the demo: a random v4 UUID, a
// hash of testPassword with a fresh salt, and name/email fields that share
// a random numeric suffix.
//
// The process is terminated via log.Fatalf if the UUID or the password hash
// cannot be generated.
func getDemoUser() *model.User {
	userID, err := gocql.RandomUUID()
	if err != nil {
		// Fatalf with an explicit separator: log.Fatal(msg, err) would print
		// the message and the error glued together.
		log.Fatalf("Unable to generate v4 random UUID before inserting user: %v", err)
	}

	passwordData, err := security.GetPasswordHashWithRandomSalt(testPassword)
	if err != nil {
		log.Fatalf("Unable to generate password hash: %v", err)
	}

	// The suffix only has to make repeated demo runs produce different
	// names and emails; it is not security sensitive.
	randomGenerator := rand.New(rand.NewSource(time.Now().UnixNano()))
	userNumber := randomGenerator.Intn(100000)

	return &model.User{
		ID:           userID,
		FirstName:    fmt.Sprintf("First Name %d", userNumber),
		LastName:     fmt.Sprintf("Last Name %d", userNumber),
		Email:        fmt.Sprintf("mail%d@example.org", userNumber),
		Active:       true,
		CreatedAt:    time.Now().UTC(),
		PasswordHash: passwordData.Hash,
		PasswordSalt: passwordData.Salt,
	}
}

// manualDataBindingTest walks through the demo scenario against the given
// session: insert a generated user, read it back by id, list all users,
// log in with the correct credentials and finally attempt a login with an
// email that has no account.
//
// Failures of the insert, the lookup by id and the listing abort the process
// through log.Fatal*; login failures are only logged.
func manualDataBindingTest(session *gocql.Session) {
	userRepository := repository.NewGoCQLUserRepository(session)

	// Insert user
	insertedUser := getDemoUser()
	if err := userRepository.Insert(insertedUser); err != nil {
		log.Fatalf("Cannot insert user %s in database: %s", insertedUser, err)
	}
	log.Println("Inserted user:", insertedUser)

	// Get single user. A missing row is reported through err, so no extra
	// "empty id" check is needed on the returned value.
	foundUser, err := userRepository.SearchUserByID(insertedUser.ID)
	if err != nil {
		log.Fatalf("Cannot find user by ID: %s, reason: %s\n", insertedUser.ID, err)
	}
	log.Println("User found by id:", foundUser)

	// Get all users
	users, err := userRepository.GetAllUsers()
	if err != nil {
		log.Fatalf("Cannot get all users from database: %v", err)
	}
	log.Println("List of all users")
	for idx, user := range users {
		log.Printf("[%d] User: %s\n", idx, user)
	}

	// Login existing user. The password is deliberately kept out of the log
	// output.
	loggedinUser, err := userRepository.Login(foundUser.Email, testPassword)
	if err != nil {
		log.Printf("Cannot login with existing user! Email: %s, Reason: %s\n", foundUser.Email, err)
	} else {
		log.Printf("Successfully logged in with user: %s\n", loggedinUser)
	}

	// Login non-existing user: this is expected to fail.
	nonUserEmail := "non-existing-user@example.org"
	if _, err = userRepository.Login(nonUserEmail, testPassword); err != nil {
		log.Printf("Login with non existing user failed! Email: %s, Reason: %s\n", nonUserEmail, err)
	} else {
		log.Printf("Unexpected successful login with non existing user! Email: %s\n", nonUserEmail)
	}
}

// getClusterConfig returns the GoCQL cluster configuration used by the demo:
// a single contact point, the "app" keyspace and QUORUM as the default
// consistency level.
func getClusterConfig() *gocql.ClusterConfig {
	// NOTE(review): presumably the address of a Cassandra container on the
	// Docker network; adjust it to your environment.
	cfg := gocql.NewCluster("172.21.0.2")
	cfg.Consistency = gocql.Quorum
	cfg.Keyspace = "app"
	return cfg
}

// main opens a session using the demo cluster configuration, runs the demo
// scenario and closes the session on return. The process is terminated if
// the session cannot be created.
func main() {
	cluster := getClusterConfig()

	session, err := cluster.CreateSession()
	if err != nil {
		// Fatalf with an explicit separator: log.Fatal(msg, err) would print
		// the message and the error glued together.
		log.Fatalf("Unable to open up a session with the Cassandra database: %v", err)
	}
	defer session.Close()

	manualDataBindingTest(session)
}
model package
package model

import (
	"fmt"
	"time"

	"github.com/gocql/gocql"
)

// User represents the model for a user entity.
//
// It combines the profile columns of the users table with the credential
// columns of the user_credentials table.
type User struct {
	// ID is the user's unique identifier (column userid).
	ID gocql.UUID
	// FirstName is the user's given name.
	FirstName string
	// LastName is the user's family name.
	LastName string
	// Email is the address the user logs in with.
	Email string
	// Active is the value of the active column.
	Active bool
	// CreatedAt is the value of the created_at column.
	CreatedAt time.Time
	// PasswordHash is the stored password hash (column password_hash).
	PasswordHash string
	// PasswordSalt is the stored salt for the password hash (column password_salt).
	PasswordSalt string
}

// String renders the user's profile fields on a single line for logging.
//
// The password hash and salt are intentionally left out so that credentials
// never end up in log output. The result has no trailing newline, so the
// log package does not print an extra blank line. A nil receiver yields
// "User<nil>" instead of panicking.
func (u *User) String() string {
	if u == nil {
		return "User<nil>"
	}
	return fmt.Sprintf("User{ID: %s, FirstName: %s, LastName: %s, Email: %s, Active: %t, Created At: %v}",
		u.ID.String(), u.FirstName, u.LastName, u.Email, u.Active, u.CreatedAt)
}
repository package
package repository

import (
	"fmt"
	"log"
	"time"

	"github.com/gocql/gocql"
	"gitlab.com/alibitek-go/go-cassandra/pkg/model"
	"gitlab.com/alibitek-go/go-cassandra/pkg/security"
)

// GoCQLUserRepository illustrates working with GoCQL for inserting data and retrieving a single row and multiple rows of data
type GoCQLUserRepository interface {
	// Insert stores the user's profile and credentials.
	Insert(*model.User) error
	// SearchUserByID returns the profile of the user with the given id.
	SearchUserByID(userID gocql.UUID) (*model.User, error)
	// SearchUserByEmail returns the id and credentials registered for the email.
	SearchUserByEmail(email string) (*model.User, error)
	// GetAllUsers returns the profile of every stored user.
	GetAllUsers() ([]*model.User, error)
	// Login verifies the password for the email and returns the matching user.
	Login(email, password string) (*model.User, error)
	// AddLoginToHistory records a login of the given user.
	AddLoginToHistory(userID gocql.UUID, email string) error
}

// NewGoCQLUserRepository returns a GoCQLUserRepository whose queries are
// executed on the given session.
func NewGoCQLUserRepository(s *gocql.Session) GoCQLUserRepository {
	repo := new(goCQLUserRepository)
	repo.session = s
	return repo
}

// goCQLUserRepository is the GoCQLUserRepository implementation that binds
// query arguments and scans result columns by hand.
type goCQLUserRepository struct {
	// session is the GoCQL session every query is executed on.
	session *gocql.Session
}

// Insert stores the user's profile in the users table and the credentials
// in the user_credentials table.
//
// Both statements are sent as one logged batch so that a failure cannot
// leave a profile behind without its credentials (or the other way round).
// It returns an error if user is nil or if the batch cannot be executed.
func (r *goCQLUserRepository) Insert(user *model.User) error {
	if user == nil {
		return fmt.Errorf("insert user: nil user")
	}

	batch := r.session.NewBatch(gocql.LoggedBatch)
	batch.Query(`INSERT INTO users (userId, firstname, lastname, email, active, created_at) VALUES (?, ?, ?, ?, ?, ?)`,
		user.ID, user.FirstName, user.LastName, user.Email, user.Active, user.CreatedAt)
	batch.Query(`INSERT INTO user_credentials (email, password_hash, password_salt, userId) VALUES (?, ?, ?, ?)`,
		user.Email, user.PasswordHash, user.PasswordSalt, user.ID)

	return r.session.ExecuteBatch(batch)
}

// SearchUserByID reads the profile columns of the user with the given id
// from the users table, using consistency level ONE. The returned user
// carries the requested id; any scan error is returned as is, together with
// a nil user.
func (r *goCQLUserRepository) SearchUserByID(userID gocql.UUID) (*model.User, error) {
	const stmt = `SELECT firstname, lastname, email, active, created_at FROM users WHERE userId = ? LIMIT 1`

	var found model.User
	found.ID = userID

	log.Println("Searching user by id:", userID)
	query := r.session.Query(stmt, userID).Consistency(gocql.One)
	err := query.Scan(&found.FirstName, &found.LastName, &found.Email, &found.Active, &found.CreatedAt)
	if err != nil {
		return nil, err
	}

	return &found, nil
}

// SearchUserByEmail reads the password hash, salt and user id stored for the
// email in the user_credentials table, using consistency level ONE. Only
// those fields and the email are set on the returned user; any scan error is
// returned as is, together with a nil user.
func (r *goCQLUserRepository) SearchUserByEmail(email string) (*model.User, error) {
	const stmt = `SELECT password_hash, password_salt, userId FROM user_credentials WHERE email = ? LIMIT 1`

	var found model.User
	found.Email = email

	log.Println("Searching user by email:", email)
	query := r.session.Query(stmt, email).Consistency(gocql.One)
	err := query.Scan(&found.PasswordHash, &found.PasswordSalt, &found.ID)
	if err != nil {
		return nil, err
	}

	return &found, nil
}

// GetAllUsers returns the profile of every row in the users table.
//
// The iterator is closed once the rows are consumed and the error reported
// by Close is returned, so a failed or interrupted query is not mistaken for
// a complete (possibly empty) result.
func (r *goCQLUserRepository) GetAllUsers() ([]*model.User, error) {
	var users []*model.User

	iterator := r.session.Query(`SELECT userid, firstname, lastname, email, active, created_at FROM users`).Iter()

	for {
		// A fresh struct per row, so appended users never share storage.
		user := &model.User{}
		if !iterator.Scan(&user.ID, &user.FirstName, &user.LastName, &user.Email, &user.Active, &user.CreatedAt) {
			break
		}
		users = append(users, user)
	}

	// Scan returns false both at the end of the result set and on failure;
	// Close tells the two apart.
	if err := iterator.Close(); err != nil {
		return nil, err
	}

	return users, nil
}

// AddLoginToHistory appends a row with the user id, the email and the
// current UTC time to the user_login_history table and returns the error of
// the insert, if any.
func (r *goCQLUserRepository) AddLoginToHistory(userID gocql.UUID, email string) error {
	const stmt = `INSERT INTO user_login_history (userid, email, time) VALUES (?, ?, ?)`

	loggedAt := time.Now().UTC()
	return r.session.Query(stmt, userID, email, loggedAt).Exec()
}

// Login authenticates a user by email and password.
//
// It looks up the credentials for the email, verifies the password against
// the stored hash and salt, loads the profile by the user id found with the
// credentials and records the login in the history table. The returned user
// holds the profile plus the password hash and salt. Any failing step
// aborts the login and its error is returned; a wrong password yields an
// "Invalid password" error.
func (r *goCQLUserRepository) Login(email, password string) (*model.User, error) {
	// Step 1: credentials registered for the email.
	credentials, err := r.SearchUserByEmail(email)
	if err != nil {
		return nil, err
	}

	// Step 2: password check.
	ok, err := security.VerifyPassword(password, credentials.PasswordHash, credentials.PasswordSalt)
	switch {
	case err != nil:
		return nil, err
	case !ok:
		return nil, fmt.Errorf("Invalid password")
	}

	// Step 3: profile data, completed with the credentials already loaded.
	profile, err := r.SearchUserByID(credentials.ID)
	if err != nil {
		return nil, err
	}
	profile.PasswordHash, profile.PasswordSalt = credentials.PasswordHash, credentials.PasswordSalt

	// Step 4: remember the successful login.
	if err = r.AddLoginToHistory(profile.ID, email); err != nil {
		return nil, err
	}

	return profile, nil
}
security package
package security

import (
	crand "crypto/rand"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"math/rand"
	"time"

	"golang.org/x/crypto/scrypt"
)

// getRandomSalt returns 16 random bytes to be used as a password salt.
//
// The bytes come from the operating system's cryptographically secure
// random source (crypto/rand). A generator seeded with the current time is
// predictable and can hand out the same salt twice when two calls observe
// the same clock value.
func getRandomSalt() ([]byte, error) {
	salt := make([]byte, 16)
	if _, err := crand.Read(salt); err != nil {
		return nil, err
	}
	return salt, nil
}

// PasswordData encapsulates the generated hash and salt for a password.
type PasswordData struct {
	// Hash is the hex-encoded SHA-256 digest of the scrypt-derived key.
	Hash string
	// Salt is the hex-encoded salt that was used to derive the key.
	Salt string
}

// GetPasswordHashWithRandomSalt hashes the password with a freshly generated
// salt. A 16-byte key is derived with scrypt (N=32768, r=8, p=1), the key is
// digested with SHA-256, and both the digest and the salt are returned
// hex-encoded. An error from the salt generation or the key derivation is
// returned as is.
func GetPasswordHashWithRandomSalt(password string) (*PasswordData, error) {
	salt, err := getRandomSalt()
	if err != nil {
		return nil, err
	}

	derived, err := scrypt.Key([]byte(password), salt, 32768, 8, 1, 16)
	if err != nil {
		return nil, err
	}

	digest := sha256.Sum256(derived)
	return &PasswordData{
		Hash: hex.EncodeToString(digest[:]),
		Salt: hex.EncodeToString(salt),
	}, nil
}

// GetPasswordHashWithGivenSalt hashes the password with the supplied salt.
// A 16-byte key is derived with scrypt (N=32768, r=8, p=1), the key is
// digested with SHA-256, and both the digest and the salt are returned
// hex-encoded. An error from the key derivation is returned as is.
func GetPasswordHashWithGivenSalt(password string, salt []byte) (*PasswordData, error) {
	derived, err := scrypt.Key([]byte(password), salt, 32768, 8, 1, 16)
	if err != nil {
		return nil, err
	}

	digest := sha256.Sum256(derived)
	return &PasswordData{
		Hash: hex.EncodeToString(digest[:]),
		Salt: hex.EncodeToString(salt),
	}, nil
}

// VerifyPassword reports whether password, hashed with the given hex-encoded
// salt, produces the given hex-encoded hash.
//
// It returns an error if the salt is not valid hex or if the hash cannot be
// computed. The two hashes are compared in constant time so that the
// duration of the check does not reveal how many leading characters match.
func VerifyPassword(password, hash, salt string) (bool, error) {
	saltBytes, err := hex.DecodeString(salt)
	if err != nil {
		return false, err
	}

	passwordData, err := GetPasswordHashWithGivenSalt(password, saltBytes)
	if err != nil {
		return false, err
	}

	match := subtle.ConstantTimeCompare([]byte(hash), []byte(passwordData.Hash)) == 1
	return match, nil
}

Conclusion

As we can see, it is quite straightforward to connect from Go to Apache Cassandra using GoCQL.

However, the GoCQL driver requires us to manually scan and bind every field we send or receive from the database,
which can be a hassle in the long run, where we might have many tables with many columns.

A better approach would be to have some kind of two-way binding mechanism between a Go struct and a Cassandra table,
such that we can INSERT a Go struct into a table and SELECT a row from a table into a Go struct.

We will explore how to do this in a future post.

© 2020 Alex Bitek. All rights reserved.
Handmade with ❤ in Transylvania.