Cassandra Data Model – ALLOW FILTERING vs NOT

cassandra · nosql · schema

I have a toy Cassandra cluster running on some Raspberry Pis at home. I am currently logging cryptocurrency data to it in hopes of learning more about Cassandra, as well as some other things along the way.

My question today is whether I am structuring the schema for this one table correctly.

The table doesn't have many fields; the primary key consists of the name field and the timestamp field. I want to query the last N hours of data (data is logged every minute) across all coins. If I use a simple WHERE clause I get the 'ALLOW FILTERING' warning. I understand why it happens, but I am struggling to understand the correct path forward to ensure a scalable solution. Right now the table only has about 320k records and I can use ALLOW FILTERING with no problem, but I realize this might not always be the case.

I set up a test to see how long each of two different query methods took. The ALLOW FILTERING method is currently the fastest, but is it likely to stay that way? This is where my knowledge is lacking.

I had an idea to add another field for the day of the week, and maybe a month field as well. The thought was that this might allow for more filtering in a query so I don't have to iterate through all the coins like I am doing below, but I don't know whether this is a good idea. If I do this, do I make them part of the primary key or not? I think this is where I am most confused with Cassandra; maybe just confused enough to be unconfident.

CQL Table Description:

CREATE TABLE cryptocoindb.worldcoinindex (
    name text,
    timestamp int,
    label text,
    price_btc double,
    price_cny double,
    price_eur double,
    price_gbp double,
    price_rur double,
    price_usd double,
    volume_24h double,
    PRIMARY KEY (name, timestamp)
) WITH CLUSTERING ORDER BY (timestamp ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Code in Python:

# First method, using ALLOW FILTERING:
import time

startTime = time.time()
oneDaySec = 60 * 60 * 24
prior24hr = int(time.time() - oneDaySec)

query = "SELECT * FROM {}.{} WHERE timestamp > {} ALLOW FILTERING;".format(
    CASSANDRA_DB, CASSANDRA_TABLE, prior24hr)

rslt = session.execute(query, timeout=None)
# _current_rows is a DataFrame here because a pandas row factory
# is set on the session.
worldcoinindex = rslt._current_rows
elapseTime = time.time() - startTime

print("Elapsed Time for this method: {}".format(elapseTime))

Elapsed Time for this method: 0.6223547458648682

# Second method, using multiple queries:
import time
import pandas as pd

startTime = time.time()

# Get the unique coin names.
qryGetCoinList = "SELECT DISTINCT name FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)
rslt = session.execute(qryGetCoinList, timeout=None)
rsltGetCoinList = rslt._current_rows
rsltGetCoinList = rsltGetCoinList.name.tolist()

oneDaySec = 60 * 60 * 24
prior24hr = int(time.time() - oneDaySec)

# Iterate over the unique coin names and query
# the last 24 hrs worth of data per coin.
# NOTE: There are 518 unique coins.

frames = []
for coin in rsltGetCoinList:

    qryTodayPrices = """
                    SELECT * FROM {}.{} 
                    WHERE name = '{}' AND timestamp > {};
                    """.format(CASSANDRA_DB, 
                               CASSANDRA_TABLE, 
                               coin, 
                               prior24hr)
    rslt = session.execute(qryTodayPrices, timeout=None)
    frames.append(rslt._current_rows)

# DataFrame.append() returns a new frame instead of modifying in place,
# so collect the per-coin frames and concatenate once at the end.
rsltTodayPrices = pd.concat(frames)

elapseTime = time.time() - startTime
print("Elapsed Time for this method: {}".format(elapseTime))

Elapsed Time for this method: 1.4576539993286133

Thank you!

Best Answer

Right now the table only has about 320k records and I can use ALLOW FILTERING with no problem, but I realize this might not always be the case.

So here's the thing: Cassandra is very good at querying data by a specific key. It is also good at retrieving a range of data within a partition.

"SELECT * FROM {}.{} WHERE timestamp > {} ALLOW FILTERING;"

But due to its distributed nature, it is not good at scanning an entire table to compile a result set. And that's what you are asking it to do with the above query.

Network traffic is expensive, so the main goal with Cassandra is to ensure that your query can be served by a single node. Using ALLOW FILTERING without specifying your partition key (name) causes the coordinator node to check every node in your cluster for values that may match your WHERE clause.

Essentially, the more nodes in your cluster, the more detrimental ALLOW FILTERING becomes to performance (unless you specify at least your partition key; only then are you guaranteed that your query can be served by a single node). Note that your slower query actually does this right, and solves that problem for you.

I had an idea to add another field that would be the day of the week, and maybe a month field as well.

And this is a good idea!

It solves two problems.

  1. It ensures that your query will be served by a single node.
  2. It protects your partitions from getting too big.

Cassandra has a limit of 2 billion cells per partition. As your partition key is "name" and you keep adding unique timestamps within it, you will progress toward that limit until you either reach it or your partition becomes too big to use efficiently (probably the latter).
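To put that growth in perspective, here is a rough back-of-the-envelope calculation (my own numbers, assuming one row per coin per minute and the eight non-key columns in your table):

```python
# Rough partition-growth estimate for the original (name, timestamp) schema.
# Assumptions: one row per coin per minute, 8 non-key columns per row.
rows_per_year = 60 * 24 * 365        # one row per minute
cells_per_year = rows_per_year * 8   # 8 non-key columns -> 8 cells per row

print(rows_per_year)                 # 525600
print(cells_per_year)                # 4204800

# Years until a single coin's partition hits the 2-billion-cell limit:
years_to_limit = 2_000_000_000 / cells_per_year
print(round(years_to_limit))         # 476
```

So the hard cell limit is far away, but a partition growing by half a million rows per year will become painful to read, repair, and compact long before that, which is why time-bucketing the partition key matters.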

Here is how I would solve this:

CREATE TABLE cryptocoindb.worldcoinindex_byday (
    daybucket text,
    name text,
    datetime timestamp,
    label text,
    price_btc double,
    price_cny double,
    price_eur double,
    price_gbp double,
    price_rur double,
    price_usd double,
    volume_24h double,
    PRIMARY KEY (daybucket, datetime, name)
) WITH CLUSTERING ORDER BY (datetime DESC, name ASC);
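To populate this table, the application has to derive daybucket from the collection time. A minimal sketch (my own helper name; it assumes your original epoch-seconds timestamps):

```python
from datetime import datetime, timezone

def to_bucket_and_datetime(epoch_seconds):
    """Derive the 'YYYYMMDD' day bucket and a UTC datetime
    from an epoch-seconds timestamp."""
    dt = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return dt.strftime('%Y%m%d'), dt

# 2017-08-25 17:20:00 UTC
daybucket, dt = to_bucket_and_datetime(1503681600)
print(daybucket)   # 20170825
```

The row would then be inserted with a prepared statement, binding daybucket, dt, name, and the price columns.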

Now you could query like this:

SELECT * FROM cryptocoindb.worldcoinindex_byday
WHERE daybucket='20170825' AND datetime > '2017-08-25 17:20';
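One caveat with day buckets: a rolling "last N hours" window can cross midnight, so the application may need to query two buckets and merge the results. A sketch of computing which buckets cover a window (hypothetical helper, pure Python):

```python
from datetime import datetime, timedelta, timezone

def buckets_for_window(end, hours):
    """Return the 'YYYYMMDD' day buckets covering the last `hours` hours."""
    start = end - timedelta(hours=hours)
    buckets = []
    day = start.date()
    while day <= end.date():
        buckets.append(day.strftime('%Y%m%d'))
        day += timedelta(days=1)
    return buckets

end = datetime(2017, 8, 26, 3, 0, tzinfo=timezone.utc)
print(buckets_for_window(end, 24))   # ['20170825', '20170826']
```

Run one query per bucket with the same datetime predicate, then concatenate the results on the application side.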

Additionally, by clustering your rows on "datetime" descending, you are ensuring that the most-recent data is at the top of each partition (giving Cassandra less to have to parse through).

I moved "name" to be the last clustering column, just to maintain uniqueness. If you're never going to query by "name," then it doesn't make sense to use it as your partition key.

Hope this helps.

Note: I changed your timestamp int to datetime timestamp because it added clarity to the example. You can use whatever works for you, but just be careful of confusion arising from naming a column after a data type.

Edit 20170826

Is the following the same as your code or different? PRIMARY KEY ((daybucket, datetime), name)

No, that's not the same. That's using something called a composite partition key. It will give you better data distribution in your cluster, but it will make querying harder for you, and basically set you back to doing table scans.
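To make the distinction concrete, here are the two key layouts side by side (same columns as the suggested table; only the key definition differs):

```cql
-- One partition per day bucket; datetime and name are clustering columns,
-- so range predicates like "datetime > X" work within a bucket:
PRIMARY KEY (daybucket, datetime, name)

-- Composite partition key: one partition per (daybucket, datetime) pair.
-- Better distribution, but you must supply BOTH values with equality
-- to read a partition, so range scans on datetime are no longer possible:
PRIMARY KEY ((daybucket, datetime), name)
```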

For a good, comprehensive description of Cassandra primary keys, Carlo Bertuccini has a great answer on Stack Overflow:

https://stackoverflow.com/questions/24949676/difference-between-partition-key-composite-key-and-clustering-key-in-cassandra/24953331#24953331

Is there a way to alter the way Cassandra reads timestamps or an easy way to make changes to that whole datafield to alter the timestamp so it will be correctly read?

Not really. Cassandra timestamps can be tricky to work with. They are stored with millisecond precision, but don't actually show that full precision when queried. Also, as of one of the 2.1 patches, cqlsh automatically displays times in GMT, which can be confusing as well. If your way of managing timestamps on the application side is working for you, just stick with that.
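If the GMT display is the main annoyance, converting on the application side is simple. The Python driver hands timestamps back as naive datetimes that are actually UTC; a sketch of attaching the zone and converting (the UTC-5 offset is just an example value):

```python
from datetime import datetime, timezone, timedelta

# The driver returns a naive datetime that is really UTC:
raw = datetime(2017, 8, 25, 17, 20, 0, 123000)

# Attach UTC explicitly, then convert to a local zone (UTC-5 as an example):
utc = raw.replace(tzinfo=timezone.utc)
local = utc.astimezone(timezone(timedelta(hours=-5)))

# The full millisecond precision is still there, even though
# cqlsh doesn't display it:
print(local.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3])   # 2017-08-25 12:20:00.123
```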
