Our BI data warehouse is a Snowflake database that ingests data via Fivetran. This page is the setup log for both. Our Snowflake host.
Fivetran
I followed this setup guide to configure our Aurora cluster. It walks you through creating a fivetran database user with the proper permissions and enabling the replication binlog on the Aurora cluster. To connect to our private databases, Fivetran uses an SSH haven machine as a tunnel. Here’s how that was provisioned:
In EC2, I created a new t3.micro instance using the “Amazon Linux 2 AMI” in the SwipeSense VPC. Any public subnet is fine.
Since this haven instance is accessed by an outside entity, we also lock down its outbound rules. The instance is only allowed to make outgoing SQL connections to servers in our VPC subnets:
Once the instance is up, SSH in as ec2-user.
During Fivetran setup, they provide a public key that their SSH user will use to log in. Using the console, create a new “fivetran” user and add the provided key as an authorized key:
To allow access from this instance, the database needs another security group that lets the instance connect. I created a new security group, RDSAllowFivetran, which looks like:
This group was applied to the database in question.
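For reference, the database-side steps from the Fivetran guide (the fivetran database user and the binlog mentioned at the top of this section) look roughly like the following. This is a sketch that assumes the cluster is Aurora MySQL, which the binlog requirement suggests. The password is a placeholder and the retention value is illustrative, so defer to the guide for the exact grants and settings.

```sql
-- Assumes Aurora MySQL; run as an admin user on the cluster's writer endpoint.
-- '<password>' is a placeholder, not a real credential.
CREATE USER 'fivetran'@'%' IDENTIFIED BY '<password>';

-- Read access plus the replication privileges needed to read the binlog.
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'fivetran'@'%';

-- Binlog-based replication expects row-based logging; this should return ROW.
-- (On Aurora the value is set in the DB cluster parameter group.)
SHOW VARIABLES LIKE 'binlog_format';

-- Keep binlogs long enough for the connector to catch up after an outage.
-- The 24-hour value is an assumption, not something recorded in this log.
CALL mysql.rds_set_configuration('binlog retention hours', 24);
```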
Snowflake
Instead of using Fivetran for the S3 mesh events, we use Snowflake Snowpipe. Snowpipe reads incoming S3 records by subscribing a Snowflake-owned SQS queue to a SwipeSense-owned SNS topic, which sends a notification whenever a new bucket object is created. I loosely followed this guide, but skipped all of Configuring Secure Access to Cloud Storage. Those steps set up a reusable “Cloud Storage Integration” that lets you create new Snowflake S3 stages without providing IAM keys. Instead, I created a “snowflake” IAM user and granted that user s3:Get* and s3:List* on ingest-mesh-events-v3-production and ingest-mesh-events-v3-production/*. That policy is called “S3ReadMeshEvents”. I then used that user's IAM keys to create a Snowpipe stage pointing to the bucket.
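A stage created with IAM keys rather than a storage integration would look something like the sketch below. The stage name is taken from the pipe definition later in this log. The JSON file format and the placeholder credentials are assumptions, not a record of the exact statement that was run.

```sql
-- Sketch only: replace the placeholders with the snowflake IAM user's key pair.
create stage if not exists "DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS"
  url = 's3://ingest-mesh-events-v3-production/'
  credentials = (
    aws_key_id = '<access key id>'
    aws_secret_key = '<secret access key>'
  )
  -- Assumed: the pipe reads fields with $1:field, which implies JSON files.
  file_format = (type = json);

-- Inspect the stage properties (URL, file format) after creation.
describe stage "DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS";
```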
To comply with our SOC2 policy, IAM keys need to be rotated every 90 days. After you have created the new IAM key (but before you deactivate the old one), navigate in Snowflake to the mesh events table -> stage -> mesh_events and click the three dots at the upper right -> edit. Update the key there. Once the key is updated in Snowflake, deactivate the old key.
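The same rotation can also be done from a worksheet instead of the UI. A minimal sketch, assuming the stage is the one referenced by the pipe (DATA_WAREHOUSE.MESH_EVENTS.MESH_EVENTS) and that the placeholders are replaced with the new key pair:

```sql
-- Swap the stage over to the new IAM key pair (placeholders, not real values).
alter stage "DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS"
  set credentials = (
    aws_key_id = '<new access key id>'
    aws_secret_key = '<new secret access key>'
  );

-- Confirm the new key can read the bucket before deactivating the old one.
-- LIST walks the bucket, so it may be slow if the bucket holds many objects.
list @"DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS";

-- The pipe should still report a RUNNING execution state.
select system$pipe_status('DATA_WAREHOUSE.MESH_EVENTS.MESH_EVENTS');
```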
Since our ingest bucket already has SNS notifications set up so that our microservices can subscribe, I followed Option 2 from the guide, which subscribes a Snowflake-owned SQS queue to an existing SNS topic.
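Option 2 relies on Snowflake generating an IAM policy statement that lets its SQS queue subscribe to our topic. A sketch of that step, using the topic ARN from the pipe definition in this log:

```sql
-- Returns a JSON IAM policy statement that grants a Snowflake-owned SQS queue
-- permission to subscribe to the given SNS topic.
select system$get_aws_sns_iam_policy(
  'arn:aws:sns:us-east-1:825374148190:ingest-mesh-events-v3-production'
);
```

The returned statement is merged into the topic's existing access policy on the AWS side. The pipe's aws_sns_topic parameter then points at the same ARN.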
Raw Events Table
After the initial implementation, we realized that a table with only a raw JSON column delivered none of the performance improvements we expected from Snowflake. We refactored the pipe definition and table structure to break out all columns at ingest time:
-- New table definition
create or replace TABLE RAW_EVENTS cluster by (to_date(captured_at))(
JSON VARIANT NOT NULL,
SWIPESENSED_VERSION VARCHAR(16777216),
AUTH_TOKEN VARCHAR(16777216),
EVENT_ID VARCHAR(16777216),
EVENT_TYPE VARCHAR(16777216),
EVENT_TYPE_VERSION VARCHAR(16777216),
COMM_HUB_HWID VARCHAR(16777216),
MEASURING_DEVICE_HWID VARCHAR(16777216),
DEVICE_HWID VARCHAR(16777216),
DEVICE_TYPE NUMBER(38,0),
READ_COUNT NUMBER(38,0),
WRITE_COUNT NUMBER(38,0),
DROPPED_PACKETS NUMBER(38,0),
AVERAGE_RSSI NUMBER(38,0),
UPDATE_TYPE NUMBER(38,0),
FIRMWARE_VERSION NUMBER(38,0),
UPLINK_QUEUE_SIZE NUMBER(38,0),
DOWNLINK_QUEUE_SIZE NUMBER(38,0),
PEAK_QUEUE_SIZE NUMBER(38,0),
MISC NUMBER(38,0),
LOCATION_HUB_HWID VARCHAR(16777216),
BADGE_HWID VARCHAR(16777216),
LOCATION_HUB_FIRMWARE_VERSION NUMBER(38,0),
BADGE_VOLTAGE NUMBER(38,0),
BADGE_FIRMWARE_VERSION NUMBER(38,0),
HYGIENE_SENSOR_CONFIG_HASH NUMBER(38,0),
DURATION NUMBER(38,0),
RSSI_SAMPLES ARRAY,
ACC_SAMPLES ARRAY,
ACC_STATUS NUMBER(38,0),
BADGE_ASLEEP BOOLEAN,
COMM_HUB_FIRMWARE_VERSION NUMBER(38,0),
CONNECTED_DEVICES_COUNT NUMBER(38,0),
RADIO_FREQUENCY NUMBER(38,0),
LOCAL_IP_ADDRESS VARCHAR(16777216),
BADGE_TYPE NUMBER(38,0),
STEP_SIZE NUMBER(38,0),
HYGIENE_SENSOR_HWID VARCHAR(16777216),
FIRST_BADGE_HWID VARCHAR(16777216),
FIRST_BADGE_RSSI NUMBER(38,0),
FIRST_BADGE_ACC NUMBER(38,0),
FIRST_BADGE_FIRMWARE_VERSION NUMBER(38,0),
FIRST_BADGE_ACC_STATUS NUMBER(38,0),
FIRST_BADGE_ASLEEP BOOLEAN,
SECOND_BADGE_HWID VARCHAR(16777216),
SECOND_BADGE_RSSI NUMBER(38,0),
SECOND_BADGE_ACC NUMBER(38,0),
SECOND_BADGE_FIRMWARE_VERSION NUMBER(38,0),
SECOND_BADGE_ACC_STATUS NUMBER(38,0),
SECOND_BADGE_ASLEEP BOOLEAN,
THIRD_BADGE_HWID VARCHAR(16777216),
THIRD_BADGE_RSSI NUMBER(38,0),
THIRD_BADGE_ACC NUMBER(38,0),
THIRD_BADGE_FIRMWARE_VERSION NUMBER(38,0),
THIRD_BADGE_ACC_STATUS NUMBER(38,0),
THIRD_BADGE_ASLEEP BOOLEAN,
FOURTH_BADGE_HWID VARCHAR(16777216),
FOURTH_BADGE_RSSI NUMBER(38,0),
FOURTH_BADGE_ACC NUMBER(38,0),
FOURTH_BADGE_FIRMWARE_VERSION NUMBER(38,0),
FOURTH_BADGE_ACC_STATUS NUMBER(38,0),
FOURTH_BADGE_ASLEEP BOOLEAN,
HYGIENE_SENSOR_FIRMWARE_VERSION NUMBER(38,0),
HYGIENE_SENSOR_VOLTAGE NUMBER(38,0),
HYGIENE_SENSOR_ACC NUMBER(38,0),
NUMBER_OF_DEVICES NUMBER(38,0),
FIRST_DEVICE_HWID VARCHAR(16777216),
FIRST_DEVICE_TYPE NUMBER(38,0),
FIRST_DEVICE_DURATION NUMBER(38,0),
FIRST_DEVICE_MAX_RSSI NUMBER(38,0),
FIRST_DEVICE_MEAN_RSSI FLOAT,
FIRST_DEVICE_STD_DEV_RSSI FLOAT,
SECOND_DEVICE_HWID VARCHAR(16777216),
SECOND_DEVICE_TYPE NUMBER(38,0),
SECOND_DEVICE_DURATION NUMBER(38,0),
SECOND_DEVICE_MAX_RSSI NUMBER(38,0),
SECOND_DEVICE_MEAN_RSSI FLOAT,
SECOND_DEVICE_STD_DEV_RSSI FLOAT,
THIRD_DEVICE_HWID VARCHAR(16777216),
THIRD_DEVICE_TYPE NUMBER(38,0),
THIRD_DEVICE_DURATION NUMBER(38,0),
THIRD_DEVICE_MAX_RSSI NUMBER(38,0),
THIRD_DEVICE_MEAN_RSSI FLOAT,
THIRD_DEVICE_STD_DEV_RSSI FLOAT,
FOURTH_DEVICE_HWID VARCHAR(16777216),
FOURTH_DEVICE_TYPE NUMBER(38,0),
FOURTH_DEVICE_DURATION NUMBER(38,0),
FOURTH_DEVICE_MAX_RSSI NUMBER(38,0),
FOURTH_DEVICE_MEAN_RSSI FLOAT,
FOURTH_DEVICE_STD_DEV_RSSI FLOAT,
FIFTH_DEVICE_HWID VARCHAR(16777216),
FIFTH_DEVICE_TYPE NUMBER(38,0),
FIFTH_DEVICE_DURATION NUMBER(38,0),
FIFTH_DEVICE_MAX_RSSI NUMBER(38,0),
FIFTH_DEVICE_MEAN_RSSI FLOAT,
FIFTH_DEVICE_STD_DEV_RSSI FLOAT,
SIXTH_DEVICE_HWID VARCHAR(16777216),
SIXTH_DEVICE_TYPE NUMBER(38,0),
SIXTH_DEVICE_DURATION NUMBER(38,0),
SIXTH_DEVICE_MAX_RSSI NUMBER(38,0),
SIXTH_DEVICE_MEAN_RSSI FLOAT,
SIXTH_DEVICE_STD_DEV_RSSI FLOAT,
PARENT_DEVICE_HWID VARCHAR(16777216),
RSSI NUMBER(38,0),
STATUS NUMBER(38,0),
TAG_FIRMWARE_VERSION NUMBER(38,0),
ACCELEROMETER_STATUS NUMBER(38,0),
READS NUMBER(38,0),
WRITES NUMBER(38,0),
CAPTURED_AT TIMESTAMP_LTZ(9),
COLLECTED_AT TIMESTAMP_LTZ(9)
);
-- Recreate the pipe definition so all rows are extracted into their proper columns
CREATE OR replace pipe "DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS"
auto_ingest = TRUE
aws_sns_topic = 'arn:aws:sns:us-east-1:825374148190:ingest-mesh-events-v3-production'
AS copy INTO "DATA_WAREHOUSE"."MESH_EVENTS"."RAW_EVENTS"
FROM
(
SELECT
$1 AS json,
$1:swipesensed_version::string AS swipesensed_version,
$1:auth_token::string AS auth_token,
$1:event_id::string AS event_id,
$1:event_type::string AS event_type,
$1:event_type_version::string AS event_type_version,
$1:comm_hub_hwid::string AS comm_hub_hwid,
$1:measuring_device_hwid::string AS measuring_device_hwid,
$1:device_hwid::string AS device_hwid,
$1:device_type::int AS device_type,
$1:read_count::int AS read_count,
$1:write_count::int AS write_count,
$1:dropped_packets::int AS dropped_packets,
$1:average_rssi::int AS average_rssi,
$1:update_type::int AS update_type,
$1:firmware_version::int AS firmware_version,
$1:uplink_queue_size::int AS uplink_queue_size,
$1:downlink_queue_size::int AS downlink_queue_size,
$1:peak_queue_size::int AS peak_queue_size,
$1:misc::bigint AS misc,
$1:location_hub_hwid::string AS location_hub_hwid,
$1:badge_hwid::string AS badge_hwid,
$1:location_hub_firmware_version::int AS location_hub_firmware_version,
$1:badge_voltage::int AS badge_voltage,
$1:badge_firmware_version::int AS badge_firmware_version,
$1:hygiene_sensor_config_hash::int AS hygiene_sensor_config_hash,
$1:duration::int AS duration,
$1:rssi_samples::array AS rssi_samples,
$1:acc_samples::array AS acc_samples,
$1:acc_status::int AS acc_status,
$1:badge_asleep::boolean AS badge_asleep,
$1:comm_hub_firmware_version::int AS comm_hub_firmware_version,
$1:connected_devices_count::int AS connected_devices_count,
$1:radio_frequency::int AS radio_frequency,
$1:local_ip_address::string AS local_ip_address,
$1:badge_type::int AS badge_type,
$1:step_size::int AS step_size,
$1:hygiene_sensor_hwid::string AS hygiene_sensor_hwid,
$1:first_badge_hwid::string AS first_badge_hwid,
$1:first_badge_rssi::int AS first_badge_rssi,
$1:first_badge_acc::int AS first_badge_acc,
$1:first_badge_firmware_version::int AS first_badge_firmware_version,
$1:first_badge_acc_status::int AS first_badge_acc_status,
$1:first_badge_asleep::boolean AS first_badge_asleep,
$1:second_badge_hwid::string AS second_badge_hwid,
$1:second_badge_rssi::int AS second_badge_rssi,
$1:second_badge_acc::int AS second_badge_acc,
$1:second_badge_firmware_version::int AS second_badge_firmware_version,
$1:second_badge_acc_status::int AS second_badge_acc_status,
$1:second_badge_asleep::boolean AS second_badge_asleep,
$1:third_badge_hwid::string AS third_badge_hwid,
$1:third_badge_rssi::int AS third_badge_rssi,
$1:third_badge_acc::int AS third_badge_acc,
$1:third_badge_firmware_version::int AS third_badge_firmware_version,
$1:third_badge_acc_status::int AS third_badge_acc_status,
$1:third_badge_asleep::boolean AS third_badge_asleep,
$1:fourth_badge_hwid::string AS fourth_badge_hwid,
$1:fourth_badge_rssi::int AS fourth_badge_rssi,
$1:fourth_badge_acc::int AS fourth_badge_acc,
$1:fourth_badge_firmware_version::int AS fourth_badge_firmware_version,
$1:fourth_badge_acc_status::int AS fourth_badge_acc_status,
$1:fourth_badge_asleep::boolean AS fourth_badge_asleep,
$1:hygiene_sensor_firmware_version::int AS hygiene_sensor_firmware_version,
$1:hygiene_sensor_voltage::int AS hygiene_sensor_voltage,
$1:hygiene_sensor_acc::int AS hygiene_sensor_acc,
$1:number_of_devices::int AS number_of_devices,
$1:first_device_hwid::string AS first_device_hwid,
$1:first_device_type::int AS first_device_type,
$1:first_device_duration::int AS first_device_duration,
$1:first_device_max_rssi::int AS first_device_max_rssi,
$1:first_device_mean_rssi::double AS first_device_mean_rssi,
$1:first_device_std_dev_rssi::double AS first_device_std_dev_rssi,
$1:second_device_hwid::string AS second_device_hwid,
$1:second_device_type::int AS second_device_type,
$1:second_device_duration::int AS second_device_duration,
$1:second_device_max_rssi::int AS second_device_max_rssi,
$1:second_device_mean_rssi::double AS second_device_mean_rssi,
$1:second_device_std_dev_rssi::double AS second_device_std_dev_rssi,
$1:third_device_hwid::string AS third_device_hwid,
$1:third_device_type::int AS third_device_type,
$1:third_device_duration::int AS third_device_duration,
$1:third_device_max_rssi::int AS third_device_max_rssi,
$1:third_device_mean_rssi::double AS third_device_mean_rssi,
$1:third_device_std_dev_rssi::double AS third_device_std_dev_rssi,
$1:fourth_device_hwid::string AS fourth_device_hwid,
$1:fourth_device_type::int AS fourth_device_type,
$1:fourth_device_duration::int AS fourth_device_duration,
$1:fourth_device_max_rssi::int AS fourth_device_max_rssi,
$1:fourth_device_mean_rssi::double AS fourth_device_mean_rssi,
$1:fourth_device_std_dev_rssi::double AS fourth_device_std_dev_rssi,
$1:fifth_device_hwid::string AS fifth_device_hwid,
$1:fifth_device_type::int AS fifth_device_type,
$1:fifth_device_duration::int AS fifth_device_duration,
$1:fifth_device_max_rssi::int AS fifth_device_max_rssi,
$1:fifth_device_mean_rssi::double AS fifth_device_mean_rssi,
$1:fifth_device_std_dev_rssi::double AS fifth_device_std_dev_rssi,
$1:sixth_device_hwid::string AS sixth_device_hwid,
$1:sixth_device_type::int AS sixth_device_type,
$1:sixth_device_duration::int AS sixth_device_duration,
$1:sixth_device_max_rssi::int AS sixth_device_max_rssi,
$1:sixth_device_mean_rssi::double AS sixth_device_mean_rssi,
$1:sixth_device_std_dev_rssi::double AS sixth_device_std_dev_rssi,
$1:parent_device_hwid::string AS parent_device_hwid,
$1:rssi::int AS rssi,
$1:status::int AS status,
$1:tag_firmware_version::int AS tag_firmware_version,
$1:accelerometer_status::int AS accelerometer_status,
$1:reads::int AS "reads",
$1:writes::int AS writes,
$1:captured_at::string AS captured_at,
$1:collected_at::string AS collected_at
FROM
@"DATA_WAREHOUSE"."MESH_EVENTS"."MESH_EVENTS");
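After recreating the pipe, it is worth confirming that it is running and that rows land in the broken-out columns. The following is a sketch of checks one might run, not a record of what was run at the time. The one-hour window and the example date are arbitrary.

```sql
-- The executionState field in the result should be RUNNING.
select system$pipe_status('DATA_WAREHOUSE.MESH_EVENTS.MESH_EVENTS');

-- Recent load activity for the table, including any per-file errors.
select file_name, last_load_time, status, row_count, first_error_message
from table(DATA_WAREHOUSE.information_schema.copy_history(
  table_name => 'DATA_WAREHOUSE.MESH_EVENTS.RAW_EVENTS',
  start_time => dateadd(hour, -1, current_timestamp())
))
order by last_load_time desc;

-- Filtering on the clustering expression lets Snowflake prune micro-partitions
-- instead of scanning the whole table. The date is a hypothetical example.
select event_type, count(*) as events
from "DATA_WAREHOUSE"."MESH_EVENTS"."RAW_EVENTS"
where to_date(captured_at) = '2021-01-01'
group by event_type
order by events desc;
```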
Looker Setup
I followed this article. The initial setup script, run in Snowflake:
-- change role to ACCOUNTADMIN
use role ACCOUNTADMIN;
-- create role for looker
create role if not exists looker_role;
grant role looker_role to role SYSADMIN;
-- Note that we are not making the looker_role a SYSADMIN,
-- but rather granting users with the SYSADMIN role to modify the looker_role
-- create a user for looker
create user if not exists looker_user
password = '';
grant role looker_role to user looker_user;
alter user looker_user
set default_role = looker_role
default_warehouse = 'looker_wh';
-- change role
use role SYSADMIN;
-- create a warehouse for looker (optional)
create warehouse if not exists looker_wh
-- set the size based on your dataset
warehouse_size = small
warehouse_type = standard
auto_suspend = 300
auto_resume = true
initially_suspended = true;
grant all privileges on warehouse looker_wh to role looker_role;
-- grant read only database access (repeat for all database/schemas)
grant usage on database DATA_WAREHOUSE to role looker_role;
grant usage on all schemas in database DATA_WAREHOUSE to role looker_role;
-- rerun the following any time a table is added to the schema
grant select on all tables in schema DATA_WAREHOUSE.ADMIN_OLTP to role looker_role;
grant select on DATA_WAREHOUSE.MESH_EVENTS.RAW_EVENTS to role looker_role;
-- or
grant select on future tables in schema DATA_WAREHOUSE.ADMIN_OLTP to role looker_role;
-- create schema for looker to write back to
use database DATA_WAREHOUSE;
create schema if not exists looker_scratch;
use role ACCOUNTADMIN;
grant ownership on schema looker_scratch to role SYSADMIN revoke current grants;
grant all on schema looker_scratch to role looker_role;
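To confirm the script took effect, the resulting grants can be inspected afterwards. A short sketch using standard SHOW commands; nothing here changes state:

```sql
use role ACCOUNTADMIN;

-- Everything granted to the Looker role (warehouse, database, schemas, tables).
show grants to role looker_role;

-- Roles granted to the Looker user; should include LOOKER_ROLE.
show grants to user looker_user;

-- Only returns rows if the "grant select on future tables" variant was run.
show future grants in schema DATA_WAREHOUSE.ADMIN_OLTP;

-- Ownership and privileges on the scratch schema Looker writes back to.
show grants on schema DATA_WAREHOUSE.LOOKER_SCRATCH;
```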
The default LookML project is called “swipesense” and is stored at this git repo: https://github.com/swipesense/looker
I also set up the Snowflake Cost & Usage Looker dashboard, which requires the following permissions:
grant imported privileges on database SNOWFLAKE to role looker_role;
grant usage on schema SNOWFLAKE.ACCOUNT_USAGE to role looker_role;
grant select on all tables in schema SNOWFLAKE.ACCOUNT_USAGE to role looker_role;
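A quick way to check that looker_role can read the usage data is to run a small query against one of the ACCOUNT_USAGE views. This is an illustrative query, not part of the dashboard itself, and the 30-day window is arbitrary.

```sql
-- Run as the Looker role to prove the grants work end to end.
use role looker_role;
use warehouse looker_wh;

-- Credits consumed per warehouse over the last 30 days.
select warehouse_name, sum(credits_used) as credits
from snowflake.account_usage.warehouse_metering_history
where start_time >= dateadd(day, -30, current_timestamp())
group by warehouse_name
order by credits desc;
```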