用python-tcptest 对Elasticsearch进行测试
昨天 python-tcptest 的版本升级了,现在也可用于 Python3 系列。(虽然是我自己提交的Pull Request)
为了纪念这个,我将介绍如何使用tcptest在Python中测试使用Elasticsearch的代码。
“tcptest是什么?”
-
- 寻找可用的端口,
-
- 使用该端口启动指定的服务器软件,
- 测试完成后关闭该服务器软件。
我认为以下三点是令人感激的。
-
- サーバーソフトの実物を使うのでモックと実物の違いを気にしなくていい
-
- DBにテスト用以外のデータが混じってて間違えて一緒に消しちゃったみたいなミスがなくなる
- わざわざテスト用に自分でサーバーソフトを立てたり落としたりしなくていい (人手の温もりがない)
如何安装python-tcptest
$ pip install tcptest
Python-tcptest的过程是怎样的?
一开始
-
- TestServer.__init__: 设置超时等参数(如果启动需要时间,则可以适当延长)
TestServer._before_start: 在此处编写服务器启动前需要执行的操作(例如创建临时目录)(默认情况下不执行任何操作)
TestServer.start: 寻找可用端口并执行 TestServer.build_command 方法的命令。
TestServer._after_start: 在此处编写服务器启动后需要执行的操作(例如进行初始设置)(默认情况下不执行任何操作)
终止时刻
-
- TestServer._before_stop: 在这里写下在服务器停止之前需要处理的事情(默认情况下不做任何处理)
TestServer.stop: 关闭服务器。
TestServer._after_start: 在这里写下在服务器启动后需要处理的事情(例如删除临时目录)(默认情况下不做任何处理)。
让我们为Elasticsearch进行编写尝试。
python-tcptest 包含了用于测试Redis、Fluentd和memcached的测试类。由于这次想要测试Elasticsearch,所以我打算自己创建一个类来测试。
import os
import shutil
import tempfile
import tcptest
class ESTestServer(tcptest.TestServer):
def build_command(self):
return ('elasticsearch',
'-Des.network.bind_host=127.0.0.1',
'-Des.network.host=127.0.0.1',
'-Des.http.port=%s' % self.port,
"-Des.node.master=true",
"-Des.node.local=true",
'-Des.path.data=%s' % self.data_dir,
'-Des.path.logs=%s' % self.logs_dir
)
def _before_start(self):
self.data_dir = tempfile.mkdtemp(prefix='esdata')
self.logs_dir = tempfile.mkdtemp(prefix='eslogs')
def _after_stop(self):
for path in filter(os.path.exists, (self.data_dir, self.logs_dir)):
shutil.rmtree(path)
只需要继承tcptest.TestServer类并重写三个方法即可!
确认使用代码
我会根据elasticsearch-py的示例进行确认。
from datetime import datetime
import os
from elasticsearch import Elasticsearch
with ESTestServer(timeout=30) as server:
es = Elasticsearch(['localhost:%s' % server.port])
print(es.transport.hosts)
# from https://elasticsearch-py.readthedocs.org/en/master/#example-usage
doc = {
'author': 'kimchy',
'text': 'Elasticsearch: cool. bonsai cool.',
'timestamp': datetime.now(),
}
res = es.index(index="test-index", doc_type='tweet', id=1, body=doc)
print(res['created'])
res = es.get(index="test-index", doc_type='tweet', id=1)
print(res['_source'])
es.indices.refresh(index="test-index")
res = es.search(index="test-index", body={"query": {"match_all": {}}})
print("Got %d Hits:" % res['hits']['total'])
for hit in res['hits']['hits']:
print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])
print('data_dir contains:', os.listdir(server.data_dir))
print('logs_dir contains:', os.listdir(server.logs_dir))
data_dir = server.data_dir
logs_dir = server.logs_dir
print('data_dir exists:', os.path.exists(data_dir))
print('logs_dir exists:', os.path.exists(logs_dir))
执行上述代码将会得到如下结果↓
[{'port': 55003, 'host': 'localhost'}]
True
{'timestamp': '2015-10-30T14:10:16.703556', 'author': 'kimchy', 'text': 'Elasticsearch: cool. bonsai cool.'}
Got 1 Hits:
2015-10-30T14:10:16.703556 kimchy: Elasticsearch: cool. bonsai cool.
data_dir contains: ['elasticsearch_brew']
logs_dir contains: ['elasticsearch_brew.log', 'elasticsearch_brew_index_indexing_slowlog.log', 'elasticsearch_brew_index_search_slowlog.log']
data_dir exists: False
logs_dir exists: False
在我不太清楚的空闲端口上启动Elasticsearch,然后随便创建一个数据目录和日志目录,确保它能正常运行,最后我发现结束后都清理得很干净。
这就是以上所述的。