How can I store all queries submitted to a Presto cluster in a file (e.g. an ORC file) or in some other database? The purpose is to keep a record of all queries executed on the Presto workers.
I am aware that I need to override the queryCompleted method. I have also tried to follow this and the other links mentioned there, but I am unable to build a correct jar using Maven. After I placed the jar file generated by Maven into Presto, Presto stopped working.
I am new to both Presto and Maven. It would be great if someone could help me with this.
This is my approach, and it works on EMR 5.9 (Presto 0.184).
First, as you already know, you can use an event listener. In my case, I use https://github.com/wyukawa/presto-fluentd to collect query logs because fluentd is convenient (easy to retry, easy to send to multiple data stores). If you want to create a new event-listener plugin, you can also use it as a reference because it is very simple (https://github.com/zz22394/presto-audit can be used for this as well).
Next, you have to install the event-listener plugin.
If you use EMR, you can use this script as a bootstrap action to install presto-fluentd:
#!/bin/bash
# cf. https://github.com/mozilla/emr-bootstrap-presto/blob/master/files/bootstrap/presto-plugins.sh
set -exo pipefail
# re-exec with sudo into background
if [ "$(whoami)" != root ]; then
  sudo "$0" "$@" &
  exit 0
fi
# set variables
s3uri=$1
fluentd_endpoint=$2
# wait until presto is installed and running
until test -s /var/run/presto/presto-server.pid; do sleep 1; done
# make symbolic link
sudo mkdir -p /usr/lib/presto/etc 2>/dev/null
sudo ln -s /usr/lib/presto/etc /mnt/var/lib/presto/data
# download presto plugins
aws s3 sync $s3uri/jar/ /usr/lib/presto/plugin/
aws s3 sync $s3uri/properties /usr/lib/presto/etc/
# make sure all plugins are owned by presto user
chown -R presto:presto /usr/lib/presto/plugin
chown -R presto:presto /usr/lib/presto/etc
# set event-listener.properties endpoint parameter
echo "event-listener.fluentd-host=$fluentd_endpoint" >> /usr/lib/presto/etc/event-listener.properties
# restart presto
stop presto-server
start presto-server
event-listener.properties:
event-listener.name=presto-fluentd
event-listener.fluentd-port=24224
event-listener.fluentd-tag=presto.query
inside s3 dir:
$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/jar/presto-fluentd/
2017-10-30 19:12:59 90318 fluency-1.3.0.jar
2017-10-30 19:12:59 2521113 guava-21.0.jar
2017-10-30 19:12:59 55783 jackson-annotations-2.8.1.jar
2017-10-30 19:12:59 252303 jackson-core-2.7.1.jar
2017-10-30 19:12:59 1199160 jackson-databind-2.7.1.jar
2017-10-30 19:12:59 30488 jackson-dataformat-msgpack-0.8.12.jar
2017-10-30 19:12:59 3907 log-0.148.jar
2017-10-30 19:12:59 116125 msgpack-core-0.8.12.jar
2017-10-30 19:12:59 5509 phi-accural-failure-detector-0.0.4.jar
2017-10-30 19:12:59 6130 presto-fluentd-0.0.1.jar
2017-10-30 19:12:59 41077 slf4j-api-1.7.22.jar
$ aws s3 ls s3://<s3 bucket>/emr/bootstrap_actions/plugins/properties/
2017-10-30 19:12:59 109 event-listener.properties
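Since the plugin needs all of the jars listed above, it can help to check the plugin directory on a node after the sync. Below is a minimal Ruby sketch of such a check. The helper name `missing_jars` is hypothetical, and the directory in the usage note is an assumption based on the `aws s3 sync` target in the bootstrap script.

```ruby
# Jar names copied from the S3 listing above.
REQUIRED_JARS = %w(
  fluency-1.3.0.jar
  guava-21.0.jar
  jackson-annotations-2.8.1.jar
  jackson-core-2.7.1.jar
  jackson-databind-2.7.1.jar
  jackson-dataformat-msgpack-0.8.12.jar
  log-0.148.jar
  msgpack-core-0.8.12.jar
  phi-accural-failure-detector-0.0.4.jar
  presto-fluentd-0.0.1.jar
  slf4j-api-1.7.22.jar
).freeze

# Hypothetical helper: returns the names in `required` that are not
# present as entries of the directory `dir`.
def missing_jars(dir, required = REQUIRED_JARS)
  required - Dir.entries(dir)
end
```

For example, `missing_jars('/usr/lib/presto/plugin/presto-fluentd')` would return an empty array when every jar has been synced (the path is assumed, adjust it to your layout).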
Then just receive the query logs with fluentd running on another host, as below:
<match presto.query>
  @type copy
  <store>
    # another data store
  </store>
  <store>
    @type relabel
    @label @presto-query-storage
  </store>
</match>
# In my case, I use bigquery for storing query log
<label @presto-query-storage>
  <match **>
    @label @presto-bigquery-out
    @type record_reformer
    renew_record true
    tag presto.query_storage.big_query
    <record>
      query_id ${record["queryId"]}
      user_name ${record["user"]}
      elapsed_time ${(record["endTime"] - record["createTime"]) / 1000.0}
      start_at ${Time.at(record["executionStartTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S.%3N")}
      end_at ${Time.at(record["endTime"]/1000).utc.strftime("%Y-%m-%d %H:%M:%S")}
      query ${record["query"]}
      status ${record["state"]}
    </record>
  </match>
</label>
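To see what the `<record>` expressions above compute, here is a minimal plain-Ruby sketch that applies the same expressions to a hash. The function name `reformat_query_event` is hypothetical, and it assumes the timestamp fields arrive as integer epoch milliseconds, which is what the division by 1000 in the config suggests.

```ruby
# Hypothetical helper mirroring the <record> expressions in the config above.
# Assumes createTime, executionStartTime and endTime are integer epoch millis.
def reformat_query_event(record)
  {
    'query_id'     => record['queryId'],
    'user_name'    => record['user'],
    # Float division: elapsed time in seconds, including the fraction.
    'elapsed_time' => (record['endTime'] - record['createTime']) / 1000.0,
    # Integer division: whole seconds only, so %3N renders as 000.
    'start_at'     => Time.at(record['executionStartTime'] / 1000).utc
                          .strftime('%Y-%m-%d %H:%M:%S.%3N'),
    'end_at'       => Time.at(record['endTime'] / 1000).utc
                          .strftime('%Y-%m-%d %H:%M:%S'),
    'query'        => record['query'],
    'status'       => record['state'],
  }
end
```

Note that with integer inputs `executionStartTime/1000` truncates the milliseconds before `Time.at` sees them, so the `.%3N` part of `start_at` does not carry sub-second precision in this form.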
I use this script to collect the dependencies of presto-fluentd:
require 'fileutils'
require 'open3'
include FileUtils
TMP_PATH = File.expand_path('../../tmp', __FILE__)
JAR_PATH = File.expand_path('../bootstrap_actions/plugins/jar', __FILE__)
CLONE_URI = 'https://github.com/wyukawa/presto-fluentd'
NEEDED_JAR = %w(
  fluency-1.3.0.jar
  guava-21.0.jar
  jackson-annotations-2.8.1.jar
  jackson-core-2.7.1.jar
  jackson-databind-2.7.1.jar
  jackson-dataformat-msgpack-0.8.12.jar
  log-0.148.jar
  msgpack-core-0.8.12.jar
  phi-accural-failure-detector-0.0.4.jar
  presto-fluentd-0.0.1.jar
  slf4j-api-1.7.22.jar
)
# Remove the previous clone and any previously collected jars.
def cleanup_dir
  puts "Clean up #{TMP_PATH}/presto-fluentd ..."
  rm_r(Dir.glob("#{TMP_PATH}/presto-fluentd"))
  mkdir_p("#{JAR_PATH}/presto-fluentd")
  puts "Clean up #{JAR_PATH}/presto-fluentd ..."
  rm(Dir.glob("#{JAR_PATH}/presto-fluentd/*.jar"))
end
# Clone the presto-fluentd repository into TMP_PATH.
def clone
  cd(TMP_PATH)
  puts "Download presto-fluentd repo ..."
  # Open3.capture2 returns [stdout, status]; stderr is not captured
  out, status = Open3.capture2("git clone #{CLONE_URI} #{TMP_PATH}/presto-fluentd")
  puts out
end
# Build the plugin and copy its runtime dependencies into target/.
def mvn
  cd("#{TMP_PATH}/presto-fluentd")
  puts "Build presto-fluentd ..."
  out, status = Open3.capture2("mvn clean package")
  puts out
  out, status = Open3.capture2("mvn dependency:copy-dependencies -DoutputDirectory=target -DincludeScope=runtime")
  puts out
end
# Move only the jars listed in NEEDED_JAR into JAR_PATH.
def copy_dependencies
  cd("#{TMP_PATH}/presto-fluentd/target")
  puts "Copy jar files to #{JAR_PATH} ..."
  # FIXME: it's better to fix actual pom.xml for assign scope
  mv(Dir.glob("*.jar").select{|file| NEEDED_JAR.include?(file)}, "#{JAR_PATH}/presto-fluentd")
  puts "done !!"
end
cleanup_dir
clone
mvn
copy_dependencies
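The script above prints the output of `git` and `mvn` but does not look at their exit status, so a failed build only shows up later as missing jars. A minimal sketch of a wrapper that stops on failure could look like this. The name `run!` is hypothetical and not part of the original script.

```ruby
require 'open3'

# Hypothetical helper: run a command (given as separate arguments, so no
# shell is involved), print its stdout, and raise if it exits non-zero.
def run!(*cmd)
  out, status = Open3.capture2(*cmd)
  puts out
  unless status.success?
    raise "command failed (exit #{status.exitstatus}): #{cmd.join(' ')}"
  end
  out
end
```

In the script it could replace the direct `Open3.capture2` calls, for example `run!('mvn', 'clean', 'package')`, so that a Maven error aborts the run before any jars are copied.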