After extensive research and opening a case with AWS support, I was told this is not possible from a Python shell or Glue PySpark job at the moment. But I tried something creative and it worked! The idea is to use py4j, which Spark already relies on, together with the standard java.sql package.
Two huge benefits of this approach:
First, you can define your database connection as a Glue data connection and keep the JDBC details and credentials there, without hardcoding them in the Glue code. My example below does that by calling glueContext.extract_jdbc_conf('your_glue_database_connection_name') to get the JDBC URL and credentials defined in Glue.
Second, if you need to run SQL commands on a database that Glue supports out of the box, you don't even need to supply a JDBC driver for that database. Just make sure you set up a Glue connection for that database and add that connection to your Glue job, and Glue will upload the proper database driver jars.
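Since the credentials now live in the Glue connection, it is worth making sure they do not leak into job logs while you debug. A minimal sketch, assuming the object returned by extract_jdbc_conf behaves like a dict with the 'url', 'user' and 'password' keys used in this answer; the helper name redact_jdbc_conf is hypothetical and not part of the Glue API:

```python
def redact_jdbc_conf(jdbc_conf, secret_keys=("password",)):
    """Return a copy of a JDBC conf mapping that is safe to log."""
    # Fail early if a key the connection code relies on is missing or empty.
    missing = [k for k in ("url", "user", "password") if not jdbc_conf.get(k)]
    if missing:
        raise KeyError("Glue connection is missing: " + ", ".join(missing))
    # Mask secret values; everything else is copied unchanged.
    return {k: ("****" if k in secret_keys else v) for k, v in dict(jdbc_conf).items()}
```

In the job you could then log str(redact_jdbc_conf(source_jdbc_conf)) instead of the raw dict.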
Remember that the code below runs in the driver process; it cannot be run by Spark workers/executors.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
logger = glueContext.get_logger()
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# Read the JDBC URL and credentials from the Glue connection attached to this job
source_jdbc_conf = glueContext.extract_jdbc_conf('your_glue_database_connection_name')
from py4j.java_gateway import java_import
# Make the java.sql classes available on the JVM view by their short names
java_import(sc._gateway.jvm, "java.sql.Connection")
java_import(sc._gateway.jvm, "java.sql.DatabaseMetaData")
java_import(sc._gateway.jvm, "java.sql.DriverManager")
java_import(sc._gateway.jvm, "java.sql.SQLException")
# Open a plain JDBC connection from the driver process
conn = sc._gateway.jvm.DriverManager.getConnection(source_jdbc_conf.get('url'), source_jdbc_conf.get('user'), source_jdbc_conf.get('password'))
print(conn.getMetaData().getDatabaseProductName())
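# Call a stored procedure; in this case I call sp_start_job
cstmt = conn.prepareCall("{call dbo.sp_start_job(?)}")
# Bind the procedure's job_name parameter by name
cstmt.setString("job_name", "testjob")
results = cstmt.execute()
# Release the statement and the connection when done
cstmt.close()
conn.close()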
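The same trick works for plain queries, and wrapping the JDBC objects in contextlib.closing makes sure they are closed even if a call fails. A rough sketch, not a definitive implementation: run_query is a hypothetical helper, jvm is assumed to be sc._gateway.jvm, and jdbc_conf is assumed to be the dict from extract_jdbc_conf. Column values that py4j cannot convert to Python types come back as Java object proxies.

```python
from contextlib import closing


def run_query(jvm, jdbc_conf, sql):
    """Run a SELECT from the driver via java.sql and return the rows as Python lists."""
    # The fully qualified class name works without java_import.
    conn = jvm.java.sql.DriverManager.getConnection(
        jdbc_conf.get("url"), jdbc_conf.get("user"), jdbc_conf.get("password")
    )
    # closing() calls .close() on exit, even when an exception is raised.
    with closing(conn):
        with closing(conn.createStatement()) as stmt:
            with closing(stmt.executeQuery(sql)) as rs:
                ncols = rs.getMetaData().getColumnCount()
                rows = []
                while rs.next():
                    # JDBC column indexes start at 1.
                    rows.append([rs.getObject(i) for i in range(1, ncols + 1)])
                return rows
```

For example, rows = run_query(sc._gateway.jvm, source_jdbc_conf, "SELECT 1") should come back as a single one-column row on most databases. Keep the result sets small, since everything is pulled into the driver's memory.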