Redshift

Delta and Redshift

Database

```sql
create database <DATABASE_NAME> with owner <USER_NAME>;
```

Schema

```sql
create schema <SCHEMA_NAME> authorization <USER_NAME>;
grant all on schema <SCHEMA_NAME> to <USER_NAME>;
```

Table

Work in progress.
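In the meantime, here is a minimal sketch of what a table definition could look like; the table name, columns, distribution key, and sort key are placeholders, not recommendations:

```sql
create table <SCHEMA_NAME>.<TABLE_NAME> (
    id          bigint,
    created_at  timestamp,
    amount      decimal(18,2)
)
distkey (id)
sortkey (created_at);
```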

Users

List

```sql
select * from pg_user;
```

Superuser

```sql
alter user <USER_NAME> with createuser;
```

Groups

Create

```sql
create group <GROUP_NAME> with user <USER_NAME1>, <USER_NAME2>;
```

Add user to group

```sql
alter group <GROUP_NAME> add user <USER_NAME>;
```

Remove users from group

```sql
alter group <GROUP_NAME> drop user <USER_NAME1>, <USER_NAME2>;
```

Delete group

```sql
drop group <GROUP_NAME>;
```

TIP

If the group has privileges on an object, revoke those privileges before dropping the group. The following example revokes all privileges on all tables in the public schema from the GUESTS user group, and then drops the group.

```sql
revoke all on all tables in schema public from group guests;
drop group guests;
```

Parquet to Redshift

```python
    # These methods live on a class and assume the following imports at module level:
    #   import logging
    #   import psycopg2
    #   from typing import Union
    #   from pyspark.sql import DataFrame
    #   from pyspark.sql.types import DecimalType
    def spark_to_redshift_type(self, spark_type: str, precision: Union[str, None]) -> str:
        """
        Convert a Spark type name to the corresponding Redshift column type.
        """

        mapping = {
            "integer": "INTEGER",
            "long": "BIGINT",
            "float": "REAL",
            "double": "DOUBLE PRECISION",
            "decimal": f"DECIMAL{precision}",
            "string": "VARCHAR",
            "timestamp": "TIMESTAMP",
            "date": "DATE",
        }

        # Fall back to VARCHAR for any unmapped Spark type
        return mapping.get(spark_type, "VARCHAR")

    def generate_redshift_create_table(self, df: DataFrame, table_name: str) -> str:
        # Build a CREATE TABLE statement from the DataFrame schema.
        schema = df.schema
        columns = []
        for field in schema.fields:

            if isinstance(field.dataType, DecimalType):
                precision = f"({field.dataType.precision},{field.dataType.scale})"
            else:
                precision = None

            redshift_type = self.spark_to_redshift_type(field.dataType.typeName(), precision)
            columns.append(f"{field.name} {redshift_type}")
        columns_str = ", ".join(columns)
        create_table_sql = f"CREATE TABLE IF NOT EXISTS {table_name} ({columns_str});"

        logging.warning(f"create_table_sql: {create_table_sql}")
        return create_table_sql

    def execute_redshift_query(self, query: str, redshift_credentials: dict):
        # Open a connection, run the statement, commit, and close.
        conn = psycopg2.connect(
            dbname=redshift_credentials["dbname"],
            user=redshift_credentials["user"],
            password=redshift_credentials["password"],
            host=redshift_credentials["host"],
            port=redshift_credentials["port"],
        )
        with conn.cursor() as cursor:
            cursor.execute(query)
        conn.commit()
        conn.close()
```

And then use it like this:

```python
schema_table = schema + "." + table_name
create_table_sql = self.generate_redshift_create_table(df, schema_table)
self.execute_redshift_query(create_table_sql, self.redshift_credentials)
```
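Once the table exists, the Parquet files can be loaded with a COPY command. A minimal sketch, where the schema, table, bucket, prefix, and IAM role are placeholders:

```sql
copy <SCHEMA_NAME>.<TABLE_NAME>
from 's3://<BUCKET>/<PREFIX>/'
iam_role 'arn:aws:iam::<ACCOUNT_ID>:role/<ROLE_NAME>'
format as parquet;
```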

Errors on copy

If you get errors like this:

```sql
error:  Spectrum Scan Error
  code:      15007
  context:   File 'https://... parquet ... c000.snappy.parquet' has an incompatible Parquet schema for column 's3://'
  query:     43966693
  location:  dory_util.cpp:1509
  process:   worker_thread [pid=13526]
```

The details can be looked up in SVL_S3LOG by query id:

```sql
select message
  from SVL_S3LOG
 where query = 43966714
 order by query, segment, slice;
```

Feel free to use any content here.