How to store data
Data storage tutorial
Some indicators need to refer to previous data as a reference when processing logic, such as the time of last processing, the result of last data processing, etc. Here we need to store the data in the process or the result data for subsequent use.
Currently, database-level storage is not supported; only Redis-based storage is provided.
Set up a Redis connection
When testing and developing locally, set REDIS_URL = env.str('REDIS_URL', 'redis://127.0.0.1:6379/0') in the crawlers/config.py file so that it points to the local Redis URL you want to use.
Redis usage
Some commonly used Redis operations are defined in the utils/redis_conn.py file of the project library.
At present, the class exposes three data-access methods (getex, setex, and get_and_set_key) plus a locking decorator (thing_lock), as shown in the following code:
import logging
import redis
import time
from crawlers.config import REDIS_URL
from urllib.parse import urlparse
# Parse REDIS_URL once so its components can be handed to the connection pool.
_REDIS_URL = urlparse(REDIS_URL)

# Connection pool shared by every helper in this module.
pool = redis.connection.ConnectionPool(
    host=_REDIS_URL.hostname,
    # Fall back to the standard Redis port when the URL does not specify one.
    port=_REDIS_URL.port or 6379,
    # The URL path is "/<db>"; strip the leading slash to get the database
    # index and fall back to database 0 when the path is empty.
    db=_REDIS_URL.path[1:] or 0,
    # Forward the password embedded in the URL (None when absent) so that
    # password-protected instances can be reached. NOTE: urlparse does not
    # percent-decode, so an encoded password is passed through as-is.
    password=_REDIS_URL.password,
)
_redis_client = redis.Redis(connection_pool=pool)
class rds:
    """
    Small helper around the module-level Redis client.

    Keys are built as ``prefix + ':' + name``. Sizes are measured on the
    UTF-8 encoded bytes: a key may not exceed 1 KB and a value may not
    exceed 128 KB.
    """

    @classmethod
    def getex(cls, prefix, name):
        """
        Return the value at key ``prefix + ':' + name`` as a ``str``, or None
        if the key doesn't exist or the key is larger than 1 KB.

        prefix: The prefix parameter indicates the name value of the current crawler
        name: Customize the name value related to the current business
        """
        key = prefix + ':' + name
        if len(key.encode()) > 1024:
            logging.warning('Key is too large')
            return None
        value = _redis_client.get(key)
        # Compare against None explicitly so that an empty stored value is
        # still decoded and returned as '' rather than as raw bytes.
        if value is None:
            return None
        return str(value, encoding="utf-8")

    @classmethod
    def setex(cls, prefix, name: str, value: str, ttl):
        """
        Store ``value`` at key ``prefix + ':' + name`` only if the key does not
        exist yet.

        Return True when the value was stored, False when the key already
        exists or when the key/value exceeds the size limits.

        prefix: The prefix parameter indicates the name value of the current crawler
        name: Customize the name value related to the current business. The key string size cannot exceed 1KB
        value: The stored value cannot exceed 128 KB
        ttl: Expiration time in seconds; a falsy value stores the key without expiration
        """
        key = prefix + ':' + name
        # Size limit, value cannot exceed 128 KB, key cannot exceed 1 KB
        if len(value.encode()) > 1024 * 128 or len(key.encode()) > 1024:
            logging.warning('Key or Value is too large')
            return False
        # A single SET with NX performs the "only if absent" check and the
        # write atomically, so two crawlers cannot both pass the existence
        # check and overwrite each other.
        stored = _redis_client.set(key, value, ex=ttl or None, nx=True)
        return bool(stored)

    @classmethod
    def get_and_set_key(cls, prefix, name: str, value: str, ttl: int = None):
        """
        Store ``value`` at key ``prefix + ':' + name`` unless a non-empty value
        is already stored there.

        Return True if the key already holds a non-empty value (nothing is
        written), False if the key/value exceeds the size limits, and None
        after the value has been stored.

        prefix: The prefix parameter indicates the name value of the current crawler
        name: Customize the name value related to the current business. The key string size cannot exceed 1KB
        value: The stored value cannot exceed 128 KB
        ttl: Optional expiration time in seconds; a falsy value stores the key without expiration
        """
        key = prefix + ':' + name
        # Size limit, value cannot exceed 128 KB, key cannot exceed 1 KB
        if len(value.encode()) > 1024 * 128 or len(key.encode()) > 1024:
            logging.warning('Key or Value is too large')
            return False
        # Use the prefixed key (not the bare name) so the entry lives in the
        # crawler's namespace and matches the key that was size-checked above.
        if _redis_client.get(key):
            return True
        # Write the value and its expiration in one command so the key can
        # never be left behind without a TTL.
        _redis_client.set(key, value, ex=ttl or None)
        return None

    @classmethod
    def thing_lock(cls, name, expiration_time=2, time_out=3):
        """
        Decorator factory implementing a simple pessimistic lock in Redis.

        Function: Avoid simultaneous execution of functions, resulting in
        unpredictable problems.

        name: Lock name; the Redis key used is ``'lock:' + name``
        expiration_time: Seconds after which the lock key expires on its own
        time_out: Seconds to keep retrying to acquire the lock. If the lock
            cannot be acquired in that time, the function is still executed
            (best effort) without holding the lock.
        """
        import functools

        def outer_func(func):
            @functools.wraps(func)
            def wrapper_func(*args, **kwargs):
                lock_name = f'lock:{name}'
                end_time = time.time() + time_out
                while time.time() < end_time:
                    # SET NX EX acquires the lock and sets its expiration in
                    # one atomic step, so a crash cannot leave a lock that
                    # never expires.
                    acquired = _redis_client.set(
                        lock_name, expiration_time, nx=True, ex=expiration_time
                    )
                    if acquired:
                        try:
                            return func(*args, **kwargs)
                        finally:
                            # Release the lock even when func raises.
                            _redis_client.delete(lock_name)
                    time.sleep(0.001)
                # Lock not acquired within time_out: run anyway.
                return func(*args, **kwargs)
            return wrapper_func
        return outer_func
Todo:
The prefix argument is the prefix of the Redis key and should be taken from the name value of the spider.
Demo
Set value:
import json
import scrapy
import time
import datetime
from crawlers.utils import SpiderBase, rds
from crawlers.utils.group_alarm import catch_except
from jinja2 import Template
class BtcArh999Spider(SpiderBase):
    """Demo spider: fetch the BTC ahr999 index and remember in Redis that today's alert was sent."""

    name = 'idx-btc-arh999'
    url = 'https://fapi.coinglass.com/api/index/ahr999'

    def start_requests(self):
        """Issue the single request for the index data."""
        yield scrapy.Request(url=self.url)

    @catch_except
    def parse(self, response, **kwargs):
        """Build the alert parameters from the two latest data points and print the alerts once per day."""
        data = response.json()['data']
        latest, previous = data[-1], data[-2]
        params = {
            'arh_999': round(latest['ahr999'], 2),
            'btc_price': latest['value'],
            # Percentage change between the two most recent entries.
            'change': round(((float(latest['value']) - float(previous['value'])) / float(previous['value'])) * 100, 2)
        }
        # Local midnight of the current day (plus one second), used as the
        # per-day part of the Redis key.
        today_struct = time.strptime(str(datetime.date.today()), '%Y-%m-%d')
        today_start_time = str(int(time.mktime(today_struct)) + 1)
        # The rds helper exposes getex/setex for reading and writing values.
        value = rds.getex(self.name, today_start_time)
        if value is not None:
            # Already handled today; do not alert again.
            return
        rds.setex(self.name, today_start_time, json.dumps(params), 60 * 60 * 24 * 2)
        print(Template(self.alert_en_template()).render(params))
        print(Template(self.alert_cn_template()).render(params))

    # must be declared
    def alert_en_template(self):
        """Return the English alert template (Jinja2 syntax)."""
        return "The current BTC ahr999 (AHR Index) is {{arh_999}}. This spot is theoretically unsuitable for bottom fishing or long-term fixed investment. The current price of BTC is {{btc_price}}, and 24H change is {{change}}. (The above content does not constitute investment advice and is for your reference only. Invest at your own risk.)\n"

    # must be declared
    def alert_cn_template(self):
        """Return the Chinese alert template (Jinja2 syntax)."""
        return "当前 BTC ahr999(九神指数)为 {{arh_999}},理论上不宜买入抄底或定投 BTC。当前 BTC 现价 {{btc_price}},24小时涨跌幅为 {{change}}。(以上内容仅供参考,非投资建议,风险自担。)\n"
Get value:
import json
import scrapy
import time
import datetime
from crawlers.utils import SpiderBase, rds
from crawlers.utils.group_alarm import catch_except
from jinja2 import Template
class BtcArh999Spider(SpiderBase):
    """Demo spider: read today's stored result from Redis before deciding whether to alert."""

    name = 'idx-btc-arh999'
    url = 'https://fapi.coinglass.com/api/index/ahr999'

    def start_requests(self):
        """Issue the single request for the index data."""
        yield scrapy.Request(url=self.url)

    @catch_except
    def parse(self, response, **kwargs):
        """Build the alert parameters from the two latest data points and print the alerts once per day."""
        data = response.json()['data']
        newest = data[-1]
        prior = data[-2]
        price_now = float(newest['value'])
        price_before = float(prior['value'])
        params = {
            'arh_999': round(newest['ahr999'], 2),
            'btc_price': newest['value'],
            # Percentage change between the two most recent entries.
            'change': round(((price_now - price_before) / price_before) * 100, 2)
        }
        # Local midnight of the current day (plus one second), used as the
        # per-day part of the Redis key.
        day_start = time.mktime(time.strptime(str(datetime.date.today()), '%Y-%m-%d'))
        today_start_time = str(int(day_start) + 1)
        # redis get method (the rds helper exposes it as getex)
        value = rds.getex(self.name, today_start_time)
        if value is not None:
            # A value for today already exists; nothing more to do.
            return
        # redis set method (the rds helper exposes it as setex)
        rds.setex(self.name, today_start_time, json.dumps(params), 60 * 60 * 24 * 2)
        print(Template(self.alert_en_template()).render(params))
        print(Template(self.alert_cn_template()).render(params))

    # must be declared
    def alert_en_template(self):
        """Return the English alert template (Jinja2 syntax)."""
        return "The current BTC ahr999 (AHR Index) is {{arh_999}}. This spot is theoretically unsuitable for bottom fishing or long-term fixed investment. The current price of BTC is {{btc_price}}, and 24H change is {{change}}. (The above content does not constitute investment advice and is for your reference only. Invest at your own risk.)\n"

    # must be declared
    def alert_cn_template(self):
        """Return the Chinese alert template (Jinja2 syntax)."""
        return "当前 BTC ahr999(九神指数)为 {{arh_999}},理论上不宜买入抄底或定投 BTC。当前 BTC 现价 {{btc_price}},24小时涨跌幅为 {{change}}。(以上内容仅供参考,非投资建议,风险自担。)\n"
Last updated