Table of Contents
実現したいこと
いつシグナルでたかを確認したいのでPythonでMT5の情報を取得して自由に使いたい。
- リアルタイムでMT5から価格データを読み取りPythonで可視化
- テクニカル指標を計算してPythonで可視化したい
作るファイルは4つです。
- mt5_price_set_DB.py
- technical_set_DB.py
- app.py
- main.py
データの工場をつくるイメージでデータは流れ作業のように下流工程へ渡ります。
データを取得する工程では
- 起動時にMT5の過去足1000本を取得してデータベースへ保存します。
- データベースへ保存したらそのあとは1秒間隔で最新の価格を取得し続けます。
テクニカル指標を計算する工程では
- 起動時に過去1000本のテクニカル指標を計算して価格データと同じ時系列のテーブルに保存されます。
- データベースへ保存したあとは1秒間隔で最新のテクニカル指標を計算し続けます。
データを表示する工程では
- データベースへ保存された値を1秒間隔で取得して表示します。
mt5_price_set_DB.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
import MetaTrader5 as mt5 import sqlite3 from datetime import datetime from time import sleep from typing import Tuple class PriceSetDB: def __init__(self, db_file: str, symbol: str) -> None: self.db_file = db_file self.symbol = symbol self.conn = None def connect_db(self) -> None: self.conn = sqlite3.connect(self.db_file) def disconnect_db(self) -> None: if self.conn is not None: self.conn.close() def create_table(self) -> None: query = """ CREATE TABLE IF NOT EXISTS price_data ( id INTEGER PRIMARY KEY AUTOINCREMENT, symbol TEXT, time TEXT, open REAL, high REAL, low REAL, close REAL, max_spread REAL ) """ with self.conn: self.conn.execute(query) def insert_data(self, data: Tuple) -> None: time = data[1] # タイムスタンプの列を取得 if self.data_exists(time): # すでにデータが存在する場合は、何もせずに戻る return query = """ INSERT INTO price_data ( symbol, time, open, high, low, close, max_spread ) VALUES (?, ?, ?, ?, ?, ?, ?) """ cursor = self.conn.cursor() cursor.execute(query, data) self.conn.commit() def get_last_minute(self) -> Tuple: query = "SELECT * FROM price_data WHERE symbol = ? ORDER BY time DESC LIMIT 1" cursor = self.conn.cursor() cursor.execute(query, (self.symbol,)) last_record = cursor.fetchone() if last_record: last_record = list(last_record) last_record[7] = float(last_record[7]) return tuple(last_record) if last_record else None def update_last_minute(self, data: Tuple) -> None: query = "UPDATE price_data SET high = ?, low = ?, close = ?, max_spread = ? WHERE id = ?" with self.conn: self.conn.execute(query, data) def get_bars(self, symbol: str, timeframe: int, count: int) -> None: bars = mt5.copy_rates_from_pos(symbol, timeframe, 0, count) for bar in bars: dt = datetime.fromtimestamp(bar[0]) time_str = dt.strftime("%Y-%m-%d %H:%M:%S") spread = float(bar[6]) data = (symbol, time_str, bar[1], bar[2], bar[3], bar[4], spread) self.insert_data(data) def initialize_mt5(self) -> bool: if not mt5.initialize(): print("Failed to initialize MT5") return False self.connect_db() self.create_table() self.get_bars(self.symbol, mt5.TIMEFRAME_M1, 1000) return True def data_exists(self, time: str) -> bool: query = "SELECT COUNT(*) FROM price_data WHERE symbol = ? AND time = ?" cursor = self.conn.cursor() cursor.execute(query, (self.symbol, time)) count = cursor.fetchone()[0] return count > 0 def get_last_minute_record(self) -> Tuple: query = "SELECT * FROM price_data WHERE symbol = ? ORDER BY time DESC LIMIT 1" cursor = self.conn.cursor() cursor.execute(query, (self.symbol,)) return cursor.fetchone() def insert_or_update_data(self, data: Tuple) -> None: last_record = self.get_last_minute_record() if last_record and last_record[2] == data[1]: # Check if the time matches the existing record query = """ UPDATE price_data SET open = ?, high = ?, low = ?, close = ?, max_spread = ? WHERE id = ? """ cursor = self.conn.cursor() cursor.execute(query, (data[2], data[3], data[4], data[5], data[6], last_record[0])) self.conn.commit() else: query = """ INSERT INTO price_data ( symbol, time, open, high, low, close, max_spread ) VALUES (?, ?, ?, ?, ?, ?, ?) """ cursor = self.conn.cursor() cursor.execute(query, data) self.conn.commit() def insert_or_replace_data(self, data: Tuple) -> None: query = """ INSERT OR REPLACE INTO price_data ( id, symbol, time, open, high, low, close, max_spread ) VALUES ( (SELECT id FROM price_data WHERE symbol = ? AND time = ?), ?, ?, ?, ?, ?, ?, ? ) """ cursor = self.conn.cursor() cursor.execute(query, (data[0], data[1], data[0], data[1], data[2], data[3], data[4], data[5], data[6])) self.conn.commit() def insert_or_ignore_data(self, data: Tuple) -> None: query = """ INSERT OR IGNORE INTO price_data ( symbol, time, open, high, low, close, max_spread ) VALUES (?, ?, ?, ?, ?, ?, ?) """ cursor = self.conn.cursor() cursor.execute(query, data) self.conn.commit() def update_data(self, data: Tuple) -> None: query = """ UPDATE price_data SET open = ?, high = ?, low = ?, close = ?, max_spread = ? WHERE symbol = ? AND time = ? """ cursor = self.conn.cursor() cursor.execute(query, (data[2], data[3], data[4], data[5], data[6], data[0], data[1])) self.conn.commit() def run(self) -> None: if not self.initialize_mt5(): return while True: last_bar = mt5.copy_rates_from_pos(self.symbol, mt5.TIMEFRAME_M1, 0, 1) last_bar_datetime = datetime.fromtimestamp(last_bar[0][0]) last_minute = last_bar_datetime.replace(second=0, microsecond=0) last_record = self.get_last_minute_record() if last_record is None or last_record[2] != last_minute.strftime("%Y-%m-%d %H:%M:%S"): tick = mt5.symbol_info_tick(self.symbol) time_str = last_minute.strftime("%Y-%m-%d %H:%M:%S") data = (self.symbol, time_str, tick.bid, tick.bid, tick.bid, tick.ask, 0.0) self.insert_or_update_data(data) sleep(1) self.disconnect_db() mt5.shutdown() if __name__ == "__main__": db_file = "price_data.db" symbol = "USDJPYm" price_set_db = PriceSetDB(db_file, symbol) price_set_db.run() |
このコードは、MetaTrader5 APIを使用して、外国為替市場の価格情報をリアルタイムに取得し、SQLiteデータベースに保存するPythonプログラムです。以下に、コードの詳細を解説します。
import
ステートメントで、MetaTrader5、sqlite3、datetime、sleep、typingモジュールをインポートしています。PriceSetDB
というクラスが定義されています。このクラスには、データベースの初期化、テーブルの作成、データの挿入、最後の1分間のデータの取得、データの更新、バーの取得、データの存在の確認、そして実行を行うためのメソッドが含まれています。PriceSetDB
クラスの__init__
メソッドは、db_file
とsymbol
という2つの引数を受け取り、それぞれデータベースファイルとシンボル名を設定します。conn
属性は、データベース接続用のSQLite3コネクションオブジェクトを格納するために使用されます。connect_db
メソッドは、SQLite3のconnect
関数を使用してデータベースに接続し、conn
属性にコネクションオブジェクトを割り当てます。disconnect_db
メソッドは、データベース接続を閉じます。create_table
メソッドは、データベース内にテーブルを作成するSQLクエリを実行します。insert_data
メソッドは、引数としてタプルデータを受け取り、データベースに挿入します。タプルは、シンボル名、時間、オープン価格、高値、安値、終値、最大スプレッドの順で並べられます。data_exists
メソッドを使用して、既存のレコードがある場合は挿入されません。get_last_minute
メソッドは、データベースから最後の1分間のデータを取得し、タプル形式で返します。symbol
属性とtime
属性が含まれます。time
属性は、フォーマットされた時間文字列です。最後のレコードがない場合は、None
が返されます。update_last_minute
メソッドは、引数としてタプルデータを受け取り、データベース内の最後のレコードを更新します。タプルは、更新する高値、低値、終値、最大スプレッド、そしてレコ- ードのIDです。
id
は主キーとして定義されているため、レコードの識別子として使用されます。 get_bars
メソッドは、指定されたシンボル名、時間枠、およびバーの数に基づいて、MetaTrader5 APIを使用して価格情報を取得し、データベースに挿入します。initialize_mt5
メソッドは、MetaTrader5 APIを初期化し、データベースに接続し、テーブルを作成し、最初の価格バーを取得します。data_exists
メソッドは、指定された時間のレコードが既にデータベースに存在するかどうかを確認します。run
メソッドは、initialize_mt5
メソッドを呼び出し、それ以降はループし、価格情報を取得し、データベースに挿入または更新します。1秒ごとにスリープします。__name__ == "__main__"
の条件分岐は、スクリプトが直接実行された場合にのみ、プログラムが実行されるようにします。db_file
とsymbol
を指定して、PriceSetDB
オブジェクトを作成し、run
メソッドを呼び出します。- このコードは、MetaTrader5 APIを使用してリアルタイムの価格情報を取得し、それをSQLiteデータベースに保存するPythonプログラムです。
technical_set_DB.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
import sqlite3 import time import numpy as np import talib from datetime import datetime DB_NAME = "price_data.db" TABLE_NAME = "price_data" # SQLite3 connection conn = sqlite3.connect(DB_NAME) c = conn.cursor() # Check if columns 'ma' and 'rsi' exist c.execute(f"PRAGMA table_info({TABLE_NAME})") columns = [col[1] for col in c.fetchall()] # Add new columns for MA and RSI if they don't exist if 'ma' not in columns: c.execute(f"ALTER TABLE {TABLE_NAME} ADD COLUMN ma REAL") if 'rsi' not in columns: c.execute(f"ALTER TABLE {TABLE_NAME} ADD COLUMN rsi REAL") conn.commit() def calculate_ma_rsi(close_prices): ma_values = talib.SMA(close_prices, timeperiod=14) rsi_values = talib.RSI(close_prices, timeperiod=14) return ma_values, rsi_values def update_ma_rsi_for_range(rows): close_prices = np.array([row[6] for row in rows], dtype=np.float64) ma_values, rsi_values = calculate_ma_rsi(close_prices) for index, row in enumerate(rows): if not np.isnan(ma_values[index]) and not np.isnan(rsi_values[index]): c.execute(f"UPDATE {TABLE_NAME} SET ma = ?, rsi = ? WHERE id = ?", (ma_values[index], rsi_values[index], row[0])) conn.commit() # Calculate MA and RSI for the past 1000 rows c.execute(f"SELECT * FROM {TABLE_NAME} ORDER BY id DESC LIMIT 1000") rows = c.fetchall()[::-1] # reverse the order of rows update_ma_rsi_for_range(rows) # Function to update MA and RSI for the latest row def update_latest_ma_rsi(): c.execute(f"SELECT * FROM {TABLE_NAME} ORDER BY id DESC LIMIT 14") rows = c.fetchall() close = np.array([row[6] for row in rows], dtype=np.float64) ma = talib.SMA(close, timeperiod=14)[-1] rsi = talib.RSI(close, timeperiod=14)[-1] latest_id = rows[-1][0] c.execute(f"UPDATE {TABLE_NAME} SET ma = ?, rsi = ? WHERE id = ?", (ma, rsi, latest_id)) conn.commit() # Update MA and RSI every second while True: time.sleep(1) # Get the last minute without seconds and microseconds last_minute = datetime.now().replace(second=0, microsecond=0) # Check if a record with the same minute already exists c.execute(f"SELECT * FROM {TABLE_NAME} WHERE time = ?", (last_minute.strftime("%Y-%m-%d %H:%M:%S"),)) existing_record = c.fetchone() if existing_record is None: # Fetch and save new data here (replace this with your data fetching and saving logic) # For example: save_new_data(last_minute) # Update the latest row with the new MA and RSI values update_latest_ma_rsi() |
このコードは、価格データをSQLiteデータベースから取得し、移動平均とRSIという2つのテクニカル指標を計算し、同じデータベースにそれらの値を保存するPythonプログラムです。以下に、コードの詳細を解説します。
import
ステートメントで、numpy、sqlite3、talib、datetime、sleepモジュールをインポートしています。- データベースファイル名と通貨ペア名を指定して、SQLite3データベースに接続します。
- 移動平均の期間とRSIの期間を設定します。
ALTER TABLE
クエリを使用して、price_data
テーブルにma
とrsi
カラムが存在しない場合は追加します。- 無限ループを開始し、
price_data
テーブルから価格データを取得します。取得した価格データをリストに変換し、Numpy配列に変換します。 - Numpyの
talib
ライブラリを使用して、移動平均とRSIを計算します。 price_data
テーブルに計算されたテクニカル指標を保存します。- 1秒待機し、ループを続けます。
commit
メソッドを使用して、変更をデータベースにコミットします。- データベース接続を閉じます。
このコードは、PythonとSQLite3を使用して、外国為替市場の価格情報から移動平均とRSIという2つのテクニカル指標を計算し、同じデータベースにそれらの値を保存するプログラムです。
app.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 |
from flask import Flask, render_template_string, jsonify import sqlite3 import pandas as pd import plotly.graph_objs as go app = Flask(__name__) @app.route("/data") def get_data(): symbol = "USDJPYm" conn = sqlite3.connect("price_data.db") cursor = conn.execute("SELECT time, open, high, low, close, max_spread, ma, rsi FROM price_data WHERE symbol=? ORDER BY time DESC LIMIT 1000", (symbol,)) data = cursor.fetchall()[::-1] time = [row[0] for row in data] open_prices = [row[1] for row in data] high_prices = [row[2] for row in data] low_prices = [row[3] for row in data] close_prices = [row[4] for row in data] spreads = [row[5] for row in data] ma_values = [row[6] for row in data] rsi_values = [row[7] for row in data] return jsonify(time=time, open=open_prices, high=high_prices, low=low_prices, close=close_prices, max_spread=spreads, ma=ma_values, rsi=rsi_values) @app.route("/") def index(): symbol = "USDJPYm" return render_template_string("""<html> <head> <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.5.2/css/bootstrap.min.css"> <script src="https://cdn.plot.ly/plotly-latest.min.js"></script> <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script> </head> <body> <div class="container"> <h1>{{ symbol }} - Latest Data</h1> <div class="row"> <div class="col-md-6"> <div id="chart"></div> </div> <div class="col-md-6"> <div id="rsi_chart"></div> </div> </div> <div class="row"> <div class="col-md-6"> <div id="spread_chart"></div> </div> <div class="col-md-6"> <!-- ここに追加のチャートを表示できます。 --> </div> </div> </div> <script> var config = {responsive: true}; function updateChart() { $.getJSON("/data", function (data) { var candlestick_trace = { x: data.time, open: data.open, high: data.high, low: data.low, close: data.close, type: 'candlestick' }; var ma_trace = { x: data.time, y: data.ma, type: 'scatter', mode: 'lines', name: 'MA', line: {color: 'blue'} }; var candlestick_layout = { title: "{{ symbol }} - Latest 1000 Data Points (Candlestick)", xaxis: {rangeslider: {visible: false}}, yaxis: {side: 'right'} }; var chartDiv = document.getElementById('chart'); if (typeof chartDiv.data === 'undefined') { Plotly.newPlot('chart', [candlestick_trace, ma_trace], candlestick_layout, config); } else { var xaxisRange = chartDiv.layout.xaxis.range; candlestick_layout.xaxis.range = xaxisRange; Plotly.update('chart', {x: [candlestick_trace.x, ma_trace.x], open: [candlestick_trace.open], high: [candlestick_trace.high], low: [candlestick_trace.low], close: [candlestick_trace.close], y: [ma_trace.y]}, candlestick_layout); } var rsi_trace = { x: data.time, y: data.rsi, type: 'scatter', mode: 'lines', name: 'RSI', line: {color: 'red'} }; var rsi_layout = { title: "{{ symbol }} - RSI", xaxis: {rangeslider: {visible: false}}, yaxis: {side: 'left', title: 'RSI', range: [0, 100]} }; var rsi_chartDiv = document.getElementById('rsi_chart'); if (typeof rsi_chartDiv.data === 'undefined') { Plotly.newPlot('rsi_chart', [rsi_trace], rsi_layout, config); } else { var rsi_xaxisRange = rsi_chartDiv.layout.xaxis.range; rsi_layout.xaxis.range = rsi_xaxisRange; Plotly.update('rsi_chart', {x: [rsi_trace.x], y: [rsi_trace.y]}, rsi_layout); } var spread_trace = { x: data.time, y: data.max_spread, type: 'scatter', mode: 'lines', name: 'Spread', yaxis: 'y2' }; var spread_layout = { title: "{{ symbol }} - Spread", xaxis: {rangeslider: {visible: false}}, yaxis: {side: 'left', title: 'Price'}, yaxis2: {side: 'right', title: 'Spread', overlaying: 'y'} }; var spread_chartDiv = document.getElementById('spread_chart'); if (typeof spread_chartDiv.data === 'undefined') { Plotly.newPlot('spread_chart', [spread_trace], spread_layout, config); } else { var spread_xaxisRange = spread_chartDiv.layout.xaxis.range; spread_layout.xaxis.range = spread_xaxisRange; Plotly.update('spread_chart', {x: [spread_trace.x],y: [spread_trace.y]}, spread_layout); } }); } // チャートを1秒ごとに更新する $(document).ready(function() { updateChart(); setInterval(updateChart, 1000); }); </script> </body> </html> """, symbol=symbol) if __name__ == "__main__": app.run(debug=True) |
このコードは、FlaskとPlotlyを使用して、FXのリアルタイム価格とテクニカル指標を表示するWebアプリケーションを作成するためのものです。
アプリケーションは2つのエンドポイントがあります。 /
は、リアルタイムチャートを表示するために使用されます。 /data
は、最新の価格データを返すために使用されます。
get_data
関数は、データベースから最新の1000レコードを取得して、JSON形式で返します。 取得されるデータには、時間、始値、高値、安値、終値、スプレッド、移動平均線、RSIなどが含まれます。
index
関数は、リアルタイムチャートを描画するためにJinja2を使用して、テンプレートをレンダリングします。テンプレートは、Bootstrapを使用して、チャートを表示するためのコンテナーを作成し、Plotlyを使用してチャートを描画します。アプリケーションは、価格、移動平均線、RSI、スプレッドなどを表示するために、複数のチャートを使用しています。リアルタイムデータを取得し、チャートを更新するために、JavaScriptとjQueryを使用します。
アプリケーションは、 app.run()
関数を呼び出すことで実行されます。デバッグモードで実行されるため、エラーが発生した場合にはスタックトレースが表示されます。
main.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import subprocess import time if __name__ == "__main__": # price_set_DB.pyを実行 p1 = subprocess.Popen(["python", "mt5_price_set_DB.py"]) # プライスを一定時間取得するために待機 time.sleep(5) # technical_set_DB.pyを実行 p2 = subprocess.Popen(["python", "technical_set_DB.py"]) time.sleep(5) # app.pyを実行 p3 = subprocess.Popen(["python", "app.py"]) # price_set_DB.pyが終了するまで待機 p1.wait() # technical_set_DB.pyが終了するまで待機 p2.wait() # app.pyを終了 p3.terminate() |
このコードは3つのPythonスクリプトをサブプロセスとして実行するためのものです。
最初に、subprocess
モジュールをインポートします。subprocess
モジュールには、別のプロセスを起動して、そのプロセスの出力を受け取る機能があります。
if __name__ == "__main__":
ブロックは、このスクリプトが直接実行された場合にのみ、以下のコードが実行されるようにします。
Popen
メソッドを使って、3つのPythonスクリプトをそれぞれ別々のサブプロセスとして起動します。これにより、それらのスクリプトは同時に実行されます。3つのスクリプトをそれぞれ異なる変数p1
、p2
、p3
に代入します。
wait()
メソッドは、サブプロセスが完了するまで、親プロセスを待機させます。つまり、p1.wait()
は、price_set_DB.pyが終了するまで、プログラムの実行をブロックします。
同様に、p2.wait()
は、technical_set_DB.pyが終了するまで、プログラムの実行をブロックします。
最後に、p3.terminate()
は、app.pyの実行を終了します。これにより、Webアプリケーションが停止します。
実行する
main.pyを実行するとブラウザでチャートを描画してくれます
BybitがGoogleのIPアドレス規制をしているためです。国内のVPSなら使…
自分のbotで使ってるAPIキーを使用しているんですが、 You have br…
pybit 最新版にコードを変更しました。コードとrequirements.tx…
お返事ありがとうございます。はい。pybit==2.3.0になっております。
コードはあっていると思います。rewuirements.txtは「pybit==…